Skip to content

Commit

Permalink
Add blocksize arg for ftp hook (#24860)
Browse files Browse the repository at this point in the history
Co-authored-by: Kevin George <kevingeorge232@gmail.com>
  • Loading branch information
kevgeo and Kevin George committed Jul 7, 2022
1 parent 2f29bfe commit 64412ee
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
29 changes: 18 additions & 11 deletions airflow/providers/ftp/hooks/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import datetime
import ftplib
import os.path
from typing import Any, List, Optional, Tuple
from typing import Any, Callable, List, Optional, Tuple

from airflow.hooks.base import BaseHook

Expand Down Expand Up @@ -115,7 +115,13 @@ def delete_directory(self, path: str) -> None:
conn = self.get_conn()
conn.rmd(path)

def retrieve_file(self, remote_full_path, local_full_path_or_buffer, callback=None):
def retrieve_file(
self,
remote_full_path: str,
local_full_path_or_buffer: Any,
callback: Optional[Callable] = None,
block_size: int = 8192,
) -> None:
"""
Transfers the remote file to a local location.
Expand All @@ -132,6 +138,8 @@ def retrieve_file(self, remote_full_path, local_full_path_or_buffer, callback=No
that writing to a file or buffer will need to be handled inside the
callback.
[default: output_handle.write()]
:param block_size: maximum size, in bytes, of each chunk read from the
server during the transfer (default: 8192)
.. code-block:: python
Expand Down Expand Up @@ -164,31 +172,30 @@ def write_to_file_with_progress(data):
"""
conn = self.get_conn()

is_path = isinstance(local_full_path_or_buffer, str)

# without a callback, default to writing to a user-provided file or
# file-like buffer
if not callback:
if is_path:

output_handle = open(local_full_path_or_buffer, 'wb')
else:
output_handle = local_full_path_or_buffer

callback = output_handle.write
else:
output_handle = None

remote_path, remote_file_name = os.path.split(remote_full_path)
conn.cwd(remote_path)
self.log.info('Retrieving file from FTP: %s', remote_full_path)
conn.retrbinary(f'RETR {remote_file_name}', callback)
conn.retrbinary(f'RETR {remote_file_name}', callback, block_size)
self.log.info('Finished retrieving file from FTP: %s', remote_full_path)

if is_path and output_handle:
output_handle.close()

def store_file(self, remote_full_path: str, local_full_path_or_buffer: Any) -> None:
def store_file(
self, remote_full_path: str, local_full_path_or_buffer: Any, block_size: int = 8192
) -> None:
"""
Transfers a local file to the remote location.
Expand All @@ -199,19 +206,19 @@ def store_file(self, remote_full_path: str, local_full_path_or_buffer: Any) -> N
:param remote_full_path: full path to the remote file
:param local_full_path_or_buffer: full path to the local file or a
file-like buffer
:param block_size: size, in bytes, of each chunk read from the local
file or buffer during the transfer (default: 8192)
"""
conn = self.get_conn()

is_path = isinstance(local_full_path_or_buffer, str)

if is_path:

input_handle = open(local_full_path_or_buffer, 'rb')
else:
input_handle = local_full_path_or_buffer
remote_path, remote_file_name = os.path.split(remote_full_path)
conn.cwd(remote_path)
conn.storbinary(f'STOR {remote_file_name}', input_handle)
conn.storbinary(f'STOR {remote_file_name}', input_handle, block_size)

if is_path:
input_handle.close()
Expand Down
4 changes: 2 additions & 2 deletions tests/providers/ftp/hooks/test_ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ def test_retrieve_file(self):
_buffer = io.StringIO('buffer')
with fh.FTPHook() as ftp_hook:
ftp_hook.retrieve_file(self.path, _buffer)
self.conn_mock.retrbinary.assert_called_once_with('RETR path', _buffer.write)
self.conn_mock.retrbinary.assert_called_once_with('RETR path', _buffer.write, 8192)

def test_retrieve_file_with_callback(self):
func = mock.Mock()
_buffer = io.StringIO('buffer')
with fh.FTPHook() as ftp_hook:
ftp_hook.retrieve_file(self.path, _buffer, callback=func)
self.conn_mock.retrbinary.assert_called_once_with('RETR path', func)
self.conn_mock.retrbinary.assert_called_once_with('RETR path', func, 8192)

def test_connection_success(self):
with fh.FTPHook() as ftp_hook:
Expand Down

0 comments on commit 64412ee

Please sign in to comment.