From 66183290e60b3ecc1e09277bb744a8a39c6ddb36 Mon Sep 17 00:00:00 2001 From: Brad Rogers Date: Wed, 1 Apr 2020 11:09:02 -0700 Subject: [PATCH 1/2] Add Support for PKCE --- dropbox/dropbox.py | 14 +-- dropbox/oauth.py | 127 +++++++++++++++++------ example/oauth/commandline-oauth-pkce.py | 29 ++++++ example/{ => oauth}/commandline-oauth.py | 1 + test/test_dropbox_unit.py | 62 +++++++++++ 5 files changed, 198 insertions(+), 35 deletions(-) create mode 100644 example/oauth/commandline-oauth-pkce.py rename example/{ => oauth}/commandline-oauth.py (98%) diff --git a/dropbox/dropbox.py b/dropbox/dropbox.py index 3b15acad..f9d7c6ad 100644 --- a/dropbox/dropbox.py +++ b/dropbox/dropbox.py @@ -168,6 +168,7 @@ def __init__(self, :param datetime oauth2_access_token_expiration: Expiration for oauth2_access_token :param str app_key: application key of requesting application; used for token refresh :param str app_secret: application secret of requesting application; used for token refresh + Not required if PKCE was used to authorize the token :param list scope: list of scopes to request on refresh. If left blank, refresh will request all available scopes for application """ @@ -179,8 +180,8 @@ def __init__(self, 'Expected dict, got %r' % headers if oauth2_refresh_token: - assert app_key and app_secret, \ - "app_key and app_secret are required to refresh tokens" + assert app_key, \ + "app_key is required to refresh tokens" if scope is not None: assert len(scope) > 0 and isinstance(scope, list), \ @@ -346,7 +347,7 @@ def check_and_refresh_access_token(self): Checks if access token needs to be refreshed and refreshes if possible :return: """ - can_refresh = self._oauth2_refresh_token and self._app_key and self._app_secret + can_refresh = self._oauth2_refresh_token and self._app_key needs_refresh = self._oauth2_access_token_expiration and \ (datetime.utcnow() + timedelta(seconds=TOKEN_EXPIRATION_BUFFER)) >= \ self._oauth2_access_token_expiration @@ -367,9 +368,9 @@ def refresh_access_token(self, host=API_HOST, scope=None): assert len(scope) > 0 and isinstance(scope, list), \ "Scope list must be of type list" - if not (self._oauth2_refresh_token and self._app_key and self._app_secret): + if not (self._oauth2_refresh_token and self._app_key): self._logger.warning('Unable to refresh access token without \ - refresh token, app key, and app secret') + refresh token and app key') return self._logger.info('Refreshing access token.') @@ -377,8 +378,9 @@ def refresh_access_token(self, host=API_HOST, scope=None): body = {'grant_type': 'refresh_token', 'refresh_token': self._oauth2_refresh_token, 'client_id': self._app_key, - 'client_secret': self._app_secret, } + if self._app_secret: + body['client_secret'] = self._app_secret if scope: scope = " ".join(scope) body['scope'] = scope diff --git a/dropbox/oauth.py b/dropbox/oauth.py index 974a5e1b..e10b32c4 100644 --- a/dropbox/oauth.py +++ b/dropbox/oauth.py @@ -1,3 +1,5 @@ +import hashlib + __all__ = [ 'BadRequestException', 'BadStateException', @@ -14,6 +16,7 @@ import os import six import urllib +import re from datetime import datetime, timedelta from .session import ( @@ -31,6 +34,8 @@ TOKEN_ACCESS_TYPES = ['offline', 'online', 'legacy'] INCLUDE_GRANTED_SCOPES_TYPES = ['user', 'team'] +PKCE_CODE_CHALLENGE_METHOD_TYPES = ['S256', 'plain'] +PKCE_VERIFIER_LENGTH = 128 class OAuth2FlowNoRedirectResult(object): """ @@ -95,7 +100,12 @@ def __init__(self, access_token, account_id, user_id, url_state, refresh_token, :meth:`DropboxOAuth2Flow.start`. """ super(OAuth2FlowResult, self).__init__( - access_token, account_id, user_id, refresh_token, expires_in, scope) + access_token=access_token, + account_id=account_id, + user_id=user_id, + refresh_token=refresh_token, + expires_in=expires_in, + scope=scope) self.url_state = url_state @classmethod @@ -120,11 +130,18 @@ def __repr__(self): class DropboxOAuth2FlowBase(object): - def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type='legacy', - scope=None, include_granted_scopes=None): + def __init__(self, consumer_key, consumer_secret=None, locale=None, token_access_type='legacy', + scope=None, include_granted_scopes=None, pkce_method=None): if scope is not None: assert len(scope) > 0 and isinstance(scope, list), \ "Scope list must be of type list" + if pkce_method is not None: + assert pkce_method in PKCE_CODE_CHALLENGE_METHOD_TYPES + if token_access_type is not None: + assert token_access_type in TOKEN_ACCESS_TYPES + + assert pkce_method or consumer_secret, \ + "must pass in consumer secret or pkce method" self.consumer_key = consumer_key self.consumer_secret = consumer_secret @@ -134,8 +151,20 @@ def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type self.scope = scope self.include_granted_scopes = include_granted_scopes + self.pkce_method = pkce_method + if pkce_method: + self.code_verifier = _generate_pkce_code_verifier() + if pkce_method == 'S256': + self.code_challenge = _generate_pkce_code_challenge(self.code_verifier) + else: + self.code_challenge = self.code_verifier + else: + self.code_verifier = None + self.code_challenge = None + def _get_authorize_url(self, redirect_uri, state, token_access_type, scope=None, - include_granted_scopes=None): + include_granted_scopes=None, code_challenge=None, + code_challenge_method=None): params = dict(response_type='code', client_id=self.consumer_key) if redirect_uri is not None: @@ -146,22 +175,29 @@ def _get_authorize_url(self, redirect_uri, state, token_access_type, scope=None, assert token_access_type in TOKEN_ACCESS_TYPES if token_access_type != 'legacy': params['token_access_type'] = token_access_type + if code_challenge and code_challenge_method: + assert code_challenge_method in PKCE_CODE_CHALLENGE_METHOD_TYPES + params['code_challenge'] = code_challenge + params['code_challenge_method'] = code_challenge_method if scope is not None: params['scope'] = " ".join(scope) - if include_granted_scopes is not None: - assert include_granted_scopes in INCLUDE_GRANTED_SCOPES_TYPES - params['include_granted_scopes'] = str(include_granted_scopes) + if include_granted_scopes is not None: + assert include_granted_scopes in INCLUDE_GRANTED_SCOPES_TYPES + params['include_granted_scopes'] = include_granted_scopes return self.build_url('/oauth2/authorize', params, WEB_HOST) - def _finish(self, code, redirect_uri): + def _finish(self, code, redirect_uri, code_verifier): url = self.build_url('/oauth2/token') params = {'grant_type': 'authorization_code', 'code': code, 'client_id': self.consumer_key, - 'client_secret': self.consumer_secret, } + if code_verifier: + params['code_verifier'] = code_verifier + else: + params['client_secret'] = self.consumer_secret if self.locale is not None: params['locale'] = self.locale if redirect_uri is not None: @@ -273,9 +309,8 @@ class DropboxOAuth2FlowNoRedirect(DropboxOAuth2FlowBase): dbx = Dropbox(oauth_result.access_token) """ - def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type='legacy', - scope=None, include_granted_scopes=None): - # noqa: E501; pylint: disable=useless-super-delegation + def __init__(self, consumer_key, consumer_secret=None, locale=None, token_access_type='legacy', + scope=None, include_granted_scopes=None, pkce_method=None): # noqa: E501; """ Construct an instance. @@ -298,14 +333,19 @@ def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type user - include user scopes in the grant team - include team scopes in the grant Note: if this user has never linked the app, include_granted_scopes must be None + :param str pkce_method: method for code_challenge generation if using PKCE + From the following enum: + S256 - use SHA256 to generate the code_challenge + plain - do not generate the code_challenge, use the code_verifier """ super(DropboxOAuth2FlowNoRedirect, self).__init__( - consumer_key, - consumer_secret, - locale, - token_access_type, - scope, - include_granted_scopes, + consumer_key=consumer_key, + consumer_secret=consumer_secret, + locale=locale, + token_access_type=token_access_type, + scope=scope, + include_granted_scopes=include_granted_scopes, + pkce_method=pkce_method, ) def start(self): @@ -317,8 +357,11 @@ def start(self): access the user's Dropbox account. Tell the user to visit this URL and approve your app. """ - return self._get_authorize_url(None, None, self.token_access_type, self.scope, - self.include_granted_scopes) + return self._get_authorize_url(None, None, self.token_access_type, + scope=self.scope, + include_granted_scopes=self.include_granted_scopes, + code_challenge=self.code_challenge, + code_challenge_method=self.pkce_method) def finish(self, code): """ @@ -331,7 +374,7 @@ def finish(self, code): :rtype: OAuth2FlowNoRedirectResult :raises: The same exceptions as :meth:`DropboxOAuth2Flow.finish()`. """ - return self._finish(code, None) + return self._finish(code, None, self.code_verifier) class DropboxOAuth2Flow(DropboxOAuth2FlowBase): @@ -379,9 +422,10 @@ def dropbox_auth_finish(web_app_session, request): """ - def __init__(self, consumer_key, consumer_secret, redirect_uri, session, - csrf_token_session_key, locale=None, token_access_type='legacy', - scope=None, include_granted_scopes=None): + def __init__(self, consumer_key, redirect_uri, session, + csrf_token_session_key, consumer_secret=None, locale=None, + token_access_type='legacy', scope=None, + include_granted_scopes=None, pkce_method=None): """ Construct an instance. @@ -412,10 +456,19 @@ def __init__(self, consumer_key, consumer_secret, redirect_uri, session, user - include user scopes in the grant team - include team scopes in the grant Note: if this user has never linked the app, include_granted_scopes must be None + :param str pkce_method: method for code_challenge generation if using PKCE + From the following enum: + S256 - use SHA256 to generate the code_challenge + plain - do not generate the code_challenge, use the code_verifier """ - super(DropboxOAuth2Flow, self).__init__(consumer_key, consumer_secret, locale, - token_access_type, scope, - include_granted_scopes) + super(DropboxOAuth2Flow, self).__init__( + consumer_key=consumer_key, + consumer_secret=consumer_secret, + locale=locale, + token_access_type=token_access_type, + scope=scope, + include_granted_scopes=include_granted_scopes, + pkce_method=pkce_method) self.redirect_uri = redirect_uri self.session = session self.csrf_token_session_key = csrf_token_session_key @@ -450,7 +503,10 @@ def start(self, url_state=None): self.session[self.csrf_token_session_key] = csrf_token return self._get_authorize_url(self.redirect_uri, state, self.token_access_type, - self.scope, self.include_granted_scopes) + scope=self.scope, + include_granted_scopes=self.include_granted_scopes, + code_challenge=self.code_challenge, + code_challenge_method=self.pkce_method) def finish(self, query_params): """ @@ -534,7 +590,7 @@ def finish(self, query_params): # If everything went ok, make the network call to get an access token. - no_redirect_result = self._finish(code, self.redirect_uri) + no_redirect_result = self._finish(code, self.redirect_uri, self.code_verifier) return OAuth2FlowResult.from_no_redirect_result( no_redirect_result, url_state) @@ -616,3 +672,16 @@ def encode(o): utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)} return url_encode(utf8_params) + +def _generate_pkce_code_verifier(): + code_verifier = base64.urlsafe_b64encode(os.urandom(PKCE_VERIFIER_LENGTH)).decode('utf-8') + code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier) + if len(code_verifier) > PKCE_VERIFIER_LENGTH: + code_verifier = code_verifier[:128] + return code_verifier + +def _generate_pkce_code_challenge(code_verifier): + code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest() + code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8') + code_challenge = code_challenge.replace('=', '') + return code_challenge diff --git a/example/oauth/commandline-oauth-pkce.py b/example/oauth/commandline-oauth-pkce.py new file mode 100644 index 00000000..9fbb83a7 --- /dev/null +++ b/example/oauth/commandline-oauth-pkce.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 + +import dropbox +from dropbox import DropboxOAuth2FlowNoRedirect + +''' +This example uses PKCE, a currently beta feature. +If you are interested in using this, please contact +Dropbox support +''' +APP_KEY = "" + +auth_flow = DropboxOAuth2FlowNoRedirect(APP_KEY, pkce_method='S256', token_access_type='offline') + +authorize_url = auth_flow.start() +print("1. Go to: " + authorize_url) +print("2. Click \"Allow\" (you might have to log in first).") +print("3. Copy the authorization code.") +auth_code = input("Enter the authorization code here: ").strip() + +try: + oauth_result = auth_flow.finish(auth_code) + print(oauth_result) +except Exception as e: + print('Error: %s' % (e,)) + exit(1) + +dbx = dropbox.Dropbox(oauth2_refresh_token=oauth_result.refresh_token, app_key=APP_KEY) +dbx.users_get_current_account() diff --git a/example/commandline-oauth.py b/example/oauth/commandline-oauth.py similarity index 98% rename from example/commandline-oauth.py rename to example/oauth/commandline-oauth.py index 83ad8779..32b038b8 100644 --- a/example/commandline-oauth.py +++ b/example/oauth/commandline-oauth.py @@ -19,6 +19,7 @@ print(oauth_result) except Exception as e: print('Error: %s' % (e,)) + exit(1) dbx = dropbox.Dropbox(oauth2_access_token=oauth_result.access_token, app_key=APP_KEY, app_secret=APP_SECRET) diff --git a/test/test_dropbox_unit.py b/test/test_dropbox_unit.py index a6e2a38a..2287ef31 100644 --- a/test/test_dropbox_unit.py +++ b/test/test_dropbox_unit.py @@ -24,6 +24,68 @@ class TestOAuth: + def test_authorization_url(self): + flow_obj = DropboxOAuth2Flow(APP_KEY, APP_SECRET, 'http://localhost/dummy', + 'dummy_session', 'dbx-auth-csrf-token') + for redirect_uri in [None, 'localhost']: + for state in [None, 'state']: + for token_access_type in [None, 'legacy', 'offline', 'online']: + for scope in [None, SCOPE_LIST]: + for include_granted_scopes in [None, 'user', 'team']: + for code_challenge_method in [None, 'plain', 'S256']: + for code_challenge in [None, 'mychallenge']: + authorization_url = \ + flow_obj._get_authorize_url(redirect_uri, state, + token_access_type, scope, + include_granted_scopes, + code_challenge, + code_challenge_method) + assert authorization_url\ + .startswith('https://{}/oauth2/authorize?' + .format(session.WEB_HOST)) + assert 'client_id={}'.format(APP_KEY) in authorization_url + assert 'response_type=code' in authorization_url + + if redirect_uri: + assert 'redirect_uri={}'.format(redirect_uri) \ + in authorization_url + else: + assert 'redirect_uri' not in authorization_url + + if state: + assert 'state={}'.format(state) in authorization_url + else: + assert 'state' not in authorization_url + + if token_access_type and token_access_type != 'legacy': + assert 'token_access_type={}'.format(token_access_type) \ + in authorization_url + else: + assert 'token_access_type' not in authorization_url + + if scope: + assert 'scope={}'.format("+".join(scope)) \ + in authorization_url + else: + assert 'scope' not in authorization_url + + if include_granted_scopes and scope: + assert 'include_granted_scopes={}'\ + .format(include_granted_scopes)\ + in authorization_url + else: + assert 'include_granted_scopes' not in authorization_url + + if code_challenge_method and code_challenge: + assert 'code_challenge_method={}'.format( + code_challenge_method)\ + in authorization_url + assert 'code_challenge={}'.format(code_challenge)\ + in authorization_url + else: + assert 'code_challenge_method' not in authorization_url + assert 'code_challenge' not in authorization_url + def test_authorization_url_legacy_default(self): flow_obj = DropboxOAuth2Flow(APP_KEY, APP_SECRET, 'http://localhost/dummy', 'dummy_session', 'dbx-auth-csrf-token') From 8607f769b1642eb6a7f12f8f794e630054ae726e Mon Sep 17 00:00:00 2001 From: Brad Rogers Date: Thu, 2 Apr 2020 12:00:39 -0700 Subject: [PATCH 2/2] Address additional comments --- dropbox/dropbox.py | 37 +++++++------ dropbox/oauth.py | 74 +++++++++++++------------- test/test_dropbox_unit.py | 107 ++++++++++++++++++-------------------- 3 files changed, 109 insertions(+), 109 deletions(-) diff --git a/dropbox/dropbox.py b/dropbox/dropbox.py index f9d7c6ad..e3055c42 100644 --- a/dropbox/dropbox.py +++ b/dropbox/dropbox.py @@ -173,19 +173,17 @@ def __init__(self, refresh will request all available scopes for application """ - assert oauth2_access_token or oauth2_refresh_token, \ - 'OAuth2 access token or refresh token must be set' + if not (oauth2_access_token or oauth2_refresh_token): + raise BadInputException('OAuth2 access token or refresh token must be set') - assert headers is None or isinstance(headers, dict), \ - 'Expected dict, got %r' % headers + if headers is not None and not isinstance(headers, dict): + raise BadInputException('Expected dict, got {}'.format(headers)) - if oauth2_refresh_token: - assert app_key, \ - "app_key is required to refresh tokens" + if oauth2_refresh_token and not app_key: + raise BadInputException("app_key is required to refresh tokens") - if scope is not None: - assert len(scope) > 0 and isinstance(scope, list), \ - "Scope list must be of type list" + if scope is not None and (len(scope) == 0 or not isinstance(scope, list)): + raise BadInputException("Scope list must be of type list") self._oauth2_access_token = oauth2_access_token self._oauth2_refresh_token = oauth2_refresh_token @@ -198,8 +196,9 @@ def __init__(self, self._max_retries_on_error = max_retries_on_error self._max_retries_on_rate_limit = max_retries_on_rate_limit if session: - assert isinstance(session, requests.sessions.Session), \ - 'Expected requests.sessions.Session, got %r' % session + if not isinstance(session, requests.sessions.Session): + raise BadInputException('Expected requests.sessions.Session, got {}' + .format(session)) self._session = session else: self._session = create_session() @@ -364,9 +363,8 @@ def refresh_access_token(self, host=API_HOST, scope=None): :return: """ - if scope is not None: - assert len(scope) > 0 and isinstance(scope, list), \ - "Scope list must be of type list" + if scope is not None and (len(scope) == 0 or not isinstance(scope, list)): + raise BadInputException("Scope list must be of type list") if not (self._oauth2_refresh_token and self._app_key): self._logger.warning('Unable to refresh access token without \ @@ -721,3 +719,12 @@ def _get_dropbox_client_with_select_header(self, select_header_name, team_member session=self._session, headers=new_headers, ) + +class BadInputException(Exception): + """ + Thrown if incorrect types/values are used + + This should only ever be thrown during testing, app should have validation of input prior to + reaching this point + """ + pass diff --git a/dropbox/oauth.py b/dropbox/oauth.py index e10b32c4..562b8827 100644 --- a/dropbox/oauth.py +++ b/dropbox/oauth.py @@ -34,7 +34,6 @@ TOKEN_ACCESS_TYPES = ['offline', 'online', 'legacy'] INCLUDE_GRANTED_SCOPES_TYPES = ['user', 'team'] -PKCE_CODE_CHALLENGE_METHOD_TYPES = ['S256', 'plain'] PKCE_VERIFIER_LENGTH = 128 class OAuth2FlowNoRedirectResult(object): @@ -131,17 +130,16 @@ def __repr__(self): class DropboxOAuth2FlowBase(object): def __init__(self, consumer_key, consumer_secret=None, locale=None, token_access_type='legacy', - scope=None, include_granted_scopes=None, pkce_method=None): - if scope is not None: - assert len(scope) > 0 and isinstance(scope, list), \ - "Scope list must be of type list" - if pkce_method is not None: - assert pkce_method in PKCE_CODE_CHALLENGE_METHOD_TYPES - if token_access_type is not None: - assert token_access_type in TOKEN_ACCESS_TYPES - - assert pkce_method or consumer_secret, \ - "must pass in consumer secret or pkce method" + scope=None, include_granted_scopes=None, use_pkce=False): + if scope is not None and (len(scope) == 0 or not isinstance(scope, list)): + raise BadInputException("Scope list must be of type list") + if token_access_type is not None and token_access_type not in TOKEN_ACCESS_TYPES: + raise BadInputException("Token access type must be from the following enum: {}".format( + TOKEN_ACCESS_TYPES)) + if not (use_pkce or consumer_secret): + raise BadInputException("Must pass in either consumer secret or use PKCE") + if include_granted_scopes and not scope: + raise BadInputException("Must pass in scope to pass include_granted_scopes") self.consumer_key = consumer_key self.consumer_secret = consumer_secret @@ -151,20 +149,15 @@ def __init__(self, consumer_key, consumer_secret=None, locale=None, token_access self.scope = scope self.include_granted_scopes = include_granted_scopes - self.pkce_method = pkce_method - if pkce_method: + if use_pkce: self.code_verifier = _generate_pkce_code_verifier() - if pkce_method == 'S256': - self.code_challenge = _generate_pkce_code_challenge(self.code_verifier) - else: - self.code_challenge = self.code_verifier + self.code_challenge = _generate_pkce_code_challenge(self.code_verifier) else: self.code_verifier = None self.code_challenge = None def _get_authorize_url(self, redirect_uri, state, token_access_type, scope=None, - include_granted_scopes=None, code_challenge=None, - code_challenge_method=None): + include_granted_scopes=None, code_challenge=None): params = dict(response_type='code', client_id=self.consumer_key) if redirect_uri is not None: @@ -175,10 +168,9 @@ def _get_authorize_url(self, redirect_uri, state, token_access_type, scope=None, assert token_access_type in TOKEN_ACCESS_TYPES if token_access_type != 'legacy': params['token_access_type'] = token_access_type - if code_challenge and code_challenge_method: - assert code_challenge_method in PKCE_CODE_CHALLENGE_METHOD_TYPES + if code_challenge: params['code_challenge'] = code_challenge - params['code_challenge_method'] = code_challenge_method + params['code_challenge_method'] = 'S256' if scope is not None: params['scope'] = " ".join(scope) @@ -310,7 +302,7 @@ class DropboxOAuth2FlowNoRedirect(DropboxOAuth2FlowBase): """ def __init__(self, consumer_key, consumer_secret=None, locale=None, token_access_type='legacy', - scope=None, include_granted_scopes=None, pkce_method=None): # noqa: E501; + scope=None, include_granted_scopes=None, use_pkce=False): # noqa: E501; """ Construct an instance. @@ -333,10 +325,9 @@ def __init__(self, consumer_key, consumer_secret=None, locale=None, token_access user - include user scopes in the grant team - include team scopes in the grant Note: if this user has never linked the app, include_granted_scopes must be None - :param str pkce_method: method for code_challenge generation if using PKCE - From the following enum: - S256 - use SHA256 to generate the code_challenge - plain - do not generate the code_challenge, use the code_verifier + :param bool use_pkce: Whether or not to use Sha256 based PKCE. PKCE should be only use on + client apps which doesn't call your server. It is less secure than non-PKCE flow but + can be used if you are unable to safely retrieve your app secret """ super(DropboxOAuth2FlowNoRedirect, self).__init__( consumer_key=consumer_key, @@ -345,7 +336,7 @@ def __init__(self, consumer_key, consumer_secret=None, locale=None, token_access token_access_type=token_access_type, scope=scope, include_granted_scopes=include_granted_scopes, - pkce_method=pkce_method, + use_pkce=use_pkce, ) def start(self): @@ -360,8 +351,7 @@ def start(self): return self._get_authorize_url(None, None, self.token_access_type, scope=self.scope, include_granted_scopes=self.include_granted_scopes, - code_challenge=self.code_challenge, - code_challenge_method=self.pkce_method) + code_challenge=self.code_challenge) def finish(self, code): """ @@ -425,7 +415,7 @@ def dropbox_auth_finish(web_app_session, request): def __init__(self, consumer_key, redirect_uri, session, csrf_token_session_key, consumer_secret=None, locale=None, token_access_type='legacy', scope=None, - include_granted_scopes=None, pkce_method=None): + include_granted_scopes=None, use_pkce=False): """ Construct an instance. @@ -456,10 +446,7 @@ def __init__(self, consumer_key, redirect_uri, session, user - include user scopes in the grant team - include team scopes in the grant Note: if this user has never linked the app, include_granted_scopes must be None - :param str pkce_method: method for code_challenge generation if using PKCE - From the following enum: - S256 - use SHA256 to generate the code_challenge - plain - do not generate the code_challenge, use the code_verifier + :param bool use_pkce: Whether or not to use Sha256 based PKCE """ super(DropboxOAuth2Flow, self).__init__( consumer_key=consumer_key, @@ -468,7 +455,7 @@ def __init__(self, consumer_key, redirect_uri, session, token_access_type=token_access_type, scope=scope, include_granted_scopes=include_granted_scopes, - pkce_method=pkce_method) + use_pkce=use_pkce) self.redirect_uri = redirect_uri self.session = session self.csrf_token_session_key = csrf_token_session_key @@ -505,8 +492,7 @@ def start(self, url_state=None): return self._get_authorize_url(self.redirect_uri, state, self.token_access_type, scope=self.scope, include_granted_scopes=self.include_granted_scopes, - code_challenge=self.code_challenge, - code_challenge_method=self.pkce_method) + code_challenge=self.code_challenge) def finish(self, query_params): """ @@ -644,6 +630,16 @@ class ProviderException(Exception): pass +class BadInputException(Exception): + """ + Thrown if incorrect types/values are used + + This should only ever be thrown during testing, app should have validation of input prior to + reaching this point + """ + pass + + def _safe_equals(a, b): if len(a) != len(b): return False diff --git a/test/test_dropbox_unit.py b/test/test_dropbox_unit.py index 2287ef31..88dd4f67 100644 --- a/test/test_dropbox_unit.py +++ b/test/test_dropbox_unit.py @@ -6,6 +6,7 @@ # Tests OAuth Flow from dropbox import DropboxOAuth2Flow, session, Dropbox, create_session +from dropbox.dropbox import BadInputException from dropbox.exceptions import AuthError from dropbox.oauth import OAuth2FlowNoRedirectResult, DropboxOAuth2FlowNoRedirect from datetime import datetime, timedelta @@ -32,59 +33,55 @@ def test_authorization_url(self): for token_access_type in [None, 'legacy', 'offline', 'online']: for scope in [None, SCOPE_LIST]: for include_granted_scopes in [None, 'user', 'team']: - for code_challenge_method in [None, 'plain', 'S256']: - for code_challenge in [None, 'mychallenge']: - authorization_url = \ - flow_obj._get_authorize_url(redirect_uri, state, - token_access_type, scope, - include_granted_scopes, - code_challenge, - code_challenge_method) - assert authorization_url\ - .startswith('https://{}/oauth2/authorize?' - .format(session.WEB_HOST)) - assert 'client_id={}'.format(APP_KEY) in authorization_url - assert 'response_type=code' in authorization_url - - if redirect_uri: - assert 'redirect_uri={}'.format(redirect_uri) \ - in authorization_url - else: - assert 'redirect_uri' not in authorization_url - - if state: - assert 'state={}'.format(state) in authorization_url - else: - assert 'state' not in authorization_url - - if token_access_type and token_access_type != 'legacy': - assert 'token_access_type={}'.format(token_access_type) \ - in authorization_url - else: - assert 'token_access_type' not in authorization_url - - if scope: - assert 'scope={}'.format("+".join(scope)) \ - in authorization_url - else: - assert 'scope' not in authorization_url - - if include_granted_scopes and scope: - assert 'include_granted_scopes={}'\ - .format(include_granted_scopes)\ - in authorization_url - else: - assert 'include_granted_scopes' not in authorization_url - - if code_challenge_method and code_challenge: - assert 'code_challenge_method={}'.format( - code_challenge_method)\ - in authorization_url - assert 'code_challenge={}'.format(code_challenge)\ - in authorization_url - else: - assert 'code_challenge_method' not in authorization_url - assert 'code_challenge' not in authorization_url + for code_challenge in [None, 'mychallenge']: + authorization_url = \ + flow_obj._get_authorize_url(redirect_uri, state, + token_access_type, scope, + include_granted_scopes, + code_challenge) + assert authorization_url\ + .startswith('https://{}/oauth2/authorize?' + .format(session.WEB_HOST)) + assert 'client_id={}'.format(APP_KEY) in authorization_url + assert 'response_type=code' in authorization_url + + if redirect_uri: + assert 'redirect_uri={}'.format(redirect_uri) \ + in authorization_url + else: + assert 'redirect_uri' not in authorization_url + + if state: + assert 'state={}'.format(state) in authorization_url + else: + assert 'state' not in authorization_url + + if token_access_type and token_access_type != 'legacy': + assert 'token_access_type={}'.format(token_access_type) \ + in authorization_url + else: + assert 'token_access_type' not in authorization_url + + if scope: + assert 'scope={}'.format("+".join(scope)) \ + in authorization_url + else: + assert 'scope' not in authorization_url + + if include_granted_scopes and scope: + assert 'include_granted_scopes={}'\ + .format(include_granted_scopes)\ + in authorization_url + else: + assert 'include_granted_scopes' not in authorization_url + + if code_challenge: + assert 'code_challenge_method=S256' in authorization_url + assert 'code_challenge={}'.format(code_challenge)\ + in authorization_url + else: + assert 'code_challenge_method' not in authorization_url + assert 'code_challenge' not in authorization_url def test_authorization_url_legacy_default(self): flow_obj = DropboxOAuth2Flow(APP_KEY, APP_SECRET, 'http://localhost/dummy', @@ -243,12 +240,12 @@ def invalid_grant_session_instance(self, mocker): return session_obj def test_default_Dropbox_raises_assertion_error(self): - with pytest.raises(AssertionError): + with pytest.raises(BadInputException): # Requires either access token or refresh token Dropbox() def test_Dropbox_with_refresh_only_raises_assertion_error(self): - with pytest.raises(AssertionError): + with pytest.raises(BadInputException): # Refresh tokens also require app key and secret Dropbox(oauth2_refresh_token=REFRESH_TOKEN)