Skip to content

Commit

Permalink
Add FTPSFileTransmitOperator (#28318)
Browse files Browse the repository at this point in the history
  • Loading branch information
RachitSharma2001 committed Dec 28, 2022
1 parent 0fae6a0 commit 0e349d8
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 34 deletions.
19 changes: 18 additions & 1 deletion airflow/providers/ftp/operators/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from airflow.compat.functools import cached_property
from airflow.models import BaseOperator
from airflow.providers.ftp.hooks.ftp import FTPHook
from airflow.providers.ftp.hooks.ftp import FTPHook, FTPSHook


class FTPOperation:
Expand Down Expand Up @@ -130,3 +130,20 @@ def execute(self, context: Any) -> str | list[str] | None:
self.log.info("Starting to transfer file %s", file_msg)
self.hook.store_file(remote_filepath, local_filepath)
return self.local_filepath


class FTPSFileTransmitOperator(FTPFileTransmitOperator):
"""
FTPSFileTransmitOperator for transferring files from remote host to local or vice a versa.
This operator uses an FTPSHook to open ftps transport channel that serve as basis
for file transfer.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:FTPSFileTransmitOperator`
"""

@cached_property
def hook(self) -> FTPSHook:
"""Create and return an FTPSHook."""
return FTPSHook(ftp_conn_id=self.ftp_conn_id)
32 changes: 31 additions & 1 deletion docs/apache-airflow-providers-ftp/operators/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ FTPFileTransmitOperator


Use the FTPFileTransmitOperator to get or
pull files to/from an FTP server.
put files to/from an FTP server.

Using the Operator
^^^^^^^^^^^^^^^^^^
Expand All @@ -46,3 +46,33 @@ The below example shows how to use the FTPFileTransmitOperator to pull a file fr
:dedent: 4
:start-after: [START howto_operator_ftp_get]
:end-before: [END howto_operator_ftp_get]

.. _howto/operator:FTPSFileTransmitOperator:

FTPSFileTransmitOperator
=========================


Use the FTPSFileTransmitOperator to get or
put files to/from an FTPS server.

Using the Operator
^^^^^^^^^^^^^^^^^^

For parameter definition take a look at :class:`~airflow.providers.ftp.operators.FTPSFileTransmitOperator`.

The below example shows how to use the FTPSFileTransmitOperator to transfer a locally stored file to a remote FTPS Server:

.. exampleinclude:: /../../tests/system/providers/ftp/example_ftp.py
:language: python
:dedent: 4
:start-after: [START howto_operator_ftps_put]
:end-before: [END howto_operator_ftps_put]

The below example shows how to use the FTPSFileTransmitOperator to pull a file from a remote FTPS Server.

.. exampleinclude:: /../../tests/system/providers/ftp/example_ftp.py
:language: python
:dedent: 4
:start-after: [START howto_operator_ftps_get]
:end-before: [END howto_operator_ftps_get]
150 changes: 122 additions & 28 deletions tests/providers/ftp/operators/test_ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
# under the License.
from __future__ import annotations

import os
from unittest import mock

import pytest

from airflow.models import DAG
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from airflow.providers.ftp.operators.ftp import (
FTPFileTransmitOperator,
FTPOperation,
FTPSFileTransmitOperator,
)
from airflow.utils.timezone import datetime

DEFAULT_DATE = datetime(2017, 1, 1)
Expand All @@ -32,21 +35,16 @@

class TestFTPFileTransmitOperator:
def setup_method(self):
self.test_local_dir = "/tmp"
self.test_local_dir_int = "/tmp/interdir"
self.test_local_dir = "ftptmp"
self.test_remote_dir = "/ftphome"
self.test_remote_dir_int = "/ftphome/interdir"
self.test_local_filename = "test_local_file"
self.test_remote_filename = "test_remote_file"
self.test_local_filepath = f"{self.test_local_dir}/{self.test_local_filename}"
self.test_remote_filepath = f"{self.test_remote_dir}/{self.test_remote_filename}"
self.test_local_filepath_int_dir = f"{self.test_local_dir_int}/{self.test_local_filename}"
self.test_local_filepath_int_dir = f"{self.test_local_dir}/{self.test_local_filename}"
self.test_remote_filepath_int_dir = f"{self.test_remote_dir_int}/{self.test_remote_filename}"

def teardown_method(self):
if os.path.exists(self.test_local_dir_int):
os.rmdir(self.test_local_dir_int)

@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.store_file")
@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.create_directory")
def test_file_transfer_put(self, mock_create_dir, mock_put):
Expand All @@ -58,9 +56,8 @@ def test_file_transfer_put(self, mock_create_dir, mock_put):
operation=FTPOperation.PUT,
)
ftp_op.execute(None)
assert mock_put.call_count == 1
assert not mock_create_dir.called
mock_put.assert_called_with(self.test_remote_filepath, self.test_local_filepath)
mock_put.assert_called_once_with(self.test_remote_filepath, self.test_local_filepath)

@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.store_file")
@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.create_directory")
Expand All @@ -74,9 +71,8 @@ def test_file_transfer_with_intermediate_dir_put(self, mock_create_dir, mock_put
create_intermediate_dirs=True,
)
ftp_op.execute(None)
assert mock_put.call_count == 1
mock_create_dir.assert_called_with(self.test_remote_dir_int)
mock_put.assert_called_with(self.test_remote_filepath_int_dir, self.test_local_filepath)
mock_put.assert_called_once_with(self.test_remote_filepath_int_dir, self.test_local_filepath)

@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.retrieve_file")
def test_file_transfer_get(self, mock_get):
Expand All @@ -88,23 +84,23 @@ def test_file_transfer_get(self, mock_get):
operation=FTPOperation.GET,
)
ftp_op.execute(None)
assert mock_get.call_count == 1
mock_get.assert_called_with(self.test_remote_filepath, self.test_local_filepath)
mock_get.assert_called_once_with(self.test_remote_filepath, self.test_local_filepath)

@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.retrieve_file")
def test_file_transfer_with_intermediate_dir_get(self, mock_get):
def test_file_transfer_with_intermediate_dir_get(self, mock_get, tmp_path):
ftp_op = FTPFileTransmitOperator(
task_id="test_ftp_get_imm_dirs",
ftp_conn_id=DEFAULT_CONN_ID,
local_filepath=self.test_local_filepath_int_dir,
local_filepath=str(tmp_path / self.test_local_filepath_int_dir),
remote_filepath=self.test_remote_filepath,
operation=FTPOperation.GET,
create_intermediate_dirs=True,
)
ftp_op.execute(None)
assert mock_get.call_count == 1
assert os.path.exists(self.test_local_dir_int)
mock_get.assert_called_with(self.test_remote_filepath, self.test_local_filepath_int_dir)
assert len(list(tmp_path.iterdir())) == 1
mock_get.assert_called_once_with(
self.test_remote_filepath, str(tmp_path / self.test_local_filepath_int_dir)
)

@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.retrieve_file")
def test_multiple_paths_get(self, mock_get):
Expand All @@ -119,10 +115,8 @@ def test_multiple_paths_get(self, mock_get):
)
ftp_op.execute(None)
assert mock_get.call_count == 2
args0, _ = mock_get.call_args_list[0]
args1, _ = mock_get.call_args_list[1]
assert args0 == (remote_filepath[0], local_filepath[0])
assert args1 == (remote_filepath[1], local_filepath[1])
for count, (args, _) in enumerate(mock_get.call_args_list):
assert args == (remote_filepath[count], local_filepath[count])

@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.store_file")
def test_multiple_paths_put(self, mock_put):
Expand All @@ -137,10 +131,8 @@ def test_multiple_paths_put(self, mock_put):
)
ftp_op.execute(None)
assert mock_put.call_count == 2
args0, _ = mock_put.call_args_list[0]
args1, _ = mock_put.call_args_list[1]
assert args0 == (remote_filepath[0], local_filepath[0])
assert args1 == (remote_filepath[1], local_filepath[1])
for count, (args, _) in enumerate(mock_put.call_args_list):
assert args == (remote_filepath[count], local_filepath[count])

@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.store_file")
def test_arg_checking(self, mock_put):
Expand Down Expand Up @@ -184,3 +176,105 @@ def test_unequal_local_remote_file_paths(self):
local_filepath=["/tmp/test1", "/tmp/test2"],
remote_filepath="/tmp/test1",
)


class TestFTPSFileTransmitOperator:
def setup_method(self):
self.test_local_dir = "ftpstmp"
self.test_remote_dir = "/ftpshome"
self.test_remote_dir_int = "/ftpshome/interdir"
self.test_local_filename = "test_local_file"
self.test_remote_filename = "test_remote_file"
self.test_local_filepath = f"{self.test_local_dir}/{self.test_local_filename}"
self.test_remote_filepath = f"{self.test_remote_dir}/{self.test_remote_filename}"
self.test_local_filepath_int_dir = f"{self.test_local_dir}/{self.test_local_filename}"
self.test_remote_filepath_int_dir = f"{self.test_remote_dir_int}/{self.test_remote_filename}"

@mock.patch("airflow.providers.ftp.operators.ftp.FTPSHook.store_file")
@mock.patch("airflow.providers.ftp.operators.ftp.FTPSHook.create_directory")
def test_file_transfer_put(self, mock_create_dir, mock_put):
ftps_op = FTPSFileTransmitOperator(
task_id="test_ftps_put",
ftp_conn_id=DEFAULT_CONN_ID,
local_filepath=self.test_local_filepath,
remote_filepath=self.test_remote_filepath,
operation=FTPOperation.PUT,
)
ftps_op.execute(None)
assert not mock_create_dir.called
mock_put.assert_called_once_with(self.test_remote_filepath, self.test_local_filepath)

@mock.patch("airflow.providers.ftp.operators.ftp.FTPSHook.store_file")
@mock.patch("airflow.providers.ftp.operators.ftp.FTPSHook.create_directory")
def test_file_transfer_with_intermediate_dir_put(self, mock_create_dir, mock_put):
ftps_op = FTPSFileTransmitOperator(
task_id="test_ftps_put_imm_dirs",
ftp_conn_id=DEFAULT_CONN_ID,
local_filepath=self.test_local_filepath,
remote_filepath=self.test_remote_filepath_int_dir,
operation=FTPOperation.PUT,
create_intermediate_dirs=True,
)
ftps_op.execute(None)
mock_create_dir.assert_called_with(self.test_remote_dir_int)
mock_put.assert_called_once_with(self.test_remote_filepath_int_dir, self.test_local_filepath)

@mock.patch("airflow.providers.ftp.operators.ftp.FTPSHook.retrieve_file")
def test_file_transfer_get(self, mock_get):
ftps_op = FTPSFileTransmitOperator(
task_id="test_ftps_get",
ftp_conn_id=DEFAULT_CONN_ID,
local_filepath=self.test_local_filepath,
remote_filepath=self.test_remote_filepath,
operation=FTPOperation.GET,
)
ftps_op.execute(None)
mock_get.assert_called_once_with(self.test_remote_filepath, self.test_local_filepath)

@mock.patch("airflow.providers.ftp.operators.ftp.FTPHook.retrieve_file")
def test_file_transfer_with_intermediate_dir_get(self, mock_get, tmp_path):
ftp_op = FTPFileTransmitOperator(
task_id="test_ftp_get_imm_dirs",
ftp_conn_id=DEFAULT_CONN_ID,
local_filepath=str(tmp_path / self.test_local_filepath_int_dir),
remote_filepath=self.test_remote_filepath,
operation=FTPOperation.GET,
create_intermediate_dirs=True,
)
ftp_op.execute(None)
assert len(list(tmp_path.iterdir())) == 1
mock_get.assert_called_once_with(
self.test_remote_filepath, str(tmp_path / self.test_local_filepath_int_dir)
)

@mock.patch("airflow.providers.ftp.operators.ftp.FTPSHook.retrieve_file")
def test_multiple_paths_get(self, mock_get):
local_filepath = ["/tmp/ltest1", "/tmp/ltest2"]
remote_filepath = ["/tmp/rtest1", "/tmp/rtest2"]
ftps_op = FTPSFileTransmitOperator(
task_id="test_multiple_paths_get",
ftp_conn_id=DEFAULT_CONN_ID,
local_filepath=local_filepath,
remote_filepath=remote_filepath,
operation=FTPOperation.GET,
)
ftps_op.execute(None)
assert mock_get.call_count == 2
for count, (args, _) in enumerate(mock_get.call_args_list):
assert args == (remote_filepath[count], local_filepath[count])

@mock.patch("airflow.providers.ftp.operators.ftp.FTPSHook.store_file")
def test_multiple_paths_put(self, mock_put):
local_filepath = ["/tmp/ltest1", "/tmp/ltest2"]
remote_filepath = ["/tmp/rtest1", "/tmp/rtest2"]
ftps_op = FTPSFileTransmitOperator(
task_id="test_multiple_paths_put",
ftp_conn_id=DEFAULT_CONN_ID,
local_filepath=local_filepath,
remote_filepath=remote_filepath,
operation=FTPOperation.PUT,
)
ftps_op.execute(None)
assert mock_put.call_count == 2
for count, (args, _) in enumerate(mock_put.call_args_list):
assert args == (remote_filepath[count], local_filepath[count])
35 changes: 31 additions & 4 deletions tests/system/providers/ftp/example_ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,29 @@
# specific language governing permissions and limitations
# under the License.
"""
This is an example dag for using the FTPFileTransmitOperator.
This is an example dag for using the FTPFileTransmitOperator and FTPSFileTransmitOperator.
"""
from __future__ import annotations

import os
from datetime import datetime

from airflow import DAG
from airflow.providers.ftp.operators.ftp import FTPFileTransmitOperator, FTPOperation
from airflow.providers.ftp.operators.ftp import (
FTPFileTransmitOperator,
FTPOperation,
FTPSFileTransmitOperator,
)

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_ftp_put_get"
DAG_ID = "example_ftp_ftps_put_get"

with DAG(
DAG_ID,
schedule_interval="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example", "Ftp", "FtpFileTransmit"],
tags=["example", "Ftp", "FtpFileTransmit", "Ftps", "FtpsFileTransmit"],
) as dag:
# [START howto_operator_ftp_put]
ftp_put = FTPFileTransmitOperator(
Expand All @@ -57,7 +61,30 @@
)
# [END howto_operator_ftp_get]

# [START howto_operator_ftps_put]
ftps_put = FTPSFileTransmitOperator(
task_id="test_ftps_put",
ftp_conn_id="ftps_default",
local_filepath="/tmp/filepath",
remote_filepath="/remote_tmp/filepath",
operation=FTPOperation.PUT,
create_intermediate_dirs=True,
)
# [END howto_operator_ftps_put]

# [START howto_operator_ftps_get]
ftps_get = FTPSFileTransmitOperator(
task_id="test_ftps_get",
ftp_conn_id="ftps_default",
local_filepath="/tmp/filepath",
remote_filepath="/remote_tmp/filepath",
operation=FTPOperation.GET,
create_intermediate_dirs=True,
)
# [END howto_operator_ftps_get]

ftp_put >> ftp_get
ftps_put >> ftps_get

from tests.system.utils.watcher import watcher

Expand Down

0 comments on commit 0e349d8

Please sign in to comment.