@TheNeuralBit
Member

Fixes #22197

This updates DoFn.default_type_hints to inspect the output type annotation for the appropriate method (process, or process_batch) depending on the yields_batches/yields_elements decorators.
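
To illustrate the selection described above, here is a minimal, Beam-free sketch. The `yields_batches` / `yields_elements` markers, `ToyDoFn`, and `element_output_annotation` are hypothetical stand-ins written for this example only. They are not the real Beam decorators or the actual `DoFn.default_type_hints` code, and giving `process_batch` precedence is an assumption of the sketch.

```python
import typing
from typing import Iterator, List


def yields_batches(fn):
  """Toy marker: the decorated process() yields batches, not elements."""
  fn._yields_batches = True
  return fn


def yields_elements(fn):
  """Toy marker: the decorated process_batch() yields elements, not batches."""
  fn._yields_elements = True
  return fn


class ToyDoFn:
  """Hypothetical stand-in for a DoFn; only the two methods matter here."""
  def process(self, element):
    raise NotImplementedError

  def process_batch(self, batch):
    raise NotImplementedError


def element_output_annotation(dofn):
  """Return the return annotation of the method that yields elements."""
  process = type(dofn).process
  process_batch = type(dofn).process_batch
  # process_batch() only describes elements when explicitly marked.
  if getattr(process_batch, '_yields_elements', False):
    return typing.get_type_hints(process_batch).get('return')
  # process() describes elements unless it is marked as yielding batches.
  if not getattr(process, '_yields_batches', False):
    return typing.get_type_hints(process).get('return')
  # Neither method's annotation describes elements.
  return None


class BatchProducer(ToyDoFn):
  @yields_batches
  def process(self, element: int) -> Iterator[List[int]]:
    yield [element]


class ElementProducer(ToyDoFn):
  @yields_elements
  def process_batch(self, batch: List[int]) -> Iterator[int]:
    yield from batch


class Plain(ToyDoFn):
  def process(self, element: int) -> Iterator[str]:
    yield str(element)
```

In this toy model, `BatchProducer` contributes no element-wise output annotation, because its `process` annotation describes batches. That is the case the linked issue reports as producing an incorrect `element_type`.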

@github-actions github-actions bot added the python label Jul 8, 2022
@TheNeuralBit
Member Author

FYI @robertwb this fixes a bug in Batched DoFns. I'm OOO next week and would like to get this in before the release cut the following Wednesday.

@codecov

codecov bot commented Jul 8, 2022

Codecov Report

Merging #22198 (02fa3f9) into master (d44c044) will decrease coverage by 0.02%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##           master   #22198      +/-   ##
==========================================
- Coverage   74.22%   74.20%   -0.03%     
==========================================
  Files         702      703       +1     
  Lines       92937    93095     +158     
==========================================
+ Hits        68985    69077      +92     
- Misses      22685    22751      +66     
  Partials     1267     1267              
| Flag | Coverage Δ |
|---|---|
| python | 83.54% <100.00%> (-0.07%) ⬇️ |

Flags with carried forward coverage won't be shown.

| Impacted Files | Coverage Δ |
|---|---|
| sdks/python/apache_beam/transforms/core.py | 92.94% <100.00%> (+0.37%) ⬆️ |
| sdks/python/apache_beam/typehints/decorators.py | 92.81% <100.00%> (+0.08%) ⬆️ |
| sdks/python/apache_beam/internal/dill_pickler.py | 85.61% <0.00%> (-1.44%) ⬇️ |
| sdks/python/apache_beam/io/source_test_utils.py | 88.01% <0.00%> (-1.39%) ⬇️ |
| ...ython/apache_beam/io/gcp/bigquery_read_internal.py | 53.36% <0.00%> (-1.05%) ⬇️ |
| ...hon/apache_beam/runners/direct/test_stream_impl.py | 93.28% <0.00%> (-0.75%) ⬇️ |
| ...dks/python/apache_beam/options/pipeline_options.py | 94.34% <0.00%> (-0.63%) ⬇️ |
| ...ks/python/apache_beam/runners/worker/data_plane.py | 87.57% <0.00%> (-0.57%) ⬇️ |
| ...hon/apache_beam/runners/worker/bundle_processor.py | 93.30% <0.00%> (-0.38%) ⬇️ |
| sdks/python/apache_beam/transforms/external.py | 79.73% <0.00%> (-0.11%) ⬇️ |
... and 26 more

@github-actions
Contributor

github-actions bot commented Jul 8, 2022

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @ryanthompson591 for label python.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@TheNeuralBit
Member Author

Run Python PreCommit

| beam.Map(lambda x: x * 3))
| beam.ParDo(ArrayProduceDoFn()))

self.assertEqual(pc.element_type, np.int64)
Contributor

This is up to you, but my rule of thumb is smaller, more concise unit tests.

Like:

    def test_pardo_maintains_type_hint(self):
      with self.create_pipeline() as p:
        pc = (
            p
            | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
                np.int64)
            | beam.ParDo(ArrayProduceDoFn()))

      self.assertEqual(pc.element_type, np.int64)

And then end the test there.

Member Author

Fair point, I initially added the test here for expediency, but now I have the concise test in batch_dofn_test. I'll drop this change.


class MismatchedBatchProducingDoFn(beam.DoFn):
  """A DoFn that produces batches from both process and process_batch, with
  mismatched types. Should yield a construction time error when applied."""
Contributor

It took me three looks to realize that the mismatch is that one produces floats and the other ints. Maybe slightly amend the comment like this:
...with mismatched types (one has floats, the other ints)...

Member Author

Thanks, done

batch_fn_type_hints = typehints.decorators.IOTypeHints.from_callable(
    self.process_batch)

if fn_type_hints is not None:
Contributor

This nested logic is a little messy and hard to follow.

For example, here and below we have "if fn_type_hints is not None:"

I had to rewrite this a little to really wrap my head around it. I don't know if any parts of my rewrite are useful.

# Default type hints can come from one of two places: the process type hints or the process_batch type hints.
# Also, process might be set up to yield batches or to yield elements.

has_fn_type_hints = fn_type_hints is not None
has_batch_type_hints = batch_fn_type_hints is not None
yields_batches = self._process_yields_batches
yields_elements = self._process_yields_elements

if has_fn_type_hints:
  if yields_elements and has_batch_type_hints:
    # Because both batch and element type hints are defined, raise an exception if they are not equal.
    self.validate_batch_and_fn_typehints_same()
    fn_type_hints = fn_type_hints.with_output_types_from(
        batch_fn_type_hints)  # TODO: comment why this is done.
  elif yields_batches:
    # Because the process method produces batches, don't use its output typehints.
    fn_type_hints = fn_type_hints.with_output_types_from(
        typehints.decorators.IOTypeHints.empty())
elif yields_elements and has_batch_type_hints:
  # process_batch produces elements, grab its output typehint
  fn_type_hints = batch_fn_type_hints.with_input_types_from(
      typehints.decorators.IOTypeHints.empty())  # TODO comment why does this use with_input_types_from

Member Author

Yeah, agreed, this logic was not pretty. I think part of the issue is that it has to deal with both None and the possibility of IOTypeHints.empty() in both sets of typehints. I rewrote it to collapse those two possibilities, and also separated concerns a bit: first we deal with process_yields_batches, then with process_batch_yields_elements. WDYT?

Can you clarify the "TODO comment why does this use with_input_types_from"?
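
For readers following along, the two-step structure described in this comment can be sketched roughly as below. This is a self-contained toy model, not the code from this PR: `Hints`, `EMPTY`, and `resolve_hints` are hypothetical names, and a plain dataclass stands in for `IOTypeHints`. It only shows the shape of the logic: collapse None and empty hints first, then handle the two flags one after the other.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Hints:
  """Toy stand-in for IOTypeHints: tracks one input and one output type."""
  input_type: Optional[type] = None
  output_type: Optional[type] = None


EMPTY = Hints()


def resolve_hints(
    process_hints: Optional[Hints],
    batch_hints: Optional[Hints],
    process_yields_batches: bool,
    process_batch_yields_elements: bool) -> Hints:
  # Collapse "no hints" (None) and "empty hints" into a single case.
  hints = process_hints if process_hints is not None else EMPTY

  # Step 1: process() yields batches, so its output hint is a batch type
  # rather than an element type. Drop it.
  if process_yields_batches:
    hints = replace(hints, output_type=None)

  # Step 2: process_batch() yields elements, so its output hint applies.
  if process_batch_yields_elements:
    batch = batch_hints if batch_hints is not None else EMPTY
    if batch.output_type is not None:
      # Both methods declare an element output type: they must agree.
      if (hints.output_type is not None and
          hints.output_type != batch.output_type):
        raise TypeError(
            'mismatched output types: process=%r, process_batch=%r' %
            (hints.output_type, batch.output_type))
      hints = replace(hints, output_type=batch.output_type)

  return hints
```

Only the output type is taken from the `process_batch` hints here. Its input type describes a batch, so the sketch never copies it.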

Member Author

I was a little concerned rewriting this way could break some esoteric use-case, but I ran all our internal tests and it looks good.

@github-actions
Contributor

Reminder, please take a look at this pr: @ryanthompson591

@TheNeuralBit
Member Author

Thanks for the feedback!

@TheNeuralBit
Member Author

assign to next reviewer

@TheNeuralBit
Member Author

R: @tvalentyn could you take a look? It looks like Ryan will be out through next week and I'd like to get this in for the release cut.

@github-actions
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@tvalentyn
Contributor

I checked that get_type_hints(object).with_defaults(beam.typehints.decorators.IOTypeHints.empty().strip_iterable()) is the same as get_type_hints(object).with_defaults(None). It looks like this doesn't change evaluation for non-batching DoFns, so it shouldn't be a breaking change. But you can also try to import and run TGP if you think this needs more coverage.

@TheNeuralBit
Member Author

Thanks for checking on that! I ran a TGP and everything looks good.

@TheNeuralBit TheNeuralBit merged commit d050a08 into apache:master Jul 23, 2022
Development

Successfully merging this pull request may close these issues.

[Bug]: DoFn.process with @yields_batches produces PCollection with incorrect element_type
