[FLINK-34134] Add tracing for restored state size and locations #24119
Thanks for this extension @StefanRRichter. I don't see any major comments.
@Override
public String toString() {
    return "StateObjectSizeStatsCollector{" + "stats=" + stats + '}';
Will this print a human-readable map representation?
Yes
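To illustrate why the answer is yes: Java string concatenation calls `Map.toString()`, which renders entries as `{key=value, ...}`. The sketch below is a minimal stand-in and not the Flink class; the class name `SizeStatsToStringSketch`, the `Location` enum, and its values are assumptions made up for this example.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-in for the collector discussed above; not the Flink class.
final class SizeStatsToStringSketch {
    // Assumed location categories, for illustration only.
    enum Location { LOCAL_DISK, REMOTE }

    private final Map<Location, Long> stats = new EnumMap<>(Location.class);

    // Adds the given number of bytes to the running total for a location.
    void add(Location location, long bytes) {
        stats.merge(location, bytes, Long::sum);
    }

    @Override
    public String toString() {
        // Concatenating the map invokes Map.toString(): {key=value, key=value}.
        return "SizeStatsToStringSketch{" + "stats=" + stats + '}';
    }

    public static void main(String[] args) {
        SizeStatsToStringSketch collector = new SizeStatsToStringSketch();
        collector.add(Location.REMOTE, 2048L);
        collector.add(Location.LOCAL_DISK, 512L);
        collector.add(Location.REMOTE, 1024L);
        // prints SizeStatsToStringSketch{stats={LOCAL_DISK=512, REMOTE=3072}}
        System.out.println(collector);
    }
}
```

An `EnumMap` iterates in enum declaration order, so the printed order is stable regardless of insertion order.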
private Stream<StateObjectCollection<? extends StateObject>> streamOperatorAndKeyedStates() {
    return Stream.of(managedOperatorState, rawOperatorState, managedKeyedState, rawKeyedState)
            .filter(Objects::nonNull);
}

private Stream<StateObjectCollection<? extends AbstractChannelStateHandle<?>>>
        streamChannelStates() {
    return Stream.<StateObjectCollection<? extends AbstractChannelStateHandle<?>>>of(
                    inputChannelState, resultSubpartitionState)
            .filter(Objects::nonNull);
Why do we need two methods? Can't we have a single one returning both operator and channel state?
Edit: I see collectUniqueDelegates. Note that collectUniqueDelegates could be refactored to also accept, but ignore, non-channel state, and then we wouldn't need to differentiate between the two types of handles. But I'm not sure which option is better, so 🤷
        .forEach(channelHandle -> channelHandle.collectSizeStats(statsCollector));

// Report collected stats to metrics
statsCollector
        .getStats()
        .forEach(
If you are going all in with streams, why don't you merge those two?
One is still collecting remaining stats from channels, the second is reporting. I like to keep them separate.
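A minimal sketch of the separation described in this reply: one pass accumulates sizes per location, and a second, independent pass turns the accumulated totals into reported values. Everything here is hypothetical (the `Handle` class, the `Location` enum, the `collect` and `report` helpers, and the attribute prefix, which follows the `RestoredStateSizeBytes` name proposed later in this review); it is not the code from the PR.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

// Hypothetical sketch; none of these types are Flink classes.
final class CollectThenReportSketch {
    enum Location { LOCAL_DISK, REMOTE }

    // Assumed handle shape: a size in bytes plus where those bytes live.
    static final class Handle {
        final Location location;
        final long bytes;

        Handle(Location location, long bytes) {
            this.location = location;
            this.bytes = bytes;
        }
    }

    // Step 1: collect. Only accumulates totals, reports nothing.
    static Map<Location, Long> collect(Stream<Handle> handles) {
        Map<Location, Long> stats = new EnumMap<>(Location.class);
        handles.forEach(h -> stats.merge(h.location, h.bytes, Long::sum));
        return stats;
    }

    // Step 2: report. Only reads the finished totals.
    static List<String> report(Map<Location, Long> stats) {
        List<String> lines = new ArrayList<>();
        stats.forEach((location, bytes) ->
                lines.add("RestoredStateSizeBytes." + location + "=" + bytes));
        return lines;
    }

    public static void main(String[] args) {
        Map<Location, Long> stats = collect(Stream.of(
                new Handle(Location.REMOTE, 10L),
                new Handle(Location.LOCAL_DISK, 5L),
                new Handle(Location.REMOTE, 20L)));
        report(stats).forEach(System.out::println);
    }
}
```

Keeping the two steps apart means the reporting step always sees complete totals, which is harder to guarantee if reporting is folded into the collecting stream.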
// This handle is just temporary and the directory size will never be used for
// metrics, so we just fill it in as 0.
new DirectoryStateHandle(downloadDestination, 0L),
For completeness, why don't you pass stateHandle.getStateSize() here?
I need to double-check whether that would do the right thing. I didn't care much because none of that is ever reported, but yeah, if it works, it's nicer.
docs/content.zh/docs/ops/traces.md
Outdated
@@ -159,6 +159,14 @@ Flink reports a single span trace for the whole checkpoint and job initializatio
<td>(Max/Sum)DownloadStateDurationMs<br><br>(optional - currently only supported by RocksDB Incremental)</td>
<td>The aggregated (max and sum) across all subtasks duration of downloading state files from the DFS.</td>
</tr>
<tr>
<td>(Max/Sum)RestoreStateSizeBytes.[location]<br><br>(optional - only reported if metrics.trace-restore-state-size is true)</td>
RestoredStateSizeBytes ? (ditto in the actual metric name)
Fine with me.
c86ec49 to 28ee9ad
LGTM assuming green azure!
What is the purpose of the change
Introduces tracing for state sizes and locations in task recovery. This is particularly interesting for a mixed recovery with some local and some remote state.
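As a rough illustration of what such a trace could carry for a mixed recovery, the sketch below aggregates per-subtask restored sizes into a max and a sum per location, mirroring the `(Max/Sum)` naming pattern from the docs change. It is a sketch under stated assumptions, not the PR's implementation: the `Location` enum, the `aggregate` helper, and the input shape are hypothetical, and the attribute names use the `RestoredStateSizeBytes` spelling proposed in the review.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; not Flink's aggregation code.
final class RestoreSizeAggregationSketch {
    // Assumed location categories, for illustration only.
    enum Location { LOCAL_DISK, REMOTE }

    // Takes one map per subtask (location -> restored bytes) and returns
    // max and sum across subtasks for every location that was seen.
    static Map<String, Long> aggregate(List<Map<Location, Long>> perSubtask) {
        Map<Location, LongSummaryStatistics> byLocation = new EnumMap<>(Location.class);
        for (Map<Location, Long> subtask : perSubtask) {
            subtask.forEach((location, bytes) ->
                    byLocation
                            .computeIfAbsent(location, k -> new LongSummaryStatistics())
                            .accept(bytes));
        }
        // TreeMap only to get a deterministic attribute order in this sketch.
        Map<String, Long> attributes = new TreeMap<>();
        byLocation.forEach((location, summary) -> {
            attributes.put("MaxRestoredStateSizeBytes." + location, summary.getMax());
            attributes.put("SumRestoredStateSizeBytes." + location, summary.getSum());
        });
        return attributes;
    }

    public static void main(String[] args) {
        // Subtask 0 restores from both locations, subtask 1 only from remote.
        Map<String, Long> attributes = aggregate(List.of(
                Map.of(Location.LOCAL_DISK, 100L, Location.REMOTE, 50L),
                Map.of(Location.REMOTE, 200L)));
        attributes.forEach((name, value) -> System.out.println(name + "=" + value));
    }
}
```

With these inputs, the remote location ends up with a max of 200 and a sum of 250, while the local location has max and sum both equal to 100, which makes it easy to see how much of the recovery was served locally.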
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation