
[BEAM-4236, BEAM-2927] Make Python SDK side inputs work with non well known coders and also work with Dataflow #5302

Merged
lukecwik merged 3 commits into apache:master from lukecwik:side_input2 on May 16, 2018

Conversation

@robertwb (Contributor) commented May 8, 2018

Several issues addressed:

  • Lack of support for mapping windows
  • Reliance on a side input specification serialized within the DoFn, instead of one read from the pipeline description, when using the Fn API
  • Eager pre-fetching of side inputs even when the process bundle had zero elements, which led to state requests for bundles containing zero elements

Streaming Dataflow job w/ Fn API side inputs: 2018-05-09_14_40_12-2860575533296523734 (note SIDE_INPUT being logged)
Batch Dataflow job w/ legacy side inputs: 2018-05-09_14_30_24-10730914164827927481


Follow this checklist to help us incorporate your contribution quickly and easily:

  • Make sure there is a JIRA issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
  • Write a pull request description that is detailed enough to understand:
    • What the pull request does
    • Why it does it
    • How it does it
    • Why this approach
  • Each commit in the pull request should have a meaningful subject line and body.
  • Run ./gradlew build to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.


@staticmethod
def from_runner_api(proto, context):
def from_runner_api(proto, coder):
robertwb (Contributor, Author):

Given the coder comes from the PCollection and can be mutated, it's probably better to remove the coder from the side input object altogether, keeping the signature of this method as is, and instead passing it when we bind the PCollection.
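The split suggested here — a side input spec that carries no coder, with the coder supplied only when the spec is bound to a PCollection — might look roughly like this. This is a minimal sketch with hypothetical names, not the actual Beam API:

```python
# Sketch (hypothetical names) of keeping the coder off the side input
# object and supplying it only at bind time, so later mutations of the
# PCollection's coder are always observed.

class SideInputSpec:
    """Describes how a side input is accessed (e.g. singleton, iterable)."""
    def __init__(self, access_pattern):
        self.access_pattern = access_pattern  # note: no coder stored here

class BoundSideInput:
    """Pairs a spec with the coder taken from the PCollection at bind time."""
    def __init__(self, spec, pcollection_coder):
        self.spec = spec
        self.coder = pcollection_coder

class FakePCollection:
    def __init__(self, coder):
        self.coder = coder

def bind(spec, pcollection):
    # The coder is read from the PCollection only when binding happens.
    return BoundSideInput(spec, pcollection.coder)

pc = FakePCollection(coder='VarIntCoder')
bound = bind(SideInputSpec('singleton'), pc)
assert bound.coder == 'VarIntCoder'  # coder came from the PCollection
```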

robertwb (Contributor, Author):

Seems this could also obviate the need for passing the transform proto itself in the PTransform.from_runner_api too.

Reply (Member):

Looks much cleaner, trying this out now.

input_args = input_args if input_args else []
input_kwargs = input_kwargs if input_kwargs else {}

if not self.has_windowed_inputs:
robertwb (Contributor, Author):

This is an important optimization for batch pipelines that use side inputs (e.g the TFX stuff). I see now how we were requesting side inputs in start(), but they're not window dependent. Perhaps we could defer this optimization to the first element that is processed. (At least a TODO would be in order.)

Reply (Member):

Like you suggest, checking for at least one element or loading on first element should work, just trying to get everything working E2E before I try to improve this optimization.
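The deferral being discussed — fetching side inputs on the first processed element instead of in start(), so an empty bundle never issues a state request — could be sketched like this. This is illustrative only, with hypothetical names, not Beam's actual bundle-processing code:

```python
# Sketch (hypothetical) of lazily fetching side inputs on the first
# element of a bundle, so zero-element bundles never trigger the
# expensive state request.

class LazySideInputRunner:
    def __init__(self, fetch_side_inputs):
        self._fetch = fetch_side_inputs   # the expensive state request
        self._side_inputs = None

    def start_bundle(self):
        # Deliberately do NOT fetch here; an empty bundle stays cheap.
        pass

    def process(self, element):
        if self._side_inputs is None:     # first element triggers the fetch
            self._side_inputs = self._fetch()
        return element, self._side_inputs

calls = []
def fetch():
    calls.append(1)
    return ['x']

runner = LazySideInputRunner(fetch)
runner.start_bundle()     # zero-element bundle: no state request issued
assert calls == []
runner.process('a')
runner.process('b')
assert calls == [1]       # fetched exactly once, on the first element
```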

def _pardo_fn_data(self):
si_tags_and_types = None
windowing = None
return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
robertwb (Contributor, Author):

This is the cause of the

  File "/usr/local/google/home/lcwik/git/beam/sdks/python/apache_beam/runners/worker/operations.py", line 360, in start
    pickler.loads(self.spec.serialized_fn))
ValueError: need more than 4 values to unpack

failure. I don't remember if we completely removed its use in the legacy worker, but if so, we can probably remove it there too rather than re-introduce it here.

Reply (Member):

I was able to remove this in the few places it was referenced.

@charlesccychen (Contributor):

The following sample fails to run at this commit:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
 
from __future__ import absolute_import
 
import argparse
import logging
 
import six
 
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.util import assert_that, equal_to
 
 
def run(argv=None):
  """Build and run the pipeline."""
 
  pipeline_options = PipelineOptions(argv)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  pipeline_options.view_as(StandardOptions).streaming = True
  p = beam.Pipeline(options=pipeline_options)
 
  main = p | 'MainCreate' >> beam.Create(['a', 'b'])
 
  single_side = p | 'SingletonSide' >> beam.Create(['x'])
  main | beam.Map(lambda x, side: (x, side),
                      beam.pvalue.AsSingleton(single_side))
  # assert_that(
  #     main | beam.Map(lambda x, side: (x, side),
  #                     beam.pvalue.AsSingleton(single_side)),
  #     equal_to([('a', 'x'), ('b', 'x')]),
  #     label='AssertSingleton')
 
  # iter_side = p | 'IterSide' >> beam.Create(['x', 'y', 'z'])
  # assert_that(
  #     main | beam.Map(lambda x, side: (x, sorted(side)),
  #                     beam.pvalue.AsIter(iter_side)),
  #     equal_to([('a', ['x', 'y', 'z']), ('b', ['x', 'y', 'z'])]),
  #     label='AssertIter')
 
  # multimap_side = p | 'MultimapSide' >> beam.Create(
  #     [('a', 'aa'), ('b', 'bb'), ('a', 'aaa')])
  # assert_that(
  #     main | beam.Map(lambda x, side: (x, sorted(side[x])),
  #                     beam.pvalue.AsMultiMap(multimap_side)),
  #     equal_to([('a', ['aa', 'aaa']), ('b', ['bb'])]),
  #     label='AssertMultimap')
 
  result = p.run()
  result.wait_until_finish()
 
 
if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

The following error is emitted:

INFO:root:LCWIKA [u'ref_PCollection_PCollection_2', u'ref_PCollection_PCollection_1']
INFO:root:LCWIKB [u'ref_PCollection_PCollection_3', u'ref_PCollection_PCollection_2', u'ref_PCollection_PCollection_1']
INFO:root:LCWIKC {u'side0': unique_name: "18SingletonSide/Read.None"
coder_id: "eNprYEpOyczJ0QMRXPE5+Ykp8SWVBalchQyhXMElRZl56SFAbiFjayFTUCGzHgCKDA/Z"
is_bounded: BOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
}
INFO:root:LCWIKD [u'ref_Coder_GlobalWindowCoder_1']
Traceback (most recent call last):
  File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/examples/streaming_wordcount.py", line 72, in <module>
    run()
  File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/examples/streaming_wordcount.py", line 66, in run
    result = p.run()
  File "apache_beam/pipeline.py", line 389, in run
    self.to_runner_api(), self.runner, self._options).run(False)
  File "apache_beam/pipeline.py", line 618, in from_runner_api
    context.transforms.get_by_id(root_transform_id)]
  File "apache_beam/runners/pipeline_context.py", line 85, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 863, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 85, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 854, in from_runner_api
    transform=ptransform.PTransform.from_runner_api(proto, context),
  File "apache_beam/transforms/ptransform.py", line 560, in from_runner_api
    context)
  File "apache_beam/transforms/core.py", line 904, in from_runner_api_parameter
    for tag in pardo_payload.side_inputs.keys()
  File "apache_beam/transforms/core.py", line 904, in <dictcomp>
    for tag in pardo_payload.side_inputs.keys()
  File "apache_beam/runners/pipeline_context.py", line 85, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
KeyError: u'eNprYEpOyczJ0QMRXPE5+Ykp8SWVBalchQyhXMElRZl56SFAbiFjayFTUCGzHgCKDA/Z'

@lukecwik lukecwik force-pushed the side_input2 branch 2 times, most recently from 62ea484 to 1bc4716 on May 9, 2018 16:37
@lukecwik lukecwik changed the title Side input2 [BEAM-4236, BEAM-2927] Make Python SDK side inputs work with non well known coders and also work with Dataflow May 9, 2018
@lukecwik lukecwik requested a review from aaltay May 9, 2018 17:38

lukecwik commented May 9, 2018

R: @charlesccychen


lukecwik commented May 9, 2018

CC: @pabloem


pabloem commented May 9, 2018

Question:
Is this going to change how batch side inputs work in Python Dataflow?

Where can I find the code that will fetch the side input from now on?

Will we not be using the existing code path in runners/worker/sideinputs.py ?


lukecwik commented May 9, 2018

@pabloem You're right, this breaks the way the Dataflow SDK does legacy worker batch side inputs.


lukecwik commented May 9, 2018

Reverted the removal of tags_and_types, restoring the legacy batch Python Dataflow code paths.

@lukecwik lukecwik force-pushed the side_input2 branch 3 times, most recently from e7c750f to 93fcc6c on May 9, 2018 21:24

lukecwik commented May 9, 2018

The hardest part was understanding the changes that I could make that would be compatible with Dataflow.

@charlesccychen (Contributor):

For the record, the sample in #5302 (comment) still fails at this PR for an unrelated reason, likely having to do with PTransform replacement logic.

Traceback (most recent call last):
  File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/examples/streaming_wordcount.py", line 72, in <module>
    run()
  File "/usr/local/google/home/ccy/git/beam/sdks/python/apache_beam/examples/streaming_wordcount.py", line 66, in run
    result = p.run()
  File "apache_beam/pipeline.py", line 389, in run
    self.to_runner_api(), self.runner, self._options).run(False)
  File "apache_beam/pipeline.py", line 402, in run
    return self.runner.run_pipeline(self)
  File "apache_beam/runners/dataflow/dataflow_runner.py", line 347, in run_pipeline
    super(DataflowRunner, self).run_pipeline(pipeline)
  File "apache_beam/runners/runner.py", line 170, in run_pipeline
    pipeline.visit(RunVisitor(self))
  File "apache_beam/pipeline.py", line 430, in visit
    self._root_transform().visit(visitor, self, visited)
  File "apache_beam/pipeline.py", line 785, in visit
    part.visit(visitor, pipeline, visited)
  File "apache_beam/pipeline.py", line 788, in visit
    visitor.visit_transform(self)
  File "apache_beam/runners/runner.py", line 165, in visit_transform
    self.runner.run_transform(transform_node)
  File "apache_beam/runners/runner.py", line 208, in run_transform
    return m(transform_node)
  File "apache_beam/runners/dataflow/dataflow_runner.py", line 581, in run_ParDo
    input_step = self._cache.get_pvalue(transform_node.inputs[0])
  File "apache_beam/runners/runner.py", line 286, in get_pvalue
    value_with_refcount = self._cache[self.key(pvalue)]
KeyError: (u'SingletonSide/Decode Values', None)

@charlesccychen (Contributor):

The issue with the KeyError above happens because of incorrect refcount numbers in PValueCache. The entry is garbage-collected because its refcount reaches zero before it can be used in the side input. Does anyone know why we have refcounts in PValueCache at all? My hunch is that this is not necessary, and is an artifact of when PValueCache was used to hold actual result elements. I am inclined to remove this feature of PValueCache. Does anyone have an opinion here?

(CC: @robertwb @aaltay)
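The failure mode described here can be illustrated with a toy model. This is hypothetical, not the real PValueCache: a cache that evicts an entry when its refcount reaches zero will raise KeyError for any consumer that was not counted up front, which matches the KeyError seen for 'SingletonSide/Decode Values' above:

```python
# Toy model (hypothetical, not the real PValueCache) of eviction-at-zero:
# if the side input consumer is not included in the refcount, the entry
# is garbage-collected before the side input can read it.

class RefcountedCache:
    def __init__(self):
        self._entries = {}   # key -> [value, refcount]

    def put(self, key, value, refcount):
        self._entries[key] = [value, refcount]

    def get(self, key):
        value, refs = self._entries[key]   # raises KeyError if evicted
        refs -= 1
        if refs == 0:
            del self._entries[key]         # evicted once refcount hits zero
        else:
            self._entries[key][1] = refs
        return value

cache = RefcountedCache()
# Refcount of 1 undercounts the consumers: the side input is not counted.
cache.put('SingletonSide', 'value', refcount=1)
cache.get('SingletonSide')                 # main consumer drains the entry
try:
    cache.get('SingletonSide')             # side input consumer arrives late
except KeyError:
    evicted = True                         # entry was already evicted
```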


aaltay commented May 10, 2018

I don't know why PValueCache has a refcount. I have the same feeling as you about why it has this feature.

@lukecwik (Member):

A virtualenv issue is preventing the Dataflow Python postcommit from running; more details in BEAM-4249.


aaltay commented May 10, 2018

@charlesccychen could you run the equivalent of the Python postcommit in your own environment? Let's not wait for the Jenkins fixes.


charlesccychen commented May 11, 2018

@aaltay: Yes.

@robertwb: I noticed that you very recently touched the ref counting logic in 00f3e22fccb. Can you chime in on whether we still need refcounts in PValueCache?

@robertwb (Contributor, Author):

The refcounts should only be needed for the old DirectRunner.

@lukecwik (Member):

Run Python Dataflow ValidatesRunner

@lukecwik (Member):

run python precommit

@robertwb robertwb left a comment


All LGTM, thanks!


safe_coders = {}

def fix_pcoll_coder(pcoll, pipeline_components):
robertwb (Contributor, Author):

Maybe call this length_prefix_unknown_coders?

Reply (Member):

Done
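The renamed length_prefix_unknown_coders makes the intent explicit. The idea behind it can be sketched as follows — a simplified, self-contained illustration, not the real implementation (which rewrites coder protos in the pipeline); Beam's LengthPrefixCoder uses a varint, while a fixed 4-byte prefix is used here only for brevity:

```python
# Sketch of length-prefixing: wrapping an unknown coder's output with a
# length prefix lets a runner move whole encoded values around without
# understanding how to decode them.

import struct

def length_prefix_encode(payload: bytes) -> bytes:
    # Prefix the opaque payload with its size (4-byte big-endian here).
    return struct.pack('>I', len(payload)) + payload

def length_prefix_decode(stream: bytes, offset: int = 0):
    # Read the size, slice out the payload, and return the next offset,
    # so a reader can skip values it cannot interpret.
    (size,) = struct.unpack_from('>I', stream, offset)
    start = offset + 4
    return stream[start:start + size], start + size

# The runner can now step over opaque values it cannot decode:
buf = length_prefix_encode(b'opaque-bytes') + length_prefix_encode(b'next')
value, pos = length_prefix_decode(buf)
assert value == b'opaque-bytes'
value2, _ = length_prefix_decode(buf, pos)
assert value2 == b'next'
```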

@lukecwik lukecwik merged commit 1e69890 into apache:master May 16, 2018
udim added a commit to udim/beam that referenced this pull request May 22, 2019
This might also resolve BEAM-4782.

Removes unused _input_element_coder, whose usage was removed in
PR apache#5302.
udim added a commit to udim/beam that referenced this pull request May 23, 2019
This might also resolve BEAM-4782.

Removes unused _input_element_coder, whose usage was removed in
PR apache#5302.

Includes squashed commits by @robertwb.