Skip to content

Commit

Permalink
[AIRFLOW-6125] [AIP-21] Rename S3 operator and SFTP operator (#7112)
Browse files Browse the repository at this point in the history
PR contains changes regarding AIP-21 (renaming GCP operators and hooks):
* renamed GCP modules
* fixed tests
  • Loading branch information
michalslowikowski00 authored and turbaszek committed Jan 9, 2020
1 parent 70af358 commit 5b6772c
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 24 deletions.
4 changes: 2 additions & 2 deletions airflow/contrib/operators/s3_to_gcs_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from airflow.utils.decorators import apply_defaults


class S3ToGoogleCloudStorageOperator(S3ListOperator):
class S3ToGCSOperator(S3ListOperator):
"""
Synchronizes an S3 key, possibly a prefix, with a Google Cloud Storage
destination path.
Expand Down Expand Up @@ -74,7 +74,7 @@ class S3ToGoogleCloudStorageOperator(S3ListOperator):
.. code-block:: python
s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
s3_to_gcs_op = S3ToGCSOperator(
task_id='s3_to_gcs_example',
bucket='my-s3-bucket',
prefix='data/customers-201804',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import os

from airflow import models
from airflow.providers.google.cloud.operators.sftp_to_gcs import SFTPToGoogleCloudStorageOperator
from airflow.providers.google.cloud.operators.sftp_to_gcs import SFTPToGCSOperator
from airflow.utils.dates import days_ago

default_args = {"start_date": days_ago(1)}
Expand All @@ -43,15 +43,15 @@
"example_sftp_to_gcs", default_args=default_args, schedule_interval=None
) as dag:
# [START howto_operator_sftp_to_gcs_copy_single_file]
copy_file_from_sftp_to_gcs = SFTPToGoogleCloudStorageOperator(
copy_file_from_sftp_to_gcs = SFTPToGCSOperator(
task_id="file-copy-sftp-to-gcs",
source_path=os.path.join(TMP_PATH, DIR, OBJECT_SRC_1),
destination_bucket=BUCKET_SRC,
)
# [END howto_operator_sftp_to_gcs_copy_single_file]

# [START howto_operator_sftp_to_gcs_move_single_file_destination]
move_file_from_sftp_to_gcs_destination = SFTPToGoogleCloudStorageOperator(
move_file_from_sftp_to_gcs_destination = SFTPToGCSOperator(
task_id="file-move-sftp-to-gcs-destination",
source_path=os.path.join(TMP_PATH, DIR, OBJECT_SRC_2),
destination_bucket=BUCKET_SRC,
Expand All @@ -61,15 +61,15 @@
# [END howto_operator_sftp_to_gcs_move_single_file_destination]

# [START howto_operator_sftp_to_gcs_copy_directory]
copy_directory_from_sftp_to_gcs = SFTPToGoogleCloudStorageOperator(
copy_directory_from_sftp_to_gcs = SFTPToGCSOperator(
task_id="dir-copy-sftp-to-gcs",
source_path=os.path.join(TMP_PATH, DIR, SUBDIR, "*"),
destination_bucket=BUCKET_SRC,
)
# [END howto_operator_sftp_to_gcs_copy_directory]

# [START howto_operator_sftp_to_gcs_move_specific_files]
move_specific_files_from_gcs_to_sftp = SFTPToGoogleCloudStorageOperator(
move_specific_files_from_gcs_to_sftp = SFTPToGCSOperator(
task_id="dir-move-specific-files-sftp-to-gcs",
source_path=os.path.join(TMP_PATH, DIR, SUBDIR, "*.bin"),
destination_bucket=BUCKET_SRC,
Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/operators/sftp_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
WILDCARD = "*"


class SFTPToGoogleCloudStorageOperator(BaseOperator):
class SFTPToGCSOperator(BaseOperator):
"""
Transfer files to Google Cloud Storage from SFTP server.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:SFTPToGoogleCloudStorageOperator`
:ref:`howto/operator:SFTPToGCSOperator`
:param source_path: The sftp remote path. This is the specified file path
for downloading the single file or multiple files from the SFTP server.
Expand Down
6 changes: 3 additions & 3 deletions docs/howto/operator/gcp/sftp_to_gcs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ Prerequisite Tasks

.. include:: _partials/prerequisite_tasks.rst

.. _howto/operator:SFTPToGoogleCloudStorageOperator:
.. _howto/operator:SFTPToGCSOperator:

Operator
^^^^^^^^

Transfer files between SFTP and Google Storage is performed with the
:class:`~airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPToGoogleCloudStorageOperator` operator.
:class:`~airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPToGCSOperator` operator.

Use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPToGoogleCloudStorageOperator`
:template-fields:`airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPToGCSOperator`
to define values dynamically.

Copying single files
Expand Down
10 changes: 5 additions & 5 deletions tests/contrib/operators/test_s3_to_gcs_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import mock

from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGCSOperator

TASK_ID = 'test-s3-gcs-operator'
S3_BUCKET = 'test-bucket'
Expand All @@ -35,9 +35,9 @@

class TestS3ToGoogleCloudStorageOperator(unittest.TestCase):
def test_init(self):
"""Test S3ToGoogleCloudStorageOperator instance is properly initialized."""
"""Test S3ToGCSOperator instance is properly initialized."""

operator = S3ToGoogleCloudStorageOperator(
operator = S3ToGCSOperator(
task_id=TASK_ID,
bucket=S3_BUCKET,
prefix=S3_PREFIX,
Expand All @@ -59,7 +59,7 @@ def test_init(self):
def test_execute(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook):
"""Test the execute function when the run is successful."""

operator = S3ToGoogleCloudStorageOperator(
operator = S3ToGCSOperator(
task_id=TASK_ID,
bucket=S3_BUCKET,
prefix=S3_PREFIX,
Expand Down Expand Up @@ -94,7 +94,7 @@ def test_execute(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook):
def test_execute_with_gzip(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook):
"""Test the execute function when the run is successful."""

operator = S3ToGoogleCloudStorageOperator(
operator = S3ToGCSOperator(
task_id=TASK_ID,
bucket=S3_BUCKET,
prefix=S3_PREFIX,
Expand Down
14 changes: 7 additions & 7 deletions tests/providers/google/cloud/operators/test_sftp_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import mock

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.operators.sftp_to_gcs import SFTPToGoogleCloudStorageOperator
from airflow.providers.google.cloud.operators.sftp_to_gcs import SFTPToGCSOperator

TASK_ID = "test-gcs-to-sftp-operator"
GCP_CONN_ID = "GCP_CONN_ID"
Expand Down Expand Up @@ -52,11 +52,11 @@


# pylint: disable=unused-argument
class TestSFTPToGoogleCloudStorageOperator(unittest.TestCase):
class TestSFTPToGCSOperator(unittest.TestCase):
@mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.GoogleCloudStorageHook")
@mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPHook")
def test_execute_copy_single_file(self, sftp_hook, gcs_hook):
task = SFTPToGoogleCloudStorageOperator(
task = SFTPToGCSOperator(
task_id=TASK_ID,
source_path=SOURCE_OBJECT_NO_WILDCARD,
destination_bucket=TEST_BUCKET,
Expand Down Expand Up @@ -88,7 +88,7 @@ def test_execute_copy_single_file(self, sftp_hook, gcs_hook):
@mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.GoogleCloudStorageHook")
@mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPHook")
def test_execute_move_single_file(self, sftp_hook, gcs_hook):
task = SFTPToGoogleCloudStorageOperator(
task = SFTPToGCSOperator(
task_id=TASK_ID,
source_path=SOURCE_OBJECT_NO_WILDCARD,
destination_bucket=TEST_BUCKET,
Expand Down Expand Up @@ -128,7 +128,7 @@ def test_execute_copy_with_wildcard(self, sftp_hook, gcs_hook):
[],
]

task = SFTPToGoogleCloudStorageOperator(
task = SFTPToGCSOperator(
task_id=TASK_ID,
source_path=SOURCE_OBJECT_WILDCARD_FILENAME,
destination_bucket=TEST_BUCKET,
Expand Down Expand Up @@ -178,7 +178,7 @@ def test_execute_move_with_wildcard(self, sftp_hook, gcs_hook):
]

gcs_hook.return_value.list.return_value = SOURCE_FILES_LIST[:2]
task = SFTPToGoogleCloudStorageOperator(
task = SFTPToGCSOperator(
task_id=TASK_ID,
source_path=SOURCE_OBJECT_WILDCARD_FILENAME,
destination_bucket=TEST_BUCKET,
Expand All @@ -200,7 +200,7 @@ def test_execute_move_with_wildcard(self, sftp_hook, gcs_hook):
@mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.GoogleCloudStorageHook")
@mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPHook")
def test_execute_more_than_one_wildcard_exception(self, sftp_hook, gcs_hook):
task = SFTPToGoogleCloudStorageOperator(
task = SFTPToGCSOperator(
task_id=TASK_ID,
source_path=SOURCE_OBJECT_MULTIPLE_WILDCARDS,
destination_bucket=TEST_BUCKET,
Expand Down

0 comments on commit 5b6772c

Please sign in to comment.