diff --git a/Makefile b/Makefile index e3d2accf..e7e07a37 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ coverage: pipenv run py.test -s --verbose --cov-report term-missing --cov-report xml --cov=simplipy tests init: - pip install --upgrade pip pipenv + pip install pip==18.0 pipenv pipenv lock pipenv install --dev lint: diff --git a/README.md b/README.md index f7b817d9..c122c934 100644 --- a/README.md +++ b/README.md @@ -351,6 +351,36 @@ asyncio.get_event_loop().run_until_complete(main()) # Refreshing the Access Token +## General Notes + +During usage, `simplipy` will automatically refresh the access token as needed. +At any point, the "dirtiness" of the token can be checked: + +```python +from simplipy import API + + +async def main() -> None: + """Create the aiohttp session and run.""" + async with ClientSession() as websession: + simplisafe = API.login_via_token("", websession) + systems = await simplisafe.get_systems() + primary_system = systems[0] + + # Assuming the access token was automatically refreshed: + primary_system.api.refresh_token_dirty + # >>> True + + # Once the dirtiness is confirmed, the dirty bit resets: + primary_system.api.refresh_token_dirty + # >>> False + + +asyncio.get_event_loop().run_until_complete(main()) +``` + +## Restarting with a Refresh Token + It may be desirable to re-authenticate against the SimpliSafeā„¢ API at some point in the future (and without using a user's email and password). In that case, it is recommended that you save the `refresh_token` property somewhere; diff --git a/simplipy/api.py b/simplipy/api.py index f52ca95e..e429c7fa 100644 --- a/simplipy/api.py +++ b/simplipy/api.py @@ -1,5 +1,6 @@ """Define a SimpliSafe account.""" -# pylint: disable=import-error,protected-access,unused-import +# pylint: disable=import-error,protected-access,too-many-instance-attributes +# pylint: disable=unused-import from datetime import datetime, timedelta from typing import List, Type, TypeVar, Union # noqa @@ -26,14 +27,32 @@ class API: def __init__(self, websession: ClientSession) -> None: """Initialize.""" - self._access_token = None + self._access_token = '' self._access_token_expire = None # type: Union[None, datetime] self._actively_refreshing = False self._email = None # type: Union[None, str] self._websession = websession - self.refresh_token = None + self._refresh_token = '' + self.refresh_token_dirty = False self.user_id = None + @property + def refresh_token(self) -> str: + """Return the current refresh_token.""" + if self.refresh_token_dirty: + self.refresh_token_dirty = False + + return self._refresh_token + + @refresh_token.setter + def refresh_token(self, value: str) -> None: + """Set the refresh token if it has changed.""" + if value == self._refresh_token: + return + + self._refresh_token = value + self.refresh_token_dirty = True + @classmethod async def login_via_credentials( cls: Type[ApiType], email: str, password: str, @@ -121,8 +140,7 @@ async def request( and datetime.now() >= self._access_token_expire and not self._actively_refreshing): self._actively_refreshing = True - await self._refresh_access_token( # type: ignore - self.refresh_token) + await self._refresh_access_token(self._refresh_token) url = '{0}/{1}'.format(URL_BASE, endpoint) diff --git a/tests/test_system.py b/tests/test_system.py index ee33f3ab..d5ff40d9 100644 --- a/tests/test_system.py +++ b/tests/test_system.py @@ -40,7 +40,7 @@ async def test_bad_request(event_loop, v2_server): @pytest.mark.asyncio async def test_expired_token_refresh( api_token_json, auth_check_json, event_loop, v2_server): - """Test that the correct exception is raised when the token is expired.""" + """Test that a refresh token is used correctly.""" async with v2_server: v2_server.add( 'api.simplisafe.com', '/v1/api/token', 'post', @@ -61,6 +61,34 @@ async def test_expired_token_refresh( await system.api.request('get', 'api/authCheck') +@pytest.mark.asyncio +async def test_refresh_token_dirtiness( + api_token_json, auth_check_json, event_loop, v2_server): + """Test that the refresh token's dirtiness can be checked.""" + async with v2_server: + v2_server.add( + 'api.simplisafe.com', '/v1/api/token', 'post', + aresponses.Response(text=json.dumps(api_token_json), status=200)) + v2_server.add( + 'api.simplisafe.com', '/v1/api/authCheck', 'get', + aresponses.Response(text=json.dumps(auth_check_json), status=200)) + v2_server.add( + 'api.simplisafe.com', '/v1/api/authCheck', 'get', + aresponses.Response(text=json.dumps(auth_check_json), status=200)) + + async with aiohttp.ClientSession(loop=event_loop) as websession: + api = await API.login_via_credentials( + TEST_EMAIL, TEST_PASSWORD, websession) + [system] = await api.get_systems() + system.api._access_token_expire = datetime.now() - timedelta( + hours=1) + await system.api.request('get', 'api/authCheck') + + assert system.api.refresh_token_dirty + assert system.api.refresh_token == TEST_REFRESH_TOKEN + assert not system.api.refresh_token_dirty + + @pytest.mark.asyncio async def test_get_events(events_json, event_loop, v2_server): """Test getting events from a system.""" @@ -76,6 +104,7 @@ async def test_get_events(events_json, event_loop, v2_server): [system] = await api.get_systems() events = await system.get_events(1534725051, 2) + assert len(events) == 2 @@ -109,9 +138,11 @@ async def test_get_systems_v2( credentials_api = await API.login_via_credentials( TEST_EMAIL, TEST_PASSWORD, websession) systems = await credentials_api.get_systems() + assert len(systems) == 1 primary_system = systems[0] + assert primary_system.serial == TEST_SYSTEM_SERIAL_NO assert primary_system.system_id == TEST_SYSTEM_ID assert primary_system.api._access_token == TEST_ACCESS_TOKEN @@ -120,9 +151,11 @@ async def test_get_systems_v2( token_api = await API.login_via_token( TEST_REFRESH_TOKEN, websession) systems = await token_api.get_systems() + assert len(systems) == 1 primary_system = systems[0] + assert primary_system.serial == TEST_SYSTEM_SERIAL_NO assert primary_system.system_id == TEST_SYSTEM_ID assert primary_system.api._access_token == TEST_ACCESS_TOKEN @@ -159,9 +192,11 @@ async def test_get_systems_v3( credentials_api = await API.login_via_credentials( TEST_EMAIL, TEST_PASSWORD, websession) systems = await credentials_api.get_systems() + assert len(systems) == 1 primary_system = systems[0] + assert primary_system.serial == TEST_SYSTEM_SERIAL_NO assert primary_system.system_id == TEST_SYSTEM_ID assert primary_system.api._access_token == TEST_ACCESS_TOKEN @@ -170,9 +205,11 @@ async def test_get_systems_v3( token_api = await API.login_via_token( TEST_REFRESH_TOKEN, websession) systems = await token_api.get_systems() + assert len(systems) == 1 primary_system = systems[0] + assert primary_system.serial == TEST_SYSTEM_SERIAL_NO assert primary_system.system_id == TEST_SYSTEM_ID assert primary_system.api._access_token == TEST_ACCESS_TOKEN @@ -187,6 +224,7 @@ async def test_properties_base(event_loop, v2_server): api = await API.login_via_credentials( TEST_EMAIL, TEST_PASSWORD, websession) [system] = await api.get_systems() + assert system.address == TEST_ADDRESS assert not system.alarm_going_off assert system.serial == TEST_SYSTEM_SERIAL_NO @@ -232,15 +270,19 @@ async def test_set_states_v2( [system] = await api.get_systems() await system.set_away() + assert system.state == system.SystemStates.away await system.set_home() + assert system.state == system.SystemStates.home await system.set_off() + assert system.state == system.SystemStates.off await system.set_off() + assert system.state == system.SystemStates.off @@ -280,15 +322,19 @@ async def test_set_states_v3( [system] = await api.get_systems() await system.set_away() + assert system.state == system.SystemStates.away await system.set_home() + assert system.state == system.SystemStates.home await system.set_off() + assert system.state == system.SystemStates.off await system.set_off() + assert system.state == system.SystemStates.off @@ -306,12 +352,13 @@ async def test_unknown_initial_state(caplog, event_loop): @pytest.mark.asyncio async def test_unknown_sensor_type(caplog, event_loop, v2_server): - """Test getting a new access token from a refresh token.""" + """Test whether a message is logged upon finding an unknown sensor type.""" async with v2_server: async with aiohttp.ClientSession(loop=event_loop) as websession: api = await API.login_via_credentials( TEST_EMAIL, TEST_PASSWORD, websession) _ = await api.get_systems() # noqa + assert any('Unknown' in e.message for e in caplog.records) @@ -337,6 +384,7 @@ async def test_update_system_data_v2( [system] = await api.get_systems() await system.update() + assert system.serial == TEST_SYSTEM_SERIAL_NO assert system.system_id == TEST_SYSTEM_ID assert system.api._access_token == TEST_ACCESS_TOKEN @@ -365,6 +413,7 @@ async def test_update_system_data_v3( [system] = await api.get_systems() await system.update() + assert system.serial == TEST_SYSTEM_SERIAL_NO assert system.system_id == TEST_SYSTEM_ID assert system.api._access_token == TEST_ACCESS_TOKEN