From 3f109f5a88badb684d39a4eb1909abedafa603f8 Mon Sep 17 00:00:00 2001 From: Matt Bogosian Date: Mon, 6 Mar 2017 14:35:45 -0800 Subject: [PATCH] Remove support for obsolete API v1 --- README.rst | 14 - dropbox/__init__.py | 3 - dropbox/client.py | 1668 ------------------------------------------ dropbox/dropbox.py | 11 +- dropbox/oauth.py | 8 +- dropbox/rest.py | 426 ----------- dropbox/session.py | 356 +-------- test/test_dropbox.py | 491 ------------- 8 files changed, 24 insertions(+), 2953 deletions(-) delete mode 100644 dropbox/client.py delete mode 100644 dropbox/rest.py diff --git a/README.rst b/README.rst index f5499af7..70740b94 100644 --- a/README.rst +++ b/README.rst @@ -108,14 +108,6 @@ Now, run the included script: $ ./generate_base_client.py -Upgrading from v1 ------------------ - -To ease the transition to the new API and SDK, you can still use the old -``dropbox.client.DropboxClient`` class. In fact, v2 and v1 can be used -simultaneously. Support for the old client will be dropped once the new SDK is -at functional parity. - Testing ------- @@ -135,9 +127,3 @@ variable. .. code-block:: console $ DROPBOX_TOKEN=... DROPBOX_TEAM_TOKEN=... tox - -If you only want to test the API v2 client, use: - -.. code-block:: console - - $ DROPBOX_TOKEN=... DROPBOX_TEAM_TOKEN=... tox -- -k TestDropbox diff --git a/dropbox/__init__.py b/dropbox/__init__.py index 6262d15c..c830bc35 100644 --- a/dropbox/__init__.py +++ b/dropbox/__init__.py @@ -2,6 +2,3 @@ from .dropbox import __version__, Dropbox, DropboxTeam, create_session # noqa: F401 from .oauth import DropboxOAuth2Flow, DropboxOAuth2FlowNoRedirect # noqa: F401 - -# Compatibility with the deprecated v1 client. -from . import client, rest, session # noqa: F401 diff --git a/dropbox/client.py b/dropbox/client.py deleted file mode 100644 index fa5a2d92..00000000 --- a/dropbox/client.py +++ /dev/null @@ -1,1668 +0,0 @@ -""" -Deprecated v1 API client. To be removed once v2 is at parity with v1. -""" - -from __future__ import absolute_import - -import base64 -from io import BytesIO -import os -import re -import six -import urllib -import warnings - -if six.PY3: - basestring = str # pylint: disable=redefined-builtin,useless-suppression - url_path_quote = urllib.parse.quote # pylint: disable=no-member,useless-suppression -else: - url_path_quote = urllib.quote # pylint: disable=no-member,useless-suppression - -try: - import json -except ImportError: - import simplejson as json - -from .rest import ErrorResponse, RESTClient, params_to_urlencoded -from .session import BaseSession, DropboxSession, DropboxOAuth2Session - - -def format_path(path): - """Normalize path for use with the Dropbox API. - - This function turns multiple adjacent slashes into single - slashes, then ensures that there's a leading slash but - not a trailing slash. - """ - if not path: - return path - - path = re.sub(r'/+', '/', path) - - if path == '/': - return '' - else: - return '/' + path.strip('/') - - -class DropboxClient(object): - """ - This is a deprecated class for making calls to the Dropbox v1 API. Please - use the v2 API by instantiating :class:`dropbox.Dropbox` instead. - - All of the API call methods can raise a :class:`dropbox.rest.ErrorResponse` - exception if the server returns a non-200 or invalid HTTP response. Note - that a 401 return status at any point indicates that the access token - you're using is no longer valid and the user must be put through the OAuth - 2 authorization flow again. - """ - - def __init__(self, oauth2_access_token, locale=None, rest_client=None): - """Construct a ``DropboxClient`` instance. - - Parameters - oauth2_access_token - An OAuth 2 access token (string). For backwards compatibility this may - also be a DropboxSession object (see :meth:`create_oauth2_access_token()`). - locale - The locale of the user of your application. For example "en" or "en_US". - Some API calls return localized data and error messages; this setting - tells the server which locale to use. By default, the server uses "en_US". - rest_client - Optional :class:`dropbox.rest.RESTClient`-like object to use for making - requests. - """ - warnings.warn( - 'You are using a deprecated client. Please use the new v2 client ' - 'located at dropbox.Dropbox.', DeprecationWarning, stacklevel=2) - - if rest_client is None: - rest_client = RESTClient - if isinstance(oauth2_access_token, basestring): - if not _OAUTH2_ACCESS_TOKEN_PATTERN.match(oauth2_access_token): - raise ValueError("invalid format for oauth2_access_token: %r" - % (oauth2_access_token,)) - self.session = DropboxOAuth2Session(oauth2_access_token, locale) - elif isinstance(oauth2_access_token, DropboxSession): - # Backwards compatibility with OAuth 1 - if locale is not None: - raise ValueError("The 'locale' parameter to DropboxClient is only useful " - "when also passing in an OAuth 2 access token") - self.session = oauth2_access_token - else: - raise ValueError("'oauth2_access_token' must either be a string or a DropboxSession") - self.rest_client = rest_client - - def request(self, target, params=None, method='POST', - content_server=False, notification_server=False): - """ - An internal method that builds the url, headers, and params for a Dropbox API request. - It is exposed if you need to make API calls not implemented in this library or if you - need to debug requests. - - Parameters - target - The target URL with leading slash (e.g. '/files'). - params - A dictionary of parameters to add to the request. - method - An HTTP method (e.g. 'GET' or 'POST'). - content_server - A boolean indicating whether the request is to the - API content server, for example to fetch the contents of a file - rather than its metadata. - notification_server - A boolean indicating whether the request is to the API notification - server, for example for longpolling. - - Returns - A tuple of ``(url, params, headers)`` that should be used to make the request. - OAuth will be added as needed within these fields. - """ - assert method in ['GET', 'POST', 'PUT'], "Only 'GET', 'POST', and 'PUT' are allowed." - assert not (content_server and notification_server), \ - "Cannot construct request simultaneously for content and notification servers." - - if params is None: - params = {} - - if content_server: - host = self.session.API_CONTENT_HOST - elif notification_server: - host = self.session.API_NOTIFICATION_HOST - else: - host = self.session.API_HOST - - headers, params = self.session.build_access_headers(params) - - if method in ('GET', 'PUT'): - url = self.session.build_url(host, target, params) - else: - url = self.session.build_url(host, target) - - return url, params, headers - - def account_info(self): - """Retrieve information about the user's account. - - Returns - A dictionary containing account information. - - For a detailed description of what this call returns, visit: - https://www.dropbox.com/developers/core/docs#account-info - """ - url, _, headers = self.request("/account/info", method='GET') - return self.rest_client.GET(url, headers) - - def disable_access_token(self): - """ - Disable the access token that this ``DropboxClient`` is using. If this call - succeeds, further API calls using this object will fail. - """ - url, params, headers = self.request("/disable_access_token", method='POST') - - return self.rest_client.POST(url, params, headers) - - def create_oauth2_access_token(self): - """ - If this ``DropboxClient`` was created with an OAuth 1 access token, this method - can be used to create an equivalent OAuth 2 access token. This can be used to - upgrade your app's existing access tokens from OAuth 1 to OAuth 2. - - Example:: - - from dropbox.client import DropboxClient - from dropbox.session import DropboxSession - session = DropboxSession(APP_KEY, APP_SECRET) - access_key, access_secret = \ - '123abc', 'xyz456' # Previously obtained OAuth 1 credentials - session.set_token(access_key, access_secret) - client = DropboxClient(session) - token = client.create_oauth2_access_token() - # Optionally, create a new client using the new token - new_client = DropboxClient(token) - """ - if not isinstance(self.session, DropboxSession): - raise ValueError("This call requires a DropboxClient that is configured with an " - "OAuth 1 access token.") - url, params, headers = self.request("/oauth2/token_from_oauth1", method='POST') - - r = self.rest_client.POST(url, params, headers) - return r['access_token'] - - def get_chunked_uploader(self, file_obj, length): - """Creates a :class:`ChunkedUploader` to upload the given file-like object. - - Parameters - file_obj - The file-like object which is the source of the data - being uploaded. - length - The number of bytes to upload. - - The expected use of this function is as follows:: - - bigFile = open("data.txt", 'rb') - - uploader = myclient.get_chunked_uploader(bigFile, size) - print "uploading: ", size - while uploader.offset < size: - try: - upload = uploader.upload_chunked() - except rest.ErrorResponse, e: - # perform error handling and retry logic - uploader.finish('/bigFile.txt') - - The SDK leaves the error handling and retry logic to the developer - to implement, as the exact requirements will depend on the application - involved. - """ - return ChunkedUploader(self, file_obj, length) - - def upload_chunk(self, file_obj, length=None, offset=0, # pylint: disable=unused-argument - upload_id=None): - """Uploads a single chunk of data from a string or file-like object. The majority of users - should use the :class:`ChunkedUploader` object, which provides a simpler interface to the - chunked_upload API endpoint. - - Parameters - file_obj - The source of the chunk to upload; a file-like object or a string. - length - This argument is ignored but still present for backward compatibility reasons. - offset - The byte offset to which this source data corresponds in the original file. - upload_id - The upload identifier for which this chunk should be uploaded, - returned by a previous call, or None to start a new upload. - - Returns - A dictionary containing the keys: - - upload_id - A string used to identify the upload for subsequent calls to :meth:`upload_chunk()` - and :meth:`commit_chunked_upload()`. - offset - The offset at which the next upload should be applied. - expires - The time after which this partial upload is invalid. - """ - - params = dict() - - if upload_id: - params['upload_id'] = upload_id - params['offset'] = offset - - url, _, headers = self.request("/chunked_upload", params, - method='PUT', content_server=True) - - try: - reply = self.rest_client.PUT(url, file_obj, headers) - return reply['offset'], reply['upload_id'] - except ErrorResponse as e: - raise e - - def commit_chunked_upload(self, full_path, upload_id, overwrite=False, parent_rev=None): - """Commit the previously uploaded chunks for the given path. - - Parameters - full_path - The full path to which the chunks are uploaded, *including the file name*. - If the destination folder does not yet exist, it will be created. - upload_id - The chunked upload identifier, previously returned from upload_chunk. - overwrite - Whether to overwrite an existing file at the given path. (Default ``False``.) - If overwrite is False and a file already exists there, Dropbox - will rename the upload to make sure it doesn't overwrite anything. - You need to check the metadata returned for the new name. - This field should only be True if your intent is to potentially - clobber changes to a file that you don't know about. - parent_rev - Optional rev field from the 'parent' of this upload. - If your intent is to update the file at the given path, you should - pass the parent_rev parameter set to the rev value from the most recent - metadata you have of the existing file at that path. If the server - has a more recent version of the file at the specified path, it will - automatically rename your uploaded file, spinning off a conflict. - Using this parameter effectively causes the overwrite parameter to be ignored. - The file will always be overwritten if you send the most recent parent_rev, - and it will never be overwritten if you send a less recent one. - - Returns - A dictionary containing the metadata of the newly committed file. - - For a detailed description of what this call returns, visit: - https://www.dropbox.com/developers/core/docs#commit-chunked-upload - """ - - params = { - 'upload_id': upload_id, - 'overwrite': overwrite, - } - - if parent_rev is not None: - params['parent_rev'] = parent_rev - - url, params, headers = self.request("/commit_chunked_upload/%s" % full_path, - params, content_server=True) - - return self.rest_client.POST(url, params, headers) - - def put_file(self, full_path, file_obj, overwrite=False, parent_rev=None): - """Upload a file. - - A typical use case would be as follows:: - - f = open('working-draft.txt', 'rb') - response = client.put_file('/magnum-opus.txt', f) - print "uploaded:", response - - which would return the metadata of the uploaded file, similar to:: - - { - 'bytes': 77, - 'icon': 'page_white_text', - 'is_dir': False, - 'mime_type': 'text/plain', - 'modified': 'Wed, 20 Jul 2011 22:04:50 +0000', - 'path': '/magnum-opus.txt', - 'rev': '362e2029684fe', - 'revision': 221922, - 'root': 'dropbox', - 'size': '77 bytes', - 'thumb_exists': False - } - - Parameters - full_path - The full path to upload the file to, *including the file name*. - If the destination folder does not yet exist, it will be created. - file_obj - A file-like object to upload. If you would like, you can pass a string as file_obj. - overwrite - Whether to overwrite an existing file at the given path. (Default ``False``.) - If overwrite is False and a file already exists there, Dropbox - will rename the upload to make sure it doesn't overwrite anything. - You need to check the metadata returned for the new name. - This field should only be True if your intent is to potentially - clobber changes to a file that you don't know about. - parent_rev - Optional rev field from the 'parent' of this upload. - If your intent is to update the file at the given path, you should - pass the parent_rev parameter set to the rev value from the most recent - metadata you have of the existing file at that path. If the server - has a more recent version of the file at the specified path, it will - automatically rename your uploaded file, spinning off a conflict. - Using this parameter effectively causes the overwrite parameter to be ignored. - The file will always be overwritten if you send the most recent parent_rev, - and it will never be overwritten if you send a less recent one. - - Returns - A dictionary containing the metadata of the newly uploaded file. - - For a detailed description of what this call returns, visit: - https://www.dropbox.com/developers/core/docs#files-put - - Raises - A :class:`dropbox.rest.ErrorResponse` with an HTTP status of: - - - 400: Bad request (may be due to many things; check e.error for details). - - 503: User over quota. - """ - path = "/files_put/%s%s" % (self.session.root, format_path(full_path)) - - params = { - 'overwrite': bool(overwrite), - } - - if parent_rev is not None: - params['parent_rev'] = parent_rev - - url, params, headers = self.request(path, params, method='PUT', content_server=True) - - return self.rest_client.PUT(url, file_obj, headers) - - def get_file(self, from_path, rev=None, start=None, length=None): - """Download a file. - - Example:: - - out = open('magnum-opus.txt', 'wb') - with client.get_file('/magnum-opus.txt') as f: - out.write(f.read()) - - which would download the file ``magnum-opus.txt`` and write the contents into - the file ``magnum-opus.txt`` on the local filesystem. - - Parameters - from_path - The path to the file to be downloaded. - rev - Optional previous rev value of the file to be downloaded. - start - Optional byte value from which to start downloading. - length - Optional length in bytes for partially downloading the file. If ``length`` is - specified but ``start`` is not, then the last ``length`` bytes will be downloaded. - Returns - A :class:`dropbox.rest.RESTResponse` that is the HTTP response for - the API request. It is a file-like object that can be read from. You - must call ``close()`` when you're done. - - Raises - A :class:`dropbox.rest.ErrorResponse` with an HTTP status of: - - - 400: Bad request (may be due to many things; check e.error for details). - - 404: No file was found at the given path, or the file that was there was deleted. - - 200: Request was okay but response was malformed in some way. - """ - path = "/files/%s%s" % (self.session.root, format_path(from_path)) - - params = {} - if rev is not None: - params['rev'] = rev - - url, params, headers = self.request(path, params, method='GET', content_server=True) - if start is not None: - if length: - headers['Range'] = 'bytes=%s-%s' % (start, start + length - 1) - else: - headers['Range'] = 'bytes=%s-' % start - elif length is not None: - headers['Range'] = 'bytes=-%s' % length - return self.rest_client.request("GET", url, headers=headers, raw_response=True) - - def get_file_and_metadata(self, from_path, rev=None): - """Download a file alongwith its metadata. - - Acts as a thin wrapper around get_file() (see :meth:`get_file()` comments for - more details) - - A typical usage looks like this:: - - out = open('magnum-opus.txt', 'wb') - f, metadata = client.get_file_and_metadata('/magnum-opus.txt') - with f: - out.write(f.read()) - - Parameters - from_path - The path to the file to be downloaded. - rev - Optional previous rev value of the file to be downloaded. - - Returns - A pair of ``(response, metadata)``: - - response - A :class:`dropbox.rest.RESTResponse` that is the HTTP response for - the API request. It is a file-like object that can be read from. You - must call ``close()`` when you're done. - metadata - A dictionary containing the metadata of the file (see - https://www.dropbox.com/developers/core/docs#metadata for details). - - Raises - A :class:`dropbox.rest.ErrorResponse` with an HTTP status of: - - - 400: Bad request (may be due to many things; check e.error for details). - - 404: No file was found at the given path, or the file that was there was deleted. - - 200: Request was okay but response was malformed in some way. - """ - file_res = self.get_file(from_path, rev) - metadata = DropboxClient.__parse_metadata_as_dict(file_res) - - return file_res, metadata - - @staticmethod - def __parse_metadata_as_dict(dropbox_raw_response): - # Parses file metadata from a raw dropbox HTTP response, raising a - # dropbox.rest.ErrorResponse if parsing fails. - metadata = None - for header, header_val in dropbox_raw_response.getheaders().items(): - if header.lower() == 'x-dropbox-metadata': - try: - metadata = json.loads(header_val) - except ValueError: - raise ErrorResponse(dropbox_raw_response, '') - if not metadata: - raise ErrorResponse(dropbox_raw_response, '') - return metadata - - def delta(self, cursor=None, path_prefix=None, include_media_info=False): - """A way of letting you keep up with changes to files and folders in a - user's Dropbox. You can periodically call delta() to get a list of "delta - entries", which are instructions on how to update your local state to - match the server's state. - - Parameters - cursor - On the first call, omit this argument (or pass in ``None``). On - subsequent calls, pass in the ``cursor`` string returned by the previous - call. - path_prefix - If provided, results will be limited to files and folders - whose paths are equal to or under ``path_prefix``. The ``path_prefix`` is - fixed for a given cursor. Whatever ``path_prefix`` you use on the first - ``delta()`` must also be passed in on subsequent calls that use the returned - cursor. - include_media_info - If True, delta will return additional media info for photos and videos - (the time a photo was taken, the GPS coordinates of a photo, etc.). There - is a delay between when a file is uploaded to Dropbox and when this - information is available; delta will only include a file in the changelist - once its media info is ready. The value you use on the first ``delta()`` must - also be passed in on subsequent calls that use the returned cursor. - - Returns - A dict with four keys: - - entries - A list of "delta entries" (described below). - reset - If ``True``, you should your local state to be an empty folder - before processing the list of delta entries. This is only ``True`` only - in rare situations. - cursor - A string that is used to keep track of your current state. - On the next call to delta(), pass in this value to return entries - that were recorded since the cursor was returned. - has_more - If ``True``, then there are more entries available; you can - call delta() again immediately to retrieve those entries. If ``False``, - then wait at least 5 minutes (preferably longer) before checking again. - - Delta Entries: Each entry is a 2-item list of one of following forms: - - - [*path*, *metadata*]: Indicates that there is a file/folder at the given - path. You should add the entry to your local path. (The *metadata* - value is the same as what would be returned by the ``metadata()`` call.) - - - If the new entry includes parent folders that don't yet exist in your - local state, create those parent folders in your local state. You - will eventually get entries for those parent folders. - - If the new entry is a file, replace whatever your local state has at - *path* with the new entry. - - If the new entry is a folder, check what your local state has at - *path*. If it's a file, replace it with the new entry. If it's a - folder, apply the new *metadata* to the folder, but do not modify - the folder's children. - - [*path*, ``None``]: Indicates that there is no file/folder at the *path* on - Dropbox. To update your local state to match, delete whatever is at *path*, - including any children (you will sometimes also get "delete" delta entries - for the children, but this is not guaranteed). If your local state doesn't - have anything at *path*, ignore this entry. - - Remember: Dropbox treats file names in a case-insensitive but case-preserving - way. To facilitate this, the *path* strings above are lower-cased versions of - the actual path. The *metadata* dicts have the original, case-preserved path. - """ - path = "/delta" - - params = {'include_media_info': include_media_info} - if cursor is not None: - params['cursor'] = cursor - if path_prefix is not None: - params['path_prefix'] = path_prefix - - url, params, headers = self.request(path, params) - - return self.rest_client.POST(url, params, headers) - - def longpoll_delta(self, cursor, timeout=None): - """A long-poll endpoint to wait for changes on an account. In conjunction with - :meth:`delta()`, this call gives you a low-latency way to monitor an account for - file changes. - - Note that this call goes to ``api-notify.dropbox.com`` instead of ``api.dropbox.com``. - - Unlike most other API endpoints, this call does not require OAuth authentication. - The passed-in cursor can only be acquired via an authenticated call to :meth:`delta()`. - - Parameters - cursor - A delta cursor as returned from a call to :meth:`delta()`. Note that a cursor - returned from a call to :meth:`delta()` with ``include_media_info=True`` is - incompatible with ``longpoll_delta()`` and an error will be returned. - timeout - An optional integer indicating a timeout, in seconds. The default value is - 30 seconds, which is also the minimum allowed value. The maximum is 480 - seconds. The request will block for at most this length of time, plus up - to 90 seconds of random jitter added to avoid the thundering herd problem. - Care should be taken when using this parameter, as some network - infrastructure does not support long timeouts. - - Returns - The connection will block until there are changes available or a timeout occurs. - The response will be a dictionary that looks like the following example:: - - {"changes": false, "backoff": 60} - - For a detailed description of what this call returns, visit: - https://www.dropbox.com/developers/core/docs#longpoll-delta - - Raises - A :class:`dropbox.rest.ErrorResponse` with an HTTP status of: - - - 400: Bad request (generally due to an invalid parameter; check e.error for details). - """ - path = "/longpoll_delta" - - params = {'cursor': cursor} - if timeout is not None: - params['timeout'] = timeout - - url, params, headers = self.request(path, params, method='GET', notification_server=True) - - return self.rest_client.GET(url, headers) - - def create_copy_ref(self, from_path): - """Creates and returns a copy ref for a specific file. The copy ref can be - used to instantly copy that file to the Dropbox of another account. - - Parameters - path - The path to the file for a copy ref to be created on. - - Returns - A dictionary that looks like the following example:: - - {"expires": "Fri, 31 Jan 2042 21:01:05 +0000", "copy_ref": "z1X6ATl6aWtzOGq0c3g5Ng"} - - """ - path = "/copy_ref/%s%s" % (self.session.root, format_path(from_path)) - - url, _, headers = self.request(path, {}, method='GET') - - return self.rest_client.GET(url, headers) - - def add_copy_ref(self, copy_ref, to_path): - """Adds the file referenced by the copy ref to the specified path - - Parameters - copy_ref - A copy ref string that was returned from a create_copy_ref call. - The copy_ref can be created from any other Dropbox account, or from the same account. - path - The path to where the file will be created. - - Returns - A dictionary containing the metadata of the new copy of the file. - """ - path = "/fileops/copy" - - params = {'from_copy_ref': copy_ref, - 'to_path': format_path(to_path), - 'root': self.session.root} - - url, params, headers = self.request(path, params) - - return self.rest_client.POST(url, params, headers) - - def file_copy(self, from_path, to_path): - """Copy a file or folder to a new location. - - Parameters - from_path - The path to the file or folder to be copied. - to_path - The destination path of the file or folder to be copied. - This parameter should include the destination filename (e.g. - from_path: '/test.txt', to_path: '/dir/test.txt'). If there's - already a file at the to_path it will raise an ErrorResponse. - - Returns - A dictionary containing the metadata of the new copy of the file or folder. - - For a detailed description of what this call returns, visit: - https://www.dropbox.com/developers/core/docs#fileops-copy - - Raises - A :class:`dropbox.rest.ErrorResponse` with an HTTP status of: - - - 400: Bad request (may be due to many things; check e.error for details). - - 403: An invalid copy operation was attempted - (e.g. there is already a file at the given destination, - or trying to copy a shared folder). - - 404: No file was found at given from_path. - - 503: User over storage quota. - """ - params = { - 'root': self.session.root, - 'from_path': format_path(from_path), - 'to_path': format_path(to_path), - } - - url, params, headers = self.request("/fileops/copy", params) - - return self.rest_client.POST(url, params, headers) - - def file_create_folder(self, path): - """Create a folder. - - Parameters - path - The path of the new folder. - - Returns - A dictionary containing the metadata of the newly created folder. - - For a detailed description of what this call returns, visit: - https://www.dropbox.com/developers/core/docs#fileops-create-folder - - Raises - A :class:`dropbox.rest.ErrorResponse` with an HTTP status of: - - - 400: Bad request (may be due to many things; check e.error for details). - - 403: A folder at that path already exists. - """ - params = {'root': self.session.root, 'path': format_path(path)} - - url, params, headers = self.request("/fileops/create_folder", params) - - return self.rest_client.POST(url, params, headers) - - def file_delete(self, path): - """Delete a file or folder. - - Parameters - path - The path of the file or folder. - - Returns - A dictionary containing the metadata of the just deleted file. - - For a detailed description of what this call returns, visit: - https://www.dropbox.com/developers/core/docs#fileops-delete - - Raises - A :class:`dropbox.rest.ErrorResponse` with an HTTP status of: - - - 400: Bad request (may be due to many things; check e.error for details). - - 404: No file was found at the given path. - """ - params = {'root': self.session.root, 'path': format_path(path)} - - url, params, headers = self.request("/fileops/delete", params) - - return self.rest_client.POST(url, params, headers) - - def file_move(self, from_path, to_path): - """Move a file or folder to a new location. - - Parameters - from_path - The path to the file or folder to be moved. - to_path - The destination path of the file or folder to be moved. - This parameter should include the destination filename (e.g. if - ``from_path`` is ``'/test.txt'``, ``to_path`` might be - ``'/dir/test.txt'``). If there's already a file at the - ``to_path`` it will raise an ErrorResponse. - - Returns - A dictionary containing the metadata of the new copy of the file or folder. - - For a detailed description of what this call returns, visit: - https://www.dropbox.com/developers/core/docs#fileops-move - - Raises - A :class:`dropbox.rest.ErrorResponse` with an HTTP status of: - - - 400: Bad request (may be due to many things; check e.error for details). - - 403: An invalid move operation was attempted - (e.g. there is already a file at the given destination, - or moving a shared folder into a shared folder). - - 404: No file was found at given from_path. - - 503: User over storage quota. - """ - params = {'root': self.session.root, - 'from_path': format_path(from_path), - 'to_path': format_path(to_path)} - - url, params, headers = self.request("/fileops/move", params) - - return self.rest_client.POST(url, params, headers) - - def metadata(self, path, list=True, file_limit=25000, # pylint: disable=redefined-builtin - hash=None, rev=None, include_deleted=False, # pylint: disable=redefined-builtin - include_media_info=False): - """Retrieve metadata for a file or folder. - - A typical use would be:: - - folder_metadata = client.metadata('/') - print "metadata:", folder_metadata - - which would return the metadata of the root folder. This - will look something like:: - - { - 'bytes': 0, - 'contents': [ - { - 'bytes': 0, - 'icon': 'folder', - 'is_dir': True, - 'modified': 'Thu, 25 Aug 2011 00:03:15 +0000', - 'path': '/Sample Folder', - 'rev': '803beb471', - 'revision': 8, - 'root': 'dropbox', - 'size': '0 bytes', - 'thumb_exists': False - }, - { - 'bytes': 77, - 'icon': 'page_white_text', - 'is_dir': False, - 'mime_type': 'text/plain', - 'modified': 'Wed, 20 Jul 2011 22:04:50 +0000', - 'path': '/magnum-opus.txt', - 'rev': '362e2029684fe', - 'revision': 221922, - 'root': 'dropbox', - 'size': '77 bytes', - 'thumb_exists': False - } - ], - 'hash': 'efdac89c4da886a9cece1927e6c22977', - 'icon': 'folder', - 'is_dir': True, - 'path': '/', - 'root': 'app_folder', - 'size': '0 bytes', - 'thumb_exists': False - } - - In this example, the root folder contains two things: ``Sample Folder``, - which is a folder, and ``/magnum-opus.txt``, which is a text file 77 bytes long - - Parameters - path - The path to the file or folder. - list - Whether to list all contained files (only applies when - path refers to a folder). - file_limit - The maximum number of file entries to return within - a folder. If the number of files in the folder exceeds this - limit, an exception is raised. The server will return at max - 25,000 files within a folder. - hash - Every folder listing has a hash parameter attached that - can then be passed back into this function later to save on - bandwidth. Rather than returning an unchanged folder's contents, - the server will instead return a 304. - rev - Optional revision of the file to retrieve the metadata for. - This parameter only applies for files. If omitted, you'll receive - the most recent revision metadata. - include_deleted - When listing contained files, include files that have been deleted. - include_media_info - If True, includes additional media info for photos and videos if - available (the time a photo was taken, the GPS coordinates of a photo, - etc.). - - Returns - A dictionary containing the metadata of the file or folder - (and contained files if appropriate). - - For a detailed description of what this call returns, visit: - https://www.dropbox.com/developers/core/docs#metadata - - Raises - A :class:`dropbox.rest.ErrorResponse` with an HTTP status of: - - - 304: Current folder hash matches hash parameters, so contents are unchanged. - - 400: Bad request (may be due to many things; check e.error for details). - - 404: No file was found at given path. - - 406: Too many file entries to return. - """ - path = "/metadata/%s%s" % (self.session.root, format_path(path)) - - params = { - 'file_limit': file_limit, - 'list': 'true', - 'include_deleted': include_deleted, - 'include_media_info': include_media_info, - } - - if not list: - params['list'] = 'false' - if hash is not None: - params['hash'] = hash - if rev: - params['rev'] = rev - - url, params, headers = self.request(path, params, method='GET') - - return self.rest_client.GET(url, headers) - - def thumbnail(self, from_path, size='m', format='JPEG'): # pylint: disable=redefined-builtin - """Download a thumbnail for an image. - - Parameters - from_path - The path to the file to be thumbnailed. - size - A string specifying the desired thumbnail size. Currently - supported sizes: ``"xs"`` (32x32), ``"s"`` (64x64), ``"m"`` (128x128), - ``"l``" (640x480), ``"xl"`` (1024x768). - Check https://www.dropbox.com/developers/core/docs#thumbnails for - more details. - format - The image format the server should use for the returned - thumbnail data. Either ``"JPEG"`` or ``"PNG"``. - - Returns - A :class:`dropbox.rest.RESTResponse` that is the HTTP response for - the API request. It is a file-like object that can be read from. You - must call ``close()`` when you're done. - - Raises - A :class:`dropbox.rest.ErrorResponse` with an HTTP status of: - - - 400: Bad request (may be due to many things; check e.error for details). - - 404: No file was found at the given from_path, - or files of that type cannot be thumbnailed. - - 415: Image is invalid and cannot be thumbnailed. - """ - assert format in ['JPEG', 'PNG'], \ - "expected a thumbnail format of 'JPEG' or 'PNG', got %s" % format - - path = "/thumbnails/%s%s" % (self.session.root, format_path(from_path)) - url, _, headers = self.request(path, {'size': size, 'format': format}, - method='GET', content_server=True) - return self.rest_client.request("GET", url, headers=headers, raw_response=True) - - def thumbnail_and_metadata(self, from_path, size='m', format='JPEG'): # noqa: E501; pylint: disable=redefined-builtin - """Download a thumbnail for an image alongwith its metadata. - - Acts as a thin wrapper around thumbnail() (see :meth:`thumbnail()` comments for - more details) - - Parameters - from_path - The path to the file to be thumbnailed. - size - A string specifying the desired thumbnail size. See :meth:`thumbnail()` - for details. - format - The image format the server should use for the returned - thumbnail data. Either ``"JPEG"`` or ``"PNG"``. - - Returns - A pair of ``(response, metadata)``: - - response - A :class:`dropbox.rest.RESTResponse` that is the HTTP response for - the API request. It is a file-like object that can be read from. You - must call ``close()`` when you're done. - metadata - A dictionary containing the metadata of the file whose thumbnail - was downloaded (see https://www.dropbox.com/developers/core/docs#metadata - for details). - - Raises - A :class:`dropbox.rest.ErrorResponse` with an HTTP status of: - - - 400: Bad request (may be due to many things; check e.error for details). - - 404: No file was found at the given from_path, - or files of that type cannot be thumbnailed. - - 415: Image is invalid and cannot be thumbnailed. - - 200: Request was okay but response was malformed in some way. - """ - thumbnail_res = self.thumbnail(from_path, size, format) - metadata = DropboxClient.__parse_metadata_as_dict(thumbnail_res) - - return thumbnail_res, metadata - - def search(self, path, query, file_limit=1000, include_deleted=False): - """Search folder for filenames matching query. - - Parameters - path - The folder to search within. - query - The query to search on (minimum 3 characters). - file_limit - The maximum number of file entries to return within a folder. - The server will return at max 1,000 files. - include_deleted - Whether to include deleted files in search results. - - Returns - A list of the metadata of all matching files (up to - file_limit entries). For a detailed description of what - this call returns, visit: - https://www.dropbox.com/developers/core/docs#search - - Raises - A :class:`dropbox.rest.ErrorResponse` with an HTTP status of: - - - 400: Bad request (may be due to many things; check e.error for details). - """ - path = "/search/%s%s" % (self.session.root, format_path(path)) - - params = { - 'query': query, - 'file_limit': file_limit, - 'include_deleted': include_deleted, - } - - url, params, headers = self.request(path, params) - - return self.rest_client.POST(url, params, headers) - - def revisions(self, path, rev_limit=1000): - """Retrieve revisions of a file. - - Parameters - path - The file to fetch revisions for. Note that revisions - are not available for folders. - rev_limit - The maximum number of file entries to return within - a folder. The server will return at max 1,000 revisions. - - Returns - A list of the metadata of all matching files (up to rev_limit entries). - - For a detailed description of what this call returns, visit: - https://www.dropbox.com/developers/core/docs#revisions - - Raises - A :class:`dropbox.rest.ErrorResponse` with an HTTP status of: - - - 400: Bad request (may be due to many things; check e.error for details). - - 404: No revisions were found at the given path. - """ - path = "/revisions/%s%s" % (self.session.root, format_path(path)) - - params = { - 'rev_limit': rev_limit, - } - - url, params, headers = self.request(path, params, method='GET') - - return self.rest_client.GET(url, headers) - - def restore(self, path, rev): - """Restore a file to a previous revision. - - Parameters - path - The file to restore. Note that folders can't be restored. - rev - A previous rev value of the file to be restored to. - - Returns - A dictionary containing the metadata of the newly restored file. - - For a detailed description of what this call returns, visit: - https://www.dropbox.com/developers/core/docs#restore - - Raises - A :class:`dropbox.rest.ErrorResponse` with an HTTP status of: - - - 400: Bad request (may be due to many things; check e.error for details). - - 404: Unable to find the file at the given revision. - """ - path = "/restore/%s%s" % (self.session.root, format_path(path)) - - params = { - 'rev': rev, - } - - url, params, headers = self.request(path, params) - - return self.rest_client.POST(url, params, headers) - - def media(self, path): - """Get a temporary unauthenticated URL for a media file. - - All of Dropbox's API methods require OAuth, which may cause problems in - situations where an application expects to be able to hit a URL multiple times - (for example, a media player seeking around a video file). This method - creates a time-limited URL that can be accessed without any authentication, - and returns that to you, along with an expiration time. - - Parameters - path - The file to return a URL for. Folders are not supported. - - Returns - A dictionary that looks like the following example:: - - {'url': 'https://dl.dropboxusercontent.com/1/view/abcdefghijk/example', - 'expires': 'Thu, 16 Sep 2011 01:01:25 +0000'} - - For a detailed description of what this call returns, visit: - https://www.dropbox.com/developers/core/docs#media - - Raises - A :class:`dropbox.rest.ErrorResponse` with an HTTP status of: - - - 400: Bad request (may be due to many things; check e.error for details). - - 404: Unable to find the file at the given path. - """ - path = "/media/%s%s" % (self.session.root, format_path(path)) - - url, _, headers = self.request(path, method='GET') - - return self.rest_client.GET(url, headers) - - def share(self, path, short_url=True): - """Create a shareable link to a file or folder. - - Shareable links created on Dropbox are time-limited, but don't require any - authentication, so they can be given out freely. The time limit should allow - at least a day of shareability, though users have the ability to disable - a link from their account if they like. - - Parameters - path - The file or folder to share. - - Returns - A dictionary that looks like the following example:: - - {'url': u'https://db.tt/c0mFuu1Y', 'expires': 'Tue, 01 Jan 2030 00:00:00 +0000'} - - For a detailed description of what this call returns, visit: - https://www.dropbox.com/developers/core/docs#shares - - Raises - A :class:`dropbox.rest.ErrorResponse` with an HTTP status of: - - - 400: Bad request (may be due to many things; check e.error for details). - - 404: Unable to find the file at the given path. - """ - path = "/shares/%s%s" % (self.session.root, format_path(path)) - - params = { - 'short_url': short_url, - } - - url, params, headers = self.request(path, params, method='GET') - - return self.rest_client.GET(url, headers) - - -class ChunkedUploader(object): - """Contains the logic around a chunked upload, which uploads a - large file to Dropbox via the /chunked_upload endpoint. - """ - - def __init__(self, client, file_obj, length): - self.client = client - self.offset = 0 - self.upload_id = None - - self.last_block = None - self.file_obj = file_obj - self.target_length = length - - def upload_chunked(self, chunk_size=4 * 1024 * 1024): - """Uploads data from this ChunkedUploader's file_obj in chunks, until - an error occurs. Throws an exception when an error occurs, and can - be called again to resume the upload. - - Parameters - chunk_size - The number of bytes to put in each chunk. (Default 4 MB.) - """ - - while self.offset < self.target_length: - next_chunk_size = min(chunk_size, self.target_length - self.offset) - if self.last_block is None: - self.last_block = self.file_obj.read(next_chunk_size) - - try: - (self.offset, self.upload_id) = self.client.upload_chunk( - BytesIO(self.last_block), next_chunk_size, self.offset, self.upload_id) - self.last_block = None - except ErrorResponse as e: - # Handle the case where the server tells us our offset is wrong. - must_reraise = True - if e.status == 400: - reply = e.body - if "offset" in reply and reply['offset'] != 0 and reply['offset'] > self.offset: - self.last_block = None - self.offset = reply['offset'] - must_reraise = False - if must_reraise: - raise - - def finish(self, path, overwrite=False, parent_rev=None): - """Commits the bytes uploaded by this ChunkedUploader to a file - in the users dropbox. - - Parameters - path - The full path of the file in the Dropbox. - overwrite - Whether to overwrite an existing file at the given path. (Default ``False``.) - If overwrite is False and a file already exists there, Dropbox - will rename the upload to make sure it doesn't overwrite anything. - You need to check the metadata returned for the new name. - This field should only be True if your intent is to potentially - clobber changes to a file that you don't know about. - parent_rev - Optional rev field from the 'parent' of this upload. - If your intent is to update the file at the given path, you should - pass the parent_rev parameter set to the rev value from the most recent - metadata you have of the existing file at that path. If the server - has a more recent version of the file at the specified path, it will - automatically rename your uploaded file, spinning off a conflict. - Using this parameter effectively causes the overwrite parameter to be ignored. - The file will always be overwritten if you send the most recent parent_rev, - and it will never be overwritten if you send a less recent one. - """ - - path = "/commit_chunked_upload/%s%s" % (self.client.session.root, format_path(path)) - - params = dict( - overwrite=bool(overwrite), - upload_id=self.upload_id, - ) - - if parent_rev is not None: - params['parent_rev'] = parent_rev - - url, params, headers = self.client.request(path, params, content_server=True) - - return self.client.rest_client.POST(url, params, headers) - - -# Allow access of ChunkedUploader via DropboxClient for backwards compatibility. -DropboxClient.ChunkedUploader = ChunkedUploader - - -class DropboxOAuth2FlowBase(object): - - def __init__(self, consumer_key, consumer_secret, locale=None, rest_client=RESTClient): - self.consumer_key = consumer_key - self.consumer_secret = consumer_secret - self.locale = locale - self.rest_client = rest_client - - def _get_authorize_url(self, redirect_uri, state): - params = dict(response_type='code', - client_id=self.consumer_key) - if redirect_uri is not None: - params['redirect_uri'] = redirect_uri - if state is not None: - params['state'] = state - - return self.build_url(BaseSession.WEB_HOST, '/oauth2/authorize', params) - - def _finish(self, code, redirect_uri): - url = self.build_url(BaseSession.API_HOST, '/oauth2/token') - params = { - 'grant_type': 'authorization_code', - 'code': code, - 'client_id': self.consumer_key, - 'client_secret': self.consumer_secret, - } - if self.locale is not None: - params['locale'] = self.locale - if redirect_uri is not None: - params['redirect_uri'] = redirect_uri - - response = self.rest_client.POST(url, params=params) - access_token = response["access_token"] - user_id = response["uid"] - return access_token, user_id - - def build_path(self, target, params=None): - """Build the path component for an API URL. - - This method urlencodes the parameters, adds them - to the end of the target url, and puts a marker for the API - version in front. - - Parameters - target - A target url (e.g. '/files') to build upon. - params - Optional dictionary of parameters (name to value). - - Returns - The path and parameters components of an API URL. - """ - if six.PY2 and isinstance(target, six.text_type): - target = target.encode("utf8") - - target_path = url_path_quote(target) - - params = params or {} - params = params.copy() - - if self.locale: - params['locale'] = self.locale - - if params: - query_string = params_to_urlencoded(params) - return "/%s%s?%s" % (BaseSession.API_VERSION, target_path, query_string) - else: - return "/%s%s" % (BaseSession.API_VERSION, target_path) - - def build_url(self, host, target, params=None): - """Build an API URL. - - This method adds scheme and hostname to the path - returned from build_path. - - Parameters - target - A target url (e.g. '/files') to build upon. - params - Optional dictionary of parameters (name to value). - - Returns - The full API URL. - """ - return "https://%s%s" % (host, self.build_path(target, params)) - - -class DropboxOAuth2FlowNoRedirect(DropboxOAuth2FlowBase): - """ - OAuth 2 authorization helper for apps that can't provide a redirect URI - (such as the command-line example apps). - - Example:: - - from dropbox.client import DropboxOAuth2FlowNoRedirect, DropboxClient - from dropbox import rest as dbrest - - auth_flow = DropboxOAuth2FlowNoRedirect(APP_KEY, APP_SECRET) - - authorize_url = auth_flow.start() - print "1. Go to: " + authorize_url - print "2. Click \\"Allow\\" (you might have to log in first)." - print "3. Copy the authorization code." - auth_code = raw_input("Enter the authorization code here: ").strip() - - try: - access_token, user_id = auth_flow.finish(auth_code) - except dbrest.ErrorResponse, e: - print('Error: %s' % (e,)) - return - - c = DropboxClient(access_token) - """ - - def __init__(self, consumer_key, consumer_secret, locale=None, rest_client=None): - """ - Construct an instance. - - Parameters - consumer_key - Your API app's "app key" - consumer_secret - Your API app's "app secret" - locale - The locale of the user of your application. For example "en" or "en_US". - Some API calls return localized data and error messages; this setting - tells the server which locale to use. By default, the server uses "en_US". - rest_client - Optional :class:`dropbox.rest.RESTClient`-like object to use for making - requests. - """ - if rest_client is None: - rest_client = RESTClient - super(DropboxOAuth2FlowNoRedirect, self).__init__(consumer_key, consumer_secret, - locale, rest_client) - - def start(self): - """ - Starts the OAuth 2 authorization process. - - Returns - The URL for a page on Dropbox's website. This page will let the user "approve" - your app, which gives your app permission to access the user's Dropbox account. - Tell the user to visit this URL and approve your app. - """ - return self._get_authorize_url(None, None) - - def finish(self, code): - """ - If the user approves your app, they will be presented with an "authorization code". Have - the user copy/paste that authorization code into your app and then call this method to - get an access token. - - Parameters - code - The authorization code shown to the user when they approved your app. - - Returns - A pair of ``(access_token, user_id)``. ``access_token`` is a string that - can be passed to DropboxClient. ``user_id`` is the Dropbox user ID (string) of the - user that just approved your app. - - Raises - The same exceptions as :meth:`DropboxOAuth2Flow.finish()`. - """ - return self._finish(code, None) - - -class DropboxOAuth2Flow(DropboxOAuth2FlowBase): - """ - OAuth 2 authorization helper. Use this for web apps. - - OAuth 2 has a two-step authorization process. The first step is having the user authorize - your app. The second involves getting an OAuth 2 access token from Dropbox. - - Example:: - - from dropbox.client import DropboxOAuth2Flow, DropboxClient - - def get_dropbox_auth_flow(web_app_session): - redirect_uri = "https://my-web-server.org/dropbox-auth-finish" - return DropboxOAuth2Flow(APP_KEY, APP_SECRET, redirect_uri, - web_app_session, "dropbox-auth-csrf-token") - - # URL handler for /dropbox-auth-start - def dropbox_auth_start(web_app_session, request): - authorize_url = get_dropbox_auth_flow(web_app_session).start() - redirect_to(authorize_url) - - # URL handler for /dropbox-auth-finish - def dropbox_auth_finish(web_app_session, request): - try: - access_token, user_id, url_state = \\ - get_dropbox_auth_flow(web_app_session).finish(request.query_params) - except DropboxOAuth2Flow.BadRequestException, e: - http_status(400) - except DropboxOAuth2Flow.BadStateException, e: - # Start the auth flow again. - redirect_to("/dropbox-auth-start") - except DropboxOAuth2Flow.CsrfException, e: - http_status(403) - except DropboxOAuth2Flow.NotApprovedException, e: - flash('Not approved? Why not?') - return redirect_to("/home") - except DropboxOAuth2Flow.ProviderException, e: - logger.log("Auth error: %s" % (e,)) - http_status(403) - - """ - - def __init__(self, consumer_key, consumer_secret, redirect_uri, session, - csrf_token_session_key, locale=None, rest_client=None): - """ - Construct an instance. - - Parameters - consumer_key - Your API app's "app key". - consumer_secret - Your API app's "app secret". - redirect_uri - The URI that the Dropbox server will redirect the user to after the user - finishes authorizing your app. This URI must be HTTPS-based and pre-registered with - the Dropbox servers, though localhost URIs are allowed without pre-registration and can - be either HTTP or HTTPS. - session - A dict-like object that represents the current user's web session (will be - used to save the CSRF token). - csrf_token_session_key - The key to use when storing the CSRF token in the session (for - example: "dropbox-auth-csrf-token"). - locale - The locale of the user of your application. For example "en" or "en_US". - Some API calls return localized data and error messages; this setting - tells the server which locale to use. By default, the server uses "en_US". - rest_client - Optional :class:`dropbox.rest.RESTClient`-like object to use for making - requests. - """ - if rest_client is None: - rest_client = RESTClient - super(DropboxOAuth2Flow, self).__init__(consumer_key, consumer_secret, locale, rest_client) - self.redirect_uri = redirect_uri - self.session = session - self.csrf_token_session_key = csrf_token_session_key - - def start(self, url_state=None): - """ - Starts the OAuth 2 authorization process. - - This function builds an "authorization URL". You should redirect your user's browser to - this URL, which will give them an opportunity to grant your app access to their Dropbox - account. When the user completes this process, they will be automatically redirected to - the ``redirect_uri`` you passed in to the constructor. - - This function will also save a CSRF token to ``session[csrf_token_session_key]`` (as - provided to the constructor). This CSRF token will be checked on :meth:`finish()` to - prevent request forgery. - - Parameters - url_state - Any data that you would like to keep in the URL through the - authorization process. This exact value will be returned to you by :meth:`finish()`. - - Returns - The URL for a page on Dropbox's website. This page will let the user "approve" - your app, which gives your app permission to access the user's Dropbox account. - Tell the user to visit this URL and approve your app. - """ - csrf_token = base64.urlsafe_b64encode(os.urandom(16)) # PY3: Returns bytes - if not isinstance(csrf_token, str): - csrf_token = csrf_token.decode('utf-8') - state = csrf_token - if url_state is not None: - state += "|" + url_state - self.session[self.csrf_token_session_key] = csrf_token - - return self._get_authorize_url(self.redirect_uri, state) - - def finish(self, query_params): - """ - Call this after the user has visited the authorize URL (see :meth:`start()`), approved your - app and was redirected to your redirect URI. - - Parameters - query_params - The query parameters on the GET request to your redirect URI. - - Returns - A tuple of ``(access_token, user_id, url_state)``. ``access_token`` can be used to - construct a :class:`DropboxClient`. ``user_id`` is the Dropbox user ID (string) of the - user that just approved your app. ``url_state`` is the value you originally passed in to - :meth:`start()`. - - Raises - :class:`BadRequestException` - If the redirect URL was missing parameters or if the given parameters were not valid. - :class:`BadStateException` - If there's no CSRF token in the session. - :class:`CsrfException` - If the ``'state'`` query parameter doesn't contain the CSRF token from the user's - session. - :class:`NotApprovedException` - If the user chose not to approve your app. - :class:`ProviderException` - If Dropbox redirected to your redirect URI with some unexpected error identifier - and error message. - """ - # Check well-formedness of request. - - state = query_params.get('state') - if state is None: - raise self.BadRequestException("Missing query parameter 'state'.") - - error = query_params.get('error') - error_description = query_params.get('error_description') - code = query_params.get('code') - - if error is not None and code is not None: - raise self.BadRequestException("Query parameters 'code' and 'error' are both set; " - " only one must be set.") - if error is None and code is None: - raise self.BadRequestException("Neither query parameter 'code' or 'error' is set.") - - # Check CSRF token - - if self.csrf_token_session_key not in self.session: - raise self.BadStateException("Missing CSRF token in session.") - csrf_token_from_session = self.session[self.csrf_token_session_key] - if len(csrf_token_from_session) <= 20: - raise AssertionError("CSRF token unexpectedly short: %r" % (csrf_token_from_session,)) - - split_pos = state.find('|') - if split_pos < 0: - given_csrf_token = state - url_state = None - else: - given_csrf_token = state[0:split_pos] - url_state = state[split_pos + 1:] - - if not _safe_equals(csrf_token_from_session, given_csrf_token): - raise self.CsrfException("expected %r, got %r" % (csrf_token_from_session, - given_csrf_token)) - - del self.session[self.csrf_token_session_key] - - # Check for error identifier - - if error is not None: - if error == 'access_denied': - # The user clicked "Deny" - if error_description is None: - raise self.NotApprovedException("No additional description from Dropbox") - else: - raise self.NotApprovedException("Additional description from Dropbox: " + - error_description) - else: - # All other errors - full_message = error - if error_description is not None: - full_message += ": " + error_description - raise self.ProviderException(full_message) - - # If everything went ok, make the network call to get an access token. - - access_token, user_id = self._finish(code, self.redirect_uri) - return access_token, user_id, url_state - - class BadRequestException(Exception): - """ - Thrown if the redirect URL was missing parameters or if the - given parameters were not valid. - - The recommended action is to show an HTTP 400 error page. - """ - pass - - class BadStateException(Exception): - """ - Thrown if all the parameters are correct, but there's no CSRF token in the session. This - probably means that the session expired. - - The recommended action is to redirect the user's browser to try the approval process again. - """ - pass - - class CsrfException(Exception): - """ - Thrown if the given 'state' parameter doesn't contain the CSRF - token from the user's session. - This is blocked to prevent CSRF attacks. - - The recommended action is to respond with an HTTP 403 error page. - """ - pass - - class NotApprovedException(Exception): - """ - The user chose not to approve your app. - """ - pass - - class ProviderException(Exception): - """ - Dropbox redirected to your redirect URI with some unexpected error identifier and error - message. - - The recommended action is to log the error, tell the user something went wrong, and let - them try again. - """ - pass - - -def _safe_equals(a, b): - if len(a) != len(b): - return False - res = 0 - for ca, cb in zip(a, b): - res |= ord(ca) ^ ord(cb) - return res == 0 - -# From the "Bearer" token spec, RFC 6750. -_OAUTH2_ACCESS_TOKEN_PATTERN = re.compile(r'\A[-_~/A-Za-z0-9\.\+]+=*\Z') diff --git a/dropbox/dropbox.py b/dropbox/dropbox.py index bb0c9302..12f4c63f 100644 --- a/dropbox/dropbox.py +++ b/dropbox/dropbox.py @@ -12,10 +12,10 @@ import json import logging import random -import six import time import requests +import six from . import files, stone_serializers from .auth import ( @@ -33,8 +33,8 @@ RateLimitError, ) from .session import ( - API_CONTENT_HOST, API_HOST, + API_CONTENT_HOST, API_NOTIFICATION_HOST, HOST_API, HOST_CONTENT, @@ -42,7 +42,6 @@ pinned_session, ) - class RouteResult(object): """The successful result of a call to a route.""" @@ -62,7 +61,6 @@ def __init__(self, obj_result, http_resp=None): self.obj_result = obj_result self.http_resp = http_resp - class RouteErrorResult(object): """The error result of a call to a route.""" @@ -76,14 +74,12 @@ def __init__(self, request_id, obj_result): self.request_id = request_id self.obj_result = obj_result - def create_session(max_connections=8, proxies=None): """ Creates a session object that can be used by multiple :class:`Dropbox` and :class:`DropboxTeam` instances. This lets you share a connection pool amongst them, as well as proxy parameters. - :param int max_connections: Maximum connection pool size. :param dict proxies: See the `requests module `_ @@ -98,7 +94,6 @@ def create_session(max_connections=8, proxies=None): session.proxies = proxies return session - class _DropboxTransport(object): """ Responsible for implementing the wire protocol for making requests to the @@ -484,7 +479,6 @@ def _save_body_to_file(self, download_path, http_resp, chunksize=2**16): for c in http_resp.iter_content(chunksize): f.write(c) - class Dropbox(_DropboxTransport, DropboxBase): """ Use this class to make requests to the Dropbox API using a user's access @@ -493,7 +487,6 @@ class Dropbox(_DropboxTransport, DropboxBase): """ pass - class DropboxTeam(_DropboxTransport, DropboxTeamBase): """ Use this class to make requests to the Dropbox API using a team's access diff --git a/dropbox/oauth.py b/dropbox/oauth.py index 7749c1bb..ea699c29 100644 --- a/dropbox/oauth.py +++ b/dropbox/oauth.py @@ -39,10 +39,8 @@ def __init__(self, access_token, account_id, user_id): Args: access_token (str): Token to be used to authenticate later requests. - account_id (str): The Dropbox user's account ID. Please use this - instead of the user_id. - user_id (str): For backwards compatibility with API v1, please - avoid using this if possible. + account_id (str): The Dropbox user's account ID. + user_id (str): Deprecated (use account_id instead). """ self.access_token = access_token self.account_id = account_id @@ -197,7 +195,7 @@ class DropboxOAuth2FlowNoRedirect(DropboxOAuth2FlowBase): dbx = Dropbox(oauth_result.access_token) """ - def __init__(self, consumer_key, consumer_secret, locale=None): + def __init__(self, consumer_key, consumer_secret, locale=None): # noqa: E501; pylint: disable=useless-super-delegation """ Construct an instance. diff --git a/dropbox/rest.py b/dropbox/rest.py deleted file mode 100644 index 301bb7b1..00000000 --- a/dropbox/rest.py +++ /dev/null @@ -1,426 +0,0 @@ -""" -Deprecated: This is included only to support the use of the old v1 client -class. It will be removed once v2 is at parity with v1. Do not use this for any -new functionality. - -A simple JSON REST request abstraction layer that is used by the -``dropbox.client`` and ``dropbox.session`` modules. You shouldn't need to use -this. -""" - -from __future__ import absolute_import - -import io -import pkg_resources -import six -import socket -import ssl -import sys -import urllib - -try: - import json -except ImportError: - import simplejson as json - -try: - import urllib3 -except ImportError: - raise ImportError('Dropbox python client requires urllib3.') - -if six.PY3: - url_encode = urllib.parse.urlencode # pylint: disable=no-member,useless-suppression -else: - url_encode = urllib.urlencode # pylint: disable=no-member,useless-suppression - -TRUSTED_CERT_FILE = pkg_resources.resource_filename(__name__, 'trusted-certs.crt') - - -class RESTResponse(io.IOBase): - """ - Responses to requests can come in the form of ``RESTResponse``. These are - thin wrappers around the socket file descriptor. - :meth:`read()` and :meth:`close()` are implemented. - It is important to call :meth:`close()` to return the connection - back to the connection pool to be reused. If a connection - is not closed by the caller it may leak memory. The object makes a - best-effort attempt upon destruction to call :meth:`close()`, - but it's still best to explicitly call :meth:`close()`. - """ - - def __init__(self, resp): - super(RESTResponse, self).__init__() - # arg: A urllib3.HTTPResponse object - self.urllib3_response = resp - self.status = resp.status - self.version = resp.version - self.reason = resp.reason - self.strict = resp.strict - self.is_closed = False - - def __del__(self): - # Attempt to close when ref-count goes to zero. - self.close() - - def __exit__(self, typ, value, traceback): - # Allow this to be used in "with" blocks. - self.close() - - # ----------------- - # Important methods - # ----------------- - def read(self, amt=None): - """ - Read data off the underlying socket. - - Parameters - amt - Amount of data to read. Defaults to ``None``, indicating to read - everything. - - Returns - Data off the socket. If ``amt`` is not ``None``, at most ``amt`` bytes are returned. - An empty string when the socket has no data. - - Raises - ``ValueError`` - If the ``RESTResponse`` has already been closed. - """ - if self.is_closed: - raise ValueError('Response already closed') - return self.urllib3_response.read(amt) - - BLOCKSIZE = 4 * 1024 * 1024 # 4MB at a time just because - - def close(self): - """Closes the underlying socket.""" - - # Double closing is harmless - if self.is_closed: - return - - # Mark as closed and release the connection (exactly once) - self.is_closed = True - self.urllib3_response.release_conn() - - @property - def closed(self): - return self.is_closed - - # --------------------------------- - # Backwards compat for HTTPResponse - # --------------------------------- - def getheaders(self): - """Returns a dictionary of the response headers.""" - return self.urllib3_response.getheaders() - - def getheader(self, name, default=None): - """Returns a given response header.""" - return self.urllib3_response.getheader(name, default) - - # Some compat functions showed up recently in urllib3 - try: - urllib3.HTTPResponse.flush # pylint: disable=pointless-statement - urllib3.HTTPResponse.fileno # pylint: disable=pointless-statement - except AttributeError: - pass - else: - def fileno(self): - return self.urllib3_response.fileno() - def flush(self): - return self.urllib3_response.flush() - -def create_connection(address): - host, port = address - err = None - for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): - af, socktype, proto, _, sa = res - sock = None - try: - sock = socket.socket(af, socktype, proto) - sock.connect(sa) - return sock - - except socket.error as e: - err = e - if sock is not None: - sock.close() - - if err is not None: - raise err # pylint: disable=raising-bad-type - else: - raise socket.error("getaddrinfo returns an empty list") - -def json_loadb(data): - if sys.version_info >= (3,): - data = data.decode('utf8') - return json.loads(data) - - -class RESTClientObject(object): - def __init__(self, max_reusable_connections=8, mock_urlopen=None): - """ - Parameters - max_reusable_connections - max connections to keep alive in the pool - mock_urlopen - an optional alternate urlopen function for testing - - This class uses ``urllib3`` to maintain a pool of connections. We attempt - to grab an existing idle connection from the pool, otherwise we spin - up a new connection. Once a connection is closed, it is reinserted - into the pool (unless the pool is full). - - SSL settings: - - Certificates validated using Dropbox-approved trusted root certs - - TLS v1.0 (newer TLS versions are not supported by urllib3) - - Default ciphersuites. Choosing ciphersuites is not supported by urllib3 - - Hostname verification is provided by urllib3 - """ - self.mock_urlopen = mock_urlopen - self.pool_manager = urllib3.PoolManager( - num_pools=4, # only a handful of hosts. api.dropbox.com, api-content.dropbox.com - maxsize=max_reusable_connections, - block=False, - timeout=60.0, # long enough so datastores await doesn't get interrupted - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=TRUSTED_CERT_FILE, - ssl_version=ssl.PROTOCOL_TLSv1, - ) - - def request(self, method, url, post_params=None, body=None, headers=None, raw_response=False, - is_json_request=False): - """Performs a REST request. See :meth:`RESTClient.request()` for detailed description.""" - - headers = headers or {} - - from .dropbox import __version__ - headers['User-Agent'] = 'OfficialDropboxPythonSDK/' + __version__ - - if post_params is not None: - if body: - raise ValueError("body parameter cannot be used with post_params parameter") - if is_json_request: - body = json.dumps(post_params) - headers["Content-type"] = "application/json" - else: - body = params_to_urlencoded(post_params) - headers["Content-type"] = "application/x-www-form-urlencoded" - - # Handle StringIO/BytesIO instances, because urllib3 doesn't. - if hasattr(body, 'getvalue'): - body = body.getvalue() - - # Reject any headers containing newlines; the error from the server isn't pretty. - for key, value in headers.items(): - if isinstance(value, six.string_types) and '\n' in value: - raise ValueError("headers should not contain newlines (%s: %s)" % - (key, value)) - - try: - # Grab a connection from the pool to make the request. - # We return it to the pool when caller close() the response - urlopen = self.mock_urlopen if self.mock_urlopen else self.pool_manager.urlopen - r = urlopen( - method=method, - url=url, - body=body, - headers=headers, - preload_content=False - ) - r = RESTResponse(r) # wrap up the urllib3 response before proceeding - except socket.error as e: - raise RESTSocketError(url, e) - except urllib3.exceptions.SSLError as e: - raise RESTSocketError(url, "SSL certificate error: %s" % e) - - if r.status not in (200, 206): - raise ErrorResponse(r, r.read()) - - return self.process_response(r, raw_response) - - def process_response(self, r, raw_response): - if raw_response: - return r - else: - s = r.read() - try: - resp = json_loadb(s) - except ValueError: - raise ErrorResponse(r, s) - r.close() - - return resp - - def GET(self, url, headers=None, raw_response=False): - assert type(raw_response) == bool - return self.request("GET", url, headers=headers, raw_response=raw_response) - - def POST(self, url, params=None, headers=None, raw_response=False, is_json_request=False): - assert type(raw_response) == bool - return self.request("POST", url, - post_params=params, headers=headers, raw_response=raw_response, - is_json_request=is_json_request) - - def PUT(self, url, body, headers=None, raw_response=False): - assert type(raw_response) == bool - return self.request("PUT", url, body=body, headers=headers, raw_response=raw_response) - - -class RESTClient(object): - """ - A class with all static methods to perform JSON REST requests that is used internally - by the Dropbox Client API. It provides just enough gear to make requests - and get responses as JSON data (when applicable). All requests happen over SSL. - """ - - IMPL = RESTClientObject() - - @classmethod - def request(cls, *n, **kw): - """Perform a REST request and parse the response. - - Parameters - method - An HTTP method (e.g. ``'GET'`` or ``'POST'``). - url - The URL to make a request to. - post_params - A dictionary of parameters to put in the body of the request. - This option may not be used if the body parameter is given. - body - The body of the request. Typically, this value will be a string. - It may also be a file-like object. The body - parameter may not be used with the post_params parameter. - headers - A dictionary of headers to send with the request. - raw_response - Whether to return a :class:`RESTResponse` object. Default ``False``. - It's best enabled for requests that return large amounts of data that you - would want to ``.read()`` incrementally rather than loading into memory. Also - use this for calls where you need to read metadata like status or headers, - or if the body is not JSON. - - Returns - The JSON-decoded data from the server, unless ``raw_response`` is - set, in which case a :class:`RESTResponse` object is returned instead. - - Raises - :class:`ErrorResponse` - The returned HTTP status is not 200, or the body was - not parsed from JSON successfully. - :class:`RESTSocketError` - A ``socket.error`` was raised while contacting Dropbox. - """ - return cls.IMPL.request(*n, **kw) - - @classmethod - def GET(cls, *n, **kw): - """Perform a GET request using :meth:`RESTClient.request()`.""" - return cls.IMPL.GET(*n, **kw) - - @classmethod - def POST(cls, *n, **kw): - """Perform a POST request using :meth:`RESTClient.request()`.""" - return cls.IMPL.POST(*n, **kw) - - @classmethod - def PUT(cls, *n, **kw): - """Perform a PUT request using :meth:`RESTClient.request()`.""" - return cls.IMPL.PUT(*n, **kw) - - -class RESTSocketError(socket.error): - """A light wrapper for ``socket.error`` that adds some more information.""" - - def __init__(self, host, e): - msg = "Error connecting to \"%s\": %s" % (host, str(e)) - socket.error.__init__(self, msg) - - -# Dummy class for docstrings, see doco.py. -class _ErrorResponse__doc__(Exception): - """Exception raised when :class:`DropboxClient` exeriences a problem. - - For example, this is raised when the server returns an unexpected - non-200 HTTP response. - """ - - _status__doc__ = "HTTP response status (an int)." - _reason__doc__ = "HTTP response reason (a string)." - _headers__doc__ = "HTTP response headers (a list of (header, value) tuples)." - _body__doc__ = "HTTP response body (string or JSON dict)." - _error_msg__doc__ = "Error message for developer (optional)." - _user_error_msg__doc__ = "Error message for end user (optional)." - - -class ErrorResponse(Exception): - """ - Raised by :meth:`RESTClient.request()` for requests that: - - - Return a non-200 HTTP response, or - - Have a non-JSON response body, or - - Have a malformed/missing header in the response. - - Most errors that Dropbox returns will have an error field that is unpacked and - placed on the ErrorResponse exception. In some situations, a user_error field - will also come back. Messages under user_error are worth showing to an end-user - of your app, while other errors are likely only useful for you as the developer. - """ - - def __init__(self, http_resp, body): - """ - Parameters - http_resp - The :class:`RESTResponse` which errored - body - Body of the :class:`RESTResponse`. - The reason we can't simply call ``http_resp.read()`` to - get the body, is that ``read()`` is not idempotent. - Since it can't be called more than once, - we have to pass the string body in separately - """ - Exception.__init__(self) - self.status = http_resp.status - self.reason = http_resp.reason - self.body = body - self.headers = http_resp.getheaders() - http_resp.close() # won't need this connection anymore - - try: - self.body = json_loadb(self.body) - self.error_msg = self.body.get('error') - self.user_error_msg = self.body.get('user_error') - except ValueError: - self.error_msg = None - self.user_error_msg = None - - def __str__(self): - if self.user_error_msg and self.user_error_msg != self.error_msg: - # one is translated and the other is English - msg = "%r (%r)" % (self.user_error_msg, self.error_msg) - elif self.error_msg: - msg = repr(self.error_msg) - elif not self.body: - msg = repr(self.reason) - else: - msg = "Error parsing response body or headers: " +\ - "Body - %.100r Headers - %r" % (self.body, self.headers) - - return "[%d] %s" % (self.status, msg) - - -def params_to_urlencoded(params): - """ - Returns a application/x-www-form-urlencoded 'str' representing the key/value pairs in 'params'. - - Keys are values are str()'d before calling urllib.urlencode, with the exception of unicode - objects which are utf8-encoded. - """ - def encode(o): - if isinstance(o, six.text_type): - return o.encode('utf8') - else: - return str(o) - utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)} - return url_encode(utf8_params) diff --git a/dropbox/session.py b/dropbox/session.py index ce219036..fe0c7b23 100644 --- a/dropbox/session.py +++ b/dropbox/session.py @@ -1,57 +1,11 @@ -import os import pkg_resources +import os import ssl import requests from requests.adapters import HTTPAdapter from urllib3.poolmanager import PoolManager - -_TRUSTED_CERT_FILE = pkg_resources.resource_filename(__name__, 'trusted-certs.crt') - - -# TODO(kelkabany): We probably only want to instantiate this once so that even -# if multiple Dropbox objects are instantiated, they all share the same pool. -class _SSLAdapter(HTTPAdapter): - def init_poolmanager(self, connections, maxsize, block=False): # noqa: E501; pylint: disable=arguments-differ - self.poolmanager = PoolManager(num_pools=connections, - maxsize=maxsize, - block=block, - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=_TRUSTED_CERT_FILE, - ssl_version=ssl.PROTOCOL_TLSv1) - -def pinned_session(pool_maxsize=8): - http_adapter = _SSLAdapter(pool_connections=4, - pool_maxsize=pool_maxsize) - - _session = requests.session() - _session.mount('https://', http_adapter) - - return _session - -# ======================================================================== -# Deprecated: The code below is included only to support the use of the -# old v1 client class. It will be removed once v2 is at parity with v1. Do -# not use this for any new functionality. -# ======================================================================== - -import random -import six -import time -import urllib - -from . import rest - -if six.PY2: - from urlparse import parse_qs # pylint: disable=import-error,useless-suppression - url_path_quote = urllib.quote # pylint: disable=no-member,useless-suppression - url_encode = urllib.urlencode # pylint: disable=no-member,useless-suppression -else: - from urllib.parse import parse_qs # noqa: E501; pylint: disable=import-error,no-name-in-module,useless-suppression - url_path_quote = urllib.parse.quote # pylint: disable=no-member,useless-suppression - url_encode = urllib.parse.urlencode # pylint: disable=no-member,useless-suppression - API_DOMAIN = os.environ.get('DROPBOX_API_DOMAIN', os.environ.get('DROPBOX_DOMAIN', '.dropboxapi.com')) @@ -75,296 +29,24 @@ def pinned_session(pool_maxsize=8): API_NOTIFICATION_HOST = os.environ.get('DROPBOX_API_NOTIFY_HOST', HOST_NOTIFY + API_DOMAIN) WEB_HOST = os.environ.get('DROPBOX_WEB_HOST', HOST_WWW + WEB_DOMAIN) -class OAuthToken(object): - """ - A class representing an OAuth token. Contains two fields: ``key`` and - ``secret``. - """ - def __init__(self, key, secret): - self.key = key - self.secret = secret - -class BaseSession(object): - API_VERSION = 1 - - def __init__(self, consumer_key, consumer_secret, access_type="auto", - locale=None, rest_client=rest.RESTClient): - """Initialize a DropboxSession object. - - Your consumer key and secret are available - at https://www.dropbox.com/developers/apps - - Args: - - - ``access_type``: Either 'auto' (the default), 'dropbox', or - 'app_folder'. You probably don't need to specify this and should - just use the default. - - ``locale``: A locale string ('en', 'pt_PT', etc.) [optional] - The locale setting will be used to translate any user-facing error - messages that the server generates. At this time Dropbox supports - 'en', 'es', 'fr', 'de', and 'ja', though we will be supporting more - languages in the future. If you send a language the server doesn't - support, messages will remain in English. Look for these translated - messages in rest.ErrorResponse exceptions as e.user_error_msg. - - """ - assert access_type in ['dropbox', 'app_folder', 'auto'], \ - "expected access_type of 'dropbox' or 'app_folder'" - self.consumer_creds = OAuthToken(consumer_key, consumer_secret) - self.token = None - self.request_token = None - self.root = 'sandbox' if access_type == 'app_folder' else access_type - self.locale = locale - self.rest_client = rest_client - - def is_linked(self): - """Return whether the DropboxSession has an access token attached.""" - return bool(self.token) - - def unlink(self): - """Remove any attached access token from the DropboxSession.""" - self.token = None - - def build_path(self, target, params=None): - """Build the path component for an API URL. - - This method urlencodes the parameters, adds them - to the end of the target url, and puts a marker for the API - version in front. - - Args: - - ``target``: A target url (e.g. '/files') to build upon. - - ``params``: A dictionary of parameters (name to value). [optional] - - Returns: - - The path and parameters components of an API URL. - """ - if six.PY2 and isinstance(target, six.text_type): - target = target.encode("utf8") - - target_path = url_path_quote(target) - - params = params or {} - params = params.copy() - - if self.locale: - params['locale'] = self.locale - - if params: - return "/%s%s?%s" % (self.API_VERSION, target_path, url_encode(params)) - else: - return "/%s%s" % (self.API_VERSION, target_path) - - def build_url(self, host, target, params=None): - """Build an API URL. - - This method adds scheme and hostname to the path - returned from build_path. - - Args: - - ``target``: A target url (e.g. '/files') to build upon. - - ``params``: A dictionary of parameters (name to value). [optional] - - Returns: - - The full API URL. - """ - return "https://%s%s" % (host, self.build_path(target, params)) - -BaseSession.API_HOST = API_HOST -BaseSession.API_CONTENT_HOST = API_CONTENT_HOST -BaseSession.API_NOTIFICATION_HOST = API_NOTIFICATION_HOST -BaseSession.WEB_HOST = WEB_HOST - -class DropboxSession(BaseSession): - - def set_token(self, access_token, access_token_secret): - """Attach an access token to the DropboxSession. - - Note that the access 'token' is made up of both a token string - and a secret string. - """ - self.token = OAuthToken(access_token, access_token_secret) - - def set_request_token(self, request_token, request_token_secret): - """Attach an request token to the DropboxSession. - - Note that the request 'token' is made up of both a token string - and a secret string. - """ - self.request_token = OAuthToken(request_token, request_token_secret) - - def build_authorize_url(self, request_token, oauth_callback=None): - """Build a request token authorization URL. - - After obtaining a request token, you'll need to send the user to - the URL returned from this function so that they can confirm that - they want to connect their account to your app. - - Args: - - ``request_token``: A request token from obtain_request_token. - - ``oauth_callback``: A url to redirect back to with the authorized - request token. - - Returns: - - An authorization for the given request token. - """ - params = {'oauth_token': request_token.key, - } - - if oauth_callback: - params['oauth_callback'] = oauth_callback - - return self.build_url(self.WEB_HOST, '/oauth/authorize', params) - - def obtain_request_token(self): - """Obtain a request token from the Dropbox API. - - This is your first step in the OAuth process. You call this to get a - request_token from the Dropbox server that you can then use with - DropboxSession.build_authorize_url() to get the user to authorize it. - After it's authorized you use this token with - DropboxSession.obtain_access_token() to get an access token. - - NOTE: You should only need to do this once for each user, and then you - can store the access token for that user for later operations. - - Returns: - - An :py:class:`OAuthToken` object representing the - request token Dropbox assigned to this app. Also attaches the - request token as self.request_token. - """ - self.token = None # clear any token currently on the request - url = self.build_url(self.API_HOST, '/oauth/request_token') - headers, params = self.build_access_headers() - - response = self.rest_client.POST(url, headers=headers, params=params, raw_response=True) - self.request_token = self._parse_token(response.read()) - return self.request_token - - def obtain_access_token(self, request_token=None): - """Obtain an access token for a user. - - After you get a request token, and then send the user to the authorize - URL, you can use the authorized request token with this method to get the - access token to use for future operations. The access token is stored on - the session object. - - Args: - - ``request_token``: A request token from obtain_request_token. [optional] - The request_token should have been authorized via the - authorization url from build_authorize_url. If you don't pass - a request_token, the fallback is self.request_token, which - will exist if you previously called obtain_request_token on this - DropboxSession instance. - - Returns: - - An :py:class:`OAuthToken` object with fields ``key`` and ``secret`` - representing the access token Dropbox assigned to this app and - user. Also attaches the access token as self.token. - """ - request_token = request_token or self.request_token - assert request_token, "No request_token available on the session. Please pass one." - url = self.build_url(self.API_HOST, '/oauth/access_token') - headers, params = self.build_access_headers(request_token=request_token) - - response = self.rest_client.POST(url, headers=headers, params=params, raw_response=True) - self.token = self._parse_token(response.read()) - return self.token - - def build_access_headers(self, params=None, request_token=None): - """Build OAuth access headers for a future request. - - Args: - - ``params``: A dictionary of parameters to add to what's already on the url. - Typically, this would consist of POST parameters. - - Returns: - - A tuple of (header_dict, params) where header_dict is a dictionary - of header names and values appropriate for passing into dropbox.rest.RESTClient - and params is a dictionary like the one that was passed in, but augmented with - oauth-related parameters as appropriate. - """ - if params is None: - params = {} - else: - params = params.copy() - - oauth_params = { - 'oauth_consumer_key': self.consumer_creds.key, - 'oauth_timestamp': self._generate_oauth_timestamp(), - 'oauth_nonce': self._generate_oauth_nonce(), - 'oauth_version': self._oauth_version(), - } - - token = request_token if request_token is not None else self.token - - if token: - oauth_params['oauth_token'] = token.key - - self._oauth_sign_request(oauth_params, self.consumer_creds, token) - - headers = { - 'Authorization': - 'OAuth %s' % ','.join( - '%s="%s"' % (k, v) for k, v in six.iteritems(oauth_params))} - - return headers, params - - @classmethod - def _oauth_sign_request(cls, params, consumer_pair, token_pair): - params.update({'oauth_signature_method': 'PLAINTEXT', - 'oauth_signature': ('%s&%s' % (consumer_pair.secret, token_pair.secret) - if token_pair is not None else '%s&' % (consumer_pair.secret,))}) - - @classmethod - def _generate_oauth_timestamp(cls): - return int(time.time()) - - @classmethod - def _generate_oauth_nonce(cls, length=8): - return ''.join([str(random.SystemRandom().randint(0, 9)) for _ in range(length)]) - - @classmethod - def _oauth_version(cls): - return '1.0' - - @classmethod - def _parse_token(cls, s): - if not s: - raise ValueError("Invalid parameter string.") - - params = parse_qs(s, keep_blank_values=False) - if not params: - raise ValueError("Invalid parameter string: %r" % s) - - if six.PY2: - oauth_token_key = 'oauth_token' - oauth_token_secret_key = 'oauth_token_secret' - else: - oauth_token_key = b'oauth_token' - oauth_token_secret_key = b'oauth_token_secret' - try: - key = params[oauth_token_key][0] - except Exception: - raise ValueError("'oauth_token' not found in OAuth request.") - - try: - secret = params[oauth_token_secret_key][0] - except Exception: - raise ValueError("'oauth_token_secret' not found in " - "OAuth request.") - - return OAuthToken(key, secret) +_TRUSTED_CERT_FILE = pkg_resources.resource_filename(__name__, 'trusted-certs.crt') -# Don't use this class directly. -class DropboxOAuth2Session(BaseSession): +# TODO(kelkabany): We probably only want to instantiate this once so that even +# if multiple Dropbox objects are instantiated, they all share the same pool. +class _SSLAdapter(HTTPAdapter): + def init_poolmanager(self, connections, maxsize, block=False, **_): + self.poolmanager = PoolManager( + num_pools=connections, + maxsize=maxsize, + block=block, + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=_TRUSTED_CERT_FILE, + ssl_version=ssl.PROTOCOL_TLSv1, + ) - def __init__(self, oauth2_access_token, locale, rest_client=rest.RESTClient): - super(DropboxOAuth2Session, self).__init__("", "", "auto", - locale=locale, rest_client=rest_client) - self.access_token = oauth2_access_token +def pinned_session(pool_maxsize=8): + http_adapter = _SSLAdapter(pool_connections=4, pool_maxsize=pool_maxsize) + _session = requests.session() + _session.mount('https://', http_adapter) - def build_access_headers(self, params=None, token=None): - assert token is None - headers = {"Authorization": "Bearer " + self.access_token} - return headers, params + return _session diff --git a/test/test_dropbox.py b/test/test_dropbox.py index b2b492d3..13ad1c7e 100644 --- a/test/test_dropbox.py +++ b/test/test_dropbox.py @@ -5,13 +5,11 @@ import datetime import functools import os -import posixpath import random import re import six import string import sys -import threading import unittest try: @@ -23,7 +21,6 @@ Dropbox, DropboxOAuth2Flow, DropboxTeam, - client, session, ) from dropbox.exceptions import ( @@ -34,8 +31,6 @@ from dropbox.files import ( ListFolderError, ) -from dropbox.rest import ErrorResponse - def _token_from_env_or_die(env_name='DROPBOX_TOKEN'): oauth2_token = os.environ.get(env_name) @@ -61,7 +56,6 @@ def wrapped(self, *args, **kwargs): return f(self, *args, **kwargs) return wrapped - MALFORMED_TOKEN = 'asdf' INVALID_TOKEN = 'z' * 62 @@ -143,490 +137,5 @@ def test_team(self, dbxt): team_member_id = r.members[0].profile.team_member_id dbxt.as_user(team_member_id).files_list_folder('') - -PY3 = sys.version_info[0] == 3 - -def dbx_v1_client_from_env(f): - @functools.wraps(f) - def wrapped(self, *args, **kwargs): - oauth2_token = _token_from_env_or_die() - args += (client.DropboxClient(oauth2_token),) - return f(self, *args, **kwargs) - return wrapped - -def dbx_v1_client_from_env_with_test_dir(f): - @functools.wraps(f) - def wrapped(self, *args, **kwargs): - oauth2_token = _token_from_env_or_die() - dbx_client = client.DropboxClient(oauth2_token) - test_dir = "/Test/%s" % str(datetime.datetime.utcnow()) - args += (dbx_client, test_dir,) - try: - return f(self, *args, **kwargs) - finally: - try: - dbx_client.file_delete(test_dir) - except ErrorResponse as exc: - if 'not found' not in exc.body['error']: - raise - return wrapped - - -class BaseClientTests(unittest.TestCase): - - _LOCAL_TEST_DIR = os.path.dirname(__file__) - FOO_TXT = os.path.join(_LOCAL_TEST_DIR, 'foo.txt') - FROG_JPG = os.path.join(_LOCAL_TEST_DIR, 'Costa Rican Frog.jpg') - SONG_MP3 = os.path.join(_LOCAL_TEST_DIR, 'dropbox_song.mp3') - - def upload_file(self, dbx_client, src, target, **kwargs): - with open(src, 'rb') as f: - return dbx_client.put_file(target, f, **kwargs) - - def dict_has(self, dictionary, *args, **kwargs): - for key in args: - self.assertTrue(key in dictionary) - for (key, value) in kwargs.items(): - self.assertEqual(value, dictionary[key]) - - def assert_file(self, dictionary, filename, *args, **kwargs): - defaults = dict( - bytes=os.path.getsize(filename), - is_dir=False - ) - combined = dict(list(defaults.items()) + list(kwargs.items())) - self.dict_has(dictionary, *args, **combined) - - @dbx_v1_client_from_env - def test_account_info(self, dbx_client): - """Tests if the account_info returns the expected fields.""" - account_info = dbx_client.account_info() - self.dict_has( - account_info, - "country", - "display_name", - "referral_link", - "quota_info", - "uid", - ) - - @dbx_v1_client_from_env_with_test_dir - def test_put_file(self, dbx_client, test_dir): - """Tests if put_file returns the expected metadata""" - def test_put(file, path): # pylint: disable=redefined-builtin - file_path = posixpath.join(test_dir, path) - f = open(file, "rb") - metadata = dbx_client.put_file(file_path, f) - self.assert_file(metadata, file, path=file_path) - test_put(self.FOO_TXT, "put_foo.txt") - test_put(self.SONG_MP3, "put_song.mp3") - test_put(self.FROG_JPG, "put_frog.jpg") - - @dbx_v1_client_from_env_with_test_dir - def test_put_file_overwrite(self, dbx_client, test_dir): - """Tests if put_file with overwrite=true returns the expected metadata""" - path = posixpath.join(test_dir, "foo_overwrite.txt") - self.upload_file(dbx_client, self.FOO_TXT, path) - f = BytesIO(b"This Overwrites") - metadata = dbx_client.put_file(path, f, overwrite=True) - self.dict_has( - metadata, - size="15 bytes", - bytes=15, - is_dir=False, - path=path, - mime_type="text/plain", - ) - - @dbx_v1_client_from_env_with_test_dir - def test_get_file(self, dbx_client, test_dir): - """Tests if storing and retrieving a file returns the same file""" - def test_get(file, path): # pylint: disable=redefined-builtin - file_path = posixpath.join(test_dir, path) - self.upload_file(dbx_client, file, file_path) - downloaded = dbx_client.get_file(file_path).read() - local = open(file, "rb").read() - self.assertEqual(len(downloaded), len(local)) - self.assertEqual(downloaded, local) - test_get(self.FOO_TXT, "get_foo.txt") - test_get(self.FROG_JPG, "get_frog.txt") - test_get(self.SONG_MP3, "get_song.txt") - - @dbx_v1_client_from_env_with_test_dir - def test_get_partial_file(self, dbx_client, test_dir): - """Tests if storing a file and retrieving part of it returns the correct part""" - def test_get(file, path, start_frac, download_frac): # noqa: E501; pylint: disable=redefined-builtin - file_path = posixpath.join(test_dir, path) - self.upload_file(dbx_client, file, file_path) - local = open(file, "rb").read() - local_len = len(local) - - download_start = int(start_frac * local_len) if start_frac is not None else None - download_length = int(download_frac * local_len) if download_frac is not None else None - downloaded = dbx_client.get_file(file_path, start=download_start, - length=download_length).read() - - local_file = open(file, "rb") - if download_start: - local_file.seek(download_start) - if download_length is None: - local_partial = local_file.read() - else: - local_partial = local_file.read(download_length) - elif download_length: - local_file.seek(-1 * download_length, 2) - local_partial = local_file.read(download_length) - - self.assertEqual(len(downloaded), len(local_partial)) - self.assertEqual(downloaded, local_partial) - test_get(self.FOO_TXT, "get_foo.txt", 0.25, 0.5) - test_get(self.FROG_JPG, "get_frog.txt", None, 0.5) - test_get(self.SONG_MP3, "get_song.txt", 0.25, None) - - @dbx_v1_client_from_env_with_test_dir - def test_metadata(self, dbx_client, test_dir): - """Tests if metadata returns the expected values for a files uploaded earlier""" - path = posixpath.join(test_dir, "foo_upload.txt") - self.upload_file(dbx_client, self.FOO_TXT, path) - metadata = dbx_client.metadata(path) - self.assert_file(metadata, self.FOO_TXT, path=path) - - # Test root metadata - dbx_client.metadata('/') - dbx_client.metadata('') - - @dbx_v1_client_from_env_with_test_dir - def test_metadata_bad(self, dbx_client, test_dir): - """Tests if metadata returns an error for nonexistent file""" - self.assertRaises( - ErrorResponse, - lambda: dbx_client.metadata(posixpath.join(test_dir, "foo_does_not_exist.txt")) - ) - - @dbx_v1_client_from_env_with_test_dir - def test_create_folder(self, dbx_client, test_dir): - """Tests if creating a folder works""" - path = posixpath.join(test_dir, u"new_fold\xe9r") - metadata = dbx_client.file_create_folder(path) - self.dict_has(metadata, - size="0 bytes", - bytes=0, - is_dir=True, - path=path) - - @dbx_v1_client_from_env_with_test_dir - def test_create_folder_dupe(self, dbx_client, test_dir): - """Tests if creating a folder fails correctly if one already exists""" - path = posixpath.join(test_dir, u"new_fold\xe9r_dupe") - dbx_client.file_create_folder(path) - self.assertRaises( - ErrorResponse, - lambda: dbx_client.file_create_folder(path) - ) - - @dbx_v1_client_from_env_with_test_dir - def test_delete(self, dbx_client, test_dir): - """Tests if deleting a file really makes it disappear""" - path = posixpath.join(test_dir, u"d\xe9lfoo.txt") - self.upload_file(dbx_client, self.FOO_TXT, path) - metadata = dbx_client.metadata(path) - self.assert_file(metadata, self.FOO_TXT, path=path) - dbx_client.file_delete(path) - - metadata = dbx_client.metadata(path) - self.assert_file( - metadata, - self.FOO_TXT, - path=path, - bytes=0, - size="0 bytes", - is_deleted=True, - ) - - @dbx_v1_client_from_env_with_test_dir - def test_copy(self, dbx_client, test_dir): - """Tests copying a file, to ensure that two copies exist after the operation""" - path = posixpath.join(test_dir, "copyfoo.txt") - path2 = posixpath.join(test_dir, "copyfoo2.txt") - self.upload_file(dbx_client, self.FOO_TXT, path) - dbx_client.file_copy(path, path2) - metadata = dbx_client.metadata(path) - metadata2 = dbx_client.metadata(path2) - self.assert_file(metadata, self.FOO_TXT, path=path) - self.assert_file(metadata2, self.FOO_TXT, path=path2) - - @dbx_v1_client_from_env_with_test_dir - def test_move(self, dbx_client, test_dir): - """Tests moving a file, to ensure the new copy exists and the old copy is removed""" - path = posixpath.join(test_dir, "movefoo.txt") - path2 = posixpath.join(test_dir, "movefoo2.txt") - self.upload_file(dbx_client, self.FOO_TXT, path) - dbx_client.file_move(path, path2) - - metadata = dbx_client.metadata(path) - self.assert_file(metadata, self.FOO_TXT, path=path, - is_deleted=True, size="0 bytes", bytes=0) - - metadata = dbx_client.metadata(path2) - self.assert_file(metadata, self.FOO_TXT, path=path2) - - @dbx_v1_client_from_env_with_test_dir - def test_thumbnail(self, dbx_client, test_dir): - path = posixpath.join(test_dir, "frog.jpeg") - orig_md = self.upload_file(dbx_client, self.FROG_JPG, path) - path = orig_md['path'] - - for fmt in ('JPEG', 'PNG'): - prev_len = 0 - for ident in ('xs', 's', 'm', 'l', 'xl'): - with dbx_client.thumbnail(path, ident, fmt) as r: - data1 = r.read() - r, _ = dbx_client.thumbnail_and_metadata(path, ident, fmt) - with r: - data2 = r.read() - self.assertEqual(data1, data2) - # Make sure the amount of data returned increases as we increase the size. - self.assertTrue(len(data1) > prev_len) - prev_len = len(data1) - - # Make sure the default is 'm' - with dbx_client.thumbnail(path, 'm') as r: - data_m = r.read() - with dbx_client.thumbnail(path) as r: - data1 = r.read() - r, _ = dbx_client.thumbnail_and_metadata(path) - with r: - data2 = r.read() - self.assertEqual(data_m, data1) - self.assertEqual(data_m, data2) - - @dbx_v1_client_from_env_with_test_dir - def test_stream(self, dbx_client, test_dir): - """Tests file streaming using the /media endpoint""" - path = posixpath.join(test_dir, "stream_song.mp3") - self.upload_file(dbx_client, self.SONG_MP3, path) - link = dbx_client.media(path) - self.dict_has( - link, - "url", - "expires", - ) - - @dbx_v1_client_from_env_with_test_dir - def test_share(self, dbx_client, test_dir): - """Tests file streaming using the /media endpoint""" - path = posixpath.join(test_dir, "stream_song.mp3") - self.upload_file(dbx_client, self.SONG_MP3, path) - link = dbx_client.share(path) - self.dict_has( - link, - "url", - "expires", - ) - - @dbx_v1_client_from_env_with_test_dir - def test_search(self, dbx_client, test_dir): - """Tests searching for a file in a folder""" - path = posixpath.join(test_dir, "search/") - - j = posixpath.join - self.upload_file(dbx_client, self.FOO_TXT, j(path, "text.txt")) - self.upload_file(dbx_client, self.FOO_TXT, j(path, u"t\xe9xt.txt")) - self.upload_file(dbx_client, self.FOO_TXT, j(path, "subFolder/text.txt")) - self.upload_file(dbx_client, self.FOO_TXT, j(path, "subFolder/cow.txt")) - self.upload_file(dbx_client, self.FROG_JPG, j(path, "frog.jpg")) - self.upload_file(dbx_client, self.FROG_JPG, j(path, "frog2.jpg")) - self.upload_file(dbx_client, self.FROG_JPG, j(path, "subFolder/frog2.jpg")) - - results = dbx_client.search(path, "sasdfasdf") - self.assertEqual(results, []) - - results = dbx_client.search(path, "jpg") - self.assertEqual(len(results), 3) - for metadata in results: - self.assert_file(metadata, self.FROG_JPG) - - results = dbx_client.search(j(path, "subFolder"), "jpg") - self.assertEqual(len(results), 1) - self.assert_file(results[0], self.FROG_JPG) - - all_tex_files = {j(path, n) for n in ["text.txt", u"t\xe9xt.txt", "subFolder/text.txt"]} - - results = dbx_client.search(path, "tex") - self.assertEqual({r["path"] for r in results}, all_tex_files) - - results = dbx_client.search(path, u"t\xe9x") - self.assertEqual({r["path"] for r in results}, all_tex_files) - - @dbx_v1_client_from_env_with_test_dir - def test_revisions_restore(self, dbx_client, test_dir): - """Tests getting the old revisions of a file""" - path = posixpath.join(test_dir, "foo_revs.txt") - self.upload_file(dbx_client, self.FOO_TXT, path) - self.upload_file(dbx_client, self.FROG_JPG, path, overwrite=True) - self.upload_file(dbx_client, self.SONG_MP3, path, overwrite=True) - revs = dbx_client.revisions(path) - metadata = dbx_client.metadata(path) - self.assert_file(metadata, self.SONG_MP3, path=path, mime_type="text/plain") - - self.assertEqual(len(revs), 3) - self.assert_file(revs[0], self.SONG_MP3, path=path, mime_type="text/plain") - self.assert_file(revs[1], self.FROG_JPG, path=path, mime_type="text/plain") - self.assert_file(revs[2], self.FOO_TXT, path=path, mime_type="text/plain") - - metadata = dbx_client.restore(path, revs[2]["rev"]) - self.assert_file(metadata, self.FOO_TXT, path=path, mime_type="text/plain") - metadata = dbx_client.metadata(path) - self.assert_file(metadata, self.FOO_TXT, path=path, mime_type="text/plain") - - @dbx_v1_client_from_env_with_test_dir - def test_copy_ref(self, dbx_client, test_dir): - """Tests using the /copy_ref endpoint to move data within a single dropbox""" - path = posixpath.join(test_dir, "foo_copy_ref.txt") - path2 = posixpath.join(test_dir, "foo_copy_ref_target.txt") - - self.upload_file(dbx_client, self.FOO_TXT, path) - copy_ref = dbx_client.create_copy_ref(path) - self.dict_has( - copy_ref, - "expires", - "copy_ref" - ) - - dbx_client.add_copy_ref(copy_ref["copy_ref"], path2) - metadata = dbx_client.metadata(path2) - self.assert_file(metadata, self.FOO_TXT, path=path2) - copied_foo = dbx_client.get_file(path2).read() - local_foo = open(self.FOO_TXT, "rb").read() - self.assertEqual(len(copied_foo), len(local_foo)) - self.assertEqual(copied_foo, local_foo) - - @dbx_v1_client_from_env_with_test_dir - def test_chunked_upload2(self, dbx_client, test_dir): - target_path = posixpath.join(test_dir, 'chunked_upload_file.txt') - chunk_size = 4 * 1024 - _, random_data1 = make_random_data(chunk_size) - _, random_data2 = make_random_data(chunk_size) - - new_offset, upload_id = dbx_client.upload_chunk(BytesIO(random_data1), 0) - self.assertEqual(new_offset, chunk_size) - self.assertIsNotNone(upload_id) - - new_offset, upload_id2 = dbx_client.upload_chunk( - BytesIO(random_data2), - 0, - new_offset, - upload_id, - ) - self.assertEqual(new_offset, chunk_size * 2) - self.assertEqual(upload_id2, upload_id) - - metadata = dbx_client.commit_chunked_upload( - '/auto' + target_path, - upload_id, - overwrite=True, - ) - self.dict_has(metadata, bytes=chunk_size * 2, path=target_path) - - downloaded = dbx_client.get_file(target_path).read() - self.assertEqual(chunk_size * 2, len(downloaded)) - self.assertEqual(random_data1, downloaded[:chunk_size]) - self.assertEqual(random_data2, downloaded[chunk_size:]) - - @dbx_v1_client_from_env_with_test_dir - def test_chunked_uploader(self, dbx_client, test_dir): - path = posixpath.join(test_dir, "chunked_uploader_file.txt") - size = 10 * 1024 * 1024 - chunk_size = 4 * 1024 * 1102 - _, random_data = make_random_data(size) - uploader = dbx_client.get_chunked_uploader(BytesIO(random_data), len(random_data)) - error_count = 0 - while uploader.offset < size and error_count < 5: - try: - uploader.upload_chunked(chunk_size=chunk_size) - except ErrorResponse: - error_count += 1 - uploader.finish(path) - downloaded = dbx_client.get_file(path).read() - self.assertEqual(size, len(downloaded)) - self.assertEqual(random_data, downloaded) - - @dbx_v1_client_from_env_with_test_dir - def test_delta(self, dbx_client, test_dir): - prefix = posixpath.join(test_dir, "delta") - - a = posixpath.join(prefix, "a.txt") - self.upload_file(dbx_client, self.FOO_TXT, a) - b = posixpath.join(prefix, "b.txt") - self.upload_file(dbx_client, self.FOO_TXT, b) - c = posixpath.join(prefix, "c") - c_1 = posixpath.join(prefix, "c/1.txt") - self.upload_file(dbx_client, self.FOO_TXT, c_1) - c_2 = posixpath.join(prefix, "c/2.txt") - self.upload_file(dbx_client, self.FOO_TXT, c_2) - - prefix_lc = prefix.lower() - c_lc = c.lower() - - # /delta on everything - expected = {p.lower() for p in (prefix, a, b, c, c_1, c_2)} - entries = set() - cursor = None - while True: - r = dbx_client.delta(cursor) - if r['reset']: - entries = set() - for path_lc, md in r['entries']: - if path_lc.startswith(prefix_lc + '/') or path_lc == prefix_lc: - assert md is not None, "we should never get deletes under 'prefix'" - entries.add(path_lc) - if not r['has_more']: - break - cursor = r['cursor'] - - self.assertEqual(expected, entries) - - # /delta where path_prefix=c - expected = {p.lower() for p in (c, c_1, c_2)} - entries = set() - cursor = None - while True: - r = dbx_client.delta(cursor, path_prefix=c) - if r['reset']: - entries = set() - for path_lc, md in r['entries']: - assert path_lc.startswith(c_lc + '/') or path_lc == c_lc - assert md is not None, "we should never get deletes" - entries.add(path_lc) - if not r['has_more']: - break - cursor = r['cursor'] - - self.assertEqual(expected, entries) - - @dbx_v1_client_from_env_with_test_dir - def test_longpoll_delta(self, dbx_client, test_dir): - cursor = dbx_client.delta()['cursor'] - - def assert_longpoll(): - r = dbx_client.longpoll_delta(cursor) - assert (r['changes']) - - t = threading.Thread(target=assert_longpoll) - t.start() - - self.upload_file(dbx_client, self.FOO_TXT, posixpath.join(test_dir, "foo.txt")) - t.join() - - -def make_random_data(size): - random_data = os.urandom(size) - if PY3: - random_string = random_data.decode('latin1') - else: - random_string = random_data - return random_string, random_data - - if __name__ == '__main__': unittest.main()