From 45c046308e1afeb6d4fcf18a8a3861a12e9933f9 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 19 Oct 2017 12:11:44 -0700 Subject: [PATCH 01/16] Implement FnApi side inputs in Python. --- sdks/python/apache_beam/coders/typecoders.py | 2 +- sdks/python/apache_beam/pipeline.py | 29 +++- sdks/python/apache_beam/pvalue.py | 99 +++++++++++ .../runners/portability/fn_api_runner.py | 154 ++++++++++++++++-- .../runners/portability/fn_api_runner_test.py | 64 +++++++- .../runners/worker/bundle_processor.py | 99 +++++++++-- .../apache_beam/runners/worker/data_plane.py | 18 +- .../apache_beam/runners/worker/operations.pxd | 1 + .../apache_beam/runners/worker/operations.py | 17 +- .../apache_beam/runners/worker/sdk_worker.py | 91 ++++++++++- sdks/python/apache_beam/transforms/core.py | 21 ++- .../apache_beam/transforms/sideinputs.py | 21 ++- sdks/python/apache_beam/utils/urns.py | 5 + 13 files changed, 557 insertions(+), 64 deletions(-) diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index 797aee5815f5..433a3b0e31c4 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -131,7 +131,7 @@ def get_coder(self, typehint): # TODO(robertwb): Clean this up when type inference is fully enabled. pass else: - warnings.warn('Using fallback coder for typehint: %r.' % typehint) + pass #warnings.warn('Using fallback coder for typehint: %r.' % typehint) coder = self._fallback_coder return coder.from_type_hint(typehint, self) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index c670978b08d2..50d81271a577 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -506,9 +506,6 @@ def enter_composite_transform(self, transform_node): self.visit_transform(transform_node) def visit_transform(self, transform_node): - if transform_node.side_inputs: - # No side inputs (yet). - Visitor.ok = False try: # Transforms must be picklable. pickler.loads(pickler.dumps(transform_node.transform, @@ -730,8 +727,12 @@ def visit(self, visitor, pipeline, visited): def named_inputs(self): # TODO(BEAM-1833): Push names up into the sdk construction. - return {str(ix): input for ix, input in enumerate(self.inputs) - if isinstance(input, pvalue.PCollection)} + main_inputs = {str(ix): input + for ix, input in enumerate(self.inputs) + if isinstance(input, pvalue.PCollection)} + side_inputs = {"side%s" % ix: si.pvalue + for ix, si in enumerate(self.side_inputs)} + return dict(main_inputs, **side_inputs) def named_outputs(self): return {str(tag): output for tag, output in self.outputs.items() @@ -750,7 +751,6 @@ def transform_to_runner_api(transform, context): spec=transform_to_runner_api(self.transform, context), subtransforms=[context.transforms.get_id(part, label=part.full_label) for part in self.parts], - # TODO(BEAM-115): Side inputs. inputs={tag: context.pcollections.get_id(pc) for tag, pc in self.named_inputs().items()}, outputs={str(tag): context.pcollections.get_id(out) @@ -760,12 +760,25 @@ def transform_to_runner_api(transform, context): @staticmethod def from_runner_api(proto, context): + def is_side_input(tag): + return tag.startswith('side') + main_inputs = [context.pcollections.get_by_id(id) + for tag, id in proto.inputs.items() + if not is_side_input(tag)] + # Ordering is important here. + indexed_side_inputs = [(int(tag[4:]), context.pcollections.get_by_id(id)) + for tag, id in proto.inputs.items() + if is_side_input(tag)] + side_inputs = [si for _, si in sorted(indexed_side_inputs)] result = AppliedPTransform( parent=None, transform=ptransform.PTransform.from_runner_api(proto.spec, context), full_label=proto.unique_name, - inputs=[ - context.pcollections.get_by_id(id) for id in proto.inputs.values()]) + inputs=main_inputs) + if result.transform and result.transform.side_inputs: + for si, pcoll in zip(result.transform.side_inputs, side_inputs): + si.pvalue = pcoll + result.side_inputs = tuple(result.transform.side_inputs) result.parts = [ context.transforms.get_by_id(id) for id in proto.subtransforms] result.outputs = { diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index d2d365369fca..d8e1ce68db3c 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -28,7 +28,12 @@ import itertools +from apache_beam import coders from apache_beam import typehints +from apache_beam.internal import pickler +from apache_beam.portability.api import beam_runner_api_pb2 +from apache_beam.utils import urns + __all__ = [ 'PCollection', @@ -288,6 +293,79 @@ def _view_options(self): def element_type(self): return typehints.Any + # TODO(robertwb): Get rid of _from_runtime_iterable and _view_options + # in favor of _side_input_data(). + def _side_input_data(self): + view_options = self._view_options() + from_runtime_iterable = type(self)._from_runtime_iterable + return SideInputData( + urns.ITERABLE_ACCESS, + self._window_mapping_fn, + lambda iterable: from_runtime_iterable(iterable, view_options), + self._input_element_coder()) + + def _input_element_coder(self): + return coders.WindowedValueCoder( + coders.registry.get_coder(self.pvalue.element_type), + window_coder=self.pvalue.windowing.windowfn.get_window_coder()) + + def to_runner_api(self, context): + return self._side_input_data().to_runner_api(context) + + @staticmethod + def from_runner_api(proto, context): + return _UnpickledSideInput( + SideInputData.from_runner_api(proto, context)) + + +class _UnpickledSideInput(AsSideInput): + def __init__(self, side_input_data): + self._data = side_input_data + # For older runners. + self._window_mapping_fn = side_input_data.window_mapping_fn + + @staticmethod + def _from_runtime_iterable(it, options): + return options['data'].view_fn(it) + + def _view_options(self): + base = super(_UnpickledSideInput, self)._view_options() + base['data'] = self._data + return base + + def _side_input_data(self): + return self._data + + +class SideInputData(object): + def __init__(self, access_pattern, window_mapping_fn, view_fn, coder): + self.access_pattern = access_pattern + self.window_mapping_fn = window_mapping_fn + self.view_fn = view_fn + self.coder = coder + + def to_runner_api(self, unused_context): + return beam_runner_api_pb2.SideInput( + access_pattern=beam_runner_api_pb2.FunctionSpec( + urn=self.access_pattern), + view_fn=beam_runner_api_pb2.SdkFunctionSpec( + spec=beam_runner_api_pb2.FunctionSpec( + urn=urns.PICKLED_PYTHON_VIEWFN, + payload=pickler.dumps((self.view_fn, self.coder)))), + window_mapping_fn=beam_runner_api_pb2.SdkFunctionSpec( + spec=beam_runner_api_pb2.FunctionSpec( + urn=urns.PICKLED_WINDOW_MAPPING_FN, + payload=pickler.dumps(self.window_mapping_fn)))) + + @staticmethod + def from_runner_api(proto, unused_context): + assert proto.view_fn.spec.urn == urns.PICKLED_PYTHON_VIEWFN + assert proto.window_mapping_fn.spec.urn == urns.PICKLED_WINDOW_MAPPING_FN + return SideInputData( + proto.access_pattern.urn, + pickler.loads(proto.window_mapping_fn.spec.payload), + *pickler.loads(proto.view_fn.spec.payload)) + class AsSingleton(AsSideInput): """Marker specifying that an entire PCollection is to be used as a side input. @@ -358,6 +436,13 @@ def __repr__(self): def _from_runtime_iterable(it, options): return it + def _side_input_data(self): + return SideInputData( + urns.ITERABLE_ACCESS, + self._window_mapping_fn, + lambda iterable: iterable, + self._input_element_coder()) + @property def element_type(self): return typehints.Iterable[self.pvalue.element_type] @@ -382,6 +467,13 @@ class AsList(AsSideInput): def _from_runtime_iterable(it, options): return list(it) + def _side_input_data(self): + return SideInputData( + urns.ITERABLE_ACCESS, + self._window_mapping_fn, + list, + self._input_element_coder()) + class AsDict(AsSideInput): """Marker specifying a PCollection to be used as an indexable side input. @@ -403,6 +495,13 @@ class AsDict(AsSideInput): def _from_runtime_iterable(it, options): return dict(it) + def _side_input_data(self): + return SideInputData( + urns.ITERABLE_ACCESS, + self._window_mapping_fn, + dict, + self._input_element_coder()) + class EmptySideInput(object): """Value indicating when a singleton side input was empty. diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 463f78f08c78..dc8a55a19a8e 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -65,7 +65,8 @@ def __init__(self): self._push_queue = queue.Queue() self._pull_queue = queue.Queue() setattr(self, method_name, self.run) - self._read_thread = threading.Thread(target=self._read) + self._read_thread = threading.Thread( + name='streaming_rpc_handler-read', target=self._read) self._started = False def run(self, iterator, context): @@ -155,6 +156,35 @@ def __iter__(self): return iter([output_stream.get()]) +class _WindowGroupingBuffer(object): + """Used to partition windowed side inputs.""" + def __init__(self, side_input_data): + # Here's where we would use a different type of partitioning + # (e.g. also by key) for a different access pattern. + assert side_input_data.access_pattern == urns.ITERABLE_ACCESS + self._windowed_value_coder = side_input_data.coder + self._window_coder = side_input_data.coder.window_coder + self._value_coder = side_input_data.coder.wrapped_value_coder + self._values_by_window = collections.defaultdict(list) + + def append(self, elements_data): + input_stream = create_InputStream(elements_data) + while input_stream.size() > 0: + windowed_value = self._windowed_value_coder.get_impl( + ).decode_from_stream(input_stream, True) + for window in windowed_value.windows: + self._values_by_window[window].append(windowed_value.value) + + def items(self): + value_coder_impl = self._value_coder.get_impl() + for window, values in self._values_by_window.items(): + encoded_window = self._window_coder.encode(window) + output_stream = create_OutputStream() + for value in values: + value_coder_impl.encode_to_stream(value, output_stream, True) + yield encoded_window, output_stream.get() + + class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner): def __init__(self, use_grpc=False, sdk_harness_factory=None): @@ -175,6 +205,8 @@ def _next_uid(self): def run(self, pipeline): MetricsEnvironment.set_metrics_supported(self.has_metrics_support()) if pipeline._verify_runner_api_compatible(): + #print pipeline.to_runner_api() + print "Running pipeline..." return self.run_via_runner_api(pipeline.to_runner_api()) else: return super(FnApiRunner, self).run(pipeline) @@ -204,13 +236,24 @@ def __init__(self, name, transforms, self.downstream_side_inputs = downstream_side_inputs self.must_follow = must_follow + @property + def must_follow(self): + return self._must_follow + + @must_follow.setter + def must_follow(self, value): + assert isinstance(value, frozenset) + self._must_follow = value + def __repr__(self): must_follow = ', '.join(prev.name for prev in self.must_follow) - return "%s\n %s\n must follow: %s" % ( + downstream_side_inputs = ', '.join(str(si) for si in self.downstream_side_inputs) + return "%s\n %s\n must follow: %s\n downstream_side_inputs: %s" % ( self.name, '\n'.join(["%s:%s" % (transform.unique_name, transform.spec.urn) for transform in self.transforms]), - must_follow) + must_follow, + downstream_side_inputs) def can_fuse(self, consumer): def no_overlap(a, b): @@ -469,11 +512,12 @@ def compute_downstream_side_inputs(stage): for transform in stage.transforms: for output in transform.outputs.values(): if output in all_side_inputs: - downstream_side_inputs = union(downstream_side_inputs, output) - for consumer in consumers[output]: - downstream_side_inputs = union( - downstream_side_inputs, - compute_downstream_side_inputs(consumer)) + downstream_side_inputs = union( + downstream_side_inputs, frozenset([output])) + for consumer in consumers[output]: + downstream_side_inputs = union( + downstream_side_inputs, + compute_downstream_side_inputs(consumer)) downstream_side_inputs_by_stage[stage] = downstream_side_inputs return downstream_side_inputs_by_stage[stage] @@ -524,7 +568,7 @@ def fuse(producer, consumer): producer = replacement(producer) consumer = replacement(consumer) # Update consumer.must_follow set, as it's used in can_fuse. - consumer.must_follow = set( + consumer.must_follow = frozenset( replacement(s) for s in consumer.must_follow) if producer.can_fuse(consumer): fuse(producer, consumer) @@ -549,8 +593,11 @@ def fuse(producer, consumer): spec=beam_runner_api_pb2.FunctionSpec( urn=bundle_processor.DATA_INPUT_URN, payload=pcoll_as_param))], - must_follow={write_pcoll}) + must_follow=frozenset([write_pcoll])) fuse(read_pcoll, consumer) + else: + consumer.must_follow = union( + consumer.must_follow, frozenset([write_pcoll])) # Everything that was originally a stage or a replacement, but wasn't # replaced, should be in the final graph. @@ -658,9 +705,9 @@ def extract_endpoints(stage): data_side_input = {} data_output = {} for transform in stage.transforms: - pcoll_id = transform.spec.payload if transform.spec.urn in (bundle_processor.DATA_INPUT_URN, bundle_processor.DATA_OUTPUT_URN): + pcoll_id = transform.spec.payload if transform.spec.urn == bundle_processor.DATA_INPUT_URN: target = transform.unique_name, only_element(transform.outputs) data_input[target] = pcoll_id @@ -673,13 +720,18 @@ def extract_endpoints(stage): transform.spec.payload = data_operation_spec.SerializeToString() else: transform.spec.payload = "" + elif transform.spec.urn == urns.PARDO_TRANSFORM: + payload = proto_utils.parse_Bytes( + transform.spec.payload, beam_runner_api_pb2.ParDoPayload) + for tag, si in payload.side_inputs.items(): + data_side_input[transform.unique_name, tag] = ( + 'materialize:' + transform.inputs[tag], + beam.pvalue.SideInputData.from_runner_api(si, None)) return data_input, data_side_input, data_output logging.info('Running %s', stage.name) logging.debug(' %s', stage) data_input, data_side_input, data_output = extract_endpoints(stage) - if data_side_input: - raise NotImplementedError('Side inputs.') process_bundle_descriptor = beam_fn_api_pb2.ProcessBundleDescriptor( id=self._next_uid(), @@ -711,6 +763,20 @@ def extract_endpoints(stage): data_out.write(element_data) data_out.close() + # Store the required side inputs into state. + for (transform_id, tag), (pcoll_id, si) in data_side_input.items(): + elements_by_window = _WindowGroupingBuffer(si) + for element_data in pcoll_buffers[pcoll_id]: + elements_by_window.append(element_data) + for window, elements_data in elements_by_window.items(): + state_key = beam_fn_api_pb2.StateKey( + multimap_side_input=beam_fn_api_pb2.StateKey.MultimapSideInput( + ptransform_id=transform_id, + side_input_id=tag, + window=window)) + controller.state_handler.blocking_append( + state_key, elements_data, process_bundle.instruction_id) + # Register and start running the bundle. controller.control_handler.push(process_bundle_registration) controller.control_handler.push(process_bundle) @@ -981,11 +1047,15 @@ def __init__(self): self._all = collections.defaultdict(list) def Get(self, state_key): + print "GET ", str(state_key).replace("\n", " "), repr(''.join(self._all[self._to_key(state_key)])) + import pprint + pprint.pprint(dict(self._all)) return beam_fn_api_pb2.Elements.Data( data=''.join(self._all[self._to_key(state_key)])) def Append(self, state_key, data): - self._all[self._to_key(state_key)].extend(data) + print "APPEND", str(state_key).replace("\n", " "), repr(data) + self._all[self._to_key(state_key)].append(data) def Clear(self, state_key): try: @@ -995,14 +1065,56 @@ def Clear(self, state_key): @staticmethod def _to_key(state_key): - return state_key.window, state_key.key + return state_key.SerializeToString() + + class StateServicer(beam_fn_api_pb2_grpc.BeamFnStateServicer): + + def __init__(self): + self._lock = threading.Lock() + self._state = collections.defaultdict(list) + + def blocking_get(self, state_key, instruction_reference=None): + with self._lock: + return ''.join(self._state[self._to_key(state_key)]) + + def blocking_append(self, state_key, data, instruction_reference=None): + with self._lock: + self._state[self._to_key(state_key)].append(data) + + def blocking_clear(self, state_key, instruction_reference=None): + with self._lock: + del self._state[self._to_key(state_key)] + + def _to_key(self, state_key): + return state_key.SerializeToString() + + class GrpcStateServicer( + StateServicer, beam_fn_api_pb2_grpc.BeamFnStateServicer): + def State(self, request_stream, context=None): + # Note that this eagerly mutates state, assuming any failures are fatal. + for request in request_stream: + if request.get: + yield beam_fn_api_pb2.StateResponse( + id=request.id, + get=beam_fn_api_pb2.StateGetResponse( + data=self.blocking_get(request.state_key))) + elif request.append: + data=self.blocking_append(request.state_key, request.append.data) + yield beam_fn_api_pb2.StateResponse( + id=request.id, + append=beam_fn_api_pb2.AppendResponse()) + elif request.clear: + data=self.blocking_clear(request.state_key) + yield beam_fn_api_pb2.StateResponse( + id=request.id, + clear=beam_fn_api_pb2.ClearResponse()) class DirectController(object): """An in-memory controller for fn API control, state and data planes.""" def __init__(self): self._responses = [] - self.state_handler = FnApiRunner.SimpleState() + self.state_handler = FnApiRunner.StateServicer() self.control_handler = self self.data_plane_handler = data_plane.InMemoryDataChannel() self.worker = sdk_worker.SdkWorker( @@ -1032,7 +1144,6 @@ class GrpcController(object): def __init__(self, sdk_harness_factory=None): self.sdk_harness_factory = sdk_harness_factory - self.state_handler = FnApiRunner.SimpleState() self.control_server = grpc.server( futures.ThreadPoolExecutor(max_workers=10)) self.control_port = self.control_server.add_insecure_port('[::]:0') @@ -1049,6 +1160,12 @@ def __init__(self, sdk_harness_factory=None): beam_fn_api_pb2_grpc.add_BeamFnDataServicer_to_server( self.data_plane_handler, self.data_server) + # TODO(robertwb): Is sharing the control channel fine? Alternatively, + # how should this be plumbed? + self.state_handler = FnApiRunner.GrpcStateServicer() + beam_fn_api_pb2_grpc.add_BeamFnStateServicer_to_server( + self.state_handler, self.control_server) + logging.info('starting control server on port %s', self.control_port) logging.info('starting data server on port %s', self.data_port) self.data_server.start() @@ -1056,7 +1173,8 @@ def __init__(self, sdk_harness_factory=None): self.worker = (self.sdk_harness_factory or sdk_worker.SdkHarness)( 'localhost:%s' % self.control_port) - self.worker_thread = threading.Thread(target=self.worker.run) + self.worker_thread = threading.Thread( + name='run_worker', target=self.worker.run) logging.info('starting worker') self.worker_thread.start() diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py index 31f1b6ff0c67..5086ad2cdaa4 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py @@ -24,12 +24,31 @@ from apache_beam.runners.portability import maptask_executor_runner_test from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.transforms import window try: from apache_beam.runners.worker.statesampler import DEFAULT_SAMPLING_PERIOD_MS except ImportError: DEFAULT_SAMPLING_PERIOD_MS = 0 +if True: + import threading + import time + def dump_threads(): + while True: + print + for thread in threading.enumerate(): + if not thread.daemon: + print('...', + thread.name, + thread.__dict__['_Thread__target'], + getattr(thread.__dict__['_Thread__target'], 'func_code', None)) + time.sleep(5) + t = threading.Thread(name='dump_threads', target=dump_threads) + t.daemon = True + t.start() + + class FnApiRunnerTest( maptask_executor_runner_test.MapTaskExecutorRunnerTest): @@ -46,9 +65,50 @@ def test_combine_per_key(self): # TODO(BEAM-1348): Enable once Partial GBK is supported in fn API. pass + def test_much_data_channel(self): + class Permute(beam.PTransform): + def expand(self, pcoll): + import pprint + return ( + pcoll + | beam.Map(lambda (k, v), _: (v, k), beam.pvalue.AsList(pcoll)) + | beam.GroupByKey() + | beam.FlatMap(lambda (k, vs): [(k, v) for v in vs]) + #| beam.Map(lambda x: pprint.pprint(x) or x) + ) + for _ in range(5): + with self.create_pipeline() as p: + pcoll = p | beam.Create(enumerate(range(3) + range(5))) + for ix in range(3): + pcoll = pcoll | "Permute%s" % ix >> Permute() + + def test_pardo_side_inputs(self): - # TODO(BEAM-1348): Enable once side inputs are supported in fn API. - pass + + + def cross_product(elem, sides): + print elem, sides + for side in sides: + yield elem, side + with self.create_pipeline() as p: +# main = p | 'main' >> beam.Create(['a', 'b', 'c']) +# side = p | 'side' >> beam.Create(['x', 'y']) +# main | beam.FlatMap(cross_product, beam.pvalue.AsList(side)) + +# assert_that(main | beam.FlatMap(cross_product, AsList(side)), +# equal_to([('a', 'x'), ('b', 'x'), ('c', 'x'), +# ('a', 'y'), ('b', 'y'), ('c', 'y')])) + +# with self.create_pipeline() as p: +# pcoll = p | beam.Create([1, 2, 3, 11, 15, 20]) | beam.Map( + pcoll = p | beam.Create(range(30)) | beam.Map( + lambda t: window.TimestampedValue(t, t)) + main = pcoll | 'WindowMain' >> beam.WindowInto(window.FixedWindows(7)) + side = pcoll | 'WindowSide' >> beam.WindowInto(window.FixedWindows(10)) + res = main | "USE SIDES" >> beam.Map(lambda x, s: (x, s), beam.pvalue.AsList(side)) + import pprint + res | beam.Map(pprint.pprint) + def test_pardo_unfusable_side_inputs(self): # TODO(BEAM-1348): Enable once side inputs are supported in fn API. diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 1049ae1c0e0a..df09a850471a 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -39,6 +39,7 @@ from apache_beam.runners.dataflow.native_io import iobase as native_iobase from apache_beam.runners.worker import operation_specs from apache_beam.runners.worker import operations +from apache_beam.transforms import sideinputs from apache_beam.utils import counters from apache_beam.utils import proto_utils from apache_beam.utils import urns @@ -162,6 +163,43 @@ def __iter__(self): yield self._coder.get_impl().decode_from_stream(input_stream, True) +class StateBackedSideInputMap(object): + def __init__(self, state_handler, transform_id, tag, side_input_data): + self._state_handler = state_handler + self._transform_id = transform_id + self._tag = tag + self._side_input_data = side_input_data + self._element_coder = side_input_data.coder.wrapped_value_coder + self._target_window_coder = side_input_data.coder.window_coder + # TODO(robertwb): Limit the cache size. + # TODO(robertwb): Cross-bundle caching respecting cache tokens. + self._cache = {} + + def __getitem__(self, window): + target_window = self._side_input_data.window_mapping_fn(window) + if target_window not in self._cache: + state_key = beam_fn_api_pb2.StateKey( + multimap_side_input=beam_fn_api_pb2.StateKey.MultimapSideInput( + ptransform_id=self._transform_id, + side_input_id=self._tag, + window=self._target_window_coder.encode(target_window))) + element_coder_impl = self._element_coder.get_impl() + state_handler = self._state_handler + class AllElements(object): + def __iter__(self): + # TODO(robertwb): Support pagination. + input_stream = coder_impl.create_InputStream( + state_handler.blocking_get(state_key, None)) + while input_stream.size() > 0: + yield element_coder_impl.decode_from_stream(input_stream, True) + self._cache[target_window] = self._side_input_data.view_fn(AllElements()) + return self._cache[target_window] + + def is_globally_windowed(self): + return (self._side_input_data.window_mapping_fn + == sideinputs._global_window_mapping_fn) + + def memoize(func): cache = {} missing = object() @@ -201,10 +239,17 @@ def create_execution_tree(self, descriptor): descriptor, self.data_channel_factory, self.counter_factory, self.state_sampler, self.state_handler) + def is_side_input(transform_proto, tag): + if transform_proto.spec.urn == urns.PARDO_TRANSFORM: + return tag in proto_utils.parse_Bytes( + transform_proto.spec.payload, + beam_runner_api_pb2.ParDoPayload).side_inputs + pcoll_consumers = collections.defaultdict(list) for transform_id, transform_proto in descriptor.transforms.items(): - for pcoll_id in transform_proto.inputs.values(): - pcoll_consumers[pcoll_id].append(transform_id) + for tag, pcoll_id in transform_proto.inputs.items(): + if not is_side_input(transform_proto, tag): + pcoll_consumers[pcoll_id].append(transform_id) @memoize def get_operation(transform_id): @@ -412,7 +457,6 @@ def create(factory, transform_id, transform_proto, parameter, consumers): @BeamTransformFactory.register_urn( urns.READ_TRANSFORM, beam_runner_api_pb2.ReadPayload) def create(factory, transform_id, transform_proto, parameter, consumers): - # The Dataflow runner harness strips the base64 encoding. source = iobase.SourceBase.from_runner_api(parameter.source, factory.context) spec = operation_specs.WorkerRead( iobase.SourceBundle(1.0, source, None, None), @@ -455,12 +499,15 @@ def create(factory, transform_id, transform_proto, parameter, consumers): side_input_data = [] return _create_pardo_operation( factory, transform_id, transform_proto, consumers, - serialized_fn, side_input_data) + serialized_fn, side_input_data, parameter.side_inputs) def _create_pardo_operation( factory, transform_id, transform_proto, consumers, - serialized_fn, side_input_data): + serialized_fn, side_input_data, side_inputs_proto=None): + + fn, args, kwargs, tags_and_types, windowing = pickler.loads(serialized_fn) + def create_side_input(tag, coder): # TODO(robertwb): Extract windows (and keys) out of element data. # TODO(robertwb): Extract state key from ParDoPayload. @@ -468,9 +515,28 @@ def create_side_input(tag, coder): tag=tag, source=SideInputSource( factory.state_handler, - beam_fn_api_pb2.StateKey.MultimapSideInput( - key=side_input_tag(transform_id, tag)), + beam_fn_api_pb2.StateKey( + multimap_side_input=beam_fn_api_pb2.StateKey.MultimapSideInput( + ptransform_id=transform_id, + side_input_id=tag, + window="TODO", + key="TODO")), coder=coder)) + + if side_inputs_proto: + tagged_side_inputs = [ + (tag, beam.pvalue.SideInputData.from_runner_api(si, factory.context)) + for tag, si in side_inputs_proto.items()] + tagged_side_inputs.sort(key=lambda tag_si: int(tag_si[0][4:])) + side_inputs = None, + side_input_maps = [ + StateBackedSideInputMap(factory.state_handler, transform_id, tag, si) + for tag, si in tagged_side_inputs] + else: + side_inputs = [ + create_side_input(tag, coder) for tag, coder in side_input_data] + side_input_maps = None + output_tags = list(transform_proto.outputs.keys()) # Hack to match out prefix injected by dataflow runner. @@ -482,27 +548,30 @@ def mutate_tag(tag): return 'out_' + tag else: return tag - dofn_data = pickler.loads(serialized_fn) - if not dofn_data[-1]: + + if not windowing: # Windowing not set. - pcoll_id, = transform_proto.inputs.values() + side_input_tags = side_inputs_proto or () + pcoll_id, = [pcoll for tag, pcoll in transform_proto.inputs.items() + if tag not in side_input_tags] windowing = factory.context.windowing_strategies.get_by_id( factory.descriptor.pcollections[pcoll_id].windowing_strategy_id) - serialized_fn = pickler.dumps(dofn_data[:-1] + (windowing,)) + output_coders = factory.get_output_coders(transform_proto) spec = operation_specs.WorkerDoFn( - serialized_fn=serialized_fn, + serialized_fn=pickler.dumps( + (fn, args, kwargs, tags_and_types, windowing)), output_tags=[mutate_tag(tag) for tag in output_tags], input=None, - side_inputs=[ - create_side_input(tag, coder) for tag, coder in side_input_data], + side_inputs=side_inputs, output_coders=[output_coders[tag] for tag in output_tags]) return factory.augment_oldstyle_op( operations.DoOperation( transform_proto.unique_name, spec, factory.counter_factory, - factory.state_sampler), + factory.state_sampler, + side_input_maps), transform_proto.unique_name, consumers, output_tags) diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 5a511a088bf4..f2a3751dc2ab 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -25,6 +25,7 @@ import collections import logging import Queue as queue +import sys import threading import grpc @@ -147,6 +148,7 @@ def __init__(self): self._receive_lock = threading.Lock() self._reads_finished = threading.Event() self._closed = False + self._exc_info = None def close(self): self._to_send.put(self._WRITES_FINISHED) @@ -163,12 +165,17 @@ def input_elements(self, instruction_id, expected_targets): received = self._receiving_queue(instruction_id) done_targets = [] while len(done_targets) < len(expected_targets): - data = received.get() - if not data.data and data.target in expected_targets: - done_targets.append(data.target) + try: + data = received.get(timeout=1) + except queue.Empty: + if self._exc_info: + raise exc_info[0], exc_info[1], exc_info[2] else: - assert data.target not in done_targets - yield data + if not data.data and data.target in expected_targets: + done_targets.append(data.target) + else: + assert data.target not in done_targets + yield data def output_stream(self, instruction_id, target): # TODO: Return an output stream that sends data @@ -215,6 +222,7 @@ def _read_inputs(self, elements_iterator): except: # pylint: disable=bare-except if not self._closed: logging.exception('Failed to read inputs in the data plane') + self._exc_info = sys.exc_info() raise finally: self._reads_finished.set() diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd index d380a45d982a..cb05c90d242c 100644 --- a/sdks/python/apache_beam/runners/worker/operations.pxd +++ b/sdks/python/apache_beam/runners/worker/operations.pxd @@ -72,6 +72,7 @@ cdef class DoOperation(Operation): cdef object dofn_runner cdef Receiver dofn_receiver cdef object tagged_receivers + cdef object side_input_maps cdef class CombineOperation(Operation): cdef object phased_combine_fn diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index ed3f3b8f466c..6b5f0246c10a 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -259,6 +259,11 @@ def __missing__(self, tag): class DoOperation(Operation): """A Do operation that will execute a custom DoFn for each input element.""" + def __init__( + self, name, spec, counter_factory, sampler, side_input_maps=None): + super(DoOperation, self).__init__(name, spec, counter_factory, sampler) + self.side_input_maps = side_input_maps + def _read_side_inputs(self, tags_and_types): """Generator reading side inputs in the order prescribed by tags_and_types. @@ -273,6 +278,10 @@ def _read_side_inputs(self, tags_and_types): either in singleton or collection mode according to the tags_and_types argument. """ + # Only call this on the old path where side_input_maps was not + # provided directly. + assert self.side_input_maps is None + # We will read the side inputs in the order prescribed by the # tags_and_types argument because this is exactly the order needed to # replace the ArgumentPlaceholder objects in the args/kwargs of the DoFn @@ -336,8 +345,14 @@ def start(self): raise ValueError('Unexpected output name for operation: %s' % tag) self.tagged_receivers[original_tag] = self.receivers[index] + if self.side_input_maps is None: + if tags_and_types: + self.side_input_maps = list(self._read_side_inputs(tags_and_types)) + else: + self.side_input_maps = [] + self.dofn_runner = common.DoFnRunner( - fn, args, kwargs, self._read_side_inputs(tags_and_types), + fn, args, kwargs, self.side_input_maps, window_fn, context, self.tagged_receivers, logger, self.step_name, scoped_metrics_container=self.scoped_metrics_container) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index b08e47372436..d27edd6b5a68 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -24,6 +24,8 @@ import functools import logging import Queue as queue +import sys +import threading import traceback from concurrent import futures @@ -46,9 +48,10 @@ def __init__(self, control_address): def run(self): control_stub = beam_fn_api_pb2_grpc.BeamFnControlStub(self._control_channel) - # TODO(robertwb): Wire up to new state api. - state_stub = None - self.worker = SdkWorker(state_stub, self._data_channel_factory) + state_stub = beam_fn_api_pb2_grpc.BeamFnStateStub(self._control_channel) + state_handler = GrpcStateHandler(state_stub) + state_handler.start() + self.worker = SdkWorker(state_handler, self._data_channel_factory) responses = queue.Queue() no_more_work = object() @@ -102,6 +105,7 @@ def handle_response(request, response_future): # control to its caller. responses.put(no_more_work) self._data_channel_factory.close() + state_handler.done() logging.info('Done consuming work.') @@ -148,3 +152,84 @@ def process_bundle(self, request, instruction_id): def process_bundle_progress(self, request, instruction_id): # It is an error to get progress for a not-in-flight bundle. return self.bundle_processors.get(instruction_id).metrics() + + +class GrpcStateHandler(object): + + _DONE = object() + + def __init__(self, state_stub): + self._lock = threading.Lock() + self._state_stub = state_stub + self._requests = queue.Queue() + self._responses = queue.Queue() + self._responses_by_id = {} + self._last_id = 0 + self._exc_info = None + + def start(self): + self._done = False + def request_iter(): + while True: + request = self._requests.get() + if request is self._DONE or self._done: + break + yield request + responses = self._state_stub.State(request_iter()) + def pull_responses(): + try: + for response in responses: + self._responses.put(response) + if self._done: + break + except: + self._exc_info = sys.exc_info() + reader = threading.Thread(target=pull_responses, name='read_state') + reader.daemon = True + reader.start() + + def done(self): + self._done = True + self._requests.put(self._DONE) + + def blocking_get(self, state_key, instruction_reference): + response = self._blocking_request( + beam_fn_api_pb2.StateRequest( + instruction_reference=instruction_reference, + state_key=state_key, + get=beam_fn_api_pb2.StateGetRequest())) + if response.get.continuation_token: + raise NotImplementedErrror + return response.get.data + + def blocking_append(self, state_key, data, instruction_reference): + self._blocking_request( + beam_fn_api_pb2.StateRequest( + instruction_reference=instruction_reference, + state_key=state_key, + append=beam_fn_api_pb2.StateAppendRequest(data=data))) + + def blocking_clear(self, state_key, instruction_reference): + self._blocking_request( + beam_fn_api_pb2.StateRequest( + instruction_reference=instruction_reference, + state_key=state_key, + clear=beam_fn_api_pb2.StateClearRequest())) + + def _blocking_request(self, request): + request.id = self._next_id() + self._requests.put(request) + while request.id not in self._responses_by_id: + with self._lock: + if request.id not in self._responses_by_id: + response = self._responses.get(timeout=2) + self._responses_by_id[response.id] = response + if self._exc_info: + raise self._exc_info[0], self._exc_info[1], self._exc_info[2] + response = self._responses_by_id[response.id] + del self._responses_by_id[response.id] + return response + + def _next_id(self): + self._last_id += 1 + return str(self._last_id) \ No newline at end of file diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index ff2428e97f0e..58d182d789a2 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -842,7 +842,7 @@ def with_outputs(self, *tags, **main_kw): return _MultiParDo(self, tags, main_tag) def _pardo_fn_data(self): - si_tags_and_types = [] + si_tags_and_types = None windowing = None return self.fn, self.args, self.kwargs, si_tags_and_types, windowing @@ -855,7 +855,15 @@ def to_runner_api_parameter(self, context): do_fn=beam_runner_api_pb2.SdkFunctionSpec( spec=beam_runner_api_pb2.FunctionSpec( urn=urns.PICKLED_DO_FN_INFO, - payload=picked_pardo_fn_data)))) + payload=picked_pardo_fn_data)), + # It'd be nice to name these according to their actual + # names/positions in the orignal argument list, but such a + # transformation is currently irreversible given how + # remove_objects_from_args and insert_values_in_args + # are currently implemented. + side_inputs = { + "side%s" % ix: si.to_runner_api(context) + for ix, si in enumerate(self.side_inputs)})) @PTransform.register_urn( urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload) @@ -864,10 +872,15 @@ def from_runner_api_parameter(pardo_payload, context): fn, args, kwargs, si_tags_and_types, windowing = pickler.loads( pardo_payload.do_fn.spec.payload) if si_tags_and_types: - raise NotImplementedError('deferred side inputs') + raise NotImplementedError('explicit side input data') elif windowing: raise NotImplementedError('explicit windowing') - return ParDo(fn, *args, **kwargs) + result = ParDo(fn, *args, **kwargs) + indexed_side_inputs = [ + (int(ix[4:]), pvalue.AsSideInput.from_runner_api(si, context)) + for ix, si in pardo_payload.side_inputs.items()] + result.side_inputs = [si for _, si in sorted(indexed_side_inputs)] + return result class _MultiParDo(PTransform): diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index f10cb92ed5e3..1592a3c8506b 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -26,6 +26,7 @@ from __future__ import absolute_import +from apache_beam import pvalue from apache_beam.transforms import window @@ -48,19 +49,25 @@ def map_via_end(source_window): class SideInputMap(object): """Represents a mapping of windows to side input values.""" - def __init__(self, view_class, view_options, iterable): - self._window_mapping_fn = view_options.get( - 'window_mapping_fn', _global_window_mapping_fn) - self._view_class = view_class - self._view_options = view_options + def __init__(self, side_input_data, old_view_options, iterable): + if isinstance(side_input_data, pvalue.SideInputData): # Remove? + self._view_fn = side_input_data.view_fn + self._window_mapping_fn = side_input_data.window_mapping_fn + else: + # Support older sdks/runners. + view_class = side_input_data + self._window_mapping_fn = old_view_options.get( + 'window_mapping_fn', _global_window_mapping_fn) + self._view_fn = lambda iterable: view_class._from_runtime_iterable( + iterable, old_view_options) self._iterable = iterable self._cache = {} def __getitem__(self, window): if window not in self._cache: target_window = self._window_mapping_fn(window) - self._cache[window] = self._view_class._from_runtime_iterable( - _FilteringIterable(self._iterable, target_window), self._view_options) + self._cache[window] = self._view_fn( + _FilteringIterable(self._iterable, target_window)) return self._cache[window] def is_globally_windowed(self): diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py index 2aeaa5340292..c6135ba13909 100644 --- a/sdks/python/apache_beam/utils/urns.py +++ b/sdks/python/apache_beam/utils/urns.py @@ -58,6 +58,11 @@ GLOBAL_WINDOW_CODER = "urn:beam:coders:urn:beam:coders:global_window:0.1" WINDOWED_VALUE_CODER = "urn:beam:coders:windowed_value:0.1" +ITERABLE_ACCESS = "urn:beam:sideinput:iterable" +MULTIMAP_ACCESS = "urn:beam:sideinput:multimap" +PICKLED_PYTHON_VIEWFN = "beam:view_fn:pickled_python_data:v0.1" +PICKLED_WINDOW_MAPPING_FN = "beam:window_mapping_fn:pickled_python:v0.1" + class RunnerApiFn(object): """Abstract base class that provides urn registration utilities. From 202dfc74e58e9736bb40ed0382fa03dd072b7ab4 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 19 Oct 2017 16:59:34 -0700 Subject: [PATCH 02/16] cleanup --- sdks/python/apache_beam/coders/typecoders.py | 2 +- sdks/python/apache_beam/pipeline.py | 3 +- sdks/python/apache_beam/pvalue.py | 1 + .../runners/portability/fn_api_runner.py | 24 +---- .../runners/portability/fn_api_runner_test.py | 94 +++++++------------ .../runners/worker/bundle_processor.py | 8 +- .../apache_beam/runners/worker/sdk_worker.py | 2 +- 7 files changed, 46 insertions(+), 88 deletions(-) diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index 433a3b0e31c4..797aee5815f5 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -131,7 +131,7 @@ def get_coder(self, typehint): # TODO(robertwb): Clean this up when type inference is fully enabled. pass else: - pass #warnings.warn('Using fallback coder for typehint: %r.' % typehint) + warnings.warn('Using fallback coder for typehint: %r.' % typehint) coder = self._fallback_coder return coder.from_type_hint(typehint, self) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 50d81271a577..62626a364896 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -730,7 +730,7 @@ def named_inputs(self): main_inputs = {str(ix): input for ix, input in enumerate(self.inputs) if isinstance(input, pvalue.PCollection)} - side_inputs = {"side%s" % ix: si.pvalue + side_inputs = {'side%s' % ix: si.pvalue for ix, si in enumerate(self.side_inputs)} return dict(main_inputs, **side_inputs) @@ -761,6 +761,7 @@ def transform_to_runner_api(transform, context): @staticmethod def from_runner_api(proto, context): def is_side_input(tag): + # As per named_inputs() above. return tag.startswith('side') main_inputs = [context.pcollections.get_by_id(id) for tag, id in proto.inputs.items() diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index d8e1ce68db3c..3cd9e92e0562 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -338,6 +338,7 @@ def _side_input_data(self): class SideInputData(object): + """All of the data about a side input except for the bound PCollection.""" def __init__(self, access_pattern, window_mapping_fn, view_fn, coder): self.access_pattern = access_pattern self.window_mapping_fn = window_mapping_fn diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index dc8a55a19a8e..a546abed18eb 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -66,7 +66,7 @@ def __init__(self): self._pull_queue = queue.Queue() setattr(self, method_name, self.run) self._read_thread = threading.Thread( - name='streaming_rpc_handler-read', target=self._read) + name='streaming_rpc_handler_read', target=self._read) self._started = False def run(self, iterator, context): @@ -205,8 +205,6 @@ def _next_uid(self): def run(self, pipeline): MetricsEnvironment.set_metrics_supported(self.has_metrics_support()) if pipeline._verify_runner_api_compatible(): - #print pipeline.to_runner_api() - print "Running pipeline..." return self.run_via_runner_api(pipeline.to_runner_api()) else: return super(FnApiRunner, self).run(pipeline) @@ -236,15 +234,6 @@ def __init__(self, name, transforms, self.downstream_side_inputs = downstream_side_inputs self.must_follow = must_follow - @property - def must_follow(self): - return self._must_follow - - @must_follow.setter - def must_follow(self, value): - assert isinstance(value, frozenset) - self._must_follow = value - def __repr__(self): must_follow = ', '.join(prev.name for prev in self.must_follow) downstream_side_inputs = ', '.join(str(si) for si in self.downstream_side_inputs) @@ -1047,15 +1036,11 @@ def __init__(self): self._all = collections.defaultdict(list) def Get(self, state_key): - print "GET ", str(state_key).replace("\n", " "), repr(''.join(self._all[self._to_key(state_key)])) - import pprint - pprint.pprint(dict(self._all)) return beam_fn_api_pb2.Elements.Data( data=''.join(self._all[self._to_key(state_key)])) def Append(self, state_key, data): - print "APPEND", str(state_key).replace("\n", " "), repr(data) - self._all[self._to_key(state_key)].append(data) + self._all[self._to_key(state_key)].extend(data) def Clear(self, state_key): try: @@ -1065,7 +1050,7 @@ def Clear(self, state_key): @staticmethod def _to_key(state_key): - return state_key.SerializeToString() + return state_key.window, state_key.key class StateServicer(beam_fn_api_pb2_grpc.BeamFnStateServicer): @@ -1085,7 +1070,8 @@ def blocking_clear(self, state_key, instruction_reference=None): with self._lock: del self._state[self._to_key(state_key)] - def _to_key(self, state_key): + @staticmethod + def _to_key(state_key): return state_key.SerializeToString() class GrpcStateServicer( diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py index 5086ad2cdaa4..70362f5f7eab 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py @@ -31,24 +31,6 @@ except ImportError: DEFAULT_SAMPLING_PERIOD_MS = 0 -if True: - import threading - import time - def dump_threads(): - while True: - print - for thread in threading.enumerate(): - if not thread.daemon: - print('...', - thread.name, - thread.__dict__['_Thread__target'], - getattr(thread.__dict__['_Thread__target'], 'func_code', None)) - time.sleep(5) - t = threading.Thread(name='dump_threads', target=dump_threads) - t.daemon = True - t.start() - - class FnApiRunnerTest( maptask_executor_runner_test.MapTaskExecutorRunnerTest): @@ -65,54 +47,46 @@ def test_combine_per_key(self): # TODO(BEAM-1348): Enable once Partial GBK is supported in fn API. pass - def test_much_data_channel(self): - class Permute(beam.PTransform): - def expand(self, pcoll): - import pprint - return ( - pcoll - | beam.Map(lambda (k, v), _: (v, k), beam.pvalue.AsList(pcoll)) - | beam.GroupByKey() - | beam.FlatMap(lambda (k, vs): [(k, v) for v in vs]) - #| beam.Map(lambda x: pprint.pprint(x) or x) - ) - for _ in range(5): - with self.create_pipeline() as p: - pcoll = p | beam.Create(enumerate(range(3) + range(5))) - for ix in range(3): - pcoll = pcoll | "Permute%s" % ix >> Permute() - - def test_pardo_side_inputs(self): - - def cross_product(elem, sides): - print elem, sides for side in sides: yield elem, side with self.create_pipeline() as p: -# main = p | 'main' >> beam.Create(['a', 'b', 'c']) -# side = p | 'side' >> beam.Create(['x', 'y']) -# main | beam.FlatMap(cross_product, beam.pvalue.AsList(side)) - -# assert_that(main | beam.FlatMap(cross_product, AsList(side)), -# equal_to([('a', 'x'), ('b', 'x'), ('c', 'x'), -# ('a', 'y'), ('b', 'y'), ('c', 'y')])) - -# with self.create_pipeline() as p: -# pcoll = p | beam.Create([1, 2, 3, 11, 15, 20]) | beam.Map( - pcoll = p | beam.Create(range(30)) | beam.Map( + main = p | 'main' >> beam.Create(['a', 'b', 'c']) + side = p | 'side' >> beam.Create(['x', 'y']) + assert_that(main | beam.FlatMap(cross_product, beam.pvalue.AsList(side)), + equal_to([('a', 'x'), ('b', 'x'), ('c', 'x'), + ('a', 'y'), ('b', 'y'), ('c', 'y')])) + + # Now with some windowing. + pcoll = p | beam.Create(range(10)) | beam.Map( lambda t: window.TimestampedValue(t, t)) - main = pcoll | 'WindowMain' >> beam.WindowInto(window.FixedWindows(7)) - side = pcoll | 'WindowSide' >> beam.WindowInto(window.FixedWindows(10)) - res = main | "USE SIDES" >> beam.Map(lambda x, s: (x, s), beam.pvalue.AsList(side)) - import pprint - res | beam.Map(pprint.pprint) - - - def test_pardo_unfusable_side_inputs(self): - # TODO(BEAM-1348): Enable once side inputs are supported in fn API. - pass + # Intentionally choosing non-aligned windows to highlight the transition. + main = pcoll | 'WindowMain' >> beam.WindowInto(window.FixedWindows(5)) + side = pcoll | 'WindowSide' >> beam.WindowInto(window.FixedWindows(7)) + res = main | beam.Map(lambda x, s: (x, sorted(s)), + beam.pvalue.AsList(side)) + assert_that( + res, + equal_to([ + # The window [0, 5) maps to the window [0, 7). + (0, range(7)), + (1, range(7)), + (2, range(7)), + (3, range(7)), + (4, range(7)), + # The window [5, 10) maps to the window [7, 14). + (5, range(7, 10)), + (6, range(7, 10)), + (7, range(7, 10)), + (8, range(7, 10)), + (9, range(7, 10))]), + label='windowed') + + +# def test_pardo_unfusable_side_inputs(self): +# # TODO(BEAM-1348): Enable once side inputs are supported in fn API. +# pass def test_assert_that(self): # TODO: figure out a way for fn_api_runner to parse and raise the diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index df09a850471a..44aa2aa167c7 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -515,12 +515,8 @@ def create_side_input(tag, coder): tag=tag, source=SideInputSource( factory.state_handler, - beam_fn_api_pb2.StateKey( - multimap_side_input=beam_fn_api_pb2.StateKey.MultimapSideInput( - ptransform_id=transform_id, - side_input_id=tag, - window="TODO", - key="TODO")), + beam_fn_api_pb2.StateKey.MultimapSideInput( + key=side_input_tag(transform_id, tag)), coder=coder)) if side_inputs_proto: diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index d27edd6b5a68..6635b3081eb5 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -232,4 +232,4 @@ def _blocking_request(self, request): def _next_id(self): self._last_id += 1 - return str(self._last_id) \ No newline at end of file + return str(self._last_id) From 198c864aca35b6bca1313e2f216dbb40c73b27e0 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 19 Oct 2017 17:02:02 -0700 Subject: [PATCH 03/16] Revert unneeded changes. --- .../apache_beam/transforms/sideinputs.py | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index 1592a3c8506b..f10cb92ed5e3 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -26,7 +26,6 @@ from __future__ import absolute_import -from apache_beam import pvalue from apache_beam.transforms import window @@ -49,25 +48,19 @@ def map_via_end(source_window): class SideInputMap(object): """Represents a mapping of windows to side input values.""" - def __init__(self, side_input_data, old_view_options, iterable): - if isinstance(side_input_data, pvalue.SideInputData): # Remove? - self._view_fn = side_input_data.view_fn - self._window_mapping_fn = side_input_data.window_mapping_fn - else: - # Support older sdks/runners. - view_class = side_input_data - self._window_mapping_fn = old_view_options.get( - 'window_mapping_fn', _global_window_mapping_fn) - self._view_fn = lambda iterable: view_class._from_runtime_iterable( - iterable, old_view_options) + def __init__(self, view_class, view_options, iterable): + self._window_mapping_fn = view_options.get( + 'window_mapping_fn', _global_window_mapping_fn) + self._view_class = view_class + self._view_options = view_options self._iterable = iterable self._cache = {} def __getitem__(self, window): if window not in self._cache: target_window = self._window_mapping_fn(window) - self._cache[window] = self._view_fn( - _FilteringIterable(self._iterable, target_window)) + self._cache[window] = self._view_class._from_runtime_iterable( + _FilteringIterable(self._iterable, target_window), self._view_options) return self._cache[window] def is_globally_windowed(self): From 3dfe862220afbc5e57d7ac674a329cb5e66164cf Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 19 Oct 2017 17:20:40 -0700 Subject: [PATCH 04/16] lint --- sdks/python/apache_beam/pvalue.py | 9 +++------ .../runners/portability/fn_api_runner.py | 16 +++++++++------- .../runners/worker/bundle_processor.py | 1 + .../apache_beam/runners/worker/sdk_worker.py | 5 ++++- sdks/python/apache_beam/transforms/core.py | 2 +- 5 files changed, 18 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 3cd9e92e0562..5221bd5beaf2 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -132,8 +132,6 @@ def __reduce_ex__(self, unused_version): return _InvalidUnpickledPCollection, () def to_runner_api(self, context): - from apache_beam.portability.api import beam_runner_api_pb2 - from apache_beam.internal import pickler return beam_runner_api_pb2.PCollection( unique_name='%d%s.%s' % ( len(self.producer.full_label), self.producer.full_label, self.tag), @@ -144,7 +142,6 @@ def to_runner_api(self, context): @staticmethod def from_runner_api(proto, context): - from apache_beam.internal import pickler # Producer and tag will be filled in later, the key point is that the # same object is returned for the same pcollection id. return PCollection(None, element_type=pickler.loads(proto.coder_id)) @@ -363,9 +360,9 @@ def from_runner_api(proto, unused_context): assert proto.view_fn.spec.urn == urns.PICKLED_PYTHON_VIEWFN assert proto.window_mapping_fn.spec.urn == urns.PICKLED_WINDOW_MAPPING_FN return SideInputData( - proto.access_pattern.urn, - pickler.loads(proto.window_mapping_fn.spec.payload), - *pickler.loads(proto.view_fn.spec.payload)) + proto.access_pattern.urn, + pickler.loads(proto.window_mapping_fn.spec.payload), + *pickler.loads(proto.view_fn.spec.payload)) class AsSingleton(AsSideInput): diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index a546abed18eb..4bf1189ee759 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -236,8 +236,9 @@ def __init__(self, name, transforms, def __repr__(self): must_follow = ', '.join(prev.name for prev in self.must_follow) - downstream_side_inputs = ', '.join(str(si) for si in self.downstream_side_inputs) - return "%s\n %s\n must follow: %s\n downstream_side_inputs: %s" % ( + downstream_side_inputs = ', '.join( + str(si) for si in self.downstream_side_inputs) + return "%s\n %s\n must follow: %s\n downstream_side_inputs: %s" % ( self.name, '\n'.join(["%s:%s" % (transform.unique_name, transform.spec.urn) for transform in self.transforms]), @@ -713,9 +714,9 @@ def extract_endpoints(stage): payload = proto_utils.parse_Bytes( transform.spec.payload, beam_runner_api_pb2.ParDoPayload) for tag, si in payload.side_inputs.items(): - data_side_input[transform.unique_name, tag] = ( - 'materialize:' + transform.inputs[tag], - beam.pvalue.SideInputData.from_runner_api(si, None)) + data_side_input[transform.unique_name, tag] = ( + 'materialize:' + transform.inputs[tag], + beam.pvalue.SideInputData.from_runner_api(si, None)) return data_input, data_side_input, data_output logging.info('Running %s', stage.name) @@ -1078,6 +1079,7 @@ class GrpcStateServicer( StateServicer, beam_fn_api_pb2_grpc.BeamFnStateServicer): def State(self, request_stream, context=None): # Note that this eagerly mutates state, assuming any failures are fatal. + # Thus it is safe to ignore instruction_reference. for request in request_stream: if request.get: yield beam_fn_api_pb2.StateResponse( @@ -1085,12 +1087,12 @@ def State(self, request_stream, context=None): get=beam_fn_api_pb2.StateGetResponse( data=self.blocking_get(request.state_key))) elif request.append: - data=self.blocking_append(request.state_key, request.append.data) + self.blocking_append(request.state_key, request.append.data) yield beam_fn_api_pb2.StateResponse( id=request.id, append=beam_fn_api_pb2.AppendResponse()) elif request.clear: - data=self.blocking_clear(request.state_key) + self.blocking_clear(request.state_key) yield beam_fn_api_pb2.StateResponse( id=request.id, clear=beam_fn_api_pb2.ClearResponse()) diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 44aa2aa167c7..749de7bb88f4 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -185,6 +185,7 @@ def __getitem__(self, window): window=self._target_window_coder.encode(target_window))) element_coder_impl = self._element_coder.get_impl() state_handler = self._state_handler + class AllElements(object): def __iter__(self): # TODO(robertwb): Support pagination. diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 6635b3081eb5..99075b01449c 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -169,6 +169,7 @@ def __init__(self, state_stub): def start(self): self._done = False + def request_iter(): while True: request = self._requests.get() @@ -176,14 +177,16 @@ def request_iter(): break yield request responses = self._state_stub.State(request_iter()) + def pull_responses(): try: for response in responses: self._responses.put(response) if self._done: break - except: + except: # pylint: disable=bare-except self._exc_info = sys.exc_info() + raise reader = threading.Thread(target=pull_responses, name='read_state') reader.daemon = True reader.start() diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 58d182d789a2..19707313600d 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -861,7 +861,7 @@ def to_runner_api_parameter(self, context): # transformation is currently irreversible given how # remove_objects_from_args and insert_values_in_args # are currently implemented. - side_inputs = { + side_inputs={ "side%s" % ix: si.to_runner_api(context) for ix, si in enumerate(self.side_inputs)})) From abc8cd910559a5a8ac40f752d59091e6a2c38cd3 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 19 Oct 2017 17:48:59 -0700 Subject: [PATCH 05/16] more cleanup --- .../runners/portability/fn_api_runner.py | 22 ------- .../runners/worker/bundle_processor.py | 57 ++++--------------- 2 files changed, 11 insertions(+), 68 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 4bf1189ee759..838ce1ea78d4 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -1031,28 +1031,6 @@ def _reencode_elements(elements, element_coder): # These classes are used to interact with the worker. - class SimpleState(object): # TODO(robertwb): Inherit from GRPC servicer. - - def __init__(self): - self._all = collections.defaultdict(list) - - def Get(self, state_key): - return beam_fn_api_pb2.Elements.Data( - data=''.join(self._all[self._to_key(state_key)])) - - def Append(self, state_key, data): - self._all[self._to_key(state_key)].extend(data) - - def Clear(self, state_key): - try: - del self._all[self._to_key(state_key)] - except KeyError: - pass - - @staticmethod - def _to_key(state_key): - return state_key.window, state_key.key - class StateServicer(beam_fn_api_pb2_grpc.BeamFnStateServicer): def __init__(self): diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 749de7bb88f4..00bba5c5c47e 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -63,10 +63,6 @@ OLD_DATAFLOW_RUNNER_HARNESS_READ_URN = 'urn:org.apache.beam:source:java:0.1' -def side_input_tag(transform_id, tag): - return str("%d[%s][%s]" % (len(transform_id), transform_id, tag)) - - class RunnerIOOperation(operations.Operation): """Common baseclass for runner harness IO operations.""" @@ -473,17 +469,9 @@ def create(factory, transform_id, transform_proto, parameter, consumers): @BeamTransformFactory.register_urn(OLD_DATAFLOW_RUNNER_HARNESS_PARDO_URN, None) -def create(factory, transform_id, transform_proto, parameter, consumers): - dofn_data = pickler.loads(parameter) - if len(dofn_data) == 2: - # Has side input data. - serialized_fn, side_input_data = dofn_data - else: - # No side input data. - serialized_fn, side_input_data = parameter, [] +def create(factory, transform_id, transform_proto, serialized_fn, consumers): return _create_pardo_operation( - factory, transform_id, transform_proto, consumers, - serialized_fn, side_input_data) + factory, transform_id, transform_proto, consumers, serialized_fn) @BeamTransformFactory.register_urn( @@ -491,34 +479,14 @@ def create(factory, transform_id, transform_proto, parameter, consumers): def create(factory, transform_id, transform_proto, parameter, consumers): assert parameter.do_fn.spec.urn == urns.PICKLED_DO_FN_INFO serialized_fn = parameter.do_fn.spec.payload - dofn_data = pickler.loads(serialized_fn) - if len(dofn_data) == 2: - # Has side input data. - serialized_fn, side_input_data = dofn_data - else: - # No side input data. - side_input_data = [] return _create_pardo_operation( factory, transform_id, transform_proto, consumers, - serialized_fn, side_input_data, parameter.side_inputs) + serialized_fn, parameter.side_inputs) def _create_pardo_operation( factory, transform_id, transform_proto, consumers, - serialized_fn, side_input_data, side_inputs_proto=None): - - fn, args, kwargs, tags_and_types, windowing = pickler.loads(serialized_fn) - - def create_side_input(tag, coder): - # TODO(robertwb): Extract windows (and keys) out of element data. - # TODO(robertwb): Extract state key from ParDoPayload. - return operation_specs.WorkerSideInputSource( - tag=tag, - source=SideInputSource( - factory.state_handler, - beam_fn_api_pb2.StateKey.MultimapSideInput( - key=side_input_tag(transform_id, tag)), - coder=coder)) + serialized_fn, side_inputs_proto=None): if side_inputs_proto: tagged_side_inputs = [ @@ -530,9 +498,7 @@ def create_side_input(tag, coder): StateBackedSideInputMap(factory.state_handler, transform_id, tag, si) for tag, si in tagged_side_inputs] else: - side_inputs = [ - create_side_input(tag, coder) for tag, coder in side_input_data] - side_input_maps = None + side_input_maps = [] output_tags = list(transform_proto.outputs.keys()) @@ -546,21 +512,22 @@ def mutate_tag(tag): else: return tag - if not windowing: + dofn_data = pickler.loads(serialized_fn) + if not dofn_data[-1]: # Windowing not set. side_input_tags = side_inputs_proto or () pcoll_id, = [pcoll for tag, pcoll in transform_proto.inputs.items() if tag not in side_input_tags] windowing = factory.context.windowing_strategies.get_by_id( factory.descriptor.pcollections[pcoll_id].windowing_strategy_id) + serialized_fn = pickler.dumps(dofn_data[:-1] + (windowing,)) output_coders = factory.get_output_coders(transform_proto) spec = operation_specs.WorkerDoFn( - serialized_fn=pickler.dumps( - (fn, args, kwargs, tags_and_types, windowing)), + serialized_fn=serialized_fn, output_tags=[mutate_tag(tag) for tag in output_tags], input=None, - side_inputs=side_inputs, + side_inputs=[], # Obsoleted by side_input_maps. output_coders=[output_coders[tag] for tag in output_tags]) return factory.augment_oldstyle_op( operations.DoOperation( @@ -577,10 +544,8 @@ def mutate_tag(tag): def _create_simple_pardo_operation( factory, transform_id, transform_proto, consumers, dofn): serialized_fn = pickler.dumps((dofn, (), {}, [], None)) - side_input_data = [] return _create_pardo_operation( - factory, transform_id, transform_proto, consumers, - serialized_fn, side_input_data) + factory, transform_id, transform_proto, consumers, serialized_fn) @BeamTransformFactory.register_urn( From f79e1d1957e06902f10a2ce177b684245834a9ce Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 20 Oct 2017 16:58:27 -0700 Subject: [PATCH 06/16] reviewer comments --- sdks/python/apache_beam/pvalue.py | 6 ++- .../runners/portability/fn_api_runner_test.py | 5 --- .../apache_beam/runners/worker/sdk_worker.py | 41 +++++++++++++------ sdks/python/apache_beam/transforms/core.py | 2 + 4 files changed, 36 insertions(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 5221bd5beaf2..3e414d3d4abd 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -318,7 +318,6 @@ def from_runner_api(proto, context): class _UnpickledSideInput(AsSideInput): def __init__(self, side_input_data): self._data = side_input_data - # For older runners. self._window_mapping_fn = side_input_data.window_mapping_fn @staticmethod @@ -326,6 +325,11 @@ def _from_runtime_iterable(it, options): return options['data'].view_fn(it) def _view_options(self): + return { + 'data': self._data, + # For non-fn-api runners. + 'window_mapping_fn': self._data.window_mapping_fn, + } base = super(_UnpickledSideInput, self)._view_options() base['data'] = self._data return base diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py index 70362f5f7eab..ea9ed1a3bf6d 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py @@ -83,11 +83,6 @@ def cross_product(elem, sides): (9, range(7, 10))]), label='windowed') - -# def test_pardo_unfusable_side_inputs(self): -# # TODO(BEAM-1348): Enable once side inputs are supported in fn API. -# pass - def test_assert_that(self): # TODO: figure out a way for fn_api_runner to parse and raise the # underlying exception. diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 99075b01449c..55ecbcc217a5 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -162,7 +162,6 @@ def __init__(self, state_stub): self._lock = threading.Lock() self._state_stub = state_stub self._requests = queue.Queue() - self._responses = queue.Queue() self._responses_by_id = {} self._last_id = 0 self._exc_info = None @@ -181,7 +180,7 @@ def request_iter(): def pull_responses(): try: for response in responses: - self._responses.put(response) + self._responses_by_id[response.id].set(response) if self._done: break except: # pylint: disable=bare-except @@ -221,18 +220,36 @@ def blocking_clear(self, state_key, instruction_reference): def _blocking_request(self, request): request.id = self._next_id() + self._responses_by_id[request.id] = future = _Future() self._requests.put(request) - while request.id not in self._responses_by_id: - with self._lock: - if request.id not in self._responses_by_id: - response = self._responses.get(timeout=2) - self._responses_by_id[response.id] = response - if self._exc_info: - raise self._exc_info[0], self._exc_info[1], self._exc_info[2] - response = self._responses_by_id[response.id] - del self._responses_by_id[response.id] - return response + while not future.wait(timeout=1): + if self._exc_info: + raise self._exc_info[0], self._exc_info[1], self._exc_info[2] + elif self._done: + raise RuntimeError() + del self._responses_by_id[request.id] + return future.get() def _next_id(self): self._last_id += 1 return str(self._last_id) + + +class _Future(object): + """A simple future object to implement blocking requests. + """ + def __init__(self): + self._event = threading.Event() + + def wait(self, timeout=None): + return self._event.wait(timeout) + + def get(self, timeout=None): + if self.wait(timeout): + return self._value + else: + raise LookupError() + + def set(self, value): + self._value = value + self._event.set() diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 19707313600d..fbc5f33c4260 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -876,6 +876,8 @@ def from_runner_api_parameter(pardo_payload, context): elif windowing: raise NotImplementedError('explicit windowing') result = ParDo(fn, *args, **kwargs) + # This is an ordered list stored as a dict (see the comments in + # to_runner_api_parameter above). indexed_side_inputs = [ (int(ix[4:]), pvalue.AsSideInput.from_runner_api(si, context)) for ix, si in pardo_payload.side_inputs.items()] From 80d5acd2cde4a83fb08c3d6a9cc1d337cddab25c Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 20 Oct 2017 17:50:17 -0700 Subject: [PATCH 07/16] lint --- sdks/python/apache_beam/pvalue.py | 9 +++------ .../apache_beam/runners/worker/bundle_processor.py | 1 - 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 3e414d3d4abd..1ee41ed7c5e0 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -326,13 +326,10 @@ def _from_runtime_iterable(it, options): def _view_options(self): return { - 'data': self._data, - # For non-fn-api runners. - 'window_mapping_fn': self._data.window_mapping_fn, + 'data': self._data, + # For non-fn-api runners. + 'window_mapping_fn': self._data.window_mapping_fn, } - base = super(_UnpickledSideInput, self)._view_options() - base['data'] = self._data - return base def _side_input_data(self): return self._data diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 00bba5c5c47e..689eab7b8422 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -493,7 +493,6 @@ def _create_pardo_operation( (tag, beam.pvalue.SideInputData.from_runner_api(si, factory.context)) for tag, si in side_inputs_proto.items()] tagged_side_inputs.sort(key=lambda tag_si: int(tag_si[0][4:])) - side_inputs = None, side_input_maps = [ StateBackedSideInputMap(factory.state_handler, transform_id, tag, si) for tag, si in tagged_side_inputs] From 87c7c568ede703397fabeafd25bc3cb4595b733a Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 20 Oct 2017 18:04:17 -0700 Subject: [PATCH 08/16] Type hint fixes. --- .../apache_beam/runners/direct/transform_evaluator.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index 16a299118cae..bf5968070a0d 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -584,10 +584,9 @@ def start_bundle(self): assert len(self._outputs) == 1 self.output_pcollection = list(self._outputs)[0] - # The input type of a GroupByKey will be KV[Any, Any] or more specific. - kv_type_hint = ( - self._applied_ptransform.transform.get_type_hints().input_types[0]) - self.key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0]) + # The output type of a GroupByKey will be KV[Any, Any] or more specific. + kv_type_hint = self._applied_ptransform.outputs[None].element_type + self.key_coder = coders.registry.get_coder(kv_type_hint.tuple_types[0]) def process_timer(self, timer_firing): # We do not need to emit a KeyedWorkItem to process_element(). From 1159fd525926b9e4beb96f1561c6b93b80c543ae Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 20 Oct 2017 22:58:28 -0700 Subject: [PATCH 09/16] Fix generated files license exclusion. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 3509407f6661..a656ca4490e7 100644 --- a/pom.xml +++ b/pom.xml @@ -1561,7 +1561,7 @@ **/.settings/**/* - **/apache_beam/runners/api/*_pb2*.py + **/apache_beam/portability/api/*_pb2*.py From f1eda7b8140a00c486cdc8a6d8a48a59a8605c48 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 20 Oct 2017 23:07:46 -0700 Subject: [PATCH 10/16] another license check --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index a656ca4490e7..bc4a4280f17c 100644 --- a/pom.xml +++ b/pom.xml @@ -1540,7 +1540,7 @@ **/target/**/* **/dependency-reduced-pom.xml **/hs_err_pid*.log - .github/**/* + src/.github/**/* **/*.iml **/.idea/**/* **/*.egg-info/**/* From db6db04184418fc0a38b4a6ca7c14a98d2c03852 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 20 Oct 2017 23:15:30 -0700 Subject: [PATCH 11/16] another license check attempt --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index bc4a4280f17c..8f08db1e171d 100644 --- a/pom.xml +++ b/pom.xml @@ -1540,7 +1540,7 @@ **/target/**/* **/dependency-reduced-pom.xml **/hs_err_pid*.log - src/.github/**/* + **/.github/**/* **/*.iml **/.idea/**/* **/*.egg-info/**/* From eb5e887a9e10df059a63e31f9679860ba1ec0f89 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Sat, 21 Oct 2017 01:41:40 -0700 Subject: [PATCH 12/16] fallback type error --- .../apache_beam/runners/direct/transform_evaluator.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index bf5968070a0d..2f3ac4fd9895 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -585,7 +585,11 @@ def start_bundle(self): self.output_pcollection = list(self._outputs)[0] # The output type of a GroupByKey will be KV[Any, Any] or more specific. - kv_type_hint = self._applied_ptransform.outputs[None].element_type + # TODO(BEAM-2717): Infer coders earlier. + kv_type_hint = ( + self._applied_ptransform.outputs[None].element_type + or + self._applied_ptransform.transform.get_type_hints().input_types[0][0]) self.key_coder = coders.registry.get_coder(kv_type_hint.tuple_types[0]) def process_timer(self, timer_firing): From fb28ed0bf2b60919ba2a1dd1740ae7e365930820 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Sat, 21 Oct 2017 16:03:25 -0700 Subject: [PATCH 13/16] isort changes --- sdks/python/apache_beam/io/avroio_test.py | 6 +++--- .../runners/experimental/python_rpc_direct/server.py | 2 +- .../python/apache_beam/runners/portability/fn_api_runner.py | 3 ++- .../runners/portability/universal_local_runner.py | 2 +- sdks/python/apache_beam/runners/worker/data_plane_test.py | 2 +- sdks/python/apache_beam/runners/worker/log_handler_test.py | 2 +- sdks/python/apache_beam/runners/worker/sdk_worker.py | 2 +- sdks/python/apache_beam/runners/worker/sdk_worker_test.py | 2 +- .../apache_beam/typehints/native_type_compatibility_test.py | 3 ++- sdks/python/apache_beam/typehints/typed_pipeline_test.py | 3 ++- 10 files changed, 15 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index 8a344275a1a8..5aa225f66c84 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -23,9 +23,9 @@ import avro.datafile import avro.schema +import hamcrest as hc from avro.datafile import DataFileWriter from avro.io import DatumWriter -import hamcrest as hc import apache_beam as beam from apache_beam import Create @@ -33,8 +33,8 @@ from apache_beam.io import filebasedsource from apache_beam.io import iobase from apache_beam.io import source_test_utils -from apache_beam.io.avroio import _AvroSink as AvroSink # For testing -from apache_beam.io.avroio import _AvroSource as AvroSource # For testing +from apache_beam.io.avroio import _AvroSink as AvroSink # For testing +from apache_beam.io.avroio import _AvroSource as AvroSource # For testing from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py index 4986dc40abc9..c891ad39a93d 100644 --- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py +++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py @@ -19,9 +19,9 @@ """ import time import uuid -from concurrent import futures import grpc +from concurrent import futures from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.pipeline import Pipeline diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 838ce1ea78d4..3f76ead7764a 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -24,9 +24,9 @@ import Queue as queue import threading import time -from concurrent import futures import grpc +from concurrent import futures import apache_beam as beam # pylint: disable=ungrouped-imports from apache_beam.coders import WindowedValueCoder @@ -51,6 +51,7 @@ from apache_beam.utils import proto_utils from apache_beam.utils import urns + # This module is experimental. No backwards-compatibility guarantees. diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py index 579983c5af63..135b1ecf57d8 100644 --- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py +++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py @@ -26,9 +26,9 @@ import time import traceback import uuid -from concurrent import futures import grpc +from concurrent import futures from google.protobuf import text_format from apache_beam.portability.api import beam_job_api_pb2 diff --git a/sdks/python/apache_beam/runners/worker/data_plane_test.py b/sdks/python/apache_beam/runners/worker/data_plane_test.py index 07ba8fd44f1f..9ff114338176 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane_test.py +++ b/sdks/python/apache_beam/runners/worker/data_plane_test.py @@ -25,9 +25,9 @@ import sys import threading import unittest -from concurrent import futures import grpc +from concurrent import futures from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc diff --git a/sdks/python/apache_beam/runners/worker/log_handler_test.py b/sdks/python/apache_beam/runners/worker/log_handler_test.py index 647b8b7e8b4a..5a7b5cf729f8 100644 --- a/sdks/python/apache_beam/runners/worker/log_handler_test.py +++ b/sdks/python/apache_beam/runners/worker/log_handler_test.py @@ -18,9 +18,9 @@ import logging import unittest -from concurrent import futures import grpc +from concurrent import futures from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 55ecbcc217a5..86a9011446b9 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -27,9 +27,9 @@ import sys import threading import traceback -from concurrent import futures import grpc +from concurrent import futures from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py index 2532341df937..d3bd249aeed9 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py @@ -23,9 +23,9 @@ import logging import unittest -from concurrent import futures import grpc +from concurrent import futures from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py index 4171507f3459..0b479e45a8e5 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py @@ -17,9 +17,10 @@ """Test for Beam type compatibility library.""" -import typing import unittest +import typing + from apache_beam.typehints import native_type_compatibility from apache_beam.typehints import typehints diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 2581457e7ea1..d9af2e9396a2 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -17,9 +17,10 @@ """Unit tests for the type-hint objects and decorators.""" import inspect -import typing import unittest +import typing + import apache_beam as beam from apache_beam import pvalue from apache_beam import typehints From 3b65cc77a085a7fd8553e84f8b47b6e71c348acb Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Sat, 21 Oct 2017 21:16:40 -0700 Subject: [PATCH 14/16] Revert "isort changes" This reverts commit fb28ed0bf2b60919ba2a1dd1740ae7e365930820. --- sdks/python/apache_beam/io/avroio_test.py | 6 +++--- .../runners/experimental/python_rpc_direct/server.py | 2 +- .../python/apache_beam/runners/portability/fn_api_runner.py | 3 +-- .../runners/portability/universal_local_runner.py | 2 +- sdks/python/apache_beam/runners/worker/data_plane_test.py | 2 +- sdks/python/apache_beam/runners/worker/log_handler_test.py | 2 +- sdks/python/apache_beam/runners/worker/sdk_worker.py | 2 +- sdks/python/apache_beam/runners/worker/sdk_worker_test.py | 2 +- .../apache_beam/typehints/native_type_compatibility_test.py | 3 +-- sdks/python/apache_beam/typehints/typed_pipeline_test.py | 3 +-- 10 files changed, 12 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index 5aa225f66c84..8a344275a1a8 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -23,9 +23,9 @@ import avro.datafile import avro.schema -import hamcrest as hc from avro.datafile import DataFileWriter from avro.io import DatumWriter +import hamcrest as hc import apache_beam as beam from apache_beam import Create @@ -33,8 +33,8 @@ from apache_beam.io import filebasedsource from apache_beam.io import iobase from apache_beam.io import source_test_utils -from apache_beam.io.avroio import _AvroSink as AvroSink # For testing -from apache_beam.io.avroio import _AvroSource as AvroSource # For testing +from apache_beam.io.avroio import _AvroSink as AvroSink # For testing +from apache_beam.io.avroio import _AvroSource as AvroSource # For testing from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py index c891ad39a93d..4986dc40abc9 100644 --- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py +++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py @@ -19,9 +19,9 @@ """ import time import uuid +from concurrent import futures import grpc -from concurrent import futures from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.pipeline import Pipeline diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 3f76ead7764a..838ce1ea78d4 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -24,9 +24,9 @@ import Queue as queue import threading import time +from concurrent import futures import grpc -from concurrent import futures import apache_beam as beam # pylint: disable=ungrouped-imports from apache_beam.coders import WindowedValueCoder @@ -51,7 +51,6 @@ from apache_beam.utils import proto_utils from apache_beam.utils import urns - # This module is experimental. No backwards-compatibility guarantees. diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py index 135b1ecf57d8..579983c5af63 100644 --- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py +++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py @@ -26,9 +26,9 @@ import time import traceback import uuid +from concurrent import futures import grpc -from concurrent import futures from google.protobuf import text_format from apache_beam.portability.api import beam_job_api_pb2 diff --git a/sdks/python/apache_beam/runners/worker/data_plane_test.py b/sdks/python/apache_beam/runners/worker/data_plane_test.py index 9ff114338176..07ba8fd44f1f 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane_test.py +++ b/sdks/python/apache_beam/runners/worker/data_plane_test.py @@ -25,9 +25,9 @@ import sys import threading import unittest +from concurrent import futures import grpc -from concurrent import futures from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc diff --git a/sdks/python/apache_beam/runners/worker/log_handler_test.py b/sdks/python/apache_beam/runners/worker/log_handler_test.py index 5a7b5cf729f8..647b8b7e8b4a 100644 --- a/sdks/python/apache_beam/runners/worker/log_handler_test.py +++ b/sdks/python/apache_beam/runners/worker/log_handler_test.py @@ -18,9 +18,9 @@ import logging import unittest +from concurrent import futures import grpc -from concurrent import futures from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 86a9011446b9..55ecbcc217a5 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -27,9 +27,9 @@ import sys import threading import traceback +from concurrent import futures import grpc -from concurrent import futures from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py index d3bd249aeed9..2532341df937 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py @@ -23,9 +23,9 @@ import logging import unittest +from concurrent import futures import grpc -from concurrent import futures from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_fn_api_pb2_grpc diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py index 0b479e45a8e5..4171507f3459 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py @@ -17,9 +17,8 @@ """Test for Beam type compatibility library.""" -import unittest - import typing +import unittest from apache_beam.typehints import native_type_compatibility from apache_beam.typehints import typehints diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index d9af2e9396a2..2581457e7ea1 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -17,9 +17,8 @@ """Unit tests for the type-hint objects and decorators.""" import inspect -import unittest - import typing +import unittest import apache_beam as beam from apache_beam import pvalue From 6162ec6ea8f57002ef200598fe6efa2cbf669fbf Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Sat, 21 Oct 2017 21:17:10 -0700 Subject: [PATCH 15/16] smaller isort changes --- .../apache_beam/typehints/native_type_compatibility_test.py | 3 ++- sdks/python/apache_beam/typehints/typed_pipeline_test.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py index 4171507f3459..0b479e45a8e5 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py @@ -17,9 +17,10 @@ """Test for Beam type compatibility library.""" -import typing import unittest +import typing + from apache_beam.typehints import native_type_compatibility from apache_beam.typehints import typehints diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 2581457e7ea1..d9af2e9396a2 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -17,9 +17,10 @@ """Unit tests for the type-hint objects and decorators.""" import inspect -import typing import unittest +import typing + import apache_beam as beam from apache_beam import pvalue from apache_beam import typehints From cb1a6160c4bfdbaf6b1d50aa62a1561a6146fb79 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Sat, 21 Oct 2017 21:36:47 -0700 Subject: [PATCH 16/16] more isort suggestiojns --- sdks/python/apache_beam/pvalue.py | 1 - .../apache_beam/typehints/native_type_compatibility_test.py | 3 +-- sdks/python/apache_beam/typehints/typed_pipeline_test.py | 3 +-- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 1ee41ed7c5e0..31922f37bf9a 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -34,7 +34,6 @@ from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.utils import urns - __all__ = [ 'PCollection', 'TaggedOutput', diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py index 0b479e45a8e5..4171507f3459 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py @@ -17,9 +17,8 @@ """Test for Beam type compatibility library.""" -import unittest - import typing +import unittest from apache_beam.typehints import native_type_compatibility from apache_beam.typehints import typehints diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index d9af2e9396a2..2581457e7ea1 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -17,9 +17,8 @@ """Unit tests for the type-hint objects and decorators.""" import inspect -import unittest - import typing +import unittest import apache_beam as beam from apache_beam import pvalue