Skip to content

Conversation

@robertwb
Copy link
Contributor

@robertwb robertwb commented Sep 8, 2022

This works around bugs in dill regarding functions defined with eval and exec used for the PythonCallable types here.

Also fix flag that was placed on the wrong binary.

This is a cherry-pick for #23073.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

This works around bugs in dill regarding functions defined with
eval and exec used for the PythonCallable types here.

Also fix flag that was placed on the wrong binary.
@robertwb
Copy link
Contributor Author

robertwb commented Sep 8, 2022

R: @ihji @lostluck

@codecov
Copy link

codecov bot commented Sep 8, 2022

Codecov Report

Merging #23093 (0d62378) into release-2.42.0 (bf435b6) will increase coverage by 0.00%.
The diff coverage is 0.00%.

@@               Coverage Diff               @@
##           release-2.42.0   #23093   +/-   ##
===============================================
  Coverage           73.58%   73.58%           
===============================================
  Files                 716      716           
  Lines               95301    95301           
===============================================
+ Hits                70129    70131    +2     
+ Misses              23876    23874    -2     
  Partials             1296     1296           
Flag Coverage Δ
python 83.41% <0.00%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...beam/runners/portability/expansion_service_main.py 0.00% <0.00%> (ø)
...beam/runners/portability/local_job_service_main.py 14.43% <ø> (-0.42%) ⬇️
sdks/python/apache_beam/io/localfilesystem.py 90.97% <0.00%> (-0.76%) ⬇️
sdks/python/apache_beam/runners/direct/executor.py 96.46% <0.00%> (-0.55%) ⬇️
...hon/apache_beam/runners/worker/bundle_processor.py 93.54% <0.00%> (+0.12%) ⬆️
sdks/python/apache_beam/internal/metrics/metric.py 94.00% <0.00%> (+1.00%) ⬆️
...che_beam/runners/interactive/interactive_runner.py 91.39% <0.00%> (+1.32%) ⬆️
.../python/apache_beam/testing/test_stream_service.py 92.85% <0.00%> (+4.76%) ⬆️

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@lostluck
Copy link
Contributor

lostluck commented Sep 8, 2022

Python Precommit fails:

Task :sdks:python:test-suites:tox:py37:testPy37Cloud
.ss...ss........................... [ 52%]
.................s.....s........s......Exception in thread Thread-1769:
Traceback (most recent call last):
File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py", line 296, in _run_job
self.result = self._invoke_runner()
File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/portability/local_job_service.py", line 320, in _invoke_runner
self._pipeline_proto, self.pipeline_options())
File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 212, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 443, in run_stages
runner_execution_context, bundle_context_manager, bundle_input)
File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 776, in _execute_bundle
bundle_manager))
File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1000, in _run_bundle
data_input, data_output, input_timers, expected_timer_output)
File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1348, in process_bundle
raise RuntimeError(result.error)
RuntimeError: Traceback (most recent call last):
File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/common.py", line 1417, in process
return self.do_fn_invoker.invoke_process(windowed_value)
File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/common.py", line 838, in invoke_process
windowed_value, additional_args, additional_kwargs)
File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/runners/common.py", line 983, in _invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/transforms/core.py", line 1877, in
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/apache_beam/testing/util.py", line 191, in _equal
raise BeamAssertException(msg)
apache_beam.testing.util.BeamAssertException: Failed assert: ['a'] == ['a', 'b'], unexpected elements ['b']
|


Task :sdks:python:test-suites:tox:py37:testPy37Cloud FAILED
============================= test session starts ==============================
platform linux -- Python 3.7.12, pytest-7.1.3, pluggy-1.0.0
cachedir: target/.tox-py37-cloud/py37-cloud/.pytest_cache
rootdir: /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python, configfile: pytest.ini
plugins: xdist-2.5.0, timeout-2.1.0, forked-1.4.0, requests-mock-1.10.0
timeout: 600.0s
timeout method: signal
timeout func_only: False
collected 6657 items / 6576 deselected / 2 skipped / 81 selected

apache_beam/coders/fast_coders_test.py ................................. [ 40%]
.. [ 43%]
apache_beam/coders/slow_coders_test.py ................................. [ 83%]
.. [ 86%]
apache_beam/examples/complete/autocomplete_it_test.py s [ 87%]
apache_beam/examples/complete/estimate_pi_it_test.py s [ 88%]
apache_beam/examples/complete/top_wikipedia_sessions_it_test.py s [ 90%]
apache_beam/examples/complete/game/hourly_team_score_it_test.py s [ 91%]
apache_beam/examples/complete/game/user_score_it_test.py s [ 92%]
apache_beam/examples/cookbook/bigquery_side_input_it_test.py s [ 93%]
apache_beam/examples/cookbook/coders_it_test.py s [ 95%]
apache_beam/runners/dataflow/dataflow_exercise_streaming_metrics_pipeline_test.py s [ 96%]
[ 96%]
apache_beam/runners/portability/stager_test.py .. [ 98%]
apache_beam/transforms/userstate_test.py F [100%]

=================================== FAILURES ===================================
____ StatefulDoFnOnDirectRunnerTest.test_dynamic_timer_clear_then_set_timer ____

self = <apache_beam.transforms.userstate_test.StatefulDoFnOnDirectRunnerTest testMethod=test_dynamic_timer_clear_then_set_timer>

@pytest.mark.no_xdist
@pytest.mark.timeout(3)
def test_dynamic_timer_clear_then_set_timer(self):
  class EmitTwoEvents(DoFn):
    EMIT_CLEAR_SET_TIMER = TimerSpec('emitclear', TimeDomain.WATERMARK)

    def process(self, element, emit=DoFn.TimerParam(EMIT_CLEAR_SET_TIMER)):
      yield ('1', 'set')
      emit.set(1)

    @on_timer(EMIT_CLEAR_SET_TIMER)
    def emit_clear(self):
      yield ('1', 'clear')

  class DynamicTimerDoFn(DoFn):
    EMIT_TIMER_FAMILY = TimerSpec('emit', TimeDomain.WATERMARK)

    def process(self, element, emit=DoFn.TimerParam(EMIT_TIMER_FAMILY)):
      if element[1] == 'set':
        emit.set(10, dynamic_timer_tag='emit1')
        emit.set(20, dynamic_timer_tag='emit2')
      if element[1] == 'clear':
        emit.set(30, dynamic_timer_tag='emit3')
        emit.clear(dynamic_timer_tag='emit3')
        emit.set(40, dynamic_timer_tag='emit3')
      return []

    @on_timer(EMIT_TIMER_FAMILY)
    def emit_callback(
        self, ts=DoFn.TimestampParam, tag=DoFn.DynamicTimerTagParam):
      yield (tag, ts)

  with TestPipeline() as p:
    res = (
        p
        | beam.Create([('1', 'impulse')])
        | beam.ParDo(EmitTwoEvents())
        | beam.ParDo(DynamicTimerDoFn()))
  assert_that(res, equal_to([('emit1', 10), ('emit2', 20), ('emit3', 40)]))

apache_beam/transforms/userstate_test.py:1033:


apache_beam/pipeline.py:597: in exit
self.result = self.run()
apache_beam/testing/test_pipeline.py:114: in run
False if self.not_use_test_runner_api else test_runner_api))
apache_beam/pipeline.py:550: in run
self._options).run(False)
apache_beam/pipeline.py:574: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:131: in run_pipeline
return runner.run_pipeline(pipeline, options)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:201: in run_pipeline
options)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:212: in run_via_runner_api
return self.run_stages(stage_context, stages)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:443: in run_stages
runner_execution_context, bundle_context_manager, bundle_input)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:776: in _execute_bundle
bundle_manager))
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1000: in _run_bundle
data_input, data_output, input_timers, expected_timer_output)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1309: in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
apache_beam/runners/portability/fn_api_runner/worker_handlers.py:379: in push
response = self.worker.do_instruction(request)
apache_beam/runners/worker/sdk_worker.py:597: in do_instruction
getattr(request, request_type), request.instruction_id)
apache_beam/runners/worker/sdk_worker.py:628: in process_bundle
instruction_id, request.process_bundle_descriptor_id)
apache_beam/runners/worker/sdk_worker.py:463: in get
self.data_channel_factory)
apache_beam/runners/worker/bundle_processor.py:873: in init
op.setup()
apache_beam/runners/worker/operations.py:881: in setup
operation_name=self.name_context.metrics_name())
apache_beam/runners/common.py:1366: in init
do_fn_signature = DoFnSignature(fn)
apache_beam/runners/common.py:293: in init
self._is_stateful_dofn = userstate.is_stateful_dofn(do_fn)
apache_beam/transforms/userstate.py:274: in is_stateful_dofn
all_state_specs, all_timer_specs = get_dofn_specs(dofn)
apache_beam/transforms/userstate.py:251: in get_dofn_specs
method = MethodWrapper(dofn, method_name)


self = <apache_beam.runners.common.MethodWrapper object at 0x7fafc7c10390>
obj_to_invoke = <apache_beam.runners.worker.bundle_processor.WindowIntoDoFn object at 0x7fafc7c107d0>
method_name = 'with_input_types'

def __init__(self, obj_to_invoke, method_name):
  """
  Initiates a ``MethodWrapper``.

  Args:
    obj_to_invoke: the object that contains the method. Has to either be a
                  `DoFn` object or a `RestrictionProvider` object.
    method_name: name of the method as a string.
  """

  if not isinstance(obj_to_invoke,
                    (DoFn, RestrictionProvider, WatermarkEstimatorProvider)):
    raise ValueError(
        '\'obj_to_invoke\' has to be either a \'DoFn\' or '
        'a \'RestrictionProvider\'. Received %r instead.' % obj_to_invoke)

  self.args, self.defaults = core.get_function_arguments(obj_to_invoke,
                                                         method_name)

  # TODO(BEAM-5878) support kwonlyargs on Python 3.
  self.method_value = getattr(obj_to_invoke, method_name)
  self.method_name = method_name

  self.has_userstate_arguments = False
  self.state_args_to_replace = {}  # type: Dict[str, core.StateSpec]
  self.timer_args_to_replace = {}  # type: Dict[str, core.TimerSpec]
  self.timestamp_arg_name = None  # type: Optional[str]
  self.window_arg_name = None  # type: Optional[str]
  self.key_arg_name = None  # type: Optional[str]
  self.restriction_provider = None
  self.restriction_provider_arg_name = None
  self.watermark_estimator_provider = None
  self.watermark_estimator_provider_arg_name = None
  self.dynamic_timer_tag_arg_name = None

  if hasattr(self.method_value, 'unbounded_per_element'):
    self.unbounded_per_element = True
  else:
  self.unbounded_per_element = False

E Failed: Timeout >3.0s

apache_beam/runners/common.py:168: Failed

@lostluck
Copy link
Contributor

lostluck commented Sep 8, 2022

Run Python PreCommit

@github-actions
Copy link
Contributor

github-actions bot commented Sep 8, 2022

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@robertwb
Copy link
Contributor Author

robertwb commented Sep 8, 2022

That failure looks irrelevant to these changes.

@lostluck lostluck merged commit 3d64d63 into apache:release-2.42.0 Sep 8, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants