Skip to content

Commit

Permalink
[HUDI-2847] Flink metadata table supports virtual keys (#4096)
Browse files Browse the repository at this point in the history
  • Loading branch information
danny0405 committed Nov 24, 2021
1 parent 323be33 commit 0bb506f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
Expand All @@ -38,12 +39,15 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
Expand Down Expand Up @@ -374,12 +378,21 @@ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,?

protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
if (!config.populateMetaFields()) {
try {
keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
} catch (IOException e) {
throw new HoodieIOException("Only BaseKeyGenerator (or any key generator that extends from BaseKeyGenerator) are supported when meta "
+ "columns are disabled. Please choose the right key generator if you wish to disable meta fields.", e);
}
}
if (requireSortedRecords()) {
return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
dataFileToBeMerged, taskContextSupplier, Option.empty());
dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
} else {
return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
dataFileToBeMerged, taskContextSupplier, Option.empty());
dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,32 @@ void testSyncMetadataTable() throws Exception {
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(6L));
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant + "001"));
assertThat(completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
// write another 2 commits
for (int i = 6; i < 8; i++) {
instant = mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(i + 1L));
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
}

// write another commit to trigger clean
instant = mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(10L));
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant + "002"));
assertThat(completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.CLEAN_ACTION));

// write another commit
mockWriteWithMetadata();
// write another commit to trigger compaction
instant = mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(13L));
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant + "001"));
assertThat(completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
}

// -------------------------------------------------------------------------
Expand Down

0 comments on commit 0bb506f

Please sign in to comment.