[BEAM-3099] Split out BufferedReader and BufferedWriter from gcsio. #4471
chamikaramj merged 1 commit into apache:master from
Conversation
| self.child_conn = child_conn | ||
| self.conn = parent_conn | ||
| # TODO: document, rename method?, rename child_conn maybe | ||
| def start(self, child_conn): |
See comment in filesystemio.py about potentially passing a stream (PipeStream) object here instead.
I've changed Uploader's interface so that pipe usage is now an implementation detail of GcsUploader.
|
|
||
| @abc.abstractmethod | ||
| def start(self, download_stream, buffer_size): | ||
| """Initialize downloader. |
Can you detail that this needs to be called before get_range calls?
See also the comments below regarding whether download_stream should be completely managed by the Downloader as opposed to currently, where it is owned by the BufferedReader. In such a case, we may not need this start method since then the download_stream would be an internal detail we can deal with in the particular constructor.
Thanks. I've removed the start() method entirely.
| Args: | ||
| download_stream: (cStringIO.StringIO) A buffer where downloaded data is | ||
| streamed to. | ||
| buffer_size: Maximum range size for get_range calls. |
It looks like the buffer_size argument here is mostly an internal implementation detail of the specific downloader so maybe we don't need this argument? It may be helpful for us to expose the maximum chunk size allowable for a single call to get_range, which we use to upper-bound the buffer_size parameter of BufferedReader.
Added max_range_size property.
Actually, this was removed. You can pass buffer_size= to io.BufferedReader/Writer.
See examples in gcsio.py and the unit tests.
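The reply above delegates buffering to the standard library. A minimal sketch of that pattern, using a hypothetical in-memory stand-in for the DownloaderStream adapter (not the actual Beam class): any io.RawIOBase subclass that implements readable() and readinto() can be handed to io.BufferedReader, which then owns all buffering.

```python
import io

# Hypothetical stand-in for a DownloaderStream-style adapter; the real
# adapter would call into a Downloader instead of slicing a bytes object.
class BytesDownloaderStream(io.RawIOBase):
  def __init__(self, data):
    self._data = data
    self._position = 0

  def readable(self):
    return True

  def readinto(self, b):
    # Copy up to len(b) bytes into the caller-provided buffer; returning 0
    # signals EOF to io.BufferedReader.
    chunk = self._data[self._position:self._position + len(b)]
    b[:len(chunk)] = chunk
    self._position += len(chunk)
    return len(chunk)

# buffer_size sets the size of io.BufferedReader's internal buffer.
reader = io.BufferedReader(BytesDownloaderStream(b'hello world'), buffer_size=4)
data = reader.read()
print(data)  # b'hello world'
```

This is why the separate buffer_size argument on the Downloader interface could be dropped: the caller tunes buffering on the io.BufferedReader wrapper instead.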
| # TODO: Consider using cStringIO instead of buffers and data_lists when reading | ||
| # and writing. | ||
| class BufferedWriter(object): | ||
| """A class for writing files from stateless services. |
"writing files to"?
(actually, what does "stateless" here mean?)
Stateless refers to the file API: there is no long-lived file handle, the filesystem server doesn't store our current file position, etc.
| return self.position | ||
|
|
||
| def seek(self, offset, whence=os.SEEK_SET): | ||
| # The apitools.base.py.transfer.Upload class insists on seeking to the end |
Now that this is factored out, we can note in this comment that we do this for the sake of the GCS implementation.
| self.child_conn = child_conn | ||
| self.conn = parent_conn | ||
|
|
||
| self.uploader.start(child_conn) |
Can we wrap this in a PipeStream here (and modify the Uploader API accordingly)?
Uploader API has been changed so PipeStream is now an implementation detail of GcsUploader.
| return next(self) | ||
|
|
||
| def next(self): | ||
| """Read one line delimited by '\\n' from the file. |
Remove extra newline.
| return self | ||
|
|
||
| def __next__(self): | ||
| """Read one line delimited by '\\n' from the file. |
Remove extra newline.
| self.downloader.get_range(start, end) | ||
| value = self.download_stream.getvalue() | ||
| # Clear the cStringIO object after we've read its contents. | ||
| self.download_stream.truncate(0) |
It looks like the reason we have a stream at all is to satisfy the particular API of the apitools client we use for GCS. Would a cleaner API be to have the downloader directly return the bytes from get_range()? (so that the download_stream would be managed by the downloader?)
See above for how we could get rid of Downloader.start(download_stream, ...) with such an approach.
get_range now returns a string.
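A sketch of the interface shape the resolution above describes, where get_range() returns the bytes directly and any download stream is an implementation detail of the concrete class. InMemoryDownloader is a toy stand-in for the real GCS-backed downloader, used only for illustration; the abstract method name matches the review discussion, but the bodies are assumptions.

```python
import abc

class Downloader(abc.ABC):

  @abc.abstractmethod
  def get_range(self, start, end):
    """Retrieve and return the bytes in the given range."""

class InMemoryDownloader(Downloader):
  # Toy implementation: serves ranges out of an in-memory bytes object.
  def __init__(self, data):
    self._data = data

  def get_range(self, start, end):
    # Half-open [start, end), as suggested elsewhere in this review.
    return self._data[start:end]

downloader = InMemoryDownloader(b'0123456789')
print(downloader.get_range(2, 5))  # b'234'
```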
|
PTAL. Thanks
| Mimics behavior of the readline() method on standard file objects. | ||
| def get_range(self, start, end): | ||
| self.download_stream.truncate(0) | ||
| self._downloader.GetRange(start, end) |
If we make the end index non-inclusive (see comment on filesystemio.py), we need to use end - 1 here. The reason the apitools library uses inclusive indices is because the HTTP range header uses inclusive indices (https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests), but that doesn't seem like a great reason for our interface to be inclusive too.
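The conversion the comment asks for can be captured in a small helper (a sketch; the function name is hypothetical): the public interface stays half-open, and only the boundary to the HTTP/apitools layer maps to inclusive indices.

```python
def to_inclusive(start, end):
  """Map a half-open [start, end) range to the inclusive [start, end - 1]
  indices expected by HTTP Range headers (and hence apitools' GetRange)."""
  if end <= start:
    raise ValueError('Invalid range: [%d, %d)' % (start, end))
  return start, end - 1

print(to_inclusive(0, 10))  # (0, 9)
```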
| return 0 | ||
|
|
||
| start = self._position | ||
| end = min(self._position + len(b) - 1, self._downloader.size - 1) |
See comment above about possibly making this range end non-inclusive.
| return self._size | ||
|
|
||
| Mimics behavior of the readline() method on standard file objects. | ||
| def get_range(self, start, end): |
See comment in filesystemio.py about making the range half-open.
| self._get_request.generation = metadata.generation | ||
|
|
||
| # Initialize read buffer state. | ||
| self.download_stream = cStringIO.StringIO() |
Can we add an underscore / make this private too?
|
|
||
| @abc.abstractproperty | ||
| def last_error(self): | ||
| """Last error encountered for this instance.""" |
Can you describe usage of this property? When is it set and when would it be useful to query?
Do you mean expand the docstring, or as a reply?
In the docstring I mean, for future filesystem implementors.
| return self._read_inner(size=size, readline=False) | ||
| return self._client.objects.Get(get_request) | ||
|
|
||
| def readline(self, size=-1): |
There may be a subtle behavior change now that we remove this custom readline() code. The standard library code (_pyio.py) may do some \r\n -> \n translation depending on the read mode and platform, which we don't want. Can you verify that this won't be the case, even on Windows?
CC: @chamikaramj
TextIOWrapper is not used, thus no translation is performed and readline() uses b"\n" as a line terminator.
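The behavior described in the reply can be checked directly: a binary io.BufferedReader performs no newline translation (that only happens inside io.TextIOWrapper), so readline() splits solely on b'\n' and b'\r' bytes pass through untouched on any platform.

```python
import io

# Binary-mode buffered readers never translate line endings.
raw = io.BytesIO(b'first\r\nsecond\nthird')
reader = io.BufferedReader(raw)
lines = [reader.readline() for _ in range(3)]
print(lines)  # [b'first\r\n', b'second\n', b'third']
```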
| def finish(self): | ||
| self._conn.close() | ||
| # TODO(udim): Add timeout=DEFAULT_HTTP_TIMEOUT_SECONDS * 2 and check | ||
| # isAlive. |
Is the TODO content just an optimization, or is this a correctness issue?
Correctness. The way it's called, join() may block forever. It currently works, but there might be flows in which the thread never exits. Adding a timeout is important, but it might introduce a bug so I added this TODO instead.
Are you worried about the network hanging? That may already be mitigated by using timeout=DEFAULT_HTTP_TIMEOUT_SECONDS in the httplib2.Http client.
It's just incorrect to wait forever without a timeout or some mechanism to interrupt the wait.
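A sketch of what the TODO could look like once implemented (the function and error message are hypothetical, not the PR's code): join with a timeout rather than blocking forever, then use is_alive() to detect a hung worker.

```python
import threading
import time

def finish(thread, timeout):
  # Join with a timeout instead of waiting forever; the timeout value here
  # stands in for DEFAULT_HTTP_TIMEOUT_SECONDS * 2 from the TODO.
  thread.join(timeout)
  if thread.is_alive():
    # join() returning with the thread still alive means the wait timed out.
    raise IOError('Thread did not terminate within %s seconds.' % timeout)

worker = threading.Thread(target=lambda: time.sleep(0.01))
worker.start()
finish(worker, timeout=5)
print('joined cleanly')
```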
|
|
||
| @abc.abstractmethod | ||
| def get_range(self, start, end): | ||
| """Retrieve a given byte range from this download, inclusive. |
Can we make the range half-open, i.e. [start, end)? We used inclusive indices because we used the apitools library for GCS, and the reason the apitools library uses inclusive indices is because the HTTP range header uses inclusive indices (https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests), but that doesn't seem like a great reason for our interface to be inclusive too.
| return self.position | ||
|
|
||
| def seek(self, offset, whence=os.SEEK_SET): | ||
| # The gcsio.Uploader class insists on seeking to the end of a stream to |
"The apitools library used by the gcsio.Uploader class"
| return self.position | ||
|
|
||
| def seek(self, offset, whence=os.SEEK_SET): | ||
| # The apitools.base.py.transfer.Upload class insists on seeking to the end |
udim wrote:
Done.
Done.
| Args: | ||
| download_stream: (cStringIO.StringIO) A buffer where downloaded data is | ||
| streamed to. | ||
| buffer_size: Maximum range size for get_range calls. |
udim wrote:
Actually, this was removed. You can pass buffer_size= to io.BufferedReader/Writer.
See examples in gcsio.py and the unit tests.
Done.
| self.child_conn = child_conn | ||
| self.conn = parent_conn | ||
| # TODO: document, rename method?, rename child_conn maybe | ||
| def start(self, child_conn): |
udim wrote:
I've changed Uploader's interface so that pipe usage is now an implementation detail of GcsUploader.
Done.
| return self | ||
|
|
||
| def __next__(self): | ||
| """Read one line delimited by '\\n' from the file. |
udim wrote:
done
Done.
| self.downloader.get_range(start, end) | ||
| value = self.download_stream.getvalue() | ||
| # Clear the cStringIO object after we've read its contents. | ||
| self.download_stream.truncate(0) |
udim wrote:
get_range now returns a string.
Done.
| self.child_conn = child_conn | ||
| self.conn = parent_conn | ||
|
|
||
| self.uploader.start(child_conn) |
udim wrote:
Uploader API has been changed so PipeStream is now an implementation detail of GcsUploader.
Done.
| return next(self) | ||
|
|
||
| def next(self): | ||
| """Read one line delimited by '\\n' from the file. |
udim wrote:
done
Done.
| # TODO: Consider using cStringIO instead of buffers and data_lists when reading | ||
| # and writing. | ||
| class BufferedWriter(object): | ||
| """A class for writing files from stateless services. |
udim wrote:
Stateless refers to the file API: there is no long-lived file handle, the filesystem server doesn't store our current file position, etc.
Done.
|
|
||
| @abc.abstractmethod | ||
| def start(self, download_stream, buffer_size): | ||
| """Initialize downloader. |
udim wrote:
Thanks. I've removed the start() method entirely.
Done.
| self._get_request.generation = metadata.generation | ||
|
|
||
| # Initialize read buffer state. | ||
| self.download_stream = cStringIO.StringIO() |
charlesccychen wrote:
Can we add an underscore / make this private too?
Done.
|
|
||
| @abc.abstractproperty | ||
| def last_error(self): | ||
| """Last error encountered for this instance.""" |
charlesccychen wrote:
In the docstring I mean, for future filesystem implementors.
After further consideration, I think this property is safe to remove.
| return 0 | ||
|
|
||
| start = self._position | ||
| end = min(self._position + len(b) - 1, self._downloader.size - 1) |
charlesccychen wrote:
See comment above about possibly making this range end non-inclusive.
Done.
|
|
||
| @abc.abstractmethod | ||
| def get_range(self, start, end): | ||
| """Retrieve a given byte range from this download, inclusive. |
charlesccychen wrote:
Can we make the range half-open, i.e. [start, end)? We used inclusive indices because we used the apitools library for GCS, and the reason the apitools library uses inclusive indices is because the HTTP range header uses inclusive indices (https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests), but that doesn't seem like a great reason for our interface to be inclusive too.
Done.
| Mimics behavior of the readline() method on standard file objects. | ||
| def get_range(self, start, end): | ||
| self.download_stream.truncate(0) | ||
| self._downloader.GetRange(start, end) |
charlesccychen wrote:
If we make the end index non-inclusive (see comment on filesystemio.py), we need to use end - 1 here. The reason the apitools library uses inclusive indices is because the HTTP range header uses inclusive indices (https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests), but that doesn't seem like a great reason for our interface to be inclusive too.
Done.
| return self._size | ||
|
|
||
| Mimics behavior of the readline() method on standard file objects. | ||
| def get_range(self, start, end): |
charlesccychen wrote:
See comment in filesystemio.py about making the range half-open.
Done.
| return self.position | ||
|
|
||
| def seek(self, offset, whence=os.SEEK_SET): | ||
| # The gcsio.Uploader class insists on seeking to the end of a stream to |
charlesccychen wrote:
"The apitools library used by the gcsio.Uploader class"
Done.
charlesccychen
left a comment
Thanks! This LGTM.
| return self._read_inner(size=size, readline=False) | ||
| return self._client.objects.Get(get_request) | ||
|
|
||
| def readline(self, size=-1): |
charlesccychen wrote:
Thanks!
Done.
| def finish(self): | ||
| self._conn.close() | ||
| # TODO(udim): Add timeout=DEFAULT_HTTP_TIMEOUT_SECONDS * 2 and check | ||
| # isAlive. |
udim wrote:
It's just incorrect to wait forever without a timeout or some mechanism to interrupt the wait.
Acknowledged.
|
run python postcommit
New module filesystemio introduces Uploader and Downloader interfaces, plus respective UploaderStream and DownloaderStream adapters that may be wrapped by io.BufferedWriter and io.BufferedReader.
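The writer side of this design can be sketched the same way as the reader side, with a hypothetical in-memory stand-in for the UploaderStream adapter (the real adapter forwards to an Uploader): a writable io.RawIOBase whose small writes io.BufferedWriter coalesces before they reach the underlying service.

```python
import io

# Hypothetical stand-in for an UploaderStream-style adapter: collects the
# chunks that actually reach the "uploader" in a list.
class ListUploaderStream(io.RawIOBase):
  def __init__(self):
    self.chunks = []

  def writable(self):
    return True

  def write(self, b):
    self.chunks.append(bytes(b))
    return len(b)

stream = ListUploaderStream()
writer = io.BufferedWriter(stream, buffer_size=8)
writer.write(b'abc')
writer.write(b'def')  # Both writes fit in the 8-byte buffer: nothing forwarded yet.
writer.flush()        # One coalesced chunk reaches the stream.
print(stream.chunks)  # [b'abcdef']
```

Closing the writer would also flush any remaining buffered bytes, which is why buffering no longer needs to live in a custom BufferedWriter class.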
|
Rebased commits into one.
Most of the code in filesystemio.py is copied verbatim from gcsio.py.
The Downloader and Uploader classes are new.