Commit

Cloud Storage Transfer Operators assets & system tests migration (AIP-47) (#26072)
bkossakowska committed Sep 21, 2022
1 parent cf73cb7 commit e61d823
Showing 13 changed files with 329 additions and 106 deletions.
@@ -164,8 +164,8 @@ def create_transfer_job(self, body: dict) -> dict:
:rtype: dict
"""
body = self._inject_project_id(body, BODY, PROJECT_ID)
try:
transfer_job = (
self.get_conn().transferJobs().create(body=body).execute(num_retries=self.num_retries)
)
127 changes: 127 additions & 0 deletions airflow/providers/google/cloud/links/cloud_storage_transfer.py
@@ -0,0 +1,127 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains Google Storage Transfer Service links."""

from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.providers.google.cloud.links.base import BaseGoogleLink

if TYPE_CHECKING:
from airflow.utils.context import Context

CLOUD_STORAGE_TRANSFER_BASE_LINK = "https://console.cloud.google.com/transfer"

CLOUD_STORAGE_TRANSFER_LIST_LINK = CLOUD_STORAGE_TRANSFER_BASE_LINK + "/jobs?project={project_id}"

CLOUD_STORAGE_TRANSFER_JOB_LINK = (
CLOUD_STORAGE_TRANSFER_BASE_LINK + "/jobs/transferJobs%2F{transfer_job}/runs?project={project_id}"
)

CLOUD_STORAGE_TRANSFER_OPERATION_LINK = (
CLOUD_STORAGE_TRANSFER_BASE_LINK
+ "/jobs/transferJobs%2F{transfer_job}/runs/transferOperations%2F{transfer_operation}"
+ "?project={project_id}"
)


class CloudStorageTransferLinkHelper:
"""Helper class for Storage Transfer links"""

@staticmethod
def extract_parts(operation_name: str | None):
if not operation_name:
return "", ""
transfer_operation = operation_name.split("/")[1]
transfer_job = operation_name.split("-")[1]
return transfer_operation, transfer_job
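# Illustrative aside, not part of this commit: assuming operation names of the form
# "transferOperations/transferJobs-<job-id>-<run-id>" (the format the console URLs above imply),
# extract_parts() would split a name like this:
#
#   op, job = CloudStorageTransferLinkHelper.extract_parts(
#       "transferOperations/transferJobs-123456789-987654321"
#   )
#   # op  == "transferJobs-123456789-987654321"  -> fills {transfer_operation} in the URL
#   # job == "123456789"                         -> fills {transfer_job} in the URL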


class CloudStorageTransferListLink(BaseGoogleLink):
"""Helper class for constructing Cloud Storage Transfer Link"""

name = "Cloud Storage Transfer"
key = "cloud_storage_transfer"
format_str = CLOUD_STORAGE_TRANSFER_LIST_LINK

@staticmethod
def persist(
context: Context,
task_instance,
project_id: str,
):
task_instance.xcom_push(
context,
key=CloudStorageTransferListLink.key,
value={"project_id": project_id},
)


class CloudStorageTransferJobLink(BaseGoogleLink):
"""Helper class for constructing Storage Transfer Job Link"""

name = "Cloud Storage Transfer Job"
key = "cloud_storage_transfer_job"
format_str = CLOUD_STORAGE_TRANSFER_JOB_LINK

@staticmethod
def persist(
task_instance,
context: Context,
project_id: str,
job_name: str,
):

job_name = job_name.split('/')[1] if job_name else ""

task_instance.xcom_push(
context,
key=CloudStorageTransferJobLink.key,
value={
"project_id": project_id,
"transfer_job": job_name,
},
)


class CloudStorageTransferDetailsLink(BaseGoogleLink):
"""Helper class for constructing Cloud Storage Transfer Operation Link"""

name = "Cloud Storage Transfer Details"
key = "cloud_storage_transfer_details"
format_str = CLOUD_STORAGE_TRANSFER_OPERATION_LINK

@staticmethod
def persist(
task_instance,
context: Context,
project_id: str,
operation_name: str,
):
transfer_operation, transfer_job = CloudStorageTransferLinkHelper.extract_parts(operation_name)

task_instance.xcom_push(
context,
key=CloudStorageTransferDetailsLink.key,
value={
"project_id": project_id,
"transfer_job": transfer_job,
"transfer_operation": transfer_operation,
},
)
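# Illustrative aside, not part of this commit: persist() only stores a small dict in XCom under
# each link's key; the provider's BaseGoogleLink is then expected to build the console URL by
# formatting format_str with that dict. With hypothetical values:
#
#   value = {"project_id": "my-project", "transfer_job": "123456789"}
#   url = CloudStorageTransferJobLink.format_str.format(**value)
#   # url == "https://console.cloud.google.com/transfer/jobs/transferJobs%2F123456789/runs?project=my-project"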
@@ -55,6 +55,11 @@
CloudDataTransferServiceHook,
GcpTransferJobsStatus,
)
from airflow.providers.google.cloud.links.cloud_storage_transfer import (
CloudStorageTransferDetailsLink,
CloudStorageTransferJobLink,
CloudStorageTransferListLink,
)
from airflow.providers.google.cloud.utils.helpers import normalize_directory_path

if TYPE_CHECKING:
@@ -218,6 +223,7 @@ class CloudDataTransferServiceCreateJobOperator(BaseOperator):
'google_impersonation_chain',
)
# [END gcp_transfer_job_create_template_fields]
operator_extra_links = (CloudStorageTransferJobLink(),)

def __init__(
self,
@@ -226,6 +232,7 @@ def __init__(
aws_conn_id: str = 'aws_default',
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1',
project_id: str | None = None,
google_impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
@@ -234,6 +241,7 @@ def __init__(
self.aws_conn_id = aws_conn_id
self.gcp_conn_id = gcp_conn_id
self.api_version = api_version
self.project_id = project_id
self.google_impersonation_chain = google_impersonation_chain
self._validate_inputs()

@@ -247,7 +255,18 @@ def execute(self, context: Context) -> dict:
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.google_impersonation_chain,
)
return hook.create_transfer_job(body=self.body)
result = hook.create_transfer_job(body=self.body)

project_id = self.project_id or hook.project_id
if project_id:
CloudStorageTransferJobLink.persist(
context=context,
task_instance=self,
project_id=project_id,
job_name=result[NAME],
)

return result
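# Illustrative usage, not part of this commit: with the new optional project_id argument, a DAG
# can pass the project explicitly so the extra link resolves even when the connection defines no
# default project. TRANSFER_BODY and the ids below are hypothetical.
#
#   create_transfer_job = CloudDataTransferServiceCreateJobOperator(
#       task_id="create_transfer_job",
#       body=TRANSFER_BODY,
#       project_id="my-project",
#   )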


class CloudDataTransferServiceUpdateJobOperator(BaseOperator):
@@ -291,6 +310,7 @@ class CloudDataTransferServiceUpdateJobOperator(BaseOperator):
'google_impersonation_chain',
)
# [END gcp_transfer_job_update_template_fields]
operator_extra_links = (CloudStorageTransferJobLink(),)

def __init__(
self,
@@ -300,12 +320,14 @@ def __init__(
aws_conn_id: str = 'aws_default',
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1',
project_id: str | None = None,
google_impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.job_name = job_name
self.body = body
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
self.api_version = api_version
self.aws_conn_id = aws_conn_id
@@ -324,6 +346,16 @@ def execute(self, context: Context) -> dict:
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.google_impersonation_chain,
)

project_id = self.project_id or hook.project_id
if project_id:
CloudStorageTransferJobLink.persist(
context=context,
task_instance=self,
project_id=project_id,
job_name=self.job_name,
)

return hook.update_transfer_job(job_name=self.job_name, body=self.body)


@@ -426,10 +458,12 @@ class CloudDataTransferServiceGetOperationOperator(BaseOperator):
'google_impersonation_chain',
)
# [END gcp_transfer_operation_get_template_fields]
operator_extra_links = (CloudStorageTransferDetailsLink(),)

def __init__(
self,
*,
project_id: str | None = None,
operation_name: str,
gcp_conn_id: str = "google_cloud_default",
api_version: str = "v1",
@@ -438,6 +472,7 @@ def __init__(
) -> None:
super().__init__(**kwargs)
self.operation_name = operation_name
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
self.api_version = api_version
self.google_impersonation_chain = google_impersonation_chain
@@ -454,6 +489,16 @@ def execute(self, context: Context) -> dict:
impersonation_chain=self.google_impersonation_chain,
)
operation = hook.get_transfer_operation(operation_name=self.operation_name)

project_id = self.project_id or hook.project_id
if project_id:
CloudStorageTransferDetailsLink.persist(
context=context,
task_instance=self,
project_id=project_id,
operation_name=self.operation_name,
)

return operation


@@ -488,10 +533,12 @@ class CloudDataTransferServiceListOperationsOperator(BaseOperator):
'google_impersonation_chain',
)
# [END gcp_transfer_operations_list_template_fields]
operator_extra_links = (CloudStorageTransferListLink(),)

def __init__(
self,
request_filter: dict | None = None,
project_id: str | None = None,
gcp_conn_id: str = 'google_cloud_default',
api_version: str = 'v1',
google_impersonation_chain: str | Sequence[str] | None = None,
@@ -508,6 +555,7 @@ def __init__(

super().__init__(**kwargs)
self.filter = request_filter
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
self.api_version = api_version
self.google_impersonation_chain = google_impersonation_chain
@@ -525,6 +573,15 @@ def execute(self, context: Context) -> list[dict]:
)
operations_list = hook.list_transfer_operations(request_filter=self.filter)
self.log.info(operations_list)

project_id = self.project_id or hook.project_id
if project_id:
CloudStorageTransferListLink.persist(
context=context,
task_instance=self,
project_id=project_id,
)

return operations_list


@@ -26,6 +26,7 @@
NAME,
CloudDataTransferServiceHook,
)
from airflow.providers.google.cloud.links.cloud_storage_transfer import CloudStorageTransferJobLink
from airflow.sensors.base import BaseSensorOperator

if TYPE_CHECKING:
@@ -65,6 +66,7 @@ class CloudDataTransferServiceJobStatusSensor(BaseSensorOperator):
'impersonation_chain',
)
# [END gcp_transfer_job_sensor_template_fields]
operator_extra_links = (CloudStorageTransferJobLink(),)

def __init__(
self,
@@ -103,4 +105,13 @@ def poke(self, context: Context) -> bool:
if check:
self.xcom_push(key="sensed_operations", value=operations, context=context)

project_id = self.project_id or hook.project_id
if project_id:
CloudStorageTransferJobLink.persist(
context=context,
task_instance=self,
project_id=project_id,
job_name=self.job_name,
)

return check
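# Illustrative usage, not part of this commit: the sensor reuses its existing project_id and
# job_name arguments to persist the same job link. The values below are hypothetical; the job
# name is pulled from the create task's return value.
#
#   wait_for_transfer = CloudDataTransferServiceJobStatusSensor(
#       task_id="wait_for_transfer",
#       job_name="{{ task_instance.xcom_pull('create_transfer_job')['name'] }}",
#       project_id="my-project",
#       expected_statuses={GcpTransferOperationStatus.SUCCESS},
#   )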
3 changes: 3 additions & 0 deletions airflow/providers/google/provider.yaml
@@ -1017,6 +1017,9 @@ extra-links:
- airflow.providers.google.cloud.links.life_sciences.LifeSciencesLink
- airflow.providers.google.cloud.links.cloud_functions.CloudFunctionsDetailsLink
- airflow.providers.google.cloud.links.cloud_functions.CloudFunctionsListLink
- airflow.providers.google.cloud.links.cloud_storage_transfer.CloudStorageTransferListLink
- airflow.providers.google.cloud.links.cloud_storage_transfer.CloudStorageTransferJobLink
- airflow.providers.google.cloud.links.cloud_storage_transfer.CloudStorageTransferDetailsLink
- airflow.providers.google.common.links.storage.StorageLink
- airflow.providers.google.common.links.storage.FileDetailsLink

@@ -58,7 +58,7 @@ For parameter definition, take a look at
Using the operator
""""""""""""""""""

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
:language: python
:start-after: [START howto_operator_gcp_transfer_create_job_body_gcp]
:end-before: [END howto_operator_gcp_transfer_create_job_body_gcp]
@@ -138,12 +138,12 @@ For parameter definition, take a look at
Using the operator
""""""""""""""""""

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
:language: python
:start-after: [START howto_operator_gcp_transfer_update_job_body]
:end-before: [END howto_operator_gcp_transfer_update_job_body]

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py
.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_transfer_update_job]
