Skip to content

Commit

Permalink
Refactor of links in Dataproc. (#31895)
Browse files Browse the repository at this point in the history
Co-authored-by: Beata Kossakowska <bkossakowska@google.com>
  • Loading branch information
bkossakowska and Beata Kossakowska committed Aug 3, 2023
1 parent baa1bc0 commit 1ea7ae8
Show file tree
Hide file tree
Showing 4 changed files with 351 additions and 117 deletions.
165 changes: 157 additions & 8 deletions airflow/providers/google/cloud/links/dataproc.py
Expand Up @@ -18,32 +18,51 @@
"""This module contains Google Dataproc links."""
from __future__ import annotations

import warnings
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import BaseOperatorLink, XCom
from airflow.providers.google.cloud.links.base import BASE_LINK
from airflow.providers.google.cloud.links.base import BASE_LINK, BaseGoogleLink

if TYPE_CHECKING:
from airflow.models import BaseOperator
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context

DATAPROC_BASE_LINK = BASE_LINK + "/dataproc"
DATAPROC_JOB_LOG_LINK = DATAPROC_BASE_LINK + "/jobs/{resource}?region={region}&project={project_id}"
DATAPROC_JOB_LINK = DATAPROC_BASE_LINK + "/jobs/{job_id}?region={region}&project={project_id}"

DATAPROC_CLUSTER_LINK = (
DATAPROC_BASE_LINK + "/clusters/{resource}/monitoring?region={region}&project={project_id}"
DATAPROC_BASE_LINK + "/clusters/{cluster_id}/monitoring?region={region}&project={project_id}"
)
DATAPROC_WORKFLOW_TEMPLATE_LINK = (
DATAPROC_BASE_LINK + "/workflows/templates/{region}/{resource}?project={project_id}"
DATAPROC_BASE_LINK + "/workflows/templates/{region}/{workflow_template_id}?project={project_id}"
)
DATAPROC_WORKFLOW_LINK = DATAPROC_BASE_LINK + "/workflows/instances/{region}/{resource}?project={project_id}"
DATAPROC_BATCH_LINK = DATAPROC_BASE_LINK + "/batches/{region}/{resource}/monitoring?project={project_id}"
DATAPROC_WORKFLOW_LINK = (
DATAPROC_BASE_LINK + "/workflows/instances/{region}/{workflow_id}?project={project_id}"
)

# URL template for one batch; placeholders: {region}, {batch_id}, {project_id}.
DATAPROC_BATCH_LINK = DATAPROC_BASE_LINK + "/batches/{region}/{batch_id}/monitoring?project={project_id}"
# URL template for the "/batches" path; the only placeholder is {project_id}.
DATAPROC_BATCHES_LINK = DATAPROC_BASE_LINK + "/batches?project={project_id}"
# Templates kept under *_DEPRECATED names. They use the generic {resource}
# placeholder rather than a resource-specific one such as {job_id}.
# NOTE(review): presumably consumed by the deprecated DataprocLink -- confirm.
DATAPROC_JOB_LINK_DEPRECATED = DATAPROC_BASE_LINK + "/jobs/{resource}?region={region}&project={project_id}"
DATAPROC_CLUSTER_LINK_DEPRECATED = (
DATAPROC_BASE_LINK + "/clusters/{resource}/monitoring?region={region}&project={project_id}"
)


class DataprocLink(BaseOperatorLink):
"""Helper class for constructing Dataproc resource link."""
"""
Helper class for constructing Dataproc resource link.
.. warning::
This link is deprecated.
"""

warnings.warn(
"This DataprocLink is deprecated.",
AirflowProviderDeprecationWarning,
)
name = "Dataproc resource"
key = "conf"

Expand Down Expand Up @@ -82,8 +101,14 @@ def get_link(


class DataprocListLink(BaseOperatorLink):
"""Helper class for constructing list of Dataproc resources link."""
"""
Helper class for constructing list of Dataproc resources link.
.. warning::
This link is deprecated.
"""

warnings.warn("This DataprocListLink is deprecated.", AirflowProviderDeprecationWarning)
name = "Dataproc resources"
key = "list_conf"

Expand Down Expand Up @@ -116,3 +141,127 @@ def get_link(
if list_conf
else ""
)


class DataprocClusterLink(BaseGoogleLink):
    """Link helper for a single Dataproc cluster.

    The dictionary stored by :meth:`persist` uses the same names as the
    placeholders of ``DATAPROC_CLUSTER_LINK`` (``cluster_id``, ``region``
    and ``project_id``).
    """

    # Label of the link. NOTE(review): presumably shown in the Airflow UI -- confirm.
    name = "Dataproc Cluster"
    # XCom key under which ``persist`` stores the link parameters.
    key = "dataproc_cluster"
    # URL template. NOTE(review): presumably rendered by BaseGoogleLink -- confirm.
    format_str = DATAPROC_CLUSTER_LINK

    @staticmethod
    def persist(
        context: Context,
        operator: BaseOperator,
        cluster_id: str,
        region: str,
        project_id: str,
    ):
        """Push the values identifying a cluster to XCom.

        :param context: Task context forwarded to ``operator.xcom_push``.
        :param operator: Operator whose ``xcom_push`` performs the push.
        :param cluster_id: Value stored under ``"cluster_id"``.
        :param region: Value stored under ``"region"``.
        :param project_id: Value stored under ``"project_id"``.
        """
        link_params = {"cluster_id": cluster_id, "region": region, "project_id": project_id}
        operator.xcom_push(context, key=DataprocClusterLink.key, value=link_params)


class DataprocJobLink(BaseGoogleLink):
    """Link helper for a single Dataproc job.

    The dictionary stored by :meth:`persist` uses the same names as the
    placeholders of ``DATAPROC_JOB_LINK`` (``job_id``, ``region`` and
    ``project_id``).
    """

    # Label of the link. NOTE(review): presumably shown in the Airflow UI -- confirm.
    name = "Dataproc Job"
    # XCom key under which ``persist`` stores the link parameters.
    key = "dataproc_job"
    # URL template. NOTE(review): presumably rendered by BaseGoogleLink -- confirm.
    format_str = DATAPROC_JOB_LINK

    @staticmethod
    def persist(
        context: Context,
        operator: BaseOperator,
        job_id: str,
        region: str,
        project_id: str,
    ):
        """Push the values identifying a job to XCom.

        :param context: Task context forwarded to ``operator.xcom_push``.
        :param operator: Operator whose ``xcom_push`` performs the push.
        :param job_id: Value stored under ``"job_id"``.
        :param region: Value stored under ``"region"``.
        :param project_id: Value stored under ``"project_id"``.
        """
        link_params = {"job_id": job_id, "region": region, "project_id": project_id}
        operator.xcom_push(context, key=DataprocJobLink.key, value=link_params)


class DataprocWorkflowLink(BaseGoogleLink):
    """Link helper for a single Dataproc workflow instance.

    The dictionary stored by :meth:`persist` uses the same names as the
    placeholders of ``DATAPROC_WORKFLOW_LINK`` (``workflow_id``, ``region``
    and ``project_id``).
    """

    # Label of the link. NOTE(review): presumably shown in the Airflow UI -- confirm.
    name = "Dataproc Workflow"
    # XCom key under which ``persist`` stores the link parameters.
    key = "dataproc_workflow"
    # URL template. NOTE(review): presumably rendered by BaseGoogleLink -- confirm.
    format_str = DATAPROC_WORKFLOW_LINK

    @staticmethod
    def persist(
        context: Context,
        operator: BaseOperator,
        workflow_id: str,
        project_id: str,
        region: str,
    ):
        """Push the values identifying a workflow to XCom.

        Note that ``project_id`` precedes ``region`` in this signature.

        :param context: Task context forwarded to ``operator.xcom_push``.
        :param operator: Operator whose ``xcom_push`` performs the push.
        :param workflow_id: Value stored under ``"workflow_id"``.
        :param project_id: Value stored under ``"project_id"``.
        :param region: Value stored under ``"region"``.
        """
        link_params = {"workflow_id": workflow_id, "region": region, "project_id": project_id}
        operator.xcom_push(context, key=DataprocWorkflowLink.key, value=link_params)


class DataprocWorkflowTemplateLink(BaseGoogleLink):
    """Link helper for a single Dataproc workflow template.

    The dictionary stored by :meth:`persist` uses the same names as the
    placeholders of ``DATAPROC_WORKFLOW_TEMPLATE_LINK``
    (``workflow_template_id``, ``region`` and ``project_id``).
    """

    # Label of the link. NOTE(review): presumably shown in the Airflow UI -- confirm.
    name = "Dataproc Workflow Template"
    # XCom key under which ``persist`` stores the link parameters.
    key = "dataproc_workflow_template"
    # URL template. NOTE(review): presumably rendered by BaseGoogleLink -- confirm.
    format_str = DATAPROC_WORKFLOW_TEMPLATE_LINK

    @staticmethod
    def persist(
        context: Context,
        operator: BaseOperator,
        workflow_template_id: str,
        project_id: str,
        region: str,
    ):
        """Push the values identifying a workflow template to XCom.

        Note that ``project_id`` precedes ``region`` in this signature.

        :param context: Task context forwarded to ``operator.xcom_push``.
        :param operator: Operator whose ``xcom_push`` performs the push.
        :param workflow_template_id: Value stored under ``"workflow_template_id"``.
        :param project_id: Value stored under ``"project_id"``.
        :param region: Value stored under ``"region"``.
        """
        link_params = {
            "workflow_template_id": workflow_template_id,
            "region": region,
            "project_id": project_id,
        }
        operator.xcom_push(context, key=DataprocWorkflowTemplateLink.key, value=link_params)


class DataprocBatchLink(BaseGoogleLink):
    """Link helper for a single Dataproc batch.

    The dictionary stored by :meth:`persist` uses the same names as the
    placeholders of ``DATAPROC_BATCH_LINK`` (``batch_id``, ``region`` and
    ``project_id``).
    """

    # Label of the link. NOTE(review): presumably shown in the Airflow UI -- confirm.
    name = "Dataproc Batch"
    # XCom key under which ``persist`` stores the link parameters.
    key = "dataproc_batch"
    # URL template. NOTE(review): presumably rendered by BaseGoogleLink -- confirm.
    format_str = DATAPROC_BATCH_LINK

    @staticmethod
    def persist(
        context: Context,
        operator: BaseOperator,
        batch_id: str,
        project_id: str,
        region: str,
    ):
        """Push the values identifying a batch to XCom.

        Note that ``project_id`` precedes ``region`` in this signature.

        :param context: Task context forwarded to ``operator.xcom_push``.
        :param operator: Operator whose ``xcom_push`` performs the push.
        :param batch_id: Value stored under ``"batch_id"``.
        :param project_id: Value stored under ``"project_id"``.
        :param region: Value stored under ``"region"``.
        """
        link_params = {"batch_id": batch_id, "region": region, "project_id": project_id}
        operator.xcom_push(context, key=DataprocBatchLink.key, value=link_params)


class DataprocBatchesListLink(BaseGoogleLink):
    """Link helper for the Dataproc batches list of a project.

    The dictionary stored by :meth:`persist` holds only ``project_id``, the
    single placeholder of ``DATAPROC_BATCHES_LINK``.
    """

    # Label of the link. NOTE(review): presumably shown in the Airflow UI -- confirm.
    name = "Dataproc Batches List"
    # XCom key under which ``persist`` stores the link parameters.
    key = "dataproc_batches_list"
    # URL template. NOTE(review): presumably rendered by BaseGoogleLink -- confirm.
    format_str = DATAPROC_BATCHES_LINK

    @staticmethod
    def persist(
        context: Context,
        operator: BaseOperator,
        project_id: str,
    ):
        """Push the project identifier to XCom.

        :param context: Task context forwarded to ``operator.xcom_push``.
        :param operator: Operator whose ``xcom_push`` performs the push.
        :param project_id: Value stored under ``"project_id"``.
        """
        link_params = {"project_id": project_id}
        operator.xcom_push(context, key=DataprocBatchesListLink.key, value=link_params)

0 comments on commit 1ea7ae8

Please sign in to comment.