Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
coverage:
pipenv run py.test -s --verbose --cov-report term-missing --cov-report xml --cov=simplipy tests
init:
pip install --upgrade pip pipenv
pip install pip==18.0 pipenv
pipenv lock
pipenv install --dev
lint:
Expand Down
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,36 @@ asyncio.get_event_loop().run_until_complete(main())

# Refreshing the Access Token

## General Notes

During usage, `simplipy` will automatically refresh the access token as needed.
At any point, the "dirtiness" of the token can be checked:

```python
from simplipy import API


async def main() -> None:
"""Create the aiohttp session and run."""
async with ClientSession() as websession:
simplisafe = API.login_via_token("<REFRESH TOKEN>", websession)
systems = await simplisafe.get_systems()
primary_system = systems[0]

# Assuming the access token was automatically refreshed:
primary_system.api.refresh_token_dirty
# >>> True

# Once the dirtiness is confirmed, the dirty bit resets:
primary_system.api.refresh_token_dirty
# >>> False


asyncio.get_event_loop().run_until_complete(main())
```

## Restarting with a Refresh Token

It may be desirable to re-authenticate against the SimpliSafe™ API at some
point in the future (and without using a user's email and password). In that
case, it is recommended that you save the `refresh_token` property somewhere;
Expand Down
28 changes: 23 additions & 5 deletions simplipy/api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Define a SimpliSafe account."""
# pylint: disable=import-error,protected-access,unused-import
# pylint: disable=import-error,protected-access,too-many-instance-attributes
# pylint: disable=unused-import

from datetime import datetime, timedelta
from typing import List, Type, TypeVar, Union # noqa
Expand All @@ -26,14 +27,32 @@ class API:

def __init__(self, websession: ClientSession) -> None:
"""Initialize."""
self._access_token = None
self._access_token = ''
self._access_token_expire = None # type: Union[None, datetime]
self._actively_refreshing = False
self._email = None # type: Union[None, str]
self._websession = websession
self.refresh_token = None
self._refresh_token = ''
self.refresh_token_dirty = False
self.user_id = None

@property
def refresh_token(self) -> str:
"""Return the current refresh_token."""
if self.refresh_token_dirty:
self.refresh_token_dirty = False

return self._refresh_token

@refresh_token.setter
def refresh_token(self, value: str) -> None:
"""Set the refresh token if it has changed."""
if value == self._refresh_token:
return

self._refresh_token = value
self.refresh_token_dirty = True

@classmethod
async def login_via_credentials(
cls: Type[ApiType], email: str, password: str,
Expand Down Expand Up @@ -121,8 +140,7 @@ async def request(
and datetime.now() >= self._access_token_expire
and not self._actively_refreshing):
self._actively_refreshing = True
await self._refresh_access_token( # type: ignore
self.refresh_token)
await self._refresh_access_token(self._refresh_token)

url = '{0}/{1}'.format(URL_BASE, endpoint)

Expand Down
53 changes: 51 additions & 2 deletions tests/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async def test_bad_request(event_loop, v2_server):
@pytest.mark.asyncio
async def test_expired_token_refresh(
api_token_json, auth_check_json, event_loop, v2_server):
"""Test that the correct exception is raised when the token is expired."""
"""Test that a refresh token is used correctly."""
async with v2_server:
v2_server.add(
'api.simplisafe.com', '/v1/api/token', 'post',
Expand All @@ -61,6 +61,34 @@ async def test_expired_token_refresh(
await system.api.request('get', 'api/authCheck')


@pytest.mark.asyncio
async def test_refresh_token_dirtiness(
api_token_json, auth_check_json, event_loop, v2_server):
"""Test that the refresh token's dirtiness can be checked."""
async with v2_server:
v2_server.add(
'api.simplisafe.com', '/v1/api/token', 'post',
aresponses.Response(text=json.dumps(api_token_json), status=200))
v2_server.add(
'api.simplisafe.com', '/v1/api/authCheck', 'get',
aresponses.Response(text=json.dumps(auth_check_json), status=200))
v2_server.add(
'api.simplisafe.com', '/v1/api/authCheck', 'get',
aresponses.Response(text=json.dumps(auth_check_json), status=200))

async with aiohttp.ClientSession(loop=event_loop) as websession:
api = await API.login_via_credentials(
TEST_EMAIL, TEST_PASSWORD, websession)
[system] = await api.get_systems()
system.api._access_token_expire = datetime.now() - timedelta(
hours=1)
await system.api.request('get', 'api/authCheck')

assert system.api.refresh_token_dirty
assert system.api.refresh_token == TEST_REFRESH_TOKEN
assert not system.api.refresh_token_dirty


@pytest.mark.asyncio
async def test_get_events(events_json, event_loop, v2_server):
"""Test getting events from a system."""
Expand All @@ -76,6 +104,7 @@ async def test_get_events(events_json, event_loop, v2_server):
[system] = await api.get_systems()

events = await system.get_events(1534725051, 2)

assert len(events) == 2


Expand Down Expand Up @@ -109,9 +138,11 @@ async def test_get_systems_v2(
credentials_api = await API.login_via_credentials(
TEST_EMAIL, TEST_PASSWORD, websession)
systems = await credentials_api.get_systems()

assert len(systems) == 1

primary_system = systems[0]

assert primary_system.serial == TEST_SYSTEM_SERIAL_NO
assert primary_system.system_id == TEST_SYSTEM_ID
assert primary_system.api._access_token == TEST_ACCESS_TOKEN
Expand All @@ -120,9 +151,11 @@ async def test_get_systems_v2(
token_api = await API.login_via_token(
TEST_REFRESH_TOKEN, websession)
systems = await token_api.get_systems()

assert len(systems) == 1

primary_system = systems[0]

assert primary_system.serial == TEST_SYSTEM_SERIAL_NO
assert primary_system.system_id == TEST_SYSTEM_ID
assert primary_system.api._access_token == TEST_ACCESS_TOKEN
Expand Down Expand Up @@ -159,9 +192,11 @@ async def test_get_systems_v3(
credentials_api = await API.login_via_credentials(
TEST_EMAIL, TEST_PASSWORD, websession)
systems = await credentials_api.get_systems()

assert len(systems) == 1

primary_system = systems[0]

assert primary_system.serial == TEST_SYSTEM_SERIAL_NO
assert primary_system.system_id == TEST_SYSTEM_ID
assert primary_system.api._access_token == TEST_ACCESS_TOKEN
Expand All @@ -170,9 +205,11 @@ async def test_get_systems_v3(
token_api = await API.login_via_token(
TEST_REFRESH_TOKEN, websession)
systems = await token_api.get_systems()

assert len(systems) == 1

primary_system = systems[0]

assert primary_system.serial == TEST_SYSTEM_SERIAL_NO
assert primary_system.system_id == TEST_SYSTEM_ID
assert primary_system.api._access_token == TEST_ACCESS_TOKEN
Expand All @@ -187,6 +224,7 @@ async def test_properties_base(event_loop, v2_server):
api = await API.login_via_credentials(
TEST_EMAIL, TEST_PASSWORD, websession)
[system] = await api.get_systems()

assert system.address == TEST_ADDRESS
assert not system.alarm_going_off
assert system.serial == TEST_SYSTEM_SERIAL_NO
Expand Down Expand Up @@ -232,15 +270,19 @@ async def test_set_states_v2(
[system] = await api.get_systems()

await system.set_away()

assert system.state == system.SystemStates.away

await system.set_home()

assert system.state == system.SystemStates.home

await system.set_off()

assert system.state == system.SystemStates.off

await system.set_off()

assert system.state == system.SystemStates.off


Expand Down Expand Up @@ -280,15 +322,19 @@ async def test_set_states_v3(
[system] = await api.get_systems()

await system.set_away()

assert system.state == system.SystemStates.away

await system.set_home()

assert system.state == system.SystemStates.home

await system.set_off()

assert system.state == system.SystemStates.off

await system.set_off()

assert system.state == system.SystemStates.off


Expand All @@ -306,12 +352,13 @@ async def test_unknown_initial_state(caplog, event_loop):

@pytest.mark.asyncio
async def test_unknown_sensor_type(caplog, event_loop, v2_server):
"""Test getting a new access token from a refresh token."""
"""Test whether a message is logged upon finding an unknown sensor type."""
async with v2_server:
async with aiohttp.ClientSession(loop=event_loop) as websession:
api = await API.login_via_credentials(
TEST_EMAIL, TEST_PASSWORD, websession)
_ = await api.get_systems() # noqa

assert any('Unknown' in e.message for e in caplog.records)


Expand All @@ -337,6 +384,7 @@ async def test_update_system_data_v2(
[system] = await api.get_systems()

await system.update()

assert system.serial == TEST_SYSTEM_SERIAL_NO
assert system.system_id == TEST_SYSTEM_ID
assert system.api._access_token == TEST_ACCESS_TOKEN
Expand Down Expand Up @@ -365,6 +413,7 @@ async def test_update_system_data_v3(
[system] = await api.get_systems()

await system.update()

assert system.serial == TEST_SYSTEM_SERIAL_NO
assert system.system_id == TEST_SYSTEM_ID
assert system.api._access_token == TEST_ACCESS_TOKEN
Expand Down