Flink: Reverting the default custom partitioner for bucket column (#8848) (#8858)
nastra committed Oct 17, 2023
1 parent 8aa948f commit 4f999f1
Showing 6 changed files with 27 additions and 87 deletions.
@@ -525,13 +525,7 @@ private DataStream<RowData> distributeDataStream(
                 + "and table is unpartitioned");
         return input;
       } else {
-        if (BucketPartitionerUtil.hasOneBucketField(partitionSpec)) {
-          return input.partitionCustom(
-              new BucketPartitioner(partitionSpec),
-              new BucketPartitionKeySelector(partitionSpec, iSchema, flinkRowType));
-        } else {
-          return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
-        }
+        return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
       }
     } else {
       if (partitionSpec.isUnpartitioned()) {
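For context, the restored behavior means that with write distribution mode HASH on a partitioned table, FlinkSink always keys the stream by the table's partition key selector; the bucket-aware custom partitioner is no longer applied implicitly. A minimal hedged sketch of a sink that exercises this path (the dataStream, table, and tableLoader identifiers are assumed to be set up as in the test change below):

// Hedged sketch: with this revert, DistributionMode.HASH shuffles via
// input.keyBy(new PartitionKeySelector(...)) inside FlinkSink, even when the
// PartitionSpec contains bucket fields. `dataStream`, `table`, and `tableLoader`
// are assumed to exist as in the test code below.
FlinkSink.forRowData(dataStream)
    .table(table)
    .tableLoader(tableLoader)
    .distributionMode(DistributionMode.HASH)
    .append();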
@@ -114,13 +114,19 @@ private void appendRowsToTable(List<RowData> allRows) throws Exception {
             new BoundedTestSource<>(
                 allRows.stream().map(converter::toExternal).toArray(Row[]::new)),
             ROW_TYPE_INFO)
-        .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+        .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE))
+        .partitionCustom(
+            new BucketPartitioner(table.spec()),
+            new BucketPartitionKeySelector(
+                table.spec(),
+                table.schema(),
+                FlinkSink.toFlinkRowType(table.schema(), SimpleDataUtil.FLINK_SCHEMA)));

     FlinkSink.forRowData(dataStream)
         .table(table)
         .tableLoader(tableLoader)
         .writeParallelism(parallelism)
-        .distributionMode(DistributionMode.HASH)
+        .distributionMode(DistributionMode.NONE)
         .append();

     env.execute("Test Iceberg DataStream");
@@ -157,26 +163,6 @@ public void testSendRecordsToAllBucketsEvenly(TableSchemaType tableSchemaType) t
     }
   }

-  /**
-   * Verifies the BucketPartitioner is not used when the PartitionSpec has more than 1 bucket, and
-   * that it should fallback to input.keyBy
-   */
-  @ParameterizedTest
-  @EnumSource(value = TableSchemaType.class, names = "TWO_BUCKETS")
-  public void testMultipleBucketsFallback(TableSchemaType tableSchemaType) throws Exception {
-    setupEnvironment(tableSchemaType);
-    List<RowData> rows = generateTestDataRows();
-
-    appendRowsToTable(rows);
-    TableTestStats stats = extractPartitionResults(tableSchemaType);
-
-    Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size());
-    for (int i = 0, j = numBuckets; i < numBuckets; i++, j++) {
-      // Only 1 file per bucket will be created when falling back to input.keyBy(...)
-      Assertions.assertThat((int) stats.numFilesPerBucket.get(i)).isEqualTo(1);
-    }
-  }
-
   /**
    * Generating 16 rows to be sent uniformly to all writers (round-robin across 8 writers -> 4
    * buckets)
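As the updated test above shows, callers who still want the bucket-aware shuffle now apply it explicitly on the stream and disable the sink's own shuffle with DistributionMode.NONE. A minimal sketch under that assumption (rowDataStream, table, tableLoader, and parallelism are assumed to be set up as in the surrounding test code):

// Hedged sketch: explicit bucket-aware shuffle, mirroring the updated test above.
RowType flinkRowType = FlinkSink.toFlinkRowType(table.schema(), SimpleDataUtil.FLINK_SCHEMA);

// Partition the stream by bucket before the sink.
DataStream<RowData> distributed =
    rowDataStream.partitionCustom(
        new BucketPartitioner(table.spec()),
        new BucketPartitionKeySelector(table.spec(), table.schema(), flinkRowType));

FlinkSink.forRowData(distributed)
    .table(table)
    .tableLoader(tableLoader)
    .writeParallelism(parallelism)
    // The stream is already partitioned by bucket, so the sink must not re-shuffle it.
    .distributionMode(DistributionMode.NONE)
    .append();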
