diff --git a/airflow/example_dags/example_gcs_to_sftp.py b/airflow/example_dags/example_gcs_to_sftp.py
new file mode 100644
index 0000000000000..1e5f49f30b35d
--- /dev/null
+++ b/airflow/example_dags/example_gcs_to_sftp.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG for Google Cloud Storage to SFTP transfer operators.
+"""
+
+import os
+
+import airflow
+from airflow import models
+from airflow.operators.gcs_to_sftp import GoogleCloudStorageToSFTPOperator
+
+default_args = {"start_date": airflow.utils.dates.days_ago(1)}
+
+BUCKET_SRC = os.environ.get("GCP_GCS_BUCKET_1_SRC", "test-gcs-sftp")
+OBJECT_SRC_1 = "parent-1.bin"
+OBJECT_SRC_2 = "parent-2.bin"
+OBJECT_SRC_3 = "subdir-1/*"
+SFTP_PATH_1 = "/tmp/single-file/"
+SFTP_PATH_2 = "/tmp/dirs/"
+
+
+with models.DAG(
+    "example_gcs_to_sftp", default_args=default_args, schedule_interval=None
+) as dag:
+    copy_file_from_gcs_to_sftp = GoogleCloudStorageToSFTPOperator(
+        task_id="file-copy-gcs-to-sftp",
+        source_bucket=BUCKET_SRC,
+        source_object=OBJECT_SRC_1,
+        sftp_path=SFTP_PATH_1,
+    )
+
+    move_file_from_gcs_to_sftp = GoogleCloudStorageToSFTPOperator(
+        task_id="file-move-gcs-to-sftp",
+        source_bucket=BUCKET_SRC,
+        source_object=OBJECT_SRC_2,
+        sftp_path=SFTP_PATH_1,
+        move_object=True,
+    )
+
+    copy_dir_from_gcs_to_sftp = GoogleCloudStorageToSFTPOperator(
+        task_id="dir-copy-gcs-to-sftp",
+        source_bucket=BUCKET_SRC,
+        source_object=OBJECT_SRC_3,
+        sftp_path=SFTP_PATH_2,
+    )
+
+    move_dir_from_gcs_to_sftp = GoogleCloudStorageToSFTPOperator(
+        task_id="dir-move-gcs-to-sftp",
+        source_bucket=BUCKET_SRC,
+        source_object=OBJECT_SRC_3,
+        sftp_path=SFTP_PATH_2,
+        move_object=True,
+    )
diff --git a/airflow/operators/gcs_to_sftp.py b/airflow/operators/gcs_to_sftp.py
new file mode 100644
index 0000000000000..dd0dee884fd04
--- /dev/null
+++ b/airflow/operators/gcs_to_sftp.py
@@ -0,0 +1,165 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+""" +This module contains Google Cloud Storage to SFTP operator. +""" +import os +from tempfile import NamedTemporaryFile +from typing import Optional + +from airflow import AirflowException +from airflow.contrib.hooks.sftp_hook import SFTPHook +from airflow.gcp.hooks.gcs import GoogleCloudStorageHook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + +WILDCARD = "*" + + +class GoogleCloudStorageToSFTPOperator(BaseOperator): + """ + Transfer files from a Google Cloud Storage bucket to SFTP server. + + :param source_bucket: The source Google cloud storage bucket where the + object is. (templated) + :type source_bucket: str + :param source_object: The source name of the object to copy in the Google cloud + storage bucket. (templated) + You can use only one wildcard for objects (filenames) within your + bucket. The wildcard can appear inside the object name or at the + end of the object name. Appending a wildcard to the bucket name is + unsupported. + :type source_object: str + :param sftp_path: The sftp remote path. This is the specified directory path for + uploading to the SFTP server. + :type sftp_path: str + :param move_object: When move object is True, the object is moved instead + of copied to the new location. This is the equivalent of a mv command + as opposed to a cp command. + :type move_object: bool + :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. + :type gcp_conn_id: str + :param sftp_conn_id: The sftp connection id. The name or identifier for + establishing a connection to the SFTP server. + :type sftp_conn_id: str + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have + domain-wide delegation enabled. + :type delegate_to: str + """ + + template_fields = ("source_bucket", "source_object", "sftp_path") + ui_color = "#f0eee4" + + # pylint: disable=too-many-arguments + @apply_defaults + def __init__( + self, + source_bucket: str, + source_object: str, + sftp_path: str, + move_object: bool = False, + gcp_conn_id: str = "google_cloud_default", + sftp_conn_id: str = "ssh_default", + delegate_to: Optional[str] = None, + *args, + **kwargs + ) -> None: + super().__init__(*args, **kwargs) + + self.source_bucket = source_bucket + self.source_object = source_object + self.sftp_path = sftp_path + self.move_object = move_object + self.gcp_conn_id = gcp_conn_id + self.sftp_conn_id = sftp_conn_id + self.delegate_to = delegate_to + self.sftp_dirs = None + + def execute(self, context): + gcs_hook = GoogleCloudStorageHook( + gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to + ) + + sftp_hook = SFTPHook(self.sftp_conn_id) + + if WILDCARD in self.source_object: + total_wildcards = self.source_object.count(WILDCARD) + if total_wildcards > 1: + raise AirflowException( + "Only one wildcard '*' is allowed in source_object parameter. " + "Found {} in {}.".format(total_wildcards, self.source_object) + ) + + prefix, delimiter = self.source_object.split(WILDCARD, 1) + objects = gcs_hook.list( + self.source_bucket, prefix=prefix, delimiter=delimiter + ) + + for source_object in objects: + destination_path = os.path.join(self.sftp_path, source_object) + self._copy_single_object( + gcs_hook, sftp_hook, source_object, destination_path + ) + + self.log.info( + "Done. 
+                "Done. Uploaded %d files to %s", len(objects), self.sftp_path
+            )
+        else:
+            destination_path = os.path.join(self.sftp_path, self.source_object)
+            self._copy_single_object(
+                gcs_hook, sftp_hook, self.source_object, destination_path
+            )
+            self.log.info(
+                "Done. Uploaded file %s to %s", self.source_object, destination_path
+            )
+
+    def _copy_single_object(
+        self,
+        gcs_hook: GoogleCloudStorageHook,
+        sftp_hook: SFTPHook,
+        source_object: str,
+        destination_path: str,
+    ) -> None:
+        """
+        Helper function to copy a single object.
+        """
+        self.log.info(
+            "Executing copy of gs://%s/%s to %s",
+            self.source_bucket,
+            source_object,
+            destination_path,
+        )
+
+        dir_path = os.path.dirname(destination_path)
+        sftp_hook.create_directory(dir_path)
+
+        with NamedTemporaryFile("w") as tmp:
+            gcs_hook.download(
+                bucket_name=self.source_bucket,
+                object_name=source_object,
+                filename=tmp.name,
+            )
+            sftp_hook.store_file(destination_path, tmp.name)
+
+        if self.move_object:
+            self.log.info(
+                "Executing delete of gs://%s/%s", self.source_bucket, source_object
+            )
+            gcs_hook.delete(self.source_bucket, source_object)
diff --git a/airflow/operators/sftp_to_gcs.py b/airflow/operators/sftp_to_gcs.py
new file mode 100644
index 0000000000000..1381654490fc0
--- /dev/null
+++ b/airflow/operators/sftp_to_gcs.py
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This module contains the SFTP to Google Cloud Storage operator.
+"""
+from tempfile import NamedTemporaryFile
+from typing import Optional
+
+from airflow.contrib.hooks.ssh_hook import SSHHook
+from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class SFTPToGoogleCloudStorageOperator(BaseOperator):
+    """
+    Transfer files to Google Cloud Storage from an SFTP server.
+
+    :param sftp_path: The SFTP remote path. This is the specified file path
+        for downloading the file from the SFTP server.
+    :type sftp_path: str
+    :param destination_path: Destination path within the specified bucket; it must be the
+        full path to the destination object on GCS, e.g. `path/to/file.txt`. (templated)
+    :type destination_path: str
+    :param bucket: The bucket to upload to. (templated)
+    :type bucket: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param mime_type: The mime-type string.
+    :type mime_type: str
+    :param delegate_to: The account to impersonate, if any.
+    :type delegate_to: str
+    :param gzip: Allows the file to be compressed and uploaded as gzip.
+    :type gzip: bool
+    :param sftp_conn_id: The SFTP connection ID. The name or identifier for
+        establishing a connection to the SFTP server.
+    :type sftp_conn_id: str
+    """
+
+    template_fields = ("sftp_path", "destination_path", "bucket")
+
+    @apply_defaults
+    def __init__(
+        self,
+        sftp_path: str,
+        destination_path: str,
+        bucket: str,
+        gcp_conn_id: str = "google_cloud_default",
+        mime_type: str = "application/octet-stream",
+        delegate_to: Optional[str] = None,
+        gzip: bool = False,
+        sftp_conn_id: str = "ssh_default",
+        *args,
+        **kwargs
+    ) -> None:
+        super().__init__(*args, **kwargs)
+
+        self.sftp_path = sftp_path
+        self.destination_path = destination_path
+        self.bucket = bucket
+        self.gcp_conn_id = gcp_conn_id
+        self.mime_type = mime_type
+        self.delegate_to = delegate_to
+        self.gzip = gzip
+        self.sftp_conn_id = sftp_conn_id
+
+    def execute(self, context):
+        gcs_hook = GoogleCloudStorageHook(
+            gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
+        )
+
+        ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id)
+        sftp_client = ssh_hook.get_conn().open_sftp()
+
+        with NamedTemporaryFile("w") as f:
+            sftp_client.get(self.sftp_path, f.name)
+
+            gcs_hook.upload(
+                bucket_name=self.bucket,
+                object_name=self.destination_path,
+                mime_type=self.mime_type,
+                filename=f.name,
+                gzip=self.gzip,
+            )
diff --git a/tests/operators/test_gcs_to_sftp.py b/tests/operators/test_gcs_to_sftp.py
new file mode 100644
index 0000000000000..36691ece6a206
--- /dev/null
+++ b/tests/operators/test_gcs_to_sftp.py
@@ -0,0 +1,182 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import unittest
+
+from airflow.exceptions import AirflowException
+from airflow.operators.gcs_to_sftp import GoogleCloudStorageToSFTPOperator
+from tests.compat import mock
+
+TASK_ID = "test-gcs-to-sftp-operator"
+GCP_CONN_ID = "GCP_CONN_ID"
+SFTP_CONN_ID = "SFTP_CONN_ID"
+DELEGATE_TO = "DELEGATE_TO"
+
+TEST_BUCKET = "test-bucket"
+DELIMITER = ".csv"
+PREFIX = "TEST"
+SOURCE_OBJECT_WILDCARD_PREFIX = "*test_object"
+SOURCE_OBJECT_WILDCARD_SUFFIX = "test_object*"
+SOURCE_OBJECT_WILDCARD_MIDDLE = "test*object"
+SOURCE_OBJECT_WILDCARD_FILENAME = "test_object*.txt"
+SOURCE_OBJECT_NO_WILDCARD = "test_object.txt"
+SOURCE_OBJECT_MULTIPLE_WILDCARDS = "csv/*/test_*.csv"
+
+SOURCE_FILES_LIST = [
+    "test_object/file1.txt",
+    "test_object/file2.txt",
+    "test_object/file3.json",
+]
+
+DESTINATION_SFTP = "sftp_path"
+
+
+# pylint: disable=unused-argument
+class TestGoogleCloudStorageToSFTPOperator(unittest.TestCase):
+    @mock.patch("airflow.operators.gcs_to_sftp.GoogleCloudStorageHook")
+    @mock.patch("airflow.operators.gcs_to_sftp.SFTPHook")
+    def test_execute_copy_single_file(self, sftp_hook, gcs_hook):
+        task = GoogleCloudStorageToSFTPOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_NO_WILDCARD,
+            sftp_path=DESTINATION_SFTP,
+            move_object=False,
+            gcp_conn_id=GCP_CONN_ID,
+            sftp_conn_id=SFTP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+        )
+        task.execute({})
+        gcs_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID, delegate_to=DELEGATE_TO
+        )
+        sftp_hook.assert_called_once_with(SFTP_CONN_ID)
+
+        args, kwargs = gcs_hook.return_value.download.call_args
+        self.assertEqual(kwargs["bucket_name"], TEST_BUCKET)
+        self.assertEqual(kwargs["object_name"], SOURCE_OBJECT_NO_WILDCARD)
+
+        args, kwargs = sftp_hook.return_value.store_file.call_args
+        self.assertEqual(
+            args[0], os.path.join(DESTINATION_SFTP, SOURCE_OBJECT_NO_WILDCARD)
+        )
+
+        gcs_hook.return_value.delete.assert_not_called()
+
+    @mock.patch("airflow.operators.gcs_to_sftp.GoogleCloudStorageHook")
+    @mock.patch("airflow.operators.gcs_to_sftp.SFTPHook")
+    def test_execute_move_single_file(self, sftp_hook, gcs_hook):
+        task = GoogleCloudStorageToSFTPOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_NO_WILDCARD,
+            sftp_path=DESTINATION_SFTP,
+            move_object=True,
+            gcp_conn_id=GCP_CONN_ID,
+            sftp_conn_id=SFTP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+        )
+        task.execute(None)
+        gcs_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID, delegate_to=DELEGATE_TO
+        )
+        sftp_hook.assert_called_once_with(SFTP_CONN_ID)
+
+        args, kwargs = gcs_hook.return_value.download.call_args
+        self.assertEqual(kwargs["bucket_name"], TEST_BUCKET)
+        self.assertEqual(kwargs["object_name"], SOURCE_OBJECT_NO_WILDCARD)
+
+        args, kwargs = sftp_hook.return_value.store_file.call_args
+        self.assertEqual(
+            args[0], os.path.join(DESTINATION_SFTP, SOURCE_OBJECT_NO_WILDCARD)
+        )
+
+        gcs_hook.return_value.delete.assert_called_once_with(
+            TEST_BUCKET, SOURCE_OBJECT_NO_WILDCARD
+        )
+
+    @mock.patch("airflow.operators.gcs_to_sftp.GoogleCloudStorageHook")
+    @mock.patch("airflow.operators.gcs_to_sftp.SFTPHook")
+    def test_execute_copy_with_wildcard(self, sftp_hook, gcs_hook):
+        gcs_hook.return_value.list.return_value = SOURCE_FILES_LIST[:2]
+        operator = GoogleCloudStorageToSFTPOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_WILDCARD_FILENAME,
+            sftp_path=DESTINATION_SFTP,
+            move_object=False,
+            gcp_conn_id=GCP_CONN_ID,
+            sftp_conn_id=SFTP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+        )
+        operator.execute(None)
+
+        gcs_hook.return_value.list.assert_called_with(
+            TEST_BUCKET, delimiter=".txt", prefix="test_object"
+        )
+
+        call_one, call_two = gcs_hook.return_value.download.call_args_list
+        self.assertEqual(call_one[1]["bucket_name"], TEST_BUCKET)
+        self.assertEqual(call_one[1]["object_name"], "test_object/file1.txt")
+
+        self.assertEqual(call_two[1]["bucket_name"], TEST_BUCKET)
+        self.assertEqual(call_two[1]["object_name"], "test_object/file2.txt")
+
+    @mock.patch("airflow.operators.gcs_to_sftp.GoogleCloudStorageHook")
+    @mock.patch("airflow.operators.gcs_to_sftp.SFTPHook")
+    def test_execute_move_with_wildcard(self, sftp_hook, gcs_hook):
+        gcs_hook.return_value.list.return_value = SOURCE_FILES_LIST[:2]
+        operator = GoogleCloudStorageToSFTPOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_WILDCARD_FILENAME,
+            sftp_path=DESTINATION_SFTP,
+            move_object=True,
+            gcp_conn_id=GCP_CONN_ID,
+            sftp_conn_id=SFTP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+        )
+        operator.execute(None)
+
+        gcs_hook.return_value.list.assert_called_with(
+            TEST_BUCKET, delimiter=".txt", prefix="test_object"
+        )
+
+        call_one, call_two = gcs_hook.return_value.delete.call_args_list
+        self.assertEqual(call_one[0], (TEST_BUCKET, "test_object/file1.txt"))
+        self.assertEqual(call_two[0], (TEST_BUCKET, "test_object/file2.txt"))
+
+    @mock.patch("airflow.operators.gcs_to_sftp.GoogleCloudStorageHook")
+    @mock.patch("airflow.operators.gcs_to_sftp.SFTPHook")
+    def test_execute_more_than_one_wildcard_exception(self, sftp_hook, gcs_hook):
+        gcs_hook.return_value.list.return_value = SOURCE_FILES_LIST[:2]
+        operator = GoogleCloudStorageToSFTPOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object="file*csv*",
+            sftp_path=DESTINATION_SFTP,
+            move_object=False,
+            gcp_conn_id=GCP_CONN_ID,
+            sftp_conn_id=SFTP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+        )
+        with self.assertRaises(AirflowException):
+            operator.execute(None)
diff --git a/tests/operators/test_gcs_to_sftp_system.py b/tests/operators/test_gcs_to_sftp_system.py
new file mode 100644
index 0000000000000..7605f3f305252
--- /dev/null
+++ b/tests/operators/test_gcs_to_sftp_system.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""System tests for Google Cloud Build operators""" +import unittest + +from tests.gcp.utils.base_gcp_system_test_case import ( + OPERATORS_EXAMPLES_DAG_FOLDER, SKIP_TEST_WARNING, TestDagGcpSystem, +) +from tests.gcp.utils.gcp_authenticator import GCP_GCS_KEY +from tests.operators.test_gcs_to_sftp_system_helper import GcsToSFTPTestHelper + + +@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_GCS_KEY), SKIP_TEST_WARNING) +class GcsToGcsExampleDagsSystemTest(TestDagGcpSystem): + """ + System tests for Google Cloud Storage to SFTP transfer operator + + It use a real service. + """ + + def __init__(self, method_name="runTest"): + super().__init__( + method_name, + dag_id="example_gcs_to_sftp", + dag_name="example_gcs_to_sftp.py", + example_dags_folder=OPERATORS_EXAMPLES_DAG_FOLDER, + gcp_key=GCP_GCS_KEY, + ) + self.helper = GcsToSFTPTestHelper() + + def setUp(self): + super().setUp() + self.gcp_authenticator.gcp_authenticate() + try: + self.helper.create_buckets() + finally: + self.gcp_authenticator.gcp_revoke_authentication() + + def test_run_example_dag(self): + self._run_dag() + + def tearDown(self): + self.gcp_authenticator.gcp_authenticate() + try: + self.helper.delete_buckets() + finally: + self.gcp_authenticator.gcp_revoke_authentication() + super().tearDown() diff --git a/tests/operators/test_gcs_to_sftp_system_helper.py b/tests/operators/test_gcs_to_sftp_system_helper.py new file mode 100644 index 0000000000000..6e176440d1efc --- /dev/null +++ b/tests/operators/test_gcs_to_sftp_system_helper.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Helpers to perform system tests for the Google Cloud Storage service. +""" +import argparse +import os +from itertools import product + +from airflow.example_dags.example_gcs_to_sftp import BUCKET_SRC, OBJECT_SRC_1, OBJECT_SRC_2 +from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor +from tests.gcp.utils.gcp_authenticator import GCP_GCS_KEY, GcpAuthenticator + +GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") + + +class GcsToSFTPTestHelper(LoggingCommandExecutor): + """ + Helper class to perform system tests for the Google Cloud Storage service. + """ + + def create_buckets(self): + """Create a bucket in Google Cloud Storage service with sample content.""" + + # 1. Create buckets + self.execute_cmd(["gsutil", "mb", "gs://{}".format(BUCKET_SRC)]) + + # 2. 
+        for bucket_src, object_source in product(
+            (
+                BUCKET_SRC,
+                "{}/subdir-1".format(BUCKET_SRC),
+                "{}/subdir-2".format(BUCKET_SRC),
+            ),
+            (OBJECT_SRC_1, OBJECT_SRC_2),
+        ):
+            source_path = "gs://{}/{}".format(bucket_src, object_source)
+            self.execute_cmd(
+                [
+                    "bash",
+                    "-c",
+                    "cat /dev/urandom | head -c $((1 * 1024 * 1024)) | gsutil cp - {}".format(
+                        source_path
+                    ),
+                ]
+            )
+
+    def delete_buckets(self):
+        """Delete bucket in Google Cloud Storage service"""
+        self.execute_cmd(["gsutil", "rm", "gs://{}/**".format(BUCKET_SRC)])
+        self.execute_cmd(["gsutil", "rb", "gs://{}".format(BUCKET_SRC)])
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Create bucket for system tests.")
+    parser.add_argument(
+        "--action",
+        dest="action",
+        required=True,
+        choices=("create-buckets", "delete-buckets", "before-tests", "after-tests"),
+    )
+    action = parser.parse_args().action
+
+    helper = GcsToSFTPTestHelper()
+    gcp_authenticator = GcpAuthenticator(GCP_GCS_KEY)
+    helper.log.info("Starting action: %s", action)
+
+    gcp_authenticator.gcp_store_authentication()
+    try:
+        gcp_authenticator.gcp_authenticate()
+        if action == "before-tests":
+            pass
+        elif action == "after-tests":
+            pass
+        elif action == "create-buckets":
+            helper.create_buckets()
+        elif action == "delete-buckets":
+            helper.delete_buckets()
+        else:
+            raise Exception("Unknown action: {}".format(action))
+    finally:
+        gcp_authenticator.gcp_restore_authentication()
+
+    helper.log.info("Finishing action: %s", action)
diff --git a/tests/operators/test_sftp_to_gcs.py b/tests/operators/test_sftp_to_gcs.py
new file mode 100644
index 0000000000000..fa0beb3377dd8
--- /dev/null
+++ b/tests/operators/test_sftp_to_gcs.py
@@ -0,0 +1,19 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/operators/test_sftp_to_gcs_system_helper.py b/tests/operators/test_sftp_to_gcs_system_helper.py
new file mode 100644
index 0000000000000..38f558fa38c39
--- /dev/null
+++ b/tests/operators/test_sftp_to_gcs_system_helper.py
@@ -0,0 +1,135 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Helpers to perform system tests for the SFTP to Google Cloud Storage transfer operator.
+"""
+import argparse
+import os
+import shutil
+
+from airflow.example_dags.example_sftp_to_gcs import BUCKET_SRC, OBJECT_SRC_1, OBJECT_SRC_2
+from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor
+from tests.gcp.utils.gcp_authenticator import GCP_GCS_KEY, GcpAuthenticator
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+TMP_PATH = "/tmp"
+TMP_DIR_FOR_TESTS = "tests_sftp_hook_dir"
+TMP_FILE_FOR_TESTS = "test_file.txt"
+
+SFTP_CONNECTION_USER = "root"
+
+
+class SFTPtoGcsTestHelper(LoggingCommandExecutor):
+    """
+    Helper class to perform system tests for the SFTP to Google Cloud Storage transfer operator.
+    """
+
+    files_and_dirs = [
+        ("tezt_file_1.bin", os.path.join(TMP_PATH, TMP_DIR_FOR_TESTS)),
+        ("tezt_file_2.bin", os.path.join(TMP_PATH, TMP_DIR_FOR_TESTS)),
+        ("tezt_file_3.bin", os.path.join(TMP_PATH, TMP_DIR_FOR_TESTS)),
+    ]
+
+    buckets = [BUCKET_SRC]
+
+    def create_buckets(self):
+        """Create a bucket in Google Cloud Storage service with sample content."""
+
+        # 1. Create buckets
+        for bucket in self.buckets:
+            self.execute_cmd(["gsutil", "mb", "gs://{}".format(bucket)])
+
+    def create_temp_files(self):
+        for filename, dir_path in self.files_and_dirs:
+            self._create_temp_file(filename, dir_path)
+
+    def delete_temp_files(self):
+        for filename, dir_path in self.files_and_dirs:
+            self._delete_temp_file(filename, dir_path)
+
+    def delete_buckets(self):
+        """Delete bucket in Google Cloud Storage service"""
+        self.execute_cmd(["gsutil", "rm", "gs://{}/**".format(BUCKET_SRC)])
+        self.execute_cmd(["gsutil", "rb", "gs://{}".format(BUCKET_SRC)])
+
+    @staticmethod
+    def _create_temp_file(filename, dir_path="/tmp"):
+        os.makedirs(dir_path, exist_ok=True)
+
+        full_path = os.path.join(dir_path, filename)
+        with open(full_path, "wb") as f:
+            f.write(os.urandom(1 * 1024 * 1024))
+
+    @staticmethod
+    def _delete_temp_file(filename, dir_path="/tmp"):
+        full_path = os.path.join(dir_path, filename)
+        try:
+            os.remove(full_path)
+        except FileNotFoundError:
+            pass
+        if dir_path != "/tmp":
+            shutil.rmtree(dir_path, ignore_errors=True)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Create bucket for system tests.")
+    parser.add_argument(
+        "--action",
+        dest="action",
+        required=True,
+        choices=(
+            "create-buckets",
+            "delete-buckets",
+            "before-tests",
+            "after-tests",
+            "create-files",
+            "delete-files",
+        ),
+    )
+    action = parser.parse_args().action
+
+    helper = SFTPtoGcsTestHelper()
+    gcp_authenticator = GcpAuthenticator(GCP_GCS_KEY)
+    helper.log.info("Starting action: %s", action)
+
+    gcp_authenticator.gcp_store_authentication()
+    try:
+        gcp_authenticator.gcp_authenticate()
+        if action == "before-tests":
+            helper.create_buckets()
+            helper.create_temp_files()
+        elif action == "after-tests":
+            helper.delete_buckets()
+            helper.delete_temp_files()
+        elif action == "create-buckets":
+            helper.create_buckets()
+        elif action == "delete-buckets":
+            helper.delete_buckets()
+        elif action == "create-files":
+            helper.create_temp_files()
+        elif action == "delete-files":
"delete-files": + helper.delete_temp_files() + else: + raise Exception("Unknown action: {}".format(action)) + finally: + gcp_authenticator.gcp_restore_authentication() + + helper.log.info("Finishing action: %s", action)