New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-8942][runtime] Pass heartbeat target ResourceID #5699
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution @zentol. The changes look good to me. The only comment was whether we want to expose that we use a Map
to store the heartbeat targets or whether we expose the ResourceID
via the HeartbeatMonitor
. What do you think?
@Test | ||
public void testHeartbeatManagerSenderTargetPayload() throws Exception { | ||
final long heartbeatTimeout = 100L; | ||
final long heartbeatPeriod = 2000L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's set the period to 1L
to speed up the test a bit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That shouldn't have an affect on the test time. The HeartbeatManagerSenderImpl
sends one heartbeat right away without delay, and we only require on for each target.
@@ -106,8 +107,8 @@ Executor getExecutor() { | |||
return heartbeatListener; | |||
} | |||
|
|||
Collection<HeartbeatManagerImpl.HeartbeatMonitor<O>> getHeartbeatTargets() { | |||
return heartbeatTargets.values(); | |||
Collection<Map.Entry<ResourceID, HeartbeatMonitor<O>>> getHeartbeatTargets() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about adding a getTargetID()
to the HeartbeatMonitor
? Then we would not have to expose, even though it is an internal method, that we use a Map
to store the heartbeat targets. We could then keep the return type Collection<HeartbeatMonitor>
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes that is a better solution.
@tillrohrmann I've addressed your comments. |
Thanks for addressing them. Merging this PR once Travis gave green light. |
received payload field now volatile Add HeartbeatMonitor#getHeartbeatTargetId This closes apache#5699.
received payload field now volatile Add HeartbeatMonitor#getHeartbeatTargetId This closes #5699.
received payload field now volatile Add HeartbeatMonitor#getHeartbeatTargetId This closes apache#5699.
What is the purpose of the change
With this PR the heartbeat target
ResourceID
is passed to theHeartbeatListener
when retrieving the payload to send. This allows the listener to create target-dependent payloads.The primary use-case is FLINK-8881, where accumulators are sent via heartbeats to the JobManager. Here we only want to send accumulators for the relevant job, and not for all jobs.
Brief change log
ResourceID
parameter toHeartbeatListener#retrievePayload
HeartbeatManagerImpl#getHeartbeatTargets
to also contain the targetResourceID
Verifying this change
This change added tests:
HeartbeatManagerTest
:testHeartbeatManagerTargetPayload
testHeartbeatManagerSenderTargetPayload
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation