Skip to content

Commit

Permalink
Rebase endpoints & auth to newest Android app (#175)
Browse files Browse the repository at this point in the history
Co-authored-by: rikroe <rikroe@users.noreply.github.com>
  • Loading branch information
rikroe and rikroe committed Apr 15, 2020
1 parent 340985d commit 7c6b0af
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 73 deletions.
130 changes: 84 additions & 46 deletions bimmer_connected/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
import urllib
import os
import json
from threading import Lock
from threading import RLock
from typing import Callable, List
import requests

from bimmer_connected.country_selector import Regions, get_server_url, get_gcdm_oauth_endpoint
from bimmer_connected.country_selector import Regions, LoginType, get_server_url, get_auth_url, get_login_type
from bimmer_connected.vehicle import ConnectedDriveVehicle
from bimmer_connected.const import AUTH_URL, AUTH_URL_LEGACY, VEHICLES_URL, ERROR_CODE_MAPPING
from bimmer_connected.const import BASE_URL, VEHICLES_URL, ERROR_CODE_MAPPING

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -55,78 +55,116 @@ def __init__(self, username: str, password: str, region: Regions, log_responses:
self._retries_on_500_error = retries_on_500_error
#: list of vehicles associated with this account.
self._vehicles = []
self._lock = Lock()
self._lock = RLock()
self._update_listeners = []

self._get_vehicles()

def _get_oauth_token(self) -> None:
"""Get a new auth token from the server."""
"""Get a new auth token from the server, based on user GCDM tokens"""

with self._lock:
if self._token_expiration is not None and datetime.datetime.now() < self._token_expiration:
_LOGGER.debug('Old token is still valid. Not getting a new one.')
return

_LOGGER.debug('getting new oauth token')

_base_url = BASE_URL.format(server=get_server_url(self._region))
# Get GCDM tokens with username/password
gcdm_tokens = self._get_gcdm_token()

if get_login_type(self._region) == LoginType.LEGACY:
oauth_tokens = gcdm_tokens
else:
# We need a session for cross-request cookies
oauth_session = requests.Session()

# Generate a state_token for OAuth (but don't sign in yet)
oauth_headers = {"ocp-apim-subscription-key": "4be77952d6fe4f25a5e398fd84c77965"}
oauth_state_request = oauth_session.get(
url=_base_url + "/api/Account/ExternalLogin",
headers=oauth_headers,
params={
"provider": "GCDM",
"response_type": "token",
"redirect_uri": _base_url + "/",
"client_id": "self",
},
)
state_token = urllib.parse.parse_qs(oauth_state_request.url).get("state")[0]

# With state_token, GCDM access_token and GCDM refresh_token, retrieve a authenticated session cookie
oauth_session.get(
url=_base_url + "/oauth_callback.html",
headers=oauth_headers,
params={
"scope": "authenticate_user",
"refresh": gcdm_tokens["refresh_token"],
"state": state_token,
"token": gcdm_tokens["access_token"],
},
allow_redirects=False,
)

# With the session cookie, call External Login again to get OAuth access_token, refresh_token and expiry
oauth_token_request = oauth_session.get(
url=_base_url + "/api/Account/ExternalLogin",
headers=oauth_headers,
params={
"provider": "GCDM",
"response_type": "token",
"redirect_uri": _base_url + "/",
"client_id": "self",
},
)
oauth_tokens = dict(urllib.parse.parse_qsl(urllib.parse.urlparse(oauth_token_request.url).fragment))

self._oauth_token = oauth_tokens['access_token']
# not sure how to use the refresh_token, but might be useful in the future...
self._refresh_token = oauth_tokens['refresh_token']
expiration_time = int(oauth_tokens['expires_in'])
self._token_expiration = datetime.datetime.now() + datetime.timedelta(seconds=expiration_time)
_LOGGER.debug('got new token %s with expiration date %s', self._oauth_token, self._token_expiration)

def _get_gcdm_token(self) -> dict:
"""Get a new GCDM (Global Customer Data Management) token from the server."""
with self._lock:

_LOGGER.debug('getting new GCDM token')
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Content-Length": "124",
"Connection": "Keep-Alive",
"Host": self.server_url,
"Accept-Encoding": "gzip",
"Authorization": "Basic blF2NkNxdHhKdVhXUDc0eGYzQ0p3VUVQOjF6REh4NnVuNGNEanli"
"TEVOTjNreWZ1bVgya0VZaWdXUGNRcGR2RFJwSUJrN3JPSg==",
"Credentials": "nQv6CqtxJuXWP74xf3CJwUEP:1zDHx6un4cDjybLENN3kyfumX2kEYigWPcQpdvDRpIBk7rOJ",
"User-Agent": "okhttp/2.60",
"Authorization": "Basic ZDc2NmI1MzctYTY1NC00Y2JkLWEzZGMtMGNhNTY3MmQ3ZjhkOjE1"
"ZjY5N2Y2LWE1ZDUtNGNhZC05OWQ5LTNhMTViYzdmMzk3Mw==",
"Credentials": "ZDc2NmI1MzctYTY1NC00Y2JkLWEzZGMtMGNhNTY3MmQ3Zjhk:"
"M2YxYzBjZGEtMDgyOC00Y2RjLWFiMmEtMDY2YzJhMjY0ODAz",
}

# we really need all of these parameters
values = {
'grant_type': 'password',
'scope': 'authenticate_user vehicle_data remote_services',
'username': self._username,
'password': self._password,
}

if self._region == Regions.REST_OF_WORLD:
values.update({
'client_id': 'dbf0a542-ebd1-4ff0-a9a7-55172fbfce35',
'response_type': 'token',
'redirect_uri': 'https://www.bmw-connecteddrive.com/app/static/external-dispatch.html',
})
else:
values.update({
'grant_type': 'password',
})

data = urllib.parse.urlencode(values)
if self._region == Regions.REST_OF_WORLD:
url = AUTH_URL.format(
gcdm_oauth_endpoint=get_gcdm_oauth_endpoint(self._region)
)
expected_response_code = 302
else:
url = AUTH_URL_LEGACY.format(server=self.server_url)
expected_response_code = 200
try:
response = self.send_request(url, data=data, headers=headers, allow_redirects=False,
expected_response=expected_response_code, post=True)
response = self.send_request(
BASE_URL.format(server=get_auth_url(self._region)),
data=data,
headers=headers,
allow_redirects=False,
expected_response=200,
post=True)
except OSError as exception:
msg = 'Authentication failed. Maybe your password is invalid?'
_LOGGER.error(msg)
_LOGGER.exception(exception)
raise OSError(msg) from exception

if self._region == Regions.REST_OF_WORLD:
response_json = dict(
urllib.parse.parse_qsl(urllib.parse.urlparse(response.headers['Location']).fragment)
)
else:
response_json = response.json()

self._oauth_token = response_json['access_token']
expiration_time = int(response_json['expires_in'])
self._token_expiration = datetime.datetime.now() + datetime.timedelta(seconds=expiration_time)
_LOGGER.debug('got new token %s with expiration date %s', self._oauth_token, self._token_expiration)
response_json = response.json()
_LOGGER.debug('got new GCDM token %s', response_json['access_token'])
return response.json()

@property
def request_header(self):
Expand Down
7 changes: 3 additions & 4 deletions bimmer_connected/const.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
"""URLs for different services and error code mapping."""

AUTH_URL = 'https://customer.bmwgroup.com/{gcdm_oauth_endpoint}/authenticate'
AUTH_URL_LEGACY = 'https://{server}/gcdm/oauth/token'
BASE_URL = 'https://{server}/webapi/v1'
BASE_URL = 'https://{server}'
ENDPOINT_URL = BASE_URL + '/api/gateway/brs/webapi/v1'

VEHICLES_URL = BASE_URL + '/user/vehicles'
VEHICLES_URL = ENDPOINT_URL + '/user/vehicles'
VEHICLE_VIN_URL = VEHICLES_URL + '/{vin}'
VEHICLE_STATUS_URL = VEHICLE_VIN_URL + '/status'
REMOTE_SERVICE_STATUS_URL = VEHICLE_VIN_URL + '/serviceExecutionStatus?serviceType={service_type}'
Expand Down
42 changes: 29 additions & 13 deletions bimmer_connected/country_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,29 @@ class Regions(Enum):
REST_OF_WORLD = 2


#: Mapping from regions to servers
_SERVER_URLS = {
Regions.NORTH_AMERICA: 'b2vapi.bmwgroup.us',
Regions.REST_OF_WORLD: 'b2vapi.bmwgroup.com',
Regions.CHINA: 'b2vapi.bmwgroup.cn:8592'
}
class LoginType(Enum):
"""Login types of BMW API."""
GLOBAL = 0
LEGACY = 1


#: Mapping from regions to servers
_GCDM_OAUTH_ENDPOINTS = {
Regions.NORTH_AMERICA: 'gcdm/usa/oauth',
Regions.REST_OF_WORLD: 'gcdm/oauth',
Regions.CHINA: 'gcdm/oauth'
_SERVER_URLS = {
Regions.NORTH_AMERICA: {
'login_type': LoginType.GLOBAL,
'auth_url': 'customer.bmwgroup.com/gcdm/usa/oauth/token',
'server': 'myc-profile.bmwusa.com',
},
Regions.REST_OF_WORLD: {
'login_type': LoginType.GLOBAL,
'auth_url': 'customer.bmwgroup.com/gcdm/oauth/token',
'server': 'myc-profile.bmwgroup.com',
},
Regions.CHINA: {
'login_type': LoginType.LEGACY,
'auth_url': 'b2vapi.bmwgroup.cn:8592/gcdm/oauth/token',
'server': 'b2vapi.bmwgroup.cn:8592',
}
}


Expand All @@ -48,9 +59,14 @@ def get_region_from_name(name: str) -> Regions:

def get_server_url(region: Regions) -> str:
"""Get the url of the server for the region."""
return _SERVER_URLS[region]
return _SERVER_URLS[region]['server']


def get_auth_url(region: Regions) -> str:
"""Get the url of the server for the region."""
return _SERVER_URLS[region]['auth_url']


def get_gcdm_oauth_endpoint(region: Regions) -> str:
def get_login_type(region: Regions) -> str:
"""Get the url of the server for the region."""
return _GCDM_OAUTH_ENDPOINTS[region]
return _SERVER_URLS[region]['login_type']
27 changes: 19 additions & 8 deletions test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@
'Max-Forwards': '20',
'Date': 'Sun, 11 Mar 2018 08:16:13 GMT',
'Content-Encoding': 'gzip',
'Location': ('https://www.bmw-connecteddrive.com/app/static/external-dispatch.html'
'#access_token=TOKEN&token_type=Bearer&expires_in=7199')
}

_AUTH_RESPONSE_REDIRECT = (
"https://myc-profile.bmwgroup.com/#access_token=ACCESS_TOKEN"
"&token_type=bearer&expires_in=3480&refresh_token=REFRESH_TOKEN"
"&state=STATE_CODE")

# VehicleState has different names than the json file. So we need to map some of the
# parameters.
ATTRIBUTE_MAPPING = {
Expand Down Expand Up @@ -132,14 +135,14 @@ def __init__(self) -> None:
"""Constructor."""
self.last_request = []
self.responses = [
MockResponse('https://.+/gcdm/oauth/token',
MockResponse('https://.+/gcdm/(.+/)?oauth/token',
headers=_AUTH_RESPONSE_HEADERS,
data_files=['G31_NBTevo/auth_response.json'],
data_files=['auth/auth_response.json'],
status_code=200),
MockResponse('https://.+/gcdm/(.+/)?oauth/authenticate',
MockResponse('https://.+/api/Account/ExternalLogin',
headers=_AUTH_RESPONSE_HEADERS,
data_files=['G31_NBTevo/auth_response.json'],
status_code=302),
redirect_url=_AUTH_RESPONSE_REDIRECT,
status_code=200),
MockResponse('https://.+/webapi/v1/user/vehicles$',
data_files=['vehicles.json']),
]
Expand Down Expand Up @@ -177,6 +180,10 @@ def setup_default_vehicles(self) -> None:
self.add_response('https://.+/webapi/v1/user/vehicles/{vin}/status$'.format(vin=vin),
data_files=['{path}/status.json'.format(path=path)])

def Session(self) -> 'BackendMock': # pylint: disable=invalid-name
"""Returns itself as a requests.Session style object"""
return self


class MockRequest: # pylint: disable=too-few-public-methods
"""Stores the attributes of a request."""
Expand All @@ -198,15 +205,19 @@ class MockResponse:
# pylint: disable=too-many-arguments

def __init__(self, regex: str, data: str = None, data_files: List[str] = None, headers: dict = None,
status_code: int = 200) -> None:
status_code: int = 200, redirect_url: str = None) -> None:
"""Constructor."""
self.regex = re.compile(regex)
self.status_code = status_code
self.url = redirect_url
self.headers = headers
self._usage_count = 0
if self.headers is None:
self.headers = dict()

if self.url is None:
self.url = regex.replace("https://.+", "")

if data_files is not None:
self._data = []
for data_file in data_files:
Expand Down
File renamed without changes.
12 changes: 10 additions & 2 deletions test/test_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,16 @@ def test_us_header(self):
backend_mock = BackendMock()
with mock.patch('bimmer_connected.account.requests', new=backend_mock):
ConnectedDriveAccount(TEST_USERNAME, TEST_PASSWORD, Regions.NORTH_AMERICA)
request = [r for r in backend_mock.last_request if 'oauth' in r.url][0]
self.assertEqual('b2vapi.bmwgroup.us', request.headers['Host'])
request = [r for r in backend_mock.last_request if 'vehicle' in r.url][0]
self.assertIn('myc-profile.bmwusa.com', request.url)

def test_china_header(self):
"""Test if the host is set correctly in the request."""
backend_mock = BackendMock()
with mock.patch('bimmer_connected.account.requests', new=backend_mock):
ConnectedDriveAccount(TEST_USERNAME, TEST_PASSWORD, Regions.CHINA)
request = [r for r in backend_mock.last_request if 'vehicle' in r.url][0]
self.assertIn('b2vapi.bmwgroup.cn', request.url)

def test_anonymize_data(self):
"""Test anonymization function."""
Expand Down

0 comments on commit 7c6b0af

Please sign in to comment.