diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java index c7ab49316c4ce..0b579474b9d6c 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java @@ -107,13 +107,13 @@ public String compactionsAll( try { // This could be a completed compaction. Assume a compaction request file is present but skip if fails compactionPlan = AvroUtils.deserializeCompactionPlan( - activeTimeline.readPlanAsBytes( + activeTimeline.readCompactionPlanAsBytes( HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get()); } catch (HoodieIOException ioe) { // SKIP } } else { - compactionPlan = AvroUtils.deserializeCompactionPlan(activeTimeline.readPlanAsBytes( + compactionPlan = AvroUtils.deserializeCompactionPlan(activeTimeline.readCompactionPlanAsBytes( HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get()); } @@ -156,7 +156,7 @@ public String compactionShow( HoodieTableMetaClient client = checkAndGetMetaClient(); HoodieActiveTimeline activeTimeline = client.getActiveTimeline(); HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan( - activeTimeline.readPlanAsBytes( + activeTimeline.readCompactionPlanAsBytes( HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get()); List rows = new ArrayList<>(); diff --git a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java index 56a47b73263e6..680d8cd770edd 100644 --- a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java @@ -220,7 +220,7 @@ public List repairCompaction(String compactionInstant, int paral private static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, String compactionInstant) throws IOException { return AvroUtils.deserializeCompactionPlan( - metaClient.getActiveTimeline().readPlanAsBytes( + metaClient.getActiveTimeline().readCompactionPlanAsBytes( HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get()); } diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java index 0b8df71f286ca..207df4cea6f0e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java @@ -953,7 +953,7 @@ public void commitCompaction(String compactionInstantTime, JavaRDD HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan( - timeline.readPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get()); + timeline.readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get()); // Merge extra meta-data passed by user with the one already in inflight compaction Option> mergedMetaData = extraMetadata.map(m -> { Map merged = new HashMap<>(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index f135d80e3373f..a7226a7e64ee0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -34,6 +34,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; import java.text.SimpleDateFormat; @@ -279,18 +280,28 @@ public Option getInstantDetails(HoodieInstant instant) { return readDataFromPath(detailPath); } + public Option readCleanerInfoAsBytes(HoodieInstant instant) { + // Cleaner metadata are always stored only in timeline .hoodie + return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName())); + } + //----------------------------------------------------------------- // BEGIN - COMPACTION RELATED META-DATA MANAGEMENT. //----------------------------------------------------------------- - public Option readPlanAsBytes(HoodieInstant instant) { - Path detailPath = null; - if (metaClient.getTimelineLayoutVersion().isNullVersion()) { - detailPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()); - } else { - detailPath = new Path(metaClient.getMetaPath(), instant.getFileName()); + public Option readCompactionPlanAsBytes(HoodieInstant instant) { + try { + // Reading from auxiliary path first. In future release, we will cleanup compaction management + // to only write to timeline and skip auxiliary and this code will be able to handle it. + return readDataFromPath(new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName())); + } catch (HoodieIOException e) { + // This will be removed in future release. See HUDI-546 + if (e.getIOException() instanceof FileNotFoundException) { + return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName())); + } else { + throw e; + } } - return readDataFromPath(detailPath); } /** @@ -344,14 +355,9 @@ public HoodieInstant transitionCompactionInflightToComplete(HoodieInstant inflig } private void createFileInAuxiliaryFolder(HoodieInstant instant, Option data) { - if (metaClient.getTimelineLayoutVersion().isNullVersion()) { - /** - * For latest version, since we write immutable files directly in timeline directory, there is no need to write - * additional immutable files in .aux folder - */ - Path fullPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()); - createFileInPath(fullPath, data); - } + // This will be removed in future release. See HUDI-546 + Path fullPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()); + createFileInPath(fullPath, data); } //----------------------------------------------------------------- @@ -369,8 +375,6 @@ public HoodieInstant transitionCleanInflightToComplete(HoodieInstant inflightIns Preconditions.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION)); Preconditions.checkArgument(inflightInstant.isInflight()); HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, CLEAN_ACTION, inflightInstant.getTimestamp()); - // First write metadata to aux folder - createFileInAuxiliaryFolder(commitInstant, data); // Then write to timeline transitionState(inflightInstant, commitInstant, data); return commitInstant; @@ -471,8 +475,6 @@ public void saveToCompactionRequested(HoodieInstant instant, Option cont public void saveToCleanRequested(HoodieInstant instant, Option content) { Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)); Preconditions.checkArgument(instant.getState().equals(State.REQUESTED)); - // Write workload to auxiliary folder - createFileInAuxiliaryFolder(instant, content); // Plan is stored in meta path createFileInMetaPath(instant.getFileName(), content, false); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java index c5261efb4738d..84b7ee4c50cad 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java @@ -72,7 +72,7 @@ public static HoodieCleanMetadata getCleanerMetadata(HoodieTableMetaClient metaC throws IOException { CleanMetadataMigrator metadataMigrator = new CleanMetadataMigrator(metaClient); HoodieCleanMetadata cleanMetadata = AvroUtils.deserializeHoodieCleanMetadata( - metaClient.getActiveTimeline().readPlanAsBytes(cleanInstant).get()); + metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get()); return metadataMigrator.upgradeToLatest(cleanMetadata, cleanMetadata.getVersion()); } @@ -85,7 +85,7 @@ public static HoodieCleanMetadata getCleanerMetadata(HoodieTableMetaClient metaC */ public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient metaClient, HoodieInstant cleanInstant) throws IOException { - return AvroUtils.deserializeAvroMetadata(metaClient.getActiveTimeline().readPlanAsBytes(cleanInstant).get(), + return AvroUtils.deserializeAvroMetadata(metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get(), HoodieCleanerPlan.class); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java index 7d5f786a06e4f..44dbc3e7723ac 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java @@ -140,7 +140,7 @@ public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaC throws IOException { CompactionPlanMigrator migrator = new CompactionPlanMigrator(metaClient); HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan( - metaClient.getActiveTimeline().readPlanAsBytes( + metaClient.getActiveTimeline().readCompactionPlanAsBytes( HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get()); return migrator.upgradeToLatest(compactionPlan, compactionPlan.getVersion()); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java index 5d5d4f4222d77..1274b3c0501b8 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java @@ -18,8 +18,11 @@ package org.apache.hudi.common.table.string; +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.HoodieCommonTestHarness; import org.apache.hudi.common.model.HoodieTestUtils; +import org.apache.hudi.common.model.TimelineLayoutVersion; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -31,6 +34,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import java.io.IOException; import java.util.stream.Stream; import static org.junit.Assert.assertEquals; @@ -52,7 +56,7 @@ public void setUp() throws Exception { } @Test - public void testLoadingInstantsFromFiles() { + public void testLoadingInstantsFromFiles() throws IOException { HoodieInstant instant1 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "1"); HoodieInstant instant2 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "3"); HoodieInstant instant3 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "5"); @@ -96,6 +100,30 @@ public void testLoadingInstantsFromFiles() { timeline.getCommitTimeline().filterCompletedInstants().getInstants()); HoodieTestUtils.assertStreamEquals("Check the instants stream", Stream.of(instant5), timeline.getCommitTimeline().filterPendingExcludingCompaction().getInstants()); + + // Backwards compatibility testing for reading compaction plans + HoodieInstant instant6 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "9"); + byte[] dummy = new byte[5]; + HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline(new HoodieTableMetaClient(metaClient.getHadoopConf(), + metaClient.getBasePath(), true, metaClient.getConsistencyGuardConfig(), + Option.of(new TimelineLayoutVersion(TimelineLayoutVersion.VERSION_0)))); + // Old Timeline writes both to aux and timeline folder + oldTimeline.saveToCompactionRequested(instant6, Option.of(dummy)); + // Now use latest timeline version + timeline = timeline.reload(); + // Ensure aux file is present + assertTrue(metaClient.getFs().exists(new Path(metaClient.getMetaAuxiliaryPath(), instant6.getFileName()))); + // Read 5 bytes + assertEquals(timeline.readCompactionPlanAsBytes(instant6).get().length, 5); + + // Delete auxiliary file to mimic future release where we stop writing to aux + metaClient.getFs().delete(new Path(metaClient.getMetaAuxiliaryPath(), instant6.getFileName())); + + // Ensure requested instant is not present in aux + assertFalse(metaClient.getFs().exists(new Path(metaClient.getMetaAuxiliaryPath(), instant6.getFileName()))); + + // Now read compaction plan again which should not throw exception + assertEquals(timeline.readCompactionPlanAsBytes(instant6).get().length, 5); } @Test