Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move heartbeat processor to where it is being used #31298

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

m-trieu
Copy link
Contributor

@m-trieu m-trieu commented May 15, 2024

Heartbeat response processor should be close to where it is being used as we don't change the heartbeat response processor at all.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@m-trieu m-trieu force-pushed the mt-move-heartbeat-processor branch 2 times, most recently from 2110050 to d678be8 Compare May 31, 2024 22:47
Copy link

codecov bot commented May 31, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 68.55%. Comparing base (93cc6a5) to head (2110050).
Report is 53 commits behind head on master.

Current head 2110050 differs from pull request most recent head 08350cf

Please upload reports for the commit 08350cf to get more accurate results.

Additional details and impacted files
@@            Coverage Diff            @@
##             master   #31298   +/-   ##
=========================================
  Coverage     68.55%   68.55%           
  Complexity    14921    14921           
=========================================
  Files          2636     2636           
  Lines        222073   222069    -4     
  Branches      11825    11825           
=========================================
- Hits         152234   152233    -1     
+ Misses        63644    63641    -3     
  Partials       6195     6195           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@m-trieu m-trieu force-pushed the mt-move-heartbeat-processor branch 2 times, most recently from c6e3846 to c3aa698 Compare June 3, 2024 21:59
Copy link
Contributor

github-actions bot commented Jun 3, 2024

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @johnjcasey added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

GrpcWindmillStreamFactory windmillStreamFactory =
createWindmillStreamFactory(options, clientId);
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder =
createWindmillStreamFactoryBuilder(options, clientId);
GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(createStubFactory(options));

// If ComputationConfig.Fetcher is the Streaming Appliance implementation, WindmillServerStub
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is some kind of a cyclic dependency between windmillStreamFactory and computationStateCache and the creation order is different between appliance and SE, which makes the code hard to reason about. How about something like arunpandianp@d428be0? Then we don't need these comments explaining if something then it is appliance

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@m-trieu m-trieu force-pushed the mt-move-heartbeat-processor branch from c3aa698 to d5aebd7 Compare June 4, 2024 00:49
GrpcWindmillStreamFactory windmillStreamFactory =
createWindmillStreamFactory(options, clientId);
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder =
createWindmillStreamFactoryBuilder(options, clientId);
GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(createStubFactory(options));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move construction of dispatcherClient inside createConfigFetcherComputationStateCacheAndWindmillClient.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -393,7 +384,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
return new StreamingDataflowWorker(
windmillServer,
clientId,
configFetcherAndWindmillClient.getLeft(),
configFetcherComputationStateCacheAndWindmillClient.configFetcher(),
computationStateCache,
WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be windmillStateCache?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

private static ConfigFetcherComputationStateCacheAndWindmillClient create(
ComputationConfig.Fetcher configFetcher,
ComputationStateCache computationStateCache,
Pair<WindmillServerStub, @Nullable GrpcWindmillStreamFactory> windmillClient) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

split pair into separate members?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -185,7 +186,7 @@ private StreamingDataflowWorker(
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
AtomicInteger maxWorkItemCommitBytes,
GrpcWindmillStreamFactory windmillStreamFactory,
@Nullable GrpcWindmillStreamFactory windmillStreamFactory,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need both windmillServer and windmillStreamFactory here?

I see windmillServer.appendSummaryHtml in turn calls windmillStreamFactory.appendSummaryHtml, can we remove windmillStreamFactory from here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need both since windmillStreamFactory is source of truth (it is used in GrpcWindmillServer#appendSummaryHtml)

In direct path mode we don't use GrpcWindmillServer/WindmillServerStub just inject windmillStreamFactory directly so will need that to add to status pages

new WorkHeartbeatResponseProcessor(computationStateCache::get))
.build();
if (options.isEnableStreamingEngine()) {
windmillStreamFactory.scheduleHealthChecks(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like is missing on non test flow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@m-trieu m-trieu force-pushed the mt-move-heartbeat-processor branch from d5aebd7 to b2b8a45 Compare June 4, 2024 06:54
@m-trieu m-trieu force-pushed the mt-move-heartbeat-processor branch from b2b8a45 to 08350cf Compare June 4, 2024 07:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants