From 11564a0297e3a4791836e26b260ce9428daa387e Mon Sep 17 00:00:00 2001 From: synsh <14250910+synsh@users.noreply.github.com> Date: Sun, 4 Feb 2024 05:09:50 +0530 Subject: [PATCH] Fixed the hardcoded default namespace value for GCP Data Fusion links. (#35379) * update * Add backward compatibility and cleanup * fix formatting --- .../providers/google/cloud/links/datafusion.py | 17 ++++++++++++----- .../google/cloud/operators/datafusion.py | 10 +++++++++- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/airflow/providers/google/cloud/links/datafusion.py b/airflow/providers/google/cloud/links/datafusion.py index 00afd700bdb2f..c31cf09b42e65 100644 --- a/airflow/providers/google/cloud/links/datafusion.py +++ b/airflow/providers/google/cloud/links/datafusion.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains Google Compute Engine links.""" +"""This module contains Google Data Fusion links.""" from __future__ import annotations from typing import TYPE_CHECKING, ClassVar @@ -30,8 +30,8 @@ BASE_LINK = "https://console.cloud.google.com/data-fusion" DATAFUSION_INSTANCE_LINK = BASE_LINK + "/locations/{region}/instances/{instance_name}?project={project_id}" -DATAFUSION_PIPELINES_LINK = "{uri}/cdap/ns/default/pipelines" -DATAFUSION_PIPELINE_LINK = "{uri}/pipelines/ns/default/view/{pipeline_name}" +DATAFUSION_PIPELINES_LINK = "{uri}/cdap/ns/{namespace}/pipelines" +DATAFUSION_PIPELINE_LINK = "{uri}/pipelines/ns/{namespace}/view/{pipeline_name}" class BaseGoogleLink(BaseOperatorLink): @@ -52,10 +52,13 @@ def get_link( ti_key: TaskInstanceKey, ) -> str: conf = XCom.get_value(key=self.key, ti_key=ti_key) + if not conf: return "" - if self.format_str.startswith("http"): - return self.format_str.format(**conf) + + # Add a default value for the 'namespace' parameter for backward compatibility. + conf.setdefault("namespace", "default") + return self.format_str.format(**conf) @@ -98,6 +101,7 @@ def persist( task_instance: BaseOperator, uri: str, pipeline_name: str, + namespace: str, ): task_instance.xcom_push( context=context, @@ -105,6 +109,7 @@ def persist( value={ "uri": uri, "pipeline_name": pipeline_name, + "namespace": namespace, }, ) @@ -121,11 +126,13 @@ def persist( context: Context, task_instance: BaseOperator, uri: str, + namespace: str, ): task_instance.xcom_push( context=context, key=DataFusionPipelinesLink.key, value={ "uri": uri, + "namespace": namespace, }, ) diff --git a/airflow/providers/google/cloud/operators/datafusion.py b/airflow/providers/google/cloud/operators/datafusion.py index 4f62b82407401..bba0206c38acd 100644 --- a/airflow/providers/google/cloud/operators/datafusion.py +++ b/airflow/providers/google/cloud/operators/datafusion.py @@ -537,6 +537,7 @@ def execute(self, context: Context) -> None: task_instance=self, uri=instance["serviceEndpoint"], pipeline_name=self.pipeline_name, + namespace=self.namespace, ) self.log.info("Pipeline %s created", self.pipeline_name) @@ -705,7 +706,12 @@ def execute(self, context: Context) -> dict: ) self.log.info("Pipelines: %s", pipelines) - DataFusionPipelinesLink.persist(context=context, task_instance=self, uri=service_endpoint) + DataFusionPipelinesLink.persist( + context=context, + task_instance=self, + uri=service_endpoint, + namespace=self.namespace, + ) return pipelines @@ -825,6 +831,7 @@ def execute(self, context: Context) -> str: task_instance=self, uri=instance["serviceEndpoint"], pipeline_name=self.pipeline_name, + namespace=self.namespace, ) if self.deferrable: @@ -954,6 +961,7 @@ def execute(self, context: Context) -> None: task_instance=self, uri=instance["serviceEndpoint"], pipeline_name=self.pipeline_name, + namespace=self.namespace, ) hook.stop_pipeline( pipeline_name=self.pipeline_name,