From 520d2c9f9c5c6a5771879b9b70a226242ef8cfff Mon Sep 17 00:00:00 2001 From: Ahmet Altay Date: Mon, 27 Jun 2016 17:59:35 -0700 Subject: [PATCH 1/3] Enables more linting rules. - Fixes some import-related warnings and enables the related pylint rules. - Uses pep8 for blank-line-related style checks. pylint does not check the blank-line rules of PEP 8, and pep8 is not configurable for indentation, so we need both tools. - Fixes existing lint errors related to blank lines. I excluded 3 files for the below usage: T = TypeVariable('T') @with_input_types(T) @with_output_types(List[T]) class TopCombineFn(core.CombineFn): Here pep8 style requires 2 blank lines between the first and second lines, even though they are logically connected. And there is no option to suppress this error locally. --- sdks/python/.pylintrc | 3 - sdks/python/apache_beam/__init__.py | 3 +- sdks/python/apache_beam/coders/coder_impl.py | 4 +- sdks/python/apache_beam/coders/coders.py | 4 +- .../apache_beam/coders/fast_coders_test.py | 2 +- .../apache_beam/coders/slow_coders_test.py | 2 +- sdks/python/apache_beam/coders/stream_test.py | 4 +- .../complete/juliaset/juliaset/juliaset.py | 9 ++- .../examples/cookbook/bigquery_schema.py | 2 +- .../apache_beam/examples/snippets/snippets.py | 19 +++--- .../examples/snippets/snippets_test.py | 58 ++++++++++++------- sdks/python/apache_beam/internal/apiclient.py | 5 +- .../apache_beam/internal/json_value_test.py | 1 + sdks/python/apache_beam/internal/pickler.py | 2 +- sdks/python/apache_beam/internal/util.py | 1 + sdks/python/apache_beam/io/avroio.py | 3 +- sdks/python/apache_beam/io/bigquery.py | 8 +-- sdks/python/apache_beam/io/bigquery_test.py | 6 +- sdks/python/apache_beam/io/fileio.py | 16 ++--- sdks/python/apache_beam/io/gcsio.py | 7 ++- sdks/python/apache_beam/io/gcsio_test.py | 5 +- sdks/python/apache_beam/pvalue.py | 8 +-- sdks/python/apache_beam/runners/common.py | 6 ++ .../apache_beam/runners/dataflow_runner.py | 5 +- .../apache_beam/runners/direct_runner.py | 2 +- 
sdks/python/apache_beam/runners/runner.py | 6 +- .../apache_beam/transforms/aggregator.py | 2 +- .../apache_beam/transforms/combiners_test.py | 4 ++ sdks/python/apache_beam/transforms/core.py | 10 ++-- .../apache_beam/transforms/cy_combiners.py | 53 +++++++++++++++++ .../apache_beam/transforms/ptransform.py | 13 +++-- .../apache_beam/transforms/ptransform_test.py | 10 ++++ .../apache_beam/transforms/sideinputs.py | 2 +- sdks/python/apache_beam/transforms/trigger.py | 1 + .../apache_beam/transforms/trigger_test.py | 5 +- sdks/python/apache_beam/transforms/util.py | 1 + .../apache_beam/transforms/window_test.py | 1 + .../apache_beam/typehints/typehints_test.py | 3 + .../apache_beam/utils/dependency_test.py | 1 + sdks/python/apache_beam/utils/retry_test.py | 4 +- sdks/python/run_pylint.sh | 2 + sdks/python/tox.ini | 13 ++++- 42 files changed, 216 insertions(+), 100 deletions(-) diff --git a/sdks/python/.pylintrc b/sdks/python/.pylintrc index edf13ee7dd14..efe81473ff75 100644 --- a/sdks/python/.pylintrc +++ b/sdks/python/.pylintrc @@ -83,7 +83,6 @@ disable = arguments-differ, attribute-defined-outside-init, bad-builtin, - bad-option-value, bad-super-call, broad-except, consider-using-enumerate, @@ -131,8 +130,6 @@ disable = unused-wildcard-import, used-before-assignment, wildcard-import, - wrong-import-order, - wrong-import-position, [REPORTS] diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py index c9bc736cb171..eed251b83e4e 100644 --- a/sdks/python/apache_beam/__init__.py +++ b/sdks/python/apache_beam/__init__.py @@ -71,7 +71,7 @@ 'Dataflow SDK for Python is supported only on Python 2.7. ' 'It is not supported on Python [%s].' 
% sys.version) - +# pylint: disable=wrong-import-position import apache_beam.internal.pickler from apache_beam import coders @@ -79,3 +79,4 @@ from apache_beam import typehints from apache_beam.pipeline import Pipeline from apache_beam.transforms import * +# pylint: enable=wrong-import-position diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 07b67116a4cc..19b78b45c5ac 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -29,7 +29,7 @@ from cPickle import loads, dumps -# pylint: disable=g-import-not-at-top +# pylint: disable=wrong-import-order try: # Don't depend on the full dataflow sdk to test coders. from apache_beam.transforms.window import WindowedValue @@ -43,7 +43,7 @@ except ImportError: from slow_stream import InputStream as create_InputStream from slow_stream import OutputStream as create_OutputStream -# pylint: enable=g-import-not-at-top +# pylint: enable=wrong-import-order class CoderImpl(object): diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 6d5b10ac2442..7c3f757a56c6 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -24,7 +24,7 @@ from apache_beam.coders import coder_impl -# pylint: disable=g-import-not-at-top +# pylint: disable=wrong-import-order # Avoid dependencies on the full SDK. 
try: # Import dill from the pickler module to make sure our monkey-patching of dill @@ -46,7 +46,7 @@ def serialize_coder(coder): def deserialize_coder(serialized): from apache_beam.internal import pickler return pickler.loads(serialized.split('$', 1)[1]) -# pylint: enable=g-import-not-at-top +# pylint: enable=wrong-import-order class Coder(object): diff --git a/sdks/python/apache_beam/coders/fast_coders_test.py b/sdks/python/apache_beam/coders/fast_coders_test.py index 466fd572452d..907809c94e6c 100644 --- a/sdks/python/apache_beam/coders/fast_coders_test.py +++ b/sdks/python/apache_beam/coders/fast_coders_test.py @@ -28,7 +28,7 @@ class FastCoders(unittest.TestCase): def test_using_fast_impl(self): - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order # pylint: disable=unused-variable import apache_beam.coders.stream diff --git a/sdks/python/apache_beam/coders/slow_coders_test.py b/sdks/python/apache_beam/coders/slow_coders_test.py index 26e6fa7eb070..f85d47ba9517 100644 --- a/sdks/python/apache_beam/coders/slow_coders_test.py +++ b/sdks/python/apache_beam/coders/slow_coders_test.py @@ -30,7 +30,7 @@ class SlowCoders(unittest.TestCase): def test_using_slow_impl(self): # Assert that we are not using the compiled implementation. 
with self.assertRaises(ImportError): - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order # pylint: disable=unused-variable import apache_beam.coders.stream diff --git a/sdks/python/apache_beam/coders/stream_test.py b/sdks/python/apache_beam/coders/stream_test.py index f2163ca54405..cfd627fc635a 100644 --- a/sdks/python/apache_beam/coders/stream_test.py +++ b/sdks/python/apache_beam/coders/stream_test.py @@ -139,7 +139,7 @@ def test_byte_counting(self): try: - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-position from apache_beam.coders import stream class FastStreamTest(StreamTest): @@ -148,14 +148,12 @@ class FastStreamTest(StreamTest): OutputStream = stream.OutputStream ByteCountingOutputStream = stream.ByteCountingOutputStream - class SlowFastStreamTest(StreamTest): """Runs the test with compiled and uncompiled stream classes.""" InputStream = stream.InputStream OutputStream = slow_stream.OutputStream ByteCountingOutputStream = slow_stream.ByteCountingOutputStream - class FastSlowStreamTest(StreamTest): """Runs the test with uncompiled and compiled stream classes.""" InputStream = slow_stream.InputStream diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py index 16254181d612..ad3ea7f178c5 100644 --- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py +++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py @@ -51,14 +51,15 @@ def point_set(n): julia_set_colors = (pipeline | beam.Create('add points', point_set(n)) - | beam.Map(get_julia_set_point_color, c, n, max_iterations)) + | beam.Map( + get_julia_set_point_color, c, n, max_iterations)) return julia_set_colors def generate_julia_set_visualization(data, n, max_iterations): """Generate the pixel matrix for rendering the julia set as an image.""" - import numpy as np # pylint: disable=g-import-not-at-top + import 
numpy as np # pylint: disable=wrong-import-order colors = [] for r in range(0, 256, 16): for g in range(0, 256, 16): @@ -74,7 +75,7 @@ def generate_julia_set_visualization(data, n, max_iterations): def save_julia_set_visualization(out_file, image_array): """Save the fractal image of our julia set as a png.""" - from matplotlib import pyplot as plt # pylint: disable=g-import-not-at-top + from matplotlib import pyplot as plt # pylint: disable=wrong-import-order plt.imsave(out_file, image_array, format='png') @@ -104,13 +105,11 @@ def run(argv=None): # pylint: disable=missing-docstring # Group each coordinate triplet by its x value, then write the coordinates to # the output file with an x-coordinate grouping per line. # pylint: disable=expression-not-assigned - # pylint: disable=g-long-lambda (coordinates | beam.Map('x coord key', lambda (x, y, i): (x, (x, y, i))) | beam.GroupByKey('x coord') | beam.Map( 'format', lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords)) | beam.io.Write('write', beam.io.TextFileSink(known_args.coordinate_output))) - # pylint: enable=g-long-lambda # pylint: enable=expression-not-assigned p.run() diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py index 99a967c0bd0b..0cb5d66639bf 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py @@ -44,7 +44,7 @@ def run(argv=None): p = beam.Pipeline(argv=pipeline_args) - from apache_beam.internal.clients import bigquery # pylint: disable=g-import-not-at-top + from apache_beam.internal.clients import bigquery # pylint: disable=wrong-import-order table_schema = bigquery.TableSchema() diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index e7af26a30529..e7b556c84358 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ 
b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -40,8 +40,7 @@ # pylint:disable=expression-not-assigned # pylint:disable=redefined-outer-name # pylint:disable=unused-variable -# pylint:disable=g-doc-args -# pylint:disable=g-import-not-at-top +# pylint:disable=wrong-import-order class SnippetUtils(object): @@ -95,7 +94,7 @@ def filter_words(unused_x): # [START pipelines_constructing_reading] lines = p | beam.io.Read('ReadMyFile', - beam.io.TextFileSource('gs://some/inputData.txt')) + beam.io.TextFileSource('gs://some/inputData.txt')) # [END pipelines_constructing_reading] # [START pipelines_constructing_applying] @@ -106,7 +105,8 @@ def filter_words(unused_x): # [START pipelines_constructing_writing] filtered_words = reversed_words | beam.Filter('FilterWords', filter_words) filtered_words | beam.io.Write('WriteMyFile', - beam.io.TextFileSink('gs://some/outputData.txt')) + beam.io.TextFileSink( + 'gs://some/outputData.txt')) # [END pipelines_constructing_writing] p.visit(SnippetUtils.RenameFiles(renames)) @@ -304,7 +304,8 @@ def pipeline_options_command_line(argv): # Create the Pipeline with remaining arguments. 
p = beam.Pipeline(argv=pipeline_args) - lines = p | beam.io.Read('ReadFromText', beam.io.TextFileSource(known_args.input)) + lines = p | beam.io.Read('ReadFromText', + beam.io.TextFileSource(known_args.input)) lines | beam.io.Write('WriteToText', beam.io.TextFileSink(known_args.output)) # [END pipeline_options_command_line] @@ -594,7 +595,8 @@ def process(self, context): | beam.ParDo('FilterText', FilterTextFn('Flourish|stomach'))) # [START example_wordcount_debugging_assert] - beam.assert_that(filtered_words, beam.equal_to([('Flourish', 3), ('stomach', 1)])) + beam.assert_that( + filtered_words, beam.equal_to([('Flourish', 3), ('stomach', 1)])) # [END example_wordcount_debugging_assert] output = (filtered_words @@ -634,7 +636,7 @@ def filter_words(x): # [START model_pipelineio_write] filtered_words | beam.io.Write( 'WriteToText', beam.io.TextFileSink('gs://my_bucket/path/to/numbers', - file_name_suffix='.csv')) + file_name_suffix='.csv')) # [END model_pipelineio_write] # [END model_textio_write] @@ -762,6 +764,7 @@ def model_multiple_pcollections_partition(contents, output_path): URL: https://cloud.google.com/dataflow/model/multiple-pcollections """ some_hash_fn = lambda s: ord(s[0]) + def get_percentile(i): """Assume i in [0,100).""" return i @@ -770,6 +773,7 @@ def get_percentile(i): p = beam.Pipeline(options=PipelineOptions()) students = p | beam.Create(contents) + # [START model_multiple_pcollections_partition] def partition_fn(student, num_partitions): return int(get_percentile(student) * num_partitions / 100) @@ -872,4 +876,3 @@ def apply(self, pcoll): | beam.Map('Init', lambda v: (v, 1)) | beam.CombinePerKey(sum)) # [END model_library_transforms_count] -# pylint: enable=g-wrong-blank-lines diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index f2f9552b57a2..87ce266a5244 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ 
b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -43,6 +43,7 @@ def test_pardo(self): # the text of the doc. words = ['aa', 'bbb', 'c'] + # [START model_pardo_pardo] class ComputeWordLengthFn(beam.DoFn): def process(self, context): @@ -57,6 +58,7 @@ def process(self, context): def test_pardo_yield(self): words = ['aa', 'bbb', 'c'] + # [START model_pardo_yield] class ComputeWordLengthFn(beam.DoFn): def process(self, context): @@ -84,11 +86,12 @@ def test_pardo_using_flatmap(self): def test_pardo_using_flatmap_yield(self): words = ['aA', 'bbb', 'C'] + # [START model_pardo_using_flatmap_yield] def capitals(word): for letter in word: if 'A' <= letter <= 'Z': - yield letter + yield letter all_capitals = words | beam.FlatMap(capitals) # [END model_pardo_using_flatmap_yield] @@ -113,27 +116,31 @@ def filter_using_length(word, lower_bound, upper_bound=float('inf')): yield word # Construct a deferred side input. - avg_word_len = words | beam.Map(len) | beam.CombineGlobally(beam.combiners.MeanCombineFn()) + avg_word_len = (words + | beam.Map(len) + | beam.CombineGlobally(beam.combiners.MeanCombineFn())) # Call with explicit side inputs. small_words = words | beam.FlatMap('small', filter_using_length, 0, 3) # A single deferred side input. - larger_than_average = words | beam.FlatMap('large', - filter_using_length, - lower_bound=pvalue.AsSingleton(avg_word_len)) + larger_than_average = (words + | beam.FlatMap('large', filter_using_length, + lower_bound=pvalue.AsSingleton( + avg_word_len))) # Mix and match. 
small_but_nontrivial = words | beam.FlatMap(filter_using_length, - lower_bound=2, - upper_bound=pvalue.AsSingleton(avg_word_len)) + lower_bound=2, + upper_bound=pvalue.AsSingleton( + avg_word_len)) # [END model_pardo_side_input] beam.assert_that(small_words, beam.equal_to(['a', 'bb', 'ccc'])) beam.assert_that(larger_than_average, beam.equal_to(['ccc', 'dddd']), - label='larger_than_average') + label='larger_than_average') beam.assert_that(small_but_nontrivial, beam.equal_to(['bb']), - label='small_but_not_trivial') + label='small_but_not_trivial') p.run() def test_pardo_side_input_dofn(self): @@ -170,9 +177,8 @@ def process(self, context, cutoff_length, marker): # [START model_pardo_with_side_outputs] results = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x') - .with_outputs('above_cutoff_lengths', - 'marked strings', - main='below_cutoff_strings')) + .with_outputs('above_cutoff_lengths', 'marked strings', + main='below_cutoff_strings')) below = results.below_cutoff_strings above = results.above_cutoff_lengths marked = results['marked strings'] # indexing works as well @@ -183,10 +189,12 @@ def process(self, context, cutoff_length, marker): self.assertEqual({'xyz'}, set(marked)) # [START model_pardo_with_side_outputs_iter] - below, above, marked = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x') - .with_outputs('above_cutoff_lengths', - 'marked strings', - main='below_cutoff_strings')) + below, above, marked = (words + | beam.ParDo( + ProcessWords(), cutoff_length=2, marker='x') + .with_outputs('above_cutoff_lengths', + 'marked strings', + main='below_cutoff_strings')) # [END model_pardo_with_side_outputs_iter] self.assertEqual({'a', 'an'}, set(below)) @@ -195,6 +203,7 @@ def process(self, context, cutoff_length, marker): def test_pardo_with_undeclared_side_outputs(self): numbers = [1, 2, 3, 4, 5, 10, 20] + # [START model_pardo_with_side_outputs_undeclared] def even_odd(x): yield pvalue.SideOutputValue('odd' if x % 2 else 'even', x) 
@@ -258,6 +267,7 @@ def process(self, context): # Helps document the contract and checks it at pipeline construction time. # [START type_hints_transform] T = beam.typehints.TypeVariable('T') + @beam.typehints.with_input_types(T) @beam.typehints.with_output_types(beam.typehints.Tuple[int, T]) class MyTransform(beam.PTransform): @@ -316,7 +326,8 @@ def parse_player_and_score(csv): totals = ( lines | beam.Map(parse_player_and_score) - | beam.CombinePerKey(sum).with_input_types(beam.typehints.Tuple[Player, int])) + | beam.CombinePerKey(sum).with_input_types( + beam.typehints.Tuple[Player, int])) # [END type_hints_deterministic_key] self.assertEquals( @@ -491,12 +502,15 @@ def test_combine_per_key(self): ('cat', 1), ('cat', 5), ('cat', 9), ('cat', 1), ('dog', 5), ('dog', 2)] # [START combine_per_key] - avg_accuracy_per_player = player_accuracies | beam.CombinePerKey(beam.combiners.MeanCombineFn()) + avg_accuracy_per_player = (player_accuracies + | beam.CombinePerKey( + beam.combiners.MeanCombineFn())) # [END combine_per_key] self.assertEqual({('cat', 4.0), ('dog', 3.5)}, set(avg_accuracy_per_player)) def test_combine_concat(self): pc = ['a', 'b'] + # [START combine_concat] def concat(values, separator=', '): return separator.join(values) @@ -511,6 +525,7 @@ def concat(values, separator=', '): def test_bounded_sum(self): # [START combine_bounded_sum] pc = [1, 10, 100, 1000] + def bounded_sum(values, bound=500): return min(sum(values), bound) small_sum = pc | beam.CombineGlobally(bounded_sum) # [500] @@ -524,23 +539,26 @@ def test_combine_reduce(self): # [START combine_reduce] import functools import operator - product = factors | beam.CombineGlobally(functools.partial(reduce, operator.mul), 1) + product = factors | beam.CombineGlobally( + functools.partial(reduce, operator.mul), 1) # [END combine_reduce] self.assertEqual([210], product) def test_custom_average(self): pc = [2, 3, 5, 7] - # [START combine_custom_average] class AverageFn(beam.CombineFn): def 
create_accumulator(self): return (0.0, 0) + def add_input(self, (sum, count), input): return sum + input, count + 1 + def merge_accumulators(self, accumulators): sums, counts = zip(*accumulators) return sum(sums), sum(counts) + def extract_output(self, (sum, count)): return sum / count if count else float('NaN') average = pc | beam.CombineGlobally(AverageFn()) diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index cee501da2b3a..e30d55362da9 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -25,6 +25,9 @@ import time +from apitools.base.py import encoding +from apitools.base.py import exceptions + from apache_beam import utils from apache_beam import version from apache_beam.internal import pickler @@ -40,8 +43,6 @@ from apache_beam.utils.options import StandardOptions from apache_beam.utils.options import WorkerOptions -from apitools.base.py import encoding -from apitools.base.py import exceptions from apache_beam.internal.clients import storage import apache_beam.internal.clients.dataflow as dataflow diff --git a/sdks/python/apache_beam/internal/json_value_test.py b/sdks/python/apache_beam/internal/json_value_test.py index 9b26ab296d07..f2ae0c1a0494 100644 --- a/sdks/python/apache_beam/internal/json_value_test.py +++ b/sdks/python/apache_beam/internal/json_value_test.py @@ -24,6 +24,7 @@ from apache_beam.internal.json_value import from_json_value from apache_beam.internal.json_value import to_json_value + class JsonValueTest(unittest.TestCase): def test_string_to(self): diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index edf63ad39d8c..898e04b9a898 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -159,13 +159,13 @@ def new_save_module_dict(pickler, obj): return old_save_module_dict(pickler, obj) dill.dill.save_module_dict = 
new_save_module_dict - def _nest_dill_logging(): """Prefix all dill logging with its depth in the callstack. Useful for debugging pickling of deeply nested structures. """ old_log_info = dill.dill.log.info + def new_log_info(msg, *args, **kwargs): old_log_info( ('1 2 3 4 5 6 7 8 9 0 ' * 10)[:len(traceback.extract_stack())] + msg, diff --git a/sdks/python/apache_beam/internal/util.py b/sdks/python/apache_beam/internal/util.py index 50ea2f6044f5..ad60ba6acfcc 100644 --- a/sdks/python/apache_beam/internal/util.py +++ b/sdks/python/apache_beam/internal/util.py @@ -58,6 +58,7 @@ def remove_objects_from_args(args, kwargs, pvalue_classes): a placeholder value. """ pvals = [] + def swapper(value): pvals.append(value) return ArgumentPlaceholder() diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 022a68df7b4c..25412af560d5 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -21,11 +21,12 @@ import StringIO import zlib -from apache_beam.io import filebasedsource from avro import datafile from avro import io as avro_io from avro import schema +from apache_beam.io import filebasedsource + class AvroSource(filebasedsource.FileBasedSource): """A source for reading Avro files. diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index 9d33134f77d9..e394528c0e7b 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -110,6 +110,8 @@ import time import uuid +from apitools.base.py.exceptions import HttpError + from apache_beam import coders from apache_beam.internal import auth from apache_beam.internal.json_value import from_json_value @@ -118,15 +120,13 @@ from apache_beam.utils import retry from apache_beam.utils.options import GoogleCloudOptions -from apitools.base.py.exceptions import HttpError - # Protect against environments where bigquery library is not available. 
-# pylint: disable=g-import-not-at-top +# pylint: disable=wrong-import-order try: from apache_beam.internal.clients import bigquery except ImportError: pass -# pylint: enable=g-import-not-at-top +# pylint: enable=wrong-import-order __all__ = [ diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py index 7c315265523a..2bca0dc67556 100644 --- a/sdks/python/apache_beam/io/bigquery_test.py +++ b/sdks/python/apache_beam/io/bigquery_test.py @@ -22,16 +22,16 @@ import time import unittest +from apitools.base.py.exceptions import HttpError import mock + import apache_beam as beam +from apache_beam.internal.clients import bigquery from apache_beam.internal.json_value import to_json_value from apache_beam.io.bigquery import RowAsDictJsonCoder from apache_beam.io.bigquery import TableRowJsonCoder from apache_beam.utils.options import PipelineOptions -from apitools.base.py.exceptions import HttpError -from apache_beam.internal.clients import bigquery - class TestRowAsDictJsonCoder(unittest.TestCase): diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 560d9352f298..2a22aa3c23ff 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -172,7 +172,7 @@ def mkdir(path): @staticmethod def open(path, mode, mime_type): if path.startswith('gs://'): - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.io import gcsio return gcsio.GcsIO().open(path, mode, mime_type=mime_type) else: @@ -182,7 +182,7 @@ def open(path, mode, mime_type): def rename(src, dst): if src.startswith('gs://'): assert dst.startswith('gs://'), dst - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.io import gcsio gcsio.GcsIO().rename(src, dst) else: @@ -197,7 +197,7 @@ def copytree(src, dst): assert dst.startswith('gs://'), dst assert src.endswith('/'), src assert dst.endswith('/'), dst - # pylint: 
disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.io import gcsio gcsio.GcsIO().copytree(src, dst) else: @@ -211,7 +211,7 @@ def copytree(src, dst): @staticmethod def exists(path): if path.startswith('gs://'): - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.io import gcsio return gcsio.GcsIO().exists() else: @@ -220,7 +220,7 @@ def exists(path): @staticmethod def rmdir(path): if path.startswith('gs://'): - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.io import gcsio gcs = gcsio.GcsIO() if not path.endswith('/'): @@ -237,7 +237,7 @@ def rmdir(path): @staticmethod def rm(path): if path.startswith('gs://'): - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.io import gcsio gcsio.GcsIO().delete(path) else: @@ -249,7 +249,7 @@ def rm(path): @staticmethod def glob(path): if path.startswith('gs://'): - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.io import gcsio return gcsio.GcsIO().glob(path) else: @@ -608,7 +608,7 @@ def __init__(self, source): def __enter__(self): if self.source.is_gcs_source: - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.io import gcsio self._file = gcsio.GcsIO().open(self.source.file_path, 'rb') else: diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py index 3d7e00f682e0..de353e92bfe1 100644 --- a/sdks/python/apache_beam/io/gcsio.py +++ b/sdks/python/apache_beam/io/gcsio.py @@ -30,16 +30,17 @@ import StringIO import threading +from apitools.base.py.exceptions import HttpError +import apitools.base.py.transfer as transfer + from apache_beam.internal import auth from apache_beam.utils import retry -from apitools.base.py.exceptions import HttpError -import apitools.base.py.transfer as transfer # Issue a friendlier error message if the storage 
library is not available. # TODO(silviuc): Remove this guard when storage is available everywhere. try: - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.internal.clients import storage except ImportError: raise RuntimeError( diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py index 138f2a828ada..eeabb1a29245 100644 --- a/sdks/python/apache_beam/io/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcsio_test.py @@ -24,13 +24,12 @@ import threading import unittest - import httplib2 - -from apache_beam.io import gcsio from apitools.base.py.exceptions import HttpError from apache_beam.internal.clients import storage +from apache_beam.io import gcsio + class FakeGcsClient(object): # Fake storage client. Usage in gcsio.py is client.objects.Get(...) and diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 6354139a1e29..ba395bc45594 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -348,7 +348,7 @@ def AsSingleton(pcoll, default_value=_SINGLETON_NO_DEFAULT, label=None): # pyli # Local import is required due to dependency loop; even though the # implementation of this function requires concepts defined in modules that # depend on pvalue, it lives in this module to reduce user workload. - from apache_beam.transforms import sideinputs # pylint: disable=g-import-not-at-top + from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order view = (pcoll | sideinputs.ViewAsSingleton(has_default, default_value, label=label)) _cache_view(pcoll.pipeline, cache_key, view) @@ -380,7 +380,7 @@ def AsIter(pcoll, label=None): # pylint: disable=invalid-name # Local import is required due to dependency loop; even though the # implementation of this function requires concepts defined in modules that # depend on pvalue, it lives in this module to reduce user workload. 
- from apache_beam.transforms import sideinputs # pylint: disable=g-import-not-at-top + from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order view = (pcoll | sideinputs.ViewAsIterable(label=label)) _cache_view(pcoll.pipeline, cache_key, view) return view @@ -411,7 +411,7 @@ def AsList(pcoll, label=None): # pylint: disable=invalid-name # Local import is required due to dependency loop; even though the # implementation of this function requires concepts defined in modules that # depend on pvalue, it lives in this module to reduce user workload. - from apache_beam.transforms import sideinputs # pylint: disable=g-import-not-at-top + from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order view = (pcoll | sideinputs.ViewAsList(label=label)) _cache_view(pcoll.pipeline, cache_key, view) return view @@ -443,7 +443,7 @@ def AsDict(pcoll, label=None): # pylint: disable=invalid-name # Local import is required due to dependency loop; even though the # implementation of this function requires concepts defined in modules that # depend on pvalue, it lives in this module to reduce user workload. 
- from apache_beam.transforms import sideinputs # pylint: disable=g-import-not-at-top + from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order view = (pcoll | sideinputs.ViewAsDict(label=label)) _cache_view(pcoll.pipeline, cache_key, view) return view diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 0e4a057ee26f..e33d4ce89219 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -32,8 +32,10 @@ class FakeLogger(object): def PerThreadLoggingContext(self, *unused_args, **unused_kwargs): return self + def __enter__(self): pass + def __exit__(self, *unused_args): pass @@ -153,18 +155,22 @@ def _process_outputs(self, element, results): else: self.tagged_receivers[tag].output(windowed_value) + class NoContext(WindowFn.AssignContext): """An uninspectable WindowFn.AssignContext.""" NO_VALUE = object() + def __init__(self, value, timestamp=NO_VALUE): self.value = value self._timestamp = timestamp + @property def timestamp(self): if self._timestamp is self.NO_VALUE: raise ValueError('No timestamp in this context.') else: return self._timestamp + @property def existing_windows(self): raise ValueError('No existing_windows in this context.') diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index 7b04ae9187f3..fbc3569bed63 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -86,6 +86,7 @@ def poll_for_job_completion(runner, job_id): # It typically takes about 30 seconds. final_countdown_timer_secs = 50.0 sleep_secs = 5.0 + # Try to prioritize the user-level traceback, if any. 
def rank_error(msg): if 'work item was attempted' in msg: @@ -151,7 +152,7 @@ def rank_error(msg): def run(self, pipeline): """Remotely executes entire pipeline or parts reachable from node.""" # Import here to avoid adding the dependency for local running scenarios. - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.internal import apiclient self.job = apiclient.Job(pipeline.options) # The superclass's run will trigger a traversal of all reachable nodes. @@ -244,7 +245,7 @@ def _get_encoded_output_coder(self, transform_node, window_value=True): def _add_step(self, step_kind, step_label, transform_node, side_tags=()): """Creates a Step object and adds it to the cache.""" # Import here to avoid adding the dependency for local running scenarios. - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.internal import apiclient step = apiclient.Step(step_kind, self._get_unique_step_name()) self.job.proto.steps.append(step.proto) diff --git a/sdks/python/apache_beam/runners/direct_runner.py b/sdks/python/apache_beam/runners/direct_runner.py index 2c73394a39d9..e0df439d7ce2 100644 --- a/sdks/python/apache_beam/runners/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct_runner.py @@ -112,7 +112,7 @@ def run_CreatePCollectionView(self, transform_node): values = self._cache.get_pvalue(transform_node.inputs[0]) if isinstance(view, SingletonPCollectionView): has_default, default_value = view._view_options() # pylint: disable=protected-access - if len(values) == 0: # pylint: disable=g-explicit-length-test + if len(values) == 0: if has_default: result = default_value else: diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index 9b8d62256d01..1288d8eb6cba 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -40,7 +40,7 @@ def create_runner(runner_name): Raises: RuntimeError: if an 
invalid runner name is used. """ - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order if runner_name == 'DirectPipelineRunner': import apache_beam.runners.direct_runner return apache_beam.runners.direct_runner.DirectPipelineRunner() @@ -81,7 +81,7 @@ def run(self, pipeline): """Execute the entire pipeline or the sub-DAG reachable from a node.""" # Imported here to avoid circular dependencies. - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.pipeline import PipelineVisitor class RunVisitor(PipelineVisitor): @@ -117,7 +117,7 @@ def clear(self, pipeline, node=None): """ # Imported here to avoid circular dependencies. - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.pipeline import PipelineVisitor class ClearVisitor(PipelineVisitor): diff --git a/sdks/python/apache_beam/transforms/aggregator.py b/sdks/python/apache_beam/transforms/aggregator.py index b7167ff783df..69200e87b662 100644 --- a/sdks/python/apache_beam/transforms/aggregator.py +++ b/sdks/python/apache_beam/transforms/aggregator.py @@ -103,6 +103,6 @@ def get_name(thing): def _is_supported_kind(combine_fn): - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.internal.apiclient import metric_translations return combine_fn.__class__ in metric_translations diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index 112c591c4e50..10682b40e828 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -122,6 +122,7 @@ def test_sample(self): pipeline = Pipeline('DirectPipelineRunner') pcoll = pipeline | Create('start', [1, 1, 2, 2]) result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3) + def matcher(): def match(actual): # There is always exactly one result. 
@@ -142,6 +143,7 @@ def match(actual): 'start-perkey', sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), [])) result = pcoll | combine.Sample.FixedSizePerKey('sample', 3) + def matcher(): def match(actual): for _, samples in actual: @@ -180,6 +182,7 @@ def test_to_list_and_to_dict(self): the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6] pcoll = pipeline | Create('start', the_list) result = pcoll | combine.ToList('to list') + def matcher(expected): def match(actual): equal_to(expected[0])(actual[0]) @@ -191,6 +194,7 @@ def match(actual): pairs = [(1, 2), (3, 4), (5, 6)] pcoll = pipeline | Create('start-pairs', pairs) result = pcoll | combine.ToDict('to dict') + def matcher(): def match(actual): equal_to([1])([len(actual)]) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 445367fc3cdb..a7edd0e80a02 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -999,10 +999,10 @@ def infer_output_type(self, input_type): def process(self, context): k, vs = context.element - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.transforms.trigger import InMemoryUnmergedState from apache_beam.transforms.trigger import create_trigger_driver - # pylint: enable=g-import-not-at-top + # pylint: enable=wrong-import-order driver = create_trigger_driver(self.windowing, True) state = InMemoryUnmergedState() # TODO(robertwb): Conditionally process in smaller chunks. 
@@ -1115,9 +1115,9 @@ class Windowing(object): def __init__(self, windowfn, triggerfn=None, accumulation_mode=None, output_time_fn=None): global AccumulationMode, DefaultTrigger - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.transforms.trigger import AccumulationMode, DefaultTrigger - # pylint: enable=g-import-not-at-top + # pylint: enable=wrong-import-order if triggerfn is None: triggerfn = DefaultTrigger() if accumulation_mode is None: @@ -1148,7 +1148,7 @@ def is_default(self): T = typehints.TypeVariable('T') @typehints.with_input_types(T) @typehints.with_output_types(T) -class WindowInto(ParDo): # pylint: disable=g-wrong-blank-lines +class WindowInto(ParDo): """A window transform assigning windows to each element of a PCollection. Transforms an input PCollection by applying a windowing function to each diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py b/sdks/python/apache_beam/transforms/cy_combiners.py index be40d662f855..f82487077ec7 100644 --- a/sdks/python/apache_beam/transforms/cy_combiners.py +++ b/sdks/python/apache_beam/transforms/cy_combiners.py @@ -26,20 +26,25 @@ class AccumulatorCombineFn(core.CombineFn): # singleton? 
def create_accumulator(self): return self._accumulator_type() + @staticmethod def add_input(accumulator, element): accumulator.add_input(element) return accumulator + def merge_accumulators(self, accumulators): accumulator = self._accumulator_type() accumulator.merge(accumulators) return accumulator + @staticmethod def extract_output(accumulator): return accumulator.extract_output() + def __eq__(self, other): return (isinstance(other, AccumulatorCombineFn) and self._accumulator_type is other._accumulator_type) + def __hash__(self): return hash(self._accumulator_type) @@ -52,11 +57,14 @@ def __hash__(self): class CountAccumulator(object): def __init__(self): self.value = 0 + def add_input(self, unused_element): self.value += 1 + def merge(self, accumulators): for accumulator in accumulators: self.value += accumulator.value + def extract_output(self): return self.value @@ -64,14 +72,17 @@ def extract_output(self): class SumInt64Accumulator(object): def __init__(self): self.value = 0 + def add_input(self, element): element = int(element) if not INT64_MIN <= element <= INT64_MAX: raise OverflowError(element) self.value += element + def merge(self, accumulators): for accumulator in accumulators: self.value += accumulator.value + def extract_output(self): if not INT64_MIN <= self.value <= INT64_MAX: self.value %= 2**64 @@ -83,16 +94,19 @@ def extract_output(self): class MinInt64Accumulator(object): def __init__(self): self.value = INT64_MAX + def add_input(self, element): element = int(element) if not INT64_MIN <= element <= INT64_MAX: raise OverflowError(element) if element < self.value: self.value = element + def merge(self, accumulators): for accumulator in accumulators: if accumulator.value < self.value: self.value = accumulator.value + def extract_output(self): return self.value @@ -100,16 +114,19 @@ def extract_output(self): class MaxInt64Accumulator(object): def __init__(self): self.value = INT64_MIN + def add_input(self, element): element = int(element) if not 
INT64_MIN <= element <= INT64_MAX: raise OverflowError(element) if element > self.value: self.value = element + def merge(self, accumulators): for accumulator in accumulators: if accumulator.value > self.value: self.value = accumulator.value + def extract_output(self): return self.value @@ -118,16 +135,19 @@ class MeanInt64Accumulator(object): def __init__(self): self.sum = 0 self.count = 0 + def add_input(self, element): element = int(element) if not INT64_MIN <= element <= INT64_MAX: raise OverflowError(element) self.sum += element self.count += 1 + def merge(self, accumulators): for accumulator in accumulators: self.sum += accumulator.sum self.count += accumulator.count + def extract_output(self): if not INT64_MIN <= self.sum <= INT64_MAX: self.sum %= 2**64 @@ -138,12 +158,20 @@ def extract_output(self): class CountCombineFn(AccumulatorCombineFn): _accumulator_type = CountAccumulator + + class SumInt64Fn(AccumulatorCombineFn): _accumulator_type = SumInt64Accumulator + + class MinInt64Fn(AccumulatorCombineFn): _accumulator_type = MinInt64Accumulator + + class MaxInt64Fn(AccumulatorCombineFn): _accumulator_type = MaxInt64Accumulator + + class MeanInt64Fn(AccumulatorCombineFn): _accumulator_type = MeanInt64Accumulator @@ -156,12 +184,15 @@ class MeanInt64Fn(AccumulatorCombineFn): class SumDoubleAccumulator(object): def __init__(self): self.value = 0 + def add_input(self, element): element = float(element) self.value += element + def merge(self, accumulators): for accumulator in accumulators: self.value += accumulator.value + def extract_output(self): return self.value @@ -169,14 +200,17 @@ def extract_output(self): class MinDoubleAccumulator(object): def __init__(self): self.value = _POS_INF + def add_input(self, element): element = float(element) if element < self.value: self.value = element + def merge(self, accumulators): for accumulator in accumulators: if accumulator.value < self.value: self.value = accumulator.value + def extract_output(self): return 
self.value @@ -184,14 +218,17 @@ def extract_output(self): class MaxDoubleAccumulator(object): def __init__(self): self.value = _NEG_INF + def add_input(self, element): element = float(element) if element > self.value: self.value = element + def merge(self, accumulators): for accumulator in accumulators: if accumulator.value > self.value: self.value = accumulator.value + def extract_output(self): return self.value @@ -200,24 +237,33 @@ class MeanDoubleAccumulator(object): def __init__(self): self.sum = 0 self.count = 0 + def add_input(self, element): element = float(element) self.sum += element self.count += 1 + def merge(self, accumulators): for accumulator in accumulators: self.sum += accumulator.sum self.count += accumulator.count + def extract_output(self): return self.sum / self.count if self.count else _NAN class SumFloatFn(AccumulatorCombineFn): _accumulator_type = SumDoubleAccumulator + + class MinFloatFn(AccumulatorCombineFn): _accumulator_type = MinDoubleAccumulator + + class MaxFloatFn(AccumulatorCombineFn): _accumulator_type = MaxDoubleAccumulator + + class MeanFloatFn(AccumulatorCombineFn): _accumulator_type = MeanDoubleAccumulator @@ -225,11 +271,14 @@ class MeanFloatFn(AccumulatorCombineFn): class AllAccumulator(object): def __init__(self): self.value = True + def add_input(self, element): self.value &= not not element + def merge(self, accumulators): for accumulator in accumulators: self.value &= accumulator.value + def extract_output(self): return self.value @@ -237,11 +286,14 @@ def extract_output(self): class AnyAccumulator(object): def __init__(self): self.value = False + def add_input(self, element): self.value |= not not element + def merge(self, accumulators): for accumulator in accumulators: self.value |= accumulator.value + def extract_output(self): return self.value @@ -249,5 +301,6 @@ def extract_output(self): class AnyCombineFn(AccumulatorCombineFn): _accumulator_type = AnyAccumulator + class AllCombineFn(AccumulatorCombineFn): 
_accumulator_type = AllAccumulator diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 5b089b9aa26c..3ad639607ddc 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -383,10 +383,10 @@ def __ror__(self, left): pipelines = [v.pipeline for v in pvalues if isinstance(v, pvalue.PValue)] if pvalues and not pipelines: deferred = False - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam import pipeline from apache_beam.utils.options import PipelineOptions - # pylint: enable=g-import-not-at-top + # pylint: enable=wrong-import-order p = pipeline.Pipeline( 'DirectPipelineRunner', PipelineOptions(sys.argv)) else: @@ -403,9 +403,9 @@ def __ror__(self, left): raise ValueError( 'Mixing value from different pipelines not allowed.') deferred = not getattr(p.runner, 'is_eager', False) - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.transforms.core import Create - # pylint: enable=g-import-not-at-top + # pylint: enable=wrong-import-order replacements = {id(v): p | Create('CreatePInput%s' % ix, v) for ix, v in enumerate(pvalues) if not isinstance(v, pvalue.PValue) and v is not None} @@ -431,9 +431,9 @@ def _extract_input_pvalues(self, pvalueish): Generally only needs to be overriden for multi-input PTransforms. 
""" - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam import pipeline - # pylint: enable=g-import-not-at-top + # pylint: enable=wrong-import-order if isinstance(pvalueish, pipeline.Pipeline): pvalueish = pvalue.PBegin(pvalueish) @@ -557,6 +557,7 @@ def type_check_inputs(self, pvalueish): type_hints = self.get_type_hints().input_types if type_hints: args, kwargs = self.raw_side_inputs + def element_type(side_input): if isinstance(side_input, pvalue.PCollectionView): return side_input.element_type diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 8ae7a372225a..d6ee18acc55c 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -208,8 +208,10 @@ def test_do_fn_with_start_finish(self): class MyDoFn(beam.DoFn): def start_bundle(self, c): yield 'start' + def process(self, c): pass + def finish_bundle(self, c): yield 'finish' pipeline = Pipeline('DirectPipelineRunner') @@ -540,6 +542,7 @@ class NestedFlatten(PTransform): def _extract_input_pvalues(self, pvalueish): pvalueish = list(pvalueish) return pvalueish, sum([list(p.values()) for p in pvalueish], []) + def apply(self, pcoll_dicts): keys = reduce(operator.or_, [set(p.keys()) for p in pcoll_dicts]) res = {} @@ -554,6 +557,7 @@ def apply(self, pcoll_dicts): self.assertEqual(['x', 'x', 'y', 'y', 'z'], sorted(res['b'])) self.assertEqual([], sorted(res['c'])) + @beam.ptransform_fn def SamplePTransform(label, pcoll, context, *args, **kwargs): """Sample transform using the @ptransform_fn decorator.""" @@ -1646,6 +1650,7 @@ def test_sample_globally_pipeline_satisfied(self): | combine.Sample.FixedSizeGlobally('sample', 3)) self.assertCompatible(typehints.Iterable[int], d.element_type) + def matcher(expected_len): def match(actual): equal_to([expected_len])([len(actual[0])]) @@ -1661,6 +1666,7 @@ def 
test_sample_globally_runtime_satisfied(self): | combine.Sample.FixedSizeGlobally('sample', 2)) self.assertCompatible(typehints.Iterable[int], d.element_type) + def matcher(expected_len): def match(actual): equal_to([expected_len])([len(actual[0])]) @@ -1676,6 +1682,7 @@ def test_sample_per_key_pipeline_satisfied(self): self.assertCompatible(typehints.KV[int, typehints.Iterable[int]], d.element_type) + def matcher(expected_len): def match(actual): for _, sample in actual: @@ -1694,6 +1701,7 @@ def test_sample_per_key_runtime_satisfied(self): self.assertCompatible(typehints.KV[int, typehints.Iterable[int]], d.element_type) + def matcher(expected_len): def match(actual): for _, sample in actual: @@ -1708,6 +1716,7 @@ def test_to_list_pipeline_check_satisfied(self): | combine.ToList('to list')) self.assertCompatible(typehints.List[int], d.element_type) + def matcher(expected): def match(actual): equal_to(expected)(actual[0]) @@ -1723,6 +1732,7 @@ def test_to_list_runtime_check_satisfied(self): | combine.ToList('to list')) self.assertCompatible(typehints.List[str], d.element_type) + def matcher(expected): def match(actual): equal_to(expected)(actual[0]) diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index fc54fa3ef326..c58bd0e8edc5 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -123,7 +123,7 @@ def apply(self, pcoll): V = typehints.TypeVariable('V') @typehints.with_input_types(typehints.Tuple[K, V]) @typehints.with_output_types(typehints.Dict[K, V]) -class ViewAsDict(PTransform): # pylint: disable=g-wrong-blank-lines +class ViewAsDict(PTransform): """Transform to view PCollection as a dict PCollectionView. 
Important: this transform is an implementation detail and should not be used diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index 1c75d3b6a017..788277c9249c 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -731,6 +731,7 @@ def __iter__(self): unwindowed_value = wv.value self.notify_observers(unwindowed_value) yield unwindowed_value + def __repr__(self): return '' % windowed_values unwindowed = UnwindowedValues() diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index fee9b1ba0051..be2584a73151 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -61,6 +61,7 @@ def run_trigger_simple(self, window_fn, trigger_fn, accumulation_mode, **kwargs): late_data = kwargs.pop('late_data', []) assert not kwargs + def bundle_data(data, size): bundle = [] for timestamp, elem in data: @@ -477,10 +478,10 @@ def parse_fn(s, names): else: return fn - # pylint: disable=g-import-not-at-top + # pylint: disable=wrong-import-order from apache_beam.transforms import window as window_module from apache_beam.transforms import trigger as trigger_module - # pylint: enable=g-import-not-at-top + # pylint: enable=wrong-import-order window_fn_names = dict(window_module.__dict__) window_fn_names.update({'CustomTimestampingFixedWindowsWindowFn': CustomTimestampingFixedWindowsWindowFn}) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 52bba0517312..b8380a088310 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -181,6 +181,7 @@ class DataflowAssertException(Exception): # TODO(silviuc): Add contains_in_any_order-style matchers. 
def equal_to(expected): expected = list(expected) + def _equal(actual): sorted_expected = sorted(expected) sorted_actual = sorted(actual) diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index 1d7f27248969..186fcd46c0c3 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -52,6 +52,7 @@ def process(self, context): yield "%s @ %s" % (key, window), values reify_windows = core.ParDo(ReifyWindowsFn()) + class WindowTest(unittest.TestCase): def test_fixed_windows(self): diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py index e6bab6106007..aa04fe25f187 100644 --- a/sdks/python/apache_beam/typehints/typehints_test.py +++ b/sdks/python/apache_beam/typehints/typehints_test.py @@ -45,6 +45,7 @@ def check_or_interleave(hint, value, var): _check_instance_type(hint, value, var) return value + def check_type_hints(f): @functools.wraps(f) def wrapper(*args, **kwargs): @@ -770,6 +771,7 @@ class TakesDecoratorTestCase(TypeHintTestCase): def test_must_be_primitive_type_or_constraint(self): with self.assertRaises(TypeError) as e: t = [1, 2] + @with_input_types(a=t) def foo(a): pass @@ -781,6 +783,7 @@ def foo(a): with self.assertRaises(TypeError) as e: t = 5 + @check_type_hints @with_input_types(a=t) def foo(a): diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py index 4edcaa7b2c7f..8a97f4bedbfc 100644 --- a/sdks/python/apache_beam/utils/dependency_test.py +++ b/sdks/python/apache_beam/utils/dependency_test.py @@ -336,6 +336,7 @@ def test_with_extra_packages(self): 'gs://my-gcs-bucket/gcs.tar.gz'] gcs_copied_files = [] + def file_copy(from_path, to_path): if from_path.startswith('gs://'): gcs_copied_files.append(from_path) diff --git a/sdks/python/apache_beam/utils/retry_test.py b/sdks/python/apache_beam/utils/retry_test.py 
index 0a016b9a12e0..705c5558bf3c 100644 --- a/sdks/python/apache_beam/utils/retry_test.py +++ b/sdks/python/apache_beam/utils/retry_test.py @@ -19,10 +19,10 @@ import unittest -from apache_beam.utils import retry - from apitools.base.py.exceptions import HttpError +from apache_beam.utils import retry + class FakeClock(object): """A fake clock object implementing sleep() and recording calls.""" diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh index e8062c0632ac..2f484834d3f8 100755 --- a/sdks/python/run_pylint.sh +++ b/sdks/python/run_pylint.sh @@ -43,6 +43,8 @@ if test "$CHANGED_FILES"; then echo "Running pylint on changed files:" echo "$CHANGED_FILES" pylint $CHANGED_FILES + echo "Running pep8 on changed files:" + pep8 $CHANGED_FILES else echo "Not running pylint. No eligible files." fi diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 356de5776ded..2e7193821ef0 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -18,8 +18,19 @@ [tox] envlist = py27 +[pep8] +# Disable all errors and warnings except for the ones related to blank lines. +# pylint does not check the number of blank lines. +select = E3 + +# Skip auto generated files (windmill_pb2.py, windmill_service_pb2.py) +# Skip files that use typehint semantics (violates E302) +exclude = windmill_pb2.py, windmill_service_pb2.py, combiners.py, core.py, sideinputs.py + [testenv:py27] -deps=pylint +deps= + pep8 + pylint commands = python setup.py test {toxinidir}/run_pylint.sh From 619c686d0c2dbca720a309ec54ff88b99dba1456 Mon Sep 17 00:00:00 2001 From: Ahmet Altay Date: Tue, 28 Jun 2016 11:42:09 -0700 Subject: [PATCH 2/3] Adding additional local pylint disable statements. 
--- sdks/python/apache_beam/coders/coder_impl.py | 4 ++-- sdks/python/apache_beam/coders/coders.py | 4 ++-- .../apache_beam/coders/fast_coders_test.py | 2 +- .../apache_beam/coders/slow_coders_test.py | 2 +- .../complete/juliaset/juliaset/juliaset.py | 4 ++-- .../examples/cookbook/bigquery_schema.py | 2 +- .../apache_beam/examples/snippets/snippets.py | 2 +- sdks/python/apache_beam/io/bigquery.py | 4 ++-- sdks/python/apache_beam/io/fileio.py | 16 ++++++++-------- sdks/python/apache_beam/io/gcsio.py | 2 +- sdks/python/apache_beam/pvalue.py | 8 ++++---- .../apache_beam/runners/dataflow_runner.py | 4 ++-- sdks/python/apache_beam/runners/runner.py | 6 +++--- sdks/python/apache_beam/transforms/aggregator.py | 2 +- sdks/python/apache_beam/transforms/core.py | 8 ++++---- sdks/python/apache_beam/transforms/ptransform.py | 8 ++++---- .../apache_beam/transforms/trigger_test.py | 4 ++-- 17 files changed, 41 insertions(+), 41 deletions(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 19b78b45c5ac..a623f2bd05ae 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -29,7 +29,7 @@ from cPickle import loads, dumps -# pylint: disable=wrong-import-order +# pylint: disable=wrong-import-order, wrong-import-position try: # Don't depend on the full dataflow sdk to test coders. 
from apache_beam.transforms.window import WindowedValue @@ -43,7 +43,7 @@ except ImportError: from slow_stream import InputStream as create_InputStream from slow_stream import OutputStream as create_OutputStream -# pylint: enable=wrong-import-order +# pylint: enable=wrong-import-order, wrong-import-position class CoderImpl(object): diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 7c3f757a56c6..619586fc142c 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -24,7 +24,7 @@ from apache_beam.coders import coder_impl -# pylint: disable=wrong-import-order +# pylint: disable=wrong-import-order, wrong-import-position # Avoid dependencies on the full SDK. try: # Import dill from the pickler module to make sure our monkey-patching of dill @@ -46,7 +46,7 @@ def serialize_coder(coder): def deserialize_coder(serialized): from apache_beam.internal import pickler return pickler.loads(serialized.split('$', 1)[1]) -# pylint: enable=wrong-import-order +# pylint: enable=wrong-import-order, wrong-import-position class Coder(object): diff --git a/sdks/python/apache_beam/coders/fast_coders_test.py b/sdks/python/apache_beam/coders/fast_coders_test.py index 907809c94e6c..55cf16c12483 100644 --- a/sdks/python/apache_beam/coders/fast_coders_test.py +++ b/sdks/python/apache_beam/coders/fast_coders_test.py @@ -28,7 +28,7 @@ class FastCoders(unittest.TestCase): def test_using_fast_impl(self): - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position # pylint: disable=unused-variable import apache_beam.coders.stream diff --git a/sdks/python/apache_beam/coders/slow_coders_test.py b/sdks/python/apache_beam/coders/slow_coders_test.py index f85d47ba9517..62149a3d9782 100644 --- a/sdks/python/apache_beam/coders/slow_coders_test.py +++ b/sdks/python/apache_beam/coders/slow_coders_test.py @@ -30,7 +30,7 @@ class SlowCoders(unittest.TestCase): def 
test_using_slow_impl(self): # Assert that we are not using the compiled implementation. with self.assertRaises(ImportError): - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position # pylint: disable=unused-variable import apache_beam.coders.stream diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py index ad3ea7f178c5..2bc37e60dd58 100644 --- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py +++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py @@ -59,7 +59,7 @@ def point_set(n): def generate_julia_set_visualization(data, n, max_iterations): """Generate the pixel matrix for rendering the julia set as an image.""" - import numpy as np # pylint: disable=wrong-import-order + import numpy as np # pylint: disable=wrong-import-order, wrong-import-position colors = [] for r in range(0, 256, 16): for g in range(0, 256, 16): @@ -75,7 +75,7 @@ def generate_julia_set_visualization(data, n, max_iterations): def save_julia_set_visualization(out_file, image_array): """Save the fractal image of our julia set as a png.""" - from matplotlib import pyplot as plt # pylint: disable=wrong-import-order + from matplotlib import pyplot as plt # pylint: disable=wrong-import-order, wrong-import-position plt.imsave(out_file, image_array, format='png') diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py index 0cb5d66639bf..7c420fb69cfa 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py @@ -44,7 +44,7 @@ def run(argv=None): p = beam.Pipeline(argv=pipeline_args) - from apache_beam.internal.clients import bigquery # pylint: disable=wrong-import-order + from apache_beam.internal.clients import bigquery # pylint: 
disable=wrong-import-order, wrong-import-position table_schema = bigquery.TableSchema() diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index e7b556c84358..f5bbc6633021 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -40,7 +40,7 @@ # pylint:disable=expression-not-assigned # pylint:disable=redefined-outer-name # pylint:disable=unused-variable -# pylint:disable=wrong-import-order +# pylint:disable=wrong-import-order, wrong-import-position class SnippetUtils(object): diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index e394528c0e7b..f2c56dc38af2 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -121,12 +121,12 @@ from apache_beam.utils.options import GoogleCloudOptions # Protect against environments where bigquery library is not available. -# pylint: disable=wrong-import-order +# pylint: disable=wrong-import-order, wrong-import-position try: from apache_beam.internal.clients import bigquery except ImportError: pass -# pylint: enable=wrong-import-order +# pylint: enable=wrong-import-order, wrong-import-position __all__ = [ diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 2a22aa3c23ff..115bc0ec9d0d 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -172,7 +172,7 @@ def mkdir(path): @staticmethod def open(path, mode, mime_type): if path.startswith('gs://'): - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.io import gcsio return gcsio.GcsIO().open(path, mode, mime_type=mime_type) else: @@ -182,7 +182,7 @@ def open(path, mode, mime_type): def rename(src, dst): if src.startswith('gs://'): assert dst.startswith('gs://'), dst - # pylint: disable=wrong-import-order + # 
pylint: disable=wrong-import-order, wrong-import-position from apache_beam.io import gcsio gcsio.GcsIO().rename(src, dst) else: @@ -197,7 +197,7 @@ def copytree(src, dst): assert dst.startswith('gs://'), dst assert src.endswith('/'), src assert dst.endswith('/'), dst - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.io import gcsio gcsio.GcsIO().copytree(src, dst) else: @@ -211,7 +211,7 @@ def copytree(src, dst): @staticmethod def exists(path): if path.startswith('gs://'): - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.io import gcsio return gcsio.GcsIO().exists() else: @@ -220,7 +220,7 @@ def exists(path): @staticmethod def rmdir(path): if path.startswith('gs://'): - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.io import gcsio gcs = gcsio.GcsIO() if not path.endswith('/'): @@ -237,7 +237,7 @@ def rmdir(path): @staticmethod def rm(path): if path.startswith('gs://'): - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.io import gcsio gcsio.GcsIO().delete(path) else: @@ -249,7 +249,7 @@ def rm(path): @staticmethod def glob(path): if path.startswith('gs://'): - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.io import gcsio return gcsio.GcsIO().glob(path) else: @@ -608,7 +608,7 @@ def __init__(self, source): def __enter__(self): if self.source.is_gcs_source: - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.io import gcsio self._file = gcsio.GcsIO().open(self.source.file_path, 'rb') else: diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py index de353e92bfe1..a01988bae148 100644 --- a/sdks/python/apache_beam/io/gcsio.py +++ 
b/sdks/python/apache_beam/io/gcsio.py @@ -40,7 +40,7 @@ # Issue a friendlier error message if the storage library is not available. # TODO(silviuc): Remove this guard when storage is available everywhere. try: - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.internal.clients import storage except ImportError: raise RuntimeError( diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index ba395bc45594..78ff209a80ad 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -348,7 +348,7 @@ def AsSingleton(pcoll, default_value=_SINGLETON_NO_DEFAULT, label=None): # pyli # Local import is required due to dependency loop; even though the # implementation of this function requires concepts defined in modules that # depend on pvalue, it lives in this module to reduce user workload. - from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order + from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order, wrong-import-position view = (pcoll | sideinputs.ViewAsSingleton(has_default, default_value, label=label)) _cache_view(pcoll.pipeline, cache_key, view) @@ -380,7 +380,7 @@ def AsIter(pcoll, label=None): # pylint: disable=invalid-name # Local import is required due to dependency loop; even though the # implementation of this function requires concepts defined in modules that # depend on pvalue, it lives in this module to reduce user workload. 
- from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order + from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order, wrong-import-position view = (pcoll | sideinputs.ViewAsIterable(label=label)) _cache_view(pcoll.pipeline, cache_key, view) return view @@ -411,7 +411,7 @@ def AsList(pcoll, label=None): # pylint: disable=invalid-name # Local import is required due to dependency loop; even though the # implementation of this function requires concepts defined in modules that # depend on pvalue, it lives in this module to reduce user workload. - from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order + from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order, wrong-import-position view = (pcoll | sideinputs.ViewAsList(label=label)) _cache_view(pcoll.pipeline, cache_key, view) return view @@ -443,7 +443,7 @@ def AsDict(pcoll, label=None): # pylint: disable=invalid-name # Local import is required due to dependency loop; even though the # implementation of this function requires concepts defined in modules that # depend on pvalue, it lives in this module to reduce user workload. - from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order + from apache_beam.transforms import sideinputs # pylint: disable=wrong-import-order, wrong-import-position view = (pcoll | sideinputs.ViewAsDict(label=label)) _cache_view(pcoll.pipeline, cache_key, view) return view diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index fbc3569bed63..43a50ba3e72a 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -152,7 +152,7 @@ def rank_error(msg): def run(self, pipeline): """Remotely executes entire pipeline or parts reachable from node.""" # Import here to avoid adding the dependency for local running scenarios. 
- # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.internal import apiclient self.job = apiclient.Job(pipeline.options) # The superclass's run will trigger a traversal of all reachable nodes. @@ -245,7 +245,7 @@ def _get_encoded_output_coder(self, transform_node, window_value=True): def _add_step(self, step_kind, step_label, transform_node, side_tags=()): """Creates a Step object and adds it to the cache.""" # Import here to avoid adding the dependency for local running scenarios. - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.internal import apiclient step = apiclient.Step(step_kind, self._get_unique_step_name()) self.job.proto.steps.append(step.proto) diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index 1288d8eb6cba..55b63f3418e5 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -40,7 +40,7 @@ def create_runner(runner_name): Raises: RuntimeError: if an invalid runner name is used. """ - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position if runner_name == 'DirectPipelineRunner': import apache_beam.runners.direct_runner return apache_beam.runners.direct_runner.DirectPipelineRunner() @@ -81,7 +81,7 @@ def run(self, pipeline): """Execute the entire pipeline or the sub-DAG reachable from a node.""" # Imported here to avoid circular dependencies. - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.pipeline import PipelineVisitor class RunVisitor(PipelineVisitor): @@ -117,7 +117,7 @@ def clear(self, pipeline, node=None): """ # Imported here to avoid circular dependencies. 
- # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.pipeline import PipelineVisitor class ClearVisitor(PipelineVisitor): diff --git a/sdks/python/apache_beam/transforms/aggregator.py b/sdks/python/apache_beam/transforms/aggregator.py index 69200e87b662..a5e83cb8a204 100644 --- a/sdks/python/apache_beam/transforms/aggregator.py +++ b/sdks/python/apache_beam/transforms/aggregator.py @@ -103,6 +103,6 @@ def get_name(thing): def _is_supported_kind(combine_fn): - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.internal.apiclient import metric_translations return combine_fn.__class__ in metric_translations diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index a7edd0e80a02..e663c5f0a5da 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -999,10 +999,10 @@ def infer_output_type(self, input_type): def process(self, context): k, vs = context.element - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.transforms.trigger import InMemoryUnmergedState from apache_beam.transforms.trigger import create_trigger_driver - # pylint: enable=wrong-import-order + # pylint: enable=wrong-import-order, wrong-import-position driver = create_trigger_driver(self.windowing, True) state = InMemoryUnmergedState() # TODO(robertwb): Conditionally process in smaller chunks. 
@@ -1115,9 +1115,9 @@ class Windowing(object): def __init__(self, windowfn, triggerfn=None, accumulation_mode=None, output_time_fn=None): global AccumulationMode, DefaultTrigger - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.transforms.trigger import AccumulationMode, DefaultTrigger - # pylint: enable=wrong-import-order + # pylint: enable=wrong-import-order, wrong-import-position if triggerfn is None: triggerfn = DefaultTrigger() if accumulation_mode is None: diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 3ad639607ddc..106c44f88383 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -383,10 +383,10 @@ def __ror__(self, left): pipelines = [v.pipeline for v in pvalues if isinstance(v, pvalue.PValue)] if pvalues and not pipelines: deferred = False - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam import pipeline from apache_beam.utils.options import PipelineOptions - # pylint: enable=wrong-import-order + # pylint: enable=wrong-import-order, wrong-import-position p = pipeline.Pipeline( 'DirectPipelineRunner', PipelineOptions(sys.argv)) else: @@ -403,9 +403,9 @@ def __ror__(self, left): raise ValueError( 'Mixing value from different pipelines not allowed.') deferred = not getattr(p.runner, 'is_eager', False) - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.transforms.core import Create - # pylint: enable=wrong-import-order + # pylint: enable=wrong-import-order, wrong-import-position replacements = {id(v): p | Create('CreatePInput%s' % ix, v) for ix, v in enumerate(pvalues) if not isinstance(v, pvalue.PValue) and v is not None} diff --git a/sdks/python/apache_beam/transforms/trigger_test.py 
b/sdks/python/apache_beam/transforms/trigger_test.py index be2584a73151..2cdd1e3c5dae 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -478,10 +478,10 @@ def parse_fn(s, names): else: return fn - # pylint: disable=wrong-import-order + # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.transforms import window as window_module from apache_beam.transforms import trigger as trigger_module - # pylint: enable=wrong-import-order + # pylint: enable=wrong-import-order, wrong-import-position window_fn_names = dict(window_module.__dict__) window_fn_names.update({'CustomTimestampingFixedWindowsWindowFn': CustomTimestampingFixedWindowsWindowFn}) From 08d4c752d780022a4976c1654c3ec06312914d0f Mon Sep 17 00:00:00 2001 From: Ahmet Altay Date: Tue, 28 Jun 2016 12:52:17 -0700 Subject: [PATCH 3/3] Moved type variables to the top of the files and enabled pep8 verification for the 3 previously excluded files. 
--- sdks/python/apache_beam/transforms/combiners.py | 10 +++++----- sdks/python/apache_beam/transforms/core.py | 11 ++++++----- sdks/python/apache_beam/transforms/sideinputs.py | 7 +++++-- sdks/python/run_pylint.sh | 5 +++-- sdks/python/tox.ini | 3 +-- 5 files changed, 20 insertions(+), 16 deletions(-) diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index 64ede3b025cf..e9f11a0be7a2 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -46,6 +46,11 @@ 'ToList', ] +# Type variables +T = TypeVariable('T') +K = TypeVariable('K') +V = TypeVariable('V') + class Mean(object): """Combiners for computing arithmetic means of elements.""" @@ -214,7 +219,6 @@ def SmallestPerKey(label, pcoll, n): return pcoll | Top.PerKey(label, n, lambda a, b: b < a) -T = TypeVariable('T') @with_input_types(T) @with_output_types(List[T]) class TopCombineFn(core.CombineFn): @@ -329,7 +333,6 @@ def FixedSizePerKey(label, pcoll, n): return pcoll | core.CombinePerKey(label, SampleCombineFn(n)) -T = TypeVariable('T') @with_input_types(T) @with_output_types(List[T]) class SampleCombineFn(core.CombineFn): @@ -404,7 +407,6 @@ def apply(self, pcoll): return pcoll | core.CombineGlobally(self.label, ToListCombineFn()) -T = TypeVariable('T') @with_input_types(T) @with_output_types(List[T]) class ToListCombineFn(core.CombineFn): @@ -439,8 +441,6 @@ def apply(self, pcoll): return pcoll | core.CombineGlobally(self.label, ToDictCombineFn()) -K = TypeVariable('K') -V = TypeVariable('V') @with_input_types(Tuple[K, V]) @with_output_types(Dict[K, V]) class ToDictCombineFn(core.CombineFn): diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index e663c5f0a5da..8f4c246c5143 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -49,6 +49,11 @@ from apache_beam.typehints.trivial_inference 
import element_type from apache_beam.utils.options import TypeOptions +# Type variables +T = typehints.TypeVariable('T') +K = typehints.TypeVariable('K') +V = typehints.TypeVariable('V') + class DoFnProcessContext(object): """A processing context passed to DoFn methods during execution. @@ -842,6 +847,7 @@ def add_input_types(transform): "an empty PCollection if the input PCollection is empty, " "or CombineGlobally().as_singleton_view() to get the default " "output of the CombineFn if the input PCollection is empty.") + def typed(transform): # TODO(robertwb): We should infer this. if combined.element_type: @@ -954,8 +960,6 @@ def default_type_hints(self): return hints -K = typehints.TypeVariable('K') -V = typehints.TypeVariable('V') @typehints.with_input_types(typehints.KV[K, V]) @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]]) class GroupByKey(PTransform): @@ -1051,8 +1055,6 @@ def apply(self, pcoll): self.GroupAlsoByWindow(pcoll.windowing))) -K = typehints.TypeVariable('K') -V = typehints.TypeVariable('V') @typehints.with_input_types(typehints.KV[K, V]) @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]]) class GroupByKeyOnly(PTransform): @@ -1145,7 +1147,6 @@ def is_default(self): return self._is_default -T = typehints.TypeVariable('T') @typehints.with_input_types(T) @typehints.with_output_types(T) class WindowInto(ParDo): diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index c58bd0e8edc5..6484a7cf3866 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -28,6 +28,10 @@ from apache_beam import typehints from apache_beam.transforms.ptransform import PTransform +# Type variables +K = typehints.TypeVariable('K') +V = typehints.TypeVariable('V') + class CreatePCollectionView(PTransform): """Transform to materialize a given PCollectionView in the pipeline. 
@@ -119,8 +123,7 @@ def apply(self, pcoll): .with_input_types(input_type) .with_output_types(output_type)) -K = typehints.TypeVariable('K') -V = typehints.TypeVariable('V') + @typehints.with_input_types(typehints.Tuple[K, V]) @typehints.with_output_types(typehints.Dict[K, V]) class ViewAsDict(PTransform): diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh index 2f484834d3f8..4e0b1296c0dc 100755 --- a/sdks/python/run_pylint.sh +++ b/sdks/python/run_pylint.sh @@ -16,10 +16,11 @@ # limitations under the License. # -# This script will run pylint on files that changed compared to the current -# HEAD of the branch. +# This script will run pylint and pep8 on files that changed compared to the +# current HEAD of the branch. # # Use "pylint apache_beam" to run pylint all files. +# Use "pep8 apache_beam" to run pep8 all files. # # The exit-code of the script indicates success or a failure. diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 2e7193821ef0..29674ed773da 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -24,8 +24,7 @@ envlist = py27 select = E3 # Skip auto generated files (windmill_pb2.py, windmill_service_pb2.py) -# Skip files that use typehint semantics (violates E302) -exclude = windmill_pb2.py, windmill_service_pb2.py, combiners.py, core.py, sideinputs.py +exclude = windmill_pb2.py, windmill_service_pb2.py [testenv:py27] deps=