bigquery: support external data definition for tables (#4193)

jba authored and tswast committed Oct 16, 2017
1 parent 7ea861f commit 3af22d7
Showing 6 changed files with 323 additions and 164 deletions.
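
In rough terms, the change lets a permanent BigQuery table be backed by data that lives outside BigQuery, such as CSV files in Cloud Storage. A minimal sketch of the usage this enables, pieced together from the tests below; the project, dataset, table, and bucket names are placeholders, not values from the commit:

from google.cloud import bigquery
from google.cloud.bigquery.table import Table

client = bigquery.Client(project='my-project')
dataset_ref = client.dataset('my_dataset')

ec = bigquery.ExternalConfig('CSV')
ec.source_uris = ['gs://my-bucket/person_ages.csv']
ec.options.skip_leading_rows = 1  # skip the CSV header row

table = Table(dataset_ref.table('people'))
table.external_data_configuration = ec
table = client.create_table(table)
# Queries against my_dataset.people now read the CSV from Cloud Storage.
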
47 changes: 47 additions & 0 deletions bigquery/google/cloud/bigquery/schema.py
@@ -154,3 +154,50 @@ def __hash__(self):

    def __repr__(self):
        return 'SchemaField{}'.format(self._key())


def _parse_schema_resource(info):
    """Parse a resource fragment into a schema field.

    :type info: mapping
    :param info: should contain a "fields" key to be parsed

    :rtype: list of :class:`SchemaField`
    :returns: a list of parsed fields, or an empty tuple if no "fields"
              key is present in ``info``.
    """
    if 'fields' not in info:
        return ()

    schema = []
    for r_field in info['fields']:
        name = r_field['name']
        field_type = r_field['type']
        mode = r_field.get('mode', 'NULLABLE')
        description = r_field.get('description')
        sub_fields = _parse_schema_resource(r_field)
        schema.append(
            SchemaField(name, field_type, mode, description, sub_fields))
    return schema


def _build_schema_resource(fields):
    """Generate a resource fragment for a schema.

    :type fields: sequence of :class:`SchemaField`
    :param fields: schema to be dumped

    :rtype: list of mappings
    :returns: a list of mappings describing the schema of the supplied
              fields.
    """
    infos = []
    for field in fields:
        info = {'name': field.name,
                'type': field.field_type,
                'mode': field.mode}
        if field.description is not None:
            info['description'] = field.description
        if field.fields:
            info['fields'] = _build_schema_resource(field.fields)
        infos.append(info)
    return infos
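
The two helpers above are near-inverses of each other. A small sketch with a made-up resource fragment, shaped like the REST API's table schema (modes are spelled out so the round trip is exact; note that _parse_schema_resource returns an empty tuple, not None, when the "fields" key is absent):

from google.cloud.bigquery.schema import (
    _build_schema_resource, _parse_schema_resource)

# Hypothetical fragment; not a value from this commit.
resource = {'fields': [
    {'name': 'full_name', 'type': 'STRING', 'mode': 'REQUIRED'},
    {'name': 'addresses', 'type': 'RECORD', 'mode': 'REPEATED',
     'fields': [{'name': 'city', 'type': 'STRING', 'mode': 'NULLABLE'}]},
]}

fields = _parse_schema_resource(resource)  # nested fields parse recursively
assert fields[1].fields[0].name == 'city'
assert _build_schema_resource(fields) == resource['fields']
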
94 changes: 45 additions & 49 deletions bigquery/google/cloud/bigquery/table.py
@@ -23,6 +23,9 @@
from google.cloud._helpers import _datetime_from_microseconds
from google.cloud._helpers import _millis_from_datetime
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.schema import _build_schema_resource
from google.cloud.bigquery.schema import _parse_schema_resource
from google.cloud.bigquery.external_config import ExternalConfig


_TABLE_HAS_NO_SCHEMA = "Table has no schema: call 'client.get_table()'"
@@ -159,13 +162,15 @@ class Table(object):

    all_fields = [
        'description', 'friendly_name', 'expires', 'location',
        'partitioning_type', 'view_use_legacy_sql', 'view_query', 'schema',
        'external_data_configuration',
    ]

    def __init__(self, table_ref, schema=()):
        self._project = table_ref.project
        self._table_id = table_ref.table_id
        self._dataset_id = table_ref.dataset_id
        self._external_config = None
        self._properties = {}
        # Let the @property do validation.
        self.schema = schema
@@ -537,10 +542,37 @@ def view_use_legacy_sql(self, value):

    @property
    def streaming_buffer(self):
        """Information about a table's streaming buffer.

        :rtype: :class:`StreamingBuffer`
        :returns: Streaming buffer information, returned from ``get_table``.
        """
        sb = self._properties.get('streamingBuffer')
        if sb is not None:
            return StreamingBuffer(sb)

    @property
    def external_data_configuration(self):
        """Configuration for an external data source.

        If not set, ``None`` is returned.

        :rtype: :class:`ExternalConfig`, or ``NoneType``
        :returns: The external configuration, or ``None`` (the default).
        """
        return self._external_config

    @external_data_configuration.setter
    def external_data_configuration(self, value):
        """Set the configuration for an external data source.

        :type value: :class:`ExternalConfig`, or ``NoneType``
        :param value: The ExternalConfig, or ``None`` to unset.
        """
        if not (value is None or isinstance(value, ExternalConfig)):
            raise ValueError("Pass an ExternalConfig or None")
        self._external_config = value
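
        # A sketch of the setter's contract; ``table_ref`` is an assumed
        # TableReference, not a value from this commit:
        #
        #     table = Table(table_ref)
        #     table.external_data_configuration = ExternalConfig('CSV')  # ok
        #     table.external_data_configuration = None  # ok: unsets
        #     table.external_data_configuration = {'sourceFormat': 'CSV'}
        #     # -> ValueError; raw mappings must be wrapped in ExternalConfig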

    @classmethod
    def from_api_repr(cls, resource):
        """Factory: construct a table given its API representation
@@ -579,6 +611,9 @@ def _set_properties(self, api_response):
        cleaned = api_response.copy()
        schema = cleaned.pop('schema', {'fields': ()})
        self.schema = _parse_schema_resource(schema)
        ec = cleaned.pop('externalDataConfiguration', None)
        if ec:
            self.external_data_configuration = (
                ExternalConfig.from_api_repr(ec))
        if 'creationTime' in cleaned:
            cleaned['creationTime'] = float(cleaned['creationTime'])
        if 'lastModifiedTime' in cleaned:
@@ -614,12 +649,20 @@ def _populate_schema_resource(self, resource):
            'fields': _build_schema_resource(self._schema),
        }

    def _populate_external_config(self, resource):
        if self.external_data_configuration is None:
            resource['externalDataConfiguration'] = None
        else:
            resource['externalDataConfiguration'] = (
                self.external_data_configuration.to_api_repr())

    custom_resource_fields = {
        'expires': _populate_expires_resource,
        'partitioning_type': _populate_partitioning_type_resource,
        'view_query': _populate_view_query_resource,
        'view_use_legacy_sql': _populate_view_use_legacy_sql_resource,
        'schema': _populate_schema_resource,
        'external_data_configuration': _populate_external_config,
    }

    def _build_resource(self, filter_fields):
@@ -690,50 +733,3 @@ def __init__(self, resource):
        # time is in milliseconds since the epoch.
        self.oldest_entry_time = _datetime_from_microseconds(
            1000.0 * int(resource['oldestEntryTime']))
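
        # The 1000.0 factor converts the API's millisecond timestamp to the
        # microseconds expected by _datetime_from_microseconds. For the
        # hypothetical value {'oldestEntryTime': '1508112000000'}, the
        # result corresponds to datetime.datetime(2017, 10, 16, 0, 0,
        # tzinfo=UTC).
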
37 changes: 35 additions & 2 deletions bigquery/tests/system.py
@@ -1257,7 +1257,7 @@ def test_query_table_def(self):
            ('Bhettye Rhubble', 27),
        ]
        gs_url = self._write_csv_to_storage(
            'bq_external_test' + unique_resource_id(), 'person_ages.csv',
            ('Full Name', 'Age'), rows)

        job_config = bigquery.QueryJobConfig()
@@ -1270,7 +1270,7 @@
        ]
        ec.options.skip_leading_rows = 1  # skip the header row
        job_config.table_definitions = {table_id: ec}
        sql = 'SELECT * FROM %s' % table_id

        got_rows = Config.CLIENT.query_rows(sql, job_config=job_config)

@@ -1279,6 +1279,39 @@
        self.assertEqual(sorted(row_tuples, key=by_age),
                         sorted(rows, key=by_age))

    def test_query_external_table(self):
        rows = [
            ('Phred Phlyntstone', 32),
            ('Bharney Rhubble', 33),
            ('Wylma Phlyntstone', 29),
            ('Bhettye Rhubble', 27),
        ]
        gs_url = self._write_csv_to_storage(
            'bq_external_test' + unique_resource_id(), 'person_ages.csv',
            ('Full Name', 'Age'), rows)
        dataset_id = _make_dataset_id('query_external_table')
        dataset = self.temp_dataset(dataset_id)
        table_id = 'flintstones'
        full_name = bigquery.SchemaField('full_name', 'STRING',
                                         mode='REQUIRED')
        age = bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED')
        table_arg = Table(dataset.table(table_id), schema=[full_name, age])
        ec = bigquery.ExternalConfig('CSV')
        ec.source_uris = [gs_url]
        ec.options.skip_leading_rows = 1  # skip the header row
        table_arg.external_data_configuration = ec
        table = Config.CLIENT.create_table(table_arg)
        self.to_delete.insert(0, table)

        sql = 'SELECT * FROM %s.%s' % (dataset_id, table_id)

        got_rows = Config.CLIENT.query_rows(sql)

        row_tuples = [r.values() for r in got_rows]
        by_age = operator.itemgetter(1)
        self.assertEqual(sorted(row_tuples, key=by_age),
                         sorted(rows, key=by_age))

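    # Taken together, the two tests above exercise both ways of querying
    # external data: test_query_table_def attaches the external table
    # definition to a single query job, while test_query_external_table
    # creates a permanent table backed by the external source. A condensed
    # sketch of the contrast, assuming the client, dataset, dataset_id,
    # table_id, and ec objects from the tests above:
    #
    #     # Temporary: the definition lives only in this query job.
    #     job_config = bigquery.QueryJobConfig()
    #     job_config.table_definitions = {table_id: ec}
    #     client.query_rows('SELECT * FROM %s' % table_id,
    #                       job_config=job_config)
    #
    #     # Permanent: create the table once, then query it like any other.
    #     table = Table(dataset.table(table_id))
    #     table.external_data_configuration = ec
    #     client.create_table(table)
    #     client.query_rows('SELECT * FROM %s.%s' % (dataset_id, table_id))
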
    def test_create_rows_nested_nested(self):
        # See #2951
        SF = bigquery.SchemaField
50 changes: 50 additions & 0 deletions bigquery/tests/unit/test_client.py
@@ -559,6 +559,56 @@ def test_create_table_w_schema_and_query(self):
        self.assertEqual(got.schema, schema)
        self.assertEqual(got.view_query, query)

    def test_create_table_w_external(self):
        from google.cloud.bigquery.table import Table
        from google.cloud.bigquery.external_config import ExternalConfig

        path = 'projects/%s/datasets/%s/tables' % (
            self.PROJECT, self.DS_ID)
        creds = _make_credentials()
        client = self._make_one(project=self.PROJECT, credentials=creds)
        resource = {
            'id': '%s:%s:%s' % (self.PROJECT, self.DS_ID, self.TABLE_ID),
            'tableReference': {
                'projectId': self.PROJECT,
                'datasetId': self.DS_ID,
                'tableId': self.TABLE_ID,
            },
            'externalDataConfiguration': {
                'sourceFormat': 'CSV',
                'autodetect': True,
            },
        }
        conn = client._connection = _Connection(resource)
        table = Table(self.TABLE_REF)
        ec = ExternalConfig('CSV')
        ec.autodetect = True
        table.external_data_configuration = ec

        got = client.create_table(table)

        self.assertEqual(len(conn._requested), 1)
        req = conn._requested[0]
        self.assertEqual(req['method'], 'POST')
        self.assertEqual(req['path'], '/%s' % path)
        sent = {
            'tableReference': {
                'projectId': self.PROJECT,
                'datasetId': self.DS_ID,
                'tableId': self.TABLE_ID,
            },
            'externalDataConfiguration': {
                'sourceFormat': 'CSV',
                'autodetect': True,
            },
        }
        self.assertEqual(req['data'], sent)
        self.assertEqual(got.table_id, self.TABLE_ID)
        self.assertEqual(got.project, self.PROJECT)
        self.assertEqual(got.dataset_id, self.DS_ID)
        self.assertEqual(got.external_data_configuration.source_format, 'CSV')
        self.assertEqual(got.external_data_configuration.autodetect, True)

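    # The final assertions rely on ExternalConfig surviving a round trip
    # through its API representation; roughly (a sketch, not a test from
    # this commit):
    #
    #     api_repr = {'sourceFormat': 'CSV', 'autodetect': True}
    #     ec = ExternalConfig.from_api_repr(api_repr)
    #     assert ec.source_format == 'CSV'
    #     assert ec.autodetect is True
    #     assert ec.to_api_repr() == api_repr
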
    def test_get_table(self):
        path = 'projects/%s/datasets/%s/tables/%s' % (
            self.PROJECT, self.DS_ID, self.TABLE_ID)
