diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py index 09cf02427efaf..4cdab12be5e27 100644 --- a/sdks/python/apache_beam/io/textio.py +++ b/sdks/python/apache_beam/io/textio.py @@ -110,6 +110,16 @@ def read_records(self, file_name, range_tracker): while range_tracker.try_claim(next_record_start_position): record, num_bytes_to_next_record = self._read_record(file_to_read, read_buffer) + + # For compressed text files that use an unsplittable OffsetRangeTracker + # with infinity as the end position, above 'try_claim()' invocation + # would pass for an empty record at the end of file that is not + # followed by a new line character. Since such a record is at the last + # position of a file, it should not be a part of the considered range. + # We do this check to ignore such records. + if len(record) == 0 and num_bytes_to_next_record < 0: + break + yield self._coder.decode(record) if num_bytes_to_next_record < 0: break diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index 39ddec44a92c4..877e1901d9f01 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -56,11 +56,12 @@ class TextSourceTest(unittest.TestCase): DEFAULT_NUM_RECORDS = 100 def _run_read_test(self, file_or_pattern, expected_data, - buffer_size=DEFAULT_NUM_RECORDS): + buffer_size=DEFAULT_NUM_RECORDS, + compression=CompressionTypes.UNCOMPRESSED): # Since each record usually takes more than 1 byte, default buffer size is # smaller than the total size of the file. This is done to # increase test coverage for cases that hit the buffer boundary. - source = TextSource(file_or_pattern, 0, CompressionTypes.UNCOMPRESSED, + source = TextSource(file_or_pattern, 0, compression, True, coders.StrUtf8Coder(), buffer_size) range_tracker = source.get_range_tracker(None, None) read_data = [record for record in source.read(range_tracker)] @@ -128,6 +129,45 @@ def test_read_empty_single_file(self): # without an end of line character. self._run_read_test(file_name, []) + def test_read_single_file_last_line_no_eol_gzip(self): + file_name, expected_data = write_data( + TextSourceTest.DEFAULT_NUM_RECORDS, + eol=EOL.LF_WITH_NOTHING_AT_LAST_LINE) + + gzip_file_name = file_name + '.gz' + with open(file_name) as src, gzip.open(gzip_file_name, 'wb') as dst: + dst.writelines(src) + + assert len(expected_data) == TextSourceTest.DEFAULT_NUM_RECORDS + self._run_read_test(gzip_file_name, expected_data, + compression=CompressionTypes.GZIP) + + def test_read_single_file_single_line_no_eol_gzip(self): + file_name, expected_data = write_data( + 1, eol=EOL.LF_WITH_NOTHING_AT_LAST_LINE) + + gzip_file_name = file_name + '.gz' + with open(file_name) as src, gzip.open(gzip_file_name, 'wb') as dst: + dst.writelines(src) + + assert len(expected_data) == 1 + self._run_read_test(gzip_file_name, expected_data, + compression=CompressionTypes.GZIP) + + def test_read_empty_single_file_no_eol_gzip(self): + file_name, written_data = write_data( + 1, no_data=True, eol=EOL.LF_WITH_NOTHING_AT_LAST_LINE) + + gzip_file_name = file_name + '.gz' + with open(file_name) as src, gzip.open(gzip_file_name, 'wb') as dst: + dst.writelines(src) + + assert len(written_data) == 1 + # written data has a single entry with an empty string. Reading the source + # should not produce anything since we only wrote a single empty string + # without an end of line character. + self._run_read_test(gzip_file_name, [], compression=CompressionTypes.GZIP) + def test_read_single_file_with_empty_lines(self): file_name, expected_data = write_data( TextSourceTest.DEFAULT_NUM_RECORDS, no_data=True, eol=EOL.LF)