diff --git a/sdks/python/apache_beam/io/filesystemio.py b/sdks/python/apache_beam/io/filesystemio.py index dca341ddea5e..8d21e99fe555 100644 --- a/sdks/python/apache_beam/io/filesystemio.py +++ b/sdks/python/apache_beam/io/filesystemio.py @@ -80,16 +80,21 @@ def finish(self): class DownloaderStream(io.RawIOBase): """Provides a stream interface for Downloader objects.""" - def __init__(self, downloader, mode='rb'): + def __init__(self, + downloader, + read_buffer_size=io.DEFAULT_BUFFER_SIZE, + mode='rb'): """Initializes the stream. Args: downloader: (Downloader) Filesystem dependent implementation. + read_buffer_size: (int) Buffer size to use during read operations. mode: (string) Python mode attribute for this stream. """ self._downloader = downloader self.mode = mode self._position = 0 + self._reader_buffer_size = read_buffer_size def readinto(self, b): """Read up to len(b) bytes into b. @@ -157,6 +162,16 @@ def seekable(self): def readable(self): return True + def readall(self): + """Read until EOF, using multiple read() call.""" + res = [] + while True: + data = self.read(self._reader_buffer_size) + if not data: + break + res.append(data) + return b''.join(res) + class UploaderStream(io.RawIOBase): """Provides a stream interface for Uploader objects.""" diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 33a94fc81932..5586eb48fe8e 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -174,7 +174,7 @@ def open(self, if mode == 'r' or mode == 'rb': downloader = GcsDownloader(self.client, filename, buffer_size=read_buffer_size) - return io.BufferedReader(DownloaderStream(downloader, mode=mode), + return io.BufferedReader(DownloaderStream(downloader, read_buffer_size=read_buffer_size, mode=mode), buffer_size=read_buffer_size) elif mode == 'w' or mode == 'wb': uploader = GcsUploader(self.client, filename, mime_type)