diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 72791fc262bc..9d4e79a9a9af 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -520,11 +520,11 @@ def test_dir(self): options = Breakfast() self.assertEquals( set(['from_dictionary', 'get_all_options', 'slices', 'style', - 'view_as', 'display_data']), + 'view_as', 'display_data', 'next']), set([attr for attr in dir(options) if not attr.startswith('_')])) self.assertEquals( set(['from_dictionary', 'get_all_options', 'style', 'view_as', - 'display_data']), + 'display_data', 'next']), set([attr for attr in dir(options.view_as(Eggs)) if not attr.startswith('_')])) diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py index 3c04b370cfe6..a207009f8718 100644 --- a/sdks/python/apache_beam/transforms/__init__.py +++ b/sdks/python/apache_beam/transforms/__init__.py @@ -18,6 +18,8 @@ """PTransform and descendants.""" # pylint: disable=wildcard-import +from __future__ import absolute_import + from apache_beam.transforms import combiners from apache_beam.transforms.core import * from apache_beam.transforms.ptransform import * diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index 9b0c0e81e35e..8db0fe5e14fa 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -18,9 +18,12 @@ """A library of basic combiner PTransform subclasses.""" from __future__ import absolute_import +from __future__ import division import operator import random +from builtins import object +from builtins import zip from past.builtins import long diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index f372e881024e..a768231ec6e1 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -16,12 +16,15 @@ # """Unit tests for our libraries of combine PTransforms.""" +from __future__ import absolute_import +from __future__ import division import itertools import random import unittest import hamcrest as hc +from future.builtins import range import apache_beam as beam import apache_beam.transforms.combiners as combine @@ -286,7 +289,7 @@ def match(actual): def matcher(): def match(actual): equal_to([1])([len(actual)]) - equal_to(pairs)(actual[0].iteritems()) + equal_to(pairs)(actual[0].items()) return match assert_that(result, matcher()) pipeline.run() diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index bbd78342a7f1..fa867e5231d1 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -21,12 +21,15 @@ import copy import inspect -import itertools import random import re import types +from builtins import map +from builtins import object +from builtins import range -from six import string_types +from future.builtins import filter +from past.builtins import unicode from apache_beam import coders from apache_beam import pvalue @@ -82,7 +85,6 @@ 'Impulse', ] - # Type variables T = typehints.TypeVariable('T') K = typehints.TypeVariable('K') @@ -291,6 +293,9 @@ def __eq__(self, other): return self.param_id == other.param_id return False + def __hash__(self): + return hash(self.param_id) + def __repr__(self): return self.param_id @@ -698,7 +703,7 @@ def merge_accumulators(self, accumulators, *args, **kwargs): class ReiterableNonEmptyAccumulators(object): def __iter__(self): - return itertools.ifilter(filter_fn, accumulators) + return filter(filter_fn, accumulators) # It's (weakly) assumed that self._fn is associative. return self._fn(ReiterableNonEmptyAccumulators(), *args, **kwargs) @@ -902,7 +907,8 @@ def with_outputs(self, *tags, **main_kw): """ main_tag = main_kw.pop('main', None) if main_kw: - raise ValueError('Unexpected keyword arguments: %s' % main_kw.keys()) + raise ValueError('Unexpected keyword arguments: %s' % + list(main_kw)) return _MultiParDo(self, tags, main_tag) def _pardo_fn_data(self): @@ -1666,7 +1672,6 @@ def expand(self, pcoll): class Windowing(object): - def __init__(self, windowfn, triggerfn=None, accumulation_mode=None, timestamp_combiner=None): global AccumulationMode, DefaultTrigger # pylint: disable=global-variable-not-assigned @@ -1712,6 +1717,10 @@ def __eq__(self, other): and self.timestamp_combiner == other.timestamp_combiner) return False + def __hash__(self): + return hash((self.windowfn, self.accumulation_mode, + self.timestamp_combiner)) + def is_default(self): return self._is_default @@ -1792,7 +1801,7 @@ def __init__(self, windowfn, **kwargs): accumulation_mode = kwargs.pop('accumulation_mode', None) timestamp_combiner = kwargs.pop('timestamp_combiner', None) if kwargs: - raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys()) + raise ValueError('Unexpected keyword arguments: %s' % list(kwargs)) self.windowing = Windowing( windowfn, triggerfn, accumulation_mode, timestamp_combiner) super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing)) @@ -1861,7 +1870,7 @@ def __init__(self, **kwargs): super(Flatten, self).__init__() self.pipeline = kwargs.pop('pipeline', None) if kwargs: - raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys()) + raise ValueError('Unexpected keyword arguments: %s' % list(kwargs)) def _extract_input_pvalues(self, pvalueish): try: @@ -1906,7 +1915,7 @@ def __init__(self, value): value: An object of values for the PCollection """ super(Create, self).__init__() - if isinstance(value, string_types): + if isinstance(value, (unicode, str, bytes)): raise TypeError('PTransform Create: Refusing to treat string as ' 'an iterable. (string=%r)' % value) elif isinstance(value, dict): @@ -1941,7 +1950,7 @@ def get_windowing(self, unused_inputs): @staticmethod def _create_source_from_iterable(values, coder): - return Create._create_source(map(coder.encode, values), coder) + return Create._create_source(list(map(coder.encode, values)), coder) @staticmethod def _create_source(serialized_values, coder): diff --git a/sdks/python/apache_beam/transforms/create_source.py b/sdks/python/apache_beam/transforms/create_source.py index 3d02d39463c4..aa26cebc43fd 100644 --- a/sdks/python/apache_beam/transforms/create_source.py +++ b/sdks/python/apache_beam/transforms/create_source.py @@ -15,6 +15,13 @@ # limitations under the License. # +from __future__ import absolute_import +from __future__ import division + +from builtins import map +from builtins import next +from builtins import range + from apache_beam.io import iobase from apache_beam.transforms.core import Create @@ -57,15 +64,15 @@ def split(self, desired_bundle_size, start_position=None, start_position = 0 if stop_position is None: stop_position = len(self._serialized_values) - avg_size_per_value = self._total_size / len(self._serialized_values) + avg_size_per_value = self._total_size // len(self._serialized_values) num_values_per_split = max( - int(desired_bundle_size / avg_size_per_value), 1) + int(desired_bundle_size // avg_size_per_value), 1) start = start_position while start < stop_position: end = min(start + num_values_per_split, stop_position) remaining = stop_position - end # Avoid having a too small bundle at the end. - if remaining < (num_values_per_split / 4): + if remaining < (num_values_per_split // 4): end = stop_position sub_source = Create._create_source( self._serialized_values[start:end], self._coder) diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py index b5d02acc8b11..ada36725179a 100644 --- a/sdks/python/apache_beam/transforms/create_test.py +++ b/sdks/python/apache_beam/transforms/create_test.py @@ -16,8 +16,12 @@ # """Unit tests for the Create and _CreateSource classes.""" +from __future__ import absolute_import +from __future__ import division + import logging import unittest +from builtins import range from apache_beam import Create from apache_beam.coders import FastPrimitivesCoder @@ -33,13 +37,13 @@ def setUp(self): def test_create_transform(self): with TestPipeline() as p: - assert_that(p | Create(range(10)), equal_to(range(10))) + assert_that(p | Create(list(range(10))), equal_to(list(range(10)))) def test_create_source_read(self): self.check_read([], self.coder) self.check_read([1], self.coder) # multiple values. - self.check_read(range(10), self.coder) + self.check_read(list(range(10)), self.coder) def check_read(self, values, coder): source = Create._create_source_from_iterable(values, coder) @@ -49,7 +53,7 @@ def check_read(self, values, coder): def test_create_source_read_with_initial_splits(self): self.check_read_with_initial_splits([], self.coder, num_splits=2) self.check_read_with_initial_splits([1], self.coder, num_splits=2) - values = range(8) + values = list(range(8)) # multiple values with a single split. self.check_read_with_initial_splits(values, self.coder, num_splits=1) # multiple values with a single split with a large desired bundle size @@ -70,7 +74,7 @@ def check_read_with_initial_splits(self, values, coder, num_splits): from the split sources. """ source = Create._create_source_from_iterable(values, coder) - desired_bundle_size = source._total_size / num_splits + desired_bundle_size = source._total_size // num_splits splits = source.split(desired_bundle_size) splits_info = [ (split.source, split.start_position, split.stop_position) diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py b/sdks/python/apache_beam/transforms/cy_combiners.py index 53a440e537e0..2234ef98d871 100644 --- a/sdks/python/apache_beam/transforms/cy_combiners.py +++ b/sdks/python/apache_beam/transforms/cy_combiners.py @@ -15,12 +15,17 @@ # limitations under the License. # +# cython: language_level=3 + """A library of basic cythonized CombineFn subclasses. For internal use only; no backwards-compatibility guarantees. """ from __future__ import absolute_import +from __future__ import division + +from builtins import object from apache_beam.transforms import core @@ -162,7 +167,7 @@ def extract_output(self): self.sum %= 2**64 if self.sum >= INT64_MAX: self.sum -= 2**64 - return self.sum / self.count if self.count else _NAN + return self.sum // self.count if self.count else _NAN class CountCombineFn(AccumulatorCombineFn): @@ -258,7 +263,7 @@ def merge(self, accumulators): self.count += accumulator.count def extract_output(self): - return self.sum / self.count if self.count else _NAN + return self.sum // self.count if self.count else _NAN class SumFloatFn(AccumulatorCombineFn): diff --git a/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py b/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py index e3d3c6e5a5a6..91a888a08369 100644 --- a/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py +++ b/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py @@ -14,6 +14,8 @@ otherwise, test on pure python module """ +from __future__ import absolute_import + import unittest from mock import Mock diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py index 4206f2110b7d..ce10174e00da 100644 --- a/sdks/python/apache_beam/transforms/display.py +++ b/sdks/python/apache_beam/transforms/display.py @@ -41,10 +41,11 @@ import calendar import inspect import json +from builtins import object from datetime import datetime from datetime import timedelta -import six +from past.builtins import unicode __all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData'] @@ -169,7 +170,7 @@ class DisplayDataItem(object): display item belongs to. """ typeDict = {str:'STRING', - six.text_type:'STRING', + unicode:'STRING', int:'INTEGER', float:'FLOAT', bool: 'BOOLEAN', diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py index 90bde8caa8c4..bdaade68fa0f 100644 --- a/sdks/python/apache_beam/transforms/display_test.py +++ b/sdks/python/apache_beam/transforms/display_test.py @@ -24,8 +24,8 @@ # pylint: disable=ungrouped-imports import hamcrest as hc -import six from hamcrest.core.base_matcher import BaseMatcher +from past.builtins import unicode import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions @@ -165,7 +165,7 @@ def test_create_list_display_data(self): def test_unicode_type_display_data(self): class MyDoFn(beam.DoFn): def display_data(self): - return {'unicode_string': six.text_type('my string'), + return {'unicode_string': unicode('my string'), 'unicode_literal_string': u'my literal string'} fn = MyDoFn() diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 889372f9266a..7a53fbe25b0f 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -43,6 +43,9 @@ class and wrapper class that allows lambda functions to be used as import os import sys import threading +from builtins import hex +from builtins import object +from builtins import zip from functools import reduce from google.protobuf import message @@ -622,7 +625,7 @@ def __init__(self, fn, *args, **kwargs): super(PTransformWithSideInputs, self).__init__() if (any([isinstance(v, pvalue.PCollection) for v in args]) or - any([isinstance(v, pvalue.PCollection) for v in kwargs.itervalues()])): + any([isinstance(v, pvalue.PCollection) for v in kwargs.values()])): raise error.SideInputError( 'PCollection used directly as side input argument. Specify ' 'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the ' diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 907ee04c079a..c594e6ab28b5 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -18,12 +18,16 @@ """Unit tests for the PTransform and descendants.""" from __future__ import absolute_import +from __future__ import division from __future__ import print_function import collections import operator import re import unittest +from builtins import map +from builtins import range +from builtins import zip from functools import reduce import hamcrest as hc @@ -382,7 +386,7 @@ def test_combine_with_combine_fn(self): pipeline = TestPipeline() pcoll = pipeline | 'Start' >> beam.Create(vals) result = pcoll | 'Mean' >> beam.CombineGlobally(self._MeanCombineFn()) - assert_that(result, equal_to([sum(vals) / len(vals)])) + assert_that(result, equal_to([sum(vals) // len(vals)])) pipeline.run() def test_combine_with_callable(self): @@ -413,8 +417,8 @@ def test_combine_per_key_with_combine_fn(self): pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] + [('b', x) for x in vals_2])) result = pcoll | 'Mean' >> beam.CombinePerKey(self._MeanCombineFn()) - assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)), - ('b', sum(vals_2) / len(vals_2))])) + assert_that(result, equal_to([('a', sum(vals_1) // len(vals_1)), + ('b', sum(vals_2) // len(vals_2))])) pipeline.run() def test_combine_per_key_with_callable(self): diff --git a/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py b/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py index fc9b4d22a8cd..980abab47c46 100644 --- a/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py +++ b/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py @@ -17,6 +17,10 @@ """For internal use only; no backwards-compatibility guarantees.""" +from __future__ import absolute_import + +from builtins import object +from builtins import range globals()['INT64_MAX'] = 2**63 - 1 globals()['INT64_MIN'] = -2**63 diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index f10cb92ed5e3..21fc919b72d1 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -26,6 +26,8 @@ from __future__ import absolute_import +from builtins import object + from apache_beam.transforms import window diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index 6b93b8e9137c..f9c9ae93d625 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -17,6 +17,8 @@ """Unit tests for side inputs.""" +from __future__ import absolute_import + import logging import unittest @@ -196,7 +198,7 @@ def match(actual): [[actual_elem, actual_list, actual_dict]] = actual equal_to([expected_elem])([actual_elem]) equal_to(expected_list)(actual_list) - equal_to(expected_pairs)(actual_dict.iteritems()) + equal_to(expected_pairs)(actual_dict.items()) return match assert_that(results, matcher(1, a_list, some_pairs)) @@ -286,8 +288,8 @@ def matcher(expected_elem, expected_kvs): def match(actual): [[actual_elem, actual_dict1, actual_dict2]] = actual equal_to([expected_elem])([actual_elem]) - equal_to(expected_kvs)(actual_dict1.iteritems()) - equal_to(expected_kvs)(actual_dict2.iteritems()) + equal_to(expected_kvs)(actual_dict1.items()) + equal_to(expected_kvs)(actual_dict2.items()) return match assert_that(results, matcher(1, some_kvs)) diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py index 8d63d49baad0..bf30a1313926 100644 --- a/sdks/python/apache_beam/transforms/timeutil.py +++ b/sdks/python/apache_beam/transforms/timeutil.py @@ -21,6 +21,9 @@ from abc import ABCMeta from abc import abstractmethod +from builtins import object + +from future.utils import with_metaclass __all__ = [ 'TimeDomain', @@ -43,11 +46,9 @@ def from_string(domain): raise ValueError('Unknown time domain: %s' % domain) -class TimestampCombinerImpl(object): +class TimestampCombinerImpl(with_metaclass(ABCMeta, object)): """Implementation of TimestampCombiner.""" - __metaclass__ = ABCMeta - @abstractmethod def assign_output_time(self, window, input_timestamp): pass @@ -72,11 +73,9 @@ def merge(self, unused_result_window, merging_timestamps): return self.combine_all(merging_timestamps) -class DependsOnlyOnWindow(TimestampCombinerImpl): +class DependsOnlyOnWindow(with_metaclass(ABCMeta, TimestampCombinerImpl)): """TimestampCombinerImpl that only depends on the window.""" - __metaclass__ = ABCMeta - def combine(self, output_timestamp, other_output_timestamp): return output_timestamp diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index 159b21b22257..c185a5222184 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -20,13 +20,19 @@ Triggers control when in processing time windows get emitted. """ +from __future__ import absolute_import + import collections import copy -import itertools import logging import numbers from abc import ABCMeta from abc import abstractmethod +from builtins import object + +from future.moves.itertools import zip_longest +from future.utils import iteritems +from future.utils import with_metaclass from apache_beam.coders import observable from apache_beam.portability.api import beam_runner_api_pb2 @@ -68,14 +74,13 @@ class AccumulationMode(object): # RETRACTING = 3 -class _StateTag(object): +class _StateTag(with_metaclass(ABCMeta, object)): """An identifier used to store and retrieve typed, combinable state. The given tag must be unique for this stage. If CombineFn is None then all elements will be returned as a list, otherwise the given CombineFn will be applied (possibly incrementally and eagerly) when adding elements. """ - __metaclass__ = ABCMeta def __init__(self, tag): self.tag = tag @@ -136,12 +141,11 @@ def with_prefix(self, prefix): # pylint: disable=unused-argument # TODO(robertwb): Provisional API, Java likely to change as well. -class TriggerFn(object): +class TriggerFn(with_metaclass(ABCMeta, object)): """A TriggerFn determines when window (panes) are emitted. See https://beam.apache.org/documentation/programming-guide/#triggers """ - __metaclass__ = ABCMeta @abstractmethod def on_element(self, element, window, context): @@ -260,6 +264,9 @@ def reset(self, window, context): def __eq__(self, other): return type(self) == type(other) + def __hash__(self): + return hash(type(self)) + @staticmethod def from_runner_api(proto, context): return DefaultTrigger() @@ -446,6 +453,9 @@ def __repr__(self): def __eq__(self, other): return type(self) == type(other) and self.count == other.count + def __hash__(self): + return hash(self.count) + def on_element(self, element, window, context): context.add_state(self.COUNT_TAG, 1) @@ -484,6 +494,9 @@ def __repr__(self): def __eq__(self, other): return type(self) == type(other) and self.underlying == other.underlying + def __hash__(self): + return hash(self.underlying) + def on_element(self, element, window, context): self.underlying.on_element(element, window, context) @@ -512,9 +525,7 @@ def to_runner_api(self, context): subtrigger=self.underlying.to_runner_api(context))) -class _ParallelTriggerFn(TriggerFn): - - __metaclass__ = ABCMeta +class _ParallelTriggerFn(with_metaclass(ABCMeta, TriggerFn)): def __init__(self, *triggers): self.triggers = triggers @@ -526,6 +537,9 @@ def __repr__(self): def __eq__(self, other): return type(self) == type(other) and self.triggers == other.triggers + def __hash__(self): + return hash(self.triggers) + @abstractmethod def combine_op(self, trigger_results): pass @@ -620,6 +634,9 @@ def __repr__(self): def __eq__(self, other): return type(self) == type(other) and self.triggers == other.triggers + def __hash__(self): + return hash(self.triggers) + def on_element(self, element, window, context): ix = context.get_state(self.INDEX_TAG) if ix < len(self.triggers): @@ -744,14 +761,12 @@ def clear_state(self, tag): # pylint: disable=unused-argument -class SimpleState(object): +class SimpleState(with_metaclass(ABCMeta, object)): """Basic state storage interface used for triggering. Only timers must hold the watermark (by their timestamp). """ - __metaclass__ = ABCMeta - @abstractmethod def set_timer(self, window, name, time_domain, timestamp): pass @@ -863,7 +878,7 @@ def merge(self, to_be_merged, merge_result): self._persist_window_ids() def known_windows(self): - return self.window_ids.keys() + return list(self.window_ids) def get_window(self, window_id): for window, ids in self.window_ids.items(): @@ -922,11 +937,9 @@ def create_trigger_driver(windowing, return driver -class TriggerDriver(object): +class TriggerDriver(with_metaclass(ABCMeta, object)): """Breaks a series of bundle and timer firings into window (pane)s.""" - __metaclass__ = ABCMeta - @abstractmethod def process_elements(self, state, windowed_values, output_watermark): pass @@ -972,10 +985,13 @@ def __eq__(self, other): if isinstance(other, collections.Iterable): return all( a == b - for a, b in itertools.izip_longest(self, other, fillvalue=object())) + for a, b in zip_longest(self, other, fillvalue=object())) else: return NotImplemented + def __hash__(self): + return hash(tuple(self)) + def __ne__(self, other): return not self == other @@ -1250,7 +1266,7 @@ def get_and_clear_timers(self, watermark=MAX_TIMESTAMP): def get_earliest_hold(self): earliest_hold = MAX_TIMESTAMP - for unused_window, tagged_states in self.state.iteritems(): + for unused_window, tagged_states in iteritems(self.state): # TODO(BEAM-2519): currently, this assumes that the watermark hold tag is # named "watermark". This is currently only true because the only place # watermark holds are set is in the GeneralTriggerDriver, where we use diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index 2e672bb0cf1b..034abae65c8a 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -17,10 +17,14 @@ """Unit tests for the triggering classes.""" +from __future__ import absolute_import + import collections import os.path import pickle import unittest +from builtins import range +from builtins import zip import yaml @@ -382,7 +386,7 @@ def test_picklable_output(self): pickle.dumps(unpicklable) for unwindowed in driver.process_elements(None, unpicklable, None): self.assertEqual(pickle.loads(pickle.dumps(unwindowed)).value, - range(10)) + list(range(10))) class RunnerApiTest(unittest.TestCase): @@ -426,7 +430,7 @@ def format_result(k_v): # A-10, A-11 never emitted due to AfterCount(3) never firing. 'B-4': {6, 7, 8, 9}, 'B-3': {10, 15, 16}, - }.iteritems())) + }.items())) class TranscriptTest(unittest.TestCase): @@ -556,7 +560,7 @@ def fire_timers(): for line in spec['transcript']: - action, params = line.items()[0] + action, params = list(line.items())[0] if action != 'expect': # Fail if we have output that was not expected in the transcript. diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py index 6a5fd581bb70..0f99da246a5c 100644 --- a/sdks/python/apache_beam/transforms/userstate.py +++ b/sdks/python/apache_beam/transforms/userstate.py @@ -23,6 +23,7 @@ from __future__ import absolute_import import types +from builtins import object from apache_beam.coders import Coder from apache_beam.transforms.timeutil import TimeDomain diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py index 8dbc9ce5e77a..b891e6281782 100644 --- a/sdks/python/apache_beam/transforms/userstate_test.py +++ b/sdks/python/apache_beam/transforms/userstate_test.py @@ -16,6 +16,7 @@ # """Unit tests for the Beam State and Timer API interfaces.""" +from __future__ import absolute_import import unittest diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 07cab5453510..dbd0f709d6cb 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -19,11 +19,18 @@ """ from __future__ import absolute_import +from __future__ import division import collections import contextlib import random import time +from builtins import object +from builtins import range +from builtins import zip + +from future.utils import itervalues +from past.utils import old_div from apache_beam import typehints from apache_beam.metrics import Metrics @@ -114,12 +121,12 @@ def __init__(self, **kwargs): super(CoGroupByKey, self).__init__() self.pipeline = kwargs.pop('pipeline', None) if kwargs: - raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys()) + raise ValueError('Unexpected keyword arguments: %s' % list(kwargs.keys())) def _extract_input_pvalues(self, pvalueish): try: # If this works, it's a dict. - return pvalueish, tuple(pvalueish.viewvalues()) + return pvalueish, tuple(itervalues(pvalueish)) except AttributeError: pcolls = tuple(pvalueish) return pcolls, pcolls @@ -268,12 +275,12 @@ def _thin_data(self): def div_keys(kv1_kv2): (x1, _), (x2, _) = kv1_kv2 - return x2 / x1 + return old_div(x2, x1) # TODO(BEAM-4858) pairs = sorted(zip(sorted_data[::2], sorted_data[1::2]), key=div_keys) # Keep the top 1/3 most different pairs, average the top 2/3 most similar. - threshold = 2 * len(pairs) / 3 + threshold = 2 * len(pairs) // 3 self._data = ( list(sum(pairs[threshold:], ())) + [((x1 + x2) / 2.0, (t1 + t2) / 2.0) diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index d834a1c5efea..6cec4a5bf361 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -17,9 +17,13 @@ """Unit tests for the transform.util classes.""" +from __future__ import absolute_import + import logging import time import unittest +from builtins import object +from builtins import range import apache_beam as beam from apache_beam.coders import coders diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 5bc047b48c7a..067227bb3f87 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -50,10 +50,13 @@ from __future__ import absolute_import import abc +from builtins import object +from builtins import range +from functools import total_ordering +from future.utils import with_metaclass from google.protobuf import duration_pb2 from google.protobuf import timestamp_pb2 -from past.builtins import cmp from apache_beam.coders import coders from apache_beam.portability import common_urns @@ -109,11 +112,9 @@ def get_impl(timestamp_combiner, window_fn): raise ValueError('Invalid TimestampCombiner: %s.' % timestamp_combiner) -class WindowFn(urns.RunnerApiFn): +class WindowFn(with_metaclass(abc.ABCMeta, urns.RunnerApiFn)): """An abstract windowing function defining a basic assign and merge.""" - __metaclass__ = abc.ABCMeta - class AssignContext(object): """Context passed to WindowFn.assign().""" @@ -191,15 +192,35 @@ def __init__(self, end): def max_timestamp(self): return self.end.predecessor() - def __cmp__(self, other): - # Order first by endpoint, then arbitrarily. - return cmp(self.end, other.end) or cmp(hash(self), hash(other)) - def __eq__(self, other): raise NotImplementedError + def __ne__(self, other): + # Order first by endpoint, then arbitrarily + return self.end != other.end or hash(self) != hash(other) + + def __lt__(self, other): + if self.end != other.end: + return self.end < other.end + return hash(self) < hash(other) + + def __le__(self, other): + if self.end != other.end: + return self.end <= other.end + return hash(self) <= hash(other) + + def __gt__(self, other): + if self.end != other.end: + return self.end > other.end + return hash(self) > hash(other) + + def __ge__(self, other): + if self.end != other.end: + return self.end >= other.end + return hash(self) >= hash(other) + def __hash__(self): - return hash(self.end) + raise NotImplementedError def __repr__(self): return '[?, %s)' % float(self.end) @@ -221,7 +242,12 @@ def __hash__(self): return hash((self.start, self.end)) def __eq__(self, other): - return self.start == other.start and self.end == other.end + return (self.start == other.start + and self.end == other.end + and type(self) == type(other)) + + def __ne__(self, other): + return not self == other def __repr__(self): return '[%s, %s)' % (float(self.start), float(self.end)) @@ -234,6 +260,7 @@ def union(self, other): min(self.start, other.start), max(self.end, other.end)) +@total_ordering class TimestampedValue(object): """A timestamped value having a value and a timestamp. @@ -246,10 +273,23 @@ def __init__(self, value, timestamp): self.value = value self.timestamp = Timestamp.of(timestamp) - def __cmp__(self, other): - if type(self) is not type(other): - return cmp(type(self), type(other)) - return cmp((self.value, self.timestamp), (other.value, other.timestamp)) + def __eq__(self, other): + return (type(self) == type(other) + and self.value == other.value + and self.timestamp == other.timestamp) + + def __hash__(self): + return hash((self.value, self.timestamp)) + + def __ne__(self, other): + return not self == other + + def __lt__(self, other): + if type(self) != type(other): + return type(self).__name__ < type(other).__name__ + if self.value != other.value: + return self.value < other.value + return self.timestamp < other.timestamp class GlobalWindow(BoundedWindow): @@ -275,6 +315,9 @@ def __eq__(self, other): # Global windows are always and only equal to each other. return self is other or type(self) is type(other) + def __ne__(self, other): + return not self == other + class NonMergingWindowFn(WindowFn): @@ -348,6 +391,9 @@ def __eq__(self, other): if type(self) == type(other) == FixedWindows: return self.size == other.size and self.offset == other.offset + def __hash__(self): + return hash((self.size, self.offset)) + def __ne__(self, other): return not self == other @@ -407,6 +453,12 @@ def __eq__(self, other): and self.offset == other.offset and self.period == other.period) + def __ne__(self, other): + return not self == other + + def __hash__(self): + return hash((self.offset, self.period)) + def to_runner_api_parameter(self, context): return (common_urns.sliding_windows.urn, standard_window_fns_pb2.SlidingWindowsPayload( @@ -474,6 +526,12 @@ def __eq__(self, other): if type(self) == type(other) == Sessions: return self.gap_size == other.gap_size + def __ne__(self, other): + return not self == other + + def __hash__(self): + return hash(self.gap_size) + def to_runner_api_parameter(self, context): return (common_urns.session_windows.urn, standard_window_fns_pb2.SessionsPayload( diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index 7c1d4e99f5e3..77ab47e3dd8b 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -16,8 +16,11 @@ # """Unit tests for the windowing classes.""" +from __future__ import absolute_import +from __future__ import division import unittest +from builtins import range from apache_beam.runners import pipeline_context from apache_beam.testing.test_pipeline import TestPipeline @@ -236,7 +239,7 @@ def test_timestamped_with_combiners(self): # We add a 'key' to each value representing the index of the # window. This is important since there is no guarantee of # order for the elements of a PCollection. - | Map(lambda v: (v / 5, v))) + | Map(lambda v: (v // 5, v))) # Sum all elements associated with a key and window. Although it # is called CombinePerKey it is really CombinePerKeyAndWindow the # same way GroupByKey is really GroupByKeyAndWindow. diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py index bf4941a5d5de..a8f56fd103b4 100644 --- a/sdks/python/apache_beam/transforms/write_ptransform_test.py +++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py @@ -16,6 +16,8 @@ # """Unit tests for the write transform.""" +from __future__ import absolute_import + import logging import unittest diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index de2352aafe99..da4b00cafc17 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -116,6 +116,7 @@ modules = apache_beam/testing apache_beam/tools apache_beam/typehints + apache_beam/transforms commands = python --version pip --version