Skip to content

Commit

Permalink
[FLINK-22865][python] Optimize state serialize/deserialize in PyFlink
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangXingBo committed Jun 3, 2021
1 parent 5de3ef9 commit f59e1ec
Show file tree
Hide file tree
Showing 23 changed files with 285 additions and 85 deletions.
2 changes: 1 addition & 1 deletion flink-python/pyflink/datastream/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,7 @@ def get_execution_environment(self):
return self._keyed_stream.get_execution_environment()

def get_input_type(self):
return self._keyed_stream.get_type()
return _from_java_type(self._keyed_stream._original_data_type_info.get_java_type_info())

def trigger(self, trigger: Trigger):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ from apache_beam.coders.coder_impl cimport InputStream as BInputStream
from apache_beam.coders.coder_impl cimport OutputStream as BOutputStream
from apache_beam.coders.coder_impl cimport StreamCoderImpl

from pyflink.fn_execution.beam.beam_stream cimport BeamInputStream
from pyflink.fn_execution.beam.beam_stream_fast cimport BeamInputStream
from pyflink.fn_execution.stream_fast cimport InputStream

cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
Expand Down Expand Up @@ -59,9 +59,11 @@ cdef class PassThroughPrefixCoderImpl(StreamCoderImpl):
# create InputStream
data_input_stream = InputStream()
data_input_stream._input_data = <char*?>in_stream.allc
in_stream.pos = size
data_input_stream._input_pos = in_stream.pos

return self._value_coder.decode_from_stream(data_input_stream, size)
result = self._value_coder.decode_from_stream(data_input_stream, size)
in_stream.pos = data_input_stream._input_pos
return result

cdef void _write_data_output_stream(self, BOutputStream out_stream):
cdef OutputStream data_out_stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

from apache_beam.coders.coder_impl import StreamCoderImpl, create_InputStream, create_OutputStream

from pyflink.fn_execution.stream_slow import OutputStream, InputStream
from pyflink.fn_execution.stream_slow import OutputStream
from pyflink.fn_execution.beam.beam_stream_slow import BeamInputStream


class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
Expand Down Expand Up @@ -50,7 +51,7 @@ def encode_to_stream(self, value, out_stream: create_OutputStream, nested):
self._data_output_stream.clear()

def decode_from_stream(self, in_stream: create_InputStream, nested):
data_input_stream = InputStream(in_stream.read_all(False))
data_input_stream = BeamInputStream(in_stream)
return self._value_coder.decode_from_stream(data_input_stream)

def __repr__(self):
Expand Down
29 changes: 1 addition & 28 deletions flink-python/pyflink/fn_execution/beam/beam_coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import pickle
from typing import Any

from apache_beam.coders import Coder, coder_impl
from apache_beam.coders import Coder
from apache_beam.coders.coders import FastCoder, LengthPrefixCoder
from apache_beam.portability import common_urns
from apache_beam.typehints import typehints
Expand Down Expand Up @@ -93,27 +90,3 @@ def __ne__(self, other):

def __hash__(self):
return hash(self._internal_coder)


class DataViewFilterCoder(FastCoder):
    """
    A coder which pickles a value after setting its data view fields to None.
    """

    def __init__(self, udf_data_view_specs):
        # A sequence of spec groups; the n-th group lists the specs whose
        # ``field_index`` is cleared in the n-th entry of an encoded value.
        self._udf_data_view_specs = udf_data_view_specs

    def to_type_hint(self):
        return Any

    def filter_data_views(self, row):
        """
        Sets every field referenced by the data view specs to None.

        The given ``row`` is modified in place and returned.
        """
        for position, specs in enumerate(self._udf_data_view_specs):
            for spec in specs:
                row[position][spec.field_index] = None
        return row

    def _create_impl(self):
        # Bind everything to locals once so that the per-value encoder does not
        # repeat the attribute lookups.
        strip_data_views = self.filter_data_views
        serialize = pickle.dumps
        protocol = pickle.HIGHEST_PROTOCOL

        def encode(value):
            return serialize(strip_data_views(value), protocol)

        return coder_impl.CallbackCoderImpl(encode, pickle.loads)
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ from libc.stdint cimport *
from apache_beam.utils.windowed_value cimport WindowedValue

from pyflink.fn_execution.coder_impl_fast cimport LengthPrefixBaseCoderImpl
from pyflink.fn_execution.beam.beam_stream cimport BeamInputStream, BeamOutputStream
from pyflink.fn_execution.beam.beam_stream_fast cimport BeamInputStream, BeamOutputStream
from pyflink.fn_execution.beam.beam_coder_impl_fast cimport InputStreamWrapper, BeamCoderImpl
from pyflink.fn_execution.operations import BundleOperation

Expand Down
35 changes: 35 additions & 0 deletions flink-python/pyflink/fn_execution/beam/beam_stream_slow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from apache_beam.coders.coder_impl import create_InputStream

from pyflink.fn_execution.stream_slow import InputStream


class BeamInputStream(InputStream):
    """
    An InputStream whose read operations are delegated to a wrapped Beam input stream.
    """

    def __init__(self, input_stream: create_InputStream):
        # The base class is initialized with an empty placeholder; the methods
        # overridden below read from the wrapped stream instead.
        super().__init__([])
        self._input_stream = input_stream

    def read(self, size):
        """
        Reads ``size`` from the wrapped stream and returns what it returns.
        """
        return self._input_stream.read(size)

    def read_byte(self):
        """
        Returns the result of ``read_byte()`` on the wrapped stream.
        """
        return self._input_stream.read_byte()

    def size(self):
        """
        Returns the result of ``size()`` on the wrapped stream.
        """
        return self._input_stream.size()
9 changes: 8 additions & 1 deletion flink-python/pyflink/fn_execution/coder_impl_fast.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ cdef class TimestampCoderImpl(FieldCoderImpl):
cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl):
cdef object _timezone

cdef class PickledBytesCoderImpl(FieldCoderImpl):
cdef class CloudPickleCoderImpl(FieldCoderImpl):
pass

cdef class PickleCoderImpl(FieldCoderImpl):
pass

cdef class BasicArrayCoderImpl(FieldCoderImpl):
Expand All @@ -149,3 +152,7 @@ cdef class TimeWindowCoderImpl(FieldCoderImpl):

cdef class CountWindowCoderImpl(FieldCoderImpl):
pass

cdef class DataViewFilterCoderImpl(FieldCoderImpl):
cdef object _udf_data_view_specs
cdef PickleCoderImpl _pickle_coder
45 changes: 43 additions & 2 deletions flink-python/pyflink/fn_execution/coder_impl_fast.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ from libc.stdlib cimport free, malloc

import datetime
import decimal
import pickle
from typing import List, Union

from cloudpickle import cloudpickle
Expand Down Expand Up @@ -616,9 +617,9 @@ cdef class LocalZonedTimestampCoderImpl(TimestampCoderImpl):
cpdef decode_from_stream(self, InputStream in_stream, size_t size):
return self._timezone.localize(self._decode_timestamp_data_from_stream(in_stream))

cdef class PickledBytesCoderImpl(FieldCoderImpl):
cdef class CloudPickleCoderImpl(FieldCoderImpl):
"""
A coder for all kinds of python object.
A coder used with cloudpickle for all kinds of python object.
"""

cpdef encode_to_stream(self, value, OutputStream out_stream):
Expand All @@ -631,6 +632,22 @@ cdef class PickledBytesCoderImpl(FieldCoderImpl):
pickled_bytes = in_stream.read_bytes()
return cloudpickle.loads(pickled_bytes)


cdef class PickleCoderImpl(FieldCoderImpl):
    """
    A field coder which serializes all kinds of python object with the pickle module.
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        # Pickle first so that the payload length is known when it is written.
        cdef bytes payload = pickle.dumps(value)
        out_stream.write_bytes(payload, len(payload))

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        # ``size`` is not used here; the payload is obtained via read_bytes().
        # NOTE(review): pickle.loads must only be fed trusted bytes - presumably
        # the data was produced by encode_to_stream of this coder; confirm.
        cdef bytes payload = in_stream.read_bytes()
        return pickle.loads(payload)

cdef class BasicArrayCoderImpl(FieldCoderImpl):
"""
A coder for basic array value (the element of array could be null).
Expand Down Expand Up @@ -760,3 +777,27 @@ cdef class CountWindowCoderImpl(FieldCoderImpl):

cpdef decode_from_stream(self, InputStream in_stream, size_t size):
return CountWindow(in_stream.read_int64())

cdef class DataViewFilterCoderImpl(FieldCoderImpl):
    """
    A coder for data view filter: the data view fields of a value are set to None before
    the value is pickled.
    """
    def __init__(self, udf_data_view_specs):
        # A sequence of spec groups; the n-th group lists the specs whose
        # ``field_index`` is cleared in the n-th entry of an encoded value.
        self._udf_data_view_specs = udf_data_view_specs
        self._pickle_coder = PickleCoderImpl()

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        # The data views are cleared before the value is handed to the pickle coder.
        self._pickle_coder.encode_to_stream(self._filter_data_views(value), out_stream)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        # Decoding is plain unpickling; the cleared fields come back as None.
        return self._pickle_coder.decode_from_stream(in_stream, size)

    def _filter_data_views(self, row):
        """
        Sets every field referenced by the data view specs to None.

        The given ``row`` is modified in place and returned.
        """
        for i, specs in enumerate(self._udf_data_view_specs):
            for spec in specs:
                row[i][spec.field_index] = None
        return row


54 changes: 51 additions & 3 deletions flink-python/pyflink/fn_execution/coder_impl_slow.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
################################################################################
import datetime
import decimal
import pickle
from abc import ABC, abstractmethod
from typing import List

Expand Down Expand Up @@ -585,9 +586,9 @@ def internal_to_timestamp(self, milliseconds, nanoseconds):
milliseconds, nanoseconds))


class PickledBytesCoderImpl(FieldCoderImpl):
class CloudPickleCoderImpl(FieldCoderImpl):
"""
A coder for all kinds of python object.
A coder used with cloudpickle for all kinds of python object.
"""

def __init__(self):
Expand All @@ -606,7 +607,31 @@ def _decode_one_value_from_stream(self, in_stream: InputStream):
return value

def __repr__(self) -> str:
return 'PickledBytesCoderImpl[%s]' % str(self.field_coder)
return 'CloudPickleCoderImpl[%s]' % str(self.field_coder)


class PickleCoderImpl(FieldCoderImpl):
    """
    A field coder which serializes all kinds of python object with the pickle module.
    """

    def __init__(self):
        # The pickled payload is written and read through this binary coder.
        self.field_coder = BinaryCoderImpl()

    def encode_to_stream(self, value, out_stream):
        self.field_coder.encode_to_stream(pickle.dumps(value), out_stream)

    def decode_from_stream(self, in_stream, length=0):
        # ``length`` is accepted but not used.
        return self._decode_one_value_from_stream(in_stream)

    def _decode_one_value_from_stream(self, in_stream: InputStream):
        # NOTE(review): pickle.loads must only be fed trusted bytes - presumably
        # the data was produced by encode_to_stream of this coder; confirm.
        return pickle.loads(self.field_coder.decode_from_stream(in_stream))

    def __repr__(self) -> str:
        return 'PickleCoderImpl[%s]' % str(self.field_coder)


class TupleCoderImpl(FieldCoderImpl):
Expand Down Expand Up @@ -743,3 +768,26 @@ def encode_to_stream(self, value, out_stream):

def decode_from_stream(self, in_stream, length=0):
return CountWindow(in_stream.read_int64())


class DataViewFilterCoderImpl(FieldCoderImpl):
    """
    A coder for data view filter: the data view fields of a value are set to None before
    the value is pickled.
    """

    def __init__(self, udf_data_view_specs):
        # A sequence of spec groups; the n-th group lists the specs whose
        # ``field_index`` is cleared in the n-th entry of an encoded value.
        self._udf_data_view_specs = udf_data_view_specs
        self._pickle_coder = PickleCoderImpl()

    def encode_to_stream(self, value, out_stream):
        filtered = self._filter_data_views(value)
        self._pickle_coder.encode_to_stream(filtered, out_stream)

    def decode_from_stream(self, in_stream, length=0):
        # ``length`` is accepted but not used.
        return self._pickle_coder.decode_from_stream(in_stream)

    def _filter_data_views(self, row):
        """
        Sets every field referenced by the data view specs to None.

        The given ``row`` is modified in place and returned.
        """
        for position, specs in enumerate(self._udf_data_view_specs):
            for spec in specs:
                row[position][spec.field_index] = None
        return row
Loading

0 comments on commit f59e1ec

Please sign in to comment.