Skip to content

Commit

Permalink
Closes #1757
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Jan 11, 2017
2 parents 2d190a2 + fb80e09 commit 86d4203
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 2 deletions.
10 changes: 10 additions & 0 deletions sdks/python/apache_beam/io/textio.py
Expand Up @@ -110,6 +110,16 @@ def read_records(self, file_name, range_tracker):
while range_tracker.try_claim(next_record_start_position):
record, num_bytes_to_next_record = self._read_record(file_to_read,
read_buffer)

# For compressed text files that use an unsplittable OffsetRangeTracker
# with infinity as the end position, above 'try_claim()' invocation
# would pass for an empty record at the end of file that is not
# followed by a new line character. Since such a record is at the last
# position of a file, it should not be a part of the considered range.
# We do this check to ignore such records.
if len(record) == 0 and num_bytes_to_next_record < 0:
break

yield self._coder.decode(record)
if num_bytes_to_next_record < 0:
break
Expand Down
44 changes: 42 additions & 2 deletions sdks/python/apache_beam/io/textio_test.py
Expand Up @@ -56,11 +56,12 @@ class TextSourceTest(unittest.TestCase):
DEFAULT_NUM_RECORDS = 100

def _run_read_test(self, file_or_pattern, expected_data,
buffer_size=DEFAULT_NUM_RECORDS):
buffer_size=DEFAULT_NUM_RECORDS,
compression=CompressionTypes.UNCOMPRESSED):
# Since each record usually takes more than 1 byte, default buffer size is
# smaller than the total size of the file. This is done to
# increase test coverage for cases that hit the buffer boundary.
source = TextSource(file_or_pattern, 0, CompressionTypes.UNCOMPRESSED,
source = TextSource(file_or_pattern, 0, compression,
True, coders.StrUtf8Coder(), buffer_size)
range_tracker = source.get_range_tracker(None, None)
read_data = [record for record in source.read(range_tracker)]
Expand Down Expand Up @@ -128,6 +129,45 @@ def test_read_empty_single_file(self):
# without an end of line character.
self._run_read_test(file_name, [])

def test_read_single_file_last_line_no_eol_gzip(self):
file_name, expected_data = write_data(
TextSourceTest.DEFAULT_NUM_RECORDS,
eol=EOL.LF_WITH_NOTHING_AT_LAST_LINE)

gzip_file_name = file_name + '.gz'
with open(file_name) as src, gzip.open(gzip_file_name, 'wb') as dst:
dst.writelines(src)

assert len(expected_data) == TextSourceTest.DEFAULT_NUM_RECORDS
self._run_read_test(gzip_file_name, expected_data,
compression=CompressionTypes.GZIP)

def test_read_single_file_single_line_no_eol_gzip(self):
file_name, expected_data = write_data(
1, eol=EOL.LF_WITH_NOTHING_AT_LAST_LINE)

gzip_file_name = file_name + '.gz'
with open(file_name) as src, gzip.open(gzip_file_name, 'wb') as dst:
dst.writelines(src)

assert len(expected_data) == 1
self._run_read_test(gzip_file_name, expected_data,
compression=CompressionTypes.GZIP)

def test_read_empty_single_file_no_eol_gzip(self):
file_name, written_data = write_data(
1, no_data=True, eol=EOL.LF_WITH_NOTHING_AT_LAST_LINE)

gzip_file_name = file_name + '.gz'
with open(file_name) as src, gzip.open(gzip_file_name, 'wb') as dst:
dst.writelines(src)

assert len(written_data) == 1
# written data has a single entry with an empty string. Reading the source
# should not produce anything since we only wrote a single empty string
# without an end of line character.
self._run_read_test(gzip_file_name, [], compression=CompressionTypes.GZIP)

def test_read_single_file_with_empty_lines(self):
file_name, expected_data = write_data(
TextSourceTest.DEFAULT_NUM_RECORDS, no_data=True, eol=EOL.LF)
Expand Down

0 comments on commit 86d4203

Please sign in to comment.