[BEAM-8123] Add cloudpickle as optional library #15472

Merged
merged 60 commits into from Dec 1, 2021
60 commits
39a964d
wrapped pickler so that pickler is chosen
ryanthompson591 Sep 7, 2021
24b110a
Merge branch 'apache:master' into pickle-prototype
ryanthompson591 Oct 28, 2021
c1f8608
changes to pickler
ryanthompson591 Oct 6, 2021
954a46f
Update unit tests and correctly add global setter
ryanthompson591 Oct 28, 2021
9947720
updated cloudpickle_pickler_test
ryanthompson591 Oct 28, 2021
dfa434a
added scope test
ryanthompson591 Oct 28, 2021
c8ff50c
updated some comments
ryanthompson591 Oct 29, 2021
a6475e5
Merge branch 'apache:master' into pickle-prototype
ryanthompson591 Oct 29, 2021
54d1c1b
Allow absl library not to be present.
ryanthompson591 Oct 29, 2021
36556fc
allow multiple error types
ryanthompson591 Oct 29, 2021
5d7de36
removed absl flags import
ryanthompson591 Oct 29, 2021
9d5587b
add cloudpickle dependency
ryanthompson591 Oct 29, 2021
7c92db4
Merge branch 'apache:master' into pickle-prototype
ryanthompson591 Nov 4, 2021
2d91d60
applied valentyn comments. remove dill dependencies
ryanthompson591 Nov 4, 2021
eaa17ab
linted file
ryanthompson591 Nov 4, 2021
9eed711
change cloudpickle max requirement
ryanthompson591 Nov 4, 2021
62c7d0d
fixed pickle lib typo
ryanthompson591 Nov 4, 2021
c5a9df0
fixed pickle lib typo 2
ryanthompson591 Nov 4, 2021
771235b
sets the default pickler
ryanthompson591 Nov 8, 2021
64b74f4
revert last change
ryanthompson591 Nov 8, 2021
28abd0e
Merge branch 'apache:master' into pickle-prototype
ryanthompson591 Nov 8, 2021
4344529
fixed pickle lib typo
ryanthompson591 Nov 8, 2021
6381aab
Merge branch 'apache:master' into pickle-prototype
ryanthompson591 Nov 16, 2021
d842061
fixed typo
ryanthompson591 Nov 15, 2021
b8d249e
revert wordcount
ryanthompson591 Nov 15, 2021
ea2cd96
added pipeline options
ryanthompson591 Nov 15, 2021
87f39f0
added pickler setting to worker
ryanthompson591 Nov 16, 2021
7339c29
added setup options
ryanthompson591 Nov 16, 2021
9611520
removed base_image_requirements to merge with master branch
ryanthompson591 Nov 17, 2021
2eea347
Upgraded arguement names and simplify pickle changing interface in pi…
ryanthompson591 Nov 17, 2021
7cb995f
updated function name and param name in sdk_worker_main
ryanthompson591 Nov 17, 2021
fcdb9c5
Merge branch 'apache:master' into pickle-prototype
ryanthompson591 Nov 19, 2021
56304fa
Merge branch 'apache:master' into pickle-prototype
ryanthompson591 Nov 23, 2021
a2012cb
updated base_image_requirements
ryanthompson591 Nov 23, 2021
a9a9784
Merge branch 'apache:master' into pickle-prototype
ryanthompson591 Nov 23, 2021
b078766
linted to remove space in default arg'
ryanthompson591 Nov 23, 2021
db0c27e
Added cloudpickle to requirements
ryanthompson591 Nov 23, 2021
adb7ae4
linted
ryanthompson591 Nov 23, 2021
3400ef4
change dill reference in coders
ryanthompson591 Nov 23, 2021
8cde88c
Merge branch 'apache:master' into pickle-prototype
ryanthompson591 Nov 24, 2021
e848b7a
only import lock if it can be imported otherwise ignore
ryanthompson591 Nov 24, 2021
e57e987
Merge branch 'apache:master' into pickle-prototype
ryanthompson591 Nov 29, 2021
dd56672
linted tests removed unused variables changed line size
ryanthompson591 Nov 29, 2021
7fb9f53
moved imports, reverted file that wasnt changed
ryanthompson591 Nov 29, 2021
9f08887
changed import order
ryanthompson591 Nov 29, 2021
032c631
trying a small fix
ryanthompson591 Nov 29, 2021
92e3072
removed rlock pickling
ryanthompson591 Nov 29, 2021
1c66fc3
Merge branch 'apache:master' into pickle-prototype
ryanthompson591 Nov 30, 2021
94a9c96
added to change file
ryanthompson591 Nov 30, 2021
66c8069
Merge branch 'apache:master' into pickle-prototype
ryanthompson591 Nov 30, 2021
4ec71c3
Minor wording suggestion.
tvalentyn Nov 30, 2021
f42051f
Merge branch 'apache:master' into pickle-prototype
ryanthompson591 Dec 1, 2021
915c5c6
Merge branch 'apache:master' into pickle-prototype
ryanthompson591 Dec 1, 2021
165c8d0
merged
ryanthompson591 Dec 1, 2021
f7b2682
Update CHANGES.md
ryanthompson591 Dec 1, 2021
31aba48
merged change.md changes
ryanthompson591 Dec 1, 2021
f20ce43
merged
ryanthompson591 Dec 1, 2021
4f617b9
linted again
ryanthompson591 Dec 1, 2021
4e0a330
removed file that is also removed in head, not sure why git keeps bri…
ryanthompson591 Dec 1, 2021
232ecac
removed changes that shouldnt be relevant to this pr
ryanthompson591 Dec 1, 2021
1 change: 1 addition & 0 deletions CHANGES.md
@@ -33,6 +33,7 @@

 * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Remote packages can now be downloaded from locations supported by apache_beam.io.filesystems. The files will be downloaded on Stager and uploaded to staging location. For more information, see [BEAM-11275](https://issues.apache.org/jira/browse/BEAM-11275)
+* Added support for cloudpickle as a pickling library for Python SDK. To use cloudpickle set pipeline options, --pickler_lib=cloudpickle.
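As a rough illustration of how a flag like this might be consumed, the sketch below parses `--pickler_lib` with the standard library's `argparse`. Only the flag name comes from the entry above; the function `parse_pickler_lib`, the `choices` list, and the default value are assumptions made for illustration and are not taken from Beam's option handling.

```python
import argparse


def parse_pickler_lib(argv):
    """Return the pickler library named on the command line.

    Hypothetical helper: the choices and the default are assumptions.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--pickler_lib",
        default="default",
        choices=["default", "dill", "cloudpickle"],
    )
    # parse_known_args tolerates other pipeline flags that this parser
    # does not declare, returning them separately.
    known, _unknown = parser.parse_known_args(argv)
    return known.pickler_lib


selected = parse_pickler_lib(
    ["--runner=DirectRunner", "--pickler_lib=cloudpickle"])
```

Using `parse_known_args` rather than `parse_args` lets the pickler flag sit alongside unrelated flags without raising an error.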

## Breaking Changes

2 changes: 1 addition & 1 deletion sdks/python/apache_beam/coders/coders.py
@@ -80,7 +80,7 @@
 try:
   # Import dill from the pickler module to make sure our monkey-patching of dill
   # occurs.
-  from apache_beam.internal.pickler import dill
+  from apache_beam.internal.dill_pickler import dill
 except ImportError:
   # We fall back to using the stock dill library in tests that don't use the
   # full Python SDK.
17 changes: 8 additions & 9 deletions sdks/python/apache_beam/coders/coders_test_common.py
@@ -679,17 +679,16 @@ def iterable_state_read(token, element_coder_impl):
         read_state=iterable_state_read,
         write_state=iterable_state_write,
         write_state_threshold=1)
-    context = pipeline_context.PipelineContext(
-        iterable_state_read=iterable_state_read,
-        iterable_state_write=iterable_state_write)
-    self.check_coder(
-        coder, [1, 2, 3], context=context, test_size_estimation=False)
+    # Note: do not use check_coder
+    # see https://github.com/cloudpipe/cloudpickle/issues/452
+    self._observe(coder)
+    self.assertEqual([1, 2, 3], coder.decode(coder.encode([1, 2, 3])))
     # Ensure that state was actually used.
     self.assertNotEqual(state, {})
-    self.check_coder(
-        coders.TupleCoder((coder, coder)), ([1], [2, 3]),
-        context=context,
-        test_size_estimation=False)
+    tupleCoder = coders.TupleCoder((coder, coder))
+    self._observe(tupleCoder)
+    self.assertEqual(([1], [2, 3]),
+                     tupleCoder.decode(tupleCoder.encode(([1], [2, 3]))))

   def test_nullable_coder(self):
     self.check_coder(coders.NullableCoder(coders.VarIntCoder()), None, 2 * 64)
111 changes: 111 additions & 0 deletions sdks/python/apache_beam/internal/cloudpickle_pickler.py
@@ -0,0 +1,111 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Pickler for values, functions, and classes.

For internal use only. No backwards compatibility guarantees.

Uses the cloudpickle library to pickle data, functions, lambdas
and classes.

dump_session and load_session are no-ops.
"""

# pytype: skip-file

import base64
import bz2
import io
import threading
import zlib

import cloudpickle

try:
  from absl import flags
except (ImportError, ModuleNotFoundError):
  pass

# Pickling, especially unpickling, causes broken module imports on Python 3
# if executed concurrently, see: BEAM-8651, http://bugs.python.org/issue38884.
_pickle_lock = threading.RLock()


def dumps(o, enable_trace=True, use_zlib=False):
  # type: (...) -> bytes

  """For internal use only; no backwards-compatibility guarantees."""
  with _pickle_lock:
    with io.BytesIO() as file:
      pickler = cloudpickle.CloudPickler(file)
      try:
        pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags
      except NameError:
        pass
      pickler.dump(o)
      s = file.getvalue()

  # Compress as compactly as possible (compresslevel=9) to decrease peak memory
  # usage (of multiple in-memory copies) and to avoid hitting protocol buffer
  # limits.
  # WARNING: Be cautious about compressor change since it can lead to pipeline
  # representation change, and can break streaming job update compatibility on
  # runners such as Dataflow.
  if use_zlib:
    c = zlib.compress(s, 9)
  else:
    c = bz2.compress(s, compresslevel=9)
  del s  # Free up some possibly large and no-longer-needed memory.

  return base64.b64encode(c)


def loads(encoded, enable_trace=True, use_zlib=False):
  """For internal use only; no backwards-compatibility guarantees."""

  c = base64.b64decode(encoded)

  if use_zlib:
    s = zlib.decompress(c)
  else:
    s = bz2.decompress(c)

  del c  # Free up some possibly large and no-longer-needed memory.

  with _pickle_lock:
    unpickled = cloudpickle.loads(s)
    return unpickled
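The encode and decode pipeline in `dumps` and `loads` (pickle under a lock, compress, then base64) can be tried in isolation. The following is a minimal sketch that substitutes the standard library's `pickle` for cloudpickle so that it runs without extra dependencies; the names `encode`, `decode`, and `_lock` are illustrative and do not appear in the PR.

```python
import base64
import bz2
import pickle
import threading
import zlib

# Re-entrant lock guarding pickling and unpickling, as in the module above.
_lock = threading.RLock()


def encode(obj, use_zlib=False):
    """Pickle, compress at the highest level, and base64-encode obj."""
    with _lock:
        raw = pickle.dumps(obj)  # stand-in for cloudpickle
    if use_zlib:
        packed = zlib.compress(raw, 9)
    else:
        packed = bz2.compress(raw, compresslevel=9)
    return base64.b64encode(packed)


def decode(encoded, use_zlib=False):
    """Reverse encode(); use_zlib must match the value used to encode."""
    packed = base64.b64decode(encoded)
    if use_zlib:
        raw = zlib.decompress(packed)
    else:
        raw = bz2.decompress(packed)
    with _lock:
        return pickle.loads(raw)


payload = {"words": ["abc", "def"], "count": 2}
roundtrip_bz2 = decode(encode(payload))
roundtrip_zlib = decode(encode(payload, use_zlib=True), use_zlib=True)
```

The compressor is not recorded in the payload, so both sides have to agree on `use_zlib`. This is one reason the warning in `dumps` treats a compressor change as a compatibility risk.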


def _pickle_absl_flags(obj):
  return _create_absl_flags, tuple([])


def _create_absl_flags():
  return flags.FLAGS
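The two helpers above form a reducer: instead of serializing the flags object's state, the pickle stream records a callable that looks the object up again on the receiving side. The sketch below shows the same pattern with the standard library's `pickle.Pickler` and a module object in place of `flags.FLAGS`. The names `_reduce_module` and `dumps_with_reducer` are hypothetical, and the dispatch table is assigned rather than mutated because a stock `Pickler` does not come with one.

```python
import importlib
import io
import math
import pickle
import types


def _reduce_module(module):
    # A reducer returns (callable, args). On load, pickle calls
    # import_module(name), so the object is looked up, not copied.
    return importlib.import_module, (module.__name__,)


def dumps_with_reducer(obj):
    with io.BytesIO() as buf:
        pickler = pickle.Pickler(buf)
        # Per-instance table: affects only this Pickler object.
        pickler.dispatch_table = {types.ModuleType: _reduce_module}
        pickler.dump(obj)
        return buf.getvalue()


restored = pickle.loads(
    dumps_with_reducer({"module": math, "values": [1, 2]}))
```

After the round trip, `restored["module"]` is the very same `math` module object, which mirrors how `_create_absl_flags` returns the process-wide `flags.FLAGS` rather than a copy.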


def dump_session(file_path):
  # It is possible to dump session with cloudpickle. However, since references
  # are saved it should not be necessary. See https://s.apache.org/beam-picklers
  pass


def load_session(file_path):
  # It is possible to load_session with cloudpickle. However, since references
  # are saved it should not be necessary. See https://s.apache.org/beam-picklers
  pass
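Keeping `dump_session` and `load_session` as no-ops lets this module offer the same four functions as a session-based pickler, so a caller can switch libraries without special cases. The sketch below shows one way such a selectable facade might look. It is an assumption based on the commit messages, not the PR's wrapper code: every name here (`_BACKENDS`, `set_library`, the backend keys) is hypothetical, and both backends use the standard library's `pickle`.

```python
import functools
import pickle
from types import SimpleNamespace


def _noop_session(file_path):
    # Mirrors the no-op session hooks: accept the path, do nothing.
    return None


# Hypothetical backends with an identical interface. They differ only in
# the pickle protocol, which is enough to observe the selection.
_BACKENDS = {
    "protocol2": SimpleNamespace(
        dumps=functools.partial(pickle.dumps, protocol=2),
        loads=pickle.loads,
        dump_session=_noop_session,
        load_session=_noop_session,
    ),
    "highest": SimpleNamespace(
        dumps=functools.partial(
            pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL),
        loads=pickle.loads,
        dump_session=_noop_session,
        load_session=_noop_session,
    ),
}
_selected = _BACKENDS["highest"]


def set_library(name):
    """Choose the backend that later calls delegate to."""
    global _selected
    if name not in _BACKENDS:
        raise ValueError("Unknown pickler library: %r" % name)
    _selected = _BACKENDS[name]


def dumps(obj):
    return _selected.dumps(obj)


def loads(data):
    return _selected.loads(data)


def dump_session(file_path):
    return _selected.dump_session(file_path)
```

Because every backend supplies the session hooks, callers can invoke `dump_session` unconditionally and rely on the selected backend to decide whether any work is needed.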
112 changes: 112 additions & 0 deletions sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
@@ -0,0 +1,112 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Unit tests for the cloudpickle_pickler module."""

# pytype: skip-file

import sys
import types
import unittest

from apache_beam.internal import module_test
from apache_beam.internal.cloudpickle_pickler import dumps
from apache_beam.internal.cloudpickle_pickler import loads


class PicklerTest(unittest.TestCase):

  NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType")

  def test_basics(self):
    self.assertEqual([1, 'a', (u'z', )], loads(dumps([1, 'a', (u'z', )])))
    fun = lambda x: 'xyz-%s' % x
    self.assertEqual('xyz-abc', loads(dumps(fun))('abc'))

  def test_lambda_with_globals(self):
    """Tests that the globals of a function are preserved."""

    # The point of the test is that the lambda being called after unpickling
    # relies on having the re module being loaded.
    self.assertEqual(['abc', 'def'],
                     loads(dumps(
                         module_test.get_lambda_with_globals()))('abc def'))

  def test_lambda_with_main_globals(self):
    self.assertEqual(unittest, loads(dumps(lambda: unittest))())

  def test_lambda_with_closure(self):
    """Tests that the closure of a function is preserved."""
    self.assertEqual(
        'closure: abc',
        loads(dumps(module_test.get_lambda_with_closure('abc')))())

  def test_class_object_pickled(self):
    self.assertEqual(['abc', 'def'],
                     loads(dumps(module_test.Xyz))().foo('abc def'))

  def test_class_instance_pickled(self):
    self.assertEqual(['abc', 'def'],
                     loads(dumps(module_test.XYZ_OBJECT)).foo('abc def'))

  def test_pickling_preserves_closure_of_a_function(self):
    self.assertEqual(
        'X:abc', loads(dumps(module_test.TopClass.NestedClass('abc'))).datum)
    self.assertEqual(
        'Y:abc',
        loads(dumps(module_test.TopClass.MiddleClass.NestedClass('abc'))).datum)

  def test_pickle_dynamic_class(self):
    self.assertEqual(
        'Z:abc', loads(dumps(module_test.create_class('abc'))).get())

  def test_generators(self):
    with self.assertRaises(TypeError):
      dumps((_ for _ in range(10)))

  def test_recursive_class(self):
    self.assertEqual(
        'RecursiveClass:abc',
        loads(dumps(module_test.RecursiveClass('abc').datum)))

  def test_function_with_external_reference(self):
    out_of_scope_var = 'expected_value'

    def foo():
      return out_of_scope_var

    self.assertEqual('expected_value', loads(dumps(foo))())

  @unittest.skipIf(NO_MAPPINGPROXYTYPE, 'test if MappingProxyType introduced')
  def test_dump_and_load_mapping_proxy(self):
    self.assertEqual(
        'def', loads(dumps(types.MappingProxyType({'abc': 'def'})))['abc'])
    self.assertEqual(
        types.MappingProxyType, type(loads(dumps(types.MappingProxyType({})))))

  # pylint: disable=exec-used
  @unittest.skipIf(sys.version_info < (3, 7), 'Python 3.7 or above only')
  def test_dataclass(self):
    exec(
        '''
from apache_beam.internal.module_test import DataClass
self.assertEqual(DataClass(datum='abc'), loads(dumps(DataClass(datum='abc'))))
    ''')


if __name__ == '__main__':
  unittest.main()
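For context on what these tests exercise, the sketch below probes the standard library's `pickle` with the same kinds of objects. It uses only the stdlib, not cloudpickle or Beam, and the helper name `try_pickle` is illustrative. Plain data pickles without trouble, a lambda is rejected, and a generator raises `TypeError`, the same exception type that `test_generators` expects from the cloudpickle-based `dumps`.

```python
import pickle


def try_pickle(obj):
    """Return None if obj pickles, else the name of the exception raised."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError) as exc:
        return type(exc).__name__
    return None


# A generator cannot be pickled at all.
generator_error = try_pickle(_ for _ in range(10))
# The stdlib pickles functions by qualified name, which a lambda lacks.
lambda_error = try_pickle(lambda x: "xyz-%s" % x)
# Plain containers of plain values are fine.
list_error = try_pickle([1, "a", ("z",)])
```

The lambda case is what `test_basics` covers: it passes with the cloudpickle-based `dumps` above, whereas the stdlib rejects the same object.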