[BEAM-9546] DataframeTransform can now consume a schema-aware PCollection #11980

Merged: 7 commits, Aug 20, 2020
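
The diff below is easiest to read with the end state in mind. Here is a minimal sketch (not part of the PR) of what the change enables, assuming a NamedTuple element type registered with RowCoder as in the new tests; the Animal type and values are illustrative, and str is used where the tests use unicode for Python 2 compatibility.

from typing import NamedTuple

import apache_beam as beam
from apache_beam.coders import RowCoder
from apache_beam.coders.typecoders import registry as coders_registry
from apache_beam.dataframe.transforms import DataframeTransform

# Illustrative schema-aware element type, mirroring Animal in schemas_test.py.
Animal = NamedTuple('Animal', [('animal', str), ('max_speed', float)])
coders_registry.register_coder(Animal, RowCoder)

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([Animal('Falcon', 380.0), Animal('Parrot', 24.0)])
      # No proxy argument: the PCollection's schema is used to build one.
      | DataframeTransform(lambda df: df.groupby('animal').mean()))
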
17 changes: 7 additions & 10 deletions sdks/python/apache_beam/coders/row_coder.py
@@ -38,9 +38,8 @@
from apache_beam.portability import common_urns
from apache_beam.portability.api import schema_pb2
from apache_beam.typehints import row_type
from apache_beam.typehints.schemas import named_fields_to_schema
from apache_beam.typehints.schemas import named_tuple_from_schema
from apache_beam.typehints.schemas import named_tuple_to_schema
from apache_beam.typehints.schemas import schema_from_element_type
from apache_beam.utils import proto_utils

__all__ = ["RowCoder"]
@@ -90,14 +89,12 @@ def from_runner_api_parameter(schema, components, unused_context):

  @staticmethod
  def from_type_hint(type_hint, registry):
    if isinstance(type_hint, row_type.RowTypeConstraint):
      try:
        schema = named_fields_to_schema(type_hint._fields)
      except ValueError:
        # TODO(BEAM-10570): Consider a pythonsdk logical type.
        return typecoders.registry.get_coder(object)
    else:
      schema = named_tuple_to_schema(type_hint)
    try:
      schema = schema_from_element_type(type_hint)
    except ValueError:
      # TODO(BEAM-10570): Consider a pythonsdk logical type.
      return typecoders.registry.get_coder(object)

    return RowCoder(schema)

  @staticmethod
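
A small sketch of the effect of this change on RowCoder.from_type_hint, under the assumption of a plain NamedTuple element type (Point is hypothetical): NamedTuple types and RowTypeConstraints now share the schema_from_element_type path, with the same ValueError fallback for types that cannot yet be represented as a schema (BEAM-10570).

from typing import NamedTuple

from apache_beam.coders.row_coder import RowCoder


class Point(NamedTuple):  # hypothetical element type carrying a schema
  x: int
  y: float


# The registry argument is unused by this code path, so None suffices here.
coder = RowCoder.from_type_hint(Point, None)
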
13 changes: 12 additions & 1 deletion sdks/python/apache_beam/dataframe/convert.py
@@ -26,6 +26,7 @@
from apache_beam import pvalue
from apache_beam.dataframe import expressions
from apache_beam.dataframe import frame_base
from apache_beam.dataframe import schemas
from apache_beam.dataframe import transforms

if TYPE_CHECKING:
@@ -36,7 +37,7 @@
# TODO: Or should this be called as_dataframe?
def to_dataframe(
    pcoll,  # type: pvalue.PCollection
    proxy,  # type: pandas.core.generic.NDFrame
    proxy=None,  # type: pandas.core.generic.NDFrame

Contributor commented: Woo hoo!

):
  # type: (...) -> frame_base.DeferredFrame

@@ -52,6 +53,16 @@ def to_dataframe(

  A proxy object must be given if the schema for the PCollection is not known.
  """
  if proxy is None:
    if pcoll.element_type is None:
      raise ValueError(
          "Cannot infer a proxy because the input PCollection does not have a "
          "schema defined. Please make sure a schema type is specified for "
          "the input PCollection, or provide a proxy.")
    # If no proxy is given, assume this is an element-wise schema-aware
    # PCollection that needs to be batched.
    proxy = schemas.generate_proxy(pcoll.element_type)
    pcoll = pcoll | 'BatchElements' >> schemas.BatchRowsAsDataFrame()
  return frame_base.DeferredFrame.wrap(
      expressions.PlaceholderExpression(proxy, pcoll))

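A hedged sketch of the new no-proxy path in to_dataframe (the Plant type and values are illustrative, not from the PR): when the input PCollection has a schema, the proxy is generated from element_type and the rows are batched into DataFrames before being wrapped as a deferred frame.

from typing import NamedTuple

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe

Plant = NamedTuple('Plant', [('name', str), ('height', float)])  # hypothetical

with beam.Pipeline() as p:
  rows = p | beam.Create([Plant('fern', 0.4), Plant('oak', 20.0)])
  df = to_dataframe(rows)  # proxy inferred from the schema; rows get batched
  means = df.groupby('name').mean()  # ordinary deferred pandas operations
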
71 changes: 71 additions & 0 deletions sdks/python/apache_beam/dataframe/schemas.py
@@ -0,0 +1,71 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Utilities for relating schema-aware PCollections and dataframe transforms.
"""

# pytype: skip-file

from __future__ import absolute_import

import typing

import pandas as pd

import apache_beam as beam
from apache_beam import typehints
from apache_beam.transforms.util import BatchElements
from apache_beam.typehints.schemas import named_fields_from_element_type

__all__ = ('BatchRowsAsDataFrame', 'generate_proxy')

T = typing.TypeVar('T', bound=typing.NamedTuple)


@typehints.with_input_types(T)
@typehints.with_output_types(pd.DataFrame)
class BatchRowsAsDataFrame(beam.PTransform):
  """A transform that batches schema-aware PCollection elements into DataFrames

  Batching parameters are inherited from
  :class:`~apache_beam.transforms.util.BatchElements`.
  """
  def __init__(self, *args, **kwargs):
    self._batch_elements_transform = BatchElements(*args, **kwargs)

  def expand(self, pcoll):
    columns = [
        name for name, _ in named_fields_from_element_type(pcoll.element_type)
    ]
    return pcoll | self._batch_elements_transform | beam.Map(
        lambda batch: pd.DataFrame.from_records(batch, columns=columns))


def _make_empty_series(name, typ):
  try:
    return pd.Series(name=name, dtype=typ)
  except TypeError:
    raise TypeError("Unable to convert type '%s' for field '%s'" % (typ, name))


def generate_proxy(element_type):
  # type: (type) -> pd.DataFrame
  return pd.DataFrame({
      name: _make_empty_series(name, typ)
      for name, typ in named_fields_from_element_type(element_type)
  })
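
A brief illustration of the two helpers above, assuming a hypothetical Sensor NamedTuple (not in the PR): generate_proxy yields a zero-row DataFrame whose columns and dtypes mirror the schema fields, and BatchRowsAsDataFrame turns a schema-aware PCollection into a PCollection of DataFrame batches with those columns.

from typing import NamedTuple

import apache_beam as beam
from apache_beam.dataframe import schemas

Sensor = NamedTuple('Sensor', [('id', int), ('reading', float)])  # hypothetical

proxy = schemas.generate_proxy(Sensor)  # empty DataFrame with columns id, reading

with beam.Pipeline() as p:
  dfs = (
      p
      | beam.Create([Sensor(1, 0.5), Sensor(2, 1.5)])
      | schemas.BatchRowsAsDataFrame())  # elements are pandas DataFrames
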
111 changes: 111 additions & 0 deletions sdks/python/apache_beam/dataframe/schemas_test.py
@@ -0,0 +1,111 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Tests for schemas."""

# pytype: skip-file

from __future__ import absolute_import

import unittest
from typing import NamedTuple

import future.tests.base # pylint: disable=unused-import
# patches unittest.testcase to be python3 compatible
import pandas as pd
from past.builtins import unicode

import apache_beam as beam
from apache_beam.coders import RowCoder
from apache_beam.coders.typecoders import registry as coders_registry
from apache_beam.dataframe import schemas
from apache_beam.dataframe import transforms
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that

Simple = NamedTuple(
    'Simple', [('name', unicode), ('id', int), ('height', float)])
coders_registry.register_coder(Simple, RowCoder)
Animal = NamedTuple('Animal', [('animal', unicode), ('max_speed', float)])
coders_registry.register_coder(Animal, RowCoder)


def matches_df(expected):
  def check_df_pcoll_equal(actual):
    actual = pd.concat(actual)
    sorted_actual = actual.sort_values(by=list(actual.columns)).reset_index(
        drop=True)
    sorted_expected = expected.sort_values(
        by=list(expected.columns)).reset_index(drop=True)
    if not sorted_actual.equals(sorted_expected):
      raise AssertionError(
          'Dataframes not equal: \n\nActual:\n%s\n\nExpected:\n%s' %
          (sorted_actual, sorted_expected))

  return check_df_pcoll_equal


class SchemasTest(unittest.TestCase):
  def test_simple_df(self):
    expected = pd.DataFrame({
        'name': list(unicode(i) for i in range(5)),
        'id': list(range(5)),
        'height': list(float(i) for i in range(5))
    }, columns=['name', 'id', 'height'])

    with TestPipeline() as p:
      res = (
          p
          | beam.Create([
              Simple(name=unicode(i), id=i, height=float(i)) for i in range(5)
          ])
          | schemas.BatchRowsAsDataFrame(min_batch_size=10, max_batch_size=10))
      assert_that(res, matches_df(expected))

  def test_generate_proxy(self):
    expected = pd.DataFrame({
        'animal': pd.Series(dtype=unicode), 'max_speed': pd.Series(dtype=float)
    })

    self.assertTrue(schemas.generate_proxy(Animal).equals(expected))

  def test_batch_with_df_transform(self):

Contributor commented: Maybe do a test using to_dataframe?

Member (author) replied: I added a test using to_dataframe in transforms_test: test_batching_beam_row_to_dataframe. I intended for these tests to just test schemas.py, while transforms_test verifies the integration with DataframeTransform. The one below was a stepping stone to integrating; we could even remove it now.

    with TestPipeline() as p:
      res = (
          p
          | beam.Create([
              Animal('Falcon', 380.0),
              Animal('Falcon', 370.0),
              Animal('Parrot', 24.0),
              Animal('Parrot', 26.0)
          ])
          | schemas.BatchRowsAsDataFrame()
          | transforms.DataframeTransform(
              lambda df: df.groupby('animal').mean(),
              # TODO: Generate proxy in this case as well
              proxy=schemas.generate_proxy(Animal)))

Contributor commented: Can we get rid of this one too? (Or at least drop a TODO to do it in a subsequent PR?)

Member (author) replied: Added a TODO for now. I guess we'd need to store some type information on a PCollection[DataFrame]; should we just store a proxy object when we know it?

      assert_that(
          res,
          matches_df(
              pd.DataFrame({'max_speed': [375.0, 25.0]},
                           index=pd.Index(
                               data=['Falcon', 'Parrot'], name='animal'))))


if __name__ == '__main__':
  unittest.main()
17 changes: 14 additions & 3 deletions sdks/python/apache_beam/dataframe/transforms.py
@@ -44,15 +44,23 @@ class DataframeTransform(transforms.PTransform):
  """A PTransform for applying a function that takes and returns dataframes
  to one or more PCollections.

  For example, if pcoll is a PCollection of dataframes, one could write::
  DataframeTransform will accept a PCollection with a schema and batch it
  into dataframes if necessary. In this case the proxy can be omitted:

      (pcoll | beam.Row(key=..., foo=..., bar=...)
             | DataframeTransform(lambda df: df.groupby('key').sum()))

  It is also possible to process a PCollection of dataframes directly; in this
  case a proxy must be provided. For example, if pcoll is a PCollection of
  dataframes, one could write::

      pcoll | DataframeTransform(lambda df: df.groupby('key').sum(), proxy=...)

  To pass multiple PCollections, pass a tuple of PCollections which will be
  passed to the callable as positional arguments, or a dictionary of
  PCollections, in which case they will be passed as keyword arguments.
  """
  def __init__(self, func, proxy):
  def __init__(self, func, proxy=None):
    self._func = func
    self._proxy = proxy

@@ -62,7 +70,10 @@ def expand(self, input_pcolls):

    # Convert inputs to a flat dict.
    input_dict = _flatten(input_pcolls)  # type: Dict[Any, PCollection]
    proxies = _flatten(self._proxy)
    proxies = _flatten(self._proxy) if self._proxy is not None else {
        tag: None
        for tag in input_dict.keys()
    }
    input_frames = {
        k: convert.to_dataframe(pc, proxies[k])
        for k, pc in input_dict.items()