Skip to content
Closed

Test #2427

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions sdks/python/apache_beam/internal/pickler.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,15 @@ def new_log_info(msg, *args, **kwargs):
# TODO(ccy): Currently, there are still instances of pickler.dumps() and
# pickler.loads() being used for data, which results in an unnecessary base64
# encoding. This should be cleaned up.
def dumps(o):
def dumps(o, enable_trace=True):
try:
s = dill.dumps(o)
except Exception: # pylint: disable=broad-except
dill.dill._trace(True) # pylint: disable=protected-access
s = dill.dumps(o)
except Exception as e: # pylint: disable=broad-except
if enable_trace:
dill.dill._trace(True) # pylint: disable=protected-access
s = dill.dumps(o)
else:
raise e
finally:
dill.dill._trace(False) # pylint: disable=protected-access

Expand All @@ -199,17 +202,20 @@ def dumps(o):
return base64.b64encode(c)


def loads(encoded):
def loads(encoded, enable_trace=True):
c = base64.b64decode(encoded)

s = zlib.decompress(c)
del c # Free up some possibly large and no-longer-needed memory.

try:
return dill.loads(s)
except Exception: # pylint: disable=broad-except
dill.dill._trace(True) # pylint: disable=protected-access
return dill.loads(s)
except Exception as e: # pylint: disable=broad-except
if enable_trace:
dill.dill._trace(True) # pylint: disable=protected-access
return dill.loads(s)
else:
raise e
finally:
dill.dill._trace(False) # pylint: disable=protected-access

Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,9 @@ def visit_transform(self, transform_node):
Visitor.ok = False
try:
# Transforms must be picklable.
pickler.loads(pickler.dumps(transform_node.transform))
pickler.loads(pickler.dumps(transform_node.transform,
enable_trace=False),
enable_trace=False)
except Exception:
Visitor.ok = False
self.visit(Visitor())
Expand Down
6 changes: 3 additions & 3 deletions sdks/python/apache_beam/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ def test_visit_entire_graph(self):
set(visitor.visited))
self.assertEqual(set(visitor.enter_composite),
set(visitor.leave_composite))
self.assertEqual(2, len(visitor.enter_composite))
self.assertEqual(visitor.enter_composite[1].transform, transform)
self.assertEqual(visitor.leave_composite[0].transform, transform)
self.assertEqual(3, len(visitor.enter_composite))
self.assertEqual(visitor.enter_composite[2].transform, transform)
self.assertEqual(visitor.leave_composite[1].transform, transform)

def test_apply_custom_transform(self):
pipeline = TestPipeline()
Expand Down
23 changes: 0 additions & 23 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,29 +261,6 @@ def _add_step(self, step_kind, step_label, transform_node, side_tags=()):

return step

def run_Create(self, transform_node):
transform = transform_node.transform
step = self._add_step(TransformNames.CREATE_PCOLLECTION,
transform_node.full_label, transform_node)
# TODO(silviuc): Eventually use a coder based on typecoders.
# Note that we base64-encode values here so that the service will accept
# the values.
element_coder = coders.PickleCoder()
step.add_property(
PropertyNames.ELEMENT,
[base64.b64encode(element_coder.encode(v))
for v in transform.value])
# The service expects a WindowedValueCoder here, so we wrap the actual
# encoding in a WindowedValueCoder.
step.encoding = self._get_cloud_encoding(
coders.WindowedValueCoder(element_coder))
step.add_property(
PropertyNames.OUTPUT_INFO,
[{PropertyNames.USER_NAME: (
'%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
PropertyNames.ENCODING: step.encoding,
PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])

def _add_singleton_step(self, label, full_label, tag, input_step):
"""Creates a CollectionToSingleton step used to handle ParDo side inputs."""
# Import here to avoid adding the dependency for local running scenarios.
Expand Down
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/runners/direct/direct_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def run(self, pipeline):
# We are running in eager mode, block until the pipeline execution
# completes in order to have full results in the cache.
result.wait_until_finish()
print "finishing"
self._cache.finalize()

return result
Expand Down Expand Up @@ -124,12 +125,15 @@ def pvalue_cache(self):
return self._pvalue_cache

def append(self, applied_ptransform, tag, elements):
print "append"
assert not self._finalized
assert elements is not None
assert self._cache is not None
self._cache[(applied_ptransform, tag)].extend(elements)

def finalize(self):
"""Make buffered cache elements visible to the underlying PValueCache."""
print "finalize"
assert not self._finalized
for key, value in self._cache.iteritems():
applied_ptransform, tag = key
Expand Down
31 changes: 0 additions & 31 deletions sdks/python/apache_beam/runners/direct/transform_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def __init__(self, evaluation_context):
self._evaluation_context = evaluation_context
self._evaluators = {
io.Read: _BoundedReadEvaluator,
core.Create: _CreateEvaluator,
core.Flatten: _FlattenEvaluator,
core.ParDo: _ParDoEvaluator,
core.GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
Expand Down Expand Up @@ -233,36 +232,6 @@ def finish_bundle(self):
self._applied_ptransform, bundles, None, None, None, None)


class _CreateEvaluator(_TransformEvaluator):
"""TransformEvaluator for Create transform."""

def __init__(self, evaluation_context, applied_ptransform,
input_committed_bundle, side_inputs, scoped_metrics_container):
assert not input_committed_bundle
assert not side_inputs
super(_CreateEvaluator, self).__init__(
evaluation_context, applied_ptransform, input_committed_bundle,
side_inputs, scoped_metrics_container)

def start_bundle(self):
assert len(self._outputs) == 1
output_pcollection = list(self._outputs)[0]
self.bundle = self._evaluation_context.create_bundle(output_pcollection)

def finish_bundle(self):
bundles = []
transform = self._applied_ptransform.transform

assert transform.value is not None
create_result = [GlobalWindows.windowed_value(v) for v in transform.value]
for result in create_result:
self.bundle.output(result)
bundles.append(self.bundle)

return TransformResult(
self._applied_ptransform, bundles, None, None, None, None)


class _TaggedReceivers(dict):
"""Received ParDo output and redirect to the associated output bundle."""

Expand Down
75 changes: 73 additions & 2 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,6 @@ def expand(self, pcoll):
# This code path is only used in the local direct runner. For Dataflow
# runner execution, the GroupByKey transform is expanded on the service.
input_type = pcoll.element_type

if input_type is not None:
# Initialize type-hints used below to enforce type-checking and to pass
# downstream to further PTransforms.
Expand Down Expand Up @@ -1373,11 +1372,83 @@ def infer_output_type(self, unused_input_type):
def expand(self, pbegin):
assert isinstance(pbegin, pvalue.PBegin)
self.pipeline = pbegin.pipeline
return pvalue.PCollection(self.pipeline)
coder = typecoders.registry.get_coder(self.infer_output_type(None))
source = self._create_source(self.value, coder)
return (pbegin.pipeline
| Read(source).with_output_types(self.infer_output_type(None)))

def get_windowing(self, unused_inputs):
return Windowing(GlobalWindows())

def _create_source(self, values, coder):
from apache_beam import io

class _CreateSource(io.iobase.BoundedSource):
def __init__(self, values, coder, is_serialized=False):
self._coder = coder
self._serialized_values = []
self._total_size = 0
if is_serialized:
self._serialized_values = values
else:
for value in values:
serialized_value = self._coder.encode(value)
self._serialized_values.append(serialized_value)

for serialized_value in self._serialized_values:
self._total_size += len(serialized_value)

def read(self, range_tracker):
start_position = range_tracker.start_position()
element_iter = iter(self._serialized_values[start_position:])
for i in range(start_position, range_tracker.stop_position()):
if not range_tracker.try_claim(i):
return
yield self._coder.decode(next(element_iter))

def split(self, desired_bundle_size, start_position=None, stop_position=None):
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = len(self._serialized_values)

avg_size_per_element = self._total_size / len(self._serialized_values)
num_elements_per_split = desired_bundle_size / avg_size_per_element

start = start_position
while start < stop_position:
end = min(start + num_elements_per_split, stop_position)
remaining = stop_position - end
# Avoid having a too small bundle at the end.
if remaining < (desired_bundle_size / 4):
end = stop_position

sub_source = _CreateSource(self._serialized_values[start:end],
self._coder,
is_serialized=True)

from apache_beam import io
yield io.iobase.SourceBundle(weight=(end - start),
source=sub_source,
start_position=start,
stop_position=end)

start = end

def get_range_tracker(self, start_position, stop_position):
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = len(self._serialized_values)

from apache_beam import io
return io.OffsetRangeTracker(start_position, stop_position)

def estimate_size(self):
return self._total_size

return _CreateSource(values, coder)


def Read(*args, **kwargs):
from apache_beam import io
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/transforms/ptransform.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ def __ror__(self, left, label=None):
replacements = {id(v): p | 'CreatePInput%s' % ix >> Create(v)
for ix, v in enumerate(pvalues)
if not isinstance(v, pvalue.PValue) and v is not None}

pvalueish = _SetInputPValues().visit(pvalueish, replacements)
self.pipeline = p
result = p.apply(self, pvalueish, label)
Expand Down