
Add BatchConverter implementations for pandas types, use Batched DoFns in DataFrame convert utilities #22575

Merged: 10 commits, Aug 31, 2022

Conversation

@TheNeuralBit (Member) commented Aug 3, 2022

Fixes #22678

This PR moves the pandas-Beam type mapping from apache_beam.dataframe.schemas to apache_beam.typehints.pandas_type_compatibility, and modifies apache_beam.dataframe.convert to leverage that mapping by defining batch-producing and batch-consuming DoFns.

The new module now provides BatchConverter implementations that can be re-used in other DoFns that wish to process structured data using the pandas API. It also makes one slight modification in the type mapping: we now have a special field option, beam:dataframe:index:v1. This option is used to indicate that a Beam schema field should map to an index in the pandas DataFrame type system. If a Beam schema has no fields identified as an index, then we assume the user does not care about the index, and a "meaningless" one will be generated when mapping to DataFrames. Similarly when mapping a DataFrame back to the Beam type system, the index will be dropped.

Note apache_beam.dataframe.schemas still exists, for two purposes:

  • To maintain backwards compatibility, we still define BatchRowsAsDataFrame and UnbatchPandas transforms. These transforms are no longer used in apache_beam.dataframe.convert though.
  • To handle proxy generation and consumption (generate_proxy, element_type_from_dataframe). These functions are still used in apache_beam.dataframe.convert.

All of the logic in apache_beam.dataframe.schemas defers to apache_beam.typehints.pandas_type_compatibility as much as possible.
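For readers unfamiliar with the new module, here is a rough usage sketch (not code from this PR; the numpy/pandas typehints and the apache_beam.typehints.batch import path are assumptions, and importing pandas_type_compatibility is assumed to be what registers its converters):

import numpy as np
import pandas as pd

from apache_beam.typehints import pandas_type_compatibility  # assumed to register the pandas converters
from apache_beam.typehints.batch import BatchConverter  # assumed import path

# Ask the registry for a converter between np.int64 elements and pd.Series batches.
converter = BatchConverter.from_typehints(element_type=np.int64, batch_type=pd.Series)

batch = converter.produce_batch([1, 2, 3])       # pd.Series([1, 2, 3])
elements = list(converter.explode_batch(batch))  # [1, 2, 3]
assert converter.get_length(batch) == 3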

The following PRs were separated from this one to ease review:


@TheNeuralBit (Member Author):

Run Python 3.8 PostCommit

@github-actions bot added the python label on Aug 3, 2022
@TheNeuralBit (Member Author):

Run Python 3.7 PostCommit

@codecov bot commented Aug 4, 2022

Codecov Report

Merging #22575 (c088431) into master (63ba9c7) will decrease coverage by 0.01%.
The diff coverage is 93.36%.

@@            Coverage Diff             @@
##           master   #22575      +/-   ##
==========================================
- Coverage   74.19%   74.17%   -0.02%     
==========================================
  Files         709      712       +3     
  Lines       93499    93802     +303     
==========================================
+ Hits        69367    69582     +215     
- Misses      22855    22943      +88     
  Partials     1277     1277              
Flag Coverage Δ
python 83.53% <93.36%> (-0.06%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
sdks/python/apache_beam/typehints/__init__.py 77.77% <66.66%> (-22.23%) ⬇️
sdks/python/apache_beam/dataframe/schemas.py 96.62% <92.30%> (-1.05%) ⬇️
sdks/python/apache_beam/dataframe/convert.py 91.20% <93.47%> (+0.83%) ⬆️
...apache_beam/typehints/pandas_type_compatibility.py 94.95% <94.95%> (ø)
sdks/python/apache_beam/typehints/batch.py 90.38% <100.00%> (+1.99%) ⬆️
...examples/inference/sklearn_mnist_classification.py 43.75% <0.00%> (-3.75%) ⬇️
sdks/python/apache_beam/internal/metrics/metric.py 93.00% <0.00%> (-1.00%) ⬇️
sdks/python/apache_beam/io/localfilesystem.py 90.97% <0.00%> (-0.76%) ⬇️
...hon/apache_beam/runners/direct/test_stream_impl.py 93.28% <0.00%> (-0.75%) ⬇️
sdks/python/apache_beam/typehints/schemas.py 93.84% <0.00%> (-0.48%) ⬇️
... and 25 more


@TheNeuralBit (Member Author):

Clarify separation of concerns between pandas_type_compatibility and dataframe.schemas

dataframe.schemas:

  • Maintain its current public API (possibly with deprecation notices)
  • Responsible for making proxies for the DataFrame API

typehints.pandas_type_compatibility:

  • pandas-Beam type mapping
  • BatchConverter implementations

@TheNeuralBit changed the title from "WIP: Use Batched DoFns in DataFrame convert utilities" to "WIP: Add BatchConverter implementations for pandas types, use Batched DoFns in DataFrame convert utilities" on Aug 12, 2022
@TheNeuralBit changed the title from "WIP: Add BatchConverter implementations for pandas types, use Batched DoFns in DataFrame convert utilities" to "Add BatchConverter implementations for pandas types, use Batched DoFns in DataFrame convert utilities" on Aug 12, 2022
@TheNeuralBit (Member Author):

CC: @robertwb

@TheNeuralBit (Member Author):

Run Python 3.8 PostCommit

@github-actions (Contributor):

Assigning reviewers. If you would like to opt out of this review, comment "assign to next reviewer":

R: @y1chi for label python.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@github-actions (Contributor):

Reminder, please take a look at this PR: @y1chi

@TheNeuralBit (Member Author):

@y1chi do you have time to review this?

@TheNeuralBit (Member Author):

R: @yeandy

@github-actions (Contributor):

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@yeandy (Contributor) left a comment

Overall, LGTM. Left a few questions/comments

Comment on lines +141 to +142
  else:
    raise TypeError(f"Encountered unexpected type, left is a {type(left)!r}")
Contributor:

Should we also be checking against the type of right?

Member Author:

assert_series_equal or assert_frame_equal will raise if right isn't the appropriate type.
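(For context, a sketch of what such an assertion helper could look like; only the else branch quoted above is from the PR, while the helper name and the isinstance dispatch are assumptions:)

import pandas as pd
from pandas.testing import assert_frame_equal, assert_series_equal

def assert_batches_equal(left, right):  # hypothetical helper name
  # Dispatch on the type of `left`; assert_series_equal / assert_frame_equal
  # will themselves raise if `right` is not the matching type.
  if isinstance(left, pd.Series):
    assert_series_equal(left, right)
  elif isinstance(left, pd.DataFrame):
    assert_frame_equal(left, right)
  else:
    raise TypeError(f"Encountered unexpected type, left is a {type(left)!r}")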

    typehints.validate_composite_type_param(self.batch_typehint, '')
    typehints.validate_composite_type_param(self.element_typehint, '')

  def test_type_check(self):
Contributor:

Suggested change:
-  def test_type_check(self):
+  def test_type_check_batch(self):

Member Author:

Done!

    raise NotImplementedError

  def explode_batch(self, batch: pd.DataFrame):
    # TODO: Only do null checks for nullable types
Contributor:

Is there an issue for this?

Member Author:

There is now :) (#22948)


  def produce_batch(self, elements):
    # Note from_records has an index= parameter
    batch = pd.DataFrame.from_records(elements, columns=self._columns)
Contributor:

Why don't we use index= parameter here? Is it so it's easier to set the data type in the next 2 lines?

Member Author:

Yeah that's right, I think the above comment was just a note to self as I was iterating on this. I dropped the comment.
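(A standalone sketch of the approach being described, under the assumption that the lines following from_records cast each column to the dtype implied by the Beam schema; the dtypes mapping here is a hypothetical illustration, not an attribute from the PR:)

import pandas as pd

def produce_batch(elements, columns, dtypes):
  # Build the frame from plain records first (no index=), then coerce each
  # column to its schema-derived dtype, which is simpler before any index is set.
  batch = pd.DataFrame.from_records(elements, columns=columns)
  for column, dtype in dtypes.items():
    batch[column] = batch[column].astype(dtype)
  return batch

# Example:
# produce_batch([(1, 'a'), (2, 'b')], columns=['x', 'y'],
#               dtypes={'x': 'int64', 'y': 'object'})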

Comment on lines +202 to +206
  def estimate_byte_size(self, batch: pd.DataFrame):
    return batch.memory_usage().sum()

  def get_length(self, batch: pd.DataFrame):
    return len(batch)
Contributor:

Can we add tests for these? And also for SeriesBatchConverter?

Member Author:

Done, thank you! I also filed #22950 - we should have a standard test suite to test all the BatchConverter implementations.

Contributor:

Thanks!
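(A minimal example of the kind of test being asked for; the typehints and import path mirror the sketch in the PR description above and are assumptions, not the PR's actual test code:)

import unittest

import numpy as np
import pandas as pd

from apache_beam.typehints import pandas_type_compatibility  # assumed to register the pandas converters
from apache_beam.typehints.batch import BatchConverter  # assumed import path

class SeriesConverterSmokeTest(unittest.TestCase):
  def test_get_length_and_estimate_byte_size(self):
    converter = BatchConverter.from_typehints(
        element_type=np.int64, batch_type=pd.Series)
    batch = converter.produce_batch([1, 2, 3])
    self.assertEqual(converter.get_length(batch), 3)
    # Exact byte counts vary by platform, so just check for a positive estimate.
    self.assertGreater(converter.estimate_byte_size(batch), 0)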

Comment on lines +288 to +290
  def explode_batch(self, batch: pd.Series):
    raise NotImplementedError(
        "explode_batch should be generated in SeriesBatchConverter.__init__")
Contributor:

Why should this be generated in __init__?

Member Author:

We branch on is_nullable once in __init__ and assign explode_batch to either a null-checking or a non-null-checking implementation.
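(A toy illustration of that pattern with simplified names; not the PR's actual code:)

import pandas as pd

class SeriesConverterSketch:
  """Generates explode_batch once in __init__ instead of branching per call."""

  def __init__(self, nullable: bool):
    if nullable:
      def explode_batch(series: pd.Series):
        for isnull, value in zip(pd.isnull(series), series):
          yield None if isnull else value
    else:
      def explode_batch(series: pd.Series):
        yield from series
    # Binding the generated function as an instance attribute shadows the
    # class-level placeholder that raises NotImplementedError.
    self.explode_batch = explode_batch

# list(SeriesConverterSketch(nullable=True).explode_batch(pd.Series([1, None])))
# -> [1.0, None]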

    all_series = self._get_series(batch)
    iterators = [make_null_checking_generator(series) for series in all_series]

    for values in zip(*iterators):
Contributor:

Could we zip the self._columns along with the iterators? Might make it harder to read though

Member Author:

I'm not quite sure what this would look like, could you clarify?

Related: It would be good to add a microbenchmark for produce_batch and explode_batch so we can easily evaluate alternative implementations. But I'd prefer to leave that as future work. For now this just preserves the implementation from apache_beam.dataframe.schemas.

Contributor:

I was originally thinking of

    for values, columns in zip(*iterators, self._columns):
       ...

But I had to take a look again to wrap my head around it. Looks like you're zipping to create the rows first, and then in the second zip, you line them up with the column names. The length of an individual iterator in iterators isn't necessarily the same as the length of self._columns. Plus, we'd probably get a "too many values to unpack" error if we had values, columns.
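(A tiny standalone illustration of that two-stage zip, with made-up data:)

columns = ['a', 'b']
iterators = [iter([1, 2, 3]), iter([10, 20, 30])]  # one iterator per column

# First zip: transpose the per-column iterators into per-row value tuples.
# Second zip: pair each row's values with the column names.
for values in zip(*iterators):         # (1, 10), (2, 20), (3, 30)
  print(dict(zip(columns, values)))    # {'a': 1, 'b': 10}, ...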

    return SeriesBatchConverter.from_typehints(
        element_type=element_type, batch_type=batch_type)

  return None
Contributor:

What happens if we return None? Do we have checks in other places to detect a None BatchConverter?

Member Author:

Yep, this is handled when we try constructing all the registered implementations:

def from_typehints(*, element_type, batch_type) -> 'BatchConverter':
  element_type = typehints.normalize(element_type)
  batch_type = typehints.normalize(batch_type)
  for constructor in BATCH_CONVERTER_REGISTRY:
    result = constructor(element_type, batch_type)
    if result is not None:
      return result
  # TODO(https://github.com/apache/beam/issues/21654): Aggregate error
  # information from the failed BatchConverter matches instead of this
  # generic error.
  raise TypeError(
      f"Unable to find BatchConverter for element_type {element_type!r} and "
      f"batch_type {batch_type!r}")

Note this is very naive right now. In the future this should include helpful debug information to handle cases where one or more implementations almost matches. Tracked in #21654

Contributor:

Got it, thanks!

      yield element

  def infer_output_type(self, input_element_type):
    # Raise a TypeError if proxy has an unknown type
Contributor:

I may have missed this, but where does the error get raised?

Member Author:

Oops, this comment references behavior that was removed in 2b0597e

Now we will just shunt to Any in this case. I removed the comment. Thanks for raising this!

Comment on lines +183 to +184
    self.assertTrue(self.converter == self.create_batch_converter())
    self.assertTrue(self.create_batch_converter() == self.converter)
Contributor:

Can you explain the purpose of checking the equality both ways?

Member Author:

This is just being overly cautious - in theory the instances on either side could be a different type and could have a different __eq__ implementation.
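(A contrived example of how the two directions can disagree when the operands have different types and __eq__ implementations:)

class Strict:
  def __eq__(self, other):
    return isinstance(other, Strict)

class Lax:
  def __eq__(self, other):
    return True

print(Strict() == Lax())  # False: Strict.__eq__ rejects non-Strict objects
print(Lax() == Strict())  # True: Lax.__eq__ accepts anything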

Contributor:

👍

Comment on lines +263 to +271
    if is_nullable(element_type):

      def unbatch(series):
        for isnull, value in zip(pd.isnull(series), series):
          yield None if isnull else value
    else:

      def unbatch(series):
        yield from series
Contributor:

Nit. I actually don't mind the extra lines, especially since we're defining functions here, so it's easier to read. I'll leave it up to you.

Suggested change (the same code with the blank lines after the if/else dropped):

    if is_nullable(element_type):
      def unbatch(series):
        for isnull, value in zip(pd.isnull(series), series):
          yield None if isnull else value
    else:
      def unbatch(series):
        yield from series

      (3, ),
      (10, ),
  ])
  def test_get_lenth(self, N):
Contributor:

Suggested change:
-  def test_get_lenth(self, N):
+  def test_get_length(self, N):

Comment on lines +202 to +206
  def estimate_byte_size(self, batch: pd.DataFrame):
    return batch.memory_usage().sum()

  def get_length(self, batch: pd.DataFrame):
    return len(batch)
Contributor:

Thanks!

@TheNeuralBit (Member Author):

Run Python 3.8 PostCommit

@TheNeuralBit (Member Author):

Run Python Examples_Direct

@TheNeuralBit (Member Author):

Run Python Examples_Dataflow

@TheNeuralBit (Member Author):

retest this please

1 similar comment
@TheNeuralBit (Member Author):

retest this please

@TheNeuralBit (Member Author):

Run Python Examples_Direct

@TheNeuralBit (Member Author):

Run Python Examples_Dataflow

@TheNeuralBit (Member Author):

Run Python 3.8 PostCommit

@TheNeuralBit (Member Author):

PythonDocs PreCommit has passed (https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Commit/9575/), merging

@TheNeuralBit merged commit a6329a5 into apache:master on Aug 31, 2022
Successfully merging this pull request may close these issues:

[Task]: Use Batched DoFn API in the DataFrame API