diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 58828799255d..220353d75ac4 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -525,13 +525,7 @@ private DataStream distributeDataStream( + "and table is unpartitioned"); return input; } else { - if (BucketPartitionerUtil.hasOneBucketField(partitionSpec)) { - return input.partitionCustom( - new BucketPartitioner(partitionSpec), - new BucketPartitionKeySelector(partitionSpec, iSchema, flinkRowType)); - } else { - return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType)); - } + return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType)); } } else { if (partitionSpec.isUnpartitioned()) { diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java index 29a0898a1b76..9dae43ce5e58 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java @@ -114,13 +114,19 @@ private void appendRowsToTable(List allRows) throws Exception { new BoundedTestSource<>( allRows.stream().map(converter::toExternal).toArray(Row[]::new)), ROW_TYPE_INFO) - .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)); + .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)) + .partitionCustom( + new BucketPartitioner(table.spec()), + new BucketPartitionKeySelector( + table.spec(), + table.schema(), + FlinkSink.toFlinkRowType(table.schema(), SimpleDataUtil.FLINK_SCHEMA))); FlinkSink.forRowData(dataStream) .table(table) .tableLoader(tableLoader) .writeParallelism(parallelism) - .distributionMode(DistributionMode.HASH) + .distributionMode(DistributionMode.NONE) .append(); env.execute("Test Iceberg DataStream"); @@ -157,26 +163,6 @@ public void testSendRecordsToAllBucketsEvenly(TableSchemaType tableSchemaType) t } } - /** - * Verifies the BucketPartitioner is not used when the PartitionSpec has more than 1 bucket, and - * that it should fallback to input.keyBy - */ - @ParameterizedTest - @EnumSource(value = TableSchemaType.class, names = "TWO_BUCKETS") - public void testMultipleBucketsFallback(TableSchemaType tableSchemaType) throws Exception { - setupEnvironment(tableSchemaType); - List rows = generateTestDataRows(); - - appendRowsToTable(rows); - TableTestStats stats = extractPartitionResults(tableSchemaType); - - Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size()); - for (int i = 0, j = numBuckets; i < numBuckets; i++, j++) { - // Only 1 file per bucket will be created when falling back to input.keyBy(...) - Assertions.assertThat((int) stats.numFilesPerBucket.get(i)).isEqualTo(1); - } - } - /** * Generating 16 rows to be sent uniformly to all writers (round-robin across 8 writers -> 4 * buckets) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 58828799255d..220353d75ac4 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -525,13 +525,7 @@ private DataStream distributeDataStream( + "and table is unpartitioned"); return input; } else { - if (BucketPartitionerUtil.hasOneBucketField(partitionSpec)) { - return input.partitionCustom( - new BucketPartitioner(partitionSpec), - new BucketPartitionKeySelector(partitionSpec, iSchema, flinkRowType)); - } else { - return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType)); - } + return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType)); } } else { if (partitionSpec.isUnpartitioned()) { diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java index 29a0898a1b76..9dae43ce5e58 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java @@ -114,13 +114,19 @@ private void appendRowsToTable(List allRows) throws Exception { new BoundedTestSource<>( allRows.stream().map(converter::toExternal).toArray(Row[]::new)), ROW_TYPE_INFO) - .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)); + .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)) + .partitionCustom( + new BucketPartitioner(table.spec()), + new BucketPartitionKeySelector( + table.spec(), + table.schema(), + FlinkSink.toFlinkRowType(table.schema(), SimpleDataUtil.FLINK_SCHEMA))); FlinkSink.forRowData(dataStream) .table(table) .tableLoader(tableLoader) .writeParallelism(parallelism) - .distributionMode(DistributionMode.HASH) + .distributionMode(DistributionMode.NONE) .append(); env.execute("Test Iceberg DataStream"); @@ -157,26 +163,6 @@ public void testSendRecordsToAllBucketsEvenly(TableSchemaType tableSchemaType) t } } - /** - * Verifies the BucketPartitioner is not used when the PartitionSpec has more than 1 bucket, and - * that it should fallback to input.keyBy - */ - @ParameterizedTest - @EnumSource(value = TableSchemaType.class, names = "TWO_BUCKETS") - public void testMultipleBucketsFallback(TableSchemaType tableSchemaType) throws Exception { - setupEnvironment(tableSchemaType); - List rows = generateTestDataRows(); - - appendRowsToTable(rows); - TableTestStats stats = extractPartitionResults(tableSchemaType); - - Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size()); - for (int i = 0, j = numBuckets; i < numBuckets; i++, j++) { - // Only 1 file per bucket will be created when falling back to input.keyBy(...) - Assertions.assertThat((int) stats.numFilesPerBucket.get(i)).isEqualTo(1); - } - } - /** * Generating 16 rows to be sent uniformly to all writers (round-robin across 8 writers -> 4 * buckets) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 58828799255d..220353d75ac4 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -525,13 +525,7 @@ private DataStream distributeDataStream( + "and table is unpartitioned"); return input; } else { - if (BucketPartitionerUtil.hasOneBucketField(partitionSpec)) { - return input.partitionCustom( - new BucketPartitioner(partitionSpec), - new BucketPartitionKeySelector(partitionSpec, iSchema, flinkRowType)); - } else { - return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType)); - } + return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType)); } } else { if (partitionSpec.isUnpartitioned()) { diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java index 29a0898a1b76..9dae43ce5e58 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java @@ -114,13 +114,19 @@ private void appendRowsToTable(List allRows) throws Exception { new BoundedTestSource<>( allRows.stream().map(converter::toExternal).toArray(Row[]::new)), ROW_TYPE_INFO) - .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)); + .map(converter::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE)) + .partitionCustom( + new BucketPartitioner(table.spec()), + new BucketPartitionKeySelector( + table.spec(), + table.schema(), + FlinkSink.toFlinkRowType(table.schema(), SimpleDataUtil.FLINK_SCHEMA))); FlinkSink.forRowData(dataStream) .table(table) .tableLoader(tableLoader) .writeParallelism(parallelism) - .distributionMode(DistributionMode.HASH) + .distributionMode(DistributionMode.NONE) .append(); env.execute("Test Iceberg DataStream"); @@ -157,26 +163,6 @@ public void testSendRecordsToAllBucketsEvenly(TableSchemaType tableSchemaType) t } } - /** - * Verifies the BucketPartitioner is not used when the PartitionSpec has more than 1 bucket, and - * that it should fallback to input.keyBy - */ - @ParameterizedTest - @EnumSource(value = TableSchemaType.class, names = "TWO_BUCKETS") - public void testMultipleBucketsFallback(TableSchemaType tableSchemaType) throws Exception { - setupEnvironment(tableSchemaType); - List rows = generateTestDataRows(); - - appendRowsToTable(rows); - TableTestStats stats = extractPartitionResults(tableSchemaType); - - Assertions.assertThat(stats.totalRowCount).isEqualTo(rows.size()); - for (int i = 0, j = numBuckets; i < numBuckets; i++, j++) { - // Only 1 file per bucket will be created when falling back to input.keyBy(...) - Assertions.assertThat((int) stats.numFilesPerBucket.get(i)).isEqualTo(1); - } - } - /** * Generating 16 rows to be sent uniformly to all writers (round-robin across 8 writers -> 4 * buckets)