Skip to content

Commit

Permalink
CloudTasks assets & system tests migration (AIP-47) (#23282)
Browse files Browse the repository at this point in the history
  • Loading branch information
bhirsz committed May 4, 2022
1 parent fdf1a53 commit 3977e17
Show file tree
Hide file tree
Showing 9 changed files with 386 additions and 138 deletions.
6 changes: 3 additions & 3 deletions airflow/providers/google/cloud/hooks/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ def purge_queue(
retry: Union[Retry, _MethodDefault] = DEFAULT,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> List[Queue]:
) -> Queue:
"""
Purges a queue by deleting all of its tasks from Cloud Tasks.
Expand Down Expand Up @@ -333,7 +333,7 @@ def pause_queue(
retry: Union[Retry, _MethodDefault] = DEFAULT,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> List[Queue]:
) -> Queue:
"""
Pauses a queue in Cloud Tasks.
Expand Down Expand Up @@ -368,7 +368,7 @@ def resume_queue(
retry: Union[Retry, _MethodDefault] = DEFAULT,
timeout: Optional[float] = None,
metadata: Sequence[Tuple[str, str]] = (),
) -> List[Queue]:
) -> Queue:
"""
Resumes a queue in Cloud Tasks.
Expand Down
81 changes: 81 additions & 0 deletions airflow/providers/google/cloud/links/cloud_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains Google Cloud Tasks links."""
from typing import TYPE_CHECKING, Optional

from airflow.models import BaseOperator
from airflow.providers.google.cloud.links.base import BaseGoogleLink

if TYPE_CHECKING:
from airflow.utils.context import Context

CLOUD_TASKS_BASE_LINK = "https://pantheon.corp.google.com/cloudtasks"
CLOUD_TASKS_QUEUE_LINK = CLOUD_TASKS_BASE_LINK + "/queue/{location}/{queue_id}/tasks?project={project_id}"
CLOUD_TASKS_LINK = CLOUD_TASKS_BASE_LINK + "?project={project_id}"


class CloudTasksQueueLink(BaseGoogleLink):
"""Helper class for constructing Cloud Task Queue Link"""

name = "Cloud Tasks Queue"
key = "cloud_task_queue"
format_str = CLOUD_TASKS_QUEUE_LINK

@staticmethod
def extract_parts(queue_name: Optional[str]):
"""
Extract project_id, location and queue id from queue name:
projects/PROJECT_ID/locations/LOCATION_ID/queues/QUEUE_ID
"""
if not queue_name:
return "", "", ""
parts = queue_name.split("/")
return parts[1], parts[3], parts[5]

@staticmethod
def persist(
operator_instance: BaseOperator,
context: "Context",
queue_name: Optional[str],
):
project_id, location, queue_id = CloudTasksQueueLink.extract_parts(queue_name)
operator_instance.xcom_push(
context,
key=CloudTasksQueueLink.key,
value={"project_id": project_id, "location": location, "queue_id": queue_id},
)


class CloudTasksLink(BaseGoogleLink):
"""Helper class for constructing Cloud Task Link"""

name = "Cloud Tasks"
key = "cloud_task"
format_str = CLOUD_TASKS_LINK

@staticmethod
def persist(
operator_instance: BaseOperator,
context: "Context",
project_id: Optional[str],
):
operator_instance.xcom_push(
context,
key=CloudTasksLink.key,
value={"project_id": project_id},
)
69 changes: 68 additions & 1 deletion airflow/providers/google/cloud/operators/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.tasks import CloudTasksHook
from airflow.providers.google.cloud.links.cloud_tasks import CloudTasksLink, CloudTasksQueueLink

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down Expand Up @@ -82,6 +83,7 @@ class CloudTasksQueueCreateOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
operator_extra_links = (CloudTasksQueueLink(),)

def __init__(
self,
Expand Down Expand Up @@ -134,7 +136,11 @@ def execute(self, context: 'Context'):
timeout=self.timeout,
metadata=self.metadata,
)

CloudTasksQueueLink.persist(
operator_instance=self,
context=context,
queue_name=queue.name,
)
return Queue.to_dict(queue)


Expand Down Expand Up @@ -186,6 +192,7 @@ class CloudTasksQueueUpdateOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
operator_extra_links = (CloudTasksQueueLink(),)

def __init__(
self,
Expand Down Expand Up @@ -229,6 +236,11 @@ def execute(self, context: 'Context'):
timeout=self.timeout,
metadata=self.metadata,
)
CloudTasksQueueLink.persist(
operator_instance=self,
context=context,
queue_name=queue.name,
)
return Queue.to_dict(queue)


Expand Down Expand Up @@ -270,6 +282,7 @@ class CloudTasksQueueGetOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
operator_extra_links = (CloudTasksQueueLink(),)

def __init__(
self,
Expand Down Expand Up @@ -307,6 +320,11 @@ def execute(self, context: 'Context'):
timeout=self.timeout,
metadata=self.metadata,
)
CloudTasksQueueLink.persist(
operator_instance=self,
context=context,
queue_name=queue.name,
)
return Queue.to_dict(queue)


Expand Down Expand Up @@ -349,6 +367,7 @@ class CloudTasksQueuesListOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
operator_extra_links = (CloudTasksLink(),)

def __init__(
self,
Expand Down Expand Up @@ -389,6 +408,11 @@ def execute(self, context: 'Context'):
timeout=self.timeout,
metadata=self.metadata,
)
CloudTasksLink.persist(
operator_instance=self,
context=context,
project_id=self.project_id or hook.project_id,
)
return [Queue.to_dict(q) for q in queues]


Expand Down Expand Up @@ -505,6 +529,7 @@ class CloudTasksQueuePurgeOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
operator_extra_links = (CloudTasksQueueLink(),)

def __init__(
self,
Expand Down Expand Up @@ -542,6 +567,11 @@ def execute(self, context: 'Context'):
timeout=self.timeout,
metadata=self.metadata,
)
CloudTasksQueueLink.persist(
operator_instance=self,
context=context,
queue_name=queue.name,
)
return Queue.to_dict(queue)


Expand Down Expand Up @@ -583,6 +613,7 @@ class CloudTasksQueuePauseOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
operator_extra_links = (CloudTasksQueueLink(),)

def __init__(
self,
Expand Down Expand Up @@ -620,6 +651,11 @@ def execute(self, context: 'Context'):
timeout=self.timeout,
metadata=self.metadata,
)
CloudTasksQueueLink.persist(
operator_instance=self,
context=context,
queue_name=queue.name,
)
return Queue.to_dict(queue)


Expand Down Expand Up @@ -661,6 +697,7 @@ class CloudTasksQueueResumeOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
operator_extra_links = (CloudTasksQueueLink(),)

def __init__(
self,
Expand Down Expand Up @@ -698,6 +735,11 @@ def execute(self, context: 'Context'):
timeout=self.timeout,
metadata=self.metadata,
)
CloudTasksQueueLink.persist(
operator_instance=self,
context=context,
queue_name=queue.name,
)
return Queue.to_dict(queue)


Expand Down Expand Up @@ -747,6 +789,7 @@ class CloudTasksTaskCreateOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
operator_extra_links = (CloudTasksQueueLink(),)

def __init__(
self,
Expand Down Expand Up @@ -793,6 +836,11 @@ def execute(self, context: 'Context'):
timeout=self.timeout,
metadata=self.metadata,
)
CloudTasksQueueLink.persist(
operator_instance=self,
context=context,
queue_name=task.name,
)
return Task.to_dict(task)


Expand Down Expand Up @@ -838,6 +886,7 @@ class CloudTasksTaskGetOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
operator_extra_links = (CloudTasksQueueLink(),)

def __init__(
self,
Expand Down Expand Up @@ -881,6 +930,11 @@ def execute(self, context: 'Context'):
timeout=self.timeout,
metadata=self.metadata,
)
CloudTasksQueueLink.persist(
operator_instance=self,
context=context,
queue_name=task.name,
)
return Task.to_dict(task)


Expand Down Expand Up @@ -926,6 +980,7 @@ class CloudTasksTasksListOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
operator_extra_links = (CloudTasksQueueLink(),)

def __init__(
self,
Expand Down Expand Up @@ -969,6 +1024,12 @@ def execute(self, context: 'Context'):
timeout=self.timeout,
metadata=self.metadata,
)
CloudTasksQueueLink.persist(
operator_instance=self,
context=context,
queue_name=f"projects/{self.project_id or hook.project_id}/"
f"locations/{self.location}/queues/{self.queue_name}",
)
return [Task.to_dict(t) for t in tasks]


Expand Down Expand Up @@ -1094,6 +1155,7 @@ class CloudTasksTaskRunOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
operator_extra_links = (CloudTasksQueueLink(),)

def __init__(
self,
Expand Down Expand Up @@ -1137,4 +1199,9 @@ def execute(self, context: 'Context'):
timeout=self.timeout,
metadata=self.metadata,
)
CloudTasksQueueLink.persist(
operator_instance=self,
context=context,
queue_name=task.name,
)
return Task.to_dict(task)
2 changes: 2 additions & 0 deletions airflow/providers/google/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,8 @@ extra-links:
- airflow.providers.google.cloud.links.bigquery.BigQueryDatasetLink
- airflow.providers.google.cloud.links.bigquery.BigQueryTableLink
- airflow.providers.google.cloud.links.bigquery_dts.BigQueryDataTransferConfigLink
- airflow.providers.google.cloud.links.cloud_tasks.CloudTasksQueueLink
- airflow.providers.google.cloud.links.cloud_tasks.CloudTasksLink
- airflow.providers.google.cloud.links.dataproc.DataprocLink
- airflow.providers.google.cloud.links.dataproc.DataprocListLink
- airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDetailedLink
Expand Down
Loading

0 comments on commit 3977e17

Please sign in to comment.