[BEAM-9546] DataframeTransform can now consume a schema-aware PCollection #11980
Conversation
CC: @robertwb
ff822ac to 55c4920
R: @robertwb
retest this please
55c4920 to f658cb2
@@ -36,7 +37,7 @@
 # TODO: Or should this be called as_dataframe?
 def to_dataframe(
     pcoll,  # type: pvalue.PCollection
-    proxy,  # type: pandas.core.generic.NDFrame
+    proxy=None,  # type: pandas.core.generic.NDFrame
Woo hoo!
    self._batch_elements_transform = BatchElements(*args, **kwargs)

  def expand(self, pcoll):
    return super(BatchRowsAsDataFrame, self).expand(pcoll) | ParDo(
Rather than subclassing, it would probably be cleaner to make this just a PTransform whose expand method returns
`pcoll | BatchElements(...) | ParDo(...)`.
If you want to accept all the parameters from BatchElements, you could construct the BatchElements instance in your constructor.
Done! Looks like I actually started to do it that way with the unused `self._batch_elements_transform`, but then changed my mind.
        _RowBatchToDataFrameDoFn(pcoll.element_type))


class _RowBatchToDataFrameDoFn(DoFn):
Rather than letting this be a full DoFn, you could just let columns be a local variable in the map above, and then write
... | Map(lambda batch: pd.DataFrame.from_records(batch, columns))
Done
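The suggested Map can be sketched in plain pandas. One caveat worth noting: `columns` must be passed to `from_records` by keyword, since its second positional parameter is `index`. The batch and column names below are hypothetical stand-ins for what would come from the PCollection's schema.

```python
import pandas as pd

# A batch of rows, as BatchElements would hand them to the Map.
batch = [('Aardvark', 5), ('Ant', 2), ('Elephant', 35)]

# In the real transform these names come from the PCollection's
# schema; they are hard-coded here for illustration.
columns = ['animal', 'speed']

# `columns` must be a keyword argument: the second positional
# argument of from_records is `index`, not `columns`.
to_df = lambda batch: pd.DataFrame.from_records(batch, columns=columns)
df = to_df(batch)
```

The lambda is exactly what would be wrapped in `Map(...)` inside the pipeline.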
import pandas as pd

from apache_beam import typehints
from apache_beam.transforms.core import DoFn
Nit: we typically have the style of importing modules and then using qualified names (which results in less churn and makes it a bit easier to figure out where things come from). Instead of importing from core, it's typical to do `import apache_beam as beam` and use `beam.DoFn`, etc.
Done
          | schemas.BatchRowsAsDataFrame()
          | transforms.DataframeTransform(
              lambda df: df.groupby('animal').mean(),
              proxy=schemas.generate_proxy(Animal)))
Can we get rid of this one too? (Or at least drop a TODO to do it in a subsequent PR?)
Added a TODO for now. I guess we'd need to store some type information on a PCollection[DataFrame], should we just store a proxy object when we know it?
    self.assertTrue(schemas.generate_proxy(Animal).equals(expected))

  def test_batch_with_df_transform(self):
Maybe do a test using to_dataframe?
I added a test using to_dataframe in transforms_test: test_batching_beam_row_to_dataframe.
I intended for these tests to just test schemas.py, while transforms_test verifies the integration with DataframeTransform. The one below was a stepping stone to integrating, we could even remove it now.
          AnimalSpeed('Elephant', 35),
          AnimalSpeed('Zebra', 40)
      ]).with_output_types(AnimalSpeed)
      | transforms.DataframeTransform(lambda df: df.filter(regex='A.*')))
This threw me because I was expecting the result to be ['Aardvark', 'Ant']. I see now that it's filtering down to the column names that start with A, but perhaps the filter could be written a bit differently to make it more obvious (e.g. filter on the values, let the regex be 'Anim*', or use another operation).
You and me both :) I just reused the operation from test_filter above. Changed it to Anim.* in both places.
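For anyone tripped up the same way: `df.filter` matches axis labels (column names by default), never cell values. A small pandas sketch of the distinction, using hypothetical data in the spirit of the test:

```python
import pandas as pd

df = pd.DataFrame({
    'Animal': ['Aardvark', 'Ant', 'Elephant'],
    'Speed': [5, 2, 35],
})

# filter(regex=...) selects *columns* whose names match the regex,
# so this keeps the 'Animal' column rather than rows starting with 'A'.
by_label = df.filter(regex='Anim.*')

# Selecting rows by value takes a boolean mask instead.
by_value = df[df['Animal'].str.startswith('A')]
```

`by_label` is a one-column frame ('Animal'); `by_value` holds the Aardvark and Ant rows.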
sdks/python/apache_beam/pvalue.py (Outdated)

@@ -88,7 +88,7 @@ class PValue(object):
   def __init__(self,
                pipeline,  # type: Pipeline
                tag=None,  # type: Optional[str]
-               element_type=None,  # type: Optional[object]
+               element_type=None,  # type: Optional[type]
Or a type constraint. (Not all our type hints are types.)
Done
    Returns schema as a list of (name, python_type) tuples"""
  if isinstance(element_type, row_type.RowTypeConstraint):
    # TODO: Make sure beam.Row generated schemas are registered and de-duped
Is this worth a JIRA?
Filed BEAM-10722 for this
Addressed all the comments, PTAL. Also pushed 66d258d updating the docstring.
      sorted_df = df.sort_values(by=list(df.columns))
    else:
      sorted_df = df.sort_values()
    return sorted_df.reset_index(drop=True)
Note there's actually a diff here from the original check_correct: it sorts by value and resets the index rather than sorting by index. I had to do this because the concatenated indices of the batches (e.g. [0, 1, 2, 0, 1, 0]) wouldn't match the index in my expected df (e.g. [0, 1, 2, 3, 4, 5]).
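The index mismatch is easy to reproduce in plain pandas; `normalize` below is a hypothetical stand-in for the modified check_correct:

```python
import pandas as pd

# Concatenating per-batch frames repeats each batch's 0-based index,
# e.g. [0, 1, 0] instead of [0, 1, 2].
actual = pd.concat([
    pd.DataFrame({'animal': ['Zebra', 'Ant']}),
    pd.DataFrame({'animal': ['Aardvark']}),
])

def normalize(df):
    # Sort by value and drop the index so frames built from
    # differently-batched inputs compare equal.
    return df.sort_values(by=list(df.columns)).reset_index(drop=True)

expected = pd.DataFrame({'animal': ['Aardvark', 'Ant', 'Zebra']})
```

Without the sort, even `reset_index(drop=True)` wouldn't help, since batch order is not deterministic.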
LGTM
Run Python PreCommit
Run Python2_PVR_Flink PreCommit
Run PythonDocker PreCommit
Run Python PreCommit
…tion (apache#11980)
* Add BatchRowsAsDataframe and generate_proxy, integrated into DataFrameTransform
* lint
* fix ci failures
* yapf?
* Address PR comments
* Update DataframeTransform docstring
* lint
Adds batching of schema'd PCollections into dataframes, based on the BatchElements transform.
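The core idea, stripped of the Beam plumbing: group a stream of rows into batches, then turn each batch into a DataFrame. This sketch uses fixed-size batches as a stand-in for BatchElements' adaptive sizing, and the rows and column names are illustrative:

```python
from itertools import islice

import pandas as pd

def batch_rows(rows, batch_size):
    # Greedily group an iterable of rows into lists of at most
    # batch_size elements. (BatchElements sizes batches adaptively;
    # a fixed size keeps the sketch simple.)
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch

rows = [('Aardvark', 5), ('Ant', 2), ('Elephant', 35), ('Zebra', 40)]
frames = [
    pd.DataFrame.from_records(batch, columns=['animal', 'speed'])
    for batch in batch_rows(rows, 3)
]
```

Each element of `frames` is what the downstream DataframeTransform would then operate on.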
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.