
[BEAM-1251] Upgrade from buffer to memoryview (again) #5887

Merged: 2 commits merged into apache:master on Jul 8, 2018

Conversation

@cclauss commented Jul 4, 2018

This PR is a second shot at #4820, based on the work @pitrou has done to allow python-snappy to support Python memoryviews. This has in turn enabled @martindurant to release python-snappy 0.5.3 to PyPI.

This PR recommends using that release to upgrade from buffer to memoryview in avroio.py: buffer was removed in Python 3 in favor of memoryview, which is supported in all Python versions that Beam supports.

Reviews please from @aaltay @holdenk @superbobry

flake8 testing of https://github.com/apache/beam on Python 3.6.3

$ flake8 . --count --select=E901,E999,F821,F822,F823 --show-source --statistics

./sdks/python/apache_beam/io/avroio.py:343:34: F821 undefined name 'buffer'
      result = snappy.decompress(buffer(data)[:-4])
                                 ^
1    F821 undefined name 'buffer'
1
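The substance of the change is a one-line swap. A minimal sketch of why it works (illustrative only; the actual edit lives in avroio.py's `_decompress_bytes`, and the sample data below is made up):

```python
# buffer() was removed in Python 3; memoryview exists in Python 2.7 and 3.x
# and provides the same zero-copy slice-of-bytes behavior avroio.py needs.

data = b"payload\x00\x00\x00\x00"  # pretend the last 4 bytes are a CRC

# Python 2 only (raises NameError on Python 3):
#   block = buffer(data)[:-4]

# Portable replacement:
block = memoryview(data)[:-4]  # zero-copy view; no bytes are duplicated
print(bytes(block))            # b'payload'
```

The slice on the memoryview, like the old buffer slice, does not copy the underlying data, which matters for large Avro blocks.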

Follow this checklist to help us incorporate your contribution quickly and easily:

  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

It will help us expedite review of your Pull Request if you tag someone (e.g. @username) to look at it.

Post-Commit Tests Status (on master branch)

| Lang   | SDK          | Apex         | Dataflow     | Flink        | Gearpump     | Samza        | Spark        |
|--------|--------------|--------------|--------------|--------------|--------------|--------------|--------------|
| Go     | Build Status | ---          | ---          | ---          | ---          | ---          | ---          |
| Java   | Build Status | Build Status | Build Status | Build Status | Build Status | Build Status | Build Status |
| Python | Build Status | ---          | Build Status, Build Status | --- | ---       | ---          | ---          |

@superbobry (Contributor) left a comment

LGTM

@charlesccychen charlesccychen self-requested a review July 7, 2018 05:38
@charlesccychen (Contributor) left a comment

Thank you, this is a great change! This LGTM. I resolved the merge conflict and will merge after tests pass.

@charlesccychen charlesccychen merged commit afd9cce into apache:master Jul 8, 2018
@charlesccychen (Contributor) commented Jul 9, 2018

Heads up that this change caused issues in postcommits. Will investigate more in the morning.

======================================================================
ERROR: test_wordcount_it (apache_beam.examples.wordcount_it_test.WordCountIT)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/examples/wordcount_it_test.py", line 68, in test_wordcount_it
    wordcount.run(test_pipeline.get_full_options_as_args(**extra_opts))
  File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/examples/wordcount.py", line 118, in run
    result = p.run()
  File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/pipeline.py", line 395, in run
    self.to_runner_api(), self.runner, self._options).run(False)
  File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/pipeline.py", line 408, in run
    return self.runner.run_pipeline(self)
  File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py", line 61, in run_pipeline
    self.result.wait_until_finish(duration=wait_duration)
  File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py", line 1135, in wait_until_finish
    (self.state, getattr(self._runner, 'last_error_msg', None)), self)
DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 156, in execute
    op.start()
  File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    def start(self):
  File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.scoped_process_state:
  File "dataflow_worker/shuffle_operations.py", line 66, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.shuffle_source.reader() as reader:
  File "dataflow_worker/shuffle_operations.py", line 70, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 180, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 90, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/shuffle_operations.py", line 229, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
    with self.scoped_process_state:
  File "dataflow_worker/shuffle_operations.py", line 236, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
    self.output(wvalue.with_value((k, wvalue.value)))
  File "apache_beam/runners/worker/operations.py", line 180, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 90, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 404, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 405, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 577, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 585, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 610, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 583, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 360, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 681, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 90, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 404, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 405, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 577, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 585, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 610, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 583, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 360, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 681, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 90, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 404, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 405, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 577, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 585, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 626, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_(type(new_exn), new_exn, original_traceback)
  File "apache_beam/runners/common.py", line 583, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 469, in apache_beam.runners.common.PerWindowInvoker.invoke_process
    self._invoke_per_window(
  File "apache_beam/runners/common.py", line 492, in apache_beam.runners.common.PerWindowInvoker._invoke_per_window
    [si[global_window] for si in self.side_inputs]))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/transforms/sideinputs.py", line 63, in __getitem__
    _FilteringIterable(self._iterable, target_window), self._view_options)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/pvalue.py", line 338, in _from_runtime_iterable
    return options['data'].view_fn(it)
  File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/pvalue.py", line 315, in <lambda>
  File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python_Verify/src/sdks/python/apache_beam/pvalue.py", line 417, in _from_runtime_iterable
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/transforms/sideinputs.py", line 79, in __iter__
    for wv in self._iterable:
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sideinputs.py", line 180, in __iter__
    raise self.reader_exceptions.get()
TypeError: argument 1 must be string or read-only buffer, not memoryview [while running 'write/Write/WriteImpl/WriteBundles/WriteBundles']

https://builds.apache.org/job/beam_PostCommit_Python_Verify/5529/consoleFull

@charlesccychen (Contributor)
The root cause is the following: it looks like we need to add "python-snappy == 0.5.3" to the setup.py file.

Encountered exception in PrefetchingSourceSetIterable reader thread: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sideinputs.py", line 137, in _reader_thread
    for value in reader:
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativefileio.py", line 198, in __iter__
    for record in self.read_next_block():
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/nativeavroio.py", line 94, in read_next_block
    self._sync_marker)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/avroio.py", line 261, in read_block_from_file
    return _AvroBlock(block_bytes, num_records, codec, schema, offset, size)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/avroio.py", line 313, in __init__
    self._decompressed_block_bytes = self._decompress_bytes(block_bytes, codec)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/avroio.py", line 345, in _decompress_bytes
    result = snappy.decompress(memoryview(data)[:-4])
  File "/usr/local/lib/python2.7/dist-packages/snappy.py", line 91, in uncompress
    return _uncompress(data)
TypeError: argument 1 must be string or read-only buffer, not memoryview
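The TypeError arises because python-snappy releases before 0.5.3 only accept a string or read-only buffer, not a memoryview, and the Dataflow worker had an older release installed. A hedged sketch of one possible compatibility fallback (not the fix Beam ultimately adopted in #5908; `strip_crc` is a hypothetical helper):

```python
def strip_crc(data):
    """Drop the trailing 4-byte CRC, materializing the view into bytes
    so that even a pre-0.5.3 python-snappy would accept the result."""
    return bytes(memoryview(data)[:-4])  # one copy; portable to any snappy

raw = b"compressed-block\x01\x02\x03\x04"  # made-up sample data
print(strip_crc(raw))  # b'compressed-block'
```

The trade-off is an extra copy per block, which gives up the zero-copy benefit this PR was after.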

@cclauss cclauss deleted the buffer-to-memoryview-again branch July 9, 2018 19:47
@angoenka (Contributor) commented Jul 9, 2018

The build seems to be broken after this change. Can you please take a look?
https://scans.gradle.com/s/ek5enzlgtrm3c/console-log#L3941
https://builds.apache.org/job/beam_PostCommit_Python_Verify/5531/consoleFull

Thanks

@charlesccychen (Contributor)
We should roll back until we figure out a proper fix. Unfortunately, we can't just pin python-snappy in setup.py, since it doesn't compile on all platforms.
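Since a hard pin isn't viable, one hedged alternative is to pick the zero-copy path only when the installed python-snappy is new enough. A sketch under stated assumptions (`snappy_input` is a hypothetical helper, not Beam's actual fix in #5908; the version string is passed in rather than detected):

```python
def snappy_input(data, snappy_version):
    """Return a zero-copy memoryview if the installed python-snappy
    (>= 0.5.3) can accept it; otherwise fall back to a bytes copy."""
    view = memoryview(data)[:-4]
    major, minor, patch = (int(p) for p in snappy_version.split("."))
    if (major, minor, patch) >= (0, 5, 3):
        return view           # zero-copy path
    return bytes(view)        # compatibility copy for older releases

print(type(snappy_input(b"abcd1234", "0.5.2")).__name__)  # bytes
print(type(snappy_input(b"abcd1234", "0.5.3")).__name__)  # memoryview
```

This keeps the fast path on up-to-date workers while degrading gracefully on platforms where only an older python-snappy is installed.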

@angoenka (Contributor) commented Jul 9, 2018

Is this the only commit that needs to be reverted?

@charlesccychen (Contributor)
@angoenka Yes

charlesccychen added a commit that referenced this pull request Jul 10, 2018
@charlesccychen (Contributor)
@angoenka @cclauss The postcommit issue was fixed and merged in #5908.
