diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 7e25559bb177..a2e4962be9b5 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -27,6 +27,7 @@
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
@@ -38,12 +39,15 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieSortedMergeHandle;
 import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import org.apache.hudi.table.action.clean.CleanActionExecutor;
@@ -374,12 +378,21 @@ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,
 
   protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
                                               Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
+    Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
+    if (!config.populateMetaFields()) {
+      try {
+        keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
+      } catch (IOException e) {
+        throw new HoodieIOException("Only BaseKeyGenerator (or any key generator that extends from BaseKeyGenerator) are supported when meta "
+            + "columns are disabled. Please choose the right key generator if you wish to disable meta fields.", e);
+      }
+    }
     if (requireSortedRecords()) {
       return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
-          dataFileToBeMerged, taskContextSupplier, Option.empty());
+          dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
     } else {
       return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
-          dataFileToBeMerged, taskContextSupplier, Option.empty());
+          dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
     }
   }
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index be2e334a4c96..d86602ea95c5 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -222,6 +222,32 @@ void testSyncMetadataTable() throws Exception {
     assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(6L));
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant + "001"));
     assertThat(completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
+    // write another 2 commits
+    for (int i = 6; i < 8; i++) {
+      instant = mockWriteWithMetadata();
+      metadataTableMetaClient.reloadActiveTimeline();
+      completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
+      assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(i + 1L));
+      assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
+    }
+
+    // write another commit to trigger clean
+    instant = mockWriteWithMetadata();
+    metadataTableMetaClient.reloadActiveTimeline();
+    completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
+    assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(10L));
+    assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant + "002"));
+    assertThat(completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.CLEAN_ACTION));
+
+    // write another commit
+    mockWriteWithMetadata();
+    // write another commit to trigger compaction
+    instant = mockWriteWithMetadata();
+    metadataTableMetaClient.reloadActiveTimeline();
+    completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
+    assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(13L));
+    assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant + "001"));
+    assertThat(completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
   }
 
   // -------------------------------------------------------------------------
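
Note on the first hunk: when meta fields are disabled (config.populateMetaFields() returns false), the merge handle can no longer read record keys back from the _hoodie_record_key meta column, so getUpdateHandle now resolves a BaseKeyGenerator from the write config and threads it through to the (sorted) merge handle. The sketch below restates that fallback as a standalone helper for reference. It uses only the APIs the hunk itself touches; the class and method names are illustrative and not part of the patch.

  import java.io.IOException;

  import org.apache.hudi.common.config.TypedProperties;
  import org.apache.hudi.common.util.Option;
  import org.apache.hudi.config.HoodieWriteConfig;
  import org.apache.hudi.exception.HoodieIOException;
  import org.apache.hudi.keygen.BaseKeyGenerator;
  import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;

  // Illustrative helper, not part of the patch: mirrors the fallback added in getUpdateHandle.
  final class KeyGeneratorResolver {

    static Option<BaseKeyGenerator> resolve(HoodieWriteConfig config) {
      if (config.populateMetaFields()) {
        // Meta columns are written, so record keys can be read from the base file
        // directly; no key generator is needed at merge time.
        return Option.empty();
      }
      try {
        // Meta columns disabled: keys must be re-derived from the record payload,
        // which only BaseKeyGenerator (and its subclasses) can do.
        return Option.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory
            .createKeyGenerator(new TypedProperties(config.getProps())));
      } catch (IOException e) {
        throw new HoodieIOException("Only key generators extending BaseKeyGenerator are supported "
            + "when meta columns are disabled.", e);
      }
    }
  }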