diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index 91ff3703662f8..28acc8f9494ed 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -2792,6 +2792,8 @@ def execute(self, context: Any): impersonation_chain=self.impersonation_chain, ) self.hook = hook + if self.project_id is None: + self.project_id = hook.project_id self.job_id = hook.generate_job_id( job_id=self.job_id, @@ -2831,8 +2833,7 @@ def execute(self, context: Any): QueryJob._JOB_TYPE: ["destinationTable"], } - project_id = self.project_id or hook.project_id - if project_id: + if self.project_id: for job_type, tables_prop in job_types.items(): job_configuration = job.to_api_repr()["configuration"] if job_type in job_configuration: @@ -2842,7 +2843,7 @@ def execute(self, context: Any): persist_kwargs = { "context": context, "task_instance": self, - "project_id": project_id, + "project_id": self.project_id, "table_id": table, } if not isinstance(table, str): @@ -2851,11 +2852,11 @@ def execute(self, context: Any): persist_kwargs["project_id"] = table["projectId"] BigQueryTableLink.persist(**persist_kwargs) self.job_id = job.job_id - project_id = self.project_id or self.hook.project_id - if project_id: + + if self.project_id: job_id_path = convert_job_id( job_id=self.job_id, # type: ignore[arg-type] - project_id=project_id, + project_id=self.project_id, location=self.location, ) context["ti"].xcom_push(key="job_id_path", value=job_id_path) diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py index 9aea410e78b02..64e31913fb778 100644 --- a/tests/providers/google/cloud/operators/test_bigquery.py +++ b/tests/providers/google/cloud/operators/test_bigquery.py @@ -1475,6 +1475,45 @@ def test_bigquery_insert_job_operator_async(self, mock_hook, create_task_instanc exc.value.trigger, BigQueryInsertJobTrigger ), "Trigger is not a BigQueryInsertJobTrigger" + @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook") + def test_bigquery_insert_job_operator_async_inherits_hook_project_id_when_non_given( + self, mock_hook, create_task_instance_of_operator + ): + """ + Asserts that a deferred task of type BigQueryInsertJobTrigger will assume the project_id + of the hook that is used within the BigQueryInsertJobOperator when there is no + project_id passed to the BigQueryInsertJobOperator. + """ + job_id = "123456" + + configuration = { + "query": { + "query": "SELECT * FROM any", + "useLegacySql": False, + } + } + mock_hook.return_value.project_id = TEST_GCP_PROJECT_ID + + ti = create_task_instance_of_operator( + BigQueryInsertJobOperator, + dag_id="dag_id", + task_id="insert_query_job", + configuration=configuration, + location=TEST_DATASET_LOCATION, + job_id=job_id, + deferrable=True, + project_id=None, + ) + + with pytest.raises(TaskDeferred) as exc: + ti.task.execute(MagicMock()) + + assert isinstance( + exc.value.trigger, BigQueryInsertJobTrigger + ), "Trigger is not a BigQueryInsertJobTrigger" + + assert exc.value.trigger.project_id == TEST_GCP_PROJECT_ID + def test_bigquery_insert_job_operator_execute_failure(self): """Tests that an AirflowException is raised in case of error event""" configuration = {