From 57f5f062561685f26d2fa2c18e072b3c663b4e04 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 18 Apr 2017 16:23:51 -0700 Subject: [PATCH 1/3] Remove vestigial Read and Write from core.py --- sdks/python/apache_beam/pipeline_test.py | 2 +- sdks/python/apache_beam/transforms/core.py | 13 ++----------- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 631460990008..05503bd5f634 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -25,6 +25,7 @@ # from nose.plugins.attrib import attr import apache_beam as beam +from apache_beam.io import Read from apache_beam.metrics import Metrics from apache_beam.pipeline import Pipeline from apache_beam.pipeline import PipelineOptions @@ -39,7 +40,6 @@ from apache_beam.transforms import DoFn from apache_beam.transforms import ParDo from apache_beam.transforms import PTransform -from apache_beam.transforms import Read from apache_beam.transforms import WindowInto from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index bdfddbb89d85..c7e7a14cb32c 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1359,13 +1359,14 @@ def infer_output_type(self, unused_input_type): return Union[[trivial_inference.instance_to_type(v) for v in self.value]] def expand(self, pbegin): + from apache_beam.io import iobase assert isinstance(pbegin, pvalue.PBegin) self.pipeline = pbegin.pipeline ouput_type = (self.get_type_hints().simple_output_type(self.label) or self.infer_output_type(None)) coder = typecoders.registry.get_coder(ouput_type) source = self._create_source_from_iterable(self.value, coder) - return pbegin.pipeline | Read(source).with_output_types(ouput_type) + return pbegin.pipeline | iobase.Read(source).with_output_types(ouput_type) def get_windowing(self, unused_inputs): return Windowing(GlobalWindows()) @@ -1453,13 +1454,3 @@ def estimate_size(self): return self._total_size return _CreateSource(serialized_values, coder) - - -def Read(*args, **kwargs): - from apache_beam import io - return io.Read(*args, **kwargs) - - -def Write(*args, **kwargs): - from apache_beam import io - return io.Write(*args, **kwargs) From da64c3c9f24bca9d9f554cc45e3cec7cb5f7bce8 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 19 Apr 2017 22:38:48 -0700 Subject: [PATCH 2/3] fixup: more occurances --- .../examples/cookbook/bigquery_side_input.py | 2 +- .../apache_beam/examples/cookbook/filters.py | 2 +- .../apache_beam/io/filebasedsource_test.py | 18 +++++++++--------- sdks/python/apache_beam/io/sources_test.py | 2 +- sdks/python/apache_beam/io/tfrecordio_test.py | 8 ++++---- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py index 486cc88f1867..f68c95d03f4a 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py @@ -101,7 +101,7 @@ def run(argv=None): pcoll_corpus = p | 'read corpus' >> beam.io.Read( beam.io.BigQuerySource(query=query_corpus)) - pcoll_word = p | 'read_words' >> beam.Read( + pcoll_word = p | 'read_words' >> beam.io.Read( beam.io.BigQuerySource(query=query_word)) pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create( [ignore_corpus]) diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py index d13d823972bf..374001cd83fe 100644 --- a/sdks/python/apache_beam/examples/cookbook/filters.py +++ b/sdks/python/apache_beam/examples/cookbook/filters.py @@ -88,7 +88,7 @@ def run(argv=None): p = beam.Pipeline(argv=pipeline_args) - input_data = p | beam.Read(beam.io.BigQuerySource(known_args.input)) + input_data = p | beam.io.Read(beam.io.BigQuerySource(known_args.input)) # pylint: disable=expression-not-assigned (filter_cold_days(input_data, known_args.month_filter) diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py index e681f262eecb..5318c4d9c6e6 100644 --- a/sdks/python/apache_beam/io/filebasedsource_test.py +++ b/sdks/python/apache_beam/io/filebasedsource_test.py @@ -389,7 +389,7 @@ def test_read_splits_file_pattern(self): def _run_source_test(self, pattern, expected_data, splittable=True): pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( pattern, splittable=splittable)) assert_that(pcoll, equal_to(expected_data)) pipeline.run() @@ -429,7 +429,7 @@ def test_read_file_bzip2(self): f.write('\n'.join(lines)) pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( filename, splittable=False, compression_type=CompressionTypes.BZIP2)) @@ -444,7 +444,7 @@ def test_read_file_gzip(self): f.write('\n'.join(lines)) pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( filename, splittable=False, compression_type=CompressionTypes.GZIP)) @@ -462,7 +462,7 @@ def test_read_pattern_bzip2(self): compressobj.compress('\n'.join(c)) + compressobj.flush()) file_pattern = write_prepared_pattern(compressed_chunks) pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( file_pattern, splittable=False, compression_type=CompressionTypes.BZIP2)) @@ -481,7 +481,7 @@ def test_read_pattern_gzip(self): compressed_chunks.append(out.getvalue()) file_pattern = write_prepared_pattern(compressed_chunks) pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( file_pattern, splittable=False, compression_type=CompressionTypes.GZIP)) @@ -496,7 +496,7 @@ def test_read_auto_single_file_bzip2(self): f.write('\n'.join(lines)) pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( filename, compression_type=CompressionTypes.AUTO)) assert_that(pcoll, equal_to(lines)) @@ -510,7 +510,7 @@ def test_read_auto_single_file_gzip(self): f.write('\n'.join(lines)) pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( filename, compression_type=CompressionTypes.AUTO)) assert_that(pcoll, equal_to(lines)) @@ -529,7 +529,7 @@ def test_read_auto_pattern(self): file_pattern = write_prepared_pattern( compressed_chunks, suffixes=['.gz']*len(chunks)) pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( file_pattern, compression_type=CompressionTypes.AUTO)) assert_that(pcoll, equal_to(lines)) @@ -551,7 +551,7 @@ def test_read_auto_pattern_compressed_and_uncompressed(self): file_pattern = write_prepared_pattern(chunks_to_write, suffixes=(['.gz', '']*3)) pipeline = TestPipeline() - pcoll = pipeline | 'Read' >> beam.Read(LineSource( + pcoll = pipeline | 'Read' >> beam.io.Read(LineSource( file_pattern, compression_type=CompressionTypes.AUTO)) assert_that(pcoll, equal_to(lines)) diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py index dc0fd54e9fbb..3f92756979b3 100644 --- a/sdks/python/apache_beam/io/sources_test.py +++ b/sdks/python/apache_beam/io/sources_test.py @@ -100,7 +100,7 @@ def test_read_from_source(self): def test_run_direct(self): file_name = self._create_temp_file('aaaa\nbbbb\ncccc\ndddd') pipeline = TestPipeline() - pcoll = pipeline | beam.Read(LineSource(file_name)) + pcoll = pipeline | beam.io.Read(LineSource(file_name)) assert_that(pcoll, equal_to(['aaaa', 'bbbb', 'cccc', 'dddd'])) pipeline.run() diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py index 49f96393ec3c..d8c706ea9888 100644 --- a/sdks/python/apache_beam/io/tfrecordio_test.py +++ b/sdks/python/apache_beam/io/tfrecordio_test.py @@ -248,7 +248,7 @@ def test_process_single(self): self._write_file(path, FOO_RECORD_BASE64) with TestPipeline() as p: result = (p - | beam.Read( + | beam.io.Read( _TFRecordSource( path, coder=coders.BytesCoder(), @@ -261,7 +261,7 @@ def test_process_multiple(self): self._write_file(path, FOO_BAR_RECORD_BASE64) with TestPipeline() as p: result = (p - | beam.Read( + | beam.io.Read( _TFRecordSource( path, coder=coders.BytesCoder(), @@ -274,7 +274,7 @@ def test_process_gzip(self): self._write_file_gzip(path, FOO_BAR_RECORD_BASE64) with TestPipeline() as p: result = (p - | beam.Read( + | beam.io.Read( _TFRecordSource( path, coder=coders.BytesCoder(), @@ -287,7 +287,7 @@ def test_process_auto(self): self._write_file_gzip(path, FOO_BAR_RECORD_BASE64) with TestPipeline() as p: result = (p - | beam.Read( + | beam.io.Read( _TFRecordSource( path, coder=coders.BytesCoder(), From 138af49ffca28d86b57d7fd235745cbef8014757 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 19 Apr 2017 22:43:13 -0700 Subject: [PATCH 3/3] fixup: and another --- sdks/python/apache_beam/io/concat_source_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py index 77d264747973..7c16e6335e51 100644 --- a/sdks/python/apache_beam/io/concat_source_test.py +++ b/sdks/python/apache_beam/io/concat_source_test.py @@ -214,7 +214,7 @@ def test_run_concat_direct(self): RangeSource(100, 1000), ]) pipeline = TestPipeline() - pcoll = pipeline | beam.Read(source) + pcoll = pipeline | beam.io.Read(source) assert_that(pcoll, equal_to(range(1000))) pipeline.run()