21 changes: 18 additions & 3 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -265,6 +265,7 @@ def compute_table_name(row):
     'BigQuerySource',
     'BigQuerySink',
     'WriteToBigQuery',
+    'SCHEMA_AUTODETECT',
 ]


@@ -786,6 +787,7 @@ def _create_table_if_needed(self, table_reference, schema=None):
                   table_reference, schema)
 
     table_schema = self.get_table_schema(schema)
+
     if table_reference.projectId is None:
       table_reference.projectId = vp.RuntimeValueProvider.get_value(
           'project', str, '')
@@ -880,6 +882,10 @@ def _flush_batch(self, destination):
             (destination, row))) for row in failed_rows]
 
 
+# Flag to be passed to WriteToBigQuery to force schema autodetection
+SCHEMA_AUTODETECT = 'SCHEMA_AUTODETECT'
+
+
 class WriteToBigQuery(PTransform):
   """Write data to BigQuery.
 
@@ -943,8 +949,9 @@ def __init__(self,
         fields, repeated fields, or specifying a BigQuery mode for fields
         (mode will always be set to ``'NULLABLE'``).
         If a callable, then it should receive a destination (in the form of
-        a TableReference or a string, and return a str, dict or TableSchema, and
-        it should return a str, dict or TableSchema.
+        a TableReference or a string, and return a str, dict or TableSchema.
+        One may also pass ``SCHEMA_AUTODETECT`` here, and BigQuery will try to
+        infer the schema for the files that are being loaded.
       create_disposition (BigQueryDisposition): A string describing what
         happens if the table does not exist. Possible values are:
 
@@ -1004,7 +1011,10 @@ def __init__(self,
         create_disposition)
     self.write_disposition = BigQueryDisposition.validate_write(
         write_disposition)
-    self.schema = WriteToBigQuery.get_dict_table_schema(schema)
+    if schema == SCHEMA_AUTODETECT:
+      self.schema = schema
+    else:
+      self.schema = WriteToBigQuery.get_dict_table_schema(schema)
     self.batch_size = batch_size
     self.kms_key = kms_key
     self.test_client = test_client
@@ -1126,6 +1136,11 @@ def expand(self, pcoll):
 
     method_to_use = self._compute_method(p, p.options)
 
+    if (method_to_use == WriteToBigQuery.Method.STREAMING_INSERTS
+        and self.schema == SCHEMA_AUTODETECT):
+      raise ValueError('Schema auto-detection is not supported for streaming '
+                       'inserts into BigQuery. Only for File Loads.')
+
     if method_to_use == WriteToBigQuery.Method.STREAMING_INSERTS:
       # TODO: Support load jobs for streaming pipelines.
       bigquery_write_fn = BigQueryWriteFn(
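For context, here is a minimal usage sketch of the new flag (pipeline contents and table name are hypothetical): passing ``SCHEMA_AUTODETECT`` together with ``FILE_LOADS`` lets BigQuery infer the schema from the loaded files, while combining it with ``STREAMING_INSERTS`` now raises the ValueError added above.

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery import SCHEMA_AUTODETECT, WriteToBigQuery

    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create([{'name': 'a', 'value': 1}])  # hypothetical rows
          | WriteToBigQuery(
              table='my-project:my_dataset.my_table',  # hypothetical table
              schema=SCHEMA_AUTODETECT,  # BigQuery infers the schema
              method='FILE_LOADS'))  # STREAMING_INSERTS would raise ValueError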
8 changes: 2 additions & 6 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -345,12 +345,8 @@ def __init__(self,
 
   def display_data(self):
     result = {'create_disposition': str(self.create_disposition),
-              'write_disposition': str(self.write_disposition),
-              'additional_bq_parameters': str(self.additional_bq_parameters)}
-    if self.schema is not None:
-      result['schema'] = str(self.schema)
-    else:
-      result['schema'] = 'AUTODETECT'
+              'write_disposition': str(self.write_disposition)}
+    result['schema'] = str(self.schema)
 
     return result
 
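With the special-cased 'AUTODETECT' label gone, display_data now stringifies whatever schema was configured. A rough sketch of the dict reported for a transform using autodetection (the disposition values are illustrative, not taken from this PR):

    expected_display_data = {
        'create_disposition': 'CREATE_IF_NEEDED',  # illustrative
        'write_disposition': 'WRITE_APPEND',       # illustrative
        'schema': 'SCHEMA_AUTODETECT',             # str(self.schema), no special case
    }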
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -550,6 +550,7 @@ def test_value_provider_transform(self):
           | "WriteWithMultipleDests2" >> beam.io.gcp.bigquery.WriteToBigQuery(
               table=value_provider.StaticValueProvider(
                   str, '%s:%s' % (self.project, output_table_2)),
+              schema=beam.io.gcp.bigquery.SCHEMA_AUTODETECT,
               additional_bq_parameters=lambda _: additional_bq_parameters,
               method='FILE_LOADS'))
 
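The test pairs the flag with a ValueProvider-typed table. As a small reference sketch, StaticValueProvider wraps a concrete value behind the ValueProvider interface and hands it back at runtime (project and table names are hypothetical):

    from apache_beam.options import value_provider

    # StaticValueProvider holds a concrete value; .get() returns it at runtime.
    table = value_provider.StaticValueProvider(
        str, 'my-project:my_dataset.output_table')  # hypothetical destination
    assert table.get() == 'my-project:my_dataset.output_table'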
5 changes: 3 additions & 2 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -327,6 +327,7 @@ def _insert_load_job(self,
                        create_disposition=None,
                        additional_load_parameters=None):
     additional_load_parameters = additional_load_parameters or {}
+    job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema
     reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
     request = bigquery.BigqueryJobsInsertRequest(
         projectId=project_id,
@@ -335,11 +336,11 @@
             load=bigquery.JobConfigurationLoad(
                 sourceUris=source_uris,
                 destinationTable=table_reference,
-                schema=schema,
+                schema=job_schema,
                 writeDisposition=write_disposition,
                 createDisposition=create_disposition,
                 sourceFormat='NEWLINE_DELIMITED_JSON',
-                autodetect=schema is None,
+                autodetect=schema == 'SCHEMA_AUTODETECT',
                 **additional_load_parameters
             )
         ),
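The net effect in _insert_load_job: when the sentinel string arrives, the explicit schema is dropped and autodetect is enabled; any other schema passes through with autodetect off (note that schema=None no longer turns autodetection on; only the explicit flag does). A self-contained sketch of that branching, with a plain dict standing in for JobConfigurationLoad:

    def load_config_for(schema):
      # Mirrors the two changed fields; other load-job fields are omitted.
      job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema
      return {'schema': job_schema,
              'autodetect': schema == 'SCHEMA_AUTODETECT'}

    assert load_config_for('SCHEMA_AUTODETECT') == {
        'schema': None, 'autodetect': True}
    assert load_config_for('name:STRING') == {
        'schema': 'name:STRING', 'autodetect': False}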