From e3d2a6d917a8b6a0b8d2fbd72272fb2b72c99262 Mon Sep 17 00:00:00 2001 From: Brad Rogers Date: Tue, 24 Mar 2020 18:06:14 -0700 Subject: [PATCH 1/3] Add Support for Scopes - Add option for requesting specific scopes and including already granted scopes in pauth flow - Add support for default scope list in Dropbox client - Add support for requesting specific scopes on token refresh - Update oauth command line documentation to work with python3 - Update tox.ini to run pytest instead of outdated python setup.py test - Add venv to .gitignore --- .gitignore | 4 +- dropbox/dropbox.py | 34 +++++++++++--- dropbox/oauth.py | 91 ++++++++++++++++++++++++++++-------- example/commandline-oauth.py | 26 +++++++++++ test/requirements.txt | 3 +- test/test_dropbox.py | 8 ++++ test/test_dropbox_unit.py | 42 +++++++++++++++-- tox.ini | 7 ++- 8 files changed, 181 insertions(+), 34 deletions(-) create mode 100644 example/commandline-oauth.py diff --git a/.gitignore b/.gitignore index cc0a569a..c6fcead9 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,6 @@ __pycache__/ *.pyc *.pyo *~ -.venv \ No newline at end of file +.venv +venv/ +venv3/ diff --git a/dropbox/dropbox.py b/dropbox/dropbox.py index eeddcceb..a28d62cd 100644 --- a/dropbox/dropbox.py +++ b/dropbox/dropbox.py @@ -141,12 +141,11 @@ def __init__(self, oauth2_refresh_token=None, oauth2_access_token_expiration=None, app_key=None, - app_secret=None): + app_secret=None, + default_scope_list=None,): """ :param str oauth2_access_token: OAuth2 access token for making client requests. - :param str oauth2_refresh_token: OAuth2 refresh token for refreshing access token - :param datetime oauth2_access_token_expiration: Expiration for oauth2_access_token :param int max_retries_on_error: On 5xx errors, the number of times to retry. :param Optional[int] max_retries_on_rate_limit: On 429 errors, the @@ -165,6 +164,12 @@ def __init__(self, server. After the timeout the client will give up on connection. If `None`, client will wait forever. Defaults to 30 seconds. + :param str oauth2_refresh_token: OAuth2 refresh token for refreshing access token + :param datetime oauth2_access_token_expiration: Expiration for oauth2_access_token + :param str app_key: application key of requesting application; used for token refresh + :param str app_secret: application secret of requesting application; used for token refresh + :param list default_scope_list: list of scopes to request on refresh. If left blank, + refresh will request all available scopes for application """ assert oauth2_access_token or oauth2_refresh_token, \ @@ -177,12 +182,17 @@ def __init__(self, assert app_key and app_secret, \ "app_key and app_secret are required to refresh tokens" + if default_scope_list is not None: + assert isinstance(default_scope_list, list), \ + "Scope list must be of type list" + self._oauth2_access_token = oauth2_access_token self._oauth2_refresh_token = oauth2_refresh_token self._oauth2_access_token_expiration = oauth2_access_token_expiration self._app_key = app_key self._app_secret = app_secret + self._default_scope_list = default_scope_list self._max_retries_on_error = max_retries_on_error self._max_retries_on_rate_limit = max_retries_on_rate_limit @@ -222,7 +232,8 @@ def clone( oauth2_refresh_token=None, oauth2_access_token_expiration=None, app_key=None, - app_secret=None): + app_secret=None, + default_scope_list=None): """ Creates a new copy of the Dropbox client with the same defaults unless modified by arguments to clone() @@ -245,6 +256,7 @@ def clone( oauth2_access_token_expiration or self._oauth2_access_token_expiration, app_key or self._app_key, app_secret or self._app_secret, + default_scope_list or self._default_scope_list ) def request(self, @@ -332,7 +344,6 @@ def request(self, def check_and_refresh_access_token(self): """ Checks if access token needs to be refreshed and refreshes if possible - :return: """ can_refresh = self._oauth2_refresh_token and self._app_key and self._app_secret @@ -341,15 +352,21 @@ def check_and_refresh_access_token(self): self._oauth2_access_token_expiration needs_token = not self._oauth2_access_token if (needs_refresh or needs_token) and can_refresh: - self.refresh_access_token() + self.refresh_access_token(scope_list=self._default_scope_list) - def refresh_access_token(self, host=API_HOST): + def refresh_access_token(self, host=API_HOST, scope_list=None): """ Refreshes an access token via refresh token if available + :param host: host to hit token endpoint with + :param scope_list: list of permission scopes for access token :return: """ + if scope_list is not None: + assert isinstance(scope_list, list), \ + "Scope list must be of type list" + if not (self._oauth2_refresh_token and self._app_key and self._app_secret): self._logger.warning('Unable to refresh access token without \ refresh token, app key, and app secret') @@ -362,6 +379,9 @@ def refresh_access_token(self, host=API_HOST): 'client_id': self._app_key, 'client_secret': self._app_secret, } + if scope_list: + scope = " ".join(scope_list) + body['scope'] = scope res = self._session.post(url, data=body) if res.status_code == 400 and res.json()['error'] == 'invalid_grant': diff --git a/dropbox/oauth.py b/dropbox/oauth.py index 62273ba9..69b64ba4 100644 --- a/dropbox/oauth.py +++ b/dropbox/oauth.py @@ -30,6 +30,7 @@ url_encode = urllib.urlencode # pylint: disable=no-member,useless-suppression TOKEN_ACCESS_TYPES = ['offline', 'online', 'legacy'] +INCLUDE_GRANTED_SCOPES_TYPES = ['user', 'team'] class OAuth2FlowNoRedirectResult(object): """ @@ -39,17 +40,22 @@ class OAuth2FlowNoRedirectResult(object): in using them, please contact Dropbox support """ - def __init__(self, access_token, account_id, user_id, refresh_token, expiration): + def __init__(self, access_token, account_id, user_id, refresh_token, expiration, scope_list): """ Args: access_token (str): Token to be used to authenticate later requests. + refresh_token (str): Token to be used to acquire new access token + when existing one expires + expiration (int, datetime): Either the number of seconds from now that the token expires + in or the datetime at which the token expires account_id (str): The Dropbox user's account ID. user_id (str): Deprecated (use account_id instead). refresh_token (str): Token to be used to acquire new access token when existing one expires expiration (int, datetime): Either the number of seconds from now that the token expires in or the datetime at which the token expires + scope_list (list): list of scopes to request in base oauth flow. """ self.access_token = access_token if not expiration: @@ -61,14 +67,16 @@ def __init__(self, access_token, account_id, user_id, refresh_token, expiration) self.refresh_token = refresh_token self.account_id = account_id self.user_id = user_id + self.scope_list = scope_list def __repr__(self): - return 'OAuth2FlowNoRedirectResult(%s, %s, %s, %s, %s)' % ( + return 'OAuth2FlowNoRedirectResult(%s, %s, %s, %s, %s, %s)' % ( self.access_token, self.account_id, self.user_id, self.refresh_token, self.expires_at, + self.scope_list, ) @@ -77,7 +85,8 @@ class OAuth2FlowResult(OAuth2FlowNoRedirectResult): Authorization information for an OAuth2Flow with redirect. """ - def __init__(self, access_token, account_id, user_id, url_state, refresh_token, expires_in): + def __init__(self, access_token, account_id, user_id, url_state, refresh_token, + expires_in, scope_list): """ Same as OAuth2FlowNoRedirectResult but with url_state. @@ -86,20 +95,23 @@ def __init__(self, access_token, account_id, user_id, url_state, refresh_token, :meth:`DropboxOAuth2Flow.start`. """ super(OAuth2FlowResult, self).__init__( - access_token, account_id, user_id, refresh_token, expires_in) + access_token, account_id, user_id, refresh_token, expires_in, scope_list) self.url_state = url_state @classmethod def from_no_redirect_result(cls, result, url_state): assert isinstance(result, OAuth2FlowNoRedirectResult) return cls(result.access_token, result.account_id, result.user_id, - url_state, result.refresh_token, result.expires_at) + url_state, result.refresh_token, result.expires_at, result.scope_list) def __repr__(self): - return 'OAuth2FlowResult(%s, %s, %s, %s, %s, %s)' % ( + return 'OAuth2FlowResult(%s, %s, %s, %s, %s, %s, %s, %s, %s)' % ( self.access_token, + self.refresh_token, + self.expires_at, self.account_id, self.user_id, + self.scope_list, self.url_state, self.refresh_token, self.expires_at, @@ -108,14 +120,22 @@ def __repr__(self): class DropboxOAuth2FlowBase(object): - def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type='legacy'): + def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type='legacy', + scope_list=None, include_granted_scopes=None): + if scope_list is not None: + assert isinstance(scope_list, list), \ + "Scope list must be of type list" + self.consumer_key = consumer_key self.consumer_secret = consumer_secret self.locale = locale self.token_access_type = token_access_type self.requests_session = pinned_session() + self.scope_list = scope_list + self.include_granted_scopes = include_granted_scopes - def _get_authorize_url(self, redirect_uri, state, token_access_type): + def _get_authorize_url(self, redirect_uri, state, token_access_type, scope_list=None, + include_granted_scopes=None): params = dict(response_type='code', client_id=self.consumer_key) if redirect_uri is not None: @@ -127,6 +147,12 @@ def _get_authorize_url(self, redirect_uri, state, token_access_type): if token_access_type != 'legacy': params['token_access_type'] = token_access_type + if scope_list is not None: + params['scope'] = " ".join(scope_list) + if include_granted_scopes is not None: + assert include_granted_scopes in INCLUDE_GRANTED_SCOPES_TYPES + params['include_granted_scopes'] = str(include_granted_scopes) + return self.build_url('/oauth2/authorize', params, WEB_HOST) def _finish(self, code, redirect_uri): @@ -163,6 +189,11 @@ def _finish(self, code, redirect_uri): else: expires_in = None + if 'scope' in d: + scope_list = d + else: + scope_list = None + uid = d['uid'] return OAuth2FlowNoRedirectResult( @@ -171,7 +202,7 @@ def _finish(self, code, redirect_uri): uid, refresh_token, expires_in, - ) + scope_list) def build_path(self, target, params=None): """Build the path component for an API URL. @@ -228,21 +259,23 @@ class DropboxOAuth2FlowNoRedirect(DropboxOAuth2FlowBase): auth_flow = DropboxOAuth2FlowNoRedirect(APP_KEY, APP_SECRET) authorize_url = auth_flow.start() - print "1. Go to: " + authorize_url - print "2. Click \\"Allow\\" (you might have to log in first)." - print "3. Copy the authorization code." + print("1. Go to: " + authorize_url) + print("2. Click \\"Allow\\" (you might have to log in first).") + print("3. Copy the authorization code.") auth_code = raw_input("Enter the authorization code here: ").strip() try: oauth_result = auth_flow.finish(auth_code) - except Exception, e: + except Exception as e: print('Error: %s' % (e,)) return dbx = Dropbox(oauth_result.access_token) """ - def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type='legacy'): # noqa: E501; pylint: disable=useless-super-delegation + def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type='legacy', + scope_list=None, include_granted_scopes=None): + # noqa: E501; pylint: disable=useless-super-delegation """ Construct an instance. @@ -258,12 +291,21 @@ def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type legacy - creates one long-lived token with no expiration online - create one short-lived token with an expiration offline - create one short-lived token with an expiration with a refresh token + :param list scope_list: list of scopes to request in base oauth flow. If left blank, + will default to all scopes for app + :param str include_granted_scopes: which scopes to include from previous grants + From the following enum: + user - include user scopes in the grant + team - include team scopes in the grant + Note: if this user has never linked the app, include_granted_scopes must be None """ super(DropboxOAuth2FlowNoRedirect, self).__init__( consumer_key, consumer_secret, locale, token_access_type, + scope_list, + include_granted_scopes, ) def start(self): @@ -275,7 +317,8 @@ def start(self): access the user's Dropbox account. Tell the user to visit this URL and approve your app. """ - return self._get_authorize_url(None, None, self.token_access_type) + return self._get_authorize_url(None, None, self.token_access_type, self.scope_list, + self.include_granted_scopes) def finish(self, code): """ @@ -337,7 +380,8 @@ def dropbox_auth_finish(web_app_session, request): """ def __init__(self, consumer_key, consumer_secret, redirect_uri, session, - csrf_token_session_key, locale=None, token_access_type='legacy'): + csrf_token_session_key, locale=None, token_access_type='legacy', + scope_list=None, include_granted_scopes=None): """ Construct an instance. @@ -361,9 +405,17 @@ def __init__(self, consumer_key, consumer_secret, redirect_uri, session, legacy - creates one long-lived token with no expiration online - create one short-lived token with an expiration offline - create one short-lived token with an expiration with a refresh token + :param list scope_list: list of scopes to request in base oauth flow. If left blank, + will default to all scopes for app + :param str include_granted_scopes: which scopes to include from previous grants + From the following enum: + user - include user scopes in the grant + team - include team scopes in the grant + Note: if this user has never linked the app, include_granted_scopes must be None """ - super(DropboxOAuth2Flow, self).__init__(consumer_key, consumer_secret, - locale, token_access_type) + super(DropboxOAuth2Flow, self).__init__(consumer_key, consumer_secret, locale, + token_access_type, scope_list, + include_granted_scopes) self.redirect_uri = redirect_uri self.session = session self.csrf_token_session_key = csrf_token_session_key @@ -397,7 +449,8 @@ def start(self, url_state=None): state += "|" + url_state self.session[self.csrf_token_session_key] = csrf_token - return self._get_authorize_url(self.redirect_uri, state, self.token_access_type) + return self._get_authorize_url(self.redirect_uri, state, self.token_access_type, + self.scope_list, self.include_granted_scopes) def finish(self, query_params): """ diff --git a/example/commandline-oauth.py b/example/commandline-oauth.py new file mode 100644 index 00000000..2560960e --- /dev/null +++ b/example/commandline-oauth.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 + +import dropbox +from dropbox import DropboxOAuth2FlowNoRedirect + +APP_KEY = "" +APP_SECRET = "" + +auth_flow = DropboxOAuth2FlowNoRedirect(APP_KEY, APP_SECRET, scope_list=['files.metadata.read'], + token_access_type='offline') + +authorize_url = auth_flow.start() +print("1. Go to: " + authorize_url) +print("2. Click \"Allow\" (you might have to log in first).") +print("3. Copy the authorization code.") +auth_code = input("Enter the authorization code here: ").strip() + +try: + oauth_result = auth_flow.finish(auth_code) + print(oauth_result) +except Exception as e: + print('Error: %s' % (e,)) + +dbx = dropbox.Dropbox(oauth2_refresh_token=oauth_result.refresh_token, + app_key=APP_KEY, app_secret=APP_SECRET) +dbx.users_get_current_account() diff --git a/test/requirements.txt b/test/requirements.txt index 55d9d7df..1a6161ba 100644 --- a/test/requirements.txt +++ b/test/requirements.txt @@ -1,2 +1,3 @@ +pytest mock -pytest-mock \ No newline at end of file +pytest-mock diff --git a/test/test_dropbox.py b/test/test_dropbox.py index b720f6d9..4fa1ffd0 100644 --- a/test/test_dropbox.py +++ b/test/test_dropbox.py @@ -120,6 +120,14 @@ def test_bad_auth(self): def test_refresh(self, dbx): dbx.users_get_current_account() + @refresh_dbx_from_env + def test_downscope(self, dbx): + dbx.users_get_current_account() + dbx.refresh_access_token(scope_list=['files.metadata.read']) + with self.assertRaises(AuthError): + # Should fail because downscoped to not include needed scope + dbx.users_get_current_account() + @dbx_from_env def test_rpc(self, dbx): dbx.files_list_folder('') diff --git a/test/test_dropbox_unit.py b/test/test_dropbox_unit.py index ab43732c..120a9691 100644 --- a/test/test_dropbox_unit.py +++ b/test/test_dropbox_unit.py @@ -17,9 +17,10 @@ EXPIRES_IN = 14400 ACCOUNT_ID = 'dummy_account_id' USER_ID = 'dummy_user_id' +SCOPE_LIST = 'files.metadata' EXPIRATION = datetime.utcnow() + timedelta(seconds=EXPIRES_IN) -EXPIRATION_BUFFER = timedelta(minutes=1) +EXPIRATION_BUFFER = timedelta(minutes=5) class TestOAuth: @@ -60,9 +61,38 @@ def test_authorization_url_offline_token_type(self): assert 'response_type=code' in offline_authorization_url assert 'token_access_type=offline' in offline_authorization_url + def test_authorization_url_with_scopes_and_granted(self): + flow_obj = DropboxOAuth2Flow(APP_KEY, APP_SECRET, 'http://localhost/dummy', + 'dummy_session', 'dbx-auth-csrf-token') + + scopes = ['account_info.read', 'files.metadata.read'] + scope_authorization_url = flow_obj._get_authorize_url(None, None, 'offline', scopes, 'user') + assert scope_authorization_url.startswith('https://{}/oauth2/authorize?' + .format(session.WEB_HOST)) + assert 'client_id={}'.format(APP_KEY) in scope_authorization_url + assert 'response_type=code' in scope_authorization_url + assert 'token_access_type=offline' in scope_authorization_url + assert 'scope=account_info.read+files.metadata.read' in scope_authorization_url + assert 'include_granted_scopes=user' in scope_authorization_url + + def test_authorization_url_with_scopes(self): + flow_obj = DropboxOAuth2Flow(APP_KEY, APP_SECRET, 'http://localhost/dummy', + 'dummy_session', 'dbx-auth-csrf-token') + + scopes = ['account_info.read', 'files.metadata.read'] + scope_authorization_url = flow_obj._get_authorize_url(None, None, 'offline', scopes) + assert scope_authorization_url.startswith('https://{}/oauth2/authorize?' + .format(session.WEB_HOST)) + assert 'client_id={}'.format(APP_KEY) in scope_authorization_url + assert 'response_type=code' in scope_authorization_url + assert 'token_access_type=offline' in scope_authorization_url + assert 'scope=account_info.read+files.metadata.read' in scope_authorization_url + assert 'include_granted_scopes' not in scope_authorization_url + def test_OAuth2FlowNoRedirectResult_legacy(self): # Test legacy result - result_obj = OAuth2FlowNoRedirectResult(ACCESS_TOKEN, ACCOUNT_ID, USER_ID, None, None) + result_obj = OAuth2FlowNoRedirectResult(ACCESS_TOKEN, ACCOUNT_ID, USER_ID, None, None, + SCOPE_LIST) assert result_obj.access_token == ACCESS_TOKEN assert not result_obj.refresh_token assert not result_obj.expires_at @@ -70,16 +100,18 @@ def test_OAuth2FlowNoRedirectResult_legacy(self): def test_OAuth2FlowNoRedirectResult_offline(self): # Test offline result result_obj = OAuth2FlowNoRedirectResult(ACCESS_TOKEN, ACCOUNT_ID, USER_ID, - REFRESH_TOKEN, EXPIRES_IN) + REFRESH_TOKEN, EXPIRES_IN, SCOPE_LIST) assert result_obj.access_token == ACCESS_TOKEN assert result_obj.refresh_token == REFRESH_TOKEN assert abs(result_obj.expires_at - EXPIRATION) < EXPIRATION_BUFFER assert result_obj.account_id == ACCOUNT_ID assert result_obj.user_id == USER_ID + assert result_obj.scope_list == SCOPE_LIST def test_OAuth2FlowNoRedirectResult_online(self): # Test online result - result_obj = OAuth2FlowNoRedirectResult(ACCESS_TOKEN, ACCOUNT_ID, USER_ID, None, EXPIRES_IN) + result_obj = OAuth2FlowNoRedirectResult(ACCESS_TOKEN, ACCOUNT_ID, USER_ID, None, EXPIRES_IN, + SCOPE_LIST) assert result_obj.access_token == ACCESS_TOKEN assert not result_obj.refresh_token assert abs(result_obj.expires_at - EXPIRATION) < EXPIRATION_BUFFER @@ -87,7 +119,7 @@ def test_OAuth2FlowNoRedirectResult_online(self): def test_OAuth2FlowNoRedirectResult_copy(self): # Test constructor for copying object result_obj = OAuth2FlowNoRedirectResult(ACCESS_TOKEN, ACCOUNT_ID, USER_ID, - REFRESH_TOKEN, EXPIRATION) + REFRESH_TOKEN, EXPIRATION, SCOPE_LIST) assert result_obj.expires_at == EXPIRATION class TestClient: diff --git a/tox.ini b/tox.ini index 478f8e52..06432b1d 100644 --- a/tox.ini +++ b/tox.ini @@ -23,14 +23,19 @@ max-line-length = 100 [testenv] commands = - python setup.py test {posargs} + pytest {posargs} passenv = + DROPBOX_REFRESH_TOKEN + DROPBOX_APP_KEY + DROPBOX_APP_SECRET DROPBOX_DOMAIN DROPBOX_TEAM_TOKEN DROPBOX_TOKEN DROPBOX_WEB_HOST +deps = + -rtest/requirements.txt [testenv:check] From a542b7acf942310b6203d006e80ad8f178c635f6 Mon Sep 17 00:00:00 2001 From: Brad Rogers Date: Tue, 31 Mar 2020 15:02:41 -0700 Subject: [PATCH 2/3] Make changes to address comments --- dropbox/dropbox.py | 28 +++++++++---------- dropbox/oauth.py | 52 ++++++++++++++++++------------------ example/commandline-oauth.py | 5 ++-- test/test_dropbox.py | 2 +- test/test_dropbox_unit.py | 46 ++++++++++++++++++++++++++++--- 5 files changed, 86 insertions(+), 47 deletions(-) diff --git a/dropbox/dropbox.py b/dropbox/dropbox.py index a28d62cd..834bca10 100644 --- a/dropbox/dropbox.py +++ b/dropbox/dropbox.py @@ -142,7 +142,7 @@ def __init__(self, oauth2_access_token_expiration=None, app_key=None, app_secret=None, - default_scope_list=None,): + scope=None,): """ :param str oauth2_access_token: OAuth2 access token for making client requests. @@ -168,7 +168,7 @@ def __init__(self, :param datetime oauth2_access_token_expiration: Expiration for oauth2_access_token :param str app_key: application key of requesting application; used for token refresh :param str app_secret: application secret of requesting application; used for token refresh - :param list default_scope_list: list of scopes to request on refresh. If left blank, + :param list scope: list of scopes to request on refresh. If left blank, refresh will request all available scopes for application """ @@ -182,8 +182,8 @@ def __init__(self, assert app_key and app_secret, \ "app_key and app_secret are required to refresh tokens" - if default_scope_list is not None: - assert isinstance(default_scope_list, list), \ + if scope is not None: + assert not scope or isinstance(scope, list), \ "Scope list must be of type list" self._oauth2_access_token = oauth2_access_token @@ -192,7 +192,7 @@ def __init__(self, self._app_key = app_key self._app_secret = app_secret - self._default_scope_list = default_scope_list + self._scope = scope self._max_retries_on_error = max_retries_on_error self._max_retries_on_rate_limit = max_retries_on_rate_limit @@ -233,7 +233,7 @@ def clone( oauth2_access_token_expiration=None, app_key=None, app_secret=None, - default_scope_list=None): + scope=None): """ Creates a new copy of the Dropbox client with the same defaults unless modified by arguments to clone() @@ -256,7 +256,7 @@ def clone( oauth2_access_token_expiration or self._oauth2_access_token_expiration, app_key or self._app_key, app_secret or self._app_secret, - default_scope_list or self._default_scope_list + scope or self._scope ) def request(self, @@ -352,19 +352,19 @@ def check_and_refresh_access_token(self): self._oauth2_access_token_expiration needs_token = not self._oauth2_access_token if (needs_refresh or needs_token) and can_refresh: - self.refresh_access_token(scope_list=self._default_scope_list) + self.refresh_access_token(scope=self._scope) - def refresh_access_token(self, host=API_HOST, scope_list=None): + def refresh_access_token(self, host=API_HOST, scope=None): """ Refreshes an access token via refresh token if available :param host: host to hit token endpoint with - :param scope_list: list of permission scopes for access token + :param scope: list of permission scopes for access token :return: """ - if scope_list is not None: - assert isinstance(scope_list, list), \ + if scope is not None: + assert not scope or isinstance(scope, list), \ "Scope list must be of type list" if not (self._oauth2_refresh_token and self._app_key and self._app_secret): @@ -379,8 +379,8 @@ def refresh_access_token(self, host=API_HOST, scope_list=None): 'client_id': self._app_key, 'client_secret': self._app_secret, } - if scope_list: - scope = " ".join(scope_list) + if scope: + scope = " ".join(scope) body['scope'] = scope res = self._session.post(url, data=body) diff --git a/dropbox/oauth.py b/dropbox/oauth.py index 69b64ba4..50ee4203 100644 --- a/dropbox/oauth.py +++ b/dropbox/oauth.py @@ -40,7 +40,7 @@ class OAuth2FlowNoRedirectResult(object): in using them, please contact Dropbox support """ - def __init__(self, access_token, account_id, user_id, refresh_token, expiration, scope_list): + def __init__(self, access_token, account_id, user_id, refresh_token, expiration, scope): """ Args: access_token (str): Token to be used to authenticate later @@ -55,7 +55,7 @@ def __init__(self, access_token, account_id, user_id, refresh_token, expiration, when existing one expires expiration (int, datetime): Either the number of seconds from now that the token expires in or the datetime at which the token expires - scope_list (list): list of scopes to request in base oauth flow. + scope (list): list of scopes to request in base oauth flow. """ self.access_token = access_token if not expiration: @@ -67,7 +67,7 @@ def __init__(self, access_token, account_id, user_id, refresh_token, expiration, self.refresh_token = refresh_token self.account_id = account_id self.user_id = user_id - self.scope_list = scope_list + self.scope = scope def __repr__(self): return 'OAuth2FlowNoRedirectResult(%s, %s, %s, %s, %s, %s)' % ( @@ -76,7 +76,7 @@ def __repr__(self): self.user_id, self.refresh_token, self.expires_at, - self.scope_list, + self.scope, ) @@ -86,7 +86,7 @@ class OAuth2FlowResult(OAuth2FlowNoRedirectResult): """ def __init__(self, access_token, account_id, user_id, url_state, refresh_token, - expires_in, scope_list): + expires_in, scope): """ Same as OAuth2FlowNoRedirectResult but with url_state. @@ -95,14 +95,14 @@ def __init__(self, access_token, account_id, user_id, url_state, refresh_token, :meth:`DropboxOAuth2Flow.start`. """ super(OAuth2FlowResult, self).__init__( - access_token, account_id, user_id, refresh_token, expires_in, scope_list) + access_token, account_id, user_id, refresh_token, expires_in, scope) self.url_state = url_state @classmethod def from_no_redirect_result(cls, result, url_state): assert isinstance(result, OAuth2FlowNoRedirectResult) return cls(result.access_token, result.account_id, result.user_id, - url_state, result.refresh_token, result.expires_at, result.scope_list) + url_state, result.refresh_token, result.expires_at, result.scope) def __repr__(self): return 'OAuth2FlowResult(%s, %s, %s, %s, %s, %s, %s, %s, %s)' % ( @@ -111,7 +111,7 @@ def __repr__(self): self.expires_at, self.account_id, self.user_id, - self.scope_list, + self.scope, self.url_state, self.refresh_token, self.expires_at, @@ -121,9 +121,9 @@ def __repr__(self): class DropboxOAuth2FlowBase(object): def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type='legacy', - scope_list=None, include_granted_scopes=None): - if scope_list is not None: - assert isinstance(scope_list, list), \ + scope=None, include_granted_scopes=None): + if scope is not None: + assert not scope or isinstance(scope, list), \ "Scope list must be of type list" self.consumer_key = consumer_key @@ -131,10 +131,10 @@ def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type self.locale = locale self.token_access_type = token_access_type self.requests_session = pinned_session() - self.scope_list = scope_list + self.scope = scope self.include_granted_scopes = include_granted_scopes - def _get_authorize_url(self, redirect_uri, state, token_access_type, scope_list=None, + def _get_authorize_url(self, redirect_uri, state, token_access_type, scope=None, include_granted_scopes=None): params = dict(response_type='code', client_id=self.consumer_key) @@ -147,8 +147,8 @@ def _get_authorize_url(self, redirect_uri, state, token_access_type, scope_list= if token_access_type != 'legacy': params['token_access_type'] = token_access_type - if scope_list is not None: - params['scope'] = " ".join(scope_list) + if scope is not None: + params['scope'] = " ".join(scope) if include_granted_scopes is not None: assert include_granted_scopes in INCLUDE_GRANTED_SCOPES_TYPES params['include_granted_scopes'] = str(include_granted_scopes) @@ -190,9 +190,9 @@ def _finish(self, code, redirect_uri): expires_in = None if 'scope' in d: - scope_list = d + scope = d['scope'] else: - scope_list = None + scope = None uid = d['uid'] @@ -202,7 +202,7 @@ def _finish(self, code, redirect_uri): uid, refresh_token, expires_in, - scope_list) + scope) def build_path(self, target, params=None): """Build the path component for an API URL. @@ -274,7 +274,7 @@ class DropboxOAuth2FlowNoRedirect(DropboxOAuth2FlowBase): """ def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type='legacy', - scope_list=None, include_granted_scopes=None): + scope=None, include_granted_scopes=None): # noqa: E501; pylint: disable=useless-super-delegation """ Construct an instance. @@ -291,7 +291,7 @@ def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type legacy - creates one long-lived token with no expiration online - create one short-lived token with an expiration offline - create one short-lived token with an expiration with a refresh token - :param list scope_list: list of scopes to request in base oauth flow. If left blank, + :param list scope: list of scopes to request in base oauth flow. If left blank, will default to all scopes for app :param str include_granted_scopes: which scopes to include from previous grants From the following enum: @@ -304,7 +304,7 @@ def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type consumer_secret, locale, token_access_type, - scope_list, + scope, include_granted_scopes, ) @@ -317,7 +317,7 @@ def start(self): access the user's Dropbox account. Tell the user to visit this URL and approve your app. """ - return self._get_authorize_url(None, None, self.token_access_type, self.scope_list, + return self._get_authorize_url(None, None, self.token_access_type, self.scope, self.include_granted_scopes) def finish(self, code): @@ -381,7 +381,7 @@ def dropbox_auth_finish(web_app_session, request): def __init__(self, consumer_key, consumer_secret, redirect_uri, session, csrf_token_session_key, locale=None, token_access_type='legacy', - scope_list=None, include_granted_scopes=None): + scope=None, include_granted_scopes=None): """ Construct an instance. @@ -405,7 +405,7 @@ def __init__(self, consumer_key, consumer_secret, redirect_uri, session, legacy - creates one long-lived token with no expiration online - create one short-lived token with an expiration offline - create one short-lived token with an expiration with a refresh token - :param list scope_list: list of scopes to request in base oauth flow. If left blank, + :param list scope: list of scopes to request in base oauth flow. If left blank, will default to all scopes for app :param str include_granted_scopes: which scopes to include from previous grants From the following enum: @@ -414,7 +414,7 @@ def __init__(self, consumer_key, consumer_secret, redirect_uri, session, Note: if this user has never linked the app, include_granted_scopes must be None """ super(DropboxOAuth2Flow, self).__init__(consumer_key, consumer_secret, locale, - token_access_type, scope_list, + token_access_type, scope, include_granted_scopes) self.redirect_uri = redirect_uri self.session = session @@ -450,7 +450,7 @@ def start(self, url_state=None): self.session[self.csrf_token_session_key] = csrf_token return self._get_authorize_url(self.redirect_uri, state, self.token_access_type, - self.scope_list, self.include_granted_scopes) + self.scope, self.include_granted_scopes) def finish(self, query_params): """ diff --git a/example/commandline-oauth.py b/example/commandline-oauth.py index 2560960e..83ad8779 100644 --- a/example/commandline-oauth.py +++ b/example/commandline-oauth.py @@ -6,8 +6,7 @@ APP_KEY = "" APP_SECRET = "" -auth_flow = DropboxOAuth2FlowNoRedirect(APP_KEY, APP_SECRET, scope_list=['files.metadata.read'], - token_access_type='offline') +auth_flow = DropboxOAuth2FlowNoRedirect(APP_KEY, APP_SECRET) authorize_url = auth_flow.start() print("1. Go to: " + authorize_url) @@ -21,6 +20,6 @@ except Exception as e: print('Error: %s' % (e,)) -dbx = dropbox.Dropbox(oauth2_refresh_token=oauth_result.refresh_token, +dbx = dropbox.Dropbox(oauth2_access_token=oauth_result.access_token, app_key=APP_KEY, app_secret=APP_SECRET) dbx.users_get_current_account() diff --git a/test/test_dropbox.py b/test/test_dropbox.py index 4fa1ffd0..f233dd9e 100644 --- a/test/test_dropbox.py +++ b/test/test_dropbox.py @@ -123,7 +123,7 @@ def test_refresh(self, dbx): @refresh_dbx_from_env def test_downscope(self, dbx): dbx.users_get_current_account() - dbx.refresh_access_token(scope_list=['files.metadata.read']) + dbx.refresh_access_token(scope=['files.metadata.read']) with self.assertRaises(AuthError): # Should fail because downscoped to not include needed scope dbx.users_get_current_account() diff --git a/test/test_dropbox_unit.py b/test/test_dropbox_unit.py index 120a9691..a6e2a38a 100644 --- a/test/test_dropbox_unit.py +++ b/test/test_dropbox_unit.py @@ -7,7 +7,7 @@ # Tests OAuth Flow from dropbox import DropboxOAuth2Flow, session, Dropbox, create_session from dropbox.exceptions import AuthError -from dropbox.oauth import OAuth2FlowNoRedirectResult +from dropbox.oauth import OAuth2FlowNoRedirectResult, DropboxOAuth2FlowNoRedirect from datetime import datetime, timedelta APP_KEY = 'dummy_app_key' @@ -17,7 +17,7 @@ EXPIRES_IN = 14400 ACCOUNT_ID = 'dummy_account_id' USER_ID = 'dummy_user_id' -SCOPE_LIST = 'files.metadata' +SCOPE_LIST = ['files.metadata.read', 'files.metadata.write'] EXPIRATION = datetime.utcnow() + timedelta(seconds=EXPIRES_IN) EXPIRATION_BUFFER = timedelta(minutes=5) @@ -106,7 +106,7 @@ def test_OAuth2FlowNoRedirectResult_offline(self): assert abs(result_obj.expires_at - EXPIRATION) < EXPIRATION_BUFFER assert result_obj.account_id == ACCOUNT_ID assert result_obj.user_id == USER_ID - assert result_obj.scope_list == SCOPE_LIST + assert result_obj.scope == SCOPE_LIST def test_OAuth2FlowNoRedirectResult_online(self): # Test online result @@ -122,6 +122,46 @@ def test_OAuth2FlowNoRedirectResult_copy(self): REFRESH_TOKEN, EXPIRATION, SCOPE_LIST) assert result_obj.expires_at == EXPIRATION + @pytest.fixture(scope='function') + def auth_flow_offline_with_scopes(self, mocker): + auth_flow = DropboxOAuth2FlowNoRedirect(APP_KEY, APP_SECRET, token_access_type='offline', + scope=SCOPE_LIST) + session = mock.MagicMock() + post_response = mock.MagicMock(status_code=200) + post_response.json.return_value = {"access_token": ACCESS_TOKEN, "refresh_token": + REFRESH_TOKEN, "expires_in": EXPIRES_IN, "uid": USER_ID, "account_id": ACCOUNT_ID, + "scope": " ".join(SCOPE_LIST)} + mocker.patch.object(session, 'post', return_value=post_response) + auth_flow.requests_session = session + return auth_flow + + def test_NoRedirect_whole_flow(self, auth_flow_offline_with_scopes): + authorization_url = auth_flow_offline_with_scopes.start() + + assert authorization_url.startswith('https://{}/oauth2/authorize?' + .format(session.WEB_HOST)) + assert 'client_id={}'.format(APP_KEY) in authorization_url + assert 'response_type=code' in authorization_url + mycode = 'test oauth code' + auth_result = auth_flow_offline_with_scopes.finish(mycode) + assert auth_result.access_token == ACCESS_TOKEN + assert auth_result.refresh_token == REFRESH_TOKEN + assert abs(auth_result.expires_at - EXPIRATION) < EXPIRATION_BUFFER + assert auth_result.user_id == USER_ID + assert auth_result.account_id == ACCOUNT_ID + assert auth_result.scope == " ".join(SCOPE_LIST) + + auth_flow_offline_with_scopes.requests_session.post.assert_called_once() + token_call_args = auth_flow_offline_with_scopes.requests_session.post.call_args_list + assert len(token_call_args) == 1 + first_call_args = token_call_args[0]._get_call_arguments() + assert first_call_args[0][0] == 'https://{}/oauth2/token'.format(session.API_HOST) + call_data = first_call_args[1]['data'] + assert call_data['client_id'] == APP_KEY + assert call_data['grant_type'] == 'authorization_code' + assert call_data['client_secret'] == APP_SECRET + assert call_data['code'] == mycode + class TestClient: @pytest.fixture(scope='function') From f7bd6a48c95506ea1a2ee60b4d4f85b1fca13714 Mon Sep 17 00:00:00 2001 From: Brad Rogers Date: Wed, 1 Apr 2020 09:30:46 -0700 Subject: [PATCH 3/3] Address final comments --- dropbox/dropbox.py | 4 ++-- dropbox/oauth.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dropbox/dropbox.py b/dropbox/dropbox.py index 834bca10..3b15acad 100644 --- a/dropbox/dropbox.py +++ b/dropbox/dropbox.py @@ -183,7 +183,7 @@ def __init__(self, "app_key and app_secret are required to refresh tokens" if scope is not None: - assert not scope or isinstance(scope, list), \ + assert len(scope) > 0 and isinstance(scope, list), \ "Scope list must be of type list" self._oauth2_access_token = oauth2_access_token @@ -364,7 +364,7 @@ def refresh_access_token(self, host=API_HOST, scope=None): """ if scope is not None: - assert not scope or isinstance(scope, list), \ + assert len(scope) > 0 and isinstance(scope, list), \ "Scope list must be of type list" if not (self._oauth2_refresh_token and self._app_key and self._app_secret): diff --git a/dropbox/oauth.py b/dropbox/oauth.py index 50ee4203..974a5e1b 100644 --- a/dropbox/oauth.py +++ b/dropbox/oauth.py @@ -123,7 +123,7 @@ class DropboxOAuth2FlowBase(object): def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type='legacy', scope=None, include_granted_scopes=None): if scope is not None: - assert not scope or isinstance(scope, list), \ + assert len(scope) > 0 and isinstance(scope, list), \ "Scope list must be of type list" self.consumer_key = consumer_key