diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index ab4e09bbce457..ed8ec3fcbdc1a 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -15,7 +15,6 @@ """Define API Datasets.""" import datetime -import json import os import httplib2 @@ -26,7 +25,6 @@ from google.resumable_media.requests import MultipartUpload from google.resumable_media.requests import ResumableUpload -from google.cloud._helpers import _bytes_to_unicode from google.cloud._helpers import _datetime_from_microseconds from google.cloud._helpers import _millis_from_datetime from google.cloud.exceptions import NotFound @@ -50,6 +48,7 @@ _READ_LESS_THAN_SIZE = ( 'Size {:d} was specified but the file-like object only had ' '{:d} bytes remaining.') +_DEFAULT_NUM_RETRIES = 6 class Table(object): @@ -1004,7 +1003,7 @@ def upload_from_file(self, source_format, rewind=False, size=None, - num_retries=6, + num_retries=_DEFAULT_NUM_RETRIES, allow_jagged_rows=None, allow_quoted_newlines=None, create_disposition=None, @@ -1215,13 +1214,6 @@ def _build_schema_resource(fields): # pylint: enable=unused-argument -def _convert_timestamp(value): - """Helper for :meth:`Table.insert_data`.""" - if isinstance(value, datetime.datetime): - value = _microseconds_from_datetime(value) * 1e-6 - return value - - def _maybe_rewind(stream, rewind=False): """Rewind the stream if desired. diff --git a/bigquery/nox.py b/bigquery/nox.py index cf1d54b543095..149b9443ca6c0 100644 --- a/bigquery/nox.py +++ b/bigquery/nox.py @@ -40,8 +40,8 @@ def unit_tests(session, python_version): # Run py.test against the unit tests. session.run('py.test', '--quiet', '--cov=google.cloud.bigquery', '--cov=tests.unit', '--cov-append', - '--cov-config=.coveragerc', '--cov-report=', '--cov-fail-under=97', - 'tests/unit', + '--cov-config=.coveragerc', '--cov-report=term-missing', '--cov-fail-under=97', + 'tests/unit', *session.posargs ) diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 183e65e4a502f..1b650f8685555 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -12,8 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import email +import io +import json import unittest +import mock +from six.moves import http_client +import pytest + class _SchemaBase(object): @@ -1553,314 +1560,319 @@ def _row_data(row): self.assertEqual(req['path'], '/%s' % PATH) self.assertEqual(req['data'], SENT) - # Presently failing tests below. 
- - # def test_upload_from_file_text_mode_file_failure(self): - - # class TextModeFile(object): - # mode = 'r' - - # conn = _Connection() - # client = _Client(project=self.PROJECT, connection=conn) - # dataset = _Dataset(client) - # file_obj = TextModeFile() - # table = self._make_one(self.TABLE_NAME, dataset=dataset) - # with self.assertRaises(ValueError): - # table.upload_from_file(file_obj, 'CSV', size=1234) - - # def test_upload_from_file_binary_mode_no_failure(self): - # self._upload_from_file_helper(input_file_mode='r+b') - - # def test_upload_from_file_size_failure(self): - # conn = _Connection() - # client = _Client(project=self.PROJECT, connection=conn) - # dataset = _Dataset(client) - # file_obj = object() - # table = self._make_one(self.TABLE_NAME, dataset=dataset) - # with self.assertRaises(ValueError): - # table.upload_from_file(file_obj, 'CSV', size=None) - - # def test_upload_from_file_multipart_w_400(self): - # import csv - # import datetime - # from six.moves.http_client import BAD_REQUEST - # from google.cloud._testing import _NamedTemporaryFile - # from google.cloud._helpers import UTC - # from google.cloud.exceptions import BadRequest - - # WHEN_TS = 1437767599.006 - # WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace( - # tzinfo=UTC) - # response = {'status': BAD_REQUEST} - # conn = _Connection( - # (response, b'{}'), - # ) - # client = _Client(project=self.PROJECT, connection=conn) - # dataset = _Dataset(client) - # table = self._make_one(self.TABLE_NAME, dataset=dataset) - - # with _NamedTemporaryFile() as temp: - # with open(temp.name, 'w') as file_obj: - # writer = csv.writer(file_obj) - # writer.writerow(('full_name', 'age', 'joined')) - # writer.writerow(('Phred Phlyntstone', 32, WHEN)) - - # with open(temp.name, 'rb') as file_obj: - # with self.assertRaises(BadRequest): - # table.upload_from_file( - # file_obj, 'CSV', rewind=True) - - # def _upload_from_file_helper(self, **kw): - # import csv - # import datetime - # from six.moves.http_client import OK - # from google.cloud._helpers import UTC - # from google.cloud._testing import _NamedTemporaryFile - # from google.cloud.bigquery.table import SchemaField - - # WHEN_TS = 1437767599.006 - # WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace( - # tzinfo=UTC) - # PATH = 'projects/%s/jobs' % (self.PROJECT,) - # response = {'status': OK} - # conn = _Connection( - # (response, b'{}'), - # ) - # client = _Client(project=self.PROJECT, connection=conn) - # expected_job = object() - # if 'client' in kw: - # kw['client']._job = expected_job - # else: - # client._job = expected_job - # input_file_mode = kw.pop('input_file_mode', 'rb') - # dataset = _Dataset(client) - # full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') - # age = SchemaField('age', 'INTEGER', mode='REQUIRED') - # joined = SchemaField('joined', 'TIMESTAMP', mode='NULLABLE') - # table = self._make_one(self.TABLE_NAME, dataset=dataset, - # schema=[full_name, age, joined]) - # ROWS = [ - # ('Phred Phlyntstone', 32, WHEN), - # ('Bharney Rhubble', 33, WHEN + datetime.timedelta(seconds=1)), - # ('Wylma Phlyntstone', 29, WHEN + datetime.timedelta(seconds=2)), - # ('Bhettye Rhubble', 27, None), - # ] - - # with _NamedTemporaryFile() as temp: - # with open(temp.name, 'w') as file_obj: - # writer = csv.writer(file_obj) - # writer.writerow(('full_name', 'age', 'joined')) - # writer.writerows(ROWS) - - # with open(temp.name, input_file_mode) as file_obj: - # BODY = file_obj.read() - # explicit_size = kw.pop('_explicit_size', False) - # if 
explicit_size: - # kw['size'] = len(BODY) - # job = table.upload_from_file( - # file_obj, 'CSV', rewind=True, **kw) - - # self.assertIs(job, expected_job) - # return conn.http._requested, PATH, BODY - - # def test_upload_from_file_w_bound_client_multipart(self): - # import json - # from six.moves.urllib.parse import parse_qsl - # from six.moves.urllib.parse import urlsplit - # from google.cloud._helpers import _to_bytes - - # requested, PATH, BODY = self._upload_from_file_helper() - # parse_chunk = _email_chunk_parser() - - # self.assertEqual(len(requested), 1) - # req = requested[0] - # self.assertEqual(req['method'], 'POST') - # uri = req['uri'] - # scheme, netloc, path, qs, _ = urlsplit(uri) - # self.assertEqual(scheme, 'http') - # self.assertEqual(netloc, 'example.com') - # self.assertEqual(path, '/%s' % PATH) - # self.assertEqual(dict(parse_qsl(qs)), - # {'uploadType': 'multipart'}) - - # ctype, boundary = [x.strip() - # for x in req['headers']['content-type'].split(';')] - # self.assertEqual(ctype, 'multipart/related') - # self.assertTrue(boundary.startswith('boundary="==')) - # self.assertTrue(boundary.endswith('=="')) - - # divider = b'--' + _to_bytes(boundary[len('boundary="'):-1]) - # chunks = req['body'].split(divider)[1:-1] # discard prolog / epilog - # self.assertEqual(len(chunks), 2) - - # text_msg = parse_chunk(chunks[0].strip()) - # self.assertEqual(dict(text_msg._headers), - # {'Content-Type': 'application/json', - # 'MIME-Version': '1.0'}) - # metadata = json.loads(text_msg._payload) - # load_config = metadata['configuration']['load'] - # DESTINATION_TABLE = { - # 'projectId': self.PROJECT, - # 'datasetId': self.DS_NAME, - # 'tableId': self.TABLE_NAME, - # } - # self.assertEqual(load_config['destinationTable'], DESTINATION_TABLE) - # self.assertEqual(load_config['sourceFormat'], 'CSV') - - # app_msg = parse_chunk(chunks[1].strip()) - # self.assertEqual(dict(app_msg._headers), - # {'Content-Type': 'application/octet-stream', - # 'Content-Transfer-Encoding': 'binary', - # 'MIME-Version': '1.0'}) - # body = BODY.decode('ascii').rstrip() - # body_lines = [line.strip() for line in body.splitlines()] - # payload_lines = app_msg._payload.rstrip().splitlines() - # self.assertEqual(payload_lines, body_lines) - - # def test_upload_from_file_resumable_with_400(self): - # import csv - # import datetime - # import mock - # from six.moves.http_client import BAD_REQUEST - # from google.cloud.exceptions import BadRequest - # from google.cloud._helpers import UTC - # from google.cloud._testing import _NamedTemporaryFile - - # WHEN_TS = 1437767599.006 - # WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace( - # tzinfo=UTC) - # initial_response = {'status': BAD_REQUEST} - # conn = _Connection( - # (initial_response, b'{}'), - # ) - # client = _Client(project=self.PROJECT, connection=conn) - - # class _UploadConfig(object): - # accept = ['*/*'] - # max_size = None - # resumable_multipart = True - # resumable_path = u'/upload/bigquery/v2/projects/{project}/jobs' - # simple_multipart = True - # simple_path = u'' # force resumable - # dataset = _Dataset(client) - # table = self._make_one(self.TABLE_NAME, dataset=dataset) - - # with mock.patch('google.cloud.bigquery.table._UploadConfig', - # new=_UploadConfig): - # with _NamedTemporaryFile() as temp: - # with open(temp.name, 'w') as file_obj: - # writer = csv.writer(file_obj) - # writer.writerow(('full_name', 'age', 'joined')) - # writer.writerow(('Phred Phlyntstone', 32, WHEN)) - - # with open(temp.name, 'rb') as file_obj: - # with 
self.assertRaises(BadRequest): - # table.upload_from_file( - # file_obj, 'CSV', rewind=True) - - # # pylint: disable=too-many-statements - # def test_upload_from_file_w_explicit_client_resumable(self): - # import json - # import mock - # from six.moves.http_client import OK - # from six.moves.urllib.parse import parse_qsl - # from six.moves.urllib.parse import urlsplit - - # UPLOAD_PATH = 'https://example.com/upload/test' - # initial_response = {'status': OK, 'location': UPLOAD_PATH} - # upload_response = {'status': OK} - # conn = _Connection( - # (initial_response, b'{}'), - # (upload_response, b'{}'), - # ) - # client = _Client(project=self.PROJECT, connection=conn) - - # class _UploadConfig(object): - # accept = ['*/*'] - # max_size = None - # resumable_multipart = True - # resumable_path = u'/upload/bigquery/v2/projects/{project}/jobs' - # simple_multipart = True - # simple_path = u'' # force resumable - - # with mock.patch('google.cloud.bigquery.table._UploadConfig', - # new=_UploadConfig): - # orig_requested, PATH, BODY = self._upload_from_file_helper( - # allow_jagged_rows=False, - # allow_quoted_newlines=False, - # create_disposition='CREATE_IF_NEEDED', - # encoding='utf8', - # field_delimiter=',', - # ignore_unknown_values=False, - # max_bad_records=0, - # quote_character='"', - # skip_leading_rows=1, - # write_disposition='WRITE_APPEND', - # client=client, - # _explicit_size=True) - - # self.assertEqual(len(orig_requested), 0) - - # requested = conn.http._requested - # self.assertEqual(len(requested), 2) - # req = requested[0] - # self.assertEqual(req['method'], 'POST') - # uri = req['uri'] - # scheme, netloc, path, qs, _ = urlsplit(uri) - # self.assertEqual(scheme, 'http') - # self.assertEqual(netloc, 'example.com') - # self.assertEqual(path, '/%s' % PATH) - # self.assertEqual(dict(parse_qsl(qs)), - # {'uploadType': 'resumable'}) - - # self.assertEqual(req['headers']['content-type'], 'application/json') - # metadata = json.loads(req['body']) - # load_config = metadata['configuration']['load'] - # DESTINATION_TABLE = { - # 'projectId': self.PROJECT, - # 'datasetId': self.DS_NAME, - # 'tableId': self.TABLE_NAME, - # } - # self.assertEqual(load_config['destinationTable'], DESTINATION_TABLE) - # self.assertEqual(load_config['sourceFormat'], 'CSV') - # self.assertEqual(load_config['allowJaggedRows'], False) - # self.assertEqual(load_config['allowQuotedNewlines'], False) - # self.assertEqual(load_config['createDisposition'], 'CREATE_IF_NEEDED') - # self.assertEqual(load_config['encoding'], 'utf8') - # self.assertEqual(load_config['fieldDelimiter'], ',') - # self.assertEqual(load_config['ignoreUnknownValues'], False) - # self.assertEqual(load_config['maxBadRecords'], 0) - # self.assertEqual(load_config['quote'], '"') - # self.assertEqual(load_config['skipLeadingRows'], 1) - # self.assertEqual(load_config['writeDisposition'], 'WRITE_APPEND') - - # req = requested[1] - # self.assertEqual(req['method'], 'PUT') - # self.assertEqual(req['uri'], UPLOAD_PATH) - # headers = req['headers'] - # length = len(BODY) - # self.assertEqual(headers['Content-Type'], 'application/octet-stream') - # self.assertEqual(headers['Content-Range'], - # 'bytes 0-%d/%d' % (length - 1, length)) - # self.assertEqual(headers['content-length'], '%d' % (length,)) - # self.assertEqual(req['body'], BODY) - # pylint: enable=too-many-statements - - def test_upload_from_file_w_jobid(self): - import json - from google.cloud._helpers import _to_bytes - - requested, PATH, BODY = self._upload_from_file_helper(job_name='foo') - 
parse_chunk = _email_chunk_parser() - req = requested[0] - ctype, boundary = [x.strip() - for x in req['headers']['content-type'].split(';')] - divider = b'--' + _to_bytes(boundary[len('boundary="'):-1]) - chunks = req['body'].split(divider)[1:-1] # discard prolog / epilog - text_msg = parse_chunk(chunks[0].strip()) - metadata = json.loads(text_msg._payload) - load_config = metadata['configuration']['load'] - self.assertEqual(load_config['jobReference'], {'jobId': 'foo'}) + +class TestTableUpload(object): + + @staticmethod + def _make_table(): + from google.cloud.bigquery import _http + from google.cloud.bigquery import client + from google.cloud.bigquery import dataset + from google.cloud.bigquery import table + + connection = mock.create_autospec(_http.Connection, instance=True) + client = mock.create_autospec(client.Client, instance=True) + client._connection = connection + client._credentials = mock.sentinel.credentials + client.project = 'project_id' + + dataset = dataset.Dataset('test_dataset', client) + table = table.Table('test_table', dataset) + + return table + + @staticmethod + def _make_response(status_code, content='', headers={}): + """Make a mock HTTP response.""" + import requests + response = mock.create_autospec(requests.Response, instance=True) + response.content = content.encode('utf-8') + response.headers = headers + response.status_code = status_code + return response + + @classmethod + def _make_do_upload_patch(cls, table, method, side_effect=None): + """Patches the low-level upload helpers.""" + if side_effect is None: + side_effect = [cls._make_response( + http_client.OK, + json.dumps({}), + {'Content-Type': 'application/json'})] + return mock.patch.object( + table, method, side_effect=side_effect, autospec=True) + + EXPECTED_CONFIGURATION = { + 'configuration': { + 'load': { + 'sourceFormat': 'CSV', + 'schema': {'fields': []}, + 'destinationTable': { + 'projectId': 'project_id', + 'datasetId': 'test_dataset', + 'tableId': 'test_table' + } + } + } + } + + @staticmethod + def _make_file_obj(): + return io.BytesIO(b'hello, is it me you\'re looking for?') + + # High-level tests + + def test_upload_from_file_resumable(self): + import google.cloud.bigquery.table + + table = self._make_table() + file_obj = self._make_file_obj() + + do_upload_patch = self._make_do_upload_patch( + table, '_do_resumable_upload') + with do_upload_patch as do_upload: + table.upload_from_file(file_obj, source_format='CSV') + + do_upload.assert_called_once_with( + table._dataset._client, + file_obj, + self.EXPECTED_CONFIGURATION, + google.cloud.bigquery.table._DEFAULT_NUM_RETRIES) + + def test_upload_file_resumable_metadata(self): + table = self._make_table() + file_obj = self._make_file_obj() + + config_args = { + 'source_format': 'CSV', + 'allow_jagged_rows': False, + 'allow_quoted_newlines': False, + 'create_disposition': 'CREATE_IF_NEEDED', + 'encoding': 'utf8', + 'field_delimiter': ',', + 'ignore_unknown_values': False, + 'max_bad_records': 0, + 'quote_character': '"', + 'skip_leading_rows': 1, + 'write_disposition': 'WRITE_APPEND', + 'job_name': 'oddjob' + } + + expected_config = { + 'configuration': { + 'load': { + 'sourceFormat': config_args['source_format'], + 'schema': {'fields': []}, + 'destinationTable': { + 'projectId': table._dataset._client.project, + 'datasetId': table.dataset_name, + 'tableId': table.name + }, + 'allowJaggedRows': config_args['allow_jagged_rows'], + 'allowQuotedNewlines': + config_args['allow_quoted_newlines'], + 'createDisposition': 
config_args['create_disposition'], + 'encoding': config_args['encoding'], + 'fieldDelimiter': config_args['field_delimiter'], + 'ignoreUnknownValues': + config_args['ignore_unknown_values'], + 'maxBadRecords': config_args['max_bad_records'], + 'quote': config_args['quote_character'], + 'skipLeadingRows': config_args['skip_leading_rows'], + 'writeDisposition': config_args['write_disposition'], + 'jobReference': {'jobId': config_args['job_name']} + } + } + } + + do_upload_patch = self._make_do_upload_patch( + table, '_do_resumable_upload') + with do_upload_patch as do_upload: + table.upload_from_file( + file_obj, **config_args) + + do_upload.assert_called_once_with( + table._dataset._client, + file_obj, + expected_config, + mock.ANY) + + def test_upload_from_file_multipart(self): + import google.cloud.bigquery.table + + table = self._make_table() + file_obj = self._make_file_obj() + file_obj_size = 10 + + do_upload_patch = self._make_do_upload_patch( + table, '_do_multipart_upload') + with do_upload_patch as do_upload: + table.upload_from_file( + file_obj, source_format='CSV', size=file_obj_size) + + do_upload.assert_called_once_with( + table._dataset._client, + file_obj, + self.EXPECTED_CONFIGURATION, + file_obj_size, + google.cloud.bigquery.table._DEFAULT_NUM_RETRIES) + + def test_upload_from_file_with_retries(self): + table = self._make_table() + file_obj = self._make_file_obj() + num_retries = 20 + + do_upload_patch = self._make_do_upload_patch( + table, '_do_resumable_upload') + with do_upload_patch as do_upload: + table.upload_from_file( + file_obj, source_format='CSV', num_retries=num_retries) + + do_upload.assert_called_once_with( + table._dataset._client, + file_obj, + self.EXPECTED_CONFIGURATION, + num_retries) + + def test_upload_from_file_with_rewind(self): + table = self._make_table() + file_obj = self._make_file_obj() + file_obj.seek(2) + + with self._make_do_upload_patch(table, '_do_resumable_upload'): + table.upload_from_file( + file_obj, source_format='CSV', rewind=True) + + assert file_obj.tell() == 0 + + def test_upload_from_file_failure(self): + from google.resumable_media import InvalidResponse + from google.cloud import exceptions + + table = self._make_table() + file_obj = self._make_file_obj() + + response = self._make_response( + content='Someone is already in this spot.', + status_code=http_client.CONFLICT) + + do_upload_patch = self._make_do_upload_patch( + table, '_do_resumable_upload', + side_effect=InvalidResponse(response)) + + with do_upload_patch, pytest.raises(exceptions.Conflict) as exc_info: + table.upload_from_file( + file_obj, source_format='CSV', rewind=True) + + assert exc_info.value.message == response.content.decode('utf-8') + assert exc_info.value.errors == [] + + def test_upload_from_file_bad_mode(self): + table = self._make_table() + file_obj = mock.Mock(spec=['mode']) + file_obj.mode = 'x' + + with pytest.raises(ValueError): + table.upload_from_file( + file_obj, source_format='CSV',) + + # Low-level tests + + @classmethod + def _make_resumable_upload_responses(cls, size): + """Make a series of responses for a successful resumable upload.""" + from google import resumable_media + + resumable_url = 'http://test.invalid?upload_id=and-then-there-was-1' + initial_response = cls._make_response( + http_client.OK, '', {'location': resumable_url}) + data_response = cls._make_response( + resumable_media.PERMANENT_REDIRECT, + '', {'range': 'bytes=0-{:d}'.format(size - 1)}) + final_response = cls._make_response( + http_client.OK, + json.dumps({'size': 
size}),
+            {'Content-Type': 'application/json'})
+        return [initial_response, data_response, final_response]
+
+    @staticmethod
+    def _make_transport_patch(table, responses=None):
+        """Patch a table's _make_transport method to return given responses."""
+        import google.auth.transport.requests
+
+        transport = mock.create_autospec(
+            google.auth.transport.requests.AuthorizedSession, instance=True)
+        transport.request.side_effect = responses
+        return mock.patch.object(
+            table, '_make_transport', return_value=transport, autospec=True)
+
+    def test__do_resumable_upload(self):
+        table = self._make_table()
+        file_obj = self._make_file_obj()
+        file_obj_len = len(file_obj.getvalue())
+        responses = self._make_resumable_upload_responses(file_obj_len)
+
+        with self._make_transport_patch(table, responses) as transport:
+            result = table._do_resumable_upload(
+                table._dataset._client,
+                file_obj,
+                self.EXPECTED_CONFIGURATION,
+                None)
+
+        assert json.loads(result.content) == {'size': file_obj_len}
+
+        # Verify that configuration data was passed in with the initial
+        # request.
+        transport.return_value.request.assert_any_call(
+            'POST',
+            mock.ANY,
+            data=json.dumps(self.EXPECTED_CONFIGURATION).encode('utf-8'),
+            headers=mock.ANY)
+
+    def test__do_multipart_upload(self):
+        table = self._make_table()
+        file_obj = self._make_file_obj()
+        file_obj_len = len(file_obj.getvalue())
+        responses = [self._make_response(http_client.OK)]
+
+        with self._make_transport_patch(table, responses) as transport:
+            table._do_multipart_upload(
+                table._dataset._client,
+                file_obj,
+                self.EXPECTED_CONFIGURATION,
+                file_obj_len,
+                None)
+
+        # Verify that configuration data was passed in with the initial
+        # request.
+        request_args = transport.return_value.request.mock_calls[0][2]
+        request_data = request_args['data'].decode('utf-8')
+        request_headers = request_args['headers']
+
+        request_content = email.message_from_string(
+            'Content-Type: {}\r\n{}'.format(
+                request_headers['content-type'].decode('utf-8'),
+                request_data))
+
+        # There should be two payloads: the configuration and the binary data.
+        configuration_data = request_content.get_payload(0).get_payload()
+        binary_data = request_content.get_payload(1).get_payload()
+
+        assert json.loads(configuration_data) == self.EXPECTED_CONFIGURATION
+        assert binary_data.encode('utf-8') == file_obj.getvalue()
+
+    def test__do_multipart_upload_wrong_size(self):
+        table = self._make_table()
+        file_obj = self._make_file_obj()
+        file_obj_len = len(file_obj.getvalue())
+
+        with pytest.raises(ValueError):
+            table._do_multipart_upload(
+                table._dataset._client,
+                file_obj,
+                {},
+                file_obj_len+1,
+                None)


 class Test_parse_schema_resource(unittest.TestCase, _SchemaBase):
@@ -2018,37 +2030,14 @@ def project(self):
         return self._client.project


-class _Responder(object):
-
-    def __init__(self, *responses):
-        self._responses = responses[:]
-        self._requested = []
-
-    def _respond(self, **kw):
-        self._requested.append(kw)
-        response, self._responses = self._responses[0], self._responses[1:]
-        return response
-
-
-class _HTTP(_Responder):
-
-    connections = {}  # For google-apitools debugging.
- - def request(self, uri, method, headers, body, **kw): - if hasattr(body, 'read'): - body = body.read() - return self._respond(uri=uri, method=method, headers=headers, - body=body, **kw) - - -class _Connection(_Responder): +class _Connection(object): API_BASE_URL = 'http://example.com' USER_AGENT = 'testing 1.2.3' def __init__(self, *responses): - super(_Connection, self).__init__(*responses) - self.http = _HTTP(*responses) + self._responses = responses[:] + self._requested = [] def api_request(self, **kw): from google.cloud.exceptions import NotFound @@ -2072,18 +2061,3 @@ def build_api_url(self, path, query_params=None, qs = urlencode(query_params or {}) scheme, netloc, _, _, _ = urlsplit(api_base_url) return urlunsplit((scheme, netloc, path, qs, '')) - - -def _email_chunk_parser(): - import six - - if six.PY3: # pragma: NO COVER Python3 - from email.parser import BytesParser - - parser = BytesParser() - return parser.parsebytes - else: - from email.parser import Parser - - parser = Parser() - return parser.parsestr diff --git a/bigquery/tests/unit/test_table_upload.py b/bigquery/tests/unit/test_table_upload.py deleted file mode 100644 index d41c887587aed..0000000000000 --- a/bigquery/tests/unit/test_table_upload.py +++ /dev/null @@ -1,97 +0,0 @@ -# Copyright 2017 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import csv -import io -import json - -from google import resumable_media -import google.auth.transport.requests -import mock -import requests -from six.moves import http_client -from six.moves import StringIO - -FIELDS = (u'name', u'age') -ROWS = [ - (u'Phred Phlyntstone', 32), - (u'Bharney Rhubble', 33), - (u'Wylma Phlyntstone', 29), - (u'Bhettye Rhubble', 27), -] - - -def rows_to_csv(fields, rows): - """Convert the rows into a CSV-format unicode string.""" - out = StringIO() - writer = csv.writer(out) - writer.writerow(fields) - writer.writerows(rows) - return out.getvalue() - - -def make_table(): - from google.cloud.bigquery import _http - from google.cloud.bigquery import client - from google.cloud.bigquery import dataset - from google.cloud.bigquery import table - - mock_connection = mock.Mock(spec=_http.Connection) - mock_client = mock.Mock(spec=client.Client) - mock_client._connection = mock_connection - mock_client._credentials = mock.sentinel.credentials - mock_client.project = 'project_id' - - dataset = dataset.Dataset('test_dataset', mock_client) - table = table.Table('test_table', dataset) - - return table - - -def make_response(status_code, content, headers={}): - return mock.Mock( - content=content, headers=headers, status_code=status_code, - spec=requests.Response) - - -def make_resumable_upload_responses(size): - resumable_url = 'http://test.invalid?upload_id=and-then-there-was-1' - initial_response = make_response( - http_client.OK, b'', {'location': resumable_url}) - data_response = make_response( - resumable_media.PERMANENT_REDIRECT, - b'', {'range': 'bytes=0-{:d}'.format(size - 1)}) - final_response = make_response( - http_client.OK, - json.dumps({'size': size}), - {'Content-Type': 'application/json'}) - return [initial_response, data_response, final_response] - - -def test_upload_from_file_simple(): - table = make_table() - - csv_file = io.BytesIO( - rows_to_csv(FIELDS, ROWS).encode('utf-8')) - csv_file_size = len(csv_file.getvalue()) - - mock_transport = mock.Mock( - spec=google.auth.transport.requests.AuthorizedSession) - transport_patch = mock.patch.object( - table, '_make_transport', return_value=mock_transport) - - with transport_patch: - mock_transport.request.side_effect = make_resumable_upload_responses( - csv_file_size) - table.upload_from_file(csv_file, source_format='CSV')