From a5a2b313860d22c82c7f6d578233466de02f6393 Mon Sep 17 00:00:00 2001 From: David Knowles Date: Sat, 5 Aug 2023 21:54:59 +0000 Subject: [PATCH 1/2] Add a last_changed_by method to Lock --- pyschlage/api.py | 6 +- pyschlage/lock.py | 57 +++++++++++++++-- pyschlage/log.py | 10 +-- tests/conftest.py | 16 +++++ tests/test_lock.py | 151 ++++++++++++++++++++++++++++++++++++++++++--- 5 files changed, 218 insertions(+), 22 deletions(-) diff --git a/pyschlage/api.py b/pyschlage/api.py index 98c791f..0e08ebc 100644 --- a/pyschlage/api.py +++ b/pyschlage/api.py @@ -27,7 +27,11 @@ def locks(self) -> list[Lock]: """ path = Lock.request_path() response = self._auth.request("get", path, params={"archetype": "lock"}) - return [Lock.from_json(self._auth, d) for d in response.json()] + locks = [] + for lock_json in response.json(): + lock = Lock.from_json(self._auth, lock_json) + lock.refresh_access_codes() + return locks def users(self) -> list[User]: """Retrieves all users associated with this account's locks. diff --git a/pyschlage/lock.py b/pyschlage/lock.py index 6dd98ac..bd9706c 100644 --- a/pyschlage/lock.py +++ b/pyschlage/lock.py @@ -4,10 +4,12 @@ from dataclasses import dataclass +from .auth import Auth from .code import AccessCode from .common import Mutable from .exceptions import NotAuthenticatedError from .log import LockLog +from .user import User @dataclass @@ -56,6 +58,12 @@ class Lock(Mutable): mac_address: str | None = None """The MAC address for the lock or None if lock is unavailable.""" + users: dict[str, User] | None = None + """Users with access to this lock, keyed by their ID.""" + + access_codes: dict[str, AccessCode] | None = None + """Access codes for this lock, keyed by their ID.""" + _cat: str = "" @staticmethod @@ -70,7 +78,7 @@ def request_path(device_id: str | None = None) -> str: return path @classmethod - def from_json(cls, auth, json): + def from_json(cls, auth: Auth, json: dict) -> Lock: """Creates a Lock from a JSON object. :meta private: @@ -81,6 +89,11 @@ def from_json(cls, auth, json): is_locked = attributes["lockState"] == 1 is_jammed = attributes["lockState"] == 2 + users: dict[str, User] = {} + for user_json in json.get("users", []): + user = User.from_json(user_json) + users[user.user_id] = user + return cls( _auth=auth, device_id=json["deviceId"], @@ -95,6 +108,7 @@ def from_json(cls, auth, json): auto_lock_time=attributes.get("autoLockTime", 0), firmware_version=attributes.get("mainFirmwareVersion"), mac_address=attributes.get("macAddress"), + users=users, _cat=json["CAT"], ) @@ -104,7 +118,7 @@ def _is_wifi_lock(self) -> bool: return True return False - def refresh(self): + def refresh(self) -> None: """Refreshes the Lock state. :raise pyschlage.exceptions.NotAuthorizedError: When authentication fails. @@ -112,6 +126,7 @@ def refresh(self): """ path = self.request_path(self.device_id) self._update_with(self._auth.request("get", path).json()) + self.refresh_access_codes() def _put_attributes(self, attributes): path = self.request_path(self.device_id) @@ -154,6 +169,35 @@ def unlock(self): """ self._toggle(0) + def last_changed_by( + self, + logs: list[LockLog] | None = None, + ) -> str | None: + """Determines the last entity or user that changed the lock state. + + :param logs: Recent log entries to search through for the last change. + If None, new logs will be fetched. + :type logs: list[LockLog] or None + :rtype: str + """ + if logs is None: + logs = self.logs() + + want_prefix = "Locked by " if self.is_locked else "Unlocked by " + want_prefix_len = len(want_prefix) + for log in sorted(logs, reverse=True, key=lambda log: log.created_at): + if not log.message.startswith(want_prefix): + continue + match message := log.message[want_prefix_len:]: + case "keypad": + if code := self.access_codes.get(log.access_code_id, None): + return f"{message} - {code.name}" + case "mobile device": + if user := self.users.get(log.accessor_id, None): + return f"{message} - {user.name}" + return message + return None + def logs(self, limit: int | None = None, sort_desc: bool = False) -> list[LockLog]: """Fetches activity logs for the lock. @@ -174,7 +218,7 @@ def logs(self, limit: int | None = None, sort_desc: bool = False) -> list[LockLo resp = self._auth.request("get", path, params=params) return [LockLog.from_json(l) for l in resp.json()] - def access_codes(self) -> list[AccessCode]: + def refresh_access_codes(self) -> None: """Fetches access codes for this lock. :rtype: list[pyschlage.code.AccessCode] @@ -183,9 +227,10 @@ def access_codes(self) -> list[AccessCode]: """ path = AccessCode.request_path(self.device_id) resp = self._auth.request("get", path) - return [ - AccessCode.from_json(self._auth, ac, self.device_id) for ac in resp.json() - ] + self.access_codes = {} + for code_json in resp.json(): + code = AccessCode.from_json(self._auth, code_json, self.device_id) + self.access_codes[code.access_code_id] = code def add_access_code(self, code: AccessCode): """Adds an access code to the lock. diff --git a/pyschlage/log.py b/pyschlage/log.py index b1e43fa..02041c2 100644 --- a/pyschlage/log.py +++ b/pyschlage/log.py @@ -70,15 +70,15 @@ class LockLog: created_at: datetime """The time at which the log entry was created.""" - accessor_id: str | None + message: str + """The human-readable message associated with the log entry.""" + + accessor_id: str | None = None """Unique identifier for the user that triggered the log entry.""" - access_code_id: str | None + access_code_id: str | None = None """Unique identifier for the access code that triggered the log entry.""" - message: str - """The human-readable message associated with the log entry.""" - @staticmethod def request_path(device_id: str) -> str: """Returns the request path for the LockLog. diff --git a/tests/conftest.py b/tests/conftest.py index 73cf77c..080155a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,6 +3,8 @@ from pytest import fixture from pyschlage.auth import Auth +from pyschlage.code import AccessCode +from pyschlage.lock import Lock @fixture @@ -103,6 +105,15 @@ def wifi_lock_json(lock_users_json): } +@fixture +def wifi_lock( + mock_auth: mock.Mock, wifi_lock_json: dict, access_code: AccessCode +) -> Lock: + lock = Lock.from_json(mock_auth, wifi_lock_json) + lock.access_codes = {access_code.access_code_id: access_code} + return lock + + @fixture def wifi_lock_unavailable_json(wifi_lock_json): keep = ("modelName", "serialNumber", "macAddress", "SAT", "CAT") @@ -185,6 +196,11 @@ def access_code_json(): } +@fixture +def access_code(mock_auth: mock.Mock, access_code_json: dict) -> AccessCode: + return AccessCode.from_json(mock_auth, access_code_json, "__device_uuid__") + + @fixture def log_json(): return { diff --git a/tests/test_lock.py b/tests/test_lock.py index 1ae37f0..546fa2a 100644 --- a/tests/test_lock.py +++ b/tests/test_lock.py @@ -1,8 +1,13 @@ from copy import deepcopy +from datetime import datetime from unittest import mock +import pytest + from pyschlage.code import AccessCode from pyschlage.lock import Lock +from pyschlage.log import LockLog +from pyschlage.user import User class TestLock: @@ -22,6 +27,10 @@ def test_from_json(self, mock_auth, lock_json): assert lock.auto_lock_time == 0 assert lock.firmware_version == "10.00.00264232" assert lock.mac_address == "AA:BB:CC:00:11:22" + assert lock.users == { + "user-uuid": User("asdf", "asdf@asdf.com", "user-uuid"), + "foo-bar-uuid": User("Foo Bar", "foo@bar.xyz", "foo-bar-uuid"), + } def test_from_json_is_jammed(self, mock_auth, lock_json): lock_json["attributes"]["lockState"] = 2 @@ -38,16 +47,24 @@ def test_from_json_wifi_lock_unavailable( assert lock.is_locked is None assert lock.is_jammed is None - def test_refresh(self, mock_auth, lock_json): + def test_refresh( + self, mock_auth: mock.Mock, lock_json: dict, access_code_json: dict + ) -> None: lock = Lock.from_json(mock_auth, lock_json) lock_json["name"] = "" - mock_auth.request.return_value = mock.Mock( - json=mock.Mock(return_value=lock_json) - ) + mock_auth.request.side_effect = [ + mock.Mock(json=mock.Mock(return_value=lock_json)), + mock.Mock(json=mock.Mock(return_value=[access_code_json])), + ] lock.refresh() - mock_auth.request.assert_called_once_with("get", "devices/__wifi_uuid__") + mock_auth.request.assert_has_calls( + [ + mock.call("get", "devices/__wifi_uuid__"), + mock.call("get", "devices/__wifi_uuid__/storage/accesscode"), + ] + ) assert lock.name == "" def test_lock_wifi(self, mock_auth, wifi_lock_json): @@ -122,20 +139,24 @@ def test_unlock_ble(self, mock_auth, ble_lock_json): ) assert lock.is_locked == False - def test_access_codes(self, mock_auth, lock_json, access_code_json): + def test_refresh_access_codes( + self, mock_auth: mock.Mock, lock_json: dict, access_code_json: dict + ) -> None: lock = Lock.from_json(mock_auth, lock_json) mock_auth.request.return_value = mock.Mock( json=mock.Mock(return_value=[access_code_json]) ) - codes = lock.access_codes() + lock.refresh_access_codes() mock_auth.request.assert_called_once_with( "get", "devices/__wifi_uuid__/storage/accesscode" ) - assert codes == [ - AccessCode.from_json(mock_auth, access_code_json, lock.device_id) - ] + assert lock.access_codes == { + access_code_json["accesscodeId"]: AccessCode.from_json( + mock_auth, access_code_json, lock.device_id + ) + } def test_add_access_code(self, mock_auth, lock_json, access_code_json): lock = Lock.from_json(mock_auth, lock_json) @@ -159,3 +180,113 @@ def test_add_access_code(self, mock_auth, lock_json, access_code_json): assert code._auth == mock_auth assert code.device_id == lock.device_id assert code.access_code_id == "__access_code_uuid__" + + +class TestChangedBy: + @pytest.mark.parametrize("actor", ["keypad", "thumbturn"]) + def test_lock(self, wifi_lock: Lock, actor: str) -> None: + other = { + "keypad": "thumbturn", + "thumbturn": "keypad", + }[actor] + logs = [ + LockLog( + created_at=datetime(2023, 1, 1, 0, 0, 0), + message=f"Unlocked by {other}", + ), + LockLog( + created_at=datetime(2023, 1, 1, 1, 0, 0), + message=f"Locked by {actor}", + ), + ] + assert wifi_lock.last_changed_by(logs) == actor + + @pytest.mark.parametrize("actor", ["keypad", "thumbturn"]) + def test_unlock(self, wifi_lock: Lock, actor: str) -> None: + other = { + "keypad": "thumbturn", + "thumbturn": "keypad", + }[actor] + logs = [ + LockLog( + created_at=datetime(2023, 1, 1, 0, 0, 0), + message=f"Locked by {other}", + ), + LockLog( + created_at=datetime(2023, 1, 1, 1, 0, 0), + message=f"Unlocked by {actor}", + ), + ] + wifi_lock.is_locked = False + assert wifi_lock.last_changed_by(logs) == actor + + def test_lock_keypad_code(self, wifi_lock: Lock, access_code: AccessCode) -> None: + logs = [ + LockLog( + created_at=datetime(2023, 1, 1, 0, 0, 0), + message="Locked by keypad", + ), + LockLog( + created_at=datetime(2023, 1, 1, 1, 0, 0), + message="Locked by keypad", + access_code_id=access_code.access_code_id, + ), + ] + assert wifi_lock.last_changed_by(logs) == "keypad - Friendly name" + + def test_lock_mobile_device(self, wifi_lock: Lock) -> None: + logs = [ + LockLog( + created_at=datetime(2023, 1, 1, 0, 0, 0), + message="Locked by mobile device", + ), + LockLog( + created_at=datetime(2023, 1, 1, 1, 0, 0), + message="Locked by mobile device", + accessor_id="user-uuid", + ), + ] + assert wifi_lock.last_changed_by(logs) == "mobile device - asdf" + + def test_lock_mobile_device_unknown_user(self, wifi_lock: Lock) -> None: + logs = [ + LockLog( + created_at=datetime(2023, 1, 1, 0, 0, 0), + message="Locked by mobile device", + ), + LockLog( + created_at=datetime(2023, 1, 1, 1, 0, 0), + message="Locked by mobile device", + accessor_id="some-other-user-uuid", + ), + ] + assert wifi_lock.last_changed_by(logs) == "mobile device" + + def test_no_useful_logs(self, wifi_lock: Lock) -> None: + logs = [ + LockLog( + created_at=datetime(2023, 1, 1, 0, 0, 0), + message="Lock jammed", + ), + LockLog( + created_at=datetime(2023, 1, 1, 1, 0, 0), + message="Firmware updated", + ), + ] + assert wifi_lock.last_changed_by(logs) is None + + def test_load_logs(self, wifi_lock: Lock) -> None: + logs = [ + LockLog( + created_at=datetime(2023, 1, 1, 0, 0, 0), + message="Unlocked by keypad", + ), + LockLog( + created_at=datetime(2023, 1, 1, 1, 0, 0), + message="Locked by thumbturn", + ), + ] + with mock.patch.object(wifi_lock, "logs") as mock_logs: + mock_logs.return_value = logs + assert wifi_lock.last_changed_by() == "thumbturn" + mock_logs.assert_called_once_with() From c3d304941c5bd44e43d5975efbfc26ba0c42b068 Mon Sep 17 00:00:00 2001 From: David Knowles Date: Sat, 5 Aug 2023 22:01:44 +0000 Subject: [PATCH 2/2] Replace match/case with if/elif block --- pyschlage/lock.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pyschlage/lock.py b/pyschlage/lock.py index bd9706c..9102df5 100644 --- a/pyschlage/lock.py +++ b/pyschlage/lock.py @@ -188,13 +188,13 @@ def last_changed_by( for log in sorted(logs, reverse=True, key=lambda log: log.created_at): if not log.message.startswith(want_prefix): continue - match message := log.message[want_prefix_len:]: - case "keypad": - if code := self.access_codes.get(log.access_code_id, None): - return f"{message} - {code.name}" - case "mobile device": - if user := self.users.get(log.accessor_id, None): - return f"{message} - {user.name}" + message = log.message[want_prefix_len:] + if message == "keypad": + if code := self.access_codes.get(log.access_code_id, None): + return f"{message} - {code.name}" + elif message == "mobile device": + if user := self.users.get(log.accessor_id, None): + return f"{message} - {user.name}" return message return None