Skip to content

Commit

Permalink
bigquery: support external table definitions for query jobs (#4191)
Browse files Browse the repository at this point in the history
Also, set ExternalConfig.options based on source_format, and
make read-only.

Also, change from_api_repr functions in external_config.py so that
they don't modify their resource argument. This simplifies tests.
  • Loading branch information
jba authored and tswast committed Oct 16, 2017
1 parent ab392d5 commit 7ea861f
Show file tree
Hide file tree
Showing 6 changed files with 331 additions and 238 deletions.
8 changes: 8 additions & 0 deletions bigquery/google/cloud/bigquery/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,3 +546,11 @@ def _should_retry(exc):
on ``DEFAULT_RETRY``. For example, to change the deadline to 30 seconds,
pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``.
"""


def _int_or_none(value):
"""Helper: deserialize int value from JSON string."""
if isinstance(value, int):
return value
if value is not None:
return int(value)
233 changes: 117 additions & 116 deletions bigquery/google/cloud/bigquery/external_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,122 +29,10 @@
from google.cloud.bigquery._helpers import _bytes_to_json
from google.cloud.bigquery._helpers import _TypedApiResourceProperty
from google.cloud.bigquery._helpers import _ListApiResourceProperty
from google.cloud.bigquery._helpers import _int_or_none
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import _build_schema_resource
from google.cloud.bigquery.table import _parse_schema_resource
from google.cloud.bigquery.job import _int_or_none


class ExternalConfig(object):
    """Description of an external data source.

    :type source_format: str
    :param source_format: the format of the external data. See
                          the ``source_format`` property on this class.
    """

    def __init__(self, source_format):
        self._properties = {'sourceFormat': source_format}
        self._options = None

    @property
    def source_format(self):
        """See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.sourceFormat
        """
        return self._properties['sourceFormat']

    autodetect = _TypedApiResourceProperty(
        'autodetect', 'autodetect', bool)
    """See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).autodetect
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.autodetect
    """

    compression = _TypedApiResourceProperty(
        'compression', 'compression', six.string_types)
    """See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).compression
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.compression
    """

    ignore_unknown_values = _TypedApiResourceProperty(
        'ignore_unknown_values', 'ignoreUnknownValues', bool)
    """See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).ignoreUnknownValues
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.ignoreUnknownValues
    """

    max_bad_records = _TypedApiResourceProperty(
        'max_bad_records', 'maxBadRecords', six.integer_types)
    """See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).maxBadRecords
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.maxBadRecords
    """

    source_uris = _ListApiResourceProperty(
        'source_uris', 'sourceUris', six.string_types)
    """See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceUris
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.sourceUris
    """

    schema = _ListApiResourceProperty('schema', 'schema', SchemaField)
    """See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).schema
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.schema
    """

    @property
    def options(self):
        """Source-specific options, or ``None`` if none have been set.

        When set, the value must be an options object whose
        ``_SOURCE_FORMAT`` equals this config's ``source_format``.
        """
        return self._options

    @options.setter
    def options(self, value):
        # Reject options that belong to a different source format.
        if self.source_format != value._SOURCE_FORMAT:
            raise ValueError(
                'source format %s does not match option type %s' % (
                    self.source_format, value.__class__.__name__))
        self._options = value

    def to_api_repr(self):
        """Build an API representation of this object.

        :rtype: dict
        :returns: A dictionary in the format used by the BigQuery API.
        """
        config = copy.deepcopy(self._properties)
        if self.schema:
            # The API nests the field list under a 'fields' key.
            config['schema'] = {'fields': _build_schema_resource(self.schema)}
        if self.options is not None:
            config[self.options._RESOURCE_NAME] = self.options.to_api_repr()
        return config

    @classmethod
    def from_api_repr(cls, resource):
        """Factory: construct an ExternalConfig given its API representation

        The ``resource`` argument is not modified: parsing is done on a
        private deep copy.

        :type resource: dict
        :param resource:
            An external data configuration in the same representation as
            is returned from the API. Must contain ``'sourceFormat'``.

        :rtype: :class:`google.cloud.bigquery.external_config.ExternalConfig`
        :returns: Configuration parsed from ``resource``.
        """
        config = cls(resource['sourceFormat'])
        # Work on a copy so the pops below (and any done by the option
        # classes' own factories) never touch the caller's dictionary.
        properties = copy.deepcopy(resource)
        schema = properties.pop('schema', None)
        for optcls in (BigtableOptions, CSVOptions, GoogleSheetsOptions):
            opts = properties.pop(optcls._RESOURCE_NAME, None)
            if opts is not None:
                config.options = optcls.from_api_repr(opts)
                break
        config._properties = properties
        if schema:
            config.schema = _parse_schema_resource(schema)
        return config


class BigtableColumn(object):
Expand Down Expand Up @@ -220,9 +108,9 @@ def from_api_repr(cls, resource):
:rtype: :class:`google.cloud.bigquery.external_config.BigtableColumn`
:returns: Configuration parsed from ``resource``.
"""
qe = resource.pop('qualifierEncoded', None)
config = cls()
config._properties = copy.deepcopy(resource)
qe = resource.get('qualifierEncoded')
if qe:
config.qualifier_encoded = base64.standard_b64decode(_to_bytes(qe))
return config
Expand Down Expand Up @@ -436,7 +324,7 @@ def from_api_repr(cls, resource):
:rtype: :class:`google.cloud.bigquery.external_config.CSVOptions`
:returns: Configuration parsed from ``resource``.
"""
slr = resource.pop('skipLeadingRows', None)
slr = resource.get('skipLeadingRows')
config = cls()
config._properties = copy.deepcopy(resource)
config.skip_leading_rows = _int_or_none(slr)
Expand Down Expand Up @@ -484,8 +372,121 @@ def from_api_repr(cls, resource):
:class:`google.cloud.bigquery.external_config.GoogleSheetsOptions`
:returns: Configuration parsed from ``resource``.
"""
slr = resource.pop('skipLeadingRows', None)
slr = resource.get('skipLeadingRows')
config = cls()
config._properties = copy.deepcopy(resource)
config.skip_leading_rows = _int_or_none(slr)
return config


_OPTION_CLASSES = (BigtableOptions, CSVOptions, GoogleSheetsOptions)


class ExternalConfig(object):
    """Description of an external data source.

    :type source_format: str
    :param source_format: the format of the external data. See
                          the ``source_format`` property on this class.
    """

    def __init__(self, source_format):
        self._properties = {'sourceFormat': source_format}
        # Pick the options class registered for this source format, if
        # any; formats without a matching class leave options as None.
        self._options = None
        for optcls in _OPTION_CLASSES:
            if source_format == optcls._SOURCE_FORMAT:
                self._options = optcls()
                break

    @property
    def source_format(self):
        """See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.sourceFormat
        """
        return self._properties['sourceFormat']

    @property
    def options(self):
        """Source-specific options (read-only).

        An instance of the option class whose ``_SOURCE_FORMAT`` matches
        ``source_format``, or ``None`` when no option class matches.
        """
        return self._options

    autodetect = _TypedApiResourceProperty(
        'autodetect', 'autodetect', bool)
    """See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).autodetect
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.autodetect
    """

    compression = _TypedApiResourceProperty(
        'compression', 'compression', six.string_types)
    """See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).compression
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.compression
    """

    ignore_unknown_values = _TypedApiResourceProperty(
        'ignore_unknown_values', 'ignoreUnknownValues', bool)
    """See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).ignoreUnknownValues
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.ignoreUnknownValues
    """

    max_bad_records = _TypedApiResourceProperty(
        'max_bad_records', 'maxBadRecords', six.integer_types)
    """See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).maxBadRecords
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.maxBadRecords
    """

    source_uris = _ListApiResourceProperty(
        'source_uris', 'sourceUris', six.string_types)
    """See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceUris
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.sourceUris
    """

    schema = _ListApiResourceProperty('schema', 'schema', SchemaField)
    """See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).schema
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.schema
    """

    def to_api_repr(self):
        """Build an API representation of this object.

        :rtype: dict
        :returns: A dictionary in the format used by the BigQuery API.
        """
        config = copy.deepcopy(self._properties)
        if self.schema:
            # The API nests the field list under a 'fields' key.
            config['schema'] = {'fields': _build_schema_resource(self.schema)}
        if self.options is not None:
            options_resource = self.options.to_api_repr()
            # Only emit the options sub-resource when it carries content.
            if options_resource:
                config[self.options._RESOURCE_NAME] = options_resource
        return config

    @classmethod
    def from_api_repr(cls, resource):
        """Factory: construct an ExternalConfig given its API representation

        ``resource`` is only read, never modified; its contents are
        deep-copied into the new object.

        :type resource: dict
        :param resource:
            An external data configuration in the same representation as
            is returned from the API. Must contain ``'sourceFormat'``.

        :rtype: :class:`google.cloud.bigquery.external_config.ExternalConfig`
        :returns: Configuration parsed from ``resource``.
        """
        config = cls(resource['sourceFormat'])
        schema = resource.get('schema')
        # Use the first option class whose sub-resource is present,
        # replacing the default options created by the constructor.
        for optcls in _OPTION_CLASSES:
            opts = resource.get(optcls._RESOURCE_NAME)
            if opts is not None:
                config._options = optcls.from_api_repr(opts)
                break
        config._properties = copy.deepcopy(resource)
        if schema:
            config.schema = _parse_schema_resource(schema)
        return config
45 changes: 29 additions & 16 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from google.cloud.exceptions import NotFound
from google.cloud._helpers import _datetime_from_microseconds
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.query import _AbstractQueryParameter
from google.cloud.bigquery.query import _query_param_from_api_repr
from google.cloud.bigquery.query import ArrayQueryParameter
Expand All @@ -39,6 +40,7 @@
from google.cloud.bigquery._helpers import _ListApiResourceProperty
from google.cloud.bigquery._helpers import _TypedApiResourceProperty
from google.cloud.bigquery._helpers import DEFAULT_RETRY
from google.cloud.bigquery._helpers import _int_or_none

_DONE_STATE = 'DONE'
_STOPPED_REASON = 'stopped'
Expand All @@ -65,22 +67,6 @@
}


def _bool_or_none(value):
"""Helper: deserialize boolean value from JSON string."""
if isinstance(value, bool):
return value
if value is not None:
return value.lower() in ['t', 'true', '1']


def _int_or_none(value):
"""Helper: deserialize int value from JSON string."""
if isinstance(value, int):
return value
if value is not None:
return int(value)


def _error_result_to_exception(error_result):
"""Maps BigQuery error reasons to an exception.
Expand Down Expand Up @@ -1315,6 +1301,14 @@ def _to_api_repr_udf_resources(value):
]


def _from_api_repr_table_defs(resource):
    """Convert a mapping of table-definition resources into objects.

    :type resource: dict
    :param resource: mapping whose values are passed to
                     ``ExternalConfig.from_api_repr``.

    :rtype: dict
    :returns: a new mapping with the same keys and the parsed values.
    """
    table_defs = {}
    for name, config_resource in resource.items():
        table_defs[name] = ExternalConfig.from_api_repr(config_resource)
    return table_defs


def _to_api_repr_table_defs(value):
return {k: ExternalConfig.to_api_repr(v) for k, v in value.items()}


class QueryJobConfig(object):
"""Configuration options for query jobs.
Expand Down Expand Up @@ -1469,6 +1463,16 @@ def from_api_repr(cls, resource):
https://g.co/cloud/bigquery/docs/reference/rest/v2/jobs#configuration.query.writeDisposition
"""

table_definitions = _TypedApiResourceProperty(
'table_definitions', 'tableDefinitions', dict)
"""
Definitions for external tables. A dictionary from table names (strings)
to :class:`google.cloud.bigquery.external_config.ExternalConfig`.
See
https://g.co/cloud/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions
"""

_maximum_billing_tier = None
_maximum_bytes_billed = None

Expand All @@ -1478,6 +1482,8 @@ def from_api_repr(cls, resource):
'destinationTable': (
TableReference.from_api_repr, TableReference.to_api_repr),
'maximumBytesBilled': (int, str),
'tableDefinitions': (_from_api_repr_table_defs,
_to_api_repr_table_defs),
_QUERY_PARAMETERS_KEY: (
_from_api_repr_query_parameters, _to_api_repr_query_parameters),
_UDF_RESOURCES_KEY: (
Expand Down Expand Up @@ -1615,6 +1621,13 @@ def maximum_bytes_billed(self):
"""
return self._configuration.maximum_bytes_billed

@property
def table_definitions(self):
    """Table definitions held by this job's configuration.

    See
    :class:`~google.cloud.bigquery.job.QueryJobConfig.table_definitions`.
    """
    configuration = self._configuration
    return configuration.table_definitions

def _build_resource(self):
"""Generate a resource for :meth:`begin`."""
configuration = self._configuration.to_api_repr()
Expand Down

0 comments on commit 7ea861f

Please sign in to comment.