Skip to content

Commit

Permalink
Fix: GCS To BigQuery source_object (#16160)
Browse files Browse the repository at this point in the history
* Fix: GCS To BigQuery source_object #16008

Fix GCS To BigQuery source_object to accept both str and list

* convert source_objects to list if not list

Convert source_objects to a list up front instead of changing the downstream loading logic

* add tests
  • Loading branch information
tegardp committed May 31, 2021
1 parent b7d1039 commit 99d1535
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 3 deletions.
6 changes: 3 additions & 3 deletions airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ class GCSToBigQueryOperator(BaseOperator):
:param bucket: The bucket to load from. (templated)
:type bucket: str
:param source_objects: List of Google Cloud Storage URIs to load from. (templated)
:param source_objects: String or List of Google Cloud Storage URIs to load from. (templated)
If source_format is 'DATASTORE_BACKUP', the list must only contain a single URI.
:type source_objects: list[str]
:type source_objects: str, list[str]
:param destination_project_dataset_table: The dotted
``(<project>.|<project>:)<dataset>.<table>`` BigQuery table to load data into.
If ``<project>`` is not included, project will be the project defined in
Expand Down Expand Up @@ -219,7 +219,7 @@ def __init__(
if time_partitioning is None:
time_partitioning = {}
self.bucket = bucket
self.source_objects = source_objects
self.source_objects = source_objects if isinstance(source_objects, list) else [source_objects]
self.schema_object = schema_object

# BQ config
Expand Down
73 changes: 73 additions & 0 deletions tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
TEST_BUCKET = 'test-bucket'
MAX_ID_KEY = 'id'
TEST_SOURCE_OBJECTS = ['test/objects/*']
TEST_SOURCE_OBJECTS_AS_STRING = 'test/objects/*'
LABELS = {'k1': 'v1'}
DESCRIPTION = "Test Description"

Expand Down Expand Up @@ -216,3 +217,75 @@ def test_description_external_table(self, bq_hook):
description=DESCRIPTION,
)
# fmt: on

@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook')
def test_source_objects_as_list(self, bq_hook):
    """A list of source objects should be forwarded to run_load as gs:// URIs, one per element."""
    operator = GCSToBigQueryOperator(
        task_id=TASK_ID,
        bucket=TEST_BUCKET,
        source_objects=TEST_SOURCE_OBJECTS,
        destination_project_dataset_table=TEST_EXPLICIT_DEST,
    )

    operator.execute(None)

    # Only source_uris is pinned; every other run_load kwarg is irrelevant here.
    wildcard_kwargs = {
        key: mock.ANY
        for key in (
            'destination_project_dataset_table',
            'schema_fields',
            'source_format',
            'autodetect',
            'create_disposition',
            'skip_leading_rows',
            'write_disposition',
            'field_delimiter',
            'max_bad_records',
            'quote_character',
            'ignore_unknown_values',
            'allow_quoted_newlines',
            'allow_jagged_rows',
            'encoding',
            'schema_update_options',
            'src_fmt_configs',
            'time_partitioning',
            'cluster_fields',
            'encryption_configuration',
            'labels',
            'description',
        )
    }
    expected_uris = [f'gs://{TEST_BUCKET}/{obj}' for obj in TEST_SOURCE_OBJECTS]
    run_load = bq_hook.return_value.get_conn.return_value.cursor.return_value.run_load
    run_load.assert_called_once_with(source_uris=expected_uris, **wildcard_kwargs)

@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook')
def test_source_objects_as_string(self, bq_hook):
    """A single string source object should be wrapped into a one-element gs:// URI list."""
    operator = GCSToBigQueryOperator(
        task_id=TASK_ID,
        bucket=TEST_BUCKET,
        source_objects=TEST_SOURCE_OBJECTS_AS_STRING,
        destination_project_dataset_table=TEST_EXPLICIT_DEST,
    )

    operator.execute(None)

    # Only source_uris is pinned; every other run_load kwarg is irrelevant here.
    wildcard_kwargs = {
        key: mock.ANY
        for key in (
            'destination_project_dataset_table',
            'schema_fields',
            'source_format',
            'autodetect',
            'create_disposition',
            'skip_leading_rows',
            'write_disposition',
            'field_delimiter',
            'max_bad_records',
            'quote_character',
            'ignore_unknown_values',
            'allow_quoted_newlines',
            'allow_jagged_rows',
            'encoding',
            'schema_update_options',
            'src_fmt_configs',
            'time_partitioning',
            'cluster_fields',
            'encryption_configuration',
            'labels',
            'description',
        )
    }
    run_load = bq_hook.return_value.get_conn.return_value.cursor.return_value.run_load
    run_load.assert_called_once_with(
        source_uris=[f'gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}'],
        **wildcard_kwargs,
    )

0 comments on commit 99d1535

Please sign in to comment.