Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions roborock/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,86 @@ def decrypt_cbc(ciphertext: bytes, token: bytes) -> bytes:
return unpad(decipher.decrypt(ciphertext), AES.block_size)
return ciphertext

@staticmethod
def _l01_key(local_key: str, timestamp: int) -> bytes:
"""Derive key for L01 protocol."""
hash_input = Utils.encode_timestamp(timestamp) + Utils.ensure_bytes(local_key) + SALT
return hashlib.sha256(hash_input).digest()

@staticmethod
def _l01_iv(timestamp: int, nonce: int, sequence: int) -> bytes:
"""Derive IV for L01 protocol."""
digest_input = sequence.to_bytes(4, "big") + nonce.to_bytes(4, "big") + timestamp.to_bytes(4, "big")
digest = hashlib.sha256(digest_input).digest()
return digest[:12]

@staticmethod
def _l01_aad(timestamp: int, nonce: int, sequence: int, connect_nonce: int, ack_nonce: int) -> bytes:
"""Derive AAD for L01 protocol."""
return (
sequence.to_bytes(4, "big")
+ connect_nonce.to_bytes(4, "big")
+ ack_nonce.to_bytes(4, "big")
+ nonce.to_bytes(4, "big")
+ timestamp.to_bytes(4, "big")
)

@staticmethod
def encrypt_gcm_l01(
plaintext: bytes,
local_key: str,
timestamp: int,
sequence: int,
nonce: int,
connect_nonce: int,
ack_nonce: int,
) -> bytes:
"""Encrypt plaintext for L01 protocol using AES-256-GCM."""
if not isinstance(plaintext, bytes):
raise TypeError("plaintext requires bytes")

key = Utils._l01_key(local_key, timestamp)
iv = Utils._l01_iv(timestamp, nonce, sequence)
aad = Utils._l01_aad(timestamp, nonce, sequence, connect_nonce, ack_nonce)

cipher = AES.new(key, AES.MODE_GCM, nonce=iv)
cipher.update(aad)
ciphertext, tag = cipher.encrypt_and_digest(plaintext)

return ciphertext + tag

@staticmethod
def decrypt_gcm_l01(
payload: bytes,
local_key: str,
timestamp: int,
sequence: int,
nonce: int,
connect_nonce: int,
ack_nonce: int,
) -> bytes:
"""Decrypt payload for L01 protocol using AES-256-GCM."""
if not isinstance(payload, bytes):
raise TypeError("payload requires bytes")

key = Utils._l01_key(local_key, timestamp)
iv = Utils._l01_iv(timestamp, nonce, sequence)
aad = Utils._l01_aad(timestamp, nonce, sequence, connect_nonce, ack_nonce)

if len(payload) < 16:
raise ValueError("Invalid payload length for GCM decryption")

tag = payload[-16:]
ciphertext = payload[:-16]

cipher = AES.new(key, AES.MODE_GCM, nonce=iv)
cipher.update(aad)

try:
return cipher.decrypt_and_verify(ciphertext, tag)
except ValueError as e:
raise RoborockException("GCM tag verification failed") from e

@staticmethod
def crc(data: bytes) -> int:
"""Gather bytes for checksum calculation."""
Expand Down
63 changes: 63 additions & 0 deletions tests/protocols/test_l01_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from roborock.protocol import Utils


def test_encryption():
"""Tests the L01 GCM encryption logic."""
local_key = "b8Hj5mFk3QzT7rLp"
timestamp = 1753606905
sequence = 1
nonce = 304251
connect_nonce = 893563
ack_nonce = 485592656
payload_str = (
'{"dps":{"101":"{\\"id\\":1806,\\"method\\":\\"get_prop\\",\\"params\\":[\\"get_status\\"]}"},"t":1753606905}'
)
payload = payload_str.encode("utf-8")

encrypted_data = Utils.encrypt_gcm_l01(
plaintext=payload,
local_key=local_key,
timestamp=timestamp,
sequence=sequence,
nonce=nonce,
connect_nonce=connect_nonce,
ack_nonce=ack_nonce,
)

expected_data = bytes.fromhex(
"fd60c8daca1ccae67f6077477bfa9d37189a38d75b3c4a907c2435d3c146ee84d8f99597e3e1571a015961ceaa4d64bc3695fae024c341"
"6737d77150341de29cad2f95bfaf532358f12bbff89f140fef5b1ee284c3abfe3b83a577910a72056dab4d5a75b182d1a0cba145e3e450"
"f3927443"
)

assert encrypted_data == expected_data


def test_decryption():
"""Tests the L01 GCM decryption logic."""
local_key = "b8Hj5mFk3QzT7rLp"
timestamp = 1753606905
sequence = 1
nonce = 304251
connect_nonce = 893563
ack_nonce = 485592656
payload = bytes.fromhex(
"fd60c8daca1ccae67f6077477bfa9d37189a38d75b3c4a907c2435d3c146ee84d8f99597e3e1571a015961ceaa4d64bc3695fae024c341"
"6737d77150341de29cad2f95bfaf532358f12bbff89f140fef5b1ee284c3abfe3b83a577910a72056dab4d5a75b182d1a0cba145e3e450"
"f3927443"
)
decrypted_data = Utils.decrypt_gcm_l01(
payload=payload,
local_key=local_key,
timestamp=timestamp,
sequence=sequence,
nonce=nonce,
connect_nonce=connect_nonce,
ack_nonce=ack_nonce,
)
decrypted_str = decrypted_data.decode("utf-8")

expected_str = (
'{"dps":{"101":"{\\"id\\":1806,\\"method\\":\\"get_prop\\",\\"params\\":[\\"get_status\\"]}"},"t":1753606905}'
)
assert decrypted_str == expected_str