Skip to content

Commit

Permalink
Refactor tokens to submodule
Browse files Browse the repository at this point in the history
  • Loading branch information
felix-hilden committed Aug 25, 2020
1 parent 7414f97 commit 8e44b8a
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 109 deletions.
3 changes: 2 additions & 1 deletion tekore/_auth/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .expiring import Credentials, Token, AccessToken
from .expiring import Credentials
from .refreshing import RefreshingCredentials, RefreshingToken
from .scope import scope, Scope
from .token import Token, AccessToken
from .util import (
parse_code_from_url,
refresh_user_token,
Expand Down
96 changes: 1 addition & 95 deletions tekore/_auth/expiring.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import time

from abc import ABC, abstractmethod
from base64 import b64encode as _b64encode
from typing import Union

from urllib.parse import urlencode

from .token import Token
from .scope import Scope
from .._error import get_error
from .._sender import Sender, Client, send_and_process, Request, Response
Expand All @@ -19,97 +16,6 @@ def b64encode(msg: str) -> str:
return _b64encode(msg.encode()).decode()


class AccessToken(ABC):
"""Access token base class."""

@property
@abstractmethod
def access_token(self) -> str:
"""
Bearer token value.
Used as the string representation of the instance.
"""
raise NotImplementedError

def __str__(self):
"""Bearer token value."""
return self.access_token


class Token(AccessToken):
"""
Expiring access token.
Represents both client and user tokens.
The refresh token of a client token is ``None``.
"""

def __init__(self, token_info: dict):
self._access_token = token_info['access_token']
self._token_type = token_info['token_type']

self._scope = Scope(*token_info['scope'].split(' '))
if str(self._scope) == '':
self._scope = Scope()

self._refresh_token = token_info.get('refresh_token', None)
self._expires_at = int(time.time()) + token_info['expires_in']

def __repr__(self):
options = [
f'access_token={self.access_token!r}',
f'refresh_token={self.refresh_token!r}',
f'expires_at={self.expires_at!r}',
f'scope={self.scope!r}',
]
return type(self).__name__ + '(' + ', '.join(options) + ')'

@property
def access_token(self) -> str:
"""Bearer token value."""
return self._access_token

@property
def refresh_token(self) -> Union[str, None]:
"""
Refresh token for generating new access tokens.
``None`` if the token is an application token.
"""
return self._refresh_token

@property
def token_type(self) -> str:
"""How the token may be used, always 'Bearer'."""
return self._token_type

@property
def scope(self) -> Scope:
"""
Privileges granted to the token.
Empty :class:`Scope` if the token is an application token
or a user token without any scopes.
"""
return self._scope

@property
def expires_in(self) -> int:
"""Seconds until token expiration."""
return self.expires_at - int(time.time())

@property
def expires_at(self) -> int:
"""When the token expires."""
return self._expires_at

@property
def is_expiring(self) -> bool:
"""Determine whether token is about to expire."""
return self.expires_in < 60


def handle_errors(request: Request, response: Response) -> None:
"""Examine response and raise errors accordingly."""
if response.status_code < 400:
Expand Down
5 changes: 3 additions & 2 deletions tekore/_auth/refreshing.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Union

from .expiring import Credentials
from .token import AccessToken, Token
from .scope import Scope
from .expiring import AccessToken, Token, Credentials
from tekore._sender import Sender
from .._sender import Sender


class RefreshingToken(AccessToken):
Expand Down
97 changes: 97 additions & 0 deletions tekore/_auth/token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import time

from abc import ABC, abstractmethod
from typing import Union

from .scope import Scope


class AccessToken(ABC):
"""Access token base class."""

@property
@abstractmethod
def access_token(self) -> str:
"""
Bearer token value.
Used as the string representation of the instance.
"""
raise NotImplementedError

def __str__(self):
"""Bearer token value."""
return self.access_token


class Token(AccessToken):
"""
Expiring access token.
Represents both client and user tokens.
The refresh token of a client token is ``None``.
"""

def __init__(self, token_info: dict):
self._access_token = token_info['access_token']
self._token_type = token_info['token_type']

self._scope = Scope(*token_info['scope'].split(' '))
if str(self._scope) == '':
self._scope = Scope()

self._refresh_token = token_info.get('refresh_token', None)
self._expires_at = int(time.time()) + token_info['expires_in']

def __repr__(self):
options = [
f'access_token={self.access_token!r}',
f'refresh_token={self.refresh_token!r}',
f'expires_at={self.expires_at!r}',
f'scope={self.scope!r}',
]
return type(self).__name__ + '(' + ', '.join(options) + ')'

@property
def access_token(self) -> str:
"""Bearer token value."""
return self._access_token

@property
def refresh_token(self) -> Union[str, None]:
"""
Refresh token for generating new access tokens.
``None`` if the token is an application token.
"""
return self._refresh_token

@property
def token_type(self) -> str:
"""How the token may be used, always 'Bearer'."""
return self._token_type

@property
def scope(self) -> Scope:
"""
Privileges granted to the token.
Empty :class:`Scope` if the token is an application token
or a user token without any scopes.
"""
return self._scope

@property
def expires_in(self) -> int:
"""Seconds until token expiration."""
return self.expires_at - int(time.time())

@property
def expires_at(self) -> int:
"""When the token expires."""
return self._expires_at

@property
def is_expiring(self) -> bool:
"""Determine whether token is about to expire."""
return self.expires_in < 60
25 changes: 14 additions & 11 deletions tests/auth/expiring.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def make_token_dict():
}


module = 'tekore._auth.expiring'
time_module = 'tekore._auth.token.time'


class TestToken:
Expand All @@ -41,31 +41,31 @@ def test_access_token_returned(self):
time = MagicMock()
time.time.return_value = 0

with patch(module + '.time', time):
with patch(time_module, time):
token = Token(make_token_dict())
assert token.access_token == 'accesstoken'

def test_expires_in_set_time(self):
time = MagicMock()
time.time.return_value = 0

with patch(module + '.time', time):
with patch(time_module, time):
token = Token(make_token_dict())
assert token.expires_in == 3600

def test_expires_in_is_refreshed(self):
time = MagicMock()
time.time.side_effect = [0, 1]

with patch(module + '.time', time):
with patch(time_module, time):
token = Token(make_token_dict())
assert token.expires_in == 3599

def test_old_token_is_expiring(self):
time = MagicMock()
time.time.side_effect = [0, 3600]

with patch(module + '.time', time):
with patch(time_module, time):
token = Token(make_token_dict())
assert token.is_expiring is True

Expand Down Expand Up @@ -129,6 +129,9 @@ def test_bad_arguments_raises_error(self):
c.request_client_token()


cred_module = 'tekore._auth.expiring.Credentials'


class TestCredentialsOffline:
def test_repr(self):
c = Credentials('id', 'secret')
Expand All @@ -144,7 +147,7 @@ def test_server_error_raises_http_error(self):
c = Credentials('id', 'secret')

send = MagicMock(return_value=mock_response(500))
with patch(module + '.Credentials.send', send):
with patch(cred_module + '.send', send):
with pytest.raises(HTTPError):
c.request_client_token()

Expand All @@ -163,7 +166,7 @@ def test_user_authorisation_url_accepts_scope_list(self):
def test_request_user_token(self):
c = Credentials('id', 'secret', 'uri')
send = MagicMock(return_value=mock_response())
with patch(module + '.Credentials.send', send):
with patch(cred_module + '.send', send):
c.request_user_token('code')
send.assert_called_once()

Expand All @@ -174,7 +177,7 @@ def test_refresh_user_token_uses_old_refresh_if_not_returned(self):
response = mock_response(content=token)

send = MagicMock(return_value=response)
with patch(module + '.Credentials.send', send):
with patch(cred_module + '.send', send):
refreshed = c.refresh_user_token('refresh')
assert refreshed.refresh_token == 'refresh'

Expand All @@ -184,7 +187,7 @@ def test_refresh_user_token_refresh_replaced_if_returned(self):
response = mock_response(content=token)

send = MagicMock(return_value=response)
with patch(module + '.Credentials.send', send):
with patch(cred_module + '.send', send):
refreshed = c.refresh_user_token('refresh')
assert refreshed.refresh_token == token['refresh_token']

Expand All @@ -194,7 +197,7 @@ def test_refresh_none_refresh_interpreted_as_client_token(self):
token.refresh_token = None

mock = MagicMock()
with patch(module + '.Credentials.request_client_token', mock):
with patch(cred_module + '.request_client_token', mock):
c.refresh(token)
mock.assert_called_once()

Expand All @@ -204,6 +207,6 @@ def test_refresh_valid_refresh_interpreted_as_user_token(self):
token.refresh_token = 'refresh'

mock = MagicMock()
with patch(module + '.Credentials.refresh_user_token', mock):
with patch(cred_module + '.refresh_user_token', mock):
c.refresh(token)
mock.assert_called_once()

0 comments on commit 8e44b8a

Please sign in to comment.