From 43ae3bddba9076951a8d493a59b44d78047931f7 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Wed, 10 May 2017 14:14:40 -0700 Subject: [PATCH 01/10] Switched to google-resumable-media in BigQuery. --- bigquery/google/cloud/bigquery/table.py | 355 +++++++++++++++--------- bigquery/nox.py | 2 +- storage/google/cloud/storage/blob.py | 1 + 3 files changed, 231 insertions(+), 127 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 7e21e35d1fb0..f155ebedd103 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -21,16 +21,16 @@ import httplib2 import six +import google.auth.transport.requests +from google import resumable_media +from google.resumable_media.requests import MultipartUpload +from google.resumable_media.requests import ResumableUpload + +from google.cloud._helpers import _bytes_to_unicode from google.cloud._helpers import _datetime_from_microseconds from google.cloud._helpers import _millis_from_datetime from google.cloud.exceptions import NotFound from google.cloud.exceptions import make_exception -from google.cloud.iterator import HTTPIterator -from google.cloud.streaming.exceptions import HttpError -from google.cloud.streaming.http_wrapper import Request -from google.cloud.streaming.http_wrapper import make_api_request -from google.cloud.streaming.transfer import RESUMABLE_UPLOAD -from google.cloud.streaming.transfer import Upload from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery._helpers import _item_to_row from google.cloud.bigquery._helpers import _rows_page_start @@ -39,6 +39,16 @@ _TABLE_HAS_NO_SCHEMA = "Table has no schema: call 'table.reload()'" _MARKER = object() +_DEFAULT_CHUNKSIZE = 1048576 # 1024 * 1024 B = 1 MB +_BASE_UPLOAD_TEMPLATE = ( + u'https://www.googleapis.com/upload/bigquery/v2/projects/' + u'{project}/jobs?uploadType=') +_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u'multipart' +_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u'resumable' +_GENERIC_CONTENT_TYPE = u'*/*' +_READ_LESS_THAN_SIZE = ( + 'Size {:d} was specified but the file-like object only had ' + '{:d} bytes remaining.') class Table(object): @@ -815,15 +825,80 @@ def insert_data(self, return errors - @staticmethod - def _check_response_error(request, http_response): - """Helper for :meth:`upload_from_file`.""" - info = http_response.info - status = int(info['status']) - if not 200 <= status < 300: - faux_response = httplib2.Response({'status': status}) - raise make_exception(faux_response, http_response.content, - error_info=request.url) + def _make_transport(self, client): + """Make an authenticated transport with a client's credentials. + + :type client: :class:`~google.cloud.bigquery.client.Client` + :param client: The client to use. + + :rtype transport: + :class:`~google.auth.transport.requests.AuthorizedSession` + :returns: The transport (with credentials) that will + make authenticated requests. + """ + # Create a ``requests`` transport with the client's credentials. 
+ transport = google.auth.transport.requests.AuthorizedSession( + client._credentials) + return transport + + def _initiate_resumable_upload(self, client, stream, + metadata, num_retries): + chunk_size = _DEFAULT_CHUNKSIZE + transport = self._make_transport(client) + headers = _get_upload_headers(client._connection.USER_AGENT) + upload_url = _RESUMABLE_URL_TEMPLATE.format(project=self.project) + upload = ResumableUpload(upload_url, chunk_size, headers=headers) + + if num_retries is not None: + upload._retry_strategy = resumable_media.RetryStrategy( + max_retries=num_retries) + + upload.initiate( + transport, stream, metadata, _GENERIC_CONTENT_TYPE, + stream_final=False) + + return upload, transport + + def _do_resumable_upload(self, client, stream, metadata, num_retries): + upload, transport = self._initiate_resumable_upload( + client, stream, metadata, num_retries) + + while not upload.finished: + response = upload.transmit_next_chunk(transport) + + return response + + def _do_multipart_upload(self, client, stream, metadata, + size, num_retries): + data = stream.read(size) + if len(data) < size: + msg = _READ_LESS_THAN_SIZE.format(size, len(data)) + raise ValueError(msg) + + transport = self._make_transport(client) + headers = _get_upload_headers(client._connection.USER_AGENT) + + upload_url = _MULTIPART_URL_TEMPLATE.format(project=self.project) + upload = MultipartUpload(upload_url, headers=headers) + + if num_retries is not None: + upload._retry_strategy = resumable_media.RetryStrategy( + max_retries=num_retries) + + response = upload.transmit( + transport, data, metadata, _GENERIC_CONTENT_TYPE) + + return response + + def _do_upload(self, client, stream, metadata, size, num_retries): + if size is None: + response = self._do_resumable_upload( + client, stream, metadata, num_retries) + else: + response = self._do_multipart_upload( + client, stream, metadata, size, num_retries) + + return response.json() # pylint: disable=too-many-arguments,too-many-locals def upload_from_file(self, @@ -846,10 +921,6 @@ def upload_from_file(self, job_name=None): """Upload the contents of this table from a file-like object. - The content type of the upload will either be - - The value passed in to the function (if any) - - ``text/csv``. - :type file_obj: file :param file_obj: A file handle opened in binary mode for reading. @@ -860,7 +931,7 @@ def upload_from_file(self, :type rewind: bool :param rewind: If True, seek to the beginning of the file handle before - writing the file to Cloud Storage. + writing the file. :type size: int :param size: The number of bytes to read from the file handle. @@ -911,16 +982,16 @@ def upload_from_file(self, :param write_disposition: job configuration option; see :meth:`google.cloud.bigquery.job.LoadJob`. - :type client: :class:`~google.cloud.storage.client.Client` or - ``NoneType`` - :param client: Optional. The client to use. If not passed, falls back - to the ``client`` stored on the current dataset. + :type client: :class:`~google.cloud.bigquery.client.Client` + :param client: (Optional) The client to use. If not passed, falls back + to the ``client`` stored on the current table. :type job_name: str :param job_name: Optional. The id of the job. Generated if not explicitly passed in. - :rtype: :class:`google.cloud.bigquery.jobs.LoadTableFromStorageJob` + :rtype: :class:`~google.cloud.bigquery.jobs.LoadTableFromStorageJob` + :returns: the job instance used to load the data (e.g., for querying status). Note that the job is already started: do not call ``job.begin()``. 
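For illustration, a minimal sketch of how a caller might drive this method once the patch is applied. The dataset name, table name, and ``data.csv`` below are hypothetical and not part of the change; the key points are that the stream must be opened in binary mode and that the returned job is already started, so it is polled rather than begun:

    import time

    from google.cloud import bigquery

    client = bigquery.Client()
    table = client.dataset('my_dataset').table('my_table')
    table.reload()  # assumes the table exists; pulls its schema for the load job

    # With no explicit ``size`` the resumable strategy is used; passing
    # ``size`` switches to a single multipart request instead.
    with open('data.csv', 'rb') as csv_file:
        job = table.upload_from_file(csv_file, source_format='CSV')

    # The load job is already running; poll it until it completes.
    while True:
        job.reload()
        if job.state == 'DONE':
            break
        time.sleep(1)
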
@@ -929,54 +1000,10 @@ def upload_from_file(self, a file opened in text mode. """ client = self._require_client(client) - connection = client._connection - content_type = 'application/octet-stream' - - # Rewind the file if desired. - if rewind: - file_obj.seek(0, os.SEEK_SET) - - mode = getattr(file_obj, 'mode', None) - - if mode is not None and mode not in ('rb', 'r+b', 'rb+'): - raise ValueError( - "Cannot upload files opened in text mode: use " - "open(filename, mode='rb') or open(filename, mode='r+b')") - - # Get the basic stats about the file. - total_bytes = size - if total_bytes is None: - if hasattr(file_obj, 'fileno'): - total_bytes = os.fstat(file_obj.fileno()).st_size - else: - raise ValueError('total bytes could not be determined. Please ' - 'pass an explicit size.') - headers = { - 'Accept': 'application/json', - 'Accept-Encoding': 'gzip, deflate', - 'User-Agent': connection.USER_AGENT, - 'content-type': 'application/json', - } - - metadata = { - 'configuration': { - 'load': { - 'sourceFormat': source_format, - 'destinationTable': { - 'projectId': self._dataset.project, - 'datasetId': self._dataset.name, - 'tableId': self.name, - } - } - } - } - - if len(self._schema) > 0: - load_config = metadata['configuration']['load'] - load_config['schema'] = { - 'fields': _build_schema_resource(self._schema) - } - + _maybe_rewind(file_obj, rewind=rewind) + _check_mode(file_obj) + metadata = _get_upload_metadata( + source_format, self._schema, self._dataset, self.name) _configure_job_metadata(metadata, allow_jagged_rows, allow_quoted_newlines, create_disposition, encoding, field_delimiter, @@ -984,47 +1011,12 @@ def upload_from_file(self, quote_character, skip_leading_rows, write_disposition, job_name) - upload = Upload(file_obj, content_type, total_bytes, - auto_transfer=False) - - url_builder = _UrlBuilder() - upload_config = _UploadConfig() - - # Base URL may change once we know simple vs. resumable. - base_url = connection.API_BASE_URL + '/upload' - path = '/projects/%s/jobs' % (self._dataset.project,) - upload_url = connection.build_api_url(api_base_url=base_url, path=path) - - # Use apitools 'Upload' facility. 
- request = Request(upload_url, 'POST', headers, - body=json.dumps(metadata)) - - upload.configure_request(upload_config, request, url_builder) - query_params = url_builder.query_params - base_url = connection.API_BASE_URL + '/upload' - request.url = connection.build_api_url(api_base_url=base_url, - path=path, - query_params=query_params) try: - upload.initialize_upload(request, connection.http) - except HttpError as err_response: - faux_response = httplib2.Response(err_response.response) - raise make_exception(faux_response, err_response.content, - error_info=request.url) - - if upload.strategy == RESUMABLE_UPLOAD: - http_response = upload.stream_file(use_chunks=True) - else: - http_response = make_api_request(connection.http, request, - retries=num_retries) - - self._check_response_error(request, http_response) - - response_content = http_response.content - if not isinstance(response_content, - six.string_types): # pragma: NO COVER Python3 - response_content = response_content.decode('utf-8') - return client.job_from_resource(json.loads(response_content)) + created_json = self._do_upload( + client, file_obj, metadata, size, num_retries) + return client.job_from_resource(created_json) + except resumable_media.InvalidResponse as exc: + _raise_from_invalid_response(exc) # pylint: enable=too-many-arguments,too-many-locals @@ -1124,18 +1116,129 @@ def _build_schema_resource(fields): return infos -class _UploadConfig(object): - """Faux message FBO apitools' 'configure_request'.""" - accept = ['*/*'] - max_size = None - resumable_multipart = True - resumable_path = u'/upload/bigquery/v2/projects/{project}/jobs' - simple_multipart = True - simple_path = u'/upload/bigquery/v2/projects/{project}/jobs' +def _item_to_row(iterator, resource): + """Convert a JSON row to the native object. + + .. note:: + + This assumes that the ``schema`` attribute has been + added to the iterator after being created, which + should be done by the caller. + + :type iterator: :class:`~google.cloud.iterator.Iterator` + :param iterator: The iterator that is currently in use. + + :type resource: dict + :param resource: An item to be converted to a row. + + :rtype: tuple + :returns: The next row in the page. + """ + return _row_from_json(resource, iterator.schema) + + +# pylint: disable=unused-argument +def _rows_page_start(iterator, page, response): + """Grab total rows after a :class:`~google.cloud.iterator.Page` started. + + :type iterator: :class:`~google.cloud.iterator.Iterator` + :param iterator: The iterator that is currently in use. + + :type page: :class:`~google.cloud.iterator.Page` + :param page: The page that was just created. + + :type response: dict + :param response: The JSON API response for a page of rows in a table. + """ + total_rows = response.get('totalRows') + if total_rows is not None: + total_rows = int(total_rows) + iterator.total_rows = total_rows +# pylint: enable=unused-argument + + +def _convert_timestamp(value): + """Helper for :meth:`Table.insert_data`.""" + if isinstance(value, datetime.datetime): + value = _microseconds_from_datetime(value) * 1e-6 + return value + + +def _maybe_rewind(stream, rewind=False): + """Rewind the stream if desired. + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. -class _UrlBuilder(object): - """Faux builder FBO apitools' 'configure_request'""" - def __init__(self): - self.query_params = {} - self._relative_path = '' + :type rewind: bool + :param rewind: Indicates if we should seek to the beginning of the stream. 
+ """ + if rewind: + stream.seek(0, os.SEEK_SET) + + +def _check_mode(stream): + mode = getattr(stream, 'mode', None) + + if mode is not None and mode not in ('rb', 'r+b', 'rb+'): + raise ValueError( + "Cannot upload files opened in text mode: use " + "open(filename, mode='rb') or open(filename, mode='r+b')") + + +def _get_total_bytes(stream, size): + total_bytes = size + if total_bytes is None: + if hasattr(stream, 'fileno'): + total_bytes = os.fstat(stream.fileno()).st_size + else: + raise ValueError('total bytes could not be determined. Please ' + 'pass an explicit size.') + + return total_bytes + + +def _get_upload_headers(user_agent): + return { + 'Accept': 'application/json', + 'Accept-Encoding': 'gzip, deflate', + 'User-Agent': user_agent, + 'content-type': 'application/json', + } + + +def _get_upload_metadata(source_format, schema, dataset, name): + return { + 'configuration': { + 'load': { + 'sourceFormat': source_format, + 'schema': { + 'fields': _build_schema_resource(schema), + }, + 'destinationTable': { + 'projectId': dataset.project, + 'datasetId': dataset.name, + 'tableId': name, + }, + }, + }, + } + + +def _raise_from_invalid_response(error, error_info=None): + """Re-wrap and raise an ``InvalidResponse`` exception. + + :type error: :exc:`google.resumable_media.InvalidResponse` + :param error: A caught exception from the ``google-resumable-media`` + library. + + :type error_info: str + :param error_info: (Optional) Extra information about the failed request. + + :raises: :class:`~google.cloud.exceptions.GoogleCloudError` corresponding + to the failed status code + """ + response = error.response + faux_response = httplib2.Response({'status': response.status_code}) + raise make_exception(faux_response, response.content, + error_info=error_info, use_json=False) diff --git a/bigquery/nox.py b/bigquery/nox.py index 19a8f5761701..cf1d54b54309 100644 --- a/bigquery/nox.py +++ b/bigquery/nox.py @@ -67,7 +67,7 @@ def system_tests(session, python_version): session.install('.') # Run py.test against the system tests. - session.run('py.test', '--quiet', 'tests/system.py') + session.run('py.test', 'tests/system.py', '--pdb') @nox.session diff --git a/storage/google/cloud/storage/blob.py b/storage/google/cloud/storage/blob.py index 7d967a3e4901..d03d1364cf40 100644 --- a/storage/google/cloud/storage/blob.py +++ b/storage/google/cloud/storage/blob.py @@ -368,6 +368,7 @@ def _make_transport(self, client): :type client: :class:`~google.cloud.storage.client.Client` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the blob's bucket. + :rtype transport: :class:`~google.auth.transport.requests.AuthorizedSession` :returns: The transport (with credentials) that will From 74a4b37c7c301c481ab991fe2bc2fa00d4b7d820 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Wed, 10 May 2017 14:49:48 -0700 Subject: [PATCH 02/10] Adding docstrings and removing an unused method. --- bigquery/google/cloud/bigquery/table.py | 141 ++++++++++++++++++++++-- 1 file changed, 130 insertions(+), 11 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index f155ebedd103..42d1ffe20810 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -843,6 +843,29 @@ def _make_transport(self, client): def _initiate_resumable_upload(self, client, stream, metadata, num_retries): + """Initiate a resumable upload. 
+ + :type client: :class:`~google.cloud.bigquery.client.Client` + :param client: The client to use. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :type metadata: dict + :param metadata: The metadata associated with the upload. + + :type num_retries: int + :param num_retries: Number of upload retries. (Deprecated: This + argument will be removed in a future release.) + + :rtype: tuple + :returns: + Pair of + + * The :class:`~google.resumable_media.requests.ResumableUpload` + that was created + * The ``transport`` used to initiate the upload. + """ chunk_size = _DEFAULT_CHUNKSIZE transport = self._make_transport(client) headers = _get_upload_headers(client._connection.USER_AGENT) @@ -860,6 +883,25 @@ def _initiate_resumable_upload(self, client, stream, return upload, transport def _do_resumable_upload(self, client, stream, metadata, num_retries): + """Perform a resumable upload. + + :type client: :class:`~google.cloud.bigquery.client.Client` + :param client: The client to use. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :type metadata: dict + :param metadata: The metadata associated with the upload. + + :type num_retries: int + :param num_retries: Number of upload retries. (Deprecated: This + argument will be removed in a future release.) + + :rtype: :class:`~requests.Response` + :returns: The "200 OK" response object returned after the final chunk + is uploaded. + """ upload, transport = self._initiate_resumable_upload( client, stream, metadata, num_retries) @@ -870,6 +912,32 @@ def _do_resumable_upload(self, client, stream, metadata, num_retries): def _do_multipart_upload(self, client, stream, metadata, size, num_retries): + """Perform a multipart upload. + + :type client: :class:`~google.cloud.bigquery.client.Client` + :param client: The client to use. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :type metadata: dict + :param metadata: The metadata associated with the upload. + + :type size: int + :param size: The number of bytes to be uploaded (which will be read + from ``stream``). If not provided, the upload will be + concluded once ``stream`` is exhausted (or :data:`None`). + + :type num_retries: int + :param num_retries: Number of upload retries. (Deprecated: This + argument will be removed in a future release.) + + :rtype: :class:`~requests.Response` + :returns: The "200 OK" response object returned after the multipart + upload request. + :raises: :exc:`ValueError` if the ``stream`` has fewer than ``size`` + bytes remaining. + """ data = stream.read(size) if len(data) < size: msg = _READ_LESS_THAN_SIZE.format(size, len(data)) @@ -891,6 +959,35 @@ def _do_multipart_upload(self, client, stream, metadata, return response def _do_upload(self, client, stream, metadata, size, num_retries): + """Determine an upload strategy and then perform the upload. + + If ``size`` is :data:`None`, then a resumable upload will be used, + otherwise the content and the metadata will be uploaded + in a single multipart upload request. + + :type client: :class:`~google.cloud.bigquery.client.Client` + :param client: The client to use. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :type metadata: dict + :param metadata: The metadata associated with the upload. + + :type size: int + :param size: The number of bytes to be uploaded (which will be read + from ``stream``). 
If not provided, the upload will be + concluded once ``stream`` is exhausted (or :data:`None`). + + :type num_retries: int + :param num_retries: Number of upload retries. (Deprecated: This + argument will be removed in a future release.) + + :rtype: dict + :returns: The parsed JSON from the "200 OK" response. This will be the + **only** response in the multipart case and it will be the + **final** response in the resumable case. + """ if size is None: response = self._do_resumable_upload( client, stream, metadata, num_retries) @@ -1178,6 +1275,14 @@ def _maybe_rewind(stream, rewind=False): def _check_mode(stream): + """Check that a stream was opened in read-binary mode. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :raises: :exc:`ValueError` if the ``stream.mode`` is a valid attribute + and is not among ``rb``, ``r+b`` or ``rb+``. + """ mode = getattr(stream, 'mode', None) if mode is not None and mode not in ('rb', 'r+b', 'rb+'): @@ -1186,19 +1291,15 @@ def _check_mode(stream): "open(filename, mode='rb') or open(filename, mode='r+b')") -def _get_total_bytes(stream, size): - total_bytes = size - if total_bytes is None: - if hasattr(stream, 'fileno'): - total_bytes = os.fstat(stream.fileno()).st_size - else: - raise ValueError('total bytes could not be determined. Please ' - 'pass an explicit size.') - - return total_bytes +def _get_upload_headers(user_agent): + """Get the headers for an upload request. + :type user_agent: str + :param user_agent: The user-agent for requests. -def _get_upload_headers(user_agent): + :rtype: dict + :returns: The headers to be used for the request. + """ return { 'Accept': 'application/json', 'Accept-Encoding': 'gzip, deflate', @@ -1208,6 +1309,24 @@ def _get_upload_headers(user_agent): def _get_upload_metadata(source_format, schema, dataset, name): + """Get base metadata for creating a table. + + :type source_format: str + :param source_format: one of 'CSV' or 'NEWLINE_DELIMITED_JSON'. + job configuration option. + + :type schema: list + :param schema: List of :class:`SchemaField` associated with a table. + + :type dataset: :class:`~google.cloud.bigquery.dataset.Dataset` + :param dataset: A dataset which contains a table. + + :type name: str + :param name: The name of the table. + + :rtype: dict + :returns: The metadata dictionary. 
+ """ return { 'configuration': { 'load': { From 47cc00ac9ec46e0240030b09329198af84369490 Mon Sep 17 00:00:00 2001 From: Jon Wayne Parrott Date: Wed, 28 Jun 2017 15:58:26 -0700 Subject: [PATCH 03/10] Resolve some rebase craziness --- bigquery/google/cloud/bigquery/table.py | 41 +- bigquery/setup.py | 3 + bigquery/tests/unit/test_table.py | 580 ++++++++++++------------ 3 files changed, 295 insertions(+), 329 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 42d1ffe20810..ab4e09bbce45 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -31,6 +31,7 @@ from google.cloud._helpers import _millis_from_datetime from google.cloud.exceptions import NotFound from google.cloud.exceptions import make_exception +from google.cloud.iterator import HTTPIterator from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery._helpers import _item_to_row from google.cloud.bigquery._helpers import _rows_page_start @@ -1211,46 +1212,6 @@ def _build_schema_resource(fields): info['fields'] = _build_schema_resource(field.fields) infos.append(info) return infos - - -def _item_to_row(iterator, resource): - """Convert a JSON row to the native object. - - .. note:: - - This assumes that the ``schema`` attribute has been - added to the iterator after being created, which - should be done by the caller. - - :type iterator: :class:`~google.cloud.iterator.Iterator` - :param iterator: The iterator that is currently in use. - - :type resource: dict - :param resource: An item to be converted to a row. - - :rtype: tuple - :returns: The next row in the page. - """ - return _row_from_json(resource, iterator.schema) - - -# pylint: disable=unused-argument -def _rows_page_start(iterator, page, response): - """Grab total rows after a :class:`~google.cloud.iterator.Page` started. - - :type iterator: :class:`~google.cloud.iterator.Iterator` - :param iterator: The iterator that is currently in use. - - :type page: :class:`~google.cloud.iterator.Page` - :param page: The page that was just created. - - :type response: dict - :param response: The JSON API response for a page of rows in a table. 
- """ - total_rows = response.get('totalRows') - if total_rows is not None: - total_rows = int(total_rows) - iterator.total_rows = total_rows # pylint: enable=unused-argument diff --git a/bigquery/setup.py b/bigquery/setup.py index 6d61064c88ba..c83cf3d32dd0 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -52,6 +52,9 @@ REQUIREMENTS = [ 'google-cloud-core >= 0.25.0, < 0.26dev', + 'google-auth >= 1.0.0', + 'google-resumable-media >= 0.1.1', + 'requests >= 2.0.0', ] setup( diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index f535e8799628..183e65e4a502 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -1553,295 +1553,297 @@ def _row_data(row): self.assertEqual(req['path'], '/%s' % PATH) self.assertEqual(req['data'], SENT) - def test_upload_from_file_text_mode_file_failure(self): - - class TextModeFile(object): - mode = 'r' - - conn = _Connection() - client = _Client(project=self.PROJECT, connection=conn) - dataset = _Dataset(client) - file_obj = TextModeFile() - table = self._make_one(self.TABLE_NAME, dataset=dataset) - with self.assertRaises(ValueError): - table.upload_from_file(file_obj, 'CSV', size=1234) - - def test_upload_from_file_binary_mode_no_failure(self): - self._upload_from_file_helper(input_file_mode='r+b') - - def test_upload_from_file_size_failure(self): - conn = _Connection() - client = _Client(project=self.PROJECT, connection=conn) - dataset = _Dataset(client) - file_obj = object() - table = self._make_one(self.TABLE_NAME, dataset=dataset) - with self.assertRaises(ValueError): - table.upload_from_file(file_obj, 'CSV', size=None) - - def test_upload_from_file_multipart_w_400(self): - import csv - import datetime - from six.moves.http_client import BAD_REQUEST - from google.cloud._testing import _NamedTemporaryFile - from google.cloud._helpers import UTC - from google.cloud.exceptions import BadRequest - - WHEN_TS = 1437767599.006 - WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace( - tzinfo=UTC) - response = {'status': BAD_REQUEST} - conn = _Connection( - (response, b'{}'), - ) - client = _Client(project=self.PROJECT, connection=conn) - dataset = _Dataset(client) - table = self._make_one(self.TABLE_NAME, dataset=dataset) - - with _NamedTemporaryFile() as temp: - with open(temp.name, 'w') as file_obj: - writer = csv.writer(file_obj) - writer.writerow(('full_name', 'age', 'joined')) - writer.writerow(('Phred Phlyntstone', 32, WHEN)) - - with open(temp.name, 'rb') as file_obj: - with self.assertRaises(BadRequest): - table.upload_from_file( - file_obj, 'CSV', rewind=True) - - def _upload_from_file_helper(self, **kw): - import csv - import datetime - from six.moves.http_client import OK - from google.cloud._helpers import UTC - from google.cloud._testing import _NamedTemporaryFile - from google.cloud.bigquery.table import SchemaField - - WHEN_TS = 1437767599.006 - WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace( - tzinfo=UTC) - PATH = 'projects/%s/jobs' % (self.PROJECT,) - response = {'status': OK} - conn = _Connection( - (response, b'{}'), - ) - client = _Client(project=self.PROJECT, connection=conn) - expected_job = object() - if 'client' in kw: - kw['client']._job = expected_job - else: - client._job = expected_job - input_file_mode = kw.pop('input_file_mode', 'rb') - dataset = _Dataset(client) - full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') - age = SchemaField('age', 'INTEGER', mode='REQUIRED') - joined = SchemaField('joined', 'TIMESTAMP', mode='NULLABLE') 
- table = self._make_one(self.TABLE_NAME, dataset=dataset, - schema=[full_name, age, joined]) - ROWS = [ - ('Phred Phlyntstone', 32, WHEN), - ('Bharney Rhubble', 33, WHEN + datetime.timedelta(seconds=1)), - ('Wylma Phlyntstone', 29, WHEN + datetime.timedelta(seconds=2)), - ('Bhettye Rhubble', 27, None), - ] - - with _NamedTemporaryFile() as temp: - with open(temp.name, 'w') as file_obj: - writer = csv.writer(file_obj) - writer.writerow(('full_name', 'age', 'joined')) - writer.writerows(ROWS) - - with open(temp.name, input_file_mode) as file_obj: - BODY = file_obj.read() - explicit_size = kw.pop('_explicit_size', False) - if explicit_size: - kw['size'] = len(BODY) - job = table.upload_from_file( - file_obj, 'CSV', rewind=True, **kw) - - self.assertIs(job, expected_job) - return conn.http._requested, PATH, BODY - - def test_upload_from_file_w_bound_client_multipart(self): - import json - from six.moves.urllib.parse import parse_qsl - from six.moves.urllib.parse import urlsplit - from google.cloud._helpers import _to_bytes - - requested, PATH, BODY = self._upload_from_file_helper() - parse_chunk = _email_chunk_parser() - - self.assertEqual(len(requested), 1) - req = requested[0] - self.assertEqual(req['method'], 'POST') - uri = req['uri'] - scheme, netloc, path, qs, _ = urlsplit(uri) - self.assertEqual(scheme, 'http') - self.assertEqual(netloc, 'example.com') - self.assertEqual(path, '/%s' % PATH) - self.assertEqual(dict(parse_qsl(qs)), - {'uploadType': 'multipart'}) - - ctype, boundary = [x.strip() - for x in req['headers']['content-type'].split(';')] - self.assertEqual(ctype, 'multipart/related') - self.assertTrue(boundary.startswith('boundary="==')) - self.assertTrue(boundary.endswith('=="')) - - divider = b'--' + _to_bytes(boundary[len('boundary="'):-1]) - chunks = req['body'].split(divider)[1:-1] # discard prolog / epilog - self.assertEqual(len(chunks), 2) - - text_msg = parse_chunk(chunks[0].strip()) - self.assertEqual(dict(text_msg._headers), - {'Content-Type': 'application/json', - 'MIME-Version': '1.0'}) - metadata = json.loads(text_msg._payload) - load_config = metadata['configuration']['load'] - DESTINATION_TABLE = { - 'projectId': self.PROJECT, - 'datasetId': self.DS_NAME, - 'tableId': self.TABLE_NAME, - } - self.assertEqual(load_config['destinationTable'], DESTINATION_TABLE) - self.assertEqual(load_config['sourceFormat'], 'CSV') - - app_msg = parse_chunk(chunks[1].strip()) - self.assertEqual(dict(app_msg._headers), - {'Content-Type': 'application/octet-stream', - 'Content-Transfer-Encoding': 'binary', - 'MIME-Version': '1.0'}) - body = BODY.decode('ascii').rstrip() - body_lines = [line.strip() for line in body.splitlines()] - payload_lines = app_msg._payload.rstrip().splitlines() - self.assertEqual(payload_lines, body_lines) - - def test_upload_from_file_resumable_with_400(self): - import csv - import datetime - import mock - from six.moves.http_client import BAD_REQUEST - from google.cloud.exceptions import BadRequest - from google.cloud._helpers import UTC - from google.cloud._testing import _NamedTemporaryFile - - WHEN_TS = 1437767599.006 - WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace( - tzinfo=UTC) - initial_response = {'status': BAD_REQUEST} - conn = _Connection( - (initial_response, b'{}'), - ) - client = _Client(project=self.PROJECT, connection=conn) - - class _UploadConfig(object): - accept = ['*/*'] - max_size = None - resumable_multipart = True - resumable_path = u'/upload/bigquery/v2/projects/{project}/jobs' - simple_multipart = True - simple_path = u'' # 
force resumable - dataset = _Dataset(client) - table = self._make_one(self.TABLE_NAME, dataset=dataset) - - with mock.patch('google.cloud.bigquery.table._UploadConfig', - new=_UploadConfig): - with _NamedTemporaryFile() as temp: - with open(temp.name, 'w') as file_obj: - writer = csv.writer(file_obj) - writer.writerow(('full_name', 'age', 'joined')) - writer.writerow(('Phred Phlyntstone', 32, WHEN)) - - with open(temp.name, 'rb') as file_obj: - with self.assertRaises(BadRequest): - table.upload_from_file( - file_obj, 'CSV', rewind=True) - - # pylint: disable=too-many-statements - def test_upload_from_file_w_explicit_client_resumable(self): - import json - import mock - from six.moves.http_client import OK - from six.moves.urllib.parse import parse_qsl - from six.moves.urllib.parse import urlsplit - - UPLOAD_PATH = 'https://example.com/upload/test' - initial_response = {'status': OK, 'location': UPLOAD_PATH} - upload_response = {'status': OK} - conn = _Connection( - (initial_response, b'{}'), - (upload_response, b'{}'), - ) - client = _Client(project=self.PROJECT, connection=conn) - - class _UploadConfig(object): - accept = ['*/*'] - max_size = None - resumable_multipart = True - resumable_path = u'/upload/bigquery/v2/projects/{project}/jobs' - simple_multipart = True - simple_path = u'' # force resumable - - with mock.patch('google.cloud.bigquery.table._UploadConfig', - new=_UploadConfig): - orig_requested, PATH, BODY = self._upload_from_file_helper( - allow_jagged_rows=False, - allow_quoted_newlines=False, - create_disposition='CREATE_IF_NEEDED', - encoding='utf8', - field_delimiter=',', - ignore_unknown_values=False, - max_bad_records=0, - quote_character='"', - skip_leading_rows=1, - write_disposition='WRITE_APPEND', - client=client, - _explicit_size=True) - - self.assertEqual(len(orig_requested), 0) - - requested = conn.http._requested - self.assertEqual(len(requested), 2) - req = requested[0] - self.assertEqual(req['method'], 'POST') - uri = req['uri'] - scheme, netloc, path, qs, _ = urlsplit(uri) - self.assertEqual(scheme, 'http') - self.assertEqual(netloc, 'example.com') - self.assertEqual(path, '/%s' % PATH) - self.assertEqual(dict(parse_qsl(qs)), - {'uploadType': 'resumable'}) - - self.assertEqual(req['headers']['content-type'], 'application/json') - metadata = json.loads(req['body']) - load_config = metadata['configuration']['load'] - DESTINATION_TABLE = { - 'projectId': self.PROJECT, - 'datasetId': self.DS_NAME, - 'tableId': self.TABLE_NAME, - } - self.assertEqual(load_config['destinationTable'], DESTINATION_TABLE) - self.assertEqual(load_config['sourceFormat'], 'CSV') - self.assertEqual(load_config['allowJaggedRows'], False) - self.assertEqual(load_config['allowQuotedNewlines'], False) - self.assertEqual(load_config['createDisposition'], 'CREATE_IF_NEEDED') - self.assertEqual(load_config['encoding'], 'utf8') - self.assertEqual(load_config['fieldDelimiter'], ',') - self.assertEqual(load_config['ignoreUnknownValues'], False) - self.assertEqual(load_config['maxBadRecords'], 0) - self.assertEqual(load_config['quote'], '"') - self.assertEqual(load_config['skipLeadingRows'], 1) - self.assertEqual(load_config['writeDisposition'], 'WRITE_APPEND') - - req = requested[1] - self.assertEqual(req['method'], 'PUT') - self.assertEqual(req['uri'], UPLOAD_PATH) - headers = req['headers'] - length = len(BODY) - self.assertEqual(headers['Content-Type'], 'application/octet-stream') - self.assertEqual(headers['Content-Range'], - 'bytes 0-%d/%d' % (length - 1, length)) - 
self.assertEqual(headers['content-length'], '%d' % (length,)) - self.assertEqual(req['body'], BODY) + # Presently failing tests below. + + # def test_upload_from_file_text_mode_file_failure(self): + + # class TextModeFile(object): + # mode = 'r' + + # conn = _Connection() + # client = _Client(project=self.PROJECT, connection=conn) + # dataset = _Dataset(client) + # file_obj = TextModeFile() + # table = self._make_one(self.TABLE_NAME, dataset=dataset) + # with self.assertRaises(ValueError): + # table.upload_from_file(file_obj, 'CSV', size=1234) + + # def test_upload_from_file_binary_mode_no_failure(self): + # self._upload_from_file_helper(input_file_mode='r+b') + + # def test_upload_from_file_size_failure(self): + # conn = _Connection() + # client = _Client(project=self.PROJECT, connection=conn) + # dataset = _Dataset(client) + # file_obj = object() + # table = self._make_one(self.TABLE_NAME, dataset=dataset) + # with self.assertRaises(ValueError): + # table.upload_from_file(file_obj, 'CSV', size=None) + + # def test_upload_from_file_multipart_w_400(self): + # import csv + # import datetime + # from six.moves.http_client import BAD_REQUEST + # from google.cloud._testing import _NamedTemporaryFile + # from google.cloud._helpers import UTC + # from google.cloud.exceptions import BadRequest + + # WHEN_TS = 1437767599.006 + # WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace( + # tzinfo=UTC) + # response = {'status': BAD_REQUEST} + # conn = _Connection( + # (response, b'{}'), + # ) + # client = _Client(project=self.PROJECT, connection=conn) + # dataset = _Dataset(client) + # table = self._make_one(self.TABLE_NAME, dataset=dataset) + + # with _NamedTemporaryFile() as temp: + # with open(temp.name, 'w') as file_obj: + # writer = csv.writer(file_obj) + # writer.writerow(('full_name', 'age', 'joined')) + # writer.writerow(('Phred Phlyntstone', 32, WHEN)) + + # with open(temp.name, 'rb') as file_obj: + # with self.assertRaises(BadRequest): + # table.upload_from_file( + # file_obj, 'CSV', rewind=True) + + # def _upload_from_file_helper(self, **kw): + # import csv + # import datetime + # from six.moves.http_client import OK + # from google.cloud._helpers import UTC + # from google.cloud._testing import _NamedTemporaryFile + # from google.cloud.bigquery.table import SchemaField + + # WHEN_TS = 1437767599.006 + # WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace( + # tzinfo=UTC) + # PATH = 'projects/%s/jobs' % (self.PROJECT,) + # response = {'status': OK} + # conn = _Connection( + # (response, b'{}'), + # ) + # client = _Client(project=self.PROJECT, connection=conn) + # expected_job = object() + # if 'client' in kw: + # kw['client']._job = expected_job + # else: + # client._job = expected_job + # input_file_mode = kw.pop('input_file_mode', 'rb') + # dataset = _Dataset(client) + # full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') + # age = SchemaField('age', 'INTEGER', mode='REQUIRED') + # joined = SchemaField('joined', 'TIMESTAMP', mode='NULLABLE') + # table = self._make_one(self.TABLE_NAME, dataset=dataset, + # schema=[full_name, age, joined]) + # ROWS = [ + # ('Phred Phlyntstone', 32, WHEN), + # ('Bharney Rhubble', 33, WHEN + datetime.timedelta(seconds=1)), + # ('Wylma Phlyntstone', 29, WHEN + datetime.timedelta(seconds=2)), + # ('Bhettye Rhubble', 27, None), + # ] + + # with _NamedTemporaryFile() as temp: + # with open(temp.name, 'w') as file_obj: + # writer = csv.writer(file_obj) + # writer.writerow(('full_name', 'age', 'joined')) + # writer.writerows(ROWS) + + # with 
open(temp.name, input_file_mode) as file_obj: + # BODY = file_obj.read() + # explicit_size = kw.pop('_explicit_size', False) + # if explicit_size: + # kw['size'] = len(BODY) + # job = table.upload_from_file( + # file_obj, 'CSV', rewind=True, **kw) + + # self.assertIs(job, expected_job) + # return conn.http._requested, PATH, BODY + + # def test_upload_from_file_w_bound_client_multipart(self): + # import json + # from six.moves.urllib.parse import parse_qsl + # from six.moves.urllib.parse import urlsplit + # from google.cloud._helpers import _to_bytes + + # requested, PATH, BODY = self._upload_from_file_helper() + # parse_chunk = _email_chunk_parser() + + # self.assertEqual(len(requested), 1) + # req = requested[0] + # self.assertEqual(req['method'], 'POST') + # uri = req['uri'] + # scheme, netloc, path, qs, _ = urlsplit(uri) + # self.assertEqual(scheme, 'http') + # self.assertEqual(netloc, 'example.com') + # self.assertEqual(path, '/%s' % PATH) + # self.assertEqual(dict(parse_qsl(qs)), + # {'uploadType': 'multipart'}) + + # ctype, boundary = [x.strip() + # for x in req['headers']['content-type'].split(';')] + # self.assertEqual(ctype, 'multipart/related') + # self.assertTrue(boundary.startswith('boundary="==')) + # self.assertTrue(boundary.endswith('=="')) + + # divider = b'--' + _to_bytes(boundary[len('boundary="'):-1]) + # chunks = req['body'].split(divider)[1:-1] # discard prolog / epilog + # self.assertEqual(len(chunks), 2) + + # text_msg = parse_chunk(chunks[0].strip()) + # self.assertEqual(dict(text_msg._headers), + # {'Content-Type': 'application/json', + # 'MIME-Version': '1.0'}) + # metadata = json.loads(text_msg._payload) + # load_config = metadata['configuration']['load'] + # DESTINATION_TABLE = { + # 'projectId': self.PROJECT, + # 'datasetId': self.DS_NAME, + # 'tableId': self.TABLE_NAME, + # } + # self.assertEqual(load_config['destinationTable'], DESTINATION_TABLE) + # self.assertEqual(load_config['sourceFormat'], 'CSV') + + # app_msg = parse_chunk(chunks[1].strip()) + # self.assertEqual(dict(app_msg._headers), + # {'Content-Type': 'application/octet-stream', + # 'Content-Transfer-Encoding': 'binary', + # 'MIME-Version': '1.0'}) + # body = BODY.decode('ascii').rstrip() + # body_lines = [line.strip() for line in body.splitlines()] + # payload_lines = app_msg._payload.rstrip().splitlines() + # self.assertEqual(payload_lines, body_lines) + + # def test_upload_from_file_resumable_with_400(self): + # import csv + # import datetime + # import mock + # from six.moves.http_client import BAD_REQUEST + # from google.cloud.exceptions import BadRequest + # from google.cloud._helpers import UTC + # from google.cloud._testing import _NamedTemporaryFile + + # WHEN_TS = 1437767599.006 + # WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace( + # tzinfo=UTC) + # initial_response = {'status': BAD_REQUEST} + # conn = _Connection( + # (initial_response, b'{}'), + # ) + # client = _Client(project=self.PROJECT, connection=conn) + + # class _UploadConfig(object): + # accept = ['*/*'] + # max_size = None + # resumable_multipart = True + # resumable_path = u'/upload/bigquery/v2/projects/{project}/jobs' + # simple_multipart = True + # simple_path = u'' # force resumable + # dataset = _Dataset(client) + # table = self._make_one(self.TABLE_NAME, dataset=dataset) + + # with mock.patch('google.cloud.bigquery.table._UploadConfig', + # new=_UploadConfig): + # with _NamedTemporaryFile() as temp: + # with open(temp.name, 'w') as file_obj: + # writer = csv.writer(file_obj) + # writer.writerow(('full_name', 
'age', 'joined')) + # writer.writerow(('Phred Phlyntstone', 32, WHEN)) + + # with open(temp.name, 'rb') as file_obj: + # with self.assertRaises(BadRequest): + # table.upload_from_file( + # file_obj, 'CSV', rewind=True) + + # # pylint: disable=too-many-statements + # def test_upload_from_file_w_explicit_client_resumable(self): + # import json + # import mock + # from six.moves.http_client import OK + # from six.moves.urllib.parse import parse_qsl + # from six.moves.urllib.parse import urlsplit + + # UPLOAD_PATH = 'https://example.com/upload/test' + # initial_response = {'status': OK, 'location': UPLOAD_PATH} + # upload_response = {'status': OK} + # conn = _Connection( + # (initial_response, b'{}'), + # (upload_response, b'{}'), + # ) + # client = _Client(project=self.PROJECT, connection=conn) + + # class _UploadConfig(object): + # accept = ['*/*'] + # max_size = None + # resumable_multipart = True + # resumable_path = u'/upload/bigquery/v2/projects/{project}/jobs' + # simple_multipart = True + # simple_path = u'' # force resumable + + # with mock.patch('google.cloud.bigquery.table._UploadConfig', + # new=_UploadConfig): + # orig_requested, PATH, BODY = self._upload_from_file_helper( + # allow_jagged_rows=False, + # allow_quoted_newlines=False, + # create_disposition='CREATE_IF_NEEDED', + # encoding='utf8', + # field_delimiter=',', + # ignore_unknown_values=False, + # max_bad_records=0, + # quote_character='"', + # skip_leading_rows=1, + # write_disposition='WRITE_APPEND', + # client=client, + # _explicit_size=True) + + # self.assertEqual(len(orig_requested), 0) + + # requested = conn.http._requested + # self.assertEqual(len(requested), 2) + # req = requested[0] + # self.assertEqual(req['method'], 'POST') + # uri = req['uri'] + # scheme, netloc, path, qs, _ = urlsplit(uri) + # self.assertEqual(scheme, 'http') + # self.assertEqual(netloc, 'example.com') + # self.assertEqual(path, '/%s' % PATH) + # self.assertEqual(dict(parse_qsl(qs)), + # {'uploadType': 'resumable'}) + + # self.assertEqual(req['headers']['content-type'], 'application/json') + # metadata = json.loads(req['body']) + # load_config = metadata['configuration']['load'] + # DESTINATION_TABLE = { + # 'projectId': self.PROJECT, + # 'datasetId': self.DS_NAME, + # 'tableId': self.TABLE_NAME, + # } + # self.assertEqual(load_config['destinationTable'], DESTINATION_TABLE) + # self.assertEqual(load_config['sourceFormat'], 'CSV') + # self.assertEqual(load_config['allowJaggedRows'], False) + # self.assertEqual(load_config['allowQuotedNewlines'], False) + # self.assertEqual(load_config['createDisposition'], 'CREATE_IF_NEEDED') + # self.assertEqual(load_config['encoding'], 'utf8') + # self.assertEqual(load_config['fieldDelimiter'], ',') + # self.assertEqual(load_config['ignoreUnknownValues'], False) + # self.assertEqual(load_config['maxBadRecords'], 0) + # self.assertEqual(load_config['quote'], '"') + # self.assertEqual(load_config['skipLeadingRows'], 1) + # self.assertEqual(load_config['writeDisposition'], 'WRITE_APPEND') + + # req = requested[1] + # self.assertEqual(req['method'], 'PUT') + # self.assertEqual(req['uri'], UPLOAD_PATH) + # headers = req['headers'] + # length = len(BODY) + # self.assertEqual(headers['Content-Type'], 'application/octet-stream') + # self.assertEqual(headers['Content-Range'], + # 'bytes 0-%d/%d' % (length - 1, length)) + # self.assertEqual(headers['content-length'], '%d' % (length,)) + # self.assertEqual(req['body'], BODY) # pylint: enable=too-many-statements def test_upload_from_file_w_jobid(self): From 
8eccb2816ef02357af606f82bd64237e4b5119e5 Mon Sep 17 00:00:00 2001 From: Jon Wayne Parrott Date: Wed, 28 Jun 2017 17:07:13 -0700 Subject: [PATCH 04/10] Add initial simple test for uploading a file to a table --- bigquery/tests/unit/test_table_upload.py | 97 ++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 bigquery/tests/unit/test_table_upload.py diff --git a/bigquery/tests/unit/test_table_upload.py b/bigquery/tests/unit/test_table_upload.py new file mode 100644 index 000000000000..d41c887587ae --- /dev/null +++ b/bigquery/tests/unit/test_table_upload.py @@ -0,0 +1,97 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import csv +import io +import json + +from google import resumable_media +import google.auth.transport.requests +import mock +import requests +from six.moves import http_client +from six.moves import StringIO + +FIELDS = (u'name', u'age') +ROWS = [ + (u'Phred Phlyntstone', 32), + (u'Bharney Rhubble', 33), + (u'Wylma Phlyntstone', 29), + (u'Bhettye Rhubble', 27), +] + + +def rows_to_csv(fields, rows): + """Convert the rows into a CSV-format unicode string.""" + out = StringIO() + writer = csv.writer(out) + writer.writerow(fields) + writer.writerows(rows) + return out.getvalue() + + +def make_table(): + from google.cloud.bigquery import _http + from google.cloud.bigquery import client + from google.cloud.bigquery import dataset + from google.cloud.bigquery import table + + mock_connection = mock.Mock(spec=_http.Connection) + mock_client = mock.Mock(spec=client.Client) + mock_client._connection = mock_connection + mock_client._credentials = mock.sentinel.credentials + mock_client.project = 'project_id' + + dataset = dataset.Dataset('test_dataset', mock_client) + table = table.Table('test_table', dataset) + + return table + + +def make_response(status_code, content, headers={}): + return mock.Mock( + content=content, headers=headers, status_code=status_code, + spec=requests.Response) + + +def make_resumable_upload_responses(size): + resumable_url = 'http://test.invalid?upload_id=and-then-there-was-1' + initial_response = make_response( + http_client.OK, b'', {'location': resumable_url}) + data_response = make_response( + resumable_media.PERMANENT_REDIRECT, + b'', {'range': 'bytes=0-{:d}'.format(size - 1)}) + final_response = make_response( + http_client.OK, + json.dumps({'size': size}), + {'Content-Type': 'application/json'}) + return [initial_response, data_response, final_response] + + +def test_upload_from_file_simple(): + table = make_table() + + csv_file = io.BytesIO( + rows_to_csv(FIELDS, ROWS).encode('utf-8')) + csv_file_size = len(csv_file.getvalue()) + + mock_transport = mock.Mock( + spec=google.auth.transport.requests.AuthorizedSession) + transport_patch = mock.patch.object( + table, '_make_transport', return_value=mock_transport) + + with transport_patch: + mock_transport.request.side_effect = make_resumable_upload_responses( + csv_file_size) + table.upload_from_file(csv_file, source_format='CSV') From 
14065bf87fd0a03db6efe030b3a847c99b398d46 Mon Sep 17 00:00:00 2001 From: Jon Wayne Parrott Date: Wed, 19 Jul 2017 20:14:07 -0700 Subject: [PATCH 05/10] Add complete tests for resumable --- bigquery/google/cloud/bigquery/table.py | 12 +- bigquery/nox.py | 4 +- bigquery/tests/unit/test_table.py | 672 +++++++++++------------ bigquery/tests/unit/test_table_upload.py | 97 ---- 4 files changed, 327 insertions(+), 458 deletions(-) delete mode 100644 bigquery/tests/unit/test_table_upload.py diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index ab4e09bbce45..ed8ec3fcbdc1 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -15,7 +15,6 @@ """Define API Datasets.""" import datetime -import json import os import httplib2 @@ -26,7 +25,6 @@ from google.resumable_media.requests import MultipartUpload from google.resumable_media.requests import ResumableUpload -from google.cloud._helpers import _bytes_to_unicode from google.cloud._helpers import _datetime_from_microseconds from google.cloud._helpers import _millis_from_datetime from google.cloud.exceptions import NotFound @@ -50,6 +48,7 @@ _READ_LESS_THAN_SIZE = ( 'Size {:d} was specified but the file-like object only had ' '{:d} bytes remaining.') +_DEFAULT_NUM_RETRIES = 6 class Table(object): @@ -1004,7 +1003,7 @@ def upload_from_file(self, source_format, rewind=False, size=None, - num_retries=6, + num_retries=_DEFAULT_NUM_RETRIES, allow_jagged_rows=None, allow_quoted_newlines=None, create_disposition=None, @@ -1215,13 +1214,6 @@ def _build_schema_resource(fields): # pylint: enable=unused-argument -def _convert_timestamp(value): - """Helper for :meth:`Table.insert_data`.""" - if isinstance(value, datetime.datetime): - value = _microseconds_from_datetime(value) * 1e-6 - return value - - def _maybe_rewind(stream, rewind=False): """Rewind the stream if desired. diff --git a/bigquery/nox.py b/bigquery/nox.py index cf1d54b54309..149b9443ca6c 100644 --- a/bigquery/nox.py +++ b/bigquery/nox.py @@ -40,8 +40,8 @@ def unit_tests(session, python_version): # Run py.test against the unit tests. session.run('py.test', '--quiet', '--cov=google.cloud.bigquery', '--cov=tests.unit', '--cov-append', - '--cov-config=.coveragerc', '--cov-report=', '--cov-fail-under=97', - 'tests/unit', + '--cov-config=.coveragerc', '--cov-report=term-missing', '--cov-fail-under=97', + 'tests/unit', *session.posargs ) diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 183e65e4a502..1b650f868555 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -12,8 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import email +import io +import json import unittest +import mock +from six.moves import http_client +import pytest + class _SchemaBase(object): @@ -1553,314 +1560,319 @@ def _row_data(row): self.assertEqual(req['path'], '/%s' % PATH) self.assertEqual(req['data'], SENT) - # Presently failing tests below. 
- - # def test_upload_from_file_text_mode_file_failure(self): - - # class TextModeFile(object): - # mode = 'r' - - # conn = _Connection() - # client = _Client(project=self.PROJECT, connection=conn) - # dataset = _Dataset(client) - # file_obj = TextModeFile() - # table = self._make_one(self.TABLE_NAME, dataset=dataset) - # with self.assertRaises(ValueError): - # table.upload_from_file(file_obj, 'CSV', size=1234) - - # def test_upload_from_file_binary_mode_no_failure(self): - # self._upload_from_file_helper(input_file_mode='r+b') - - # def test_upload_from_file_size_failure(self): - # conn = _Connection() - # client = _Client(project=self.PROJECT, connection=conn) - # dataset = _Dataset(client) - # file_obj = object() - # table = self._make_one(self.TABLE_NAME, dataset=dataset) - # with self.assertRaises(ValueError): - # table.upload_from_file(file_obj, 'CSV', size=None) - - # def test_upload_from_file_multipart_w_400(self): - # import csv - # import datetime - # from six.moves.http_client import BAD_REQUEST - # from google.cloud._testing import _NamedTemporaryFile - # from google.cloud._helpers import UTC - # from google.cloud.exceptions import BadRequest - - # WHEN_TS = 1437767599.006 - # WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace( - # tzinfo=UTC) - # response = {'status': BAD_REQUEST} - # conn = _Connection( - # (response, b'{}'), - # ) - # client = _Client(project=self.PROJECT, connection=conn) - # dataset = _Dataset(client) - # table = self._make_one(self.TABLE_NAME, dataset=dataset) - - # with _NamedTemporaryFile() as temp: - # with open(temp.name, 'w') as file_obj: - # writer = csv.writer(file_obj) - # writer.writerow(('full_name', 'age', 'joined')) - # writer.writerow(('Phred Phlyntstone', 32, WHEN)) - - # with open(temp.name, 'rb') as file_obj: - # with self.assertRaises(BadRequest): - # table.upload_from_file( - # file_obj, 'CSV', rewind=True) - - # def _upload_from_file_helper(self, **kw): - # import csv - # import datetime - # from six.moves.http_client import OK - # from google.cloud._helpers import UTC - # from google.cloud._testing import _NamedTemporaryFile - # from google.cloud.bigquery.table import SchemaField - - # WHEN_TS = 1437767599.006 - # WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace( - # tzinfo=UTC) - # PATH = 'projects/%s/jobs' % (self.PROJECT,) - # response = {'status': OK} - # conn = _Connection( - # (response, b'{}'), - # ) - # client = _Client(project=self.PROJECT, connection=conn) - # expected_job = object() - # if 'client' in kw: - # kw['client']._job = expected_job - # else: - # client._job = expected_job - # input_file_mode = kw.pop('input_file_mode', 'rb') - # dataset = _Dataset(client) - # full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') - # age = SchemaField('age', 'INTEGER', mode='REQUIRED') - # joined = SchemaField('joined', 'TIMESTAMP', mode='NULLABLE') - # table = self._make_one(self.TABLE_NAME, dataset=dataset, - # schema=[full_name, age, joined]) - # ROWS = [ - # ('Phred Phlyntstone', 32, WHEN), - # ('Bharney Rhubble', 33, WHEN + datetime.timedelta(seconds=1)), - # ('Wylma Phlyntstone', 29, WHEN + datetime.timedelta(seconds=2)), - # ('Bhettye Rhubble', 27, None), - # ] - - # with _NamedTemporaryFile() as temp: - # with open(temp.name, 'w') as file_obj: - # writer = csv.writer(file_obj) - # writer.writerow(('full_name', 'age', 'joined')) - # writer.writerows(ROWS) - - # with open(temp.name, input_file_mode) as file_obj: - # BODY = file_obj.read() - # explicit_size = kw.pop('_explicit_size', False) - # if 
explicit_size: - # kw['size'] = len(BODY) - # job = table.upload_from_file( - # file_obj, 'CSV', rewind=True, **kw) - - # self.assertIs(job, expected_job) - # return conn.http._requested, PATH, BODY - - # def test_upload_from_file_w_bound_client_multipart(self): - # import json - # from six.moves.urllib.parse import parse_qsl - # from six.moves.urllib.parse import urlsplit - # from google.cloud._helpers import _to_bytes - - # requested, PATH, BODY = self._upload_from_file_helper() - # parse_chunk = _email_chunk_parser() - - # self.assertEqual(len(requested), 1) - # req = requested[0] - # self.assertEqual(req['method'], 'POST') - # uri = req['uri'] - # scheme, netloc, path, qs, _ = urlsplit(uri) - # self.assertEqual(scheme, 'http') - # self.assertEqual(netloc, 'example.com') - # self.assertEqual(path, '/%s' % PATH) - # self.assertEqual(dict(parse_qsl(qs)), - # {'uploadType': 'multipart'}) - - # ctype, boundary = [x.strip() - # for x in req['headers']['content-type'].split(';')] - # self.assertEqual(ctype, 'multipart/related') - # self.assertTrue(boundary.startswith('boundary="==')) - # self.assertTrue(boundary.endswith('=="')) - - # divider = b'--' + _to_bytes(boundary[len('boundary="'):-1]) - # chunks = req['body'].split(divider)[1:-1] # discard prolog / epilog - # self.assertEqual(len(chunks), 2) - - # text_msg = parse_chunk(chunks[0].strip()) - # self.assertEqual(dict(text_msg._headers), - # {'Content-Type': 'application/json', - # 'MIME-Version': '1.0'}) - # metadata = json.loads(text_msg._payload) - # load_config = metadata['configuration']['load'] - # DESTINATION_TABLE = { - # 'projectId': self.PROJECT, - # 'datasetId': self.DS_NAME, - # 'tableId': self.TABLE_NAME, - # } - # self.assertEqual(load_config['destinationTable'], DESTINATION_TABLE) - # self.assertEqual(load_config['sourceFormat'], 'CSV') - - # app_msg = parse_chunk(chunks[1].strip()) - # self.assertEqual(dict(app_msg._headers), - # {'Content-Type': 'application/octet-stream', - # 'Content-Transfer-Encoding': 'binary', - # 'MIME-Version': '1.0'}) - # body = BODY.decode('ascii').rstrip() - # body_lines = [line.strip() for line in body.splitlines()] - # payload_lines = app_msg._payload.rstrip().splitlines() - # self.assertEqual(payload_lines, body_lines) - - # def test_upload_from_file_resumable_with_400(self): - # import csv - # import datetime - # import mock - # from six.moves.http_client import BAD_REQUEST - # from google.cloud.exceptions import BadRequest - # from google.cloud._helpers import UTC - # from google.cloud._testing import _NamedTemporaryFile - - # WHEN_TS = 1437767599.006 - # WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace( - # tzinfo=UTC) - # initial_response = {'status': BAD_REQUEST} - # conn = _Connection( - # (initial_response, b'{}'), - # ) - # client = _Client(project=self.PROJECT, connection=conn) - - # class _UploadConfig(object): - # accept = ['*/*'] - # max_size = None - # resumable_multipart = True - # resumable_path = u'/upload/bigquery/v2/projects/{project}/jobs' - # simple_multipart = True - # simple_path = u'' # force resumable - # dataset = _Dataset(client) - # table = self._make_one(self.TABLE_NAME, dataset=dataset) - - # with mock.patch('google.cloud.bigquery.table._UploadConfig', - # new=_UploadConfig): - # with _NamedTemporaryFile() as temp: - # with open(temp.name, 'w') as file_obj: - # writer = csv.writer(file_obj) - # writer.writerow(('full_name', 'age', 'joined')) - # writer.writerow(('Phred Phlyntstone', 32, WHEN)) - - # with open(temp.name, 'rb') as file_obj: - # with 
self.assertRaises(BadRequest): - # table.upload_from_file( - # file_obj, 'CSV', rewind=True) - - # # pylint: disable=too-many-statements - # def test_upload_from_file_w_explicit_client_resumable(self): - # import json - # import mock - # from six.moves.http_client import OK - # from six.moves.urllib.parse import parse_qsl - # from six.moves.urllib.parse import urlsplit - - # UPLOAD_PATH = 'https://example.com/upload/test' - # initial_response = {'status': OK, 'location': UPLOAD_PATH} - # upload_response = {'status': OK} - # conn = _Connection( - # (initial_response, b'{}'), - # (upload_response, b'{}'), - # ) - # client = _Client(project=self.PROJECT, connection=conn) - - # class _UploadConfig(object): - # accept = ['*/*'] - # max_size = None - # resumable_multipart = True - # resumable_path = u'/upload/bigquery/v2/projects/{project}/jobs' - # simple_multipart = True - # simple_path = u'' # force resumable - - # with mock.patch('google.cloud.bigquery.table._UploadConfig', - # new=_UploadConfig): - # orig_requested, PATH, BODY = self._upload_from_file_helper( - # allow_jagged_rows=False, - # allow_quoted_newlines=False, - # create_disposition='CREATE_IF_NEEDED', - # encoding='utf8', - # field_delimiter=',', - # ignore_unknown_values=False, - # max_bad_records=0, - # quote_character='"', - # skip_leading_rows=1, - # write_disposition='WRITE_APPEND', - # client=client, - # _explicit_size=True) - - # self.assertEqual(len(orig_requested), 0) - - # requested = conn.http._requested - # self.assertEqual(len(requested), 2) - # req = requested[0] - # self.assertEqual(req['method'], 'POST') - # uri = req['uri'] - # scheme, netloc, path, qs, _ = urlsplit(uri) - # self.assertEqual(scheme, 'http') - # self.assertEqual(netloc, 'example.com') - # self.assertEqual(path, '/%s' % PATH) - # self.assertEqual(dict(parse_qsl(qs)), - # {'uploadType': 'resumable'}) - - # self.assertEqual(req['headers']['content-type'], 'application/json') - # metadata = json.loads(req['body']) - # load_config = metadata['configuration']['load'] - # DESTINATION_TABLE = { - # 'projectId': self.PROJECT, - # 'datasetId': self.DS_NAME, - # 'tableId': self.TABLE_NAME, - # } - # self.assertEqual(load_config['destinationTable'], DESTINATION_TABLE) - # self.assertEqual(load_config['sourceFormat'], 'CSV') - # self.assertEqual(load_config['allowJaggedRows'], False) - # self.assertEqual(load_config['allowQuotedNewlines'], False) - # self.assertEqual(load_config['createDisposition'], 'CREATE_IF_NEEDED') - # self.assertEqual(load_config['encoding'], 'utf8') - # self.assertEqual(load_config['fieldDelimiter'], ',') - # self.assertEqual(load_config['ignoreUnknownValues'], False) - # self.assertEqual(load_config['maxBadRecords'], 0) - # self.assertEqual(load_config['quote'], '"') - # self.assertEqual(load_config['skipLeadingRows'], 1) - # self.assertEqual(load_config['writeDisposition'], 'WRITE_APPEND') - - # req = requested[1] - # self.assertEqual(req['method'], 'PUT') - # self.assertEqual(req['uri'], UPLOAD_PATH) - # headers = req['headers'] - # length = len(BODY) - # self.assertEqual(headers['Content-Type'], 'application/octet-stream') - # self.assertEqual(headers['Content-Range'], - # 'bytes 0-%d/%d' % (length - 1, length)) - # self.assertEqual(headers['content-length'], '%d' % (length,)) - # self.assertEqual(req['body'], BODY) - # pylint: enable=too-many-statements - - def test_upload_from_file_w_jobid(self): - import json - from google.cloud._helpers import _to_bytes - - requested, PATH, BODY = self._upload_from_file_helper(job_name='foo') - 
parse_chunk = _email_chunk_parser() - req = requested[0] - ctype, boundary = [x.strip() - for x in req['headers']['content-type'].split(';')] - divider = b'--' + _to_bytes(boundary[len('boundary="'):-1]) - chunks = req['body'].split(divider)[1:-1] # discard prolog / epilog - text_msg = parse_chunk(chunks[0].strip()) - metadata = json.loads(text_msg._payload) - load_config = metadata['configuration']['load'] - self.assertEqual(load_config['jobReference'], {'jobId': 'foo'}) + +class TestTableUpload(object): + + @staticmethod + def _make_table(): + from google.cloud.bigquery import _http + from google.cloud.bigquery import client + from google.cloud.bigquery import dataset + from google.cloud.bigquery import table + + connection = mock.create_autospec(_http.Connection, instance=True) + client = mock.create_autospec(client.Client, instance=True) + client._connection = connection + client._credentials = mock.sentinel.credentials + client.project = 'project_id' + + dataset = dataset.Dataset('test_dataset', client) + table = table.Table('test_table', dataset) + + return table + + @staticmethod + def _make_response(status_code, content='', headers={}): + """Make a mock HTTP response.""" + import requests + response = mock.create_autospec(requests.Response, instance=True) + response.content = content.encode('utf-8') + response.headers = headers + response.status_code = status_code + return response + + @classmethod + def _make_do_upload_patch(cls, table, method, side_effect=None): + """Patches the low-level upload helpers.""" + if side_effect is None: + side_effect = [cls._make_response( + http_client.OK, + json.dumps({}), + {'Content-Type': 'application/json'})] + return mock.patch.object( + table, method, side_effect=side_effect, autospec=True) + + EXPECTED_CONFIGURATION = { + 'configuration': { + 'load': { + 'sourceFormat': 'CSV', + 'schema': {'fields': []}, + 'destinationTable': { + 'projectId': 'project_id', + 'datasetId': 'test_dataset', + 'tableId': 'test_table' + } + } + } + } + + @staticmethod + def _make_file_obj(): + return io.BytesIO(b'hello, is it me you\'re looking for?') + + # High-level tests + + def test_upload_from_file_resumable(self): + import google.cloud.bigquery.table + + table = self._make_table() + file_obj = self._make_file_obj() + + do_upload_patch = self._make_do_upload_patch( + table, '_do_resumable_upload') + with do_upload_patch as do_upload: + table.upload_from_file(file_obj, source_format='CSV') + + do_upload.assert_called_once_with( + table._dataset._client, + file_obj, + self.EXPECTED_CONFIGURATION, + google.cloud.bigquery.table._DEFAULT_NUM_RETRIES) + + def test_upload_file_resumable_metadata(self): + table = self._make_table() + file_obj = self._make_file_obj() + + config_args = { + 'source_format': 'CSV', + 'allow_jagged_rows': False, + 'allow_quoted_newlines': False, + 'create_disposition': 'CREATE_IF_NEEDED', + 'encoding': 'utf8', + 'field_delimiter': ',', + 'ignore_unknown_values': False, + 'max_bad_records': 0, + 'quote_character': '"', + 'skip_leading_rows': 1, + 'write_disposition': 'WRITE_APPEND', + 'job_name': 'oddjob' + } + + expected_config = { + 'configuration': { + 'load': { + 'sourceFormat': config_args['source_format'], + 'schema': {'fields': []}, + 'destinationTable': { + 'projectId': table._dataset._client.project, + 'datasetId': table.dataset_name, + 'tableId': table.name + }, + 'allowJaggedRows': config_args['allow_jagged_rows'], + 'allowQuotedNewlines': + config_args['allow_quoted_newlines'], + 'createDisposition': 
config_args['create_disposition'], + 'encoding': config_args['encoding'], + 'fieldDelimiter': config_args['field_delimiter'], + 'ignoreUnknownValues': + config_args['ignore_unknown_values'], + 'maxBadRecords': config_args['max_bad_records'], + 'quote': config_args['quote_character'], + 'skipLeadingRows': config_args['skip_leading_rows'], + 'writeDisposition': config_args['write_disposition'], + 'jobReference': {'jobId': config_args['job_name']} + } + } + } + + do_upload_patch = self._make_do_upload_patch( + table, '_do_resumable_upload') + with do_upload_patch as do_upload: + table.upload_from_file( + file_obj, **config_args) + + do_upload.assert_called_once_with( + table._dataset._client, + file_obj, + expected_config, + mock.ANY) + + def test_upload_from_file_multipart(self): + import google.cloud.bigquery.table + + table = self._make_table() + file_obj = self._make_file_obj() + file_obj_size = 10 + + do_upload_patch = self._make_do_upload_patch( + table, '_do_multipart_upload') + with do_upload_patch as do_upload: + table.upload_from_file( + file_obj, source_format='CSV', size=file_obj_size) + + do_upload.assert_called_once_with( + table._dataset._client, + file_obj, + self.EXPECTED_CONFIGURATION, + file_obj_size, + google.cloud.bigquery.table._DEFAULT_NUM_RETRIES) + + def test_upload_from_file_with_retries(self): + table = self._make_table() + file_obj = self._make_file_obj() + num_retries = 20 + + do_upload_patch = self._make_do_upload_patch( + table, '_do_resumable_upload') + with do_upload_patch as do_upload: + table.upload_from_file( + file_obj, source_format='CSV', num_retries=num_retries) + + do_upload.assert_called_once_with( + table._dataset._client, + file_obj, + self.EXPECTED_CONFIGURATION, + num_retries) + + def test_upload_from_file_with_rewind(self): + table = self._make_table() + file_obj = self._make_file_obj() + file_obj.seek(2) + + with self._make_do_upload_patch(table, '_do_resumable_upload'): + table.upload_from_file( + file_obj, source_format='CSV', rewind=True) + + assert file_obj.tell() == 0 + + def test_upload_from_file_failure(self): + from google.resumable_media import InvalidResponse + from google.cloud import exceptions + + table = self._make_table() + file_obj = self._make_file_obj() + + response = self._make_response( + content='Someone is already in this spot.', + status_code=http_client.CONFLICT) + + do_upload_patch = self._make_do_upload_patch( + table, '_do_resumable_upload', + side_effect=InvalidResponse(response)) + + with do_upload_patch, pytest.raises(exceptions.Conflict) as exc_info: + table.upload_from_file( + file_obj, source_format='CSV', rewind=True) + + assert exc_info.value.message == response.content.decode('utf-8') + assert exc_info.value.errors == [] + + def test_upload_from_file_bad_mode(self): + table = self._make_table() + file_obj = mock.Mock(spec=['mode']) + file_obj.mode = 'x' + + with pytest.raises(ValueError): + table.upload_from_file( + file_obj, source_format='CSV',) + + # Low-level tests + + @classmethod + def _make_resumable_upload_responses(cls, size): + """Make a series of responses for a successful resumable upload.""" + from google import resumable_media + + resumable_url = 'http://test.invalid?upload_id=and-then-there-was-1' + initial_response = cls._make_response( + http_client.OK, '', {'location': resumable_url}) + data_response = cls._make_response( + resumable_media.PERMANENT_REDIRECT, + '', {'range': 'bytes=0-{:d}'.format(size - 1)}) + final_response = cls._make_response( + http_client.OK, + json.dumps({'size': 
size}), + {'Content-Type': 'application/json'}) + return [initial_response, data_response, final_response] + + @staticmethod + def _make_transport_patch(table, responses=None): + """Patch a table's _make_transport method to return given responses.""" + import google.auth.transport.requests + + transport = mock.create_autospec( + google.auth.transport.requests.AuthorizedSession, instance=True) + transport.request.side_effect = responses + return mock.patch.object( + table, '_make_transport', return_value=transport, autospec=True) + + def test__do_resumable_upload(self): + table = self._make_table() + file_obj = self._make_file_obj() + file_obj_len = len(file_obj.getvalue()) + responses = self._make_resumable_upload_responses(file_obj_len) + + with self._make_transport_patch(table, responses) as transport: + result = table._do_resumable_upload( + table._dataset._client, + file_obj, + self.EXPECTED_CONFIGURATION, + None) + + assert json.loads(result.content) == {'size': file_obj_len} + + # Verify that configuration data was passed in with the initial + # request. + transport.return_value.request.assert_any_call( + 'POST', + mock.ANY, + data=json.dumps(self.EXPECTED_CONFIGURATION).encode('utf-8'), + headers=mock.ANY) + + def test__do_multipart_upload(self): + table = self._make_table() + file_obj = self._make_file_obj() + file_obj_len = len(file_obj.getvalue()) + responses = [self._make_response(http_client.OK)] + + with self._make_transport_patch(table, responses) as transport: + table._do_multipart_upload( + table._dataset._client, + file_obj, + self.EXPECTED_CONFIGURATION, + file_obj_len, + None) + + # Verify that configuration data was passed in with the initial + # request. + request_args = transport.return_value.request.mock_calls[0][2] + request_data = request_args['data'].decode('utf-8') + request_headers = request_args['headers'] + + request_content = email.message_from_string( + 'Content-Type: {}\r\n{}'.format( + request_headers['content-type'].decode('utf-8'), + request_data)) + + # There should be two payloads: the configuration and the binary daya. + configuration_data = request_content.get_payload(0).get_payload() + binary_data = request_content.get_payload(1).get_payload() + + assert json.loads(configuration_data) == self.EXPECTED_CONFIGURATION + assert binary_data.encode('utf-8') == file_obj.getvalue() + + def test__do_multipart_upload_wrong_size(self): + table = self._make_table() + file_obj = self._make_file_obj() + file_obj_len = len(file_obj.getvalue()) + + with pytest.raises(ValueError): + table._do_multipart_upload( + table._dataset._client, + file_obj, + {}, + file_obj_len+1, + None) class Test_parse_schema_resource(unittest.TestCase, _SchemaBase): @@ -2018,37 +2030,14 @@ def project(self): return self._client.project -class _Responder(object): - - def __init__(self, *responses): - self._responses = responses[:] - self._requested = [] - - def _respond(self, **kw): - self._requested.append(kw) - response, self._responses = self._responses[0], self._responses[1:] - return response - - -class _HTTP(_Responder): - - connections = {} # For google-apitools debugging. 
- - def request(self, uri, method, headers, body, **kw): - if hasattr(body, 'read'): - body = body.read() - return self._respond(uri=uri, method=method, headers=headers, - body=body, **kw) - - -class _Connection(_Responder): +class _Connection(object): API_BASE_URL = 'http://example.com' USER_AGENT = 'testing 1.2.3' def __init__(self, *responses): - super(_Connection, self).__init__(*responses) - self.http = _HTTP(*responses) + self._responses = responses[:] + self._requested = [] def api_request(self, **kw): from google.cloud.exceptions import NotFound @@ -2072,18 +2061,3 @@ def build_api_url(self, path, query_params=None, qs = urlencode(query_params or {}) scheme, netloc, _, _, _ = urlsplit(api_base_url) return urlunsplit((scheme, netloc, path, qs, '')) - - -def _email_chunk_parser(): - import six - - if six.PY3: # pragma: NO COVER Python3 - from email.parser import BytesParser - - parser = BytesParser() - return parser.parsebytes - else: - from email.parser import Parser - - parser = Parser() - return parser.parsestr diff --git a/bigquery/tests/unit/test_table_upload.py b/bigquery/tests/unit/test_table_upload.py deleted file mode 100644 index d41c887587ae..000000000000 --- a/bigquery/tests/unit/test_table_upload.py +++ /dev/null @@ -1,97 +0,0 @@ -# Copyright 2017 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import csv -import io -import json - -from google import resumable_media -import google.auth.transport.requests -import mock -import requests -from six.moves import http_client -from six.moves import StringIO - -FIELDS = (u'name', u'age') -ROWS = [ - (u'Phred Phlyntstone', 32), - (u'Bharney Rhubble', 33), - (u'Wylma Phlyntstone', 29), - (u'Bhettye Rhubble', 27), -] - - -def rows_to_csv(fields, rows): - """Convert the rows into a CSV-format unicode string.""" - out = StringIO() - writer = csv.writer(out) - writer.writerow(fields) - writer.writerows(rows) - return out.getvalue() - - -def make_table(): - from google.cloud.bigquery import _http - from google.cloud.bigquery import client - from google.cloud.bigquery import dataset - from google.cloud.bigquery import table - - mock_connection = mock.Mock(spec=_http.Connection) - mock_client = mock.Mock(spec=client.Client) - mock_client._connection = mock_connection - mock_client._credentials = mock.sentinel.credentials - mock_client.project = 'project_id' - - dataset = dataset.Dataset('test_dataset', mock_client) - table = table.Table('test_table', dataset) - - return table - - -def make_response(status_code, content, headers={}): - return mock.Mock( - content=content, headers=headers, status_code=status_code, - spec=requests.Response) - - -def make_resumable_upload_responses(size): - resumable_url = 'http://test.invalid?upload_id=and-then-there-was-1' - initial_response = make_response( - http_client.OK, b'', {'location': resumable_url}) - data_response = make_response( - resumable_media.PERMANENT_REDIRECT, - b'', {'range': 'bytes=0-{:d}'.format(size - 1)}) - final_response = make_response( - http_client.OK, - json.dumps({'size': size}), - {'Content-Type': 'application/json'}) - return [initial_response, data_response, final_response] - - -def test_upload_from_file_simple(): - table = make_table() - - csv_file = io.BytesIO( - rows_to_csv(FIELDS, ROWS).encode('utf-8')) - csv_file_size = len(csv_file.getvalue()) - - mock_transport = mock.Mock( - spec=google.auth.transport.requests.AuthorizedSession) - transport_patch = mock.patch.object( - table, '_make_transport', return_value=mock_transport) - - with transport_patch: - mock_transport.request.side_effect = make_resumable_upload_responses( - csv_file_size) - table.upload_from_file(csv_file, source_format='CSV') From 053a17ef610df7779ba487be9248bbf5d5f1d7c0 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Thu, 20 Jul 2017 11:39:25 -0700 Subject: [PATCH 06/10] Restoring nox.py for BigQuery and bumping coverage to 100%. --- bigquery/nox.py | 28 +++-- bigquery/tests/unit/test_table.py | 172 +++++++++++++++++++++++++++--- storage/tests/unit/test_blob.py | 8 +- 3 files changed, 181 insertions(+), 27 deletions(-) diff --git a/bigquery/nox.py b/bigquery/nox.py index 149b9443ca6c..6684d415eb87 100644 --- a/bigquery/nox.py +++ b/bigquery/nox.py @@ -19,7 +19,9 @@ import nox -LOCAL_DEPS = ('../core/',) +LOCAL_DEPS = ( + os.path.join('..', 'core'), +) @nox.session @@ -38,10 +40,17 @@ def unit_tests(session, python_version): session.install('-e', '.') # Run py.test against the unit tests. 
- session.run('py.test', '--quiet', - '--cov=google.cloud.bigquery', '--cov=tests.unit', '--cov-append', - '--cov-config=.coveragerc', '--cov-report=term-missing', '--cov-fail-under=97', - 'tests/unit', *session.posargs + session.run( + 'py.test', + '--quiet', + '--cov=google.cloud.bigquery', + '--cov=tests.unit', + '--cov-append', + '--cov-config=.coveragerc', + '--cov-report=', + '--cov-fail-under=97', + os.path.join('tests', 'unit'), + *session.posargs ) @@ -63,11 +72,14 @@ def system_tests(session, python_version): # Install all test dependencies, then install this package into the # virutalenv's dist-packages. session.install('mock', 'pytest', *LOCAL_DEPS) - session.install('../storage/', '../test_utils/') + session.install( + os.path.join('..', 'storage'), + os.path.join('..', 'test_utils'), + ) session.install('.') # Run py.test against the system tests. - session.run('py.test', 'tests/system.py', '--pdb') + session.run('py.test', '--quiet', os.path.join('tests', 'system.py')) @nox.session @@ -81,7 +93,7 @@ def lint(session): session.install('flake8', 'pylint', 'gcp-devrel-py-tools', *LOCAL_DEPS) session.install('.') - session.run('flake8', 'google/cloud/bigquery') + session.run('flake8', os.path.join('google', 'cloud', 'bigquery')) session.run('flake8', 'tests') session.run( 'gcp-devrel-py-tools', 'run-pylint', diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 1b650f868555..aab711913c91 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -38,7 +38,8 @@ def _verifySchema(self, schema, resource): class TestTable(unittest.TestCase, _SchemaBase): - PROJECT = 'project' + + PROJECT = 'prahj-ekt' DS_NAME = 'dataset-name' TABLE_NAME = 'table-name' @@ -1560,6 +1561,161 @@ def _row_data(row): self.assertEqual(req['path'], '/%s' % PATH) self.assertEqual(req['data'], SENT) + @mock.patch('google.auth.transport.requests.AuthorizedSession') + def test__make_transport(self, session_factory): + client = mock.Mock(spec=[u'_credentials']) + table = self._make_one(self.TABLE_NAME, None) + transport = table._make_transport(client) + + self.assertIs(transport, session_factory.return_value) + session_factory.assert_called_once_with(client._credentials) + + @staticmethod + def _mock_requests_response(status_code, headers, content=b''): + return mock.Mock( + content=content, headers=headers, status_code=status_code, + spec=['content', 'headers', 'status_code']) + + def _mock_transport(self, status_code, headers, content=b''): + fake_transport = mock.Mock(spec=['request']) + fake_response = self._mock_requests_response( + status_code, headers, content=content) + fake_transport.request.return_value = fake_response + return fake_transport + + def _initiate_resumable_upload_helper(self, num_retries=None): + from google.resumable_media.requests import ResumableUpload + from google.cloud.bigquery.table import _DEFAULT_CHUNKSIZE + from google.cloud.bigquery.table import _GENERIC_CONTENT_TYPE + from google.cloud.bigquery.table import _get_upload_headers + from google.cloud.bigquery.table import _get_upload_metadata + + connection = _Connection() + client = _Client(self.PROJECT, connection=connection) + dataset = _Dataset(client) + table = self._make_one(self.TABLE_NAME, dataset) + + # Create mocks to be checked for doing transport. 
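+        # (Assumption, for clarity: the fake transport below answers the
+        # initiation POST with a ``location`` header; the ResumableUpload
+        # records that value as its ``resumable_url``, which is asserted
+        # further down.)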
+ resumable_url = 'http://test.invalid?upload_id=hey-you' + response_headers = {'location': resumable_url} + fake_transport = self._mock_transport( + http_client.OK, response_headers) + table._make_transport = mock.Mock( + return_value=fake_transport, spec=[]) + + # Create some mock arguments and call the method under test. + data = b'goodbye gudbi gootbee' + stream = io.BytesIO(data) + metadata = _get_upload_metadata( + 'CSV', table._schema, table._dataset, table.name) + upload, transport = table._initiate_resumable_upload( + client, stream, metadata, num_retries) + + # Check the returned values. + self.assertIsInstance(upload, ResumableUpload) + upload_url = ( + 'https://www.googleapis.com/upload/bigquery/v2/projects/' + + self.PROJECT + + '/jobs?uploadType=resumable') + self.assertEqual(upload.upload_url, upload_url) + expected_headers = _get_upload_headers(connection.USER_AGENT) + self.assertEqual(upload._headers, expected_headers) + self.assertFalse(upload.finished) + self.assertEqual(upload._chunk_size, _DEFAULT_CHUNKSIZE) + self.assertIs(upload._stream, stream) + self.assertIsNone(upload._total_bytes) + self.assertEqual(upload._content_type, _GENERIC_CONTENT_TYPE) + self.assertEqual(upload.resumable_url, resumable_url) + + retry_strategy = upload._retry_strategy + self.assertEqual(retry_strategy.max_sleep, 64.0) + if num_retries is None: + self.assertEqual(retry_strategy.max_cumulative_retry, 600.0) + self.assertIsNone(retry_strategy.max_retries) + else: + self.assertIsNone(retry_strategy.max_cumulative_retry) + self.assertEqual(retry_strategy.max_retries, num_retries) + self.assertIs(transport, fake_transport) + # Make sure we never read from the stream. + self.assertEqual(stream.tell(), 0) + + # Check the mocks. + table._make_transport.assert_called_once_with(client) + request_headers = expected_headers.copy() + request_headers['x-upload-content-type'] = _GENERIC_CONTENT_TYPE + fake_transport.request.assert_called_once_with( + 'POST', + upload_url, + data=json.dumps(metadata).encode('utf-8'), + headers=request_headers, + ) + + def test__initiate_resumable_upload(self): + self._initiate_resumable_upload_helper() + + def test__initiate_resumable_upload_with_retry(self): + self._initiate_resumable_upload_helper(num_retries=11) + + def _do_multipart_upload_success_helper( + self, get_boundary, num_retries=None): + from google.cloud.bigquery.table import _get_upload_headers + from google.cloud.bigquery.table import _get_upload_metadata + + connection = _Connection() + client = _Client(self.PROJECT, connection=connection) + dataset = _Dataset(client) + table = self._make_one(self.TABLE_NAME, dataset) + + # Create mocks to be checked for doing transport. + fake_transport = self._mock_transport(http_client.OK, {}) + table._make_transport = mock.Mock(return_value=fake_transport, spec=[]) + + # Create some mock arguments. + data = b'Bzzzz-zap \x00\x01\xf4' + stream = io.BytesIO(data) + metadata = _get_upload_metadata( + 'CSV', table._schema, table._dataset, table.name) + size = len(data) + response = table._do_multipart_upload( + client, stream, metadata, size, num_retries) + + # Check the mocks and the returned value. 
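+        # (``_do_multipart_upload`` hands back the transport's response
+        # unchanged and should have read exactly ``size`` bytes from the
+        # stream; the expected multipart body is reassembled below.)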
+ self.assertIs(response, fake_transport.request.return_value) + self.assertEqual(stream.tell(), size) + table._make_transport.assert_called_once_with(client) + get_boundary.assert_called_once_with() + + upload_url = ( + 'https://www.googleapis.com/upload/bigquery/v2/projects/' + + self.PROJECT + + '/jobs?uploadType=multipart') + payload = ( + b'--==0==\r\n' + + b'content-type: application/json; charset=UTF-8\r\n\r\n' + + json.dumps(metadata).encode('utf-8') + b'\r\n' + + b'--==0==\r\n' + + b'content-type: */*\r\n\r\n' + + data + b'\r\n' + + b'--==0==--') + headers = _get_upload_headers(connection.USER_AGENT) + headers['content-type'] = b'multipart/related; boundary="==0=="' + fake_transport.request.assert_called_once_with( + 'POST', + upload_url, + data=payload, + headers=headers, + ) + + @mock.patch(u'google.resumable_media._upload.get_boundary', + return_value=b'==0==') + def test__do_multipart_upload(self, get_boundary): + self._do_multipart_upload_success_helper(get_boundary) + + @mock.patch(u'google.resumable_media._upload.get_boundary', + return_value=b'==0==') + def test__do_multipart_upload_with_retry(self, get_boundary): + self._do_multipart_upload_success_helper(get_boundary, num_retries=8) + class TestTableUpload(object): @@ -1996,9 +2152,6 @@ def __init__(self, project='project', connection=None): self.project = project self._connection = connection - def job_from_resource(self, resource): # pylint: disable=unused-argument - return self._job - def run_sync_query(self, query): return _Query(query, self) @@ -2050,14 +2203,3 @@ def api_request(self, **kw): raise NotFound('miss') else: return response - - def build_api_url(self, path, query_params=None, - api_base_url=API_BASE_URL): - from six.moves.urllib.parse import urlencode - from six.moves.urllib.parse import urlsplit - from six.moves.urllib.parse import urlunsplit - - # Mimic the build_api_url interface. - qs = urlencode(query_params or {}) - scheme, netloc, _, _, _ = urlsplit(api_base_url) - return urlunsplit((scheme, netloc, path, qs, '')) diff --git a/storage/tests/unit/test_blob.py b/storage/tests/unit/test_blob.py index 250a05bd28f4..e2227adbd94a 100644 --- a/storage/tests/unit/test_blob.py +++ b/storage/tests/unit/test_blob.py @@ -775,7 +775,7 @@ def _do_multipart_success(self, mock_get_boundary, size=None, blob._make_transport = mock.Mock(return_value=fake_transport, spec=[]) # Create some mock arguments. - client = mock.sentinel.mock + client = mock.sentinel.client data = b'data here hear hier' stream = io.BytesIO(data) content_type = u'application/xml' @@ -865,7 +865,7 @@ def _initiate_resumable_helper(self, size=None, extra_headers=None, blob._make_transport = mock.Mock(return_value=fake_transport, spec=[]) # Create some mock arguments and call the method under test. - client = mock.sentinel.mock + client = mock.sentinel.client data = b'hello hallo halo hi-low' stream = io.BytesIO(data) content_type = u'text/plain' @@ -1033,7 +1033,7 @@ def _do_resumable_helper(self, use_size=False, num_retries=None): blob._make_transport = mock.Mock(return_value=fake_transport, spec=[]) # Create some mock arguments and call the method under test. - client = mock.sentinel.mock + client = mock.sentinel.client stream = io.BytesIO(data) content_type = u'text/html' response = blob._do_resumable_upload( @@ -1271,7 +1271,7 @@ def _create_resumable_upload_session_helper(self, origin=None, # Create some mock arguments and call the method under test. 
content_type = u'text/plain' size = 10000 - client = mock.sentinel.mock + client = mock.sentinel.client new_url = blob.create_resumable_upload_session( content_type=content_type, size=size, origin=origin, client=client) From 3325120277cdc4d5385e6523a00899a5bcdf022f Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Thu, 20 Jul 2017 11:41:10 -0700 Subject: [PATCH 07/10] Fixing json.loads() bug with bytes/unicode. --- bigquery/tests/unit/test_table.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index aab711913c91..ba79a55e4bc6 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -1975,7 +1975,8 @@ def test__do_resumable_upload(self): self.EXPECTED_CONFIGURATION, None) - assert json.loads(result.content) == {'size': file_obj_len} + content = result.content.decode('utf-8') + assert json.loads(content) == {'size': file_obj_len} # Verify that configuration data was passed in with the initial # request. From 2ae7cf377ff6abcfb5b0310b77111406a9ae810e Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Thu, 20 Jul 2017 20:17:10 -0700 Subject: [PATCH 08/10] Adding note to `TestTableUpload` class. --- bigquery/tests/unit/test_table.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index ba79a55e4bc6..620eee11fd3d 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -1718,6 +1718,9 @@ def test__do_multipart_upload_with_retry(self, get_boundary): class TestTableUpload(object): + # NOTE: This is a "partner" to `TestTable` meant to test some of the + # "upload" portions of `Table`. It also uses `pytest`-style tests + # rather than `unittest`-style. @staticmethod def _make_table(): From 3fc3e691740e2b730c651af184731f1582a0957a Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Fri, 21 Jul 2017 11:06:59 -0700 Subject: [PATCH 09/10] Upgrading google-resumable-media dependency to 0.2.1. --- bigquery/setup.py | 2 +- storage/setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bigquery/setup.py b/bigquery/setup.py index c83cf3d32dd0..eeb2d90549d8 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -53,7 +53,7 @@ REQUIREMENTS = [ 'google-cloud-core >= 0.25.0, < 0.26dev', 'google-auth >= 1.0.0', - 'google-resumable-media >= 0.1.1', + 'google-resumable-media >= 0.2.1', 'requests >= 2.0.0', ] diff --git a/storage/setup.py b/storage/setup.py index d18624f3c13d..8d11055fac77 100644 --- a/storage/setup.py +++ b/storage/setup.py @@ -53,7 +53,7 @@ REQUIREMENTS = [ 'google-cloud-core >= 0.25.0, < 0.26dev', 'google-auth >= 1.0.0', - 'google-resumable-media >= 0.1.1', + 'google-resumable-media >= 0.2.1', 'requests >= 2.0.0', ] From a3b4a3f8bb788cee98be49ba112ab9aea57469d7 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Fri, 21 Jul 2017 13:11:43 -0700 Subject: [PATCH 10/10] Fixing bug: only added schema key if schema is non-empty. 
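
Previously ``_get_upload_metadata`` always emitted a ``schema`` key, so a
table without a schema produced ``'schema': {'fields': []}`` in the load
configuration. The key is now attached only when there are fields to
describe; roughly (mirroring the hunk below):

    if schema:
        load_config['schema'] = {'fields': _build_schema_resource(schema)}

The new ``Test__get_upload_metadata`` cases cover the empty and non-empty
schema paths, and the ``schema`` entry is dropped from the expected
configurations in ``TestTableUpload`` accordingly.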
--- bigquery/google/cloud/bigquery/table.py | 25 +++++----- bigquery/nox.py | 7 ++- bigquery/tests/unit/test_table.py | 66 ++++++++++++++++++++++++- 3 files changed, 84 insertions(+), 14 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index ed8ec3fcbdc1..f7752bb8fc36 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -1280,19 +1280,22 @@ def _get_upload_metadata(source_format, schema, dataset, name): :rtype: dict :returns: The metadata dictionary. """ + load_config = { + 'sourceFormat': source_format, + 'destinationTable': { + 'projectId': dataset.project, + 'datasetId': dataset.name, + 'tableId': name, + }, + } + if schema: + load_config['schema'] = { + 'fields': _build_schema_resource(schema), + } + return { 'configuration': { - 'load': { - 'sourceFormat': source_format, - 'schema': { - 'fields': _build_schema_resource(schema), - }, - 'destinationTable': { - 'projectId': dataset.project, - 'datasetId': dataset.name, - 'tableId': name, - }, - }, + 'load': load_config, }, } diff --git a/bigquery/nox.py b/bigquery/nox.py index 6684d415eb87..989965443159 100644 --- a/bigquery/nox.py +++ b/bigquery/nox.py @@ -79,7 +79,12 @@ def system_tests(session, python_version): session.install('.') # Run py.test against the system tests. - session.run('py.test', '--quiet', os.path.join('tests', 'system.py')) + session.run( + 'py.test', + '--quiet', + os.path.join('tests', 'system.py'), + *session.posargs + ) @nox.session diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 620eee11fd3d..502c0495f9c9 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -1765,7 +1765,6 @@ def _make_do_upload_patch(cls, table, method, side_effect=None): 'configuration': { 'load': { 'sourceFormat': 'CSV', - 'schema': {'fields': []}, 'destinationTable': { 'projectId': 'project_id', 'datasetId': 'test_dataset', @@ -1821,7 +1820,6 @@ def test_upload_file_resumable_metadata(self): 'configuration': { 'load': { 'sourceFormat': config_args['source_format'], - 'schema': {'fields': []}, 'destinationTable': { 'projectId': table._dataset._client.project, 'datasetId': table.dataset_name, @@ -2148,6 +2146,70 @@ def test_w_subfields(self): 'mode': 'REQUIRED'}]}) +class Test__get_upload_metadata(unittest.TestCase): + + @staticmethod + def _call_fut(source_format, schema, dataset, name): + from google.cloud.bigquery.table import _get_upload_metadata + + return _get_upload_metadata(source_format, schema, dataset, name) + + def test_empty_schema(self): + source_format = 'AVRO' + dataset = mock.Mock(project='prediction', spec=['name', 'project']) + dataset.name = 'market' # mock.Mock() treats `name` specially. + table_name = 'chairs' + metadata = self._call_fut(source_format, [], dataset, table_name) + + expected = { + 'configuration': { + 'load': { + 'sourceFormat': source_format, + 'destinationTable': { + 'projectId': dataset.project, + 'datasetId': dataset.name, + 'tableId': table_name, + }, + }, + }, + } + self.assertEqual(metadata, expected) + + def test_with_schema(self): + from google.cloud.bigquery.table import SchemaField + + source_format = 'CSV' + full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') + dataset = mock.Mock(project='blind', spec=['name', 'project']) + dataset.name = 'movie' # mock.Mock() treats `name` specially. 
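+        # (Passing ``name`` to the ``mock.Mock()`` constructor would
+        # configure the mock's own name rather than create a ``name``
+        # attribute, hence the assignment after construction.)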
+ table_name = 'teebull-neem' + metadata = self._call_fut( + source_format, [full_name], dataset, table_name) + + expected = { + 'configuration': { + 'load': { + 'sourceFormat': source_format, + 'destinationTable': { + 'projectId': dataset.project, + 'datasetId': dataset.name, + 'tableId': table_name, + }, + 'schema': { + 'fields': [ + { + 'name': full_name.name, + 'type': full_name.field_type, + 'mode': full_name.mode, + }, + ], + }, + }, + }, + } + self.assertEqual(metadata, expected) + + class _Client(object): _query_results = ()