From 5fe1574bf4febcb93db095f0e1ed048df62c2908 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 5 Aug 2022 12:03:23 -0700 Subject: [PATCH 01/10] Extract utilities in dataframe.schemas --- sdks/python/apache_beam/dataframe/schemas.py | 45 +++++++++++++++----- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/schemas.py b/sdks/python/apache_beam/dataframe/schemas.py index b240dec591900..294840b3db2b6 100644 --- a/sdks/python/apache_beam/dataframe/schemas.py +++ b/sdks/python/apache_beam/dataframe/schemas.py @@ -51,12 +51,14 @@ # pytype: skip-file from typing import Any +from typing import Dict from typing import NamedTuple from typing import Optional +from typing import Sequence +from typing import Tuple from typing import TypeVar from typing import Union -import numpy as np import pandas as pd import apache_beam as beam @@ -64,8 +66,9 @@ from apache_beam.portability.api import schema_pb2 from apache_beam.transforms.util import BatchElements from apache_beam.typehints.native_type_compatibility import _match_is_optional +from apache_beam.typehints.row_type import RowTypeConstraint from apache_beam.typehints.schemas import named_fields_from_element_type -from apache_beam.typehints.schemas import named_fields_to_schema +from apache_beam.typehints.typehints import normalize from apache_beam.typehints.schemas import named_tuple_from_schema from apache_beam.typehints.schemas import named_tuple_to_schema from apache_beam.typehints.typehints import normalize @@ -154,6 +157,12 @@ def expand(self, pcoll): return pcoll | self._batch_elements_transform | beam.Map(construct) +def dtype_from_typehint(typehint): + # Default to np.object. This is lossy, we won't be able to recover + # the type at the output. + return BEAM_TO_PANDAS.get(typehint, object) + + def generate_proxy(element_type): # type: (type) -> pd.DataFrame @@ -162,16 +171,14 @@ def generate_proxy(element_type): Currently only supports generating a DataFrame proxy from a schema-aware PCollection or a Series proxy from a primitively typed PCollection. """ - if element_type != Any and element_type in BEAM_TO_PANDAS: - return pd.Series(dtype=BEAM_TO_PANDAS[element_type]) - + dtype = dtype_from_typehint(element_type) + if dtype is not object: + return pd.Series(dtype=dtype) else: fields = named_fields_from_element_type(element_type) proxy = pd.DataFrame(columns=[name for name, _ in fields]) for name, typehint in fields: - # Default to np.object. This is lossy, we won't be able to recover - # the type at the output. - dtype = BEAM_TO_PANDAS.get(typehint, object) + dtype = dtype_from_typehint(typehint) proxy[name] = proxy[name].astype(dtype) return proxy @@ -187,6 +194,12 @@ def element_type_from_dataframe(proxy, include_indexes=False): Currently only supports generating a DataFrame proxy from a schema-aware PCollection. """ + return element_typehint_from_dataframe_proxy(proxy, include_indexes).user_type + + +def element_typehint_from_dataframe_proxy( + proxy: pd.DataFrame, include_indexes: bool = False) -> RowTypeConstraint: + output_columns = [] if include_indexes: remaining_index_names = list(proxy.index.names) @@ -220,9 +233,19 @@ def element_type_from_dataframe(proxy, include_indexes=False): output_columns.extend(zip(proxy.columns, proxy.dtypes)) - return named_tuple_from_schema( - named_fields_to_schema([(column, _dtype_to_fieldtype(dtype)) - for (column, dtype) in output_columns])) + fields = [(column, _dtype_to_fieldtype(dtype)) + for (column, dtype) in output_columns] + field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] + if include_indexes: + field_options = { + # TODO: Reference the constant in pandas_type_compatibility + index_name: [('beam:dataframe:index', None)] + for index_name in proxy.index.names + } + else: + field_options = None + + return RowTypeConstraint.from_fields(fields, field_options=field_options) class _BaseDataframeUnbatchDoFn(beam.DoFn): From c913e05ce025fb98e64743600a95995ee45ae07f Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 5 Aug 2022 11:58:20 -0700 Subject: [PATCH 02/10] Add pandas_type_compatibility with pandas BatchConverter implementations --- .../typehints/pandas_type_compatibility.py | 195 ++++++++++++++++++ .../pandas_type_compatibility_test.py | 192 +++++++++++++++++ 2 files changed, 387 insertions(+) create mode 100644 sdks/python/apache_beam/typehints/pandas_type_compatibility.py create mode 100644 sdks/python/apache_beam/typehints/pandas_type_compatibility_test.py diff --git a/sdks/python/apache_beam/typehints/pandas_type_compatibility.py b/sdks/python/apache_beam/typehints/pandas_type_compatibility.py new file mode 100644 index 0000000000000..1314ddf3b7d15 --- /dev/null +++ b/sdks/python/apache_beam/typehints/pandas_type_compatibility.py @@ -0,0 +1,195 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Utilities for converting between Beam schemas and pandas DataFrames. + +For internal use only, no backward compatibility guarantees. +""" + +from apache_beam.typehints.batch import BatchConverter +from apache_beam.typehints.typehints import is_nullable +from apache_beam.typehints.row_type import RowTypeConstraint +from apache_beam.dataframe.schemas import dtype_from_typehint +from apache_beam.dataframe.schemas import generate_proxy + +import pandas as pd +import numpy as np + +from typing import Any +from typing import Optional +from typing import List +from typing import Tuple +from typing import Dict + +# Name for a valueless field-level option which, when present, indicates that +# a field should map to an index in the Beam DataFrame API. +INDEX_OPTION_NAME = 'beam:dataframe:index' + + +class DataFrameBatchConverter(BatchConverter): + def __init__( + self, + element_type: RowTypeConstraint, + ): + super().__init__(pd.DataFrame, element_type) + self._columns = [name for name, _ in element_type._fields] + + @staticmethod + @BatchConverter.register + def from_typehints(element_type, + batch_type) -> Optional['DataFrameBatchConverter']: + if not batch_type == pd.DataFrame: + return None + + if not isinstance(element_type, RowTypeConstraint): + element_type = RowTypeConstraint.from_user_type(element_type) + if element_type is None: + return None + + index_columns = [ + field_name + for (field_name, field_options) in element_type._field_options.items() + if any(key == INDEX_OPTION_NAME for key, value in field_options) + ] + + if index_columns: + return DataFrameBatchConverterKeepIndex(element_type, index_columns) + else: + return DataFrameBatchConverterDropIndex(element_type) + + def _get_series(self, batch: pd.DataFrame): + raise NotImplementedError + + def explode_batch(self, batch: pd.DataFrame): + # TODO: Only do null checks for nullable types + def make_null_checking_generator(series): + nulls = pd.isnull(series) + return (None if isnull else value for isnull, value in zip(nulls, series)) + + all_series = self._get_series(batch) + iterators = [make_null_checking_generator(series) for series in all_series] + + for values in zip(*iterators): + yield self._element_type.user_type( + **{column: value + for column, value in zip(self._columns, values)}) + + def combine_batches(self, batches: List[pd.DataFrame]): + return pd.concat(batches) + + def estimate_byte_size(self, batch: pd.DataFrame): + return batch.memory_usage().sum() + + def get_length(self, batch: pd.DataFrame): + return len(batch) + + +class DataFrameBatchConverterDropIndex(DataFrameBatchConverter): + """A DataFrameBatchConverter that assumes the DataFrame index has no meaning. + + When producing a DataFrame from Rows, a meaningless index will be generated. + When exploding a DataFrame into Rows, the index will be dropped. + """ + def _get_series(self, batch: pd.DataFrame): + return [batch[column] for column in batch.columns] + + def produce_batch(self, elements): + # Note from_records has an index= parameter + batch = pd.DataFrame.from_records(elements, columns=self._columns) + + for column, typehint in self._element_type._fields: + batch[column] = batch[column].astype(dtype_from_typehint(typehint)) + + return batch + + +class DataFrameBatchConverterKeepIndex(DataFrameBatchConverter): + """A DataFrameBatchConverter that preserves the DataFrame index. + + This is tracked via options on the Beam schema. Each field in the schema that + should map to the index is tagged in an option with name 'dataframe:index'. + """ + def __init__(self, element_type: RowTypeConstraint, index_columns: List[Any]): + super().__init__(element_type) + self._index_columns = index_columns + + def _get_series(self, batch: pd.DataFrame): + assert list(batch.index.names) == self._index_columns + return [ + batch.index.get_level_values(i) for i in range(len(batch.index.names)) + ] + [batch[column] for column in batch.columns] + + def produce_batch(self, elements): + # Note from_records has an index= parameter + batch = pd.DataFrame.from_records(elements, columns=self._columns) + + for column, typehint in self._element_type._fields: + batch[column] = batch[column].astype(dtype_from_typehint(typehint)) + + return batch.set_index(self._index_columns) + + +class SeriesBatchConverter(BatchConverter): + def __init__( + self, + element_type: type, + dtype, + ): + super().__init__(pd.DataFrame, element_type) + self._dtype = dtype + + if is_nullable(element_type): + + def unbatch(series): + for isnull, value in zip(pd.isnull(series), series): + yield None if isnull else value + else: + + def unbatch(series): + yield from series + + self.explode_batch = unbatch + + @staticmethod + @BatchConverter.register + def from_typehints(element_type, + batch_type) -> Optional['SeriesBatchConverter']: + if not batch_type == pd.Series: + return None + + dtype = dtype_from_typehint(element_type) + if dtype == np.object: + # Don't create Any <-> Series[np.object] mapping + return None + + return SeriesBatchConverter(element_type, dtype) + + def produce_batch(self, elements: List[Any]) -> pd.Series: + return pd.Series(elements, dtype=self._dtype) + + def explode_batch(self, batch: pd.Series): + raise NotImplementedError( + "explode_batch should be generated in SeriesBatchConverter.__init__") + + def combine_batches(self, batches: List[pd.Series]): + return pd.concat(batches) + + def estimate_byte_size(self, batch: pd.Series): + return batch.memory_usage() + + def get_length(self, batch: pd.Series): + return len(batch) diff --git a/sdks/python/apache_beam/typehints/pandas_type_compatibility_test.py b/sdks/python/apache_beam/typehints/pandas_type_compatibility_test.py new file mode 100644 index 0000000000000..5d629ebca9a90 --- /dev/null +++ b/sdks/python/apache_beam/typehints/pandas_type_compatibility_test.py @@ -0,0 +1,192 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for pandas batched type converters.""" + +from typing import Optional +import unittest + +import pandas as pd +import numpy as np +from parameterized import parameterized +from parameterized import parameterized_class + +from apache_beam.typehints import typehints +from apache_beam.typehints import row_type +from apache_beam.typehints.batch import BatchConverter +from apache_beam.typehints.pandas_type_compatibility import DataFrameBatchConverter + + +@parameterized_class([ + { + 'batch_typehint': pd.DataFrame, + 'element_typehint': row_type.RowTypeConstraint.from_fields([ + ('foo', int), + ('bar', float), + ('baz', str), + ]), + 'match_index': False, + 'batch': pd.DataFrame({ + 'foo': pd.Series(range(100), dtype='int64'), + 'bar': pd.Series([i / 100 for i in range(100)], dtype='float64'), + 'baz': pd.Series([str(i) for i in range(100)], + dtype=pd.StringDtype()), + }), + }, + { + 'batch_typehint': pd.DataFrame, + 'element_typehint': row_type.RowTypeConstraint.from_fields( + [ + ('an_index', int), + ('foo', int), + ('bar', float), + ('baz', str), + ], + field_options={'an_index': [('beam:dataframe:index', None)]}, + ), + 'match_index': True, + 'batch': pd.DataFrame({ + 'foo': pd.Series(range(100), dtype='int64'), + 'bar': pd.Series([i / 100 for i in range(100)], dtype='float64'), + 'baz': pd.Series([str(i) for i in range(100)], + dtype=pd.StringDtype()), + }).set_index(pd.Int64Index(range(123, 223), name='an_index')), + }, + { + 'batch_typehint': pd.DataFrame, + 'element_typehint': row_type.RowTypeConstraint.from_fields( + [ + ('an_index', int), + ('another_index', int), + ('foo', int), + ('bar', float), + ('baz', str), + ], + field_options={ + 'an_index': [('beam:dataframe:index', None)], + 'another_index': [('beam:dataframe:index', None)], + }), + 'match_index': True, + 'batch': pd.DataFrame({ + 'foo': pd.Series(range(100), dtype='int64'), + 'bar': pd.Series([i / 100 for i in range(100)], dtype='float64'), + 'baz': pd.Series([str(i) for i in range(100)], + dtype=pd.StringDtype()), + }).set_index([ + pd.Int64Index(range(123, 223), name='an_index'), + pd.Int64Index(range(475, 575), name='another_index'), + ]), + }, + { + 'batch_typehint': pd.Series, + 'element_typehint': int, + 'match_index': False, + 'batch': pd.Series(range(500)), + }, + { + 'batch_typehint': pd.Series, + 'element_typehint': str, + 'batch': pd.Series(['foo', 'bar', 'baz', 'def', 'ghi', 'abc'] * 10, + dtype=pd.StringDtype()), + }, + { + 'batch_typehint': pd.Series, + 'element_typehint': Optional[np.int64], + 'batch': pd.Series((i if i % 11 else None for i in range(500)), + dtype=pd.Int64Dtype()), + }, + { + 'batch_typehint': pd.Series, + 'element_typehint': Optional[str], + 'batch': pd.Series(['foo', None, 'bar', 'baz', None, 'def', 'ghi'] * 10, + dtype=pd.StringDtype()), + }, +]) +class DataFrameBatchConverterTest(unittest.TestCase): + def create_batch_converter(self): + return BatchConverter.from_typehints( + element_type=self.element_typehint, batch_type=self.batch_typehint) + + def setUp(self): + self.converter = self.create_batch_converter() + self.normalized_batch_typehint = typehints.normalize(self.batch_typehint) + self.normalized_element_typehint = typehints.normalize( + self.element_typehint) + + def equality_check(self, left, right): + if isinstance(left, pd.DataFrame): + if self.match_index: + pd.testing.assert_frame_equal(left.sort_index(), right.sort_index()) + else: + pd.testing.assert_frame_equal( + left.sort_values(by=list(left.columns)).reset_index(drop=True), + right.sort_values(by=list(right.columns)).reset_index(drop=True)) + elif isinstance(left, pd.Series): + pd.testing.assert_series_equal( + left.sort_values().reset_index(drop=True), + right.sort_values().reset_index(drop=True)) + else: + raise TypeError(f"Encountered unexpected type, left is a {type(left)!r}") + + def test_typehint_validates(self): + typehints.validate_composite_type_param(self.batch_typehint, '') + typehints.validate_composite_type_param(self.element_typehint, '') + + def test_type_check(self): + typehints.check_constraint(self.normalized_batch_typehint, self.batch) + + def test_type_check_element(self): + for element in self.converter.explode_batch(self.batch): + typehints.check_constraint(self.normalized_element_typehint, element) + + def test_explode_rebatch(self): + exploded = list(self.converter.explode_batch(self.batch)) + rebatched = self.converter.produce_batch(exploded) + + typehints.check_constraint(self.normalized_batch_typehint, rebatched) + self.equality_check(self.batch, rebatched) + + @parameterized.expand([ + (2, ), + (3, ), + (10, ), + ]) + def test_combine_batches(self, N): + elements = list(self.converter.explode_batch(self.batch)) + + # Split elements into N contiguous partitions, create a batch out of each + batches = [ + self.converter.produce_batch( + elements[len(elements) * i // N:len(elements) * (i + 1) // N]) + for i in range(N) + ] + + # Combine the batches, output should be equivalent to the original batch + combined = self.converter.combine_batches(batches) + + self.equality_check(self.batch, combined) + + def test_equals(self): + self.assertTrue(self.converter == self.create_batch_converter()) + self.assertTrue(self.create_batch_converter() == self.converter) + + def test_hash(self): + self.assertEqual(hash(self.create_batch_converter()), hash(self.converter)) + + +if __name__ == '__main__': + unittest.main() From 10bb964e4a588ef2a883e9da70232e47f748070f Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 5 Aug 2022 12:03:55 -0700 Subject: [PATCH 03/10] Use Batched DoFns at DataFrame API boundaries --- sdks/python/apache_beam/dataframe/convert.py | 78 +++++++++++++++++++- 1 file changed, 75 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/convert.py b/sdks/python/apache_beam/dataframe/convert.py index 2b207dda7270d..c8bbb363b2d0f 100644 --- a/sdks/python/apache_beam/dataframe/convert.py +++ b/sdks/python/apache_beam/dataframe/convert.py @@ -15,9 +15,11 @@ # limitations under the License. import inspect +import warnings import weakref from typing import TYPE_CHECKING from typing import Any +from typing import Iterable from typing import Dict from typing import Tuple from typing import Union @@ -57,6 +59,9 @@ def to_dataframe( A proxy object must be given if the schema for the PCollection is not known. """ if proxy is None: + # TODO: Find a better way to ensure pandas BatchConverters are registered + import apache_beam.typehints.pandas_type_compatibility + if pcoll.element_type is None: raise ValueError( "Cannot infer a proxy because the input PCollection does not have a " @@ -69,7 +74,16 @@ def to_dataframe( # the name of these variables in the calling context. label = 'BatchElements(%s)' % _var_name(pcoll, 2) proxy = schemas.generate_proxy(pcoll.element_type) - pcoll = pcoll | label >> schemas.BatchRowsAsDataFrame(proxy=proxy) + + shim_dofn: beam.DoFn + if isinstance(proxy, pd.DataFrame): + shim_dofn = RowsToDataFrameFn() + elif isinstance(proxy, pd.Series): + shim_dofn = ElementsToSeriesFn() + else: + raise AssertionError("Unknown proxy type: %s" % proxy) + + pcoll = pcoll | label >> beam.ParDo(shim_dofn) return frame_base.DeferredFrame.wrap( expressions.PlaceholderExpression(proxy, pcoll)) @@ -86,6 +100,18 @@ def to_dataframe( ) # type: weakref.WeakValueDictionary[str, pvalue.PCollection] +class RowsToDataFrameFn(beam.DoFn): + @beam.DoFn.yields_elements + def process_batch(self, batch: pd.DataFrame) -> Iterable[pd.DataFrame]: + yield batch + + +class ElementsToSeriesFn(beam.DoFn): + @beam.DoFn.yields_elements + def process_batch(self, batch: pd.Series) -> Iterable[pd.Series]: + yield batch + + def _make_unbatched_pcoll( pc: pvalue.PCollection, expr: expressions.Expression, include_indexes: bool): @@ -94,14 +120,58 @@ def _make_unbatched_pcoll( label += " with indexes" if label not in UNBATCHED_CACHE: - UNBATCHED_CACHE[label] = pc | label >> schemas.UnbatchPandas( - expr.proxy(), include_indexes=include_indexes) + proxy = expr.proxy() + shim_dofn: beam.DoFn + if isinstance(proxy, pd.DataFrame): + shim_dofn = DataFrameToRowsFn(proxy, include_indexes) + elif isinstance(proxy, pd.Series): + if include_indexes: + warnings.warn( + "Pipeline is converting a DeferredSeries to PCollection " + "with include_indexes=True. Note that this parameter is " + "_not_ respected for DeferredSeries conversion. To " + "include the index with your data, produce a" + "DeferredDataFrame instead.") + + shim_dofn = SeriesToElementsFn(proxy) + else: + raise TypeError(f"Proxy '{proxy}' has unsupported type '{type(proxy)}'") + + UNBATCHED_CACHE[label] = pc | label >> beam.ParDo(shim_dofn) # Note unbatched cache is keyed by the expression id as well as parameters # for the unbatching (i.e. include_indexes) return UNBATCHED_CACHE[label] +class DataFrameToRowsFn(beam.DoFn): + def __init__(self, proxy, include_indexes): + self._proxy = proxy + self._include_indexes = include_indexes + + @beam.DoFn.yields_batches + def process(self, element: pd.DataFrame) -> Iterable[pd.DataFrame]: + yield element + + def infer_output_type(self, input_element_type): + return schemas.element_typehint_from_dataframe_proxy( + self._proxy, self._include_indexes) + + +class SeriesToElementsFn(beam.DoFn): + def __init__(self, proxy): + self._proxy = proxy + + @beam.DoFn.yields_batches + def process(self, element: pd.Series) -> Iterable[pd.Series]: + yield element + + def infer_output_type(self, input_element_type): + # Raise a TypeError if proxy has an unknown type + output_type = schemas._dtype_to_fieldtype(self._proxy.dtype) + return output_type + + # TODO: Or should this be called from_dataframe? @@ -209,6 +279,8 @@ def extract_input(placeholder): } if yield_elements == "schemas": + # TODO: Find a better way to ensure pandas BatchConverters are registered + import apache_beam.typehints.pandas_type_compatibility def maybe_unbatch(pc, value): if isinstance(value, frame_base._DeferredScalar): From ff31656e4d925f1deafceadec21313ceaaade99f Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 5 Aug 2022 13:10:48 -0700 Subject: [PATCH 04/10] Move dtype conversion to pandas_type_compatibility --- sdks/python/apache_beam/dataframe/convert.py | 3 +- sdks/python/apache_beam/dataframe/schemas.py | 106 ++---------------- .../typehints/pandas_type_compatibility.py | 100 ++++++++++++++++- .../pandas_type_compatibility_test.py | 7 +- 4 files changed, 110 insertions(+), 106 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/convert.py b/sdks/python/apache_beam/dataframe/convert.py index c8bbb363b2d0f..141d18dd294f1 100644 --- a/sdks/python/apache_beam/dataframe/convert.py +++ b/sdks/python/apache_beam/dataframe/convert.py @@ -29,6 +29,7 @@ import apache_beam as beam from apache_beam import pvalue from apache_beam.dataframe import expressions +from apache_beam.typehints.pandas_type_compatibility import dtype_to_fieldtype from apache_beam.dataframe import frame_base from apache_beam.dataframe import schemas from apache_beam.dataframe import transforms @@ -168,7 +169,7 @@ def process(self, element: pd.Series) -> Iterable[pd.Series]: def infer_output_type(self, input_element_type): # Raise a TypeError if proxy has an unknown type - output_type = schemas._dtype_to_fieldtype(self._proxy.dtype) + output_type = dtype_to_fieldtype(self._proxy.dtype) return output_type diff --git a/sdks/python/apache_beam/dataframe/schemas.py b/sdks/python/apache_beam/dataframe/schemas.py index 294840b3db2b6..4f2b8cdd8ed1a 100644 --- a/sdks/python/apache_beam/dataframe/schemas.py +++ b/sdks/python/apache_beam/dataframe/schemas.py @@ -15,37 +15,10 @@ # limitations under the License. # -r"""Utilities for relating schema-aware PCollections and dataframe transforms. - -Imposes a mapping between native Python typings (specifically those compatible -with :mod:`apache_beam.typehints.schemas`), and common pandas dtypes:: - - pandas dtype Python typing - np.int{8,16,32,64} <-----> np.int{8,16,32,64}* - pd.Int{8,16,32,64}Dtype <-----> Optional[np.int{8,16,32,64}]* - np.float{32,64} <-----> Optional[np.float{32,64}] - \--- np.float{32,64} - Not supported <------ Optional[bytes] - np.bool <-----> np.bool - np.dtype('S') <-----> bytes - pd.BooleanDType() <-----> Optional[bool] - pd.StringDType() <-----> Optional[str] - \--- str - np.object <-----> Any - - * int, float, bool are treated the same as np.int64, np.float64, np.bool - -Note that when converting to pandas dtypes, any types not specified here are -shunted to ``np.object``. - -Similarly when converting from pandas to Python types, types that aren't -otherwise specified here are shunted to ``Any``. Notably, this includes -``np.datetime64``. - -Pandas does not support hierarchical data natively. Currently, all structured -types (``Sequence``, ``Mapping``, nested ``NamedTuple`` types), are -shunted to ``np.object`` like all other unknown types. In the future these -types may be given special consideration. +"""Utilities for relating schema-aware PCollections and DataFrame transforms. + +The utilities here enforce the type mapping defined in +:mod:`apache_beam.typehints.pandas_type_compatibility`. """ # pytype: skip-file @@ -66,9 +39,10 @@ from apache_beam.portability.api import schema_pb2 from apache_beam.transforms.util import BatchElements from apache_beam.typehints.native_type_compatibility import _match_is_optional +from apache_beam.typehints.pandas_type_compatibility import dtype_from_typehint +from apache_beam.typehints.pandas_type_compatibility import dtype_to_fieldtype from apache_beam.typehints.row_type import RowTypeConstraint from apache_beam.typehints.schemas import named_fields_from_element_type -from apache_beam.typehints.typehints import normalize from apache_beam.typehints.schemas import named_tuple_from_schema from apache_beam.typehints.schemas import named_tuple_to_schema from apache_beam.typehints.typehints import normalize @@ -82,53 +56,6 @@ T = TypeVar('T', bound=NamedTuple) -# Generate type map (presented visually in the docstring) -_BIDIRECTIONAL = [ - (bool, bool), - (np.int8, np.int8), - (np.int16, np.int16), - (np.int32, np.int32), - (np.int64, np.int64), - (pd.Int8Dtype(), Optional[np.int8]), - (pd.Int16Dtype(), Optional[np.int16]), - (pd.Int32Dtype(), Optional[np.int32]), - (pd.Int64Dtype(), Optional[np.int64]), - (np.float32, Optional[np.float32]), - (np.float64, Optional[np.float64]), - (object, Any), - (pd.StringDtype(), Optional[str]), - (pd.BooleanDtype(), Optional[bool]), -] - -PANDAS_TO_BEAM = { - pd.Series([], dtype=dtype).dtype: fieldtype - for dtype, - fieldtype in _BIDIRECTIONAL -} -BEAM_TO_PANDAS = {fieldtype: dtype for dtype, fieldtype in _BIDIRECTIONAL} - -# Shunt non-nullable Beam types to the same pandas types as their non-nullable -# equivalents for FLOATs, DOUBLEs, and STRINGs. pandas has no non-nullable dtype -# for these. -OPTIONAL_SHUNTS = [np.float32, np.float64, str] - -for typehint in OPTIONAL_SHUNTS: - BEAM_TO_PANDAS[typehint] = BEAM_TO_PANDAS[Optional[typehint]] - -# int, float -> int64, np.float64 -BEAM_TO_PANDAS[int] = BEAM_TO_PANDAS[np.int64] -BEAM_TO_PANDAS[Optional[int]] = BEAM_TO_PANDAS[Optional[np.int64]] -BEAM_TO_PANDAS[float] = BEAM_TO_PANDAS[np.float64] -BEAM_TO_PANDAS[Optional[float]] = BEAM_TO_PANDAS[Optional[np.float64]] - -BEAM_TO_PANDAS[bytes] = 'bytes' - -# Add shunts for normalized (Beam) typehints as well -BEAM_TO_PANDAS.update({ - normalize(typehint): pandas_dtype - for (typehint, pandas_dtype) in BEAM_TO_PANDAS.items() -}) - @typehints.with_input_types(T) @typehints.with_output_types(pd.DataFrame) @@ -157,12 +84,6 @@ def expand(self, pcoll): return pcoll | self._batch_elements_transform | beam.Map(construct) -def dtype_from_typehint(typehint): - # Default to np.object. This is lossy, we won't be able to recover - # the type at the output. - return BEAM_TO_PANDAS.get(typehint, object) - - def generate_proxy(element_type): # type: (type) -> pd.DataFrame @@ -233,7 +154,7 @@ def element_typehint_from_dataframe_proxy( output_columns.extend(zip(proxy.columns, proxy.dtypes)) - fields = [(column, _dtype_to_fieldtype(dtype)) + fields = [(column, dtype_to_fieldtype(dtype)) for (column, dtype) in output_columns] field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] if include_indexes: @@ -307,7 +228,7 @@ def _unbatch_transform(proxy, include_indexes): _UnbatchWithIndex(ctor) if include_indexes else _UnbatchNoIndex(ctor)) elif isinstance(proxy, pd.Series): # Raise a TypeError if proxy has an unknown type - output_type = _dtype_to_fieldtype(proxy.dtype) + output_type = dtype_to_fieldtype(proxy.dtype) # TODO: Should the index ever be included for a Series? if _match_is_optional(output_type): @@ -326,17 +247,6 @@ def unbatch(series): "Proxy '%s' has unsupported type '%s'" % (proxy, type(proxy))) -def _dtype_to_fieldtype(dtype): - fieldtype = PANDAS_TO_BEAM.get(dtype) - - if fieldtype is not None: - return fieldtype - elif dtype.kind == 'S': - return bytes - else: - return Any - - @typehints.with_input_types(Union[pd.DataFrame, pd.Series]) class UnbatchPandas(beam.PTransform): """A transform that explodes a PCollection of DataFrame or Series. DataFrame diff --git a/sdks/python/apache_beam/typehints/pandas_type_compatibility.py b/sdks/python/apache_beam/typehints/pandas_type_compatibility.py index 1314ddf3b7d15..9ac1ddf18fcbe 100644 --- a/sdks/python/apache_beam/typehints/pandas_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/pandas_type_compatibility.py @@ -17,14 +17,44 @@ """Utilities for converting between Beam schemas and pandas DataFrames. -For internal use only, no backward compatibility guarantees. +Imposes a mapping between native Python typings (specifically those compatible +with :mod:`apache_beam.typehints.schemas`), and common pandas dtypes:: + + pandas dtype Python typing + np.int{8,16,32,64} <-----> np.int{8,16,32,64}* + pd.Int{8,16,32,64}Dtype <-----> Optional[np.int{8,16,32,64}]* + np.float{32,64} <-----> Optional[np.float{32,64}] + \--- np.float{32,64} + Not supported <------ Optional[bytes] + np.bool <-----> np.bool + np.dtype('S') <-----> bytes + pd.BooleanDType() <-----> Optional[bool] + pd.StringDType() <-----> Optional[str] + \--- str + np.object <-----> Any + + * int, float, bool are treated the same as np.int64, np.float64, np.bool + +Note that when converting to pandas dtypes, any types not specified here are +shunted to ``np.object``. + +Similarly when converting from pandas to Python types, types that aren't +otherwise specified here are shunted to ``Any``. Notably, this includes +``np.datetime64``. + +Pandas does not support hierarchical data natively. Currently, all structured +types (``Sequence``, ``Mapping``, nested ``NamedTuple`` types), are +shunted to ``np.object`` like all other unknown types. In the future these +types may be given special consideration. + +Note utilities in this package are for internal use only, we make no backward +compatibility guarantees, except for the type mapping itself. """ from apache_beam.typehints.batch import BatchConverter from apache_beam.typehints.typehints import is_nullable +from apache_beam.typehints.typehints import normalize from apache_beam.typehints.row_type import RowTypeConstraint -from apache_beam.dataframe.schemas import dtype_from_typehint -from apache_beam.dataframe.schemas import generate_proxy import pandas as pd import numpy as np @@ -39,6 +69,70 @@ # a field should map to an index in the Beam DataFrame API. INDEX_OPTION_NAME = 'beam:dataframe:index' +# Generate type map (presented visually in the docstring) +_BIDIRECTIONAL = [ + (bool, bool), + (np.int8, np.int8), + (np.int16, np.int16), + (np.int32, np.int32), + (np.int64, np.int64), + (pd.Int8Dtype(), Optional[np.int8]), + (pd.Int16Dtype(), Optional[np.int16]), + (pd.Int32Dtype(), Optional[np.int32]), + (pd.Int64Dtype(), Optional[np.int64]), + (np.float32, Optional[np.float32]), + (np.float64, Optional[np.float64]), + (object, Any), + (pd.StringDtype(), Optional[str]), + (pd.BooleanDtype(), Optional[bool]), +] + +PANDAS_TO_BEAM = { + pd.Series([], dtype=dtype).dtype: fieldtype + for dtype, + fieldtype in _BIDIRECTIONAL +} +BEAM_TO_PANDAS = {fieldtype: dtype for dtype, fieldtype in _BIDIRECTIONAL} + +# Shunt non-nullable Beam types to the same pandas types as their non-nullable +# equivalents for FLOATs, DOUBLEs, and STRINGs. pandas has no non-nullable dtype +# for these. +OPTIONAL_SHUNTS = [np.float32, np.float64, str] + +for typehint in OPTIONAL_SHUNTS: + BEAM_TO_PANDAS[typehint] = BEAM_TO_PANDAS[Optional[typehint]] + +# int, float -> int64, np.float64 +BEAM_TO_PANDAS[int] = BEAM_TO_PANDAS[np.int64] +BEAM_TO_PANDAS[Optional[int]] = BEAM_TO_PANDAS[Optional[np.int64]] +BEAM_TO_PANDAS[float] = BEAM_TO_PANDAS[np.float64] +BEAM_TO_PANDAS[Optional[float]] = BEAM_TO_PANDAS[Optional[np.float64]] + +BEAM_TO_PANDAS[bytes] = 'bytes' + +# Add shunts for normalized (Beam) typehints as well +BEAM_TO_PANDAS.update({ + normalize(typehint): pandas_dtype + for (typehint, pandas_dtype) in BEAM_TO_PANDAS.items() +}) + + +def dtype_from_typehint(typehint): + # Default to np.object. This is lossy, we won't be able to recover + # the type at the output. + return BEAM_TO_PANDAS.get(typehint, object) + + +def dtype_to_fieldtype(dtype): + fieldtype = PANDAS_TO_BEAM.get(dtype) + + if fieldtype is not None: + return fieldtype + elif dtype.kind == 'S': + return bytes + else: + return Any + class DataFrameBatchConverter(BatchConverter): def __init__( diff --git a/sdks/python/apache_beam/typehints/pandas_type_compatibility_test.py b/sdks/python/apache_beam/typehints/pandas_type_compatibility_test.py index 5d629ebca9a90..ab5b5a1c5dc84 100644 --- a/sdks/python/apache_beam/typehints/pandas_type_compatibility_test.py +++ b/sdks/python/apache_beam/typehints/pandas_type_compatibility_test.py @@ -17,18 +17,17 @@ """Unit tests for pandas batched type converters.""" -from typing import Optional import unittest +from typing import Optional -import pandas as pd import numpy as np +import pandas as pd from parameterized import parameterized from parameterized import parameterized_class -from apache_beam.typehints import typehints from apache_beam.typehints import row_type +from apache_beam.typehints import typehints from apache_beam.typehints.batch import BatchConverter -from apache_beam.typehints.pandas_type_compatibility import DataFrameBatchConverter @parameterized_class([ From 91018e13f49c9e8b36d88c3c1343a7be040488a7 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 5 Aug 2022 11:55:05 -0700 Subject: [PATCH 05/10] Always register pandas BatchConverters --- sdks/python/apache_beam/dataframe/convert.py | 9 ++------ sdks/python/apache_beam/typehints/__init__.py | 9 ++++++++ sdks/python/apache_beam/typehints/batch.py | 2 ++ .../typehints/pandas_type_compatibility.py | 22 ++++++++++--------- 4 files changed, 25 insertions(+), 17 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/convert.py b/sdks/python/apache_beam/dataframe/convert.py index 141d18dd294f1..820e9a2531bd0 100644 --- a/sdks/python/apache_beam/dataframe/convert.py +++ b/sdks/python/apache_beam/dataframe/convert.py @@ -19,8 +19,8 @@ import weakref from typing import TYPE_CHECKING from typing import Any -from typing import Iterable from typing import Dict +from typing import Iterable from typing import Tuple from typing import Union @@ -29,10 +29,10 @@ import apache_beam as beam from apache_beam import pvalue from apache_beam.dataframe import expressions -from apache_beam.typehints.pandas_type_compatibility import dtype_to_fieldtype from apache_beam.dataframe import frame_base from apache_beam.dataframe import schemas from apache_beam.dataframe import transforms +from apache_beam.typehints.pandas_type_compatibility import dtype_to_fieldtype if TYPE_CHECKING: # pylint: disable=ungrouped-imports @@ -60,9 +60,6 @@ def to_dataframe( A proxy object must be given if the schema for the PCollection is not known. """ if proxy is None: - # TODO: Find a better way to ensure pandas BatchConverters are registered - import apache_beam.typehints.pandas_type_compatibility - if pcoll.element_type is None: raise ValueError( "Cannot infer a proxy because the input PCollection does not have a " @@ -280,8 +277,6 @@ def extract_input(placeholder): } if yield_elements == "schemas": - # TODO: Find a better way to ensure pandas BatchConverters are registered - import apache_beam.typehints.pandas_type_compatibility def maybe_unbatch(pc, value): if isinstance(value, frame_base._DeferredScalar): diff --git a/sdks/python/apache_beam/typehints/__init__.py b/sdks/python/apache_beam/typehints/__init__.py index e89afa1285a76..46a8579c6c653 100644 --- a/sdks/python/apache_beam/typehints/__init__.py +++ b/sdks/python/apache_beam/typehints/__init__.py @@ -20,3 +20,12 @@ # pylint: disable=wildcard-import from apache_beam.typehints.typehints import * from apache_beam.typehints.decorators import * +from apache_beam.typehints.batch import * + +# pylint: disable=ungrouped-imports +try: + import pandas as _ +except ImportError: + pass +else: + from apache_beam.typehints.pandas_type_compatibility import * diff --git a/sdks/python/apache_beam/typehints/batch.py b/sdks/python/apache_beam/typehints/batch.py index 4df4ea9dd9782..322ba717a05c5 100644 --- a/sdks/python/apache_beam/typehints/batch.py +++ b/sdks/python/apache_beam/typehints/batch.py @@ -44,6 +44,8 @@ BATCH_CONVERTER_REGISTRY: List[Callable[[type, type], 'BatchConverter']] = [] +__all__ = ['BatchConverter'] + class BatchConverter(Generic[B, E]): def __init__(self, batch_type, element_type): diff --git a/sdks/python/apache_beam/typehints/pandas_type_compatibility.py b/sdks/python/apache_beam/typehints/pandas_type_compatibility.py index 9ac1ddf18fcbe..ae4bc20337525 100644 --- a/sdks/python/apache_beam/typehints/pandas_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/pandas_type_compatibility.py @@ -15,7 +15,7 @@ # limitations under the License. # -"""Utilities for converting between Beam schemas and pandas DataFrames. +r"""Utilities for converting between Beam schemas and pandas DataFrames. Imposes a mapping between native Python typings (specifically those compatible with :mod:`apache_beam.typehints.schemas`), and common pandas dtypes:: @@ -51,19 +51,21 @@ compatibility guarantees, except for the type mapping itself. """ +from typing import Any +from typing import List +from typing import Optional + +import numpy as np +import pandas as pd + from apache_beam.typehints.batch import BatchConverter +from apache_beam.typehints.row_type import RowTypeConstraint from apache_beam.typehints.typehints import is_nullable from apache_beam.typehints.typehints import normalize -from apache_beam.typehints.row_type import RowTypeConstraint - -import pandas as pd -import numpy as np -from typing import Any -from typing import Optional -from typing import List -from typing import Tuple -from typing import Dict +# No public API currently, this just exists to register BatchConverter +# implementations. +__all__ = [] # Name for a valueless field-level option which, when present, indicates that # a field should map to an index in the Beam DataFrame API. From 5be9d197e49c4ccc2f43508182067b7803fc5d24 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 5 Aug 2022 11:57:50 -0700 Subject: [PATCH 06/10] Fix interactive runner tests --- .../runners/interactive/interactive_runner_test.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py index d5bc475c92191..e1279c0869fed 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py @@ -351,7 +351,9 @@ def test_dataframes_with_multi_index(self): aggregate = lambda df: df.groupby(['name', 'height']).mean() deferred_df = aggregate(to_dataframe(p | beam.Create(data))) - df_expected = aggregate(pd.DataFrame(data)) + df_input = pd.DataFrame(data) + df_input.name = df_input.name.astype(pd.StringDtype()) + df_expected = aggregate(df_input) # Watch the local scope for Interactive Beam so that values will be cached. ib.watch(locals()) @@ -378,7 +380,9 @@ def test_dataframes_with_multi_index_get_result(self): aggregate = lambda df: df.groupby(['name', 'height']).mean()['age'] deferred_df = aggregate(to_dataframe(p | beam.Create(data))) - df_expected = aggregate(pd.DataFrame(data)) + df_input = pd.DataFrame(data) + df_input.name = df_input.name.astype(pd.StringDtype()) + df_expected = aggregate(df_input) # Watch the local scope for Interactive Beam so that values will be cached. ib.watch(locals()) From 865b23bc6e820d6ba0e8abf351b1875bc7a1e7ac Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Mon, 8 Aug 2022 16:32:51 -0700 Subject: [PATCH 07/10] Use pandas_type_compatibility BatchConverters for dataframe.schemas utilities --- sdks/python/apache_beam/dataframe/convert.py | 7 +- sdks/python/apache_beam/dataframe/schemas.py | 131 ++++++------------ .../apache_beam/dataframe/schemas_test.py | 29 +++- .../typehints/pandas_type_compatibility.py | 18 ++- 4 files changed, 80 insertions(+), 105 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/convert.py b/sdks/python/apache_beam/dataframe/convert.py index 820e9a2531bd0..138a1b2f93320 100644 --- a/sdks/python/apache_beam/dataframe/convert.py +++ b/sdks/python/apache_beam/dataframe/convert.py @@ -30,8 +30,9 @@ from apache_beam import pvalue from apache_beam.dataframe import expressions from apache_beam.dataframe import frame_base -from apache_beam.dataframe import schemas from apache_beam.dataframe import transforms +from apache_beam.dataframe.schemas import element_typehint_from_dataframe_proxy +from apache_beam.dataframe.schemas import generate_proxy from apache_beam.typehints.pandas_type_compatibility import dtype_to_fieldtype if TYPE_CHECKING: @@ -71,7 +72,7 @@ def to_dataframe( # Attempt to come up with a reasonable, stable label by retrieving # the name of these variables in the calling context. label = 'BatchElements(%s)' % _var_name(pcoll, 2) - proxy = schemas.generate_proxy(pcoll.element_type) + proxy = generate_proxy(pcoll.element_type) shim_dofn: beam.DoFn if isinstance(proxy, pd.DataFrame): @@ -152,7 +153,7 @@ def process(self, element: pd.DataFrame) -> Iterable[pd.DataFrame]: yield element def infer_output_type(self, input_element_type): - return schemas.element_typehint_from_dataframe_proxy( + return element_typehint_from_dataframe_proxy( self._proxy, self._include_indexes) diff --git a/sdks/python/apache_beam/dataframe/schemas.py b/sdks/python/apache_beam/dataframe/schemas.py index 4f2b8cdd8ed1a..6356945e05f9c 100644 --- a/sdks/python/apache_beam/dataframe/schemas.py +++ b/sdks/python/apache_beam/dataframe/schemas.py @@ -23,6 +23,7 @@ # pytype: skip-file +import warnings from typing import Any from typing import Dict from typing import NamedTuple @@ -36,17 +37,14 @@ import apache_beam as beam from apache_beam import typehints -from apache_beam.portability.api import schema_pb2 from apache_beam.transforms.util import BatchElements -from apache_beam.typehints.native_type_compatibility import _match_is_optional +from apache_beam.typehints.pandas_type_compatibility import INDEX_OPTION_NAME +from apache_beam.typehints.pandas_type_compatibility import create_pandas_batch_converter from apache_beam.typehints.pandas_type_compatibility import dtype_from_typehint from apache_beam.typehints.pandas_type_compatibility import dtype_to_fieldtype from apache_beam.typehints.row_type import RowTypeConstraint from apache_beam.typehints.schemas import named_fields_from_element_type -from apache_beam.typehints.schemas import named_tuple_from_schema -from apache_beam.typehints.schemas import named_tuple_to_schema from apache_beam.typehints.typehints import normalize -from apache_beam.utils import proto_utils __all__ = ( 'BatchRowsAsDataFrame', @@ -70,18 +68,21 @@ def __init__(self, *args, proxy=None, **kwargs): self._proxy = proxy def expand(self, pcoll): - proxy = generate_proxy( - pcoll.element_type) if self._proxy is None else self._proxy - if isinstance(proxy, pd.DataFrame): - columns = proxy.columns - construct = lambda batch: pd.DataFrame.from_records( - batch, columns=columns) - elif isinstance(proxy, pd.Series): - dtype = proxy.dtype - construct = lambda batch: pd.Series(batch, dtype=dtype) + if self._proxy is not None: + # Generate typehint + proxy = self._proxy + element_typehint = _element_typehint_from_proxy(proxy) else: - raise NotImplementedError("Unknown proxy type: %s" % proxy) - return pcoll | self._batch_elements_transform | beam.Map(construct) + # Generate proxy + proxy = generate_proxy(pcoll.element_type) + element_typehint = pcoll.element_type + + converter = create_pandas_batch_converter( + element_type=element_typehint, batch_type=type(proxy)) + + return ( + pcoll | self._batch_elements_transform + | beam.Map(converter.produce_batch)) def generate_proxy(element_type): @@ -118,6 +119,22 @@ def element_type_from_dataframe(proxy, include_indexes=False): return element_typehint_from_dataframe_proxy(proxy, include_indexes).user_type +def _element_typehint_from_proxy( + proxy: pd.core.generic.NDFrame, include_indexes: bool = False): + if isinstance(proxy, pd.DataFrame): + return element_typehint_from_dataframe_proxy( + proxy, include_indexes=include_indexes) + elif isinstance(proxy, pd.Series): + if include_indexes: + warnings.warn( + "include_indexes=True for a Series input. Note that this " + "parameter is _not_ respected for DeferredSeries " + "conversion.") + return dtype_to_fieldtype(proxy.dtype) + else: + raise TypeError(f"Proxy '{proxy}' has unsupported type '{type(proxy)}'") + + def element_typehint_from_dataframe_proxy( proxy: pd.DataFrame, include_indexes: bool = False) -> RowTypeConstraint: @@ -159,8 +176,7 @@ def element_typehint_from_dataframe_proxy( field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] if include_indexes: field_options = { - # TODO: Reference the constant in pandas_type_compatibility - index_name: [('beam:dataframe:index', None)] + index_name: [(INDEX_OPTION_NAME, None)] for index_name in proxy.index.names } else: @@ -169,82 +185,15 @@ def element_typehint_from_dataframe_proxy( return RowTypeConstraint.from_fields(fields, field_options=field_options) -class _BaseDataframeUnbatchDoFn(beam.DoFn): - def __init__(self, namedtuple_ctor): - self._namedtuple_ctor = namedtuple_ctor - - def _get_series(self, df): - raise NotImplementedError() - - def process(self, df): - # TODO: Only do null checks for nullable types - def make_null_checking_generator(series): - nulls = pd.isnull(series) - return (None if isnull else value for isnull, value in zip(nulls, series)) - - all_series = self._get_series(df) - iterators = [ - make_null_checking_generator(series) for series, - typehint in zip(all_series, self._namedtuple_ctor.__annotations__) - ] - - # TODO: Avoid materializing the rows. Produce an object that references the - # underlying dataframe - for values in zip(*iterators): - yield self._namedtuple_ctor(*values) - - def infer_output_type(self, input_type): - return self._namedtuple_ctor - - @classmethod - def _from_serialized_schema(cls, schema_str): - return cls( - named_tuple_from_schema( - proto_utils.parse_Bytes(schema_str, schema_pb2.Schema))) - - def __reduce__(self): - # when pickling, use bytes representation of the schema. - return ( - self._from_serialized_schema, - (named_tuple_to_schema(self._namedtuple_ctor).SerializeToString(), )) - - -class _UnbatchNoIndex(_BaseDataframeUnbatchDoFn): - def _get_series(self, df): - return [df[column] for column in df.columns] - - -class _UnbatchWithIndex(_BaseDataframeUnbatchDoFn): - def _get_series(self, df): - return [df.index.get_level_values(i) for i in range(len(df.index.names)) - ] + [df[column] for column in df.columns] - - def _unbatch_transform(proxy, include_indexes): - if isinstance(proxy, pd.DataFrame): - ctor = element_type_from_dataframe(proxy, include_indexes=include_indexes) - - return beam.ParDo( - _UnbatchWithIndex(ctor) if include_indexes else _UnbatchNoIndex(ctor)) - elif isinstance(proxy, pd.Series): - # Raise a TypeError if proxy has an unknown type - output_type = dtype_to_fieldtype(proxy.dtype) - # TODO: Should the index ever be included for a Series? - if _match_is_optional(output_type): - - def unbatch(series): - for isnull, value in zip(pd.isnull(series), series): - yield None if isnull else value - else: + element_typehint = normalize( + _element_typehint_from_proxy(proxy, include_indexes=include_indexes)) - def unbatch(series): - yield from series + converter = create_pandas_batch_converter( + element_type=element_typehint, batch_type=type(proxy)) - return beam.FlatMap(unbatch).with_output_types(output_type) - # TODO: What about scalar inputs? - else: - raise TypeError( - "Proxy '%s' has unsupported type '%s'" % (proxy, type(proxy))) + return beam.FlatMap( + converter.explode_batch).with_output_types(element_typehint) @typehints.with_input_types(Union[pd.DataFrame, pd.Series]) diff --git a/sdks/python/apache_beam/dataframe/schemas_test.py b/sdks/python/apache_beam/dataframe/schemas_test.py index ec0c466fa859b..699649a3e9358 100644 --- a/sdks/python/apache_beam/dataframe/schemas_test.py +++ b/sdks/python/apache_beam/dataframe/schemas_test.py @@ -34,6 +34,7 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.typehints import row_type from apache_beam.typehints import typehints from apache_beam.typehints.native_type_compatibility import match_is_named_tuple @@ -52,10 +53,7 @@ def check_df_pcoll_equal(actual): drop=True) sorted_expected = expected.sort_values( by=list(expected.columns)).reset_index(drop=True) - if not sorted_actual.equals(sorted_expected): - raise AssertionError( - 'Dataframes not equal: \n\nActual:\n%s\n\nExpected:\n%s' % - (sorted_actual, sorted_expected)) + pd.testing.assert_frame_equal(sorted_actual, sorted_expected) return check_df_pcoll_equal @@ -145,6 +143,8 @@ def test_simple_df(self): }, columns=['name', 'id', 'height']) + expected.name = expected.name.astype(pd.StringDtype()) + with TestPipeline() as p: res = ( p @@ -160,6 +160,7 @@ def test_simple_df_with_beam_row(self): 'height': list(float(i) for i in range(5)) }, columns=['name', 'id', 'height']) + expected.name = expected.name.astype(pd.StringDtype()) with TestPipeline() as p: res = ( @@ -242,8 +243,14 @@ def test_batch_with_df_transform(self): assert_that(res, equal_to([('Falcon', 375.), ('Parrot', 25.)])) def assert_typehints_equal(self, left, right): - left = typehints.normalize(left) - right = typehints.normalize(right) + def maybe_drop_rowtypeconstraint(typehint): + if isinstance(typehint, row_type.RowTypeConstraint): + return typehint.user_type + else: + return typehint + + left = maybe_drop_rowtypeconstraint(typehints.normalize(left)) + right = maybe_drop_rowtypeconstraint(typehints.normalize(right)) if match_is_named_tuple(left): self.assertTrue(match_is_named_tuple(right)) @@ -280,6 +287,16 @@ def test_unbatch_with_index(self, df_or_series, rows, _): assert_that(res, equal_to(rows)) + @parameterized.expand(SERIES_TESTS, name_func=test_name_func) + def test_unbatch_series_with_index_warns( + self, series, unused_rows, unused_type): + proxy = series[:0] + + with TestPipeline() as p: + input_pc = p | beam.Create([series[::2], series[1::2]]) + with self.assertWarns(UserWarning): + _ = input_pc | schemas.UnbatchPandas(proxy, include_indexes=True) + def test_unbatch_include_index_unnamed_index_raises(self): df = pd.DataFrame({'foo': [1, 2, 3, 4]}) proxy = df[:0] diff --git a/sdks/python/apache_beam/typehints/pandas_type_compatibility.py b/sdks/python/apache_beam/typehints/pandas_type_compatibility.py index ae4bc20337525..a966205dcc18d 100644 --- a/sdks/python/apache_beam/typehints/pandas_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/pandas_type_compatibility.py @@ -136,6 +136,19 @@ def dtype_to_fieldtype(dtype): return Any +@BatchConverter.register +def create_pandas_batch_converter( + element_type: type, batch_type: type) -> BatchConverter: + if batch_type == pd.DataFrame: + return DataFrameBatchConverter.from_typehints( + element_type=element_type, batch_type=batch_type) + elif batch_type == pd.Series: + return SeriesBatchConverter.from_typehints( + element_type=element_type, batch_type=batch_type) + + return None + + class DataFrameBatchConverter(BatchConverter): def __init__( self, @@ -145,7 +158,6 @@ def __init__( self._columns = [name for name, _ in element_type._fields] @staticmethod - @BatchConverter.register def from_typehints(element_type, batch_type) -> Optional['DataFrameBatchConverter']: if not batch_type == pd.DataFrame: @@ -261,16 +273,12 @@ def unbatch(series): self.explode_batch = unbatch @staticmethod - @BatchConverter.register def from_typehints(element_type, batch_type) -> Optional['SeriesBatchConverter']: if not batch_type == pd.Series: return None dtype = dtype_from_typehint(element_type) - if dtype == np.object: - # Don't create Any <-> Series[np.object] mapping - return None return SeriesBatchConverter(element_type, dtype) From 953beeb544b77a34115ea2990874cc2f7ec5b8db Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 12 Aug 2022 15:37:42 -0700 Subject: [PATCH 08/10] Skip test cases broken in pandas 1.1.x --- sdks/python/apache_beam/dataframe/schemas_test.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/dataframe/schemas_test.py b/sdks/python/apache_beam/dataframe/schemas_test.py index 699649a3e9358..ed0ba6b342af7 100644 --- a/sdks/python/apache_beam/dataframe/schemas_test.py +++ b/sdks/python/apache_beam/dataframe/schemas_test.py @@ -117,7 +117,8 @@ def check_df_pcoll_equal(actual): NOINDEX_DF_TESTS = [(NICE_TYPES_DF, DF_RESULT, BEAM_SCHEMA)] -PD_VERSION = tuple(int(n) for n in pd.__version__.split('.')) +# Get major, minor, bugfix version +PD_VERSION = tuple(map(int, pd.__version__.split('.')[0:3])) def test_name_func(testcase_func, param_num, params): @@ -280,6 +281,12 @@ def test_unbatch_no_index(self, df_or_series, rows, beam_type): def test_unbatch_with_index(self, df_or_series, rows, _): proxy = df_or_series[:0] + if (PD_VERSION < (1, 2) and + set(['i32_nullable', 'i64_nullable']).intersection(proxy.index.names)): + self.skipTest( + "pandas<1.2 incorrectly changes Int64Dtype to int64 when " + "moved to index.") + with TestPipeline() as p: res = ( p | beam.Create([df_or_series[::2], df_or_series[1::2]]) From 54c5a62e60a1e2888862bfdc14bfb0d0a8e3db6e Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Mon, 29 Aug 2022 16:13:31 -0700 Subject: [PATCH 09/10] Address review comments --- sdks/python/apache_beam/dataframe/convert.py | 4 +- .../typehints/pandas_type_compatibility.py | 4 +- .../pandas_type_compatibility_test.py | 38 ++++++++++++++----- 3 files changed, 32 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/convert.py b/sdks/python/apache_beam/dataframe/convert.py index 138a1b2f93320..96d0c4f8b9f5e 100644 --- a/sdks/python/apache_beam/dataframe/convert.py +++ b/sdks/python/apache_beam/dataframe/convert.py @@ -166,9 +166,7 @@ def process(self, element: pd.Series) -> Iterable[pd.Series]: yield element def infer_output_type(self, input_element_type): - # Raise a TypeError if proxy has an unknown type - output_type = dtype_to_fieldtype(self._proxy.dtype) - return output_type + return dtype_to_fieldtype(self._proxy.dtype) # TODO: Or should this be called from_dataframe? diff --git a/sdks/python/apache_beam/typehints/pandas_type_compatibility.py b/sdks/python/apache_beam/typehints/pandas_type_compatibility.py index a966205dcc18d..a143f9c4ef379 100644 --- a/sdks/python/apache_beam/typehints/pandas_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/pandas_type_compatibility.py @@ -183,7 +183,8 @@ def _get_series(self, batch: pd.DataFrame): raise NotImplementedError def explode_batch(self, batch: pd.DataFrame): - # TODO: Only do null checks for nullable types + # TODO(https://github.com/apache/beam/issues/22948): Only do null checks for + # nullable types def make_null_checking_generator(series): nulls = pd.isnull(series) return (None if isnull else value for isnull, value in zip(nulls, series)) @@ -216,7 +217,6 @@ def _get_series(self, batch: pd.DataFrame): return [batch[column] for column in batch.columns] def produce_batch(self, elements): - # Note from_records has an index= parameter batch = pd.DataFrame.from_records(elements, columns=self._columns) for column, typehint in self._element_type._fields: diff --git a/sdks/python/apache_beam/typehints/pandas_type_compatibility_test.py b/sdks/python/apache_beam/typehints/pandas_type_compatibility_test.py index ab5b5a1c5dc84..8918ad09bcaa1 100644 --- a/sdks/python/apache_beam/typehints/pandas_type_compatibility_test.py +++ b/sdks/python/apache_beam/typehints/pandas_type_compatibility_test.py @@ -145,7 +145,7 @@ def test_typehint_validates(self): typehints.validate_composite_type_param(self.batch_typehint, '') typehints.validate_composite_type_param(self.element_typehint, '') - def test_type_check(self): + def test_type_check_batch(self): typehints.check_constraint(self.normalized_batch_typehint, self.batch) def test_type_check_element(self): @@ -159,26 +159,46 @@ def test_explode_rebatch(self): typehints.check_constraint(self.normalized_batch_typehint, rebatched) self.equality_check(self.batch, rebatched) + def _split_batch_into_n_partitions(self, N): + elements = list(self.converter.explode_batch(self.batch)) + + # Split elements into N contiguous partitions + element_batches = [ + elements[len(elements) * i // N:len(elements) * (i + 1) // N] + for i in range(N) + ] + + lengths = [len(element_batch) for element_batch in element_batches] + batches = [self.converter.produce_batch(element_batch) + for element_batch in element_batches] + + return batches, lengths + @parameterized.expand([ (2, ), (3, ), (10, ), ]) def test_combine_batches(self, N): - elements = list(self.converter.explode_batch(self.batch)) - - # Split elements into N contiguous partitions, create a batch out of each - batches = [ - self.converter.produce_batch( - elements[len(elements) * i // N:len(elements) * (i + 1) // N]) - for i in range(N) - ] + batches, _ = self._split_batch_into_n_partitions(N) # Combine the batches, output should be equivalent to the original batch combined = self.converter.combine_batches(batches) self.equality_check(self.batch, combined) + @parameterized.expand([ + (2, ), + (3, ), + (10, ), + ]) + def test_get_lenth(self, N): + batches, lengths = self._split_batch_into_n_partitions(N) + + for batch, expected_length in zip(batches, lengths): + self.assertEqual(self.converter.get_length(batch), expected_length) + + def test_equals(self): self.assertTrue(self.converter == self.create_batch_converter()) self.assertTrue(self.create_batch_converter() == self.converter) From c088431735c8341f4073d47776c2634d9d28ab49 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Tue, 30 Aug 2022 15:00:07 -0700 Subject: [PATCH 10/10] yapf, typo in test --- .../typehints/pandas_type_compatibility_test.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/typehints/pandas_type_compatibility_test.py b/sdks/python/apache_beam/typehints/pandas_type_compatibility_test.py index 8918ad09bcaa1..0ee9b1178a9ba 100644 --- a/sdks/python/apache_beam/typehints/pandas_type_compatibility_test.py +++ b/sdks/python/apache_beam/typehints/pandas_type_compatibility_test.py @@ -169,8 +169,10 @@ def _split_batch_into_n_partitions(self, N): ] lengths = [len(element_batch) for element_batch in element_batches] - batches = [self.converter.produce_batch(element_batch) - for element_batch in element_batches] + batches = [ + self.converter.produce_batch(element_batch) + for element_batch in element_batches + ] return batches, lengths @@ -192,13 +194,12 @@ def test_combine_batches(self, N): (3, ), (10, ), ]) - def test_get_lenth(self, N): + def test_get_length(self, N): batches, lengths = self._split_batch_into_n_partitions(N) for batch, expected_length in zip(batches, lengths): self.assertEqual(self.converter.get_length(batch), expected_length) - def test_equals(self): self.assertTrue(self.converter == self.create_batch_converter()) self.assertTrue(self.create_batch_converter() == self.converter)