Flink: refactor sink shuffling statistics collection #10331
Will add the sketch range partitioner in a separate PR following this one.
this.dataStatistics = statisticsSerializer.createInstance();
}
private final StatisticsType type;
private final Map<SortKey, Long> keyFrequency;
Combining both Map and Sketch stats in the same aggregated statistics object allows a run-time switch from Map stats to Sketch.
if (record.type() == StatisticsType.Map) {
  keyFrequencySerializer.serialize(record.keyFrequency(), target);
} else {
  rangeBoundsSerializer.serialize(Arrays.asList(record.rangeBounds()), target);
Reused the list serializer from Flink, paying a small penalty for the array-to-list conversion.
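The conversion cost mentioned here is usually small because `java.util.Arrays.asList` returns a fixed-size view backed by the array rather than a copy. A minimal sketch, assuming plain strings in place of the real range-bound keys; the class and method names are illustrative and not part of this PR:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, only to illustrate the array-to-list wrapping.
final class ArrayToListView {
  // Wraps the array in a fixed-size List view; no elements are copied.
  static <T> List<T> asListView(T[] rangeBounds) {
    return Arrays.asList(rangeBounds);
  }

  public static void main(String[] args) {
    String[] bounds = {"c", "j", "m"};
    List<String> view = asListView(bounds);
    bounds[0] = "a"; // visible through the list because the list is a view
    System.out.println(view.get(0));
  }
}
```

The wrapper allocation is the only per-call overhead in this sketch; whether the serializer itself copies elements is a separate question.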
}

@SuppressWarnings("unchecked")
private void merge(DataStatistics taskStatistics) {
This method shows the stats type migration from Map to Sketch.
if (localStatistics.type() == StatisticsType.Map) {
  Map<SortKey, Long> mapStatistics = (Map<SortKey, Long>) localStatistics.result();
  if (statisticsType == StatisticsType.Auto
This is the stats migration (Map -> Sketch) on the operator side during the collection phase.
With AUTO, if any task or the coordinator decides to move to sketch, it might be a good idea for everyone to move to sketch to save memory and transformations.
Do we want an extra message in this case, or at least switch when a global stat arrives where we have already switched to sketch?
Good question.
- Each operator makes an independent decision on switching from Map to Sketch during the local collection phase.
- When operators receive the global statistics from the coordinator, they should also check whether a type switch is needed. It looks like I missed this logic; I will add it.
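The two switch points described in this reply can be sketched as follows. This is an illustration under stated assumptions, not the PR's code: `LocalStatsCollector`, its fields, and `onGlobalStatistics` are hypothetical names, keys are plain strings instead of `SortKey`, and a hand-written reservoir sample (Algorithm R) stands in for the DataSketches sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-in for an operator-side collector; not the PR's classes.
final class LocalStatsCollector {
  enum StatisticsType { Map, Sketch, Auto }

  private final StatisticsType configuredType;
  private final int switchToSketchThreshold; // assumed cardinality threshold
  private final int reservoirSize; // assumed sample capacity

  private StatisticsType currentType;
  private Map<String, Long> keyFrequency = new HashMap<>();
  private final List<String> reservoir = new ArrayList<>();
  private long seen;

  LocalStatsCollector(StatisticsType configuredType, int switchToSketchThreshold, int reservoirSize) {
    this.configuredType = configuredType;
    this.switchToSketchThreshold = switchToSketchThreshold;
    this.reservoirSize = reservoirSize;
    // Auto starts as Map and may migrate later; Sketch starts as Sketch.
    this.currentType =
        configuredType == StatisticsType.Sketch ? StatisticsType.Sketch : StatisticsType.Map;
  }

  // Switch point 1: local decision during collection.
  void add(String key) {
    if (currentType == StatisticsType.Map) {
      keyFrequency.merge(key, 1L, Long::sum);
      // Only Auto may migrate; an explicit Map setting stays a map.
      if (configuredType == StatisticsType.Auto
          && keyFrequency.size() > switchToSketchThreshold) {
        migrateToSketch();
      }
    } else {
      sample(key);
    }
  }

  // Switch point 2: follow the type carried by global statistics.
  void onGlobalStatistics(StatisticsType globalType) {
    if (globalType == StatisticsType.Sketch && currentType == StatisticsType.Map) {
      migrateToSketch();
    }
  }

  private void migrateToSketch() {
    Map<String, Long> old = keyFrequency;
    keyFrequency = new HashMap<>();
    currentType = StatisticsType.Sketch;
    // Replay each key once per occurrence so the sample reflects frequencies.
    old.forEach(
        (key, count) -> {
          for (long i = 0; i < count; ++i) {
            sample(key);
          }
        });
  }

  // Reservoir sampling (Algorithm R): each item seen is kept with equal probability.
  private void sample(String key) {
    seen++;
    if (reservoir.size() < reservoirSize) {
      reservoir.add(key);
    } else {
      long slot = ThreadLocalRandom.current().nextLong(seen);
      if (slot < reservoirSize) {
        reservoir.set((int) slot, key);
      }
    }
  }

  StatisticsType currentType() {
    return currentType;
  }

  int sampleCount() {
    return reservoir.size();
  }
}
```

With `Auto`, a threshold of 2 and three distinct keys, the third `add` triggers the migration; with an explicit `Map` setting the local check never fires.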
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

@Internal
class DataStatisticsSerializer extends TypeSerializer<DataStatistics> {
This single serializer can handle both the map and sketch stats types.
@@ -66,6 +66,8 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
exclude group: 'org.slf4j'
}

implementation libs.datasketches
The jar file is about 1 MB, so it is not too big to include.
SketchUtil.convertMapToSketch(taskMapStats, taskSketch::update);
coordinatorSketchStatistics.update(taskSketch);
I'm wondering which is better:
- Getting a map from the task -> converting the task map to a sketch -> merging the coordinator sketch and the task sketch
- Updating the coordinator sketch by adding the values from the map directly
Which one performs better? Which gives a better approximation in the resulting sketch?
If we consciously use the first solution, then we probably want to send a message to the tasks when we switch to sketch, so that they send just the sketch rather than the whole map (it might be a smaller message).
Yes, once the coordinator has switched to sketch, all operators will switch too upon receiving the global statistics.
this.checkpointId = checkpointId;
this.type = type;
this.keyFrequency = keyFrequency;
this.rangeBounds = rangeBounds;
Do we want to add a check here to make sure keyFrequency and rangeBounds won't both have a value at the same time?
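One way such a check could look, as a sketch only: the class name `StatisticsPayload`, the `require` helper, and the use of `String` keys are assumptions for illustration, and plain `IllegalArgumentException` is used here instead of whatever precondition utility the project prefers.

```java
import java.util.Map;

// Hypothetical payload class; field names follow the constructor snippet above.
final class StatisticsPayload {
  enum StatisticsType { Map, Sketch }

  private final StatisticsType type;
  private final Map<String, Long> keyFrequency;
  private final String[] rangeBounds;

  StatisticsPayload(StatisticsType type, Map<String, Long> keyFrequency, String[] rangeBounds) {
    // Exactly one of the two payloads may be set, and it must match the type.
    if (type == StatisticsType.Map) {
      require(keyFrequency != null, "keyFrequency is required for Map statistics");
      require(rangeBounds == null, "rangeBounds must be null for Map statistics");
    } else {
      require(rangeBounds != null, "rangeBounds is required for Sketch statistics");
      require(keyFrequency == null, "keyFrequency must be null for Sketch statistics");
    }
    this.type = type;
    this.keyFrequency = keyFrequency;
    this.rangeBounds = rangeBounds;
  }

  private static void require(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  StatisticsType type() {
    return type;
  }

  Map<String, Long> keyFrequency() {
    return keyFrequency;
  }

  String[] rangeBounds() {
    return rangeBounds;
  }
}
```

Failing fast in the constructor keeps the serializer from having to guess which field is authoritative.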
private final int parallelism;
private final TypeSerializer<DataStatistics> statisticsSerializer;
private final int downstreamParallelism;
private final StatisticsType statisticsType;
The field is not being used.
Good catch. The variable is needed but was missed, as you pointed out in the other comment below on the coordinator migration from Map to Sketch.
Map<SortKey, Long> taskMapStats = (Map<SortKey, Long>) taskStatistics.result();
if (coordinatorStatisticsType == StatisticsType.Map) {
  taskMapStats.forEach((key, count) -> coordinatorMapStatistics.merge(key, count, Long::sum));
  if (coordinatorMapStatistics.size() > switchToSketchThreshold) {
So for the coordinator, unlike the operator, which needs to check whether StatisticsType = Auto, we convert from map to sketch as soon as the size reaches the threshold?
Good catch. It is related to your other comment that statisticsType was not used. It should be used and checked here.
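A minimal sketch of the guarded check agreed on here, assuming `String` keys and a hypothetical static helper `mergeAndCheckSwitch`; only the `Map.merge(key, count, Long::sum)` call and the threshold comparison mirror the snippet under review.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not the coordinator class from the PR.
final class CoordinatorMerge {
  enum StatisticsType { Map, Sketch, Auto }

  // Merges task counts into the coordinator map and reports whether a
  // Map -> Sketch migration should happen. Only Auto is allowed to migrate.
  static boolean mergeAndCheckSwitch(
      StatisticsType configuredType,
      Map<String, Long> coordinatorMapStatistics,
      Map<String, Long> taskMapStats,
      int switchToSketchThreshold) {
    taskMapStats.forEach((key, count) -> coordinatorMapStatistics.merge(key, count, Long::sum));
    return configuredType == StatisticsType.Auto
        && coordinatorMapStatistics.size() > switchToSketchThreshold;
  }

  public static void main(String[] args) {
    Map<String, Long> coordinator = new HashMap<>();
    coordinator.put("a", 1L);
    Map<String, Long> task = new HashMap<>();
    task.put("a", 2L);
    task.put("b", 1L);
    boolean migrate = mergeAndCheckSwitch(StatisticsType.Auto, coordinator, task, 1);
    System.out.println(coordinator.get("a") + " " + migrate);
  }
}
```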
StatisticsEvent statisticsEvent =
    StatisticsEvent.createAggregatedStatisticsEvent(
        checkpointId, globalStatistics, aggregatedStatisticsSerializer);
for (int i = 0; i < context.currentParallelism(); ++i) {
We have a function #parallelism at line 187 to get the current parallelism. Do we want to remove the function?
Good catch. Let me remove the parallelism() method, as it is too trivial to keep.
}

this.taskStatisticsType = StatisticsUtil.collectType(statisticsType, globalStatistics);
Does the Iceberg repo follow the style that we always use this. to refer to class variables? If that's the case, then let's update globalStatistics to this.globalStatistics, like what we do in line 113.
Same comment for lines 124 and 125.
Iceberg style only uses this. when modifying the value/state (as in a setter or constructor).
}

/**
 * To understand how range bounds are used in range partitioning, heere is an example for human
Typo: heere should be here.
 * <li>Target size is "coordinator reservoir size * over sampling ration (10) / operator
 *     parallelism"
 * <li>Min is 1K to achieve good accuracy while memory footprint is still relatively small
 * <li>Max is 100K to cap the memory footprint on coordinator
In the current implementation, the operator reservoir size depends entirely on the coordinator reservoir size. Do we check the operator reservoir min and max values?
I feel it is not needed. The coordinator reservoir size is already correlated with parallelism/partitions. The operator reservoir size can probably be tied purely to OPERATOR_OVER_SAMPLE_RATIO.
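For reference, the heuristic as worded in the Javadoc under discussion can be sketched like this. Note the assumption: this version applies the 1K/100K bounds literally, whereas the reply above suggests the PR may rely on the over-sample ratio alone. The class name, method name, and bound constants are hypothetical; `OPERATOR_OVER_SAMPLE_RATIO` and its value 10 come from the thread.

```java
// Hypothetical sizing helper; applies the Javadoc's min/max literally.
final class ReservoirSizing {
  static final int OPERATOR_OVER_SAMPLE_RATIO = 10;
  static final int OPERATOR_MIN_SIZE = 1_000; // "Min is 1K"
  static final int OPERATOR_MAX_SIZE = 100_000; // "Max is 100K"

  static int operatorReservoirSize(int coordinatorReservoirSize, int operatorParallelism) {
    // Target = coordinator size * over-sample ratio / parallelism, rounded up.
    long totalOperatorSamples = (long) coordinatorReservoirSize * OPERATOR_OVER_SAMPLE_RATIO;
    long target = (totalOperatorSamples + operatorParallelism - 1) / operatorParallelism;
    return (int) Math.max(OPERATOR_MIN_SIZE, Math.min(OPERATOR_MAX_SIZE, target));
  }

  public static void main(String[] args) {
    // 10_000 * 10 / 4 = 25_000, which is already within the bounds.
    System.out.println(operatorReservoirSize(10_000, 4));
  }
}
```

Dropping the `Math.max`/`Math.min` line gives the unclamped variant the reply argues for.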
taskMapStats.forEach(
    (sortKey, count) -> {
      for (int i = 0; i < count; ++i) {
        sketchConsumer.accept(sortKey);
Did we consider executing sketchConsumer.accept in parallel to make it faster?
Thought about it, but it would require a thread pool. Let's start simple. If this turns out to be an issue later, we can improve it then.
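The sequential replay being discussed costs one consumer call per counted occurrence, so its running time grows with the sum of all counts rather than with the number of distinct keys. A self-contained sketch, assuming a generic key type and a list as a stand-in for the sketch's update method; `MapToSketchReplay` is an illustrative name:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical generic version of the replay loop shown above.
final class MapToSketchReplay {
  // Feeds each key to the consumer once per counted occurrence.
  static <K> void convertMapToSketch(Map<K, Long> taskMapStats, Consumer<K> sketchConsumer) {
    taskMapStats.forEach(
        (sortKey, count) -> {
          for (long i = 0; i < count; ++i) {
            sketchConsumer.accept(sortKey);
          }
        });
  }

  public static void main(String[] args) {
    Map<String, Long> stats = new LinkedHashMap<>();
    stats.put("a", 2L);
    stats.put("b", 3L);
    List<String> received = new ArrayList<>(); // stand-in for a sketch's update method
    convertMapToSketch(stats, received::add);
    System.out.println(received.size());
  }
}
```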
…h statistics and auto migration from Map stats to reservoir sampling sketch if cardinality is detected high
…unit test to cover the 2 scenarios of operator stats migrations
…rrent checkpoints properly.
private final Comparator<StructLike> comparator;
private final NavigableMap<Long, Aggregation> aggregationsPerCheckpoint;

private volatile AggregatedStatistics completedStatistics;
How does the thread model work for the event handling? Do we need the volatile here?
Good question. We don't really need volatile here, as coordinator event handling is always single-threaded. Let me remove the volatile.
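The reasoning can be illustrated with a small sketch: when every read and write of a field happens inside tasks run by one single-thread executor, program order on that thread already guarantees visibility, so a plain field suffices. `SingleThreadState` and its members are hypothetical names, and the argument only holds as long as no other thread touches the field directly.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical holder of coordinator-like state confined to one thread.
final class SingleThreadState {
  private final ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor();

  // Plain (non-volatile) field: only accessed from tasks on coordinatorExecutor.
  private long completedCheckpointId = -1L;

  void handleEvent(long checkpointId) {
    coordinatorExecutor.execute(
        () -> completedCheckpointId = Math.max(completedCheckpointId, checkpointId));
  }

  // Reads the field on the executor thread and hands the value back via a Future.
  long readOnExecutor() {
    try {
      return coordinatorExecutor.submit(() -> completedCheckpointId).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  void close() {
    coordinatorExecutor.shutdown();
  }
}
```

If some other thread ever needs the value, routing the read through the executor as in `readOnExecutor` keeps the confinement intact.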
this.coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
Just one small question from my side
Thanks @pvary and @yegangy0718 for the code review.
to support sketch statistics and auto migration from Map stats to reservoir sampling sketch if cardinality is detected high