Skip to content

Commit

Permalink
Fixes more references to common_urns
Browse files Browse the repository at this point in the history
  • Loading branch information
jkff committed Apr 20, 2018
1 parent 38681c6 commit 190974f
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 11 deletions.
10 changes: 4 additions & 6 deletions sdks/python/apache_beam/portability/common_urns.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability.api import standard_window_fns_pb2


class PropertiesFromEnumValue(object):
def __init__(self, value_descriptor):
self.urn = (
value_descriptor.GetOptions().Extensions[beam_runner_api_pb2.beam_urn])


class PropertiesFromEnumType(object):
def __init__(self, enum_type):
for v in enum_type.DESCRIPTOR.values:
setattr(self, v.name, PropertiesFromEnumValue(v))


primitives = PropertiesFromEnumType(
beam_runner_api_pb2.StandardPTransforms.Primitives)
deprecated_primitives = PropertiesFromEnumType(
Expand All @@ -44,16 +47,11 @@ def __init__(self, enum_type):

coders = PropertiesFromEnumType(beam_runner_api_pb2.StandardCoders.Enum)

coders.VARINT.urn = coders.VARINT.urn
coders.ITERABLE.urn = coders.ITERABLE.urn
coders.INTERVAL_WINDOW.urn = coders.INTERVAL_WINDOW.urn
coders.LENGTH_PREFIX.urn = coders.LENGTH_PREFIX.urn
coders.GLOBAL_WINDOW.urn = coders.GLOBAL_WINDOW.urn
coders.WINDOWED_VALUE.urn = coders.WINDOWED_VALUE.urn

def PropertiesFromPayloadType(payload_type):
return PropertiesFromEnumType(payload_type.Enum).PROPERTIES


global_windows = PropertiesFromPayloadType(
standard_window_fns_pb2.GlobalWindowsPayload)
fixed_windows = PropertiesFromPayloadType(
Expand Down
10 changes: 6 additions & 4 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def visit_transform(self, transform_node):
new_side_inputs = []
for ix, side_input in enumerate(transform_node.side_inputs):
access_pattern = side_input._side_input_data().access_pattern
if access_pattern == common_urns.ITERABLE_SIDE_INPUT:
if access_pattern == common_urns.side_inputs.ITERABLE.urn:
# Add a map to ('', value) as Dataflow currently only handles
# keyed side inputs.
pipeline = side_input.pvalue.pipeline
Expand All @@ -260,7 +260,7 @@ def visit_transform(self, transform_node):
map_to_void_key.add_output(new_side_input.pvalue)
parent.add_part(map_to_void_key)
transform_node.update_input_refcounts()
elif access_pattern == common_urns.MULTIMAP_SIDE_INPUT:
elif access_pattern == common_urns.side_inputs.MULTIMAP.urn:
# Ensure the input coder is a KV coder and patch up the
# access pattern to appease Dataflow.
side_input.pvalue.element_type = typehints.coerce_to_kv_type(
Expand Down Expand Up @@ -959,7 +959,8 @@ class _DataflowIterableSideInput(_DataflowSideInput):
def __init__(self, iterable_side_input):
# pylint: disable=protected-access
side_input_data = iterable_side_input._side_input_data()
assert side_input_data.access_pattern == common_urns.ITERABLE_SIDE_INPUT
assert (
side_input_data.access_pattern == common_urns.side_inputs.ITERABLE.urn)
iterable_view_fn = side_input_data.view_fn
self._data = beam.pvalue.SideInputData(
self.DATAFLOW_MULTIMAP_URN,
Expand All @@ -978,7 +979,8 @@ def __init__(self, side_input):
# pylint: disable=protected-access
self.pvalue = side_input.pvalue
side_input_data = side_input._side_input_data()
assert side_input_data.access_pattern == common_urns.MULTIMAP_SIDE_INPUT
assert (
side_input_data.access_pattern == common_urns.side_inputs.MULTIMAP.urn)
self._data = beam.pvalue.SideInputData(
self.DATAFLOW_MULTIMAP_URN,
side_input_data.window_mapping_fn,
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import os
import platform
import re
import warnings
from distutils.version import StrictVersion

Expand Down Expand Up @@ -145,6 +144,7 @@ def run(self):
warnings.warn("Could not import gen_protos, skipping proto generation.")
return original_cmd


python_requires = '>=2.7'
if os.environ.get('BEAM_EXPERIMENTAL_PY3') is None:
python_requires += ',<3.0'
Expand Down

0 comments on commit 190974f

Please sign in to comment.