Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 0 additions & 43 deletions sdks/python/apache_beam/coders/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,11 @@
"""Collection of useful coders.

Only those coders listed in __all__ are part of the public API of this module.

## On usage of `pickle`, `dill` and `pickler` in Beam

In Beam, we generally we use `pickle` for pipeline elements and `dill` for
more complex types, like user functions.

`pickler` is Beam's own wrapping of dill + compression + error handling.
It serves also as an API to mask the actual encoding layer (so we can
change it from `dill` if necessary).

We created `_MemoizingPickleCoder` to improve performance when serializing
complex user types for the execution of SDF. Specifically to address
BEAM-12781, where many identical `BoundedSource` instances are being
encoded.

"""
# pytype: skip-file

import base64
import pickle
from functools import lru_cache
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
Expand Down Expand Up @@ -757,33 +741,6 @@ def __hash__(self):
return hash(type(self))


class _MemoizingPickleCoder(_PickleCoderBase):
"""Coder using Python's pickle functionality with memoization."""
def __init__(self, cache_size=16):
super(_MemoizingPickleCoder, self).__init__()
self.cache_size = cache_size

def _create_impl(self):
from apache_beam.internal import pickler
dumps = pickler.dumps

mdumps = lru_cache(maxsize=self.cache_size, typed=True)(dumps)

def _nonhashable_dumps(x):
try:
return mdumps(x)
except TypeError:
return dumps(x)

return coder_impl.CallbackCoderImpl(_nonhashable_dumps, pickler.loads)

def as_deterministic_coder(self, step_label, error_message=None):
return FastPrimitivesCoder(self, requires_deterministic=step_label)

def to_type_hint(self):
return Any


class PickleCoder(_PickleCoderBase):
"""Coder using Python's pickle functionality."""
def _create_impl(self):
Expand Down
4 changes: 0 additions & 4 deletions sdks/python/apache_beam/coders/coders_test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,6 @@ def test_pickle_coder(self):
coder = coders.PickleCoder()
self.check_coder(coder, *self.test_values)

def test_memoizing_pickle_coder(self):
coder = coders._MemoizingPickleCoder()
self.check_coder(coder, *self.test_values)

def test_deterministic_coder(self):
coder = coders.FastPrimitivesCoder()
deterministic_coder = coders.DeterministicFastPrimitivesCoder(coder, 'step')
Expand Down
24 changes: 3 additions & 21 deletions sdks/python/apache_beam/io/iobase.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@

from apache_beam import coders
from apache_beam import pvalue
from apache_beam.coders.coders import _MemoizingPickleCoder
from apache_beam.internal import pickler
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
Expand Down Expand Up @@ -888,14 +886,12 @@ def get_desired_chunk_size(total_size):

def expand(self, pbegin):
if isinstance(self.source, BoundedSource):
coders.registry.register_coder(BoundedSource, _MemoizingPickleCoder)
display_data = self.source.display_data() or {}
display_data['source'] = self.source.__class__

return (
pbegin
| Impulse()
| core.Map(lambda _: self.source).with_output_types(BoundedSource)
| core.Map(lambda _: self.source)
| SDFBoundedSourceReader(display_data))
elif isinstance(self.source, ptransform.PTransform):
# The Read transform can also admit a full PTransform as an input
Expand Down Expand Up @@ -1577,29 +1573,15 @@ def is_bounded(self):
return True


class _SDFBoundedSourceWrapperRestrictionCoder(coders.Coder):
def decode(self, value):
return _SDFBoundedSourceRestriction(SourceBundle(*pickler.loads(value)))

def encode(self, restriction):
return pickler.dumps((
restriction._source_bundle.weight,
restriction._source_bundle.source,
restriction._source_bundle.start_position,
restriction._source_bundle.stop_position))


class _SDFBoundedSourceRestrictionProvider(core.RestrictionProvider):
"""
A `RestrictionProvider` that is used by SDF for `BoundedSource`.

This restriction provider initializes restriction based on input
element that is expected to be of BoundedSource type.
"""
def __init__(self, desired_chunk_size=None, restriction_coder=None):
def __init__(self, desired_chunk_size=None):
self._desired_chunk_size = desired_chunk_size
self._restriction_coder = (
restriction_coder or _SDFBoundedSourceWrapperRestrictionCoder())

def _check_source(self, src):
if not isinstance(src, BoundedSource):
Expand Down Expand Up @@ -1636,7 +1618,7 @@ def restriction_size(self, element, restriction):
return restriction.weight()

def restriction_coder(self):
return self._restriction_coder
return coders.DillCoder()


class SDFBoundedSourceReader(PTransform):
Expand Down