Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions sdks/python/apache_beam/io/avroio.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#
"""Implements a source for reading Avro files."""

import cStringIO as StringIO
import cStringIO
import os
import zlib

Expand Down Expand Up @@ -198,8 +198,10 @@ def _decompress_bytes(data, codec):
raise ValueError('Snappy does not seem to be installed.')

# Compressed data includes a 4-byte CRC32 checksum which we verify.
result = snappy.decompress(data[:-4])
avroio.BinaryDecoder(StringIO.StringIO(data[-4:])).check_crc32(result)
# We take care to avoid extra copies of data while slicing large objects
# by use of a buffer.
result = snappy.decompress(buffer(data)[:-4])
avroio.BinaryDecoder(cStringIO.StringIO(data[-4:])).check_crc32(result)
return result
else:
raise ValueError('Unknown codec: %r', codec)
Expand All @@ -209,7 +211,7 @@ def num_records(self):

def records(self):
decoder = avroio.BinaryDecoder(
StringIO.StringIO(self._decompressed_block_bytes))
cStringIO.StringIO(self._decompressed_block_bytes))
reader = avroio.DatumReader(
writers_schema=self._schema, readers_schema=self._schema)

Expand Down
8 changes: 4 additions & 4 deletions sdks/python/apache_beam/io/filebasedsource_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#

import bz2
import cStringIO as StringIO
import cStringIO
import gzip
import logging
import math
Expand Down Expand Up @@ -451,7 +451,7 @@ def test_read_pattern_gzip(self):
chunks = [lines[splits[i-1]:splits[i]] for i in xrange(1, len(splits))]
compressed_chunks = []
for c in chunks:
out = StringIO.StringIO()
out = cStringIO.StringIO()
with gzip.GzipFile(fileobj=out, mode="w") as f:
f.write('\n'.join(c))
compressed_chunks.append(out.getvalue())
Expand Down Expand Up @@ -498,7 +498,7 @@ def test_read_auto_pattern(self):
chunks = [lines[splits[i - 1]:splits[i]] for i in xrange(1, len(splits))]
compressed_chunks = []
for c in chunks:
out = StringIO.StringIO()
out = cStringIO.StringIO()
with gzip.GzipFile(fileobj=out, mode="w") as f:
f.write('\n'.join(c))
compressed_chunks.append(out.getvalue())
Expand All @@ -518,7 +518,7 @@ def test_read_auto_pattern_compressed_and_uncompressed(self):
chunks_to_write = []
for i, c in enumerate(chunks):
if i%2 == 0:
out = StringIO.StringIO()
out = cStringIO.StringIO()
with gzip.GzipFile(fileobj=out, mode="w") as f:
f.write('\n'.join(c))
chunks_to_write.append(out.getvalue())
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ class _CompressedFile(object):
def __init__(self,
fileobj,
compression_type=CompressionTypes.GZIP,
read_size=16384):
read_size=gcsio.DEFAULT_READ_BUFFER_SIZE):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently this is 16 KB while gcsio.DEFAULT_READ_BUFFER_SIZE is 16 MB. Could you mention that change in the PR description.

Also I don't think we should refer to a constant defined in one file system (GCS) from fileio. Redefine a constant here ?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change is already mentioned in the PR description (and the individual commit that introduces it). Would you like me to rephrase it in some way?

As for the constant, there are already several parts of fileio that depend on GcsIO (including constants like gcsio.MAX_BATCH_OPERATION_SIZE). And this might make sense from a performance perspective (indeed this PR is moving towards that), so if we think we should refactor constants, it should probably be done as a separate PR that does the refactoring?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please clearly mention the increase of buffer size in PR description (16 KB to 16 MB).

Most other references to gcsio seems to be in ChannelFactory which is fine. Only other problematic reference I see is gcsio.MAX_BATCH_OPERATION_SIZE which probably should be removed at some point as well. I would like not to add more such references if possible :)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created https://issues.apache.org/jira/browse/BEAM-1222 to remove GCS specific constants from fileio.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I'll add more info there. Leaving DEFAULT_READ_BUFFER_SIZE here for now since its comments are GCS specific (and probably not worth moving into fileio), and otherwise we would need two constants to be kept in sync. I.e., it makes sense to stay as-is until the JIRA mentioned above is resolved.

if not fileobj:
raise ValueError('fileobj must be opened file but was %s' % fileobj)
self._validate_compression_type(compression_type)
Expand Down
6 changes: 3 additions & 3 deletions sdks/python/apache_beam/io/gcsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
https://github.com/GoogleCloudPlatform/appengine-gcs-client.
"""

import cStringIO as StringIO
import cStringIO
import errno
import fnmatch
import logging
Expand Down Expand Up @@ -418,7 +418,7 @@ def __init__(self,
get_request.generation = metadata.generation

# Initialize read buffer state.
self.download_stream = StringIO.StringIO()
self.download_stream = cStringIO.StringIO()
self.downloader = transfer.Download(
self.download_stream, auto_transfer=False, chunksize=buffer_size)
self.client.objects.Get(get_request, download=self.downloader)
Expand Down Expand Up @@ -558,7 +558,7 @@ def _get_segment(self, start, size):
end = start + size - 1
self.downloader.GetRange(start, end)
value = self.download_stream.getvalue()
# Clear the StringIO object after we've read its contents.
# Clear the cStringIO object after we've read its contents.
self.download_stream.truncate(0)
assert len(value) == size
return value
Expand Down