diff --git a/roborock/protocol.py b/roborock/protocol.py index fdc52c10..6b02836e 100644 --- a/roborock/protocol.py +++ b/roborock/protocol.py @@ -149,6 +149,86 @@ def decrypt_cbc(ciphertext: bytes, token: bytes) -> bytes: return unpad(decipher.decrypt(ciphertext), AES.block_size) return ciphertext + @staticmethod + def _l01_key(local_key: str, timestamp: int) -> bytes: + """Derive key for L01 protocol.""" + hash_input = Utils.encode_timestamp(timestamp) + Utils.ensure_bytes(local_key) + SALT + return hashlib.sha256(hash_input).digest() + + @staticmethod + def _l01_iv(timestamp: int, nonce: int, sequence: int) -> bytes: + """Derive IV for L01 protocol.""" + digest_input = sequence.to_bytes(4, "big") + nonce.to_bytes(4, "big") + timestamp.to_bytes(4, "big") + digest = hashlib.sha256(digest_input).digest() + return digest[:12] + + @staticmethod + def _l01_aad(timestamp: int, nonce: int, sequence: int, connect_nonce: int, ack_nonce: int) -> bytes: + """Derive AAD for L01 protocol.""" + return ( + sequence.to_bytes(4, "big") + + connect_nonce.to_bytes(4, "big") + + ack_nonce.to_bytes(4, "big") + + nonce.to_bytes(4, "big") + + timestamp.to_bytes(4, "big") + ) + + @staticmethod + def encrypt_gcm_l01( + plaintext: bytes, + local_key: str, + timestamp: int, + sequence: int, + nonce: int, + connect_nonce: int, + ack_nonce: int, + ) -> bytes: + """Encrypt plaintext for L01 protocol using AES-256-GCM.""" + if not isinstance(plaintext, bytes): + raise TypeError("plaintext requires bytes") + + key = Utils._l01_key(local_key, timestamp) + iv = Utils._l01_iv(timestamp, nonce, sequence) + aad = Utils._l01_aad(timestamp, nonce, sequence, connect_nonce, ack_nonce) + + cipher = AES.new(key, AES.MODE_GCM, nonce=iv) + cipher.update(aad) + ciphertext, tag = cipher.encrypt_and_digest(plaintext) + + return ciphertext + tag + + @staticmethod + def decrypt_gcm_l01( + payload: bytes, + local_key: str, + timestamp: int, + sequence: int, + nonce: int, + connect_nonce: int, + ack_nonce: int, + ) -> bytes: + """Decrypt payload for L01 protocol using AES-256-GCM.""" + if not isinstance(payload, bytes): + raise TypeError("payload requires bytes") + + key = Utils._l01_key(local_key, timestamp) + iv = Utils._l01_iv(timestamp, nonce, sequence) + aad = Utils._l01_aad(timestamp, nonce, sequence, connect_nonce, ack_nonce) + + if len(payload) < 16: + raise ValueError("Invalid payload length for GCM decryption") + + tag = payload[-16:] + ciphertext = payload[:-16] + + cipher = AES.new(key, AES.MODE_GCM, nonce=iv) + cipher.update(aad) + + try: + return cipher.decrypt_and_verify(ciphertext, tag) + except ValueError as e: + raise RoborockException("GCM tag verification failed") from e + @staticmethod def crc(data: bytes) -> int: """Gather bytes for checksum calculation.""" diff --git a/tests/protocols/test_l01_protocol.py b/tests/protocols/test_l01_protocol.py new file mode 100644 index 00000000..5d04a4f8 --- /dev/null +++ b/tests/protocols/test_l01_protocol.py @@ -0,0 +1,63 @@ +from roborock.protocol import Utils + + +def test_encryption(): + """Tests the L01 GCM encryption logic.""" + local_key = "b8Hj5mFk3QzT7rLp" + timestamp = 1753606905 + sequence = 1 + nonce = 304251 + connect_nonce = 893563 + ack_nonce = 485592656 + payload_str = ( + '{"dps":{"101":"{\\"id\\":1806,\\"method\\":\\"get_prop\\",\\"params\\":[\\"get_status\\"]}"},"t":1753606905}' + ) + payload = payload_str.encode("utf-8") + + encrypted_data = Utils.encrypt_gcm_l01( + plaintext=payload, + local_key=local_key, + timestamp=timestamp, + sequence=sequence, + nonce=nonce, + connect_nonce=connect_nonce, + ack_nonce=ack_nonce, + ) + + expected_data = bytes.fromhex( + "fd60c8daca1ccae67f6077477bfa9d37189a38d75b3c4a907c2435d3c146ee84d8f99597e3e1571a015961ceaa4d64bc3695fae024c341" + "6737d77150341de29cad2f95bfaf532358f12bbff89f140fef5b1ee284c3abfe3b83a577910a72056dab4d5a75b182d1a0cba145e3e450" + "f3927443" + ) + + assert encrypted_data == expected_data + + +def test_decryption(): + """Tests the L01 GCM decryption logic.""" + local_key = "b8Hj5mFk3QzT7rLp" + timestamp = 1753606905 + sequence = 1 + nonce = 304251 + connect_nonce = 893563 + ack_nonce = 485592656 + payload = bytes.fromhex( + "fd60c8daca1ccae67f6077477bfa9d37189a38d75b3c4a907c2435d3c146ee84d8f99597e3e1571a015961ceaa4d64bc3695fae024c341" + "6737d77150341de29cad2f95bfaf532358f12bbff89f140fef5b1ee284c3abfe3b83a577910a72056dab4d5a75b182d1a0cba145e3e450" + "f3927443" + ) + decrypted_data = Utils.decrypt_gcm_l01( + payload=payload, + local_key=local_key, + timestamp=timestamp, + sequence=sequence, + nonce=nonce, + connect_nonce=connect_nonce, + ack_nonce=ack_nonce, + ) + decrypted_str = decrypted_data.decode("utf-8") + + expected_str = ( + '{"dps":{"101":"{\\"id\\":1806,\\"method\\":\\"get_prop\\",\\"params\\":[\\"get_status\\"]}"},"t":1753606905}' + ) + assert decrypted_str == expected_str