Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-XXX] GCS SFTP #361

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
69 changes: 69 additions & 0 deletions airflow/example_dags/example_gcs_to_sftp.py
@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for Google Cloud Storage to Google Cloud Storage transfer operators.
"""

import os

import airflow
from airflow import models
from airflow.operators.gcs_to_sftp import GoogleCloudStorageToSFTPOperator

default_args = {"start_date": airflow.utils.dates.days_ago(1)}

BUCKET_SRC = os.environ.get("GCP_GCS_BUCKET_1_SRC", "test-gcs-sftp")
OBJECT_SRC_1 = "parent-1.bin"
OBJECT_SRC_2 = "parent-2.bin"
OBJECT_SRC_3 = "subdir-1/*"
SFTP_PATH_1 = "/tmp/single-file/"
SFTP_PATH_2 = "/tmp/dirs/"


with models.DAG(
"example_gcs_to_sftp", default_args=default_args, schedule_interval=None
) as dag:
copy_file_from_gcs_to_sftp = GoogleCloudStorageToSFTPOperator(
task_id="file-copy-gsc-to-sftp",
source_bucket=BUCKET_SRC,
source_object=OBJECT_SRC_1,
sftp_path=SFTP_PATH_1,
)

move_file_from_gcs_to_sftp = GoogleCloudStorageToSFTPOperator(
task_id="file-move-gsc-to-sftp",
source_bucket=BUCKET_SRC,
source_object=OBJECT_SRC_2,
sftp_path=SFTP_PATH_1,
move_object=True,
)

copy_dir_from_gcs_to_sftp = GoogleCloudStorageToSFTPOperator(
task_id="dir-copy-gsc-to-sftp",
source_bucket=BUCKET_SRC,
source_object=OBJECT_SRC_3,
sftp_path=SFTP_PATH_2,
)

move_dir_from_gcs_to_sftp = GoogleCloudStorageToSFTPOperator(
task_id="dir-move-gsc-to-sftp",
source_bucket=BUCKET_SRC,
source_object=OBJECT_SRC_3,
sftp_path=SFTP_PATH_2,
)
165 changes: 165 additions & 0 deletions airflow/operators/gcs_to_sftp.py
@@ -0,0 +1,165 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains Google Cloud Storage to SFTP operator.
"""
import os
from tempfile import NamedTemporaryFile
from typing import Optional

from airflow import AirflowException
from airflow.contrib.hooks.sftp_hook import SFTPHook
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

WILDCARD = "*"


class GoogleCloudStorageToSFTPOperator(BaseOperator):
"""
Transfer files from a Google Cloud Storage bucket to SFTP server.

:param source_bucket: The source Google cloud storage bucket where the
object is. (templated)
:type source_bucket: str
:param source_object: The source name of the object to copy in the Google cloud
storage bucket. (templated)
You can use only one wildcard for objects (filenames) within your
bucket. The wildcard can appear inside the object name or at the
end of the object name. Appending a wildcard to the bucket name is
unsupported.
:type source_object: str
:param sftp_path: The sftp remote path. This is the specified directory path for
uploading to the SFTP server.
:type sftp_path: str
:param move_object: When move object is True, the object is moved instead
of copied to the new location. This is the equivalent of a mv command
as opposed to a cp command.
:type move_object: bool
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
:param sftp_conn_id: The sftp connection id. The name or identifier for
establishing a connection to the SFTP server.
:type sftp_conn_id: str
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have
domain-wide delegation enabled.
:type delegate_to: str
"""

template_fields = ("source_bucket", "source_object", "sftp_path")
ui_color = "#f0eee4"

# pylint: disable=too-many-arguments
@apply_defaults
def __init__(
self,
source_bucket: str,
source_object: str,
sftp_path: str,
move_object: bool = False,
gcp_conn_id: str = "google_cloud_default",
sftp_conn_id: str = "ssh_default",
delegate_to: Optional[str] = None,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)

self.source_bucket = source_bucket
self.source_object = source_object
self.sftp_path = sftp_path
self.move_object = move_object
self.gcp_conn_id = gcp_conn_id
self.sftp_conn_id = sftp_conn_id
self.delegate_to = delegate_to
self.sftp_dirs = None

def execute(self, context):
gcs_hook = GoogleCloudStorageHook(
gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
)

sftp_hook = SFTPHook(self.sftp_conn_id)

if WILDCARD in self.source_object:
total_wildcards = self.source_object.count(WILDCARD)
if total_wildcards > 1:
raise AirflowException(
"Only one wildcard '*' is allowed in source_object parameter. "
"Found {} in {}.".format(total_wildcards, self.source_object)
)

prefix, delimiter = self.source_object.split(WILDCARD, 1)
objects = gcs_hook.list(
self.source_bucket, prefix=prefix, delimiter=delimiter
)

for source_object in objects:
destination_path = os.path.join(self.sftp_path, source_object)
self._copy_single_object(
gcs_hook, sftp_hook, source_object, destination_path
)

self.log.info(
"Done. Uploaded '%d' files to %s", len(objects), self.sftp_path
)
else:
destination_path = os.path.join(self.sftp_path, self.source_object)
self._copy_single_object(
gcs_hook, sftp_hook, self.source_object, destination_path
)
self.log.info(
"Done. Uploaded '%s' file to %s", self.source_object, destination_path
)

def _copy_single_object(
self,
gcs_hook: GoogleCloudStorageHook,
sftp_hook: SFTPHook,
source_object: str,
destination_path: str,
) -> None:
"""
Helper function to copy single object.
"""
self.log.info(
"Executing copy of gs://%s/%s to %s",
self.source_bucket,
source_object,
destination_path,
)

dir_path = os.path.dirname(destination_path)
sftp_hook.create_directory(dir_path)

with NamedTemporaryFile("w") as tmp:
gcs_hook.download(
bucket_name=self.source_bucket,
object_name=source_object,
filename=tmp.name,
)
sftp_hook.store_file(destination_path, tmp.name)

if self.move_object:
self.log.info(
"Executing delete of gs://%s/%s", self.source_bucket, source_object
)
gcs_hook.delete(self.source_bucket, source_object)
102 changes: 102 additions & 0 deletions airflow/operators/sftp_to_gcs.py
@@ -0,0 +1,102 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains SFTP to Google Cloud Storage operator.
"""
from tempfile import NamedTemporaryFile
from typing import Optional

from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SFTPToGoogleCloudStorageOperator(BaseOperator):
"""
Transfer files to Google Cloud Storage from SFTP server.

:param sftp_path: The sftp remote path. This is the specified file path
for downloading the file from the SFTP server.
:type sftp_path: str
:param destination_path: Destination path within the specified bucket, it must be the full file path
to destination object on GCS, including GCS object (ex. `path/to/file.txt`) (templated)
:type destination_path: str
:param bucket: The bucket to upload to. (templated)
:type bucket: str
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
:param google_cloud_storage_conn_id: (Deprecated) The connection ID used to connect to Google Cloud
Platform. This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
:type google_cloud_storage_conn_id: str
:param mime_type: The mime-type string
:type mime_type: str
:param delegate_to: The account to impersonate, if any
:type delegate_to: str
:param gzip: Allows for file to be compressed and uploaded as gzip
:type gzip: bool
:param sftp_conn_id: The sftp connection id. The name or identifier for
establishing a connection to the SFTP server.
:type sftp_conn_id: str
"""

template_fields = ("sftp_path", "dst", "bucket")

@apply_defaults
def __init__(
self,
sftp_path: str,
destination_path: str,
bucket: str,
gcp_conn_id: str = "google_cloud_default",
mime_type: str = "application/octet-stream",
delegate_to: Optional[str] = None,
gzip: bool = False,
sftp_conn_id: str = "ssh_default",
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)

self.sftp_path = sftp_path
self.destination_path = destination_path
self.bucket = bucket
self.gcp_conn_id = gcp_conn_id
self.mime_type = mime_type
self.delegate_to = delegate_to
self.gzip = gzip
self.sftp_conn_id = sftp_conn_id

def execute(self, context):
gcs_hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
)

ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id)
sftp_client = ssh_hook.get_conn().open_sftp()

with NamedTemporaryFile("w") as f:
sftp_client.get(self.sftp_path, f.name)

gcs_hook.upload(
bucket_name=self.bucket,
object_name=self.destination_path,
mime_type=self.mime_type,
filename=f.name,
gzip=self.gzip,
)