Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
3383c26
streaming test
charlesccychen May 24, 2017
038f3f3
Working TestStream evaluator
charlesccychen May 24, 2017
fc661f6
Slightly better watermark propagation, debugging print cleanup.
charlesccychen May 24, 2017
053c7ae
Factor out GroupAlsoByWindows so that it can be overridden by the Dir…
charlesccychen May 24, 2017
c2b3f32
Don't buffer elements in GroupByKeyOnly.
charlesccychen Apr 21, 2017
8854c53
Skeleton implementation of GroupAlsoByKeyEvaluator.
charlesccychen May 24, 2017
8be01a9
Mostly working streaming GroupAlsoByWindow.
charlesccychen May 24, 2017
bf38fe7
Introduce keyed bundles, KeyedWorkItems.
charlesccychen May 24, 2017
ae0d80d
Rename old non-keyed state to legacy_state, introduce KeyedWorkItem, …
charlesccychen Apr 24, 2017
97de556
Fix rebase to head after Beam 2.0 release.
charlesccychen May 24, 2017
341fa92
Clarify transform_keyed_states.
charlesccychen May 25, 2017
1e83d5e
Support timer firings.
charlesccychen May 26, 2017
256309f
Reintroduce hacks from head.
charlesccychen May 26, 2017
25ce9f1
Add debugging output.
charlesccychen May 26, 2017
7be85c6
Don't hold input watermark on pending elements.
charlesccychen May 26, 2017
1df07a5
Finish moving earliest hold extraction to evaluators, dbg output++
charlesccychen May 30, 2017
e52ba42
Batch WordCount works in streaming mode!!
charlesccychen May 31, 2017
e5bb716
Working batch and streaming mode execution, except for certain empty …
charlesccychen Jun 5, 2017
91873cc
PubSubIO for direct runner.
robertwb Jun 7, 2017
e05778f
Merge pull request #1 from robertwb/streaming
charlesccychen Jun 7, 2017
12a9517
Use keyed holds in watermark manager
charlesccychen Jun 8, 2017
b90a2e4
Fix dropped messages in PubSubRead.
charlesccychen Jun 9, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions sdks/python/apache_beam/io/gcp/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
from apache_beam.io.iobase import Read
from apache_beam.io.iobase import Write
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
from apache_beam.transforms import core
from apache_beam.transforms import window
from apache_beam.transforms import Map
from apache_beam.transforms import PTransform
from apache_beam.transforms import ParDo
from apache_beam.transforms.display import DisplayDataItem
Expand Down Expand Up @@ -60,10 +63,13 @@ def __init__(self, topic, subscription=None, id_label=None):
subscription=subscription,
id_label=id_label)

def get_windowing(self, unused_inputs):
  """Returns the windowing for this transform's output.

  The inputs are ignored; the result is always a Windowing built from
  GlobalWindows.
  """
  global_windows = window.GlobalWindows()
  return core.Windowing(global_windows)

def expand(self, pvalue):
pcoll = pvalue.pipeline | Read(self._source)
pcoll.element_type = bytes
pcoll = pcoll | 'decode string' >> ParDo(_decodeUtf8String)
pcoll = pcoll | 'decode string' >> Map(_decodeUtf8String)
pcoll.element_type = unicode
return pcoll

Expand All @@ -81,7 +87,7 @@ def __init__(self, topic):
self._sink = _PubSubPayloadSink(topic)

def expand(self, pcoll):
pcoll = pcoll | 'encode string' >> ParDo(_encodeUtf8String)
pcoll = pcoll | 'encode string' >> Map(_encodeUtf8String)
pcoll.element_type = bytes
return pcoll | Write(self._sink)

Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def run(self, test_runner_api=True):
"""Runs the pipeline. Returns whatever our runner returns after running."""

# When possible, invoke a round trip through the runner API.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cosmoskitten wrote:
Please fix

Done.

if test_runner_api and self._verify_runner_api_compatible():
if test_runner_api and self._verify_runner_api_compatible() and False:
return Pipeline.from_runner_api(
self.to_runner_api(), self.runner, self._options).run(False)

Expand Down
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/pvalue.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,10 @@ class PBegin(PValue):
transforms. This allows us to have transforms that uniformly take PValue(s)
as inputs.
"""
pass
@property
def windowing(self):
  """Windowing reported for a PBegin: a Windowing over GlobalWindows."""
  # Imported inside the property rather than at module level; presumably this
  # avoids a circular import with apache_beam.transforms -- TODO confirm.
  from apache_beam.transforms import core
  from apache_beam.transforms import window
  global_windows = window.GlobalWindows()
  return core.Windowing(global_windows)


class PDone(PValue):
Expand Down
14 changes: 11 additions & 3 deletions sdks/python/apache_beam/runners/direct/bundle_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ def __init__(self, stacked):
self._stacked = stacked

def create_bundle(self, output_pcollection):
return _Bundle(output_pcollection, self._stacked)
return _Bundle(output_pcollection, None, stacked=self._stacked)

def create_keyed_bundle(self, output_pcollection, key):

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix create_keyed_bundle.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

charlesccychen wrote:
Please fix create_keyed_bundle.

Done.

return _Bundle(output_pcollection, key, stacked=self._stacked)

def create_empty_committed_bundle(self, output_pcollection):
bundle = self.create_bundle(output_pcollection)
Expand Down Expand Up @@ -107,14 +110,18 @@ def windowed_values(self):
yield WindowedValue(v, self._initial_windowed_value.timestamp,
self._initial_windowed_value.windows)

def __init__(self, pcollection, stacked=True):
assert isinstance(pcollection, pvalue.PCollection)
def __init__(self, pcollection, key, stacked=True):
assert isinstance(pcollection, (pvalue.PBegin, pvalue.PCollection))
self._pcollection = pcollection
self.key = key
self._elements = []
self._stacked = stacked
self._committed = False
self._tag = None # optional tag information for this bundle

def __repr__(self):
return '<Bundle %s %s>' % (self._pcollection, self._elements)

def get_elements_iterable(self, make_copy=False):
"""Returns iterable elements.

Expand Down Expand Up @@ -183,6 +190,7 @@ def add(self, element):
self._elements.append(element)

def output(self, element):
  """Adds a WindowedValue to this bundle.

  Asserts that element is a WindowedValue and then delegates to add().
  """
  is_windowed_value = isinstance(element, WindowedValue)
  assert is_windowed_value
  self.add(element)

def commit(self, synchronized_processing_time):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self):
self._num_transforms = 0

def visit_transform(self, applied_ptransform):
print 'VISIT', applied_ptransform, '; inputs:', applied_ptransform.inputs, '; parent:', applied_ptransform.parent
inputs = list(applied_ptransform.inputs)
if inputs:
for input_value in inputs:
Expand All @@ -55,4 +56,5 @@ def visit_transform(self, applied_ptransform):
self.step_names[applied_ptransform] = 's%d' % (self._num_transforms)
self._num_transforms += 1
for side_input in applied_ptransform.side_inputs:
print 'VIEW', side_input
self.views.append(side_input)
33 changes: 33 additions & 0 deletions sdks/python/apache_beam/runners/direct/direct_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import collections
import logging

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.runners.direct.bundle_factory import BundleFactory
from apache_beam.runners.runner import PipelineResult
Expand Down Expand Up @@ -56,6 +58,37 @@ def apply_CombinePerKey(self, transform, pcoll):
except NotImplementedError:
return transform.expand(pcoll)

def apply_GroupAlsoByWindow(self, transform, pcoll):
  """Checks the input PCollection and returns a new output PCollection.

  The transform is not expanded here: after transform._check_pcollection
  accepts pcoll, a fresh PCollection on the same pipeline is returned.
  """
  transform._check_pcollection(pcoll)
  output = pvalue.PCollection(pcoll.pipeline)
  return output

def apply_ReadStringsFromPubSub(self, transform, pcoll):
  """Returns a fresh output PCollection without expanding the transform."""
  # Execute this as a native transform.
  output = pvalue.PCollection(pcoll.pipeline)
  return output

def apply_WriteStringsToPubSub(self, transform, pcoll):
  """Applies the PubSub write as a ParDo that publishes through the client.

  Elements are UTF-8 encoded and buffered; the buffer is published as one
  batch whenever it holds at least 100 messages and again when the bundle
  finishes.

  Args:
    transform: the write transform; its _sink.topic names the target topic.
    pcoll: the PCollection of strings to publish.

  Returns:
    The result of applying the publishing ParDo to pcoll.
  """
  from google.cloud import pubsub
  destination = transform._sink.topic

  class WriteToPubSub(beam.DoFn):
    # Topic handle, created on the first start_bundle call of an instance.
    _topic = None

    def start_bundle(self):
      if self._topic is None:
        self._topic = pubsub.Client().topic(destination)
      self._buffer = []

    def process(self, elem):
      encoded = elem.encode('utf-8')
      self._buffer.append(encoded)
      if len(self._buffer) >= 100:
        self._flush()

    def finish_bundle(self):
      self._flush()

    def _flush(self):
      # Nothing buffered: avoid opening an empty batch.
      if not self._buffer:
        return
      with self._topic.batch() as batch:
        for payload in self._buffer:
          batch.publish(payload)
      self._buffer = []

  return pcoll | beam.ParDo(WriteToPubSub())

def run(self, pipeline):
"""Execute the entire pipeline and returns an DirectPipelineResult."""

Expand Down
134 changes: 111 additions & 23 deletions sdks/python/apache_beam/runners/direct/evaluation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,60 @@
from apache_beam.runners.direct.watermark_manager import WatermarkManager
from apache_beam.runners.direct.executor import TransformExecutor
from apache_beam.runners.direct.direct_metrics import DirectMetrics
from apache_beam.transforms.trigger import InMemoryUnmergedState
from apache_beam.utils import counters




class DirectUnmergedState(InMemoryUnmergedState):
  """InMemoryUnmergedState variant that logs timer operations.

  Timer requests are forwarded unchanged to InMemoryUnmergedState; each call
  is additionally reported at DEBUG level through the logging module instead
  of being printed to stdout.
  """

  def __init__(self):
    super(DirectUnmergedState, self).__init__()
    # Starts empty; nothing in this class appends to it.
    self.new_timers = []

  def set_timer(self, window, name, time_domain, timestamp):
    """Logs the request and delegates to the parent set_timer."""
    import logging
    logging.debug(
        'set_timer: window=%s name=%s time_domain=%s timestamp=%s',
        window, name, time_domain, timestamp)
    super(DirectUnmergedState, self).set_timer(
        window, name, time_domain, timestamp)

  def clear_timer(self, window, name, time_domain):
    """Logs the request and delegates to the parent clear_timer."""
    import logging
    logging.debug(
        'clear_timer: window=%s name=%s time_domain=%s',
        window, name, time_domain)
    # TODO(ccy): this is not implemented, but is not strictly necessary as it
    # is an optimization.
    super(DirectUnmergedState, self).clear_timer(window, name, time_domain)



class StepContext(object):
  """Lazily provides the state object for one step's execution.

  Args:
    execution_context: object exposing an existing_state attribute that is
      consulted the first time get_state() is called.
  """

  def __init__(self, execution_context):
    self._execution_context = execution_context
    self._state = None

  def get_state(self):
    """Returns the state for this step, creating it on first use.

    The execution context's existing_state is reused when it is truthy;
    otherwise a new DirectUnmergedState is created. The chosen object is
    cached, so later calls return the same instance.
    """
    # TODO(ccy): consider using copy on write semantics so that work items can
    # be retried.
    # Compare against None instead of relying on truthiness, so a cached state
    # object that evaluates as false is not silently replaced.
    if self._state is None:
      self._state = (self._execution_context.existing_state
                     or DirectUnmergedState())
    return self._state



class _ExecutionContext(object):

def __init__(self, watermarks, existing_state):
self._watermarks = watermarks
self._existing_state = existing_state
def __init__(self, applied_ptransform, watermarks, existing_state, legacy_existing_state, key):
self.applied_ptransform = applied_ptransform
self.watermarks = watermarks
self.existing_state = existing_state
self.legacy_existing_state = legacy_existing_state
self.key = key
# TODO(ccy): key, clock as first arguments for consistency with Java.
self._step_context = None

@property
def watermarks(self):
return self._watermarks
def get_step_context(self):
if not self._step_context:
self._step_context = StepContext(self)
return self._step_context

@property
def existing_state(self):
return self._existing_state


class _SideInputView(object):
Expand Down Expand Up @@ -79,16 +117,20 @@ def get_value_or_schedule_after_output(self, side_input, task):
def add_values(self, side_input, values):
  """Appends values to the pending elements of a side input view.

  Args:
    side_input: key into self._views identifying the view to extend.
    values: iterable of elements to append to the view's elements.

  Raises:
    AssertionError: if the view already has a result.
  """
  with self._lock:
    view = self._views[side_input]
    # The offending view and values are reported in the assertion message
    # rather than through a separate print to stdout.
    assert not view.has_result, (
        'View already has a result: %s; values: %s' % (side_input, values))
    view.elements.extend(values)

def finalize_value_and_get_tasks(self, side_input):
print '***** FINALIZE VIEWWWWW', side_input
with self._lock:
view = self._views[side_input]
assert not view.has_result
assert view.value is None
assert view.callable_queue is not None
view.value = self._pvalue_to_value(side_input, view.elements)
print 'result', view.elements, view.value
view.elements = None
result = tuple(view.callable_queue)
for task in result:
Expand Down Expand Up @@ -145,18 +187,32 @@ def __init__(self, pipeline_options, bundle_factory, root_transforms,
self._pcollection_to_views = collections.defaultdict(list)
for view in views:
self._pcollection_to_views[view.pvalue].append(view)
import pprint
print 'PCOLLECTION TO VIEWS'
pprint.pprint(self._pcollection_to_views)

# AppliedPTransform -> Evaluator specific state objects
self._application_state_interals = {}
self._legacy_existing_state = {}
# AppliedPTransform -> {key -> DirectUnmergedState objects}; todo: rename
self._transform_keyed_states = self._initialize_transform_states(root_transforms, value_to_consumers)
self._watermark_manager = WatermarkManager(
Clock(), root_transforms, value_to_consumers)
Clock(), root_transforms, value_to_consumers, self._transform_keyed_states)
self._side_inputs_container = _SideInputsContainer(views)
self._pending_unblocked_tasks = []
self._counter_factory = counters.CounterFactory()
self._cache = None
self._metrics = DirectMetrics()

self._lock = threading.Lock()

def _initialize_transform_states(self, root_transforms, value_to_consumers):
transform_keyed_states = {}
for transform in root_transforms:
transform_keyed_states[transform] = {}
for consumers in value_to_consumers.values():
for consumer in consumers:
transform_keyed_states[consumer] = {}
return transform_keyed_states

def use_pvalue_cache(self, cache):
assert not self._cache
Expand Down Expand Up @@ -199,39 +255,61 @@ def handle_result(
the committed bundles contained within the handled result.
"""
with self._lock:
committed_bundles = self._commit_bundles(
result.uncommitted_output_bundles)
committed_bundles, unprocessed_bundle = self._commit_bundles(
result.uncommitted_output_bundles,
result.unprocessed_bundle)
self._watermark_manager.update_watermarks(
completed_bundle, result.transform, completed_timers,
completed_bundle, unprocessed_bundle, result.transform, completed_timers,
committed_bundles, result.watermark_hold)

self._metrics.commit_logical(completed_bundle,
result.logical_metric_updates)

print 'HANDLE RESULT COMMITED', committed_bundles, result.uncommitted_output_bundles, result.unprocessed_bundle
# If the result is for a view, update side inputs container.
if (result.uncommitted_output_bundles
and result.uncommitted_output_bundles[0].pcollection
in self._pcollection_to_views):
print '***** HI I AM A VIEW', result.uncommitted_output_bundles[0].pcollection, committed_bundles
for view in self._pcollection_to_views[
result.uncommitted_output_bundles[0].pcollection]:
for committed_bundle in committed_bundles:
print 'ADD TO VIEW', committed_bundle.get_elements_iterable(make_copy=True)
# side_input must be materialized.
self._side_inputs_container.add_values(
view,
committed_bundle.get_elements_iterable(make_copy=True))
if (self.get_execution_context(result.transform)
.watermarks.input_watermark
== WatermarkManager.WATERMARK_POS_INF):
completed = True
print 'STUFF', self._transform_keyed_states[result.transform]
watermarks = self._watermark_manager.get_watermarks(result.transform)
if watermarks._pending:
completed = False
print 'WATERMARKS', watermarks, watermarks._pending
for key in self._transform_keyed_states[result.transform]:
if (self.get_execution_context(result.transform, key).watermarks.input_watermark
< WatermarkManager.WATERMARK_POS_INF or
self.get_execution_context(result.transform, key).watermarks._pending):
print '&&& COMPLETED FALSE', result.transform, key, self.get_execution_context(result.transform, key).watermarks._pending
completed = False
if completed:
print '***** HI I AM COMPLETED', result.uncommitted_output_bundles[0].pcollection
self._pending_unblocked_tasks.extend(
self._side_inputs_container.finalize_value_and_get_tasks(view))
else:
print '***** HI I AM *NOT* COMPLETED', result.uncommitted_output_bundles[0].pcollection

if result.counters:
for counter in result.counters:
merged_counter = self._counter_factory.get_counter(
counter.name, counter.combine_fn)
merged_counter.accumulator.merge([counter.accumulator])

self._application_state_interals[result.transform] = result.state
self._legacy_existing_state[result.transform] = result.legacy_state
if not result.state:
if completed_bundle.key in self._transform_keyed_states[result.transform]:
del self._transform_keyed_states[result.transform][completed_bundle.key]
else:
self._transform_keyed_states[result.transform][completed_bundle.key] = result.state
return committed_bundles

def get_aggregator_values(self, aggregator_or_name):
Expand All @@ -244,24 +322,33 @@ def schedule_pending_unblocked_tasks(self, executor_service):
executor_service.submit(task)
self._pending_unblocked_tasks = []

def _commit_bundles(self, uncommitted_bundles):
def _commit_bundles(self, uncommitted_output_bundles, unprocessed_bundle):
"""Commits bundles and returns a immutable set of committed bundles."""
for in_progress_bundle in uncommitted_bundles:
for in_progress_bundle in uncommitted_output_bundles:
producing_applied_ptransform = in_progress_bundle.pcollection.producer
watermarks = self._watermark_manager.get_watermarks(
producing_applied_ptransform)
in_progress_bundle.commit(watermarks.synchronized_processing_output_time)
return tuple(uncommitted_bundles)
if unprocessed_bundle:
unprocessed_bundle.commit(None)
return tuple(uncommitted_output_bundles), unprocessed_bundle

def get_execution_context(self, applied_ptransform):
def get_execution_context(self, applied_ptransform, key):
return _ExecutionContext(
applied_ptransform,
self._watermark_manager.get_watermarks(applied_ptransform),
self._application_state_interals.get(applied_ptransform))
self._transform_keyed_states[applied_ptransform].get(key),
self._legacy_existing_state.get(applied_ptransform),
key)

def create_bundle(self, output_pcollection):
  """Create an uncommitted bundle for the specified PCollection."""
  factory = self._bundle_factory
  return factory.create_bundle(output_pcollection)

def create_keyed_bundle(self, output_pcollection, key):
  """Create an uncommitted bundle for the specified PCollection and key."""
  factory = self._bundle_factory
  return factory.create_keyed_bundle(output_pcollection, key)

def create_empty_committed_bundle(self, output_pcollection):
"""Create empty bundle useful for triggering evaluation."""
return self._bundle_factory.create_empty_committed_bundle(
Expand Down Expand Up @@ -290,6 +377,7 @@ def is_done(self, transform=None):

def _is_transform_done(self, transform):
  """Returns whether the transform's output watermark is at +infinity.

  Args:
    transform: the transform whose watermarks are looked up in the watermark
      manager.

  Returns:
    True if the output watermark equals WatermarkManager.WATERMARK_POS_INF.
  """
  import logging
  tw = self._watermark_manager.get_watermarks(transform)
  done = tw.output_watermark == WatermarkManager.WATERMARK_POS_INF
  # Reported at DEBUG level instead of an unconditional print to stdout on
  # every call.
  logging.debug(
      'Transform done? %s: input watermark %s, output watermark %s, '
      'done=%s, pending=%s',
      transform, tw.input_watermark, tw.output_watermark, done, tw._pending)
  return done

def get_value_or_schedule_after_output(self, side_input, task):
Expand Down
Loading