Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions roborock/web_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import math
import secrets
import string
import time

import aiohttp
Expand Down Expand Up @@ -190,6 +191,113 @@ async def request_code(self) -> None:
else:
raise RoborockException(f"{code_response.get('msg')} - response code: {code_response.get('code')}")

async def request_code_v4(self) -> None:
"""Request a code using the v4 endpoint."""
try:
self._login_limiter.try_acquire("login")
except BucketFullException as ex:
_LOGGER.info(ex.meta_info)
raise RoborockRateLimit("Reached maximum requests for login. Please try again later.") from ex
base_url = await self._get_base_url()
header_clientid = self._get_header_client_id()
code_request = PreparedRequest(
base_url,
self.session,
{
"header_clientid": header_clientid,
"Content-Type": "application/x-www-form-urlencoded",
"header_clientlang": "en",
},
)

code_response = await code_request.request(
"post",
"/api/v4/email/code/send",
params={"email": self._username, "type": "login", "platform": ""},
)
if code_response is None:
raise RoborockException("Failed to get a response from send email code")
response_code = code_response.get("code")
if response_code != 200:
_LOGGER.info("Request code failed for %s with the following context: %s", self._username, code_response)
if response_code == 2008:
raise RoborockAccountDoesNotExist("Account does not exist - check your login and try again.")
elif response_code == 9002:
raise RoborockTooFrequentCodeRequests("You have attempted to request too many codes. Try again later")
else:
raise RoborockException(f"{code_response.get('msg')} - response code: {code_response.get('code')}")

async def sign_key_v3(self, s: str) -> str:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be _sign_key_v3 to keep it private?

"""Sign a randomly generated string."""
base_url = await self._get_base_url()
header_clientid = self._get_header_client_id()
code_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})

code_response = await code_request.request(
"post",
"/api/v3/key/sign",
params={"s": s},
)

if not code_response or "data" not in code_response or "k" not in code_response["data"]:
raise RoborockException("Failed to get a response from sign key")
response_code = code_response.get("code")

if response_code != 200:
_LOGGER.info("Request code failed for %s with the following context: %s", self._username, code_response)
raise RoborockException(f"{code_response.get('msg')} - response code: {code_response.get('code')}")

return code_response["data"]["k"]

async def code_login_v4(self, code: int | str, country: str, country_code: int) -> UserData:
"""
Login via code authentication.
:param code: The code from the email.
:param country: The two-character representation of the country, i.e. "US"
:param country_code: the country phone number code i.e. 1 for US.
"""
base_url = await self._get_base_url()
header_clientid = self._get_header_client_id()
x_mercy_ks = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(16))
x_mercy_k = await self.sign_key_v3(x_mercy_ks)
login_request = PreparedRequest(
base_url,
self.session,
{"header_clientid": header_clientid, "x-mercy-ks": x_mercy_ks, "x-mercy-k": x_mercy_k},
)
login_response = await login_request.request(
"post",
"/api/v4/auth/email/login/code",
params={
"country": country,
"countryCode": country_code,
"email": self._username,
"code": code,
# Major and minor version are the user agreement version, we will need to see if this needs to be
# dynamic https://usiot.roborock.com/api/v3/app/agreement/latest?country=US
"majorVersion": 14,
"minorVersion": 0,
},
)
if login_response is None:
raise RoborockException("Login request response is None")
response_code = login_response.get("code")
if response_code != 200:
_LOGGER.info("Login failed for %s with the following context: %s", self._username, login_response)
if response_code == 2018:
raise RoborockInvalidCode("Invalid code - check your code and try again.")
if response_code == 3009:
raise RoborockNoUserAgreement("You must accept the user agreement in the Roborock app to continue.")
if response_code == 3006:
raise RoborockInvalidUserAgreement(
"User agreement must be accepted again - or you are attempting to use the Mi Home app account."
)
raise RoborockException(f"{login_response.get('msg')} - response code: {response_code}")
user_data = login_response.get("data")
if not isinstance(user_data, dict):
raise RoborockException("Got unexpected data type for user_data")
return UserData.from_dict(user_data)

async def pass_login(self, password: str) -> UserData:
try:
self._login_limiter.try_acquire("login")
Expand Down
15 changes: 15 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,21 @@ def mock_rest() -> aioresponses:
status=200,
payload={"api": None, "code": 200, "result": None, "status": "ok", "success": True},
)
mocked.post(
re.compile(r"https://.*iot\.roborock\.com/api/v4/email/code/send.*"),
status=200,
payload={"code": 200, "data": None, "msg": "success"},
)
mocked.post(
re.compile(r"https://.*iot\.roborock\.com/api/v3/key/sign.*"),
status=200,
payload={"code": 200, "data": {"k": "mock_k"}, "msg": "success"},
)
mocked.post(
re.compile(r"https://.*iot\.roborock\.com/api/v4/auth/email/login/code.*"),
status=200,
payload={"code": 200, "data": USER_DATA, "msg": "success"},
)
yield mocked


Expand Down
8 changes: 8 additions & 0 deletions tests/test_web_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,11 @@ async def test_execute_scene(mock_rest):
ud = await api.pass_login("password")
await api.execute_scene(ud, 123456)
mock_rest.assert_any_call("https://api-us.roborock.com/user/scene/123456/execute", "post")


async def test_code_login_v4_flow(mock_rest) -> None:
"""Test that we can login with a code and we get back the correct userdata object."""
api = RoborockApiClient(username="test_user@gmail.com")
await api.request_code_v4()
ud = await api.code_login_v4(4123, "US", 1)
assert ud == UserData.from_dict(USER_DATA)