diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index 30964f4af8798..7f9c38c515a61 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -240,16 +240,18 @@ def execute(self, context: Context): ) job = self._submit_job(hook, job_id="") context["ti"].xcom_push(key="job_id", value=job.job_id) - self.defer( - timeout=self.execution_timeout, - trigger=BigQueryCheckTrigger( - conn_id=self.gcp_conn_id, - job_id=job.job_id, - project_id=hook.project_id, - poll_interval=self.poll_interval, - ), - method_name="execute_complete", - ) + if job.running(): + self.defer( + timeout=self.execution_timeout, + trigger=BigQueryCheckTrigger( + conn_id=self.gcp_conn_id, + job_id=job.job_id, + project_id=hook.project_id, + poll_interval=self.poll_interval, + ), + method_name="execute_complete", + ) + self.log.info("Current state of job %s is %s", job.job_id, job.state) def execute_complete(self, context: Context, event: dict[str, Any]) -> None: """ diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py index 4b6d31839f144..25b341a4c38a5 100644 --- a/tests/providers/google/cloud/operators/test_bigquery.py +++ b/tests/providers/google/cloud/operators/test_bigquery.py @@ -1595,6 +1595,32 @@ def test_bigquery_interval_check_operator_async(self, mock_hook, create_task_ins class TestBigQueryCheckOperator: + @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator.execute") + @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator.defer") + @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook") + def test_bigquery_check_operator_async_finish_before_deferred( + self, mock_hook, mock_defer, mock_execute, create_task_instance_of_operator + ): + job_id = "123456" + hash_ = "hash" + real_job_id = f"{job_id}_{hash_}" + + mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False) + mock_hook.return_value.insert_job.return_value.running.return_value = False + + ti = create_task_instance_of_operator( + BigQueryCheckOperator, + dag_id="dag_id", + task_id="bq_check_operator_job", + sql="SELECT * FROM any", + location=TEST_DATASET_LOCATION, + deferrable=True, + ) + + ti.task.execute(MagicMock()) + assert not mock_defer.called + assert mock_execute.called + @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook") def test_bigquery_check_operator_async(self, mock_hook, create_task_instance_of_operator): """