4 changes: 2 additions & 2 deletions sdks/python/apache_beam/pipeline_test.py
@@ -520,11 +520,11 @@ def test_dir(self):
options = Breakfast()
self.assertEquals(
set(['from_dictionary', 'get_all_options', 'slices', 'style',
-'view_as', 'display_data']),
+'view_as', 'display_data', 'next']),
Contributor: This is fine, but was there a particular reason it was added?

Contributor Author (@Fematich, Jul 4, 2018):
This is because of the import "from builtins import object" in apache_beam/transforms/display.py.

This import adds an alias next = __next__ for Python 2 and Python 3 compatibility.

PipelineOptions (the class under test here) inherits from the HasDisplayData class defined in the display.py module.
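A minimal sketch of the mechanism (my illustration, assuming the future package that provides this builtins backport; MyOptions is a hypothetical stand-in for PipelineOptions):

```python
# Hypothetical example, not Beam code: on Python 2, future's
# builtins.object is a backport (newobject) that defines next(self)
# delegating to __next__, so 'next' shows up in dir() of any subclass.
from builtins import object  # future's newobject on Py2; plain object on Py3


class MyOptions(object):
    pass


print('next' in dir(MyOptions()))  # True on Python 2 with future installed
```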

set([attr for attr in dir(options) if not attr.startswith('_')]))
self.assertEquals(
set(['from_dictionary', 'get_all_options', 'style', 'view_as',
-'display_data']),
+'display_data', 'next']),
set([attr for attr in dir(options.view_as(Eggs))
if not attr.startswith('_')]))

2 changes: 2 additions & 0 deletions sdks/python/apache_beam/transforms/__init__.py
@@ -18,6 +18,8 @@
"""PTransform and descendants."""

# pylint: disable=wildcard-import
+from __future__ import absolute_import
+
from apache_beam.transforms import combiners
from apache_beam.transforms.core import *
from apache_beam.transforms.ptransform import *
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/transforms/combiners.py
@@ -18,9 +18,12 @@
"""A library of basic combiner PTransform subclasses."""

from __future__ import absolute_import
+from __future__ import division

import operator
import random
+from builtins import object
+from builtins import zip

from past.builtins import long

5 changes: 4 additions & 1 deletion sdks/python/apache_beam/transforms/combiners_test.py
@@ -16,12 +16,15 @@
#

"""Unit tests for our libraries of combine PTransforms."""
+from __future__ import absolute_import
+from __future__ import division

import itertools
import random
import unittest

import hamcrest as hc
+from future.builtins import range

import apache_beam as beam
import apache_beam.transforms.combiners as combine
@@ -286,7 +289,7 @@ def match(actual):
def matcher():
def match(actual):
equal_to([1])([len(actual)])
-equal_to(pairs)(actual[0].iteritems())
+equal_to(pairs)(actual[0].items())
return match
assert_that(result, matcher())
pipeline.run()
29 changes: 19 additions & 10 deletions sdks/python/apache_beam/transforms/core.py
@@ -21,12 +21,15 @@

import copy
import inspect
import itertools
import random
import re
import types
+from builtins import map
+from builtins import object
+from builtins import range

-from six import string_types
+from future.builtins import filter
from past.builtins import unicode

from apache_beam import coders
from apache_beam import pvalue
@@ -82,7 +85,6 @@
'Impulse',
]


# Type variables
T = typehints.TypeVariable('T')
K = typehints.TypeVariable('K')
@@ -291,6 +293,9 @@ def __eq__(self, other):
return self.param_id == other.param_id
return False

+def __hash__(self):
+return hash(self.param_id)
+
def __repr__(self):
return self.param_id

@@ -698,7 +703,7 @@ def merge_accumulators(self, accumulators, *args, **kwargs):

class ReiterableNonEmptyAccumulators(object):
def __iter__(self):
-return itertools.ifilter(filter_fn, accumulators)
+return filter(filter_fn, accumulators)
Contributor Author: I think this might be a potential source of the performance loss, so I'll update this to use ifilter on Python 2.
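For reference, a sketch of the version-conditional pattern being considered (illustrative only; lazy_filter is a hypothetical name, not the actual Beam code):

```python
# Keep filtering lazy on both major Python versions.
import sys

if sys.version_info[0] < 3:
    from itertools import ifilter as lazy_filter  # lazy iterator on Python 2
else:
    lazy_filter = filter  # built-in filter() is already lazy on Python 3

print(list(lazy_filter(lambda x: x % 2 == 0, range(10))))  # [0, 2, 4, 6, 8]
```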

Contributor Author: @charlesccychen and @tvalentyn: is there more detailed info on the benchmarks?

Contributor: Thank you. Let me test the pipeline with this change. Unfortunately it's not easy to export the benchmark data.


# It's (weakly) assumed that self._fn is associative.
return self._fn(ReiterableNonEmptyAccumulators(), *args, **kwargs)
@@ -902,7 +907,8 @@ def with_outputs(self, *tags, **main_kw):
"""
main_tag = main_kw.pop('main', None)
if main_kw:
-raise ValueError('Unexpected keyword arguments: %s' % main_kw.keys())
+raise ValueError('Unexpected keyword arguments: %s' %
+list(main_kw))
return _MultiParDo(self, tags, main_tag)

def _pardo_fn_data(self):
@@ -1666,7 +1672,6 @@ def expand(self, pcoll):


class Windowing(object):

def __init__(self, windowfn, triggerfn=None, accumulation_mode=None,
timestamp_combiner=None):
global AccumulationMode, DefaultTrigger # pylint: disable=global-variable-not-assigned
@@ -1712,6 +1717,10 @@ def __eq__(self, other):
and self.timestamp_combiner == other.timestamp_combiner)
return False

+def __hash__(self):
+return hash((self.windowfn, self.accumulation_mode,
+self.timestamp_combiner))
+
def is_default(self):
return self._is_default

@@ -1792,7 +1801,7 @@ def __init__(self, windowfn, **kwargs):
accumulation_mode = kwargs.pop('accumulation_mode', None)
timestamp_combiner = kwargs.pop('timestamp_combiner', None)
if kwargs:
-raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())
+raise ValueError('Unexpected keyword arguments: %s' % list(kwargs))
self.windowing = Windowing(
windowfn, triggerfn, accumulation_mode, timestamp_combiner)
super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing))
@@ -1861,7 +1870,7 @@ def __init__(self, **kwargs):
super(Flatten, self).__init__()
self.pipeline = kwargs.pop('pipeline', None)
if kwargs:
-raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())
+raise ValueError('Unexpected keyword arguments: %s' % list(kwargs))

def _extract_input_pvalues(self, pvalueish):
try:
@@ -1906,7 +1915,7 @@ def __init__(self, value):
value: An object of values for the PCollection
"""
super(Create, self).__init__()
-if isinstance(value, string_types):
+if isinstance(value, (unicode, str, bytes)):
Contributor: No need to check for both str and bytes, since Python 2.7 defines bytes == str and on Python 3.x unicode == str.

Contributor Author: See #5729 (comment). Bytes in Python 3 also shouldn't be allowed, since we don't want to support creating a PCollection of single bytes.
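A quick illustration of the concern (my example, not from the PR):

```python
# On Python 3, iterating over bytes yields one int per byte, so treating
# a bytes value as an iterable would silently build a PCollection of
# integers instead of raising a clear error:
print(list(b'abc'))  # [97, 98, 99]
print(list('abc'))   # ['a', 'b', 'c'] -- plain strings are rejected too
```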

raise TypeError('PTransform Create: Refusing to treat string as '
'an iterable. (string=%r)' % value)
elif isinstance(value, dict):
@@ -1941,7 +1950,7 @@ def get_windowing(self, unused_inputs):

@staticmethod
def _create_source_from_iterable(values, coder):
-return Create._create_source(map(coder.encode, values), coder)
+return Create._create_source(list(map(coder.encode, values)), coder)

@staticmethod
def _create_source(serialized_values, coder):
13 changes: 10 additions & 3 deletions sdks/python/apache_beam/transforms/create_source.py
@@ -15,6 +15,13 @@
# limitations under the License.
#

+from __future__ import absolute_import
+from __future__ import division
+
+from builtins import map
+from builtins import next
+from builtins import range
+
from apache_beam.io import iobase
from apache_beam.transforms.core import Create

@@ -57,15 +64,15 @@ def split(self, desired_bundle_size, start_position=None,
start_position = 0
if stop_position is None:
stop_position = len(self._serialized_values)
-avg_size_per_value = self._total_size / len(self._serialized_values)
+avg_size_per_value = self._total_size // len(self._serialized_values)
num_values_per_split = max(
-int(desired_bundle_size / avg_size_per_value), 1)
+int(desired_bundle_size // avg_size_per_value), 1)
Contributor: No need for an int call?

Contributor: We still need to coerce it into an int; e.g. 4 // 7.0 has value 0.0 but type float.

Contributor: Good point. I didn't realise that avg_size_per_value could be a float.
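A small worked example of the point above (mine, not from the thread):

```python
# Floor division preserves operand types: if either operand is a float,
# the result is a float, so the explicit int() coercion is still needed.
print(4 // 7)         # 0    (int // int -> int)
print(4 // 7.0)       # 0.0  (int // float -> float)
print(int(4 // 7.0))  # 0    (safe to use as a count)
```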

start = start_position
while start < stop_position:
end = min(start + num_values_per_split, stop_position)
remaining = stop_position - end
# Avoid having a too small bundle at the end.
-if remaining < (num_values_per_split / 4):
+if remaining < (num_values_per_split // 4):
end = stop_position
sub_source = Create._create_source(
self._serialized_values[start:end], self._coder)
12 changes: 8 additions & 4 deletions sdks/python/apache_beam/transforms/create_test.py
@@ -16,8 +16,12 @@
#

"""Unit tests for the Create and _CreateSource classes."""
+from __future__ import absolute_import
+from __future__ import division
+
import logging
import unittest
+from builtins import range

from apache_beam import Create
from apache_beam.coders import FastPrimitivesCoder
@@ -33,13 +37,13 @@ def setUp(self):

def test_create_transform(self):
with TestPipeline() as p:
-assert_that(p | Create(range(10)), equal_to(range(10)))
+assert_that(p | Create(list(range(10))), equal_to(list(range(10))))

def test_create_source_read(self):
self.check_read([], self.coder)
self.check_read([1], self.coder)
# multiple values.
-self.check_read(range(10), self.coder)
+self.check_read(list(range(10)), self.coder)

def check_read(self, values, coder):
source = Create._create_source_from_iterable(values, coder)
@@ -49,7 +53,7 @@ def check_read(self, values, coder):
def test_create_source_read_with_initial_splits(self):
self.check_read_with_initial_splits([], self.coder, num_splits=2)
self.check_read_with_initial_splits([1], self.coder, num_splits=2)
-values = range(8)
+values = list(range(8))
# multiple values with a single split.
self.check_read_with_initial_splits(values, self.coder, num_splits=1)
# multiple values with a single split with a large desired bundle size
@@ -70,7 +74,7 @@ def check_read_with_initial_splits(self, values, coder, num_splits):
from the split sources.
"""
source = Create._create_source_from_iterable(values, coder)
-desired_bundle_size = source._total_size / num_splits
+desired_bundle_size = source._total_size // num_splits
splits = source.split(desired_bundle_size)
splits_info = [
(split.source, split.start_position, split.stop_position)
9 changes: 7 additions & 2 deletions sdks/python/apache_beam/transforms/cy_combiners.py
@@ -15,12 +15,17 @@
# limitations under the License.
#

+# cython: language_level=3

"""A library of basic cythonized CombineFn subclasses.

For internal use only; no backwards-compatibility guarantees.
"""

from __future__ import absolute_import
+from __future__ import division

+from builtins import object

from apache_beam.transforms import core

@@ -162,7 +167,7 @@ def extract_output(self):
self.sum %= 2**64
if self.sum >= INT64_MAX:
self.sum -= 2**64
-return self.sum / self.count if self.count else _NAN
+return self.sum // self.count if self.count else _NAN
Contributor: Please also make the change on line 266.
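For context (my sketch, not part of the review): this module now imports division from __future__, so plain / returns a float even for int operands; // preserves the previous Python 2 integer-mean behaviour.

```python
from __future__ import division

# With true division enabled, / always yields a float for ints;
# floor division keeps the legacy Python 2 result for this combiner.
print(7 / 2)   # 3.5 (true division on both Python 2 and 3 with the import)
print(7 // 2)  # 3   (matches what 7 / 2 returned on plain Python 2)
```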



class CountCombineFn(AccumulatorCombineFn):
@@ -258,7 +263,7 @@ def merge(self, accumulators):
self.count += accumulator.count

def extract_output(self):
-return self.sum / self.count if self.count else _NAN
+return self.sum // self.count if self.count else _NAN


class SumFloatFn(AccumulatorCombineFn):
@@ -14,6 +14,8 @@
otherwise, test on pure python module
"""

+from __future__ import absolute_import
+
import unittest

from mock import Mock
5 changes: 3 additions & 2 deletions sdks/python/apache_beam/transforms/display.py
@@ -41,10 +41,11 @@
import calendar
import inspect
import json
+from builtins import object
from datetime import datetime
from datetime import timedelta

-import six
+from past.builtins import unicode

__all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData']

@@ -169,7 +170,7 @@ class DisplayDataItem(object):
display item belongs to.
"""
typeDict = {str:'STRING',
-six.text_type:'STRING',
+unicode:'STRING',
int:'INTEGER',
float:'FLOAT',
bool: 'BOOLEAN',
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/transforms/display_test.py
@@ -24,8 +24,8 @@

# pylint: disable=ungrouped-imports
import hamcrest as hc
-import six
from hamcrest.core.base_matcher import BaseMatcher
+from past.builtins import unicode

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
@@ -165,7 +165,7 @@ def test_create_list_display_data(self):
def test_unicode_type_display_data(self):
class MyDoFn(beam.DoFn):
def display_data(self):
-return {'unicode_string': six.text_type('my string'),
+return {'unicode_string': unicode('my string'),
'unicode_literal_string': u'my literal string'}

fn = MyDoFn()
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/transforms/ptransform.py
@@ -43,6 +43,9 @@ class and wrapper class that allows lambda functions to be used as
import os
import sys
import threading
+from builtins import hex
+from builtins import object
+from builtins import zip
from functools import reduce

from google.protobuf import message
@@ -622,7 +625,7 @@ def __init__(self, fn, *args, **kwargs):
super(PTransformWithSideInputs, self).__init__()

if (any([isinstance(v, pvalue.PCollection) for v in args]) or
-any([isinstance(v, pvalue.PCollection) for v in kwargs.itervalues()])):
+any([isinstance(v, pvalue.PCollection) for v in kwargs.values()])):
raise error.SideInputError(
'PCollection used directly as side input argument. Specify '
'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the '
10 changes: 7 additions & 3 deletions sdks/python/apache_beam/transforms/ptransform_test.py
@@ -18,12 +18,16 @@
"""Unit tests for the PTransform and descendants."""

from __future__ import absolute_import
+from __future__ import division
from __future__ import print_function

import collections
import operator
import re
import unittest
+from builtins import map
+from builtins import range
+from builtins import zip
from functools import reduce

import hamcrest as hc
@@ -382,7 +386,7 @@ def test_combine_with_combine_fn(self):
pipeline = TestPipeline()
pcoll = pipeline | 'Start' >> beam.Create(vals)
result = pcoll | 'Mean' >> beam.CombineGlobally(self._MeanCombineFn())
-assert_that(result, equal_to([sum(vals) / len(vals)]))
+assert_that(result, equal_to([sum(vals) // len(vals)]))
pipeline.run()

def test_combine_with_callable(self):
@@ -413,8 +417,8 @@ def test_combine_per_key_with_combine_fn(self):
pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] +
[('b', x) for x in vals_2]))
result = pcoll | 'Mean' >> beam.CombinePerKey(self._MeanCombineFn())
-assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)),
-('b', sum(vals_2) / len(vals_2))]))
+assert_that(result, equal_to([('a', sum(vals_1) // len(vals_1)),
+('b', sum(vals_2) // len(vals_2))]))
pipeline.run()

def test_combine_per_key_with_callable(self):