Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-8823] Make FnApiRunner work by executing ready elements instead of stages #15441

Merged
merged 30 commits into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
40ed4d8
[BEAM-9640] Sketching watermark tracking on FnApiRunner.
pabloem Apr 2, 2020
c1d8f54
Addressing some comments
pabloem Jan 26, 2021
5c59afd
Fixups
pabloem Feb 1, 2021
f695058
fixing bug with truncation of restrictions
pabloem Feb 3, 2021
4781255
Fixing output watermark for stages
pabloem Feb 4, 2021
aadbef0
Moving visualization tools to different file
pabloem Feb 4, 2021
6d15f2c
Individual stages are run bundle-based
pabloem Mar 3, 2021
863d1a5
[wip] Working on per-bundle execution
pabloem Jun 29, 2021
69a44d7
fixups
pabloem Sep 1, 2021
197823b
Merge branch 'master' of https://github.com/apache/beam into per-bund…
pabloem Sep 1, 2021
3edb6bc
Fixing tests
pabloem Sep 14, 2021
e03dbd9
cleanup
pabloem Sep 14, 2021
9ca0022
fixing lint, formatting, some typing info
pabloem Sep 14, 2021
708098b
More tests passing
pabloem Sep 15, 2021
802eb45
fix import
pabloem Sep 15, 2021
8632ae6
fix test
pabloem Sep 15, 2021
455308c
Fix most other tests
pabloem Sep 21, 2021
d1984dd
all passin
pabloem Sep 21, 2021
4150ac6
testout
pabloem Sep 21, 2021
95a9813
testing weird fix, hehe
pabloem Sep 21, 2021
dddf9f3
fixup
pabloem Sep 22, 2021
01f9fa4
Fix formatting
pabloem Sep 22, 2021
bb6cc4b
fix typing issues
pabloem Sep 23, 2021
d2be2fa
fixup
pabloem Sep 23, 2021
71221b2
fixup
pabloem Sep 23, 2021
a669a68
cleanup
pabloem Sep 23, 2021
4a4067b
addressing comments
pabloem Oct 4, 2021
64e354e
fix typoschmypo
pabloem Oct 5, 2021
2e8d046
Merge remote-tracking branch 'origin/master' into per-bundle-execution
pabloem Oct 5, 2021
70fd62b
proper timer handling
pabloem Oct 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 7 additions & 5 deletions sdks/python/apache_beam/io/iobase.py
Original file line number Diff line number Diff line change
Expand Up @@ -1148,11 +1148,13 @@ def expand(self, pcoll):
| 'Extract' >> core.FlatMap(lambda x: x[1]))
# PreFinalize should run before FinalizeWrite, and the two should not be
# fused.
pre_finalize_coll = do_once | 'PreFinalize' >> core.FlatMap(
_pre_finalize,
self.sink,
AsSingleton(init_result_coll),
AsIter(write_result_coll))
pre_finalize_coll = (
do_once
| 'PreFinalize' >> core.FlatMap(
_pre_finalize,
self.sink,
AsSingleton(init_result_coll),
AsIter(write_result_coll)))
return do_once | 'FinalizeWrite' >> core.FlatMap(
_finalize_write,
self.sink,
Expand Down
357 changes: 282 additions & 75 deletions sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py

Large diffs are not rendered by default.

442 changes: 301 additions & 141 deletions sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,11 @@ def cross_product(elem, sides):
pcoll | beam.FlatMap(cross_product, beam.pvalue.AsList(pcoll)),
equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))

def test_pardo_unfusable_side_inputs_with_separation(self):
def cross_product(elem, sides):
for side in sides:
yield elem, side

with self.create_pipeline() as p:
pcoll = p | beam.Create(['a', 'b'])
derived = ((pcoll, ) | beam.Flatten()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import itertools
import logging
import operator
from builtins import object
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
from typing import Collection
from typing import Container
Expand All @@ -34,6 +37,8 @@
from typing import Iterable
from typing import Iterator
from typing import List
from typing import MutableMapping
from typing import NamedTuple
from typing import Optional
from typing import Set
from typing import Tuple
Expand All @@ -44,11 +49,17 @@
from apache_beam.internal import pickler
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.worker import bundle_processor
from apache_beam.transforms import combiners
from apache_beam.transforms import core
from apache_beam.utils import proto_utils
from apache_beam.utils import timestamp

if TYPE_CHECKING:
from apache_beam.runners.portability.fn_api_runner.execution import ListBuffer
from apache_beam.runners.portability.fn_api_runner.execution import PartitionableBuffer

T = TypeVar('T')

Expand Down Expand Up @@ -77,14 +88,15 @@
IMPULSE_BUFFER = b'impulse'

# TimerFamilyId is identified by transform name + timer family
# TODO(pabloem): Rename this type to express this name is unique per pipeline.
TimerFamilyId = Tuple[str, str]

BufferId = bytes

# SideInputId is identified by a consumer ParDo + tag.
SideInputId = Tuple[str, str]
SideInputAccessPattern = beam_runner_api_pb2.FunctionSpec

DataOutput = Dict[str, bytes]

# A map from a PCollection coder ID to a Safe Coder ID
# A safe coder is a coder that can be used on the runner-side of the FnApi.
# A safe coder receives a byte string, and returns a type that can be
Expand All @@ -96,6 +108,27 @@
# (MultiMap / Iterable).
DataSideInput = Dict[SideInputId, Tuple[bytes, SideInputAccessPattern]]

DataOutput = Dict[str, BufferId]

# A map of [Transform ID, Timer Family ID] to [Buffer ID, Time Domain for timer]
# The time domain comes from beam_runner_api_pb2.TimeDomain. It may be
# EVENT_TIME or PROCESSING_TIME.
OutputTimers = MutableMapping[TimerFamilyId, Tuple[BufferId, Any]]

# A map of [Transform ID, Timer Family ID] to [Buffer CONTENTS, Timestamp]
OutputTimerData = MutableMapping[TimerFamilyId,
Tuple['PartitionableBuffer',
timestamp.Timestamp]]

BundleProcessResult = Tuple[beam_fn_api_pb2.InstructionResponse,
List[beam_fn_api_pb2.ProcessBundleSplitResponse]]


# TODO(pabloem): Change tha name to a more representative one
class DataInput(NamedTuple):
data: MutableMapping[str, 'PartitionableBuffer']
timers: MutableMapping[TimerFamilyId, 'PartitionableBuffer']


class Stage(object):
"""A set of Transforms that can be sent to the worker for processing."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def set_watermark(self, wm: timestamp.Timestamp):

def upstream_watermark(self):
if self.producers:
return min(p.input_watermark() for p in self.producers)
return min(p.output_watermark() for p in self.producers)
else:
return timestamp.MAX_TIMESTAMP

Expand All @@ -71,13 +71,16 @@ def __init__(self, name):

def __str__(self):
return 'StageNode<inputs=%s,side_inputs=%s' % (
[i.name for i in self.inputs], [i.name for i in self.side_inputs])
[
'%s(%s, upstream:%s)' %
(i.name, i.watermark(), i.upstream_watermark())
for i in self.inputs
], ['%s(%s)' % (i.name, i.watermark()) for i in self.side_inputs])

def output_watermark(self):
if not self.outputs:
return self.input_watermark()
else:
return min(o.watermark() for o in self.outputs)
if not self.inputs:
return timestamp.MAX_TIMESTAMP
return min(i.watermark() for i in self.inputs)

def input_watermark(self):
if not self.inputs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,9 @@ def append(self, item):
self._overlay[self._key] = list(self._underlying[self._key])
self._overlay[self._key].append(item)

def extend(self, other: Buffer) -> None:
raise NotImplementedError()

StateType = Union[CopyOnWriteState, DefaultDict[bytes, Buffer]]

def __init__(self):
Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/transforms/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from apache_beam.transforms import window
from apache_beam.transforms.combiners import CountCombineFn
from apache_beam.transforms.core import CombinePerKey
from apache_beam.transforms.core import Create
from apache_beam.transforms.core import DoFn
from apache_beam.transforms.core import FlatMap
from apache_beam.transforms.core import Flatten
Expand Down Expand Up @@ -159,6 +160,8 @@ def _extract_input_pvalues(self, pvalueish):
return pcolls, pcolls

def expand(self, pcolls):
if not pcolls:
pcolls = (self.pipeline | Create([]), )
if isinstance(pcolls, dict):
if all(isinstance(tag, str) and len(tag) < 10 for tag in pcolls.keys()):
# Small, string tags. Pass them as data.
Expand Down