[BEAM-9640] Sketching watermark tracking on FnApiRunner #11296

Merged
merged 10 commits into from
Jun 9, 2021

Conversation

pabloem
Member

@pabloem pabloem commented Apr 2, 2020

r: @robertwb

This change adds an initial 'sketch' of watermark tracking to the batch runner. Watermark tracking is done like so:

  1. If a PCollection has a delayed application, its watermark is held at MIN_TIMESTAMP.
  2. If a PTransform has a channel split, its input PCollection's watermark is held at MIN_TIMESTAMP.
  3. For timers, an 'input PCollection' node is created for each timer family that a transform consumes. If a bundle execution returns a timer set at time X, the input PCollection for that timer family is held at X, which holds back the stage's downstream watermarks.
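
A minimal sketch of the three hold rules above, for readers following along. It assumes plain floats stand in for Beam timestamps, and every name here (`compute_watermark_holds` and its parameters) is illustrative rather than taken from the PR. The final step, advancing every other stage input to the maximum timestamp once the stage has run, is also an assumption about batch execution.

```python
from typing import Dict, Iterable

# Stand-ins for Beam's timestamp.MIN_TIMESTAMP / MAX_TIMESTAMP (assumed values).
MIN_TIMESTAMP = float('-inf')
MAX_TIMESTAMP = float('inf')


def compute_watermark_holds(
    stage_input_pcolls: Iterable[str],
    pcolls_with_delayed_apps: Iterable[str],
    pcolls_with_channel_splits: Iterable[str],
    expected_timer_pcolls: Iterable[str],
    earliest_timer_by_pcoll: Dict[str, float],
) -> Dict[str, float]:
  """Returns the watermark to set on each input PCollection of a stage."""
  updates: Dict[str, float] = {}
  # Rule 1: delayed applications hold the PCollection at the minimum.
  for pcoll in pcolls_with_delayed_apps:
    updates[pcoll] = MIN_TIMESTAMP
  # Rule 2: channel splits hold the transform's input at the minimum.
  for pcoll in pcolls_with_channel_splits:
    updates[pcoll] = MIN_TIMESTAMP
  # Rule 3: a timer set at time X holds its timer-family PCollection at X.
  for pcoll in expected_timer_pcolls:
    updates[pcoll] = earliest_timer_by_pcoll.get(pcoll, MAX_TIMESTAMP)
  # Assumption: any other input was fully consumed and can advance.
  for pcoll in stage_input_pcolls:
    updates.setdefault(pcoll, MAX_TIMESTAMP)
  return updates
```

Any PCollection left below the maximum keeps the downstream stages from being considered complete.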

@pabloem
Member Author

pabloem commented Apr 3, 2020

This is the output of watermark_manager.show() for one pipeline:
[image: output of watermark_manager.show()]

@pabloem pabloem force-pushed the fn-api-watermarks branch 2 times, most recently from 2797d98 to 694c8e1 Compare April 3, 2020 19:45
@pabloem pabloem force-pushed the fn-api-watermarks branch 3 times, most recently from 94ca0b0 to 8a0a4c2 Compare April 22, 2020 21:17
@pabloem
Member Author

pabloem commented Apr 30, 2020

r: @robertwb

Contributor

@robertwb robertwb left a comment

Sorry it took so long to get to this. Most of my questions are around watermark advancement.

      w = min(i.watermark() for i in self.inputs)
      return w

    def input_watermark(self):
Contributor

This doesn't seem right: the input watermarks should always be an upper bound on the output watermark.

Member Author

That makes sense. I've changed the output_watermark to be calculated based on downstream PCollections.

    def set_watermark(self, wm):
      raise NotImplementedError('Stages do not have a watermark')

    def output_watermark(self):
Contributor

This doesn't seem to take into account data that's "in flight." E.g. all the input watermarks could be at max-timestamp, but that doesn't mean that all the inputs' data has been consumed.

Member Author

I've changed this implementation to rely on the watermark from downstream PCollections.
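
To make the two points in this thread concrete, here is a hedged sketch of stage and PCollection nodes in which the stage's output watermark depends on its downstream PCollections and never exceeds the input watermark. The class and method names echo the snippets quoted in this review, but the bodies are assumptions for illustration, not the merged implementation, and floats stand in for Beam timestamps.

```python
from typing import Set

# Stand-ins for Beam's timestamp.MIN_TIMESTAMP / MAX_TIMESTAMP (assumed values).
MIN_TIMESTAMP = float('-inf')
MAX_TIMESTAMP = float('inf')


class PCollectionNode:
  def __init__(self, name: str):
    self.name = name
    self._watermark = MIN_TIMESTAMP

  def watermark(self) -> float:
    return self._watermark

  def set_watermark(self, wm: float) -> None:
    self._watermark = wm


class StageNode:
  def __init__(self, name: str):
    self.name = name
    self.inputs: Set[PCollectionNode] = set()
    self.outputs: Set[PCollectionNode] = set()

  def input_watermark(self) -> float:
    # A stage with no inputs has nothing holding it back.
    if not self.inputs:
      return MAX_TIMESTAMP
    return min(i.watermark() for i in self.inputs)

  def output_watermark(self) -> float:
    # Derived from downstream PCollections, so data still in flight
    # (outputs not yet advanced) keeps the value low. The input watermark
    # remains an upper bound.
    if not self.outputs:
      return self.input_watermark()
    downstream = min(o.watermark() for o in self.outputs)
    return min(downstream, self.input_watermark())
```

With both inputs at the maximum but the output PCollection not yet advanced, `output_watermark()` stays at the minimum, which is the in-flight case raised above.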

@pabloem
Member Author

pabloem commented May 5, 2020

> Sorry it took so long to get to this. Most of my questions are around watermark advancement.

no worries. This is a critical component, and I have other work to do, so I'm glad to get a thoughtful review. I'll address your comments soon.

@stale stale bot added the stale label Aug 16, 2020
@stale stale bot closed this Aug 23, 2020
@pabloem pabloem reopened this Jan 26, 2021
@stale stale bot removed the stale label Jan 26, 2021
@pabloem pabloem force-pushed the fn-api-watermarks branch 5 times, most recently from 441a2a8 to 75b713e Compare February 2, 2021 20:05
@pabloem
Member Author

pabloem commented Feb 3, 2021

Run Python 3.8 PostCommit

@apache apache deleted a comment from codecov bot Feb 4, 2021
@apache apache deleted a comment from stale bot Feb 4, 2021
@apache apache deleted a comment from stale bot Feb 4, 2021
@apache apache deleted a comment from stale bot Feb 4, 2021
@pabloem
Member Author

pabloem commented Feb 4, 2021

@robertwb sorry about the long delay on this, but I've rebased this, and addressed some of your older comments.

@pabloem
Member Author

pabloem commented Feb 4, 2021

cc: @tvalentyn FYI I am now focusing on this change

@apache apache deleted a comment from codecov bot Feb 5, 2021
  @staticmethod
  def _collect_written_timers(
      bundle_context_manager: execution.BundleContextManager,
      newly_set_timers: Dict[Tuple[str, str], ListBuffer],
Contributor

Should this be a return value as well?

Member Author

Done


This function reviews a stage that has just been run. The stage will have
written timers to its output buffers. The function then takes the timers,
and adds them to the `newly_set_timers` dictionary.
Contributor

What does it return?

Member Author

Done

producer_name]
# We take the output with tag 'out' from the producer transform. The
# producer transform is a GRPC read, and it has a single output.
pcolls_with_delayed_apps.add(transform.outputs['out'])
Contributor

More flexibly, you could do only_element(transform.outputs.values())

Member Author

Done
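
For context, a helper like the one suggested above can be sketched in a few lines. This standalone version is an illustration under the assumption that the helper simply returns the sole item of an iterable and fails otherwise. It is not copied from the Beam codebase.

```python
from typing import Iterable, TypeVar

T = TypeVar('T')


def only_element(iterable: Iterable[T]) -> T:
  """Returns the single element of `iterable`, raising ValueError otherwise."""
  # Unpacking into a one-item target fails unless there is exactly one item.
  element, = iterable
  return element


# Hypothetical outputs map of a transform with a single output.
outputs = {'out': 'pcoll_1'}
single_output = only_element(outputs.values())
```

This avoids hard-coding the `'out'` tag while still failing loudly if the transform ever has zero or several outputs.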

    import graphviz
  except ImportError:
    import warnings
    warnings.warn('Unable to draw pipeline. graphviz library missing.')
Contributor

Any reason not to make this a dependency? (E.g. is it fairly large?)

Member Author

That's right - since the utilities are only used for internal debugging of the runner, I hesitate to add it as a dependency - it is also large as you point out. I don't think it fits under the test tag dependencies - we may need an extra tag for internal dependencies or something like that. I'm happy to add a new tag, or leave it out. Thoughts?
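
A sketch of the optional-dependency pattern under discussion, assuming the debugging helper should degrade to a warning when graphviz is not installed. The function name `render_graph` and its edge-list input are hypothetical. Only `graphviz.Digraph`, `Digraph.edge`, and the `source` attribute are real graphviz package API.

```python
import warnings
from typing import Iterable, Optional, Tuple


def render_graph(edges: Iterable[Tuple[str, str]]) -> Optional[str]:
  """Returns DOT source for `edges`, or None when graphviz is unavailable."""
  try:
    import graphviz  # Optional: only needed for internal debugging output.
  except ImportError:
    warnings.warn('Unable to draw pipeline. graphviz library missing.')
    return None
  graph = graphviz.Digraph()
  for src, dst in edges:
    graph.edge(src, dst)
  return graph.source
```

Keeping the import inside the function means the runner itself never requires the package. A separate extras tag for internal tooling would be one way to make it installable on demand.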

assert isinstance(pcnode, WatermarkManager.PCollectionNode)
snode.inputs.add(pcnode)
node = self._watermarks_by_name[pcname]
assert isinstance(node, WatermarkManager.PCollectionNode)
Contributor

Didn't we just assert that above?

Member Author

true. removed.

  # type: (str) -> Union[PCollectionNode, StageNode]
  return self._watermarks_by_name[name]

def get_watermark(self, name) -> timestamp.Timestamp:
Contributor

get/set watermark is never used on stage nodes, right? Does it make sense to keep them in the same dictionary?

Member Author

yeah, you're right. I was also having that sense when I was jumping through hoops to unify the typing. I've separated them. Thanks!
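
A rough sketch of what separating the two kinds of node can look like, assuming one dictionary per node type so each accessor has a single return type. The attribute names `_pcollections_by_name` and `_stages_by_name` and the methods `get_stage_node` and `get_watermark` appear in snippets quoted in this review. `add_stage`, `set_watermark`, and the node bodies are hypothetical simplifications.

```python
from typing import Dict


class PCollectionNode:
  def __init__(self, name: str):
    self.name = name
    self.watermark = float('-inf')  # Stand-in for timestamp.MIN_TIMESTAMP.


class StageNode:
  def __init__(self, name: str):
    self.name = name


class WatermarkManager:
  def __init__(self) -> None:
    # One map per node type: no Union return types, no isinstance checks.
    self._pcollections_by_name: Dict[str, PCollectionNode] = {}
    self._stages_by_name: Dict[str, StageNode] = {}

  def add_stage(self, name: str) -> None:
    self._stages_by_name[name] = StageNode(name)

  def add_pcollection(self, name: str) -> None:
    self._pcollections_by_name.setdefault(name, PCollectionNode(name))

  def get_stage_node(self, name: str) -> StageNode:
    return self._stages_by_name[name]

  def get_watermark(self, name: str) -> float:
    # Only PCollections carry a watermark, so only that map is consulted.
    return self._pcollections_by_name[name].watermark

  def set_watermark(self, name: str, wm: float) -> None:
    self._pcollections_by_name[name].watermark = wm
```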


for input in stage_inputs:
  pcoll_id = get_pcoll_id(input)
  if pcoll_id not in updates:
Contributor

I'll admit I have a hard time keeping the exact ordering here in my head. E.g. is expected_timers in this set? In which of the loops above could updates[pcoll_id] have been set?

Member Author

I've added comments for these sections. LMK if that helps.

  pcoll_id = get_pcoll_id(tr)
  updates[pcoll_id] = timestamp.MIN_TIMESTAMP

for timer_pcoll_id, ts in watermarks_by_transform_and_timer_family.items():
Contributor

It might be simpler to do something like

for timer_pcoll_id in expected_timers:
  updates[timer_pcoll_id] = watermarks_by_transform_and_timer_family.get(
      timer_pcoll_id, timestamp.MAX_TIMESTAMP)

than these two loops here. Or could timer_pcoll_id be in pcolls_with_da and/or transforms_w_splits?

Member Author

Done. Much better : )
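
To illustrate why the single loop is equivalent, here is a small comparison. The shape of the original two-loop version is an assumption (only its last loop header is quoted above), floats stand in for Beam timestamps, and both function names are hypothetical.

```python
from typing import Dict, Iterable

MAX_TIMESTAMP = float('inf')  # Stand-in for timestamp.MAX_TIMESTAMP.


def timer_updates_two_loops(
    expected_timers: Iterable[str],
    earliest_by_timer_pcoll: Dict[str, float]) -> Dict[str, float]:
  updates: Dict[str, float] = {}
  # First default every expected timer PCollection to "no timers pending".
  for timer_pcoll_id in expected_timers:
    updates[timer_pcoll_id] = MAX_TIMESTAMP
  # Then overwrite the ones for which a timer was actually set.
  for timer_pcoll_id, ts in earliest_by_timer_pcoll.items():
    updates[timer_pcoll_id] = ts
  return updates


def timer_updates_single_loop(
    expected_timers: Iterable[str],
    earliest_by_timer_pcoll: Dict[str, float]) -> Dict[str, float]:
  # dict.get with a default folds both steps into one pass.
  return {
      timer_pcoll_id: earliest_by_timer_pcoll.get(
          timer_pcoll_id, MAX_TIMESTAMP)
      for timer_pcoll_id in expected_timers
  }
```

The two agree as long as every key in the timer map is also an expected timer. The single-loop form silently ignores any key that is not.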

@codecov

codecov bot commented Mar 2, 2021

Codecov Report

Merging #11296 (7246df4) into master (2d9c666) will increase coverage by 0.64%.
The diff coverage is n/a.


@@            Coverage Diff             @@
##           master   #11296      +/-   ##
==========================================
+ Coverage   83.04%   83.68%   +0.64%     
==========================================
  Files         469      438      -31     
  Lines       58472    58731     +259     
==========================================
+ Hits        48556    49151     +595     
+ Misses       9916     9580     -336     
Impacted Files Coverage Δ
...python/apache_beam/examples/wordcount_dataframe.py 0.00% <0.00%> (-92.60%) ⬇️
...sdks/python/apache_beam/utils/interactive_utils.py 87.80% <0.00%> (-7.44%) ⬇️
...thon/apache_beam/runners/worker/sdk_worker_main.py 72.26% <0.00%> (-5.92%) ⬇️
...n/apache_beam/runners/direct/test_direct_runner.py 37.50% <0.00%> (-4.81%) ⬇️
...tes/tox/py38/build/srcs/sdks/python/test_config.py 66.66% <0.00%> (-4.77%) ⬇️
...s/sdks/python/apache_beam/runners/test/__init__.py 66.66% <0.00%> (-4.77%) ⬇️
...ld/srcs/sdks/python/apache_beam/io/gcp/__init__.py 80.00% <0.00%> (-4.62%) ⬇️
...thon/apache_beam/runners/worker/channel_factory.py 75.00% <0.00%> (-3.95%) ⬇️
...python/apache_beam/examples/streaming_wordcount.py 30.55% <0.00%> (-3.66%) ⬇️
...s/python/apache_beam/io/gcp/bigquery_file_loads.py 87.50% <0.00%> (-3.61%) ⬇️
... and 450 more

@pabloem
Member Author

pabloem commented Mar 2, 2021

also rebased against master

@pabloem
Member Author

pabloem commented Mar 12, 2021

@robertwb PTAL

3 similar comments
@pabloem
Member Author

pabloem commented Mar 16, 2021

@robertwb PTAL

@pabloem
Member Author

pabloem commented Mar 22, 2021

@robertwb PTAL

@pabloem
Member Author

pabloem commented Mar 29, 2021

@robertwb PTAL

@pabloem
Member Author

pabloem commented Mar 31, 2021

@y1chi do you think you have time to take a look at this PR?

@pabloem
Member Author

pabloem commented Apr 14, 2021

@y1chi PTAL?

@y1chi
Contributor

y1chi commented Apr 14, 2021

> @y1chi PTAL?

Will do.

Contributor

@y1chi y1chi left a comment

Is there any test we can use for validating the watermark logic?

if t.spec.urn == bundle_processor.DATA_INPUT_URN
}
self.watermark_manager = WatermarkManager(stages)
# self.watermark_manager.show()
Contributor

clean up?

Member Author

done.

assert (
    runner_execution_context.watermark_manager.get_stage_node(
        bundle_context_manager.stage.name
    ).input_watermark() == timestamp.MAX_TIMESTAMP), (
Contributor

output_watermark()?

Member Author

initially I'm checking the input_watermark. I'll add a more advanced check in a follow-up change that updates the runner to support per-bundle execution (instead of per-stage)

Contributor

OK, there is a mismatch between the assertion and the error message so that's why I'm confused.

Member Author

ah thanks for the catch. fixed that.

assert (
    runner_execution_context.watermark_manager.get_stage_node(
        bundle_context_manager.stage.name
    ).input_watermark() == timestamp.MAX_TIMESTAMP), (
Contributor

are the assertions only for batch?

Member Author

currently only batch is supported. this will be fixed later on.

      timer_family_id)] = timestamp.MAX_TIMESTAMP
timer_watermark_data[(transform_id, timer_family_id)] = min(
    timer_watermark_data[(transform_id, timer_family_id)],
    decoded_timer.fire_timestamp)
Contributor

I think this should be decoded_timer.hold_timestamp. Currently timers set hold_timestamp to fire_timestamp, but for the watermark we should still use decoded_timer.hold_timestamp, which prevents potential breakage from BEAM-11507.

Member Author

thanks for the tip! done.
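
A hedged sketch of the aggregation after this change: track, per (transform, timer family), the earliest hold timestamp rather than the earliest fire timestamp. `DecodedTimer` is a simplified stand-in for the decoded timer object (only the two fields discussed here), `earliest_holds` is a hypothetical name, and floats stand in for Beam timestamps.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Tuple

MAX_TIMESTAMP = float('inf')  # Stand-in for timestamp.MAX_TIMESTAMP.


@dataclass
class DecodedTimer:
  """Simplified stand-in carrying only the fields relevant to the hold."""
  fire_timestamp: float
  hold_timestamp: float


def earliest_holds(
    timers: Iterable[Tuple[str, str, DecodedTimer]]
) -> Dict[Tuple[str, str], float]:
  timer_watermark_data: Dict[Tuple[str, str], float] = {}
  for transform_id, timer_family_id, decoded_timer in timers:
    key = (transform_id, timer_family_id)
    # The hold, not the firing time, is what constrains the watermark.
    timer_watermark_data[key] = min(
        timer_watermark_data.get(key, MAX_TIMESTAMP),
        decoded_timer.hold_timestamp)
  return timer_watermark_data
```

When the two timestamps are equal the result matches the earlier behavior. They only diverge if a timer's hold is set independently of its firing time.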

stage_node = WatermarkManager.StageNode(stage_name)
self._stages_by_name[stage_name] = stage_node

def add_pcollection(
Contributor

Should this function be declared outside of the loop?

Member Author

done

  self._pcollections_by_name[
      timer_pcoll_name] = WatermarkManager.PCollectionNode(
          timer_pcoll_name)
timer_pcoll_node = self._pcollections_by_name[timer_pcoll_name]
Contributor

why do we need these assertions immediately after updating the map?

Member Author

these are added to fix type checking issues (ensuring the types are what's expected)
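
As a generic illustration of how an assertion can satisfy a type checker, the sketch below assumes a lookup whose declared value type is a Union, which is an assumption about the typing situation and not something this thread confirms. The classes and `pcoll_watermark` are hypothetical.

```python
from typing import Dict, Union


class PCollectionNode:
  def __init__(self, name: str, watermark: float):
    self.name = name
    self.watermark = watermark


class StageNode:
  def __init__(self, name: str):
    self.name = name


def pcoll_watermark(
    nodes: Dict[str, Union[PCollectionNode, StageNode]], name: str) -> float:
  node = nodes[name]
  # A checker such as mypy sees the Union here and would reject
  # `node.watermark`. The assertion narrows the type to PCollectionNode.
  assert isinstance(node, PCollectionNode)
  return node.watermark
```

The assertion costs little at runtime and also fails fast if a stage name is passed by mistake.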

if pcoll_name not in self._pcollections_by_name:
  self._pcollections_by_name[
      pcoll_name] = WatermarkManager.PCollectionNode(pcoll_name)
pcoll_node = self._pcollections_by_name[pcoll_name]
Contributor

ditto

Member Author

these are added to fix type checking issues

@pabloem
Member Author

pabloem commented May 19, 2021

@y1chi PTAL

@aaltay
Member

aaltay commented May 27, 2021

What is the next step on this PR?

@y1chi
Contributor

y1chi commented May 27, 2021

it LGTM, I only have one more question on https://github.com/apache/beam/pull/11296/files#r635450347

@pabloem
Member Author

pabloem commented Jun 8, 2021

Run Python 3.8 PostCommit

@pabloem pabloem merged commit de29bc5 into apache:master Jun 9, 2021
@pabloem pabloem deleted the fn-api-watermarks branch June 9, 2021 17:58
4 participants