Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 42 additions & 10 deletions src/sentry/utils/appleconnect/itunes_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import logging
from collections import namedtuple
from http import HTTPStatus
from typing import List, NewType, Optional
from typing import List, NewType, Optional, Tuple

import requests
import sentry_sdk
Expand Down Expand Up @@ -144,6 +144,9 @@ def __init__(self, service_key: Optional[ITunesServiceKey] = None):
# The scnt header as populated by :meth:`start_login_sequence`.
self._scnt: Optional[str] = None

# The trusted phone info, set by :meth:`_request_trusted_phone_info`.
self._trusted_phone: Optional[TrustedPhoneInfo] = None

@property
def session_id(self) -> str:
"""The session ID, if client already has one (after :meth:start_login_sequence).
Expand Down Expand Up @@ -191,6 +194,9 @@ def to_json(self) -> json.JSONData:
context["session_id"] = self.session_id
if self._scnt is not None:
context["scnt"] = self.scnt
if self._trusted_phone is not None:
context["phone_id"] = self._trusted_phone.id
context["phone_push_mode"] = self._trusted_phone.push_mode
if self.state is ClientState.AUTHENTICATED:
context["session_cookie"] = self.session_cookie()
return context
Expand All @@ -215,6 +221,10 @@ def from_json(cls, context: json.JSONData) -> "ITunesClient":
]:
obj._session_id = context["session_id"]
obj._scnt = context["scnt"]
if obj.state is ClientState.SMS_AUTH_REQUESTED:
obj._trusted_phone = TrustedPhoneInfo(
id=context["phone_id"], push_mode=context["phone_push_mode"]
)
if obj.state in [ClientState.AUTHENTICATED, ClientState.EXPIRED]:
obj.load_session_cookie(context["session_cookie"])
return obj
Expand Down Expand Up @@ -308,7 +318,7 @@ def two_factor_code(self, code: str) -> None:
else:
self.state = ClientState.AUTHENTICATED

def _request_trusted_phone_info(self) -> TrustedPhoneInfo:
def _request_trusted_phone_info(self) -> Tuple[TrustedPhoneInfo, bool]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not particularly married to this return type.

"""Requests the trusted phone info for the account."""
url = "https://idmsa.apple.com/appleauth/auth"
logger.debug("GET %s", url)
Expand All @@ -322,13 +332,15 @@ def _request_trusted_phone_info(self) -> TrustedPhoneInfo:
},
timeout=REQUEST_TIMEOUT,
)

if response.status_code == HTTPStatus.LOCKED:
raise SmsBlockedError
if not response.ok:
raise ITunesError(f"Unexpected response status: {response.status_code}")

try:
info = response.json()["trustedPhoneNumber"]
info = response.json()
trusted_phone_info = info["trustedPhoneNumber"]
except ValueError:
raise ITunesError(
f"Received unexpected response content, response status: {response.status_code}"
Expand All @@ -337,9 +349,18 @@ def _request_trusted_phone_info(self) -> TrustedPhoneInfo:
raise ITunesError(
f"Trusted phone info missing from response with status: {response.status_code}"
)
return TrustedPhoneInfo(
id=info["id"],
push_mode=info["pushMode"],

# The code has already been sent to the only trusted phone number
sms_automatically_sent = len(info.get("trustedPhoneNumbers", [])) == 1 and info.get(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic is basically pulled from fastlane's implementation.

they also have more complex logic to handle other cases, such as when a user has multiple trusted phone numbers, etc.

in the same file, they also skip hitting https://idmsa.apple.com/appleauth/auth/verify/phone when these two checks are true.

"noTrustedDevices", False
)

return (
TrustedPhoneInfo(
id=trusted_phone_info["id"],
push_mode=trusted_phone_info["pushMode"],
),
sms_automatically_sent,
)

def request_sms_auth(self) -> None:
Expand All @@ -353,7 +374,14 @@ def request_sms_auth(self) -> None:
ClientState.AUTH_REQUESTED,
ClientState.SMS_AUTH_REQUESTED,
], f"Actual client state: {self.state}"
trusted_phone = self._request_trusted_phone_info()

trusted_phone, sms_automatically_sent = self._request_trusted_phone_info()

if sms_automatically_sent:
self.state = ClientState.SMS_AUTH_REQUESTED
self._trusted_phone = trusted_phone
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems a bit odd together with the new assert on line 378?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry about that, this is a little strange. i'm pretty sure this assert is supposed to sit inside of sms_code 🤔 i'll fix this.

return

url = "https://idmsa.apple.com/appleauth/auth/verify/phone"
logger.debug("PUT %s", url)
response = self.session.put(
Expand All @@ -375,23 +403,26 @@ def request_sms_auth(self) -> None:
raise SmsBlockedError
if response.status_code != HTTPStatus.OK:
raise ITunesError(f"Unexpected response status: {response.status_code}")

self.state = ClientState.SMS_AUTH_REQUESTED
self._trusted_phone = trusted_phone
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we be already sure this is already set by now?


def sms_code(self, code: str) -> None:
"""Sends the SMS auth code, completing authentication.

:raises InvalidSmsAuthError:
"""
assert self.state is ClientState.SMS_AUTH_REQUESTED, f"Actual client state: {self.state}"
assert self._trusted_phone is not None

url = "https://idmsa.apple.com/appleauth/auth/verify/phone/securitycode"
logger.debug("PUT %s", url)
trusted_phone = self._request_trusted_phone_info()
response = self.session.post(
url,
json={
"securityCode": {"code": code},
"phoneNumber": {"id": trusted_phone.id},
"mode": trusted_phone.push_mode,
"phoneNumber": {"id": self._trusted_phone.id},
"mode": self._trusted_phone.push_mode,
},
headers={
"scnt": self.scnt,
Expand All @@ -401,6 +432,7 @@ def sms_code(self, code: str) -> None:
},
timeout=REQUEST_TIMEOUT,
)

if response.status_code != HTTPStatus.OK:
# TODO: Make invalid code distinguishable from generic error.
raise InvalidAuthCodeError
Expand Down