GCSToBigQueryOperator allows autodetect None and infers schema (#28564)
vchiapaikeo committed Dec 24, 2022
1 parent 3df204b commit d7f5f6d
Showing 2 changed files with 101 additions and 1 deletion.
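In practice, this change lets a DAG load files into a pre-created BigQuery table without supplying any schema. A minimal sketch of the new usage, assuming the destination table was created ahead of time (the DAG id, bucket, object, and table names below are hypothetical):

```python
import pendulum

from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

with DAG(
    dag_id="gcs_to_bq_existing_table",  # hypothetical DAG id
    start_date=pendulum.datetime(2022, 12, 1, tz="UTC"),
    schedule=None,
) as dag:
    # With autodetect=None and no schema_fields/schema_object, the operator no
    # longer raises; BigQuery resolves the schema from the existing table.
    load_csv = GCSToBigQueryOperator(
        task_id="load_csv",
        bucket="my-bucket",  # hypothetical
        source_objects=["data/part-*.csv"],  # hypothetical
        destination_project_dataset_table="my-project.my_dataset.my_table",  # hypothetical
        write_disposition="WRITE_APPEND",
        autodetect=None,
        external_table=False,
    )
```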
7 changes: 6 additions & 1 deletion airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
@@ -147,6 +147,8 @@ class GCSToBigQueryOperator(BaseOperator):
         options and schema for CSV and JSON sources. (Default: ``True``).
         The parameter must be set to True if 'schema_fields' and 'schema_object' are undefined.
         It is suggested to set it to True if the table is created outside of Airflow.
+        If autodetect is None and no schema is provided (neither via schema_fields
+        nor a schema_object), the operator assumes the table already exists.
     :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
         **Example**: ::
@@ -337,7 +339,10 @@ def execute(self, context: Context):
         self.source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]

         if not self.schema_fields:
-            if not self.schema_object and not self.autodetect:
+            # Check for self.autodetect being explicitly False. autodetect equal to None
+            # means we do not want to detect the schema from the files; instead, we rely
+            # on the already existing table's schema.
+            if not self.schema_object and self.autodetect is False:
                 raise AirflowException(
                     "Table schema was not found. Neither schema object nor schema fields were specified"
                 )
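The subtlety is that `not self.autodetect` is truthy for both `False` and `None`, so the old guard raised in both cases, while the new `is False` check raises only on an explicit `False`. A self-contained illustration of the three-valued check (plain Python; the real guard also requires `schema_object` to be unset):

```python
# Illustration only: how the old and new guards treat each autodetect value.
for autodetect in (True, False, None):
    old_raises = not autodetect        # True for both False and None
    new_raises = autodetect is False   # True only for an explicit False
    print(f"autodetect={autodetect!r}: old={old_raises}, new={new_raises}")
```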
95 changes: 95 additions & 0 deletions tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py
@@ -759,6 +759,101 @@ def test_schema_obj_without_external_table_should_execute_successfully(self, bq_
        bq_hook.return_value.insert_job.assert_has_calls(calls)
        gcs_hook.return_value.download.assert_called_once_with(SCHEMA_BUCKET, SCHEMA_OBJECT)

    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
    def test_autodetect_none_external_table_should_execute_successfully(self, hook):
        hook.return_value.insert_job.side_effect = [
            MagicMock(job_id=pytest.real_job_id, error_result=False),
            pytest.real_job_id,
        ]
        hook.return_value.generate_job_id.return_value = pytest.real_job_id
        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)
        operator = GCSToBigQueryOperator(
            task_id=TASK_ID,
            bucket=TEST_BUCKET,
            source_objects=TEST_SOURCE_OBJECTS,
            write_disposition=WRITE_DISPOSITION,
            destination_project_dataset_table=TEST_EXPLICIT_DEST,
            external_table=True,
            autodetect=None,
        )

        operator.execute(context=MagicMock())

        hook.return_value.create_empty_table.assert_called_once_with(
            exists_ok=True,
            location=None,
            project_id=PROJECT_ID,
            table_resource={
                "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE},
                "labels": {},
                "externalDataConfiguration": {
                    "autodetect": None,
                    "sourceFormat": "CSV",
                    "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
                    "compression": "NONE",
                    "ignoreUnknownValues": False,
                    "csvOptions": {
                        "skipLeadingRows": None,
                        "fieldDelimiter": ",",
                        "quote": None,
                        "allowQuotedNewlines": False,
                        "allowJaggedRows": False,
                        "encoding": "UTF-8",
                    },
                },
            },
        )

    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
    def test_autodetect_none_without_external_table_should_execute_successfully(self, hook):
        hook.return_value.insert_job.side_effect = [
            MagicMock(job_id=pytest.real_job_id, error_result=False),
            pytest.real_job_id,
        ]
        hook.return_value.generate_job_id.return_value = pytest.real_job_id
        hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE)

        operator = GCSToBigQueryOperator(
            task_id=TASK_ID,
            bucket=TEST_BUCKET,
            source_objects=TEST_SOURCE_OBJECTS,
            destination_project_dataset_table=TEST_EXPLICIT_DEST,
            write_disposition=WRITE_DISPOSITION,
            autodetect=None,
            external_table=False,
        )

        operator.execute(context=MagicMock())

        calls = [
            call(
                configuration={
                    "load": {
                        "autodetect": None,
                        "createDisposition": "CREATE_IF_NEEDED",
                        "destinationTable": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE},
                        "sourceFormat": "CSV",
                        "sourceUris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS}"],
                        "writeDisposition": "WRITE_TRUNCATE",
                        "ignoreUnknownValues": False,
                        "skipLeadingRows": None,
                        "fieldDelimiter": ",",
                        "quote": None,
                        "allowQuotedNewlines": False,
                        "encoding": "UTF-8",
                    }
                },
                project_id=hook.return_value.project_id,
                location=None,
                job_id=pytest.real_job_id,
                timeout=None,
                retry=DEFAULT_RETRY,
                nowait=True,
            )
        ]

        hook.return_value.insert_job.assert_has_calls(calls)

    @mock.patch(GCS_TO_BQ_PATH.format("BigQueryHook"))
    def test_execute_should_throw_ex_when_no_bucket_specified(self, hook):
        hook.return_value.insert_job.side_effect = [
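As a closing note, one way to sanity-check the existing-table path after a run is to read the destination table's schema back through the provider's `BigQueryHook.get_schema`; a hedged sketch with hypothetical connection and table names:

```python
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

# Read back the destination table's schema; with autodetect=None the load job
# should have reused it rather than inferring a new one from the files.
hook = BigQueryHook(gcp_conn_id="google_cloud_default")
schema = hook.get_schema(
    dataset_id="my_dataset",  # hypothetical
    table_id="my_table",      # hypothetical
    project_id="my-project",  # hypothetical
)
print([field["name"] for field in schema["fields"]])
```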
