Skip to content

Commit

Permalink
fix(bigquery.py): pass correct project_id to triggerer (#35200)
Browse files (browse the repository at this point in the history)
  • Loading branch information
mokshasoul committed Dec 17, 2023
1 parent 3b1aaf1 commit ffb003a
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 6 deletions.
13 changes: 7 additions & 6 deletions airflow/providers/google/cloud/operators/bigquery.py
Expand Up @@ -2792,6 +2792,8 @@ def execute(self, context: Any):
impersonation_chain=self.impersonation_chain,
)
self.hook = hook
if self.project_id is None:
self.project_id = hook.project_id

self.job_id = hook.generate_job_id(
job_id=self.job_id,
Expand Down Expand Up @@ -2831,8 +2833,7 @@ def execute(self, context: Any):
QueryJob._JOB_TYPE: ["destinationTable"],
}

project_id = self.project_id or hook.project_id
if project_id:
if self.project_id:
for job_type, tables_prop in job_types.items():
job_configuration = job.to_api_repr()["configuration"]
if job_type in job_configuration:
Expand All @@ -2842,7 +2843,7 @@ def execute(self, context: Any):
persist_kwargs = {
"context": context,
"task_instance": self,
"project_id": project_id,
"project_id": self.project_id,
"table_id": table,
}
if not isinstance(table, str):
Expand All @@ -2851,11 +2852,11 @@ def execute(self, context: Any):
persist_kwargs["project_id"] = table["projectId"]
BigQueryTableLink.persist(**persist_kwargs)
self.job_id = job.job_id
project_id = self.project_id or self.hook.project_id
if project_id:

if self.project_id:
job_id_path = convert_job_id(
job_id=self.job_id, # type: ignore[arg-type]
project_id=project_id,
project_id=self.project_id,
location=self.location,
)
context["ti"].xcom_push(key="job_id_path", value=job_id_path)
Expand Down
39 changes: 39 additions & 0 deletions tests/providers/google/cloud/operators/test_bigquery.py
Expand Up @@ -1475,6 +1475,45 @@ def test_bigquery_insert_job_operator_async(self, mock_hook, create_task_instanc
exc.value.trigger, BigQueryInsertJobTrigger
), "Trigger is not a BigQueryInsertJobTrigger"

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_bigquery_insert_job_operator_async_inherits_hook_project_id_when_non_given(
    self, mock_hook, create_task_instance_of_operator
):
    """
    Check that a deferred insert job falls back to the hook's project_id.

    The operator is built with ``project_id=None`` and ``deferrable=True``. Executing it
    must raise ``TaskDeferred`` carrying a ``BigQueryInsertJobTrigger`` whose
    ``project_id`` equals the one exposed by the (mocked) BigQueryHook.
    """
    # The hook is mocked, so this attribute is the only project_id available to the task.
    mock_hook.return_value.project_id = TEST_GCP_PROJECT_ID

    query_configuration = {
        "query": {
            "query": "SELECT * FROM any",
            "useLegacySql": False,
        }
    }
    task_instance = create_task_instance_of_operator(
        BigQueryInsertJobOperator,
        dag_id="dag_id",
        task_id="insert_query_job",
        configuration=query_configuration,
        location=TEST_DATASET_LOCATION,
        job_id="123456",
        deferrable=True,
        project_id=None,
    )

    with pytest.raises(TaskDeferred) as deferral:
        task_instance.task.execute(MagicMock())

    # Inspect the trigger attached to the deferral raised by execute().
    trigger = deferral.value.trigger
    assert isinstance(
        trigger, BigQueryInsertJobTrigger
    ), "Trigger is not a BigQueryInsertJobTrigger"
    assert trigger.project_id == TEST_GCP_PROJECT_ID

def test_bigquery_insert_job_operator_execute_failure(self):
"""Tests that an AirflowException is raised in case of error event"""
configuration = {
Expand Down

0 comments on commit ffb003a

Please sign in to comment.