Skip to content

Commit

Permalink
[HUDI-2744] Fix parsing of metadata table compaction timestamp when…
Browse files Browse the repository at this point in the history
… metrics are enabled (#3976)
  • Loading branch information
nsivabalan committed Nov 15, 2021
1 parent 3c43197 commit 53d2d6a
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public void testOnlyValidPartitionsAdded(HoodieTableType tableType) throws Excep
@ParameterizedTest
@MethodSource("bootstrapAndTableOperationTestArgs")
public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception {
init(tableType, true, enableFullScan);
init(tableType, true, enableFullScan, false);
doWriteInsertAndUpsert(testTable);

// trigger an upsert
Expand Down Expand Up @@ -462,27 +462,43 @@ public void testSync(HoodieTableType tableType) throws Exception {
validateMetadata(testTable, emptyList(), true);
}

/**
 * Produces the commit time that follows the supplied one.
 *
 * <p>The candidate is {@code curCommitTime + 1}. If the candidate's remainder modulo
 * {@code 1000000000000L} is below 60 the candidate itself is returned; otherwise a brand new
 * instant time is obtained from {@link HoodieActiveTimeline#createNewInstantTime()} and parsed
 * as a long.
 *
 * <p>NOTE(review): the modulus keeps the low twelve decimal digits of the value, not just the
 * trailing two. If commit times are yyyyMMddHHmmss-style numbers (presumably; confirm), the
 * "seconds below 60" intent would only look at the last two digits. Verify which is intended
 * before changing it, since callers rely on the returned times being increasing.
 *
 * @param curCommitTime current commit time.
 * @return the next valid commit time.
 */
private Long getNextCommitTime(long curCommitTime) {
  final long candidate = curCommitTime + 1;
  final boolean candidateUsable = candidate % 1000000000000L < 60;
  if (candidateUsable) {
    return candidate;
  }
  // Candidate rejected by the range check: fall back to a freshly generated instant time.
  return Long.parseLong(HoodieActiveTimeline.createNewInstantTime());
}

@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType) throws Exception {
init(tableType);
init(tableType, true, true, true);
long baseCommitTime = Long.parseLong(HoodieActiveTimeline.createNewInstantTime());
for (int i = 1; i < 25; i += 7) {
String commitTime1 = ((i > 9) ? ("00000") : ("000000")) + i;
String commitTime2 = ((i > 9) ? ("00000") : ("000000")) + (i + 1);
String commitTime3 = ((i > 9) ? ("00000") : ("000000")) + (i + 2);
String commitTime4 = ((i > 9) ? ("00000") : ("000000")) + (i + 3);
String commitTime5 = ((i > 9) ? ("00000") : ("000000")) + (i + 4);
String commitTime6 = ((i > 9) ? ("00000") : ("000000")) + (i + 5);
String commitTime7 = ((i > 9) ? ("00000") : ("000000")) + (i + 6);
doWriteOperation(testTable, commitTime1, INSERT);
doWriteOperation(testTable, commitTime2);
doClean(testTable, commitTime3, Arrays.asList(commitTime1));
doWriteOperation(testTable, commitTime4);
long commitTime1 = getNextCommitTime(baseCommitTime);
long commitTime2 = getNextCommitTime(commitTime1);
long commitTime3 = getNextCommitTime(commitTime2);
long commitTime4 = getNextCommitTime(commitTime3);
long commitTime5 = getNextCommitTime(commitTime4);
long commitTime6 = getNextCommitTime(commitTime5);
long commitTime7 = getNextCommitTime(commitTime6);
baseCommitTime = commitTime7;
doWriteOperation(testTable, Long.toString(commitTime1), INSERT);
doWriteOperation(testTable, Long.toString(commitTime2));
doClean(testTable, Long.toString(commitTime3), Arrays.asList(Long.toString(commitTime1)));
doWriteOperation(testTable, Long.toString(commitTime4));
if (tableType == MERGE_ON_READ) {
doCompaction(testTable, commitTime5);
doCompaction(testTable, Long.toString(commitTime5));
}
doWriteOperation(testTable, commitTime6);
doRollback(testTable, commitTime6, commitTime7);
doWriteOperation(testTable, Long.toString(commitTime6));
doRollback(testTable, Long.toString(commitTime6), Long.toString(commitTime7));
}
validateMetadata(testTable, emptyList(), true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ public void init(HoodieTableType tableType) throws IOException {
}

public void init(HoodieTableType tableType, boolean enableMetadataTable) throws IOException {
init(tableType, enableMetadataTable, true);
init(tableType, enableMetadataTable, true, false);
}

public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan) throws IOException {
public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan, boolean enableMetrics) throws IOException {
this.tableType = tableType;
initPath();
initSparkContexts("TestHoodieMetadata");
Expand All @@ -87,8 +87,7 @@ public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean
initMetaClient(tableType);
initTestDataGenerator();
metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, false,
enableFullScan).build();
writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, enableMetrics, enableFullScan).build();
initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.common.table.timeline;

import java.text.ParseException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
Expand All @@ -33,6 +34,7 @@
public class HoodieInstantTimeGenerator {
// Format of the timestamp used for an Instant
private static final String INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmss";
private static final int INSTANT_TIMESTAMP_FORMAT_LENGTH = INSTANT_TIMESTAMP_FORMAT.length();
// Formatter to generate Instant timestamps
private static DateTimeFormatter INSTANT_TIME_FORMATTER = DateTimeFormatter.ofPattern(INSTANT_TIMESTAMP_FORMAT);
// The last Instant timestamp generated
Expand All @@ -56,7 +58,7 @@ public static String createNewInstantTime(long milliseconds) {
});
}

public static Date parseInstantTime(String timestamp) {
public static Date parseInstantTime(String timestamp) throws ParseException {
try {
LocalDateTime dt = LocalDateTime.parse(timestamp, INSTANT_TIME_FORMATTER);
return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
Expand All @@ -65,7 +67,11 @@ public static Date parseInstantTime(String timestamp) {
if (timestamp.equals(ALL_ZERO_TIMESTAMP)) {
return new Date(0);
}

// compaction and cleaning in metadata has special format. handling it by trimming extra chars and treating it with secs granularity
if (timestamp.length() > INSTANT_TIMESTAMP_FORMAT_LENGTH) {
LocalDateTime dt = LocalDateTime.parse(timestamp.substring(0, INSTANT_TIMESTAMP_FORMAT_LENGTH), INSTANT_TIME_FORMATTER);
return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
}
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,9 @@ private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFrom
HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
// Retrieve record from base file
if (baseFileReader != null) {
HoodieTimer readTimer = new HoodieTimer().startTimer();
HoodieTimer readTimer = new HoodieTimer();
for (String key : keys) {
readTimer.startTimer();
Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
if (baseRecord.isPresent()) {
hoodieRecord = metadataTableConfig.populateMetaFields()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
Expand Down Expand Up @@ -472,6 +473,19 @@ public void testCreateNewInstantTime() throws Exception {
}
}

/**
 * Checks that an instant ID with extra trailing digits (the form used for metadata table
 * compaction/cleaning) parses to the same point in time as its 14-character prefix, while
 * still being ordered strictly after that prefix when the two are compared as timestamps.
 *
 * @throws ParseException if either instant ID cannot be parsed.
 */
@Test
public void testMetadataCompactionInstantDateParsing() throws ParseException {
  // Baseline instant ID at the default seconds granularity.
  final String baseInstant = "20210101120101";
  // Metadata table compaction/cleaning instant ID: the baseline plus a three-digit suffix.
  final String suffixedInstant = baseInstant + "001";

  final long baseEpochMillis = HoodieActiveTimeline.parseInstantTime(baseInstant).getTime();
  final long suffixedEpochMillis = HoodieActiveTimeline.parseInstantTime(suffixedInstant).getTime();

  // The suffix must not contribute to the parsed date.
  assertEquals(0, suffixedEpochMillis - baseEpochMillis, "Expected the ms part to be 0");

  // As timestamps, the longer ID still sorts after the shorter one (checked in both directions).
  final boolean baseIsLesser =
      HoodieTimeline.compareTimestamps(baseInstant, HoodieTimeline.LESSER_THAN, suffixedInstant);
  final boolean suffixedIsGreater =
      HoodieTimeline.compareTimestamps(suffixedInstant, HoodieTimeline.GREATER_THAN, baseInstant);
  assertTrue(baseIsLesser);
  assertTrue(suffixedIsGreater);
}

/**
* Returns an exhaustive list of all possible HoodieInstant.
* @return list of HoodieInstant
Expand Down

0 comments on commit 53d2d6a

Please sign in to comment.