Skip to content

Commit

Permalink
Merge pull request #80 from dknowles2/last-changed-by
Browse files Browse the repository at this point in the history
Add a last_changed_by method to Lock
  • Loading branch information
dknowles2 committed Aug 5, 2023
2 parents 813fc23 + c3d3049 commit c28591e
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 22 deletions.
6 changes: 5 additions & 1 deletion pyschlage/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ def locks(self) -> list[Lock]:
"""
path = Lock.request_path()
response = self._auth.request("get", path, params={"archetype": "lock"})
return [Lock.from_json(self._auth, d) for d in response.json()]
locks = []
for lock_json in response.json():
lock = Lock.from_json(self._auth, lock_json)
lock.refresh_access_codes()
return locks

def users(self) -> list[User]:
"""Retrieves all users associated with this account's locks.
Expand Down
57 changes: 51 additions & 6 deletions pyschlage/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

from dataclasses import dataclass

from .auth import Auth
from .code import AccessCode
from .common import Mutable
from .exceptions import NotAuthenticatedError
from .log import LockLog
from .user import User


@dataclass
Expand Down Expand Up @@ -56,6 +58,12 @@ class Lock(Mutable):
mac_address: str | None = None
"""The MAC address for the lock or None if lock is unavailable."""

users: dict[str, User] | None = None
"""Users with access to this lock, keyed by their ID."""

access_codes: dict[str, AccessCode] | None = None
"""Access codes for this lock, keyed by their ID."""

_cat: str = ""

@staticmethod
Expand All @@ -70,7 +78,7 @@ def request_path(device_id: str | None = None) -> str:
return path

@classmethod
def from_json(cls, auth, json):
def from_json(cls, auth: Auth, json: dict) -> Lock:
"""Creates a Lock from a JSON object.
:meta private:
Expand All @@ -81,6 +89,11 @@ def from_json(cls, auth, json):
is_locked = attributes["lockState"] == 1
is_jammed = attributes["lockState"] == 2

users: dict[str, User] = {}
for user_json in json.get("users", []):
user = User.from_json(user_json)
users[user.user_id] = user

return cls(
_auth=auth,
device_id=json["deviceId"],
Expand All @@ -95,6 +108,7 @@ def from_json(cls, auth, json):
auto_lock_time=attributes.get("autoLockTime", 0),
firmware_version=attributes.get("mainFirmwareVersion"),
mac_address=attributes.get("macAddress"),
users=users,
_cat=json["CAT"],
)

Expand All @@ -104,14 +118,15 @@ def _is_wifi_lock(self) -> bool:
return True
return False

def refresh(self):
def refresh(self) -> None:
"""Refreshes the Lock state.
:raise pyschlage.exceptions.NotAuthorizedError: When authentication fails.
:raise pyschlage.exceptions.UnknownError: On other errors.
"""
path = self.request_path(self.device_id)
self._update_with(self._auth.request("get", path).json())
self.refresh_access_codes()

def _put_attributes(self, attributes):
path = self.request_path(self.device_id)
Expand Down Expand Up @@ -154,6 +169,35 @@ def unlock(self):
"""
self._toggle(0)

def last_changed_by(
self,
logs: list[LockLog] | None = None,
) -> str | None:
"""Determines the last entity or user that changed the lock state.
:param logs: Recent log entries to search through for the last change.
If None, new logs will be fetched.
:type logs: list[LockLog] or None
:rtype: str
"""
if logs is None:
logs = self.logs()

want_prefix = "Locked by " if self.is_locked else "Unlocked by "
want_prefix_len = len(want_prefix)
for log in sorted(logs, reverse=True, key=lambda log: log.created_at):
if not log.message.startswith(want_prefix):
continue
message = log.message[want_prefix_len:]
if message == "keypad":
if code := self.access_codes.get(log.access_code_id, None):
return f"{message} - {code.name}"
elif message == "mobile device":
if user := self.users.get(log.accessor_id, None):
return f"{message} - {user.name}"
return message
return None

def logs(self, limit: int | None = None, sort_desc: bool = False) -> list[LockLog]:
"""Fetches activity logs for the lock.
Expand All @@ -174,7 +218,7 @@ def logs(self, limit: int | None = None, sort_desc: bool = False) -> list[LockLo
resp = self._auth.request("get", path, params=params)
return [LockLog.from_json(l) for l in resp.json()]

def access_codes(self) -> list[AccessCode]:
def refresh_access_codes(self) -> None:
"""Fetches access codes for this lock.
:rtype: list[pyschlage.code.AccessCode]
Expand All @@ -183,9 +227,10 @@ def access_codes(self) -> list[AccessCode]:
"""
path = AccessCode.request_path(self.device_id)
resp = self._auth.request("get", path)
return [
AccessCode.from_json(self._auth, ac, self.device_id) for ac in resp.json()
]
self.access_codes = {}
for code_json in resp.json():
code = AccessCode.from_json(self._auth, code_json, self.device_id)
self.access_codes[code.access_code_id] = code

def add_access_code(self, code: AccessCode):
"""Adds an access code to the lock.
Expand Down
10 changes: 5 additions & 5 deletions pyschlage/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ class LockLog:
created_at: datetime
"""The time at which the log entry was created."""

accessor_id: str | None
message: str
"""The human-readable message associated with the log entry."""

accessor_id: str | None = None
"""Unique identifier for the user that triggered the log entry."""

access_code_id: str | None
access_code_id: str | None = None
"""Unique identifier for the access code that triggered the log entry."""

message: str
"""The human-readable message associated with the log entry."""

@staticmethod
def request_path(device_id: str) -> str:
"""Returns the request path for the LockLog.
Expand Down
16 changes: 16 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from pytest import fixture

from pyschlage.auth import Auth
from pyschlage.code import AccessCode
from pyschlage.lock import Lock


@fixture
Expand Down Expand Up @@ -103,6 +105,15 @@ def wifi_lock_json(lock_users_json):
}


@fixture
def wifi_lock(
mock_auth: mock.Mock, wifi_lock_json: dict, access_code: AccessCode
) -> Lock:
lock = Lock.from_json(mock_auth, wifi_lock_json)
lock.access_codes = {access_code.access_code_id: access_code}
return lock


@fixture
def wifi_lock_unavailable_json(wifi_lock_json):
keep = ("modelName", "serialNumber", "macAddress", "SAT", "CAT")
Expand Down Expand Up @@ -185,6 +196,11 @@ def access_code_json():
}


@fixture
def access_code(mock_auth: mock.Mock, access_code_json: dict) -> AccessCode:
return AccessCode.from_json(mock_auth, access_code_json, "__device_uuid__")


@fixture
def log_json():
return {
Expand Down
Loading

0 comments on commit c28591e

Please sign in to comment.