Add direct path code path #30764
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment
force-pushed from 6a6e737 to 3c88dfa
@scwhittle still wrapping up the unit tests, just would like an initial pass
force-pushed from 87a043e to ec12c72
Assigning reviewers. If you would like to opt out of this review, comment `R:`. @Abacn added as fallback since no labels match configuration. Available commands:
The PR bot will only process comments in the main thread (not review comments).
Codecov Report: All modified and coverable lines are covered by tests ✅
Additional details and impacted files:
@@ Coverage Diff @@
## master #30764 +/- ##
============================================
+ Coverage 70.95% 71.27% +0.32%
+ Complexity 4470 1485 -2985
============================================
Files 1257 904 -353
Lines 140917 112898 -28019
Branches 4305 1076 -3229
============================================
- Hits 99989 80471 -19518
+ Misses 37451 30408 -7043
+ Partials 3477 2019 -1458
☔ View full report in Codecov by Sentry.
See direct path harness comment first, I think further work needs to be done to reduce the duplication because we're going to have both direct/non-direct paths for a while.
@@ -67,7 +67,7 @@ public DataflowWorkProgressUpdater(
     super(worker, Integer.MAX_VALUE);
     this.workItemStatusClient = workItemStatusClient;
     this.workItem = workItem;
-    this.hotKeyLogger = new HotKeyLogger();
+    this.hotKeyLogger = HotKeyLogger.ofSystemClock();
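The diff above replaces a bare constructor call with a named static factory. A minimal sketch of that pattern with an injectable `Clock` — the class shape, the `forTesting` factory, and the 5-minute throttle are assumptions for illustration, not Beam's actual implementation:

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

// Sketch only: the point is the named static factory from the diff, which
// leaves room for a test-only factory that injects a fake clock instead of
// hard-coding the system clock.
final class HotKeyLogger {
  private final Clock clock;
  private Instant lastLog = Instant.EPOCH;

  private HotKeyLogger(Clock clock) {
    this.clock = clock;
  }

  /** Production factory, matching the name used in the diff. */
  static HotKeyLogger ofSystemClock() {
    return new HotKeyLogger(Clock.systemUTC());
  }

  /** Hypothetical test factory with an injectable clock. */
  static HotKeyLogger forTesting(Clock clock) {
    return new HotKeyLogger(clock);
  }

  /** Illustrative throttle: allow logging at most once every 5 minutes. */
  boolean shouldLog() {
    Instant now = Instant.now(clock);
    if (Duration.between(lastLog, now).compareTo(Duration.ofMinutes(5)) >= 0) {
      lastLog = now;
      return true;
    }
    return false;
  }
}
```

A test can then pin time with `Clock.fixed(...)` instead of sleeping.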
Can cleanups be moved to separate PRs? Less churn if things are reverted and easier to review and summarize with commit description.
...ava/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java (resolved)
...dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java (resolved)
...-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java (resolved)
StreamingWorkerHarness worker =
    isDirectPathPipeline(options)
        ? StreamingEngineDirectPathWorkerHarness.fromOptions(options)
        : StreamingDataflowWorker.fromOptions(options);
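The ternary above picks the harness implementation from pipeline options. A hedged sketch of that dispatch behind a common interface — class names follow the diff, but the bodies and the boolean flag are simplified stand-ins:

```java
// Illustrative stand-ins for the real harness classes.
interface StreamingWorkerHarness {
  String description();
}

final class StreamingEngineDirectPathWorkerHarness implements StreamingWorkerHarness {
  // Real impl would open direct connections to backend workers.
  public String description() { return "direct-path"; }
}

final class StreamingDataflowWorker implements StreamingWorkerHarness {
  // Real impl would talk to the single windmill service endpoint.
  public String description() { return "single-endpoint"; }
}

final class Harnesses {
  /** Mirrors the ternary above: choose the harness from an options flag. */
  static StreamingWorkerHarness fromOptions(boolean isDirectPathPipeline) {
    return isDirectPathPipeline
        ? new StreamingEngineDirectPathWorkerHarness()
        : new StreamingDataflowWorker();
  }
}
```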
how about moving the harness portions of StreamingDataflowWorker to a new StreamingSingleEndpointWorkerHarness class? (open to better name ideas).
I think that is clearer which parts of this file are non-direct path specific and which are shared.
reused code across different harness impls
        .build());
  }

  private static Stream<DirectHeartbeatRequest> toHeartbeatRequestStreamDirectPath(
It would be nice to have a single helper used by both direct/non-direct methods since they are largely the same and could otherwise drift.
done, added HeartbeatRequests.java
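A shared helper like the `HeartbeatRequests.java` mentioned above might look roughly like this. The `Work` and `HeartbeatRequest` shapes are simplified stand-ins for Beam's actual protos; the point is that both the direct and non-direct paths convert active work through one code path, so they cannot drift:

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch: one conversion helper reused by both harness code paths.
final class HeartbeatRequests {
  // Simplified stand-in for an active work item.
  static final class Work {
    final long shardingKey;
    final long workToken;
    Work(long shardingKey, long workToken) {
      this.shardingKey = shardingKey;
      this.workToken = workToken;
    }
  }

  // Simplified stand-in for the heartbeat proto.
  static final class HeartbeatRequest {
    final long shardingKey;
    final long workToken;
    HeartbeatRequest(long shardingKey, long workToken) {
      this.shardingKey = shardingKey;
      this.workToken = workToken;
    }
  }

  /** Single conversion used by both paths. */
  static List<HeartbeatRequest> fromActiveWork(List<Work> activeWork) {
    List<HeartbeatRequest> requests = new ArrayList<>();
    for (Work w : activeWork) {
      requests.add(new HeartbeatRequest(w.shardingKey, w.workToken));
    }
    return requests;
  }
}
```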
  executionStateQueue.offer(executionState);
}

public Optional<ExecutionState> getExecutionState() {
name acquireExecutionState or pollExecutionState?
get makes it sound like a simple accessor
done
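A sketch of the poll-style accessor discussed above, where the name signals that the call removes an entry rather than merely reading it. The `ExecutionState` type and queue field here are placeholders:

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative stand-in for the cache holding reusable execution states.
final class ExecutionStateCache {
  static final class ExecutionState {
    final String workId;
    ExecutionState(String workId) { this.workId = workId; }
  }

  private final ConcurrentLinkedQueue<ExecutionState> executionStateQueue =
      new ConcurrentLinkedQueue<>();

  void offer(ExecutionState state) {
    executionStateQueue.offer(state);
  }

  /** "poll" (not "get") signals that this mutates the queue. */
  Optional<ExecutionState> pollExecutionState() {
    return Optional.ofNullable(executionStateQueue.poll());
  }
}
```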
import org.slf4j.LoggerFactory;

@Internal
public final class StreamingApplianceComputationStateCacheLoader
Let's put the config loading changes in a separate PR
currently needed for code dedup since the ConfigLoaders are used in the ComputationStateCache cache loaders
...org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceConfigLoader.java (resolved, outdated)
    LoggerFactory.getLogger(StreamingEngineDirectPathWorkerHarness.class);
// Controls processing parallelism. Maximum number of threads for processing. Currently, each
// thread processes one key at a time.
private static final int MAX_PROCESSING_THREADS = 300;
There is too much duplication between this and the other harness, which will make it difficult to add new features (such as current PR to make processing threads dynamic).
It seems like a lot of the things: executor, metrics, reporting, cache etc are not affected by how work is obtained, committed or state fetched. It would be better if we could instead keep the logic and just inject different work obtainer, committer, state fetcher.
Or alternatively we could make everything work with direct-path by always plumbing something to use for getdata/commitwork and in the non-direct path cases just having a single one.
done, reused components (mainly around config loading, computationStateCache, and work execution) across all streaming worker harness impls
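The injection approach described above, sketched with illustrative interfaces: the shared engine owns the processing loop, and only the path-specific pieces (how work is obtained, state fetched, and commits sent) are swapped in. None of these names are Beam's actual types:

```java
// Path-specific pieces, injected into a shared engine.
interface WorkSource { String getWork(); }
interface StateFetcher { String fetchState(String workId); }
interface WorkCommitter { void commit(String workId); }

final class StreamingWorkerEngine {
  private final WorkSource workSource;
  private final StateFetcher stateFetcher;
  private final WorkCommitter committer;

  StreamingWorkerEngine(WorkSource workSource, StateFetcher stateFetcher, WorkCommitter committer) {
    this.workSource = workSource;
    this.stateFetcher = stateFetcher;
    this.committer = committer;
  }

  /** Shared flow: obtain work, fetch state, process, commit. */
  String processOneItem() {
    String workId = workSource.getWork();
    String state = stateFetcher.fetchState(workId);
    // ... user processing would run here ...
    committer.commit(workId);
    return workId + ":" + state;
  }
}
```

Appliance, single-endpoint, and direct-path harnesses would then differ only in which implementations are passed to the constructor.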
…ween harness to their own classes/files. Add different harness implementations for Dispatched, Appliance, and Direct Path streaming jobs. StreamingDataflowWorker is now just a main method
… in work processing context, guard for null dataflowServiceOptions in StreamingEngineClient
@@ -257,3 +257,4 @@ checkstyleMain.enabled = false
 checkstyleTest.enabled = false
 //TODO(https://github.com/apache/beam/issues/19119): javadoc task should be enabled in the future.
 javadoc.enabled = false
+test.outputs.upToDateWhen {false}
remove?
HotKeyLogger() {}

public static HotKeyLogger ofSystemClock() {
just name create?
done
  super(stepName);
}

public static MetricsLogger createUnboundedMetricsLogger() {
unbounded is confusing with bounded/unbounded pcollection.
How about workerMetricsLogger if it is metrics not scoped to a step?
done
@@ -17,15 +17,19 @@
  */
 package org.apache.beam.runners.dataflow.worker.logging;

+import org.checkerframework.checker.nullness.qual.Nullable;

 /** Mapped diagnostic context for the Dataflow worker. */
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
can this be removed with your changes?
done
done
@@ -90,11 +83,11 @@ private ActiveWorkState(
 }

 static ActiveWorkState create(WindmillStateCache.ForComputation computationStateCache) {
-  return new ActiveWorkState(new HashMap<>(), computationStateCache);
+  return new ActiveWorkState(new ConcurrentHashMap<>(), computationStateCache);
why is the concurrent hash map required? It seems getReadOnlyActivework is synchronized below
yeah, this was unintended; changed back
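To illustrate the reviewer's point: when every access goes through synchronized methods, a plain `HashMap` is already guarded by the instance monitor, and `ConcurrentHashMap` adds no benefit. A simplified stand-in for `ActiveWorkState`:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in: all map access is guarded by the instance monitor,
// so the non-concurrent HashMap is safe here.
final class ActiveWorkState {
  private final Map<Long, String> activeWork = new HashMap<>();

  synchronized void activateWork(long shardingKey, String work) {
    activeWork.put(shardingKey, work);
  }

  synchronized String getReadOnlyActiveWork(long shardingKey) {
    return activeWork.get(shardingKey);
  }

  synchronized int size() {
    return activeWork.size();
  }
}
```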
Windmill.GetConfigResponse response = maybeResponse.get();

// The max work item commit bytes should be modified to be dynamic once it is available in
odd spot for this comment, the global id has this field but not individual computations.
    workUnitExecutor,
    transformUserNameToStateFamilyByComputationId.getOrDefault(
        computationId, ImmutableMap.of()),
    perComputationStateCacheViewFactory.apply(computationId));
it seems like the appliance and SE differ just in how they get the map task and the user-transform to state-family map.
What if we had an interface instead of StreamingConfigLoader<Windmill.GetConfigResponse> that can vend the MapTask and the user-transform to state-family map for a computation?
Then the ComputationStateCacheLoader could just be a final class taking that loader. The StreamingEngineConfigLoader could implement that interface as well as the other one for the dynamic config, and the appliance could just implement that.
I'm not sure the StreamingConfigLoader templated interface is buying us much since only the SE needs the background threads.
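A rough sketch of the proposed interface, under the assumption that per-computation config boils down to a MapTask plus the user-transform-to-state-family map. All names and shapes here are illustrative, not the actual Beam types:

```java
import java.util.Map;
import java.util.Optional;

// Backend-agnostic vendor of per-computation config; appliance and
// streaming-engine loaders would each implement this.
interface ComputationConfigFetcher {
  final class ComputationConfig {
    final String mapTask; // stands in for the real MapTask proto
    final Map<String, String> userTransformToStateFamily;
    ComputationConfig(String mapTask, Map<String, String> userTransformToStateFamily) {
      this.mapTask = mapTask;
      this.userTransformToStateFamily = userTransformToStateFamily;
    }
  }

  Optional<ComputationConfig> getConfig(String computationId);
}

// The cache loader can now be a single final class taking the fetcher.
final class ComputationStateCacheLoader {
  private final ComputationConfigFetcher fetcher;

  ComputationStateCacheLoader(ComputationConfigFetcher fetcher) {
    this.fetcher = fetcher;
  }

  Optional<String> loadMapTask(String computationId) {
    return fetcher.getConfig(computationId).map(c -> c.mapTask);
  }
}
```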
// the request.
for (Windmill.GetConfigResponse.SystemNameToComputationIdMapEntry entry :
    response.getSystemNameToComputationIdMapList()) {
  systemNameToComputationIdMap.put(entry.getSystemName(), entry.getComputationId());
can you get rid of this?
It seems we just request a single item, so should we just get a single response back? Can we then just pass the computationId to createComputationState instead of trying to map back from the response?
is there a reason why this was previously being done? Looks like, based on line 1420 in 673da54 (`for (String serializedMapTask : response.getCloudWorksList()) {` in the GetConfigResponse handling), we create a different ComputationState based on each MapTask.
public abstract Builder setGetDataStream(GetDataStream value);

abstract WorkProcessingContext autoBuild();
private?
autoBuild needs to be abstract since it is handled by autovalue codegen
could it be protected then? we don't want this to be called directly correct?
will try to use protected and see if it works.
previously was following this guide: https://github.com/google/auto/blob/main/value/userguide/builders-howto.md#-validate-property-values
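A hand-rolled sketch of the AutoValue validate-in-build pattern from the linked guide: `autoBuild()` stays abstract and non-private (package-private here; the thread also discusses protected) so the generated subclass can override it, while the public `build()` validates. The manual `AutoValue_…` subclass below is a stand-in for what the annotation processor would generate:

```java
// Simplified version of the WorkProcessingContext builder from the diff.
abstract class WorkProcessingContext {
  abstract String computationId();

  abstract static class Builder {
    abstract Builder setComputationId(String value);

    // Non-private: the generated subclass must be able to override it.
    abstract WorkProcessingContext autoBuild();

    // Public entry point that validates before exposing the instance.
    final WorkProcessingContext build() {
      WorkProcessingContext built = autoBuild();
      if (built.computationId().isEmpty()) {
        throw new IllegalStateException("computationId must be non-empty");
      }
      return built;
    }
  }
}

// Stand-in for AutoValue's generated code.
final class AutoValue_WorkProcessingContext extends WorkProcessingContext {
  private final String computationId;

  AutoValue_WorkProcessingContext(String computationId) {
    this.computationId = computationId;
  }

  String computationId() { return computationId; }

  static final class Builder extends WorkProcessingContext.Builder {
    private String computationId = "";
    Builder setComputationId(String value) { this.computationId = value; return this; }
    WorkProcessingContext autoBuild() { return new AutoValue_WorkProcessingContext(computationId); }
  }
}
```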
  FileSystems.setDefaultPipelineOptions(options);
}

public static ApplianceWorkerHarness fromOptions(DataflowWorkerHarnessOptions options) {
At a high level, the difference between appliance/dispatcher/direct paths seems to be limited to:
- how to get the config
- how to get/schedule WorkProcessingContext (which then knows how to fetch/commit/heartbeat to right worker).
I think that moving things out of StreamingWorkerHarness into classes can help testing/readability but that there is still a lot of duplicated setup between the different harnesses which will make it harder to maintain.
Can we instead have a shared StreamingDataflowWorker that uses the objects and which just minimal injection differences are made when constructing for appliance/fanout/direct?
sgtm, will redo some of the components to manage that
Thanks! If you see ways to break up this change, that would be great too. It's really big, and GitHub reviews are particularly painful. If we can just pull out various things from StreamingDataflowWorker to their own files/tests before getting to the direct path stuff, I think it will help reviewing. I will try to stay on top of the reviews so hopefully we don't get bogged down too much due to more PRs.
sgtm!
Reminder, please take a look at this pr: @Abacn
@Abacn not sure how to turn off the reminders but this doesn't require a look. It is being broken apart into smaller PRs which I am reviewing.
@m-trieu Should we close this one for now? We can reopen after rebasing on top of the broken off PRs
Reminder, please take a look at this pr: @Abacn |
Add direct path code path
we will be able to run direct path pipelines by passing in the option IsWindmillServiceDirectPathEnabled
StreamingDataflowWorker
in a separate PR
R: @scwhittle