From f59e1ec1db57c22f9912fb65c192337032938076 Mon Sep 17 00:00:00 2001 From: huangxingbo Date: Wed, 2 Jun 2021 20:17:44 +0800 Subject: [PATCH] [FLINK-22865][python] Optimize state serialize/deserialize in PyFlink --- .../pyflink/datastream/data_stream.py | 2 +- .../beam/beam_coder_impl_fast.pyx | 8 +- .../fn_execution/beam/beam_coder_impl_slow.py | 5 +- .../pyflink/fn_execution/beam/beam_coders.py | 29 +----- .../beam/beam_operations_fast.pyx | 2 +- .../{beam_stream.pxd => beam_stream_fast.pxd} | 0 .../{beam_stream.pyx => beam_stream_fast.pyx} | 0 .../fn_execution/beam/beam_stream_slow.py | 35 ++++++++ .../pyflink/fn_execution/coder_impl_fast.pxd | 9 +- .../pyflink/fn_execution/coder_impl_fast.pyx | 45 +++++++++- .../pyflink/fn_execution/coder_impl_slow.py | 54 ++++++++++- flink-python/pyflink/fn_execution/coders.py | 90 ++++++++++++++++++- .../datastream/runtime_context.py | 25 ++++-- .../pyflink/fn_execution/operations.py | 4 +- .../pyflink/fn_execution/state_impl.py | 2 + .../pyflink/fn_execution/stream_slow.py | 3 +- .../fn_execution/table/aggregate_fast.pyx | 9 +- .../fn_execution/table/aggregate_slow.py | 9 +- .../fn_execution/table/state_data_view.py | 4 +- .../table/window_aggregate_fast.pyx | 6 +- .../table/window_aggregate_slow.py | 6 +- .../fn_execution/table/window_context.py | 15 +++- flink-python/setup.py | 8 +- 23 files changed, 285 insertions(+), 85 deletions(-) rename flink-python/pyflink/fn_execution/beam/{beam_stream.pxd => beam_stream_fast.pxd} (100%) rename flink-python/pyflink/fn_execution/beam/{beam_stream.pyx => beam_stream_fast.pyx} (100%) create mode 100644 flink-python/pyflink/fn_execution/beam/beam_stream_slow.py diff --git a/flink-python/pyflink/datastream/data_stream.py b/flink-python/pyflink/datastream/data_stream.py index 660522791f20ad..8b915e7ca86d32 100644 --- a/flink-python/pyflink/datastream/data_stream.py +++ b/flink-python/pyflink/datastream/data_stream.py @@ -1136,7 +1136,7 @@ def get_execution_environment(self): return self._keyed_stream.get_execution_environment() def get_input_type(self): - return self._keyed_stream.get_type() + return _from_java_type(self._keyed_stream._original_data_type_info.get_java_type_info()) def trigger(self, trigger: Trigger): """ diff --git a/flink-python/pyflink/fn_execution/beam/beam_coder_impl_fast.pyx b/flink-python/pyflink/fn_execution/beam/beam_coder_impl_fast.pyx index 551ae0fcab102b..da8da4cf9143a7 100644 --- a/flink-python/pyflink/fn_execution/beam/beam_coder_impl_fast.pyx +++ b/flink-python/pyflink/fn_execution/beam/beam_coder_impl_fast.pyx @@ -25,7 +25,7 @@ from apache_beam.coders.coder_impl cimport InputStream as BInputStream from apache_beam.coders.coder_impl cimport OutputStream as BOutputStream from apache_beam.coders.coder_impl cimport StreamCoderImpl -from pyflink.fn_execution.beam.beam_stream cimport BeamInputStream +from pyflink.fn_execution.beam.beam_stream_fast cimport BeamInputStream from pyflink.fn_execution.stream_fast cimport InputStream cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl): @@ -59,9 +59,11 @@ cdef class PassThroughPrefixCoderImpl(StreamCoderImpl): # create InputStream data_input_stream = InputStream() data_input_stream._input_data = in_stream.allc - in_stream.pos = size + data_input_stream._input_pos = in_stream.pos - return self._value_coder.decode_from_stream(data_input_stream, size) + result = self._value_coder.decode_from_stream(data_input_stream, size) + in_stream.pos = data_input_stream._input_pos + return result cdef void _write_data_output_stream(self, BOutputStream out_stream): cdef OutputStream data_out_stream diff --git a/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py b/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py index 8e5a699fc3cbe0..cf67708b90e590 100644 --- a/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py +++ b/flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py @@ -19,7 +19,8 @@ from apache_beam.coders.coder_impl import StreamCoderImpl, create_InputStream, create_OutputStream -from pyflink.fn_execution.stream_slow import OutputStream, InputStream +from pyflink.fn_execution.stream_slow import OutputStream +from pyflink.fn_execution.beam.beam_stream_slow import BeamInputStream class PassThroughLengthPrefixCoderImpl(StreamCoderImpl): @@ -50,7 +51,7 @@ def encode_to_stream(self, value, out_stream: create_OutputStream, nested): self._data_output_stream.clear() def decode_from_stream(self, in_stream: create_InputStream, nested): - data_input_stream = InputStream(in_stream.read_all(False)) + data_input_stream = BeamInputStream(in_stream) return self._value_coder.decode_from_stream(data_input_stream) def __repr__(self): diff --git a/flink-python/pyflink/fn_execution/beam/beam_coders.py b/flink-python/pyflink/fn_execution/beam/beam_coders.py index 6d7962ec86f6c9..53009623d2f61a 100644 --- a/flink-python/pyflink/fn_execution/beam/beam_coders.py +++ b/flink-python/pyflink/fn_execution/beam/beam_coders.py @@ -15,10 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -import pickle -from typing import Any - -from apache_beam.coders import Coder, coder_impl +from apache_beam.coders import Coder from apache_beam.coders.coders import FastCoder, LengthPrefixCoder from apache_beam.portability import common_urns from apache_beam.typehints import typehints @@ -93,27 +90,3 @@ def __ne__(self, other): def __hash__(self): return hash(self._internal_coder) - - -class DataViewFilterCoder(FastCoder): - - def to_type_hint(self): - return Any - - def __init__(self, udf_data_view_specs): - self._udf_data_view_specs = udf_data_view_specs - - def filter_data_views(self, row): - i = 0 - for specs in self._udf_data_view_specs: - for spec in specs: - row[i][spec.field_index] = None - i += 1 - return row - - def _create_impl(self): - filter_data_views = self.filter_data_views - dumps = pickle.dumps - HIGHEST_PROTOCOL = pickle.HIGHEST_PROTOCOL - return coder_impl.CallbackCoderImpl( - lambda x: dumps(filter_data_views(x), HIGHEST_PROTOCOL), pickle.loads) diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx index c3b69dd626f052..2adbee920bc637 100644 --- a/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx +++ b/flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx @@ -23,7 +23,7 @@ from libc.stdint cimport * from apache_beam.utils.windowed_value cimport WindowedValue from pyflink.fn_execution.coder_impl_fast cimport LengthPrefixBaseCoderImpl -from pyflink.fn_execution.beam.beam_stream cimport BeamInputStream, BeamOutputStream +from pyflink.fn_execution.beam.beam_stream_fast cimport BeamInputStream, BeamOutputStream from pyflink.fn_execution.beam.beam_coder_impl_fast cimport InputStreamWrapper, BeamCoderImpl from pyflink.fn_execution.operations import BundleOperation diff --git a/flink-python/pyflink/fn_execution/beam/beam_stream.pxd b/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pxd similarity index 100% rename from flink-python/pyflink/fn_execution/beam/beam_stream.pxd rename to flink-python/pyflink/fn_execution/beam/beam_stream_fast.pxd diff --git a/flink-python/pyflink/fn_execution/beam/beam_stream.pyx b/flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx similarity index 100% rename from flink-python/pyflink/fn_execution/beam/beam_stream.pyx rename to flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx diff --git a/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py b/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py new file mode 100644 index 00000000000000..8dff78152a4b60 --- /dev/null +++ b/flink-python/pyflink/fn_execution/beam/beam_stream_slow.py @@ -0,0 +1,35 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from apache_beam.coders.coder_impl import create_InputStream + +from pyflink.fn_execution.stream_slow import InputStream + + +class BeamInputStream(InputStream): + def __init__(self, input_stream: create_InputStream): + super(BeamInputStream, self).__init__([]) + self._input_stream = input_stream + + def read(self, size): + return self._input_stream.read(size) + + def read_byte(self): + return self._input_stream.read_byte() + + def size(self): + return self._input_stream.size() diff --git a/flink-python/pyflink/fn_execution/coder_impl_fast.pxd b/flink-python/pyflink/fn_execution/coder_impl_fast.pxd index a5260d42358f63..f4674ca49b5f9f 100644 --- a/flink-python/pyflink/fn_execution/coder_impl_fast.pxd +++ b/flink-python/pyflink/fn_execution/coder_impl_fast.pxd @@ -127,7 +127,10 @@ cdef class TimestampCoderImpl(FieldCoderImpl): cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl): cdef object _timezone -cdef class PickledBytesCoderImpl(FieldCoderImpl): +cdef class CloudPickleCoderImpl(FieldCoderImpl): + pass + +cdef class PickleCoderImpl(FieldCoderImpl): pass cdef class BasicArrayCoderImpl(FieldCoderImpl): @@ -149,3 +152,7 @@ cdef class TimeWindowCoderImpl(FieldCoderImpl): cdef class CountWindowCoderImpl(FieldCoderImpl): pass + +cdef class DataViewFilterCoderImpl(FieldCoderImpl): + cdef object _udf_data_view_specs + cdef PickleCoderImpl _pickle_coder diff --git a/flink-python/pyflink/fn_execution/coder_impl_fast.pyx b/flink-python/pyflink/fn_execution/coder_impl_fast.pyx index 64218455e64e73..c38da3ce3d7e81 100644 --- a/flink-python/pyflink/fn_execution/coder_impl_fast.pyx +++ b/flink-python/pyflink/fn_execution/coder_impl_fast.pyx @@ -24,6 +24,7 @@ from libc.stdlib cimport free, malloc import datetime import decimal +import pickle from typing import List, Union from cloudpickle import cloudpickle @@ -616,9 +617,9 @@ cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl): cpdef decode_from_stream(self, InputStream in_stream, size_t size): return self._timezone.localize(self._decode_timestamp_data_from_stream(in_stream)) -cdef class PickledBytesCoderImpl(FieldCoderImpl): +cdef class CloudPickleCoderImpl(FieldCoderImpl): """ - A coder for all kinds of python object. + A coder used with cloudpickle for all kinds of python object. """ cpdef encode_to_stream(self, value, OutputStream out_stream): @@ -631,6 +632,22 @@ cdef class PickledBytesCoderImpl(FieldCoderImpl): pickled_bytes = in_stream.read_bytes() return cloudpickle.loads(pickled_bytes) + +cdef class PickleCoderImpl(FieldCoderImpl): + """ + A coder used with pickle for all kinds of python object. + """ + + cpdef encode_to_stream(self, value, OutputStream out_stream): + cdef bytes pickled_bytes + pickled_bytes = pickle.dumps(value) + out_stream.write_bytes(pickled_bytes, len(pickled_bytes)) + + cpdef decode_from_stream(self, InputStream in_stream, size_t size): + cdef bytes pickled_bytes + pickled_bytes = in_stream.read_bytes() + return pickle.loads(pickled_bytes) + cdef class BasicArrayCoderImpl(FieldCoderImpl): """ A coder for basic array value (the element of array could be null). @@ -760,3 +777,27 @@ cdef class CountWindowCoderImpl(FieldCoderImpl): cpdef decode_from_stream(self, InputStream in_stream, size_t size): return CountWindow(in_stream.read_int64()) + +cdef class DataViewFilterCoderImpl(FieldCoderImpl): + """ + A coder for CountWindow. + """ + def __init__(self, udf_data_view_specs): + self._udf_data_view_specs = udf_data_view_specs + self._pickle_coder = PickleCoderImpl() + + cpdef encode_to_stream(self, value, OutputStream out_stream): + self._pickle_coder.encode_to_stream(self._filter_data_views(value), out_stream) + + cpdef decode_from_stream(self, InputStream in_stream, size_t size): + return self._pickle_coder.decode_from_stream(in_stream, size) + + def _filter_data_views(self, row): + i = 0 + for specs in self._udf_data_view_specs: + for spec in specs: + row[i][spec.field_index] = None + i += 1 + return row + + diff --git a/flink-python/pyflink/fn_execution/coder_impl_slow.py b/flink-python/pyflink/fn_execution/coder_impl_slow.py index dd0275db205580..fa9242b2632ae7 100644 --- a/flink-python/pyflink/fn_execution/coder_impl_slow.py +++ b/flink-python/pyflink/fn_execution/coder_impl_slow.py @@ -17,6 +17,7 @@ ################################################################################ import datetime import decimal +import pickle from abc import ABC, abstractmethod from typing import List @@ -585,9 +586,9 @@ def internal_to_timestamp(self, milliseconds, nanoseconds): milliseconds, nanoseconds)) -class PickledBytesCoderImpl(FieldCoderImpl): +class CloudPickleCoderImpl(FieldCoderImpl): """ - A coder for all kinds of python object. + A coder used with cloudpickle for all kinds of python object. """ def __init__(self): @@ -606,7 +607,31 @@ def _decode_one_value_from_stream(self, in_stream: InputStream): return value def __repr__(self) -> str: - return 'PickledBytesCoderImpl[%s]' % str(self.field_coder) + return 'CloudPickleCoderImpl[%s]' % str(self.field_coder) + + +class PickleCoderImpl(FieldCoderImpl): + """ + A coder used with pickle for all kinds of python object. + """ + + def __init__(self): + self.field_coder = BinaryCoderImpl() + + def encode_to_stream(self, value, out_stream): + coded_data = pickle.dumps(value) + self.field_coder.encode_to_stream(coded_data, out_stream) + + def decode_from_stream(self, in_stream, length=0): + return self._decode_one_value_from_stream(in_stream) + + def _decode_one_value_from_stream(self, in_stream: InputStream): + real_data = self.field_coder.decode_from_stream(in_stream) + value = pickle.loads(real_data) + return value + + def __repr__(self) -> str: + return 'PickleCoderImpl[%s]' % str(self.field_coder) class TupleCoderImpl(FieldCoderImpl): @@ -743,3 +768,26 @@ def encode_to_stream(self, value, out_stream): def decode_from_stream(self, in_stream, length=0): return CountWindow(in_stream.read_int64()) + + +class DataViewFilterCoderImpl(FieldCoderImpl): + """ + A coder for data view filter. + """ + def __init__(self, udf_data_view_specs): + self._udf_data_view_specs = udf_data_view_specs + self._pickle_coder = PickleCoderImpl() + + def encode_to_stream(self, value, out_stream): + self._pickle_coder.encode_to_stream(self._filter_data_views(value), out_stream) + + def decode_from_stream(self, in_stream, length=0): + return self._pickle_coder.decode_from_stream(in_stream) + + def _filter_data_views(self, row): + i = 0 + for specs in self._udf_data_view_specs: + for spec in specs: + row[i][spec.field_index] = None + i += 1 + return row diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py index 3fc23846c8f78a..ed5b7223a15c0a 100644 --- a/flink-python/pyflink/fn_execution/coders.py +++ b/flink-python/pyflink/fn_execution/coders.py @@ -22,6 +22,9 @@ import pyarrow as pa import pytz +from pyflink.common.typeinfo import TypeInformation, BasicTypeInfo, BasicType, DateTypeInfo, \ + TimeTypeInfo, TimestampTypeInfo, PrimitiveArrayTypeInfo, BasicArrayTypeInfo, TupleTypeInfo, \ + MapTypeInfo, ListTypeInfo, RowTypeInfo, PickledBytesTypeInfo from pyflink.fn_execution import flink_fn_execution_pb2 from pyflink.table.types import TinyIntType, SmallIntType, IntType, BigIntType, BooleanType, \ FloatType, DoubleType, VarCharType, VarBinaryType, DecimalType, DateType, TimeType, \ @@ -36,7 +39,8 @@ 'SmallIntCoder', 'IntCoder', 'FloatCoder', 'DoubleCoder', 'BinaryCoder', 'CharCoder', 'DateCoder', 'TimeCoder', 'TimestampCoder', 'LocalZonedTimestampCoder', 'BasicArrayCoder', 'PrimitiveArrayCoder', 'MapCoder', 'DecimalCoder', 'BigDecimalCoder', - 'TupleCoder', 'TimeWindowCoder', 'CountWindowCoder'] + 'TupleCoder', 'TimeWindowCoder', 'CountWindowCoder', 'PickleCoder', 'CloudPickleCoder', + 'DataViewFilterCoder'] # LengthPrefixBaseCoder will be used in Operations and other coders will be the field coder @@ -501,10 +505,22 @@ def get_impl(self): return coder_impl.LocalZonedTimestampCoderImpl(self.precision, self.timezone) -class PickledBytesCoder(FieldCoder): +class CloudPickleCoder(FieldCoder): + """ + Coder used with cloudpickle to encode python object. + """ def get_impl(self): - return coder_impl.PickledBytesCoderImpl() + return coder_impl.CloudPickleCoderImpl() + + +class PickleCoder(FieldCoder): + """ + Coder used with pickle to encode python object. + """ + + def get_impl(self): + return coder_impl.PickleCoderImpl() class TupleCoder(FieldCoder): @@ -540,6 +556,18 @@ def get_impl(self): return coder_impl.CountWindowCoderImpl() +class DataViewFilterCoder(FieldCoder): + """ + Coder for data view filter. + """ + + def __init__(self, udf_data_view_specs): + self._udf_data_view_specs = udf_data_view_specs + + def get_impl(self): + return coder_impl.DataViewFilterCoderImpl(self._udf_data_view_specs) + + type_name = flink_fn_execution_pb2.Schema _type_name_mappings = { type_name.TINYINT: TinyIntCoder(), @@ -606,7 +634,7 @@ def from_proto(field_type): type_info_name.SQL_DATE: DateCoder(), type_info_name.SQL_TIME: TimeCoder(), type_info_name.SQL_TIMESTAMP: TimestampCoder(3), - type_info_name.PICKLED_BYTES: PickledBytesCoder() + type_info_name.PICKLED_BYTES: CloudPickleCoder() } @@ -635,3 +663,57 @@ def from_type_info_proto(type_info): return BasicArrayCoder(from_type_info_proto(type_info.collection_element_type)) else: raise ValueError("Unsupported type_info %s." % type_info) + + +_basic_type_info_mappings = { + BasicType.BYTE: TinyIntCoder(), + BasicType.BOOLEAN: BooleanCoder(), + BasicType.SHORT: SmallIntCoder(), + BasicType.INT: IntCoder(), + BasicType.LONG: BigIntCoder(), + BasicType.BIG_INT: BigIntCoder(), + BasicType.FLOAT: FloatCoder(), + BasicType.DOUBLE: DoubleCoder(), + BasicType.STRING: CharCoder(), + BasicType.CHAR: CharCoder(), + BasicType.BIG_DEC: BigDecimalCoder(), +} + + +def from_type_info(type_info: TypeInformation) -> FieldCoder: + """ + Mappings from type_info to Coder + """ + + if isinstance(type_info, PickledBytesTypeInfo): + return PickleCoder() + elif isinstance(type_info, BasicTypeInfo): + return _basic_type_info_mappings[type_info._basic_type] + elif isinstance(type_info, DateTypeInfo): + return DateCoder() + elif isinstance(type_info, TimeTypeInfo): + return TimeCoder() + elif isinstance(type_info, TimestampTypeInfo): + return TimestampCoder(3) + elif isinstance(type_info, PrimitiveArrayTypeInfo): + element_type = type_info._element_type + if isinstance(element_type, BasicTypeInfo) and element_type._basic_type == BasicType.BYTE: + return BinaryCoder() + else: + return PrimitiveArrayCoder(from_type_info(element_type)) + elif isinstance(type_info, BasicArrayTypeInfo): + return BasicArrayCoder(from_type_info(type_info._element_type)) + elif isinstance(type_info, ListTypeInfo): + return BasicArrayCoder(from_type_info(type_info.elem_type)) + elif isinstance(type_info, MapTypeInfo): + return MapCoder( + from_type_info(type_info._key_type_info), from_type_info(type_info._value_type_info)) + elif isinstance(type_info, TupleTypeInfo): + return TupleCoder([from_type_info(field_type) + for field_type in type_info.get_field_types()]) + elif isinstance(type_info, RowTypeInfo): + return RowCoder( + [from_type_info(f) for f in type_info.get_field_types()], + [f for f in type_info.get_field_names()]) + else: + raise ValueError("Unsupported type_info %s." % type_info) diff --git a/flink-python/pyflink/fn_execution/datastream/runtime_context.py b/flink-python/pyflink/fn_execution/datastream/runtime_context.py index 0264a2a3f5dc98..0663d6d5314b50 100644 --- a/flink-python/pyflink/fn_execution/datastream/runtime_context.py +++ b/flink-python/pyflink/fn_execution/datastream/runtime_context.py @@ -17,12 +17,11 @@ ################################################################################ from typing import Dict, Union -from apache_beam.coders import PickleCoder - from pyflink.datastream import RuntimeContext from pyflink.datastream.state import ValueStateDescriptor, ValueState, ListStateDescriptor, \ ListState, MapStateDescriptor, MapState, ReducingStateDescriptor, ReducingState, \ AggregatingStateDescriptor, AggregatingState +from pyflink.fn_execution.coders import from_type_info, MapCoder, BasicArrayCoder from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend from pyflink.metrics import MetricGroup @@ -105,27 +104,35 @@ def get_metrics_group(self) -> MetricGroup: def get_state(self, state_descriptor: ValueStateDescriptor) -> ValueState: if self._keyed_state_backend: - return self._keyed_state_backend.get_value_state(state_descriptor.name, PickleCoder()) + return self._keyed_state_backend.get_value_state( + state_descriptor.name, from_type_info(state_descriptor.type_info)) else: raise Exception("This state is only accessible by functions executed on a KeyedStream.") def get_list_state(self, state_descriptor: ListStateDescriptor) -> ListState: if self._keyed_state_backend: - return self._keyed_state_backend.get_list_state(state_descriptor.name, PickleCoder()) + array_coder = from_type_info(state_descriptor.type_info) # type: BasicArrayCoder + return self._keyed_state_backend.get_list_state( + state_descriptor.name, array_coder._elem_coder) else: raise Exception("This state is only accessible by functions executed on a KeyedStream.") def get_map_state(self, state_descriptor: MapStateDescriptor) -> MapState: if self._keyed_state_backend: - return self._keyed_state_backend.get_map_state(state_descriptor.name, PickleCoder(), - PickleCoder()) + map_coder = from_type_info(state_descriptor.type_info) # type: MapCoder + key_coder = map_coder._key_coder + value_coder = map_coder._value_coder + return self._keyed_state_backend.get_map_state( + state_descriptor.name, key_coder, value_coder) else: raise Exception("This state is only accessible by functions executed on a KeyedStream.") def get_reducing_state(self, state_descriptor: ReducingStateDescriptor) -> ReducingState: if self._keyed_state_backend: return self._keyed_state_backend.get_reducing_state( - state_descriptor.get_name(), PickleCoder(), state_descriptor.get_reduce_function()) + state_descriptor.get_name(), + from_type_info(state_descriptor.type_info), + state_descriptor.get_reduce_function()) else: raise Exception("This state is only accessible by functions executed on a KeyedStream.") @@ -133,7 +140,9 @@ def get_aggregating_state( self, state_descriptor: AggregatingStateDescriptor) -> AggregatingState: if self._keyed_state_backend: return self._keyed_state_backend.get_aggregating_state( - state_descriptor.get_name(), PickleCoder(), state_descriptor.get_agg_function()) + state_descriptor.get_name(), + from_type_info(state_descriptor.type_info), + state_descriptor.get_agg_function()) else: raise Exception("This state is only accessible by functions executed on a KeyedStream.") diff --git a/flink-python/pyflink/fn_execution/operations.py b/flink-python/pyflink/fn_execution/operations.py index 6d8f49ce79ed5b..9c6fc56ee74c31 100644 --- a/flink-python/pyflink/fn_execution/operations.py +++ b/flink-python/pyflink/fn_execution/operations.py @@ -21,16 +21,14 @@ from itertools import chain from typing import Tuple -from apache_beam.coders import PickleCoder - from pyflink.datastream import TimerService from pyflink.datastream.functions import ProcessFunction from pyflink.datastream.timerservice import InternalTimer +from pyflink.fn_execution.coders import DataViewFilterCoder, PickleCoder from pyflink.fn_execution.datastream.runtime_context import create_runtime_context from pyflink.fn_execution.timerservice_impl import InternalTimerImpl, TimerOperandType from pyflink.fn_execution import flink_fn_execution_pb2, operation_utils from pyflink.fn_execution.table.state_data_view import extract_data_view_specs -from pyflink.fn_execution.beam.beam_coders import DataViewFilterCoder from pyflink.fn_execution.operation_utils import extract_user_defined_aggregate_function from pyflink.fn_execution.table.window_assigner import TumblingWindowAssigner, \ diff --git a/flink-python/pyflink/fn_execution/state_impl.py b/flink-python/pyflink/fn_execution/state_impl.py index 590c1d7e600068..2d2785114bf757 100644 --- a/flink-python/pyflink/fn_execution/state_impl.py +++ b/flink-python/pyflink/fn_execution/state_impl.py @@ -1001,6 +1001,8 @@ def _get_internal_bag_state(self, name, namespace, element_coder): # at once. The internal state cache is only updated when the current key changes. # The reason is that the state cache size may be smaller that the count of activated # state (i.e. the state with current key). + if isinstance(element_coder, FieldCoder): + element_coder = FlinkCoder(element_coder) state_spec = userstate.BagStateSpec(name, element_coder) internal_state = self._create_bag_state(state_spec, encoded_namespace) return internal_state diff --git a/flink-python/pyflink/fn_execution/stream_slow.py b/flink-python/pyflink/fn_execution/stream_slow.py index 7d6a942e3ad225..143db4a95e38e1 100644 --- a/flink-python/pyflink/fn_execution/stream_slow.py +++ b/flink-python/pyflink/fn_execution/stream_slow.py @@ -61,8 +61,7 @@ def read_var_int64(self): shift = 0 result = 0 while True: - self.pos += 1 - byte = self.data[self.pos - 1] + byte = self.read_byte() if byte < 0: raise RuntimeError('VarLong not terminated.') diff --git a/flink-python/pyflink/fn_execution/table/aggregate_fast.pyx b/flink-python/pyflink/fn_execution/table/aggregate_fast.pyx index 7537a52f405ead..f14c8135a50fc4 100644 --- a/flink-python/pyflink/fn_execution/table/aggregate_fast.pyx +++ b/flink-python/pyflink/fn_execution/table/aggregate_fast.pyx @@ -22,9 +22,8 @@ from libc.stdlib cimport free, malloc from typing import List, Dict -from apache_beam.coders import PickleCoder, Coder - from pyflink.common import Row +from pyflink.fn_execution.coders import PickleCoder from pyflink.fn_execution.table.state_data_view import DataViewSpec, ListViewSpec, MapViewSpec, \ PerKeyStateDataViewStore from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend @@ -427,7 +426,7 @@ cdef class GroupAggFunctionBase: aggs_handle: AggsHandleFunctionBase, key_selector: RowKeySelector, state_backend: RemoteKeyedStateBackend, - state_value_coder: Coder, + state_value_coder, generate_update_before: bool, state_cleaning_enabled: bool, index_of_count_star: int): @@ -471,7 +470,7 @@ cdef class GroupAggFunction(GroupAggFunctionBase): aggs_handle, key_selector: RowKeySelector, state_backend: RemoteKeyedStateBackend, - state_value_coder: Coder, + state_value_coder, generate_update_before: bool, state_cleaning_enabled: bool, index_of_count_star: int): @@ -578,7 +577,7 @@ cdef class GroupTableAggFunction(GroupAggFunctionBase): aggs_handle, key_selector: RowKeySelector, state_backend: RemoteKeyedStateBackend, - state_value_coder: Coder, + state_value_coder, generate_update_before: bool, state_cleaning_enabled: bool, index_of_count_star: int): diff --git a/flink-python/pyflink/fn_execution/table/aggregate_slow.py b/flink-python/pyflink/fn_execution/table/aggregate_slow.py index ca434c6a7a5f68..a280f9cb787a7e 100644 --- a/flink-python/pyflink/fn_execution/table/aggregate_slow.py +++ b/flink-python/pyflink/fn_execution/table/aggregate_slow.py @@ -18,9 +18,8 @@ from abc import ABC, abstractmethod from typing import List, Dict, Iterable -from apache_beam.coders import PickleCoder, Coder - from pyflink.common import Row, RowKind +from pyflink.fn_execution.coders import PickleCoder from pyflink.fn_execution.table.state_data_view import DataViewSpec, ListViewSpec, MapViewSpec, \ PerKeyStateDataViewStore from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend @@ -411,7 +410,7 @@ def __init__(self, aggs_handle: AggsHandleFunctionBase, key_selector: RowKeySelector, state_backend: RemoteKeyedStateBackend, - state_value_coder: Coder, + state_value_coder, generate_update_before: bool, state_cleaning_enabled: bool, index_of_count_star: int): @@ -457,7 +456,7 @@ def __init__(self, aggs_handle: AggsHandleFunction, key_selector: RowKeySelector, state_backend: RemoteKeyedStateBackend, - state_value_coder: Coder, + state_value_coder, generate_update_before: bool, state_cleaning_enabled: bool, index_of_count_star: int): @@ -552,7 +551,7 @@ def __init__(self, aggs_handle: TableAggsHandleFunction, key_selector: RowKeySelector, state_backend: RemoteKeyedStateBackend, - state_value_coder: Coder, + state_value_coder, generate_update_before: bool, state_cleaning_enabled: bool, index_of_count_star: int): diff --git a/flink-python/pyflink/fn_execution/table/state_data_view.py b/flink-python/pyflink/fn_execution/table/state_data_view.py index 438aac83dfc3be..36a6989d507fa1 100644 --- a/flink-python/pyflink/fn_execution/table/state_data_view.py +++ b/flink-python/pyflink/fn_execution/table/state_data_view.py @@ -18,10 +18,8 @@ from abc import ABC, abstractmethod from typing import TypeVar, Generic, Union -from apache_beam.coders import PickleCoder - from pyflink.datastream.state import ListState, MapState -from pyflink.fn_execution.coders import from_proto +from pyflink.fn_execution.coders import from_proto, PickleCoder from pyflink.fn_execution.internal_state import InternalListState, InternalMapState from pyflink.fn_execution.operation_utils import is_built_in_function, load_aggregate_function from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend diff --git a/flink-python/pyflink/fn_execution/table/window_aggregate_fast.pyx b/flink-python/pyflink/fn_execution/table/window_aggregate_fast.pyx index 0dfd9257a78c5a..3530436dccf90a 100644 --- a/flink-python/pyflink/fn_execution/table/window_aggregate_fast.pyx +++ b/flink-python/pyflink/fn_execution/table/window_aggregate_fast.pyx @@ -29,8 +29,8 @@ import sys from typing import List, Dict import pytz -from apache_beam.coders import PickleCoder, Coder +from pyflink.fn_execution.coders import PickleCoder from pyflink.fn_execution.timerservice_impl import InternalTimerServiceImpl from pyflink.fn_execution.table.state_data_view import DataViewSpec, ListViewSpec, MapViewSpec, \ PerWindowStateDataViewStore @@ -320,7 +320,7 @@ cdef class GroupWindowAggFunctionBase: allowed_lateness: int, key_selector: RowKeySelector, state_backend: RemoteKeyedStateBackend, - state_value_coder: Coder, + state_value_coder, window_assigner: WindowAssigner[W], window_aggregator: NamespaceAggsHandleFunctionBase, trigger: Trigger[W], @@ -501,7 +501,7 @@ cdef class GroupWindowAggFunction(GroupWindowAggFunctionBase): allowed_lateness: int, key_selector: RowKeySelector, state_backend: RemoteKeyedStateBackend, - state_value_coder: Coder, + state_value_coder, window_assigner: WindowAssigner[W], window_aggregator: NamespaceAggsHandleFunction[W], trigger: Trigger[W], diff --git a/flink-python/pyflink/fn_execution/table/window_aggregate_slow.py b/flink-python/pyflink/fn_execution/table/window_aggregate_slow.py index e64157ff846125..2b486992651d08 100644 --- a/flink-python/pyflink/fn_execution/table/window_aggregate_slow.py +++ b/flink-python/pyflink/fn_execution/table/window_aggregate_slow.py @@ -21,10 +21,10 @@ from typing import TypeVar, Generic, List, Dict import pytz -from apache_beam.coders import PickleCoder, Coder from pyflink.common import Row, RowKind from pyflink.datastream.timerservice import InternalTimer +from pyflink.fn_execution.coders import PickleCoder from pyflink.fn_execution.timerservice_impl import InternalTimerServiceImpl from pyflink.fn_execution.table.aggregate_slow import DistinctViewDescriptor, RowKeySelector from pyflink.fn_execution.table.state_data_view import DataViewSpec, ListViewSpec, MapViewSpec, \ @@ -290,7 +290,7 @@ def __init__(self, allowed_lateness: int, key_selector: RowKeySelector, state_backend: RemoteKeyedStateBackend, - state_value_coder: Coder, + state_value_coder, window_assigner: WindowAssigner[W], window_aggregator: NamespaceAggsHandleFunctionBase[W], trigger: Trigger[W], @@ -457,7 +457,7 @@ def __init__(self, allowed_lateness: int, key_selector: RowKeySelector, state_backend: RemoteKeyedStateBackend, - state_value_coder: Coder, + state_value_coder, window_assigner: WindowAssigner[W], window_aggregator: NamespaceAggsHandleFunction[W], trigger: Trigger[W], diff --git a/flink-python/pyflink/fn_execution/table/window_context.py b/flink-python/pyflink/fn_execution/table/window_context.py index 08732059f18b3d..f73fe037f16ece 100644 --- a/flink-python/pyflink/fn_execution/table/window_context.py +++ b/flink-python/pyflink/fn_execution/table/window_context.py @@ -19,12 +19,13 @@ from abc import ABC, abstractmethod from typing import Generic, TypeVar, List, Iterable -from apache_beam.coders import Coder, PickleCoder +from apache_beam.coders import Coder from pyflink.datastream.state import StateDescriptor, State, ValueStateDescriptor, \ ListStateDescriptor, MapStateDescriptor from pyflink.datastream.timerservice import InternalTimerService from pyflink.datastream.window import TimeWindow, CountWindow +from pyflink.fn_execution.coders import from_type_info, MapCoder, BasicArrayCoder from pyflink.fn_execution.timerservice_impl import InternalTimerServiceImpl from pyflink.fn_execution.internal_state import InternalMergingState from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend @@ -228,12 +229,18 @@ def clear(self): def get_partitioned_state(self, state_descriptor: StateDescriptor) -> State: if isinstance(state_descriptor, ValueStateDescriptor): - state = self._state_backend.get_value_state(state_descriptor.name, PickleCoder()) + state = self._state_backend.get_value_state( + state_descriptor.name, from_type_info(state_descriptor.type_info)) elif isinstance(state_descriptor, ListStateDescriptor): - state = self._state_backend.get_list_state(state_descriptor.name, PickleCoder()) + array_coder = from_type_info(state_descriptor.type_info) # type: BasicArrayCoder + state = self._state_backend.get_list_state( + state_descriptor.name, array_coder._elem_coder) elif isinstance(state_descriptor, MapStateDescriptor): + map_coder = from_type_info(state_descriptor.type_info) # type: MapCoder + key_coder = map_coder._key_coder + value_coder = map_coder._value_coder state = self._state_backend.get_map_state( - state_descriptor.name, PickleCoder(), PickleCoder()) + state_descriptor.name, key_coder, value_coder) else: raise Exception("Unknown supported StateDescriptor %s" % state_descriptor) state.set_current_namespace(self.window) diff --git a/flink-python/setup.py b/flink-python/setup.py index dd614731d46b54..3947f15b72a465 100644 --- a/flink-python/setup.py +++ b/flink-python/setup.py @@ -120,8 +120,8 @@ def extracted_output_files(base_dir, file_path, output_directory): sources=["pyflink/fn_execution/stream_fast.pyx"], include_dirs=["pyflink/fn_execution/"]), Extension( - name="pyflink.fn_execution.beam.beam_stream", - sources=["pyflink/fn_execution/beam/beam_stream.pyx"], + name="pyflink.fn_execution.beam.beam_stream_fast", + sources=["pyflink/fn_execution/beam/beam_stream_fast.pyx"], include_dirs=["pyflink/fn_execution/beam"]), Extension( name="pyflink.fn_execution.beam.beam_coder_impl_fast", @@ -152,8 +152,8 @@ def extracted_output_files(base_dir, file_path, output_directory): sources=["pyflink/fn_execution/stream_fast.c"], include_dirs=["pyflink/fn_execution/"]), Extension( - name="pyflink.fn_execution.beam.beam_stream", - sources=["pyflink/fn_execution/beam/beam_stream.c"], + name="pyflink.fn_execution.beam.beam_stream_fast", + sources=["pyflink/fn_execution/beam/beam_stream_fast.c"], include_dirs=["pyflink/fn_execution/beam"]), Extension( name="pyflink.fn_execution.beam.beam_coder_impl_fast",