Skip to content

Commit 50aef42

Browse files
authored
feat: add ability to encrypt and decrypt L01 (#468)
1 parent 423ae16 commit 50aef42

File tree

2 files changed

+143
-0
lines changed

2 files changed

+143
-0
lines changed

roborock/protocol.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,86 @@ def decrypt_cbc(ciphertext: bytes, token: bytes) -> bytes:
149149
return unpad(decipher.decrypt(ciphertext), AES.block_size)
150150
return ciphertext
151151

152+
@staticmethod
153+
def _l01_key(local_key: str, timestamp: int) -> bytes:
154+
"""Derive key for L01 protocol."""
155+
hash_input = Utils.encode_timestamp(timestamp) + Utils.ensure_bytes(local_key) + SALT
156+
return hashlib.sha256(hash_input).digest()
157+
158+
@staticmethod
159+
def _l01_iv(timestamp: int, nonce: int, sequence: int) -> bytes:
160+
"""Derive IV for L01 protocol."""
161+
digest_input = sequence.to_bytes(4, "big") + nonce.to_bytes(4, "big") + timestamp.to_bytes(4, "big")
162+
digest = hashlib.sha256(digest_input).digest()
163+
return digest[:12]
164+
165+
@staticmethod
166+
def _l01_aad(timestamp: int, nonce: int, sequence: int, connect_nonce: int, ack_nonce: int) -> bytes:
167+
"""Derive AAD for L01 protocol."""
168+
return (
169+
sequence.to_bytes(4, "big")
170+
+ connect_nonce.to_bytes(4, "big")
171+
+ ack_nonce.to_bytes(4, "big")
172+
+ nonce.to_bytes(4, "big")
173+
+ timestamp.to_bytes(4, "big")
174+
)
175+
176+
@staticmethod
177+
def encrypt_gcm_l01(
178+
plaintext: bytes,
179+
local_key: str,
180+
timestamp: int,
181+
sequence: int,
182+
nonce: int,
183+
connect_nonce: int,
184+
ack_nonce: int,
185+
) -> bytes:
186+
"""Encrypt plaintext for L01 protocol using AES-256-GCM."""
187+
if not isinstance(plaintext, bytes):
188+
raise TypeError("plaintext requires bytes")
189+
190+
key = Utils._l01_key(local_key, timestamp)
191+
iv = Utils._l01_iv(timestamp, nonce, sequence)
192+
aad = Utils._l01_aad(timestamp, nonce, sequence, connect_nonce, ack_nonce)
193+
194+
cipher = AES.new(key, AES.MODE_GCM, nonce=iv)
195+
cipher.update(aad)
196+
ciphertext, tag = cipher.encrypt_and_digest(plaintext)
197+
198+
return ciphertext + tag
199+
200+
@staticmethod
201+
def decrypt_gcm_l01(
202+
payload: bytes,
203+
local_key: str,
204+
timestamp: int,
205+
sequence: int,
206+
nonce: int,
207+
connect_nonce: int,
208+
ack_nonce: int,
209+
) -> bytes:
210+
"""Decrypt payload for L01 protocol using AES-256-GCM."""
211+
if not isinstance(payload, bytes):
212+
raise TypeError("payload requires bytes")
213+
214+
key = Utils._l01_key(local_key, timestamp)
215+
iv = Utils._l01_iv(timestamp, nonce, sequence)
216+
aad = Utils._l01_aad(timestamp, nonce, sequence, connect_nonce, ack_nonce)
217+
218+
if len(payload) < 16:
219+
raise ValueError("Invalid payload length for GCM decryption")
220+
221+
tag = payload[-16:]
222+
ciphertext = payload[:-16]
223+
224+
cipher = AES.new(key, AES.MODE_GCM, nonce=iv)
225+
cipher.update(aad)
226+
227+
try:
228+
return cipher.decrypt_and_verify(ciphertext, tag)
229+
except ValueError as e:
230+
raise RoborockException("GCM tag verification failed") from e
231+
152232
@staticmethod
153233
def crc(data: bytes) -> int:
154234
"""Gather bytes for checksum calculation."""
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
from roborock.protocol import Utils
2+
3+
4+
def test_encryption():
5+
"""Tests the L01 GCM encryption logic."""
6+
local_key = "b8Hj5mFk3QzT7rLp"
7+
timestamp = 1753606905
8+
sequence = 1
9+
nonce = 304251
10+
connect_nonce = 893563
11+
ack_nonce = 485592656
12+
payload_str = (
13+
'{"dps":{"101":"{\\"id\\":1806,\\"method\\":\\"get_prop\\",\\"params\\":[\\"get_status\\"]}"},"t":1753606905}'
14+
)
15+
payload = payload_str.encode("utf-8")
16+
17+
encrypted_data = Utils.encrypt_gcm_l01(
18+
plaintext=payload,
19+
local_key=local_key,
20+
timestamp=timestamp,
21+
sequence=sequence,
22+
nonce=nonce,
23+
connect_nonce=connect_nonce,
24+
ack_nonce=ack_nonce,
25+
)
26+
27+
expected_data = bytes.fromhex(
28+
"fd60c8daca1ccae67f6077477bfa9d37189a38d75b3c4a907c2435d3c146ee84d8f99597e3e1571a015961ceaa4d64bc3695fae024c341"
29+
"6737d77150341de29cad2f95bfaf532358f12bbff89f140fef5b1ee284c3abfe3b83a577910a72056dab4d5a75b182d1a0cba145e3e450"
30+
"f3927443"
31+
)
32+
33+
assert encrypted_data == expected_data
34+
35+
36+
def test_decryption():
37+
"""Tests the L01 GCM decryption logic."""
38+
local_key = "b8Hj5mFk3QzT7rLp"
39+
timestamp = 1753606905
40+
sequence = 1
41+
nonce = 304251
42+
connect_nonce = 893563
43+
ack_nonce = 485592656
44+
payload = bytes.fromhex(
45+
"fd60c8daca1ccae67f6077477bfa9d37189a38d75b3c4a907c2435d3c146ee84d8f99597e3e1571a015961ceaa4d64bc3695fae024c341"
46+
"6737d77150341de29cad2f95bfaf532358f12bbff89f140fef5b1ee284c3abfe3b83a577910a72056dab4d5a75b182d1a0cba145e3e450"
47+
"f3927443"
48+
)
49+
decrypted_data = Utils.decrypt_gcm_l01(
50+
payload=payload,
51+
local_key=local_key,
52+
timestamp=timestamp,
53+
sequence=sequence,
54+
nonce=nonce,
55+
connect_nonce=connect_nonce,
56+
ack_nonce=ack_nonce,
57+
)
58+
decrypted_str = decrypted_data.decode("utf-8")
59+
60+
expected_str = (
61+
'{"dps":{"101":"{\\"id\\":1806,\\"method\\":\\"get_prop\\",\\"params\\":[\\"get_status\\"]}"},"t":1753606905}'
62+
)
63+
assert decrypted_str == expected_str

0 commit comments

Comments
 (0)