Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion sdks/python/apache_beam/io/filesystemio.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,21 @@ def finish(self):
class DownloaderStream(io.RawIOBase):
"""Provides a stream interface for Downloader objects."""

def __init__(self, downloader, mode='rb'):
def __init__(self,
downloader,
read_buffer_size=io.DEFAULT_BUFFER_SIZE,
mode='rb'):
"""Initializes the stream.

Args:
downloader: (Downloader) Filesystem dependent implementation.
read_buffer_size: (int) Buffer size to use during read operations.
mode: (string) Python mode attribute for this stream.
"""
self._downloader = downloader
self.mode = mode
self._position = 0
self._reader_buffer_size = read_buffer_size

def readinto(self, b):
"""Read up to len(b) bytes into b.
Expand Down Expand Up @@ -157,6 +162,16 @@ def seekable(self):
def readable(self):
return True

def readall(self):
"""Read until EOF, using multiple read() call."""
res = []
while True:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this function used?

It should probably be removed if it is unused.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, actually, it seems you are overriding the function documented here: https://docs.python.org/3/library/io.html#io.IOBase

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, still have a question.

Does Beam call the readall() function anywhere? I couldn't find a usage. Beam's textio, for example, invokes read(), not readall().
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py#L272

If it does, I'm not sure what will prevent us from reading a huge amount of data into memory and running into OOMs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only found this usage in ReadableFile (relatively new) where we don't specify the size:

def open(self, mime_type='text/plain'):
return filesystems.FileSystems.open(self.metadata.path)
def read(self):
return self.open().read()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. I think ReadableFile is intended for small files, but we should probably add a readall() method there as well and update read() to take a buffer (not in this PR).

cc: @pabloem

data = self.read(self._reader_buffer_size)
if not data:
break
res.append(data)
return b''.join(res)


class UploaderStream(io.RawIOBase):
"""Provides a stream interface for Uploader objects."""
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/gcsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def open(self,
if mode == 'r' or mode == 'rb':
downloader = GcsDownloader(self.client, filename,
buffer_size=read_buffer_size)
return io.BufferedReader(DownloaderStream(downloader, mode=mode),
return io.BufferedReader(DownloaderStream(downloader, read_buffer_size=read_buffer_size, mode=mode),
buffer_size=read_buffer_size)
elif mode == 'w' or mode == 'wb':
uploader = GcsUploader(self.client, filename, mime_type)
Expand Down