Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 28 additions & 19 deletions dropbox/dropbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,23 +168,22 @@ def __init__(self,
:param datetime oauth2_access_token_expiration: Expiration for oauth2_access_token
:param str app_key: application key of requesting application; used for token refresh
:param str app_secret: application secret of requesting application; used for token refresh
Not required if PKCE was used to authorize the token
:param list scope: list of scopes to request on refresh. If left blank,
refresh will request all available scopes for application
"""

assert oauth2_access_token or oauth2_refresh_token, \
'OAuth2 access token or refresh token must be set'
if not (oauth2_access_token or oauth2_refresh_token):
raise BadInputException('OAuth2 access token or refresh token must be set')

assert headers is None or isinstance(headers, dict), \
'Expected dict, got %r' % headers
if headers is not None and not isinstance(headers, dict):
raise BadInputException('Expected dict, got {}'.format(headers))

if oauth2_refresh_token:
assert app_key and app_secret, \
"app_key and app_secret are required to refresh tokens"
if oauth2_refresh_token and not app_key:
raise BadInputException("app_key is required to refresh tokens")

if scope is not None:
assert len(scope) > 0 and isinstance(scope, list), \
"Scope list must be of type list"
if scope is not None and (len(scope) == 0 or not isinstance(scope, list)):
raise BadInputException("Scope list must be of type list")

self._oauth2_access_token = oauth2_access_token
self._oauth2_refresh_token = oauth2_refresh_token
Expand All @@ -197,8 +196,9 @@ def __init__(self,
self._max_retries_on_error = max_retries_on_error
self._max_retries_on_rate_limit = max_retries_on_rate_limit
if session:
assert isinstance(session, requests.sessions.Session), \
'Expected requests.sessions.Session, got %r' % session
if not isinstance(session, requests.sessions.Session):
raise BadInputException('Expected requests.sessions.Session, got {}'
.format(session))
self._session = session
else:
self._session = create_session()
Expand Down Expand Up @@ -346,7 +346,7 @@ def check_and_refresh_access_token(self):
Checks if access token needs to be refreshed and refreshes if possible
:return:
"""
can_refresh = self._oauth2_refresh_token and self._app_key and self._app_secret
can_refresh = self._oauth2_refresh_token and self._app_key
needs_refresh = self._oauth2_access_token_expiration and \
(datetime.utcnow() + timedelta(seconds=TOKEN_EXPIRATION_BUFFER)) >= \
self._oauth2_access_token_expiration
Expand All @@ -363,22 +363,22 @@ def refresh_access_token(self, host=API_HOST, scope=None):
:return:
"""

if scope is not None:
assert len(scope) > 0 and isinstance(scope, list), \
"Scope list must be of type list"
if scope is not None and (len(scope) == 0 or not isinstance(scope, list)):
raise BadInputException("Scope list must be of type list")

if not (self._oauth2_refresh_token and self._app_key and self._app_secret):
if not (self._oauth2_refresh_token and self._app_key):
self._logger.warning('Unable to refresh access token without \
refresh token, app key, and app secret')
refresh token and app key')
return

self._logger.info('Refreshing access token.')
url = "https://{}/oauth2/token".format(host)
body = {'grant_type': 'refresh_token',
'refresh_token': self._oauth2_refresh_token,
'client_id': self._app_key,
'client_secret': self._app_secret,
}
if self._app_secret:
body['client_secret'] = self._app_secret
if scope:
scope = " ".join(scope)
body['scope'] = scope
Expand Down Expand Up @@ -719,3 +719,12 @@ def _get_dropbox_client_with_select_header(self, select_header_name, team_member
session=self._session,
headers=new_headers,
)

class BadInputException(Exception):
"""
Thrown if incorrect types/values are used

This should only ever be thrown during testing, app should have validation of input prior to
reaching this point
"""
pass
129 changes: 97 additions & 32 deletions dropbox/oauth.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import hashlib

__all__ = [
'BadRequestException',
'BadStateException',
Expand All @@ -14,6 +16,7 @@
import os
import six
import urllib
import re
from datetime import datetime, timedelta

from .session import (
Expand All @@ -31,6 +34,7 @@

TOKEN_ACCESS_TYPES = ['offline', 'online', 'legacy']
INCLUDE_GRANTED_SCOPES_TYPES = ['user', 'team']
PKCE_VERIFIER_LENGTH = 128

class OAuth2FlowNoRedirectResult(object):
"""
Expand Down Expand Up @@ -95,7 +99,12 @@ def __init__(self, access_token, account_id, user_id, url_state, refresh_token,
:meth:`DropboxOAuth2Flow.start`.
"""
super(OAuth2FlowResult, self).__init__(
access_token, account_id, user_id, refresh_token, expires_in, scope)
access_token=access_token,
account_id=account_id,
user_id=user_id,
refresh_token=refresh_token,
expires_in=expires_in,
scope=scope)
self.url_state = url_state

@classmethod
Expand All @@ -120,11 +129,17 @@ def __repr__(self):

class DropboxOAuth2FlowBase(object):

def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type='legacy',
scope=None, include_granted_scopes=None):
if scope is not None:
assert len(scope) > 0 and isinstance(scope, list), \
"Scope list must be of type list"
def __init__(self, consumer_key, consumer_secret=None, locale=None, token_access_type='legacy',
scope=None, include_granted_scopes=None, use_pkce=False):
if scope is not None and (len(scope) == 0 or not isinstance(scope, list)):
raise BadInputException("Scope list must be of type list")
if token_access_type is not None and token_access_type not in TOKEN_ACCESS_TYPES:
raise BadInputException("Token access type must be from the following enum: {}".format(
TOKEN_ACCESS_TYPES))
if not (use_pkce or consumer_secret):
raise BadInputException("Must pass in either consumer secret or use PKCE")
if include_granted_scopes and not scope:
raise BadInputException("Must pass in scope to pass include_granted_scopes")

self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
Expand All @@ -134,8 +149,15 @@ def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type
self.scope = scope
self.include_granted_scopes = include_granted_scopes

if use_pkce:
self.code_verifier = _generate_pkce_code_verifier()
self.code_challenge = _generate_pkce_code_challenge(self.code_verifier)
else:
self.code_verifier = None
self.code_challenge = None

def _get_authorize_url(self, redirect_uri, state, token_access_type, scope=None,
include_granted_scopes=None):
include_granted_scopes=None, code_challenge=None):
params = dict(response_type='code',
client_id=self.consumer_key)
if redirect_uri is not None:
Expand All @@ -146,22 +168,28 @@ def _get_authorize_url(self, redirect_uri, state, token_access_type, scope=None,
assert token_access_type in TOKEN_ACCESS_TYPES
if token_access_type != 'legacy':
params['token_access_type'] = token_access_type
if code_challenge:
params['code_challenge'] = code_challenge
params['code_challenge_method'] = 'S256'

if scope is not None:
params['scope'] = " ".join(scope)
if include_granted_scopes is not None:
assert include_granted_scopes in INCLUDE_GRANTED_SCOPES_TYPES
params['include_granted_scopes'] = str(include_granted_scopes)
if include_granted_scopes is not None:
assert include_granted_scopes in INCLUDE_GRANTED_SCOPES_TYPES
params['include_granted_scopes'] = include_granted_scopes

return self.build_url('/oauth2/authorize', params, WEB_HOST)

def _finish(self, code, redirect_uri):
def _finish(self, code, redirect_uri, code_verifier):
url = self.build_url('/oauth2/token')
params = {'grant_type': 'authorization_code',
'code': code,
'client_id': self.consumer_key,
'client_secret': self.consumer_secret,
}
if code_verifier:
params['code_verifier'] = code_verifier
else:
params['client_secret'] = self.consumer_secret
if self.locale is not None:
params['locale'] = self.locale
if redirect_uri is not None:
Expand Down Expand Up @@ -273,9 +301,8 @@ class DropboxOAuth2FlowNoRedirect(DropboxOAuth2FlowBase):
dbx = Dropbox(oauth_result.access_token)
"""

def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type='legacy',
scope=None, include_granted_scopes=None):
# noqa: E501; pylint: disable=useless-super-delegation
def __init__(self, consumer_key, consumer_secret=None, locale=None, token_access_type='legacy',
scope=None, include_granted_scopes=None, use_pkce=False): # noqa: E501;
"""
Construct an instance.

Expand All @@ -298,14 +325,18 @@ def __init__(self, consumer_key, consumer_secret, locale=None, token_access_type
user - include user scopes in the grant
team - include team scopes in the grant
Note: if this user has never linked the app, include_granted_scopes must be None
:param bool use_pkce: Whether or not to use Sha256 based PKCE. PKCE should be only use on
client apps which doesn't call your server. It is less secure than non-PKCE flow but
can be used if you are unable to safely retrieve your app secret
"""
super(DropboxOAuth2FlowNoRedirect, self).__init__(
consumer_key,
consumer_secret,
locale,
token_access_type,
scope,
include_granted_scopes,
consumer_key=consumer_key,
consumer_secret=consumer_secret,
locale=locale,
token_access_type=token_access_type,
scope=scope,
include_granted_scopes=include_granted_scopes,
use_pkce=use_pkce,
)

def start(self):
Expand All @@ -317,8 +348,10 @@ def start(self):
access the user's Dropbox account. Tell the user to visit this URL
and approve your app.
"""
return self._get_authorize_url(None, None, self.token_access_type, self.scope,
self.include_granted_scopes)
return self._get_authorize_url(None, None, self.token_access_type,
scope=self.scope,
include_granted_scopes=self.include_granted_scopes,
code_challenge=self.code_challenge)

def finish(self, code):
"""
Expand All @@ -331,7 +364,7 @@ def finish(self, code):
:rtype: OAuth2FlowNoRedirectResult
:raises: The same exceptions as :meth:`DropboxOAuth2Flow.finish()`.
"""
return self._finish(code, None)
return self._finish(code, None, self.code_verifier)


class DropboxOAuth2Flow(DropboxOAuth2FlowBase):
Expand Down Expand Up @@ -379,9 +412,10 @@ def dropbox_auth_finish(web_app_session, request):

"""

def __init__(self, consumer_key, consumer_secret, redirect_uri, session,
csrf_token_session_key, locale=None, token_access_type='legacy',
scope=None, include_granted_scopes=None):
def __init__(self, consumer_key, redirect_uri, session,
csrf_token_session_key, consumer_secret=None, locale=None,
token_access_type='legacy', scope=None,
include_granted_scopes=None, use_pkce=False):
"""
Construct an instance.

Expand Down Expand Up @@ -412,10 +446,16 @@ def __init__(self, consumer_key, consumer_secret, redirect_uri, session,
user - include user scopes in the grant
team - include team scopes in the grant
Note: if this user has never linked the app, include_granted_scopes must be None
:param bool use_pkce: Whether or not to use Sha256 based PKCE
"""
super(DropboxOAuth2Flow, self).__init__(consumer_key, consumer_secret, locale,
token_access_type, scope,
include_granted_scopes)
super(DropboxOAuth2Flow, self).__init__(
consumer_key=consumer_key,
consumer_secret=consumer_secret,
locale=locale,
token_access_type=token_access_type,
scope=scope,
include_granted_scopes=include_granted_scopes,
use_pkce=use_pkce)
self.redirect_uri = redirect_uri
self.session = session
self.csrf_token_session_key = csrf_token_session_key
Expand Down Expand Up @@ -450,7 +490,9 @@ def start(self, url_state=None):
self.session[self.csrf_token_session_key] = csrf_token

return self._get_authorize_url(self.redirect_uri, state, self.token_access_type,
self.scope, self.include_granted_scopes)
scope=self.scope,
include_granted_scopes=self.include_granted_scopes,
code_challenge=self.code_challenge)

def finish(self, query_params):
"""
Expand Down Expand Up @@ -534,7 +576,7 @@ def finish(self, query_params):

# If everything went ok, make the network call to get an access token.

no_redirect_result = self._finish(code, self.redirect_uri)
no_redirect_result = self._finish(code, self.redirect_uri, self.code_verifier)
return OAuth2FlowResult.from_no_redirect_result(
no_redirect_result, url_state)

Expand Down Expand Up @@ -588,6 +630,16 @@ class ProviderException(Exception):
pass


class BadInputException(Exception):
"""
Thrown if incorrect types/values are used

This should only ever be thrown during testing, app should have validation of input prior to
reaching this point
"""
pass


def _safe_equals(a, b):
if len(a) != len(b):
return False
Expand Down Expand Up @@ -616,3 +668,16 @@ def encode(o):

utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)}
return url_encode(utf8_params)

def _generate_pkce_code_verifier():
code_verifier = base64.urlsafe_b64encode(os.urandom(PKCE_VERIFIER_LENGTH)).decode('utf-8')
code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier)
if len(code_verifier) > PKCE_VERIFIER_LENGTH:
code_verifier = code_verifier[:128]
return code_verifier

def _generate_pkce_code_challenge(code_verifier):
code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8')
code_challenge = code_challenge.replace('=', '')
return code_challenge
29 changes: 29 additions & 0 deletions example/oauth/commandline-oauth-pkce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/usr/bin/env python3

import dropbox
from dropbox import DropboxOAuth2FlowNoRedirect

'''
This example uses PKCE, a currently beta feature.
If you are interested in using this, please contact
Dropbox support
'''
APP_KEY = ""

auth_flow = DropboxOAuth2FlowNoRedirect(APP_KEY, pkce_method='S256', token_access_type='offline')

authorize_url = auth_flow.start()
print("1. Go to: " + authorize_url)
print("2. Click \"Allow\" (you might have to log in first).")
print("3. Copy the authorization code.")
auth_code = input("Enter the authorization code here: ").strip()

try:
oauth_result = auth_flow.finish(auth_code)
print(oauth_result)
except Exception as e:
print('Error: %s' % (e,))
exit(1)

dbx = dropbox.Dropbox(oauth2_refresh_token=oauth_result.refresh_token, app_key=APP_KEY)
dbx.users_get_current_account()
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
print(oauth_result)
except Exception as e:
print('Error: %s' % (e,))
exit(1)

dbx = dropbox.Dropbox(oauth2_access_token=oauth_result.access_token,
app_key=APP_KEY, app_secret=APP_SECRET)
Expand Down
Loading