diff --git a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
index 85c32a540dc8e..c00dc2cebc502 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
@@ -2340,7 +2340,12 @@ def prepare_template(self) -> None:
             with open(self.configuration) as file:
                 self.configuration = json.loads(file.read())
 
-    def _add_job_labels(self) -> None:
+    def _add_job_labels(self, hook: BigQueryHook | None = None) -> None:
+        if hook and hook.labels and "labels" not in self.configuration:
+            self.configuration["labels"] = dict(hook.labels)
+        elif hook and hook.labels and isinstance(self.configuration.get("labels"), dict):
+            self.configuration["labels"] = {**hook.labels, **self.configuration["labels"]}
+
         dag_label = self.dag_id.lower()
         task_label = self.task_id.lower().replace(".", "-")
 
@@ -2409,6 +2414,8 @@ def execute(self, context: Any):
         if self.project_id is None:
             self.project_id = hook.project_id
 
+        self._add_job_labels(hook)
+
         # Handles Operator retries when a user does not explicitly set a job_id.
         # For example, if a previous job failed due to a 429 "Too Many Requests" error,
         # the Operator will retry and resubmit the job. We need to ensure we don't lose
diff --git a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
index 2655132009bb9..70427e794e63b 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_bigquery.py
@@ -1890,6 +1890,7 @@ def test_execute_respects_explicit_no_labels(self, mock_hook):
             },
             "labels": None,
         }
+        mock_hook.return_value.labels = {"airflow-env": "test-env-2"}
         mock_hook.return_value.insert_job.return_value = MagicMock(
             state="DONE", job_id=real_job_id, error_result=False
         )
@@ -1905,6 +1906,57 @@ def test_execute_respects_explicit_no_labels(self, mock_hook):
         op.execute(context=MagicMock())
         assert configuration["labels"] is None
 
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute_inherits_labels_from_connection(self, mock_hook):
+        job_id = "123456"
+        hash_ = "hash"
+        real_job_id = f"{job_id}_{hash_}"
+
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            },
+        }
+        mock_hook.return_value.labels = {"airflow-env": "test-env-2"}
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=False
+        )
+        mock_hook.return_value.generate_job_id.return_value = real_job_id
+
+        op = BigQueryInsertJobOperator(
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID,
+        )
+        op.execute(context=MagicMock())
+
+        assert configuration["labels"]["airflow-env"] == "test-env-2"
+        assert configuration["labels"]["airflow-task"] == "insert_query_job"
+
+    def test_add_job_labels_merges_connection_labels(self):
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            },
+            "labels": {"manual-label": "manual-value"},
+        }
+        op = BigQueryInsertJobOperator(
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            project_id=TEST_GCP_PROJECT_ID,
+        )
+
+        op._add_job_labels(hook=MagicMock(labels={"airflow-env": "test-env-2"}))
+
+        assert configuration["labels"]["airflow-env"] == "test-env-2"
+        assert configuration["labels"]["manual-label"] == "manual-value"
+        assert configuration["labels"]["airflow-task"] == "insert_query_job"
+
     def test_task_label_too_big(self):
         configuration = {
             "query": {