Flink: a few small fixes or tuning for range partitioner #10823
Conversation
Force-pushed from 0b41dfe to cf18332
@@ -47,18 +46,6 @@ class SketchRangePartitioner implements Partitioner<RowData> {
  public int partition(RowData row, int numPartitions) {
    // reuse the sortKey and rowDataWrapper
    sortKey.wrap(rowDataWrapper.wrap(row));
    int partition = Arrays.binarySearch(rangeBounds, sortKey, comparator);
This refactoring makes it easier to write unit tests against the static util method.
@@ -91,7 +91,7 @@ class DataStatisticsCoordinator implements OperatorCoordinator {
    this.context = context;
    this.schema = schema;
    this.sortOrder = sortOrder;
-   this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
+   this.comparator = Comparators.forType(SortKeyUtil.sortKeySchema(schema, sortOrder).asStruct());
This is an important fix. A SortKey may have been serialized by a subtask and deserialized by the coordinator, so we need to use the schema of the transformed sort fields for the struct comparator.
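For illustration, sort transforms can change a field's type, which is why the comparator must be built over the transformed sort-key schema rather than the source schema. Below is a minimal standalone sketch in plain Java, not Iceberg's API; `bucketOf` and its length-based bucketing are hypothetical stand-ins (real bucket transforms hash the value) used only to keep the demo deterministic:

```java
import java.util.Comparator;

public class TransformedKeyComparatorSketch {
  // Hypothetical stand-in for a bucket-style transform: string source -> int bucket id.
  static int bucketOf(String value, int numBuckets) {
    return value.length() % numBuckets;
  }

  public static void main(String[] args) {
    // Subtasks serialize the transformed sort key (an int) and the coordinator
    // deserializes it. Comparison must therefore use the transformed type
    // (Integer), not the source column type (String).
    int keyA = bucketOf("fig", 4);   // 3 % 4 = 3
    int keyB = bucketOf("apple", 4); // 5 % 4 = 1
    Comparator<Integer> keyComparator = Comparator.naturalOrder();
    System.out.println(Integer.signum(keyComparator.compare(keyA, keyB))); // 1
  }
}
```

A comparator built over the source `String` type would not even accept the deserialized integer keys, which is the type mismatch the fix avoids.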
new SortKey[] {CHAR_KEYS.get("c"), CHAR_KEYS.get("j"), CHAR_KEYS.get("m")};

// <= c
assertThat(
Is this code duplicated in the next method? Is there a way to extract the duplicated code?
There is some duplication, but I think those two tests are different: scale up and scale down, and I want to explicitly test each range. The scale-down case handles this part differently:
// > m
// reassigns out-of-range partitions via mod (% 3 in this case)
assertThat(
SketchUtil.partition(
CHAR_KEYS.get("n"), numPartitions, rangeBounds, SORT_ORDER_COMPARTOR))
.isEqualTo(0);
assertThat(
SketchUtil.partition(
CHAR_KEYS.get("z"), numPartitions, rangeBounds, SORT_ORDER_COMPARTOR))
.isEqualTo(0);
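The reassignment above can be sketched standalone. This is a hypothetical simplification in plain Java (String keys instead of SortKey, natural ordering instead of the sort-order comparator), not the Iceberg implementation:

```java
import java.util.Arrays;

public class RangePartitionSketch {
  // rangeBounds are sorted upper bounds; keys above the last bound would map to
  // partition rangeBounds.length, which is wrapped back into range via mod.
  static int partition(String key, int numPartitions, String[] rangeBounds) {
    int pos = Arrays.binarySearch(rangeBounds, key);
    // binarySearch returns (-(insertionPoint) - 1) when the key is not found
    int partition = pos >= 0 ? pos : -pos - 1;
    // scale-down case: more ranges than partitions, reassign via mod
    return partition % numPartitions;
  }

  public static void main(String[] args) {
    String[] bounds = {"c", "j", "m"};
    // "n" and "z" fall past the last bound: partition 3 % 3 = 0
    System.out.println(partition("n", 3, bounds)); // 0
    System.out.println(partition("z", 3, bounds)); // 0
    System.out.println(partition("k", 3, bounds)); // 2 ("j" < "k" <= "m")
  }
}
```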
Maybe a method like this would help to focus on the actual test and hide the common parameters?

private static void assertPartition(SortKey key, int expected) {
  assertThat(
          SketchUtil.partition(
              key, numPartitions, rangeBounds, SORT_ORDER_COMPARTOR))
      .isEqualTo(expected);
}
The first 8 assertions look the same to me, but it is hard to check and understand as it is "hidden" by the long assertion statements. WDYT?
That is a good idea. Reducing each check to a single line would help readability.
Force-pushed from f1914f2 to 7e58deb
Thanks @pvary for the review. Will create the backport PR shortly.
The next PR after this will put everything together in FlinkSink, along with a documentation update.