Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static long countNewRecords(HoodieTableMetaClient target, List<String> co

public static String getTimeDaysAgo(int numberOfDays) {
Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
return HoodieActiveTimeline.COMMIT_FORMATTER.format(date);
return HoodieActiveTimeline.getInstantForDate(date);
}

/**
Expand All @@ -61,8 +61,8 @@ public static String getTimeDaysAgo(int numberOfDays) {
* b) hours: -1, returns 20200202010000
*/
public static String addHours(String compactionCommitTime, int hours) throws ParseException {
Instant instant = HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).toInstant();
Instant instant = HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).toInstant();
ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
return HoodieActiveTimeline.COMMIT_FORMATTER.format(Date.from(commitDateTime.plusHours(hours).toInstant()));
return HoodieActiveTimeline.getInstantForDate(Date.from(commitDateTime.plusHours(hours).toInstant()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String

if (writeTimer != null) {
long durationInMs = metrics.getDurationInMs(writeTimer.stop());
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(instantTime).getTime(), durationInMs,
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime(), durationInMs,
metadata, actionType);
writeTimer = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy)
private Long parsedToSeconds(String time) {
long timestamp;
try {
timestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(time).getTime() / 1000;
timestamp = HoodieActiveTimeline.parseDateFromInstantTime(time).getTime() / 1000;
} catch (ParseException e) {
throw new HoodieCompactionException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ public void completeCompaction(
if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteSt
if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
Expand Down Expand Up @@ -392,7 +392,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<Wr
if (clusteringTimer != null) {
long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(clusteringCommitTime).getTime(),
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ public boolean rollbackCommit(String instantTime) {
BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
final RateLimiter limiter = RateLimiter.create(multiPutBatchSize, TimeUnit.SECONDS);

Long rollbackTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse(instantTime).getTime();
Long rollbackTime = HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime();
Long currentTime = new Date().getTime();
Scan scan = new Scan();
scan.addFamily(SYSTEM_COLUMN_FAMILY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -59,7 +60,17 @@
*/
public class HoodieActiveTimeline extends HoodieDefaultTimeline {

public static final SimpleDateFormat COMMIT_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmss");
private static final String MILLIS_COMMIT_FORMAT = "yyyyMMddHHmmssSSS";
private static final int MILLIS_INSTANT_ID_LENGTH = MILLIS_COMMIT_FORMAT.length();
private static final SimpleDateFormat COMMIT_FORMATTER = new SimpleDateFormat(MILLIS_COMMIT_FORMAT);

private static final String MILLIS_GRANULARITY_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss:SSS";
private static final SimpleDateFormat MILLIS_GRANULARITY_DATE_FORMATTER = new SimpleDateFormat(MILLIS_GRANULARITY_DATE_FORMAT);

// The default number of milliseconds that we add if they are not present
// We prefer the max timestamp as it mimics the current behavior with second granularity
// when performing comparisons such as LESS_THAN_OR_EQUAL_TO
private static final String DEFAULT_MILLIS_EXT = "999";

public static final Set<String> VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = new HashSet<>(Arrays.asList(
COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION,
Expand All @@ -74,6 +85,43 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
protected HoodieTableMetaClient metaClient;
private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));

/**
 * Parses the given instant ID to return a date instance.
 *
 * <p>An instant ID whose length matches the millisecond-granularity format is parsed as is.
 * Any other instant ID has the default millisecond extension appended before parsing, which
 * keeps instants written with the older, shorter format readable.
 *
 * @param instant The instant ID
 * @return A date
 * @throws ParseException If the instant ID is malformed
 */
public static Date parseDateFromInstantTime(String instant) throws ParseException {
  // Enables backwards compatibility with non-millisecond granularity instants:
  // add milliseconds to the instant in order to parse successfully
  String normalizedInstant = isMillisecondGranularity(instant) ? instant : instant + DEFAULT_MILLIS_EXT;
  // SimpleDateFormat keeps mutable parsing state and is not thread-safe, so access to the
  // shared static formatter is serialized on the formatter instance itself.
  synchronized (COMMIT_FORMATTER) {
    return COMMIT_FORMATTER.parse(normalizedInstant);
  }
}

/**
 * Formats the given date as a timeline instant using the commit formatter.
 *
 * @param instantDate The date to format
 * @return The instant string for the date
 */
public static String getInstantForDate(Date instantDate) {
  // SimpleDateFormat keeps mutable formatting state and is not thread-safe, so access to the
  // shared static formatter is serialized on the formatter instance itself.
  synchronized (COMMIT_FORMATTER) {
    return COMMIT_FORMATTER.format(instantDate);
  }
}

/**
 * Creates an instant string given a valid date-time string.
 *
 * <p>The string is first parsed as is. If that fails, the default millisecond extension is
 * appended (separated by a colon) and parsing is attempted once more.
 *
 * @param dateString A date-time string in the format yyyy-MM-dd HH:mm:ss[:SSS]
 * @return A timeline instant
 * @throws ParseException If we cannot parse the date string
 */
public static String getInstantForDateString(String dateString) throws ParseException {
  Date parsedDate;
  // SimpleDateFormat is not thread-safe, so access to the shared static formatter is
  // serialized on the formatter instance. The lock is released before delegating to
  // getInstantForDate so that it is never held together with another formatter's lock.
  synchronized (MILLIS_GRANULARITY_DATE_FORMATTER) {
    try {
      parsedDate = MILLIS_GRANULARITY_DATE_FORMATTER.parse(dateString);
    } catch (ParseException ignored) {
      // Intentionally ignored: the input may lack the millisecond part.
      // Attempt to add the milliseconds in order to complete parsing
      parsedDate = MILLIS_GRANULARITY_DATE_FORMATTER.parse(
          String.format("%s:%s", dateString, DEFAULT_MILLIS_EXT));
    }
  }
  return getInstantForDate(parsedDate);
}

/**
* Returns next instant time in the {@link #COMMIT_FORMATTER} format.
* Ensures each instant time is at least 1 millisecond apart since we create instant times at millisecond granularity
Expand All @@ -90,7 +138,7 @@ public static String createNewInstantTime(long milliseconds) {
return lastInstantTime.updateAndGet((oldVal) -> {
String newCommitTime;
do {
newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(System.currentTimeMillis() + milliseconds));
newCommitTime = getInstantForDate(new Date(System.currentTimeMillis() + milliseconds));
} while (HoodieTimeline.compareTimestamps(newCommitTime, LESSER_THAN_OR_EQUALS, oldVal));
return newCommitTime;
});
Expand Down Expand Up @@ -193,6 +241,10 @@ public void deleteCompactionRequested(HoodieInstant instant) {
deleteInstantFile(instant);
}

/**
 * Tells whether the given instant ID has the length of the millisecond-granularity format.
 *
 * @param instant The instant ID
 * @return true if the instant ID length equals the millisecond format length
 */
private static boolean isMillisecondGranularity(String instant) {
  final int instantLength = instant.length();
  return MILLIS_INSTANT_ID_LENGTH == instantLength;
}

private void deleteInstantFileIfExists(HoodieInstant instant) {
LOG.info("Deleting instant " + instant);
Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), instant.getFileName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.exception.HoodieException;
Expand Down Expand Up @@ -51,7 +52,6 @@
import java.util.stream.Stream;

import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -79,14 +79,14 @@ public void setUp() throws IOException {

@Test
public void testMakeDataFileName() {
String instantTime = COMMIT_FORMATTER.format(new Date());
String instantTime = HoodieActiveTimeline.getInstantForDate(new Date());
String fileName = UUID.randomUUID().toString();
assertEquals(FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName), fileName + "_" + TEST_WRITE_TOKEN + "_" + instantTime + BASE_FILE_EXTENSION);
}

@Test
public void testMaskFileName() {
String instantTime = COMMIT_FORMATTER.format(new Date());
String instantTime = HoodieActiveTimeline.getInstantForDate(new Date());
int taskPartitionId = 2;
assertEquals(FSUtils.maskWithoutFileId(instantTime, taskPartitionId), "*_" + taskPartitionId + "_" + instantTime + BASE_FILE_EXTENSION);
}
Expand Down Expand Up @@ -154,7 +154,7 @@ public void testProcessFiles() throws Exception {

@Test
public void testGetCommitTime() {
String instantTime = COMMIT_FORMATTER.format(new Date());
String instantTime = HoodieActiveTimeline.getInstantForDate(new Date());
String fileName = UUID.randomUUID().toString();
String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName);
assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
Expand All @@ -165,7 +165,7 @@ public void testGetCommitTime() {

@Test
public void testGetFileNameWithoutMeta() {
String instantTime = COMMIT_FORMATTER.format(new Date());
String instantTime = HoodieActiveTimeline.getInstantForDate(new Date());
String fileName = UUID.randomUUID().toString();
String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName);
assertEquals(fileName, FSUtils.getFileId(fullFileName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import org.apache.hudi.common.fs.FSUtils;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.junit.jupiter.api.Test;

import java.util.Date;
import java.util.UUID;

import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

Expand All @@ -37,7 +37,7 @@ public class TestHoodieWriteStat {

@Test
public void testSetPaths() {
String instantTime = COMMIT_FORMATTER.format(new Date());
String instantTime = HoodieActiveTimeline.getInstantForDate(new Date());
String basePathString = "/data/tables/some-hoodie-table";
String partitionPathString = "2017/12/31";
String fileName = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
Expand Down Expand Up @@ -428,6 +430,27 @@ public void testReplaceActionsTimeline() {
assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, validReplaceInstants.get(0).getAction());
}

/**
 * Verifies that instants with and without a millisecond part both parse, and that
 * second-level arithmetic on parsed instants is unaffected by the default millisecond padding.
 */
@Test
public void testMillisGranularityInstantDateParsing() throws ParseException {
  // Instant ID without a millisecond part
  final String legacyInstant = "20210101120101";
  Date paddedDate = HoodieActiveTimeline.parseDateFromInstantTime(legacyInstant);
  // Same instant with an explicit millisecond part
  final String explicitMillisInstant = legacyInstant + "009";
  Date explicitMillisDate = HoodieActiveTimeline.parseDateFromInstantTime(explicitMillisInstant);

  long paddedMillisPart = paddedDate.getTime() % 1000;
  assertEquals(999, paddedMillisPart, "Expected the ms part to be 999");
  long explicitMillisPart = explicitMillisDate.getTime() % 1000;
  assertEquals(9, explicitMillisPart, "Expected the ms part to be 9");

  // Date math which expects second granularity must still work:
  // this instant is 10 seconds after the original one
  final String tenSecondsLaterInstant = "20210101120111";
  long laterEpochSeconds = HoodieActiveTimeline.parseDateFromInstantTime(tenSecondsLaterInstant).getTime() / 1000;
  long originalEpochSeconds = HoodieActiveTimeline.parseDateFromInstantTime(legacyInstant).getTime() / 1000;
  assertEquals(
      10,
      laterEpochSeconds - originalEpochSeconds,
      "Expected the difference between later instant and previous instant to be 10 seconds"
  );
}

/**
* Returns an exhaustive list of all possible HoodieInstant.
* @return list of HoodieInstant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
Expand Down Expand Up @@ -84,7 +85,6 @@
import static org.apache.hudi.common.model.WriteOperationType.CLUSTER;
import static org.apache.hudi.common.model.WriteOperationType.COMPACT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
Expand Down Expand Up @@ -147,7 +147,7 @@ public static String makeNewCommitTime() {
}

public static String makeNewCommitTime(Instant dateTime) {
return COMMIT_FORMATTER.format(Date.from(dateTime));
return HoodieActiveTimeline.getInstantForDate(Date.from(dateTime));
}

public static List<String> makeIncrementalCommitTimes(int num) {
Expand Down
10 changes: 5 additions & 5 deletions hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,12 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throw
*/
public static String medianInstantTime(String highVal, String lowVal) {
try {
long high = HoodieActiveTimeline.COMMIT_FORMATTER.parse(highVal).getTime();
long low = HoodieActiveTimeline.COMMIT_FORMATTER.parse(lowVal).getTime();
long high = HoodieActiveTimeline.parseDateFromInstantTime(highVal).getTime();
long low = HoodieActiveTimeline.parseDateFromInstantTime(lowVal).getTime();
ValidationUtils.checkArgument(high > low,
"Instant [" + highVal + "] should have newer timestamp than instant [" + lowVal + "]");
long median = low + (high - low) / 2;
return HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(median));
return HoodieActiveTimeline.getInstantForDate(new Date(median));
} catch (ParseException e) {
throw new HoodieException("Get median instant time with interval [" + lowVal + ", " + highVal + "] error", e);
}
Expand All @@ -419,8 +419,8 @@ public static String medianInstantTime(String highVal, String lowVal) {
*/
public static long instantTimeDiffSeconds(String newInstantTime, String oldInstantTime) {
try {
long newTimestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(newInstantTime).getTime();
long oldTimestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(oldInstantTime).getTime();
long newTimestamp = HoodieActiveTimeline.parseDateFromInstantTime(newInstantTime).getTime();
long oldTimestamp = HoodieActiveTimeline.parseDateFromInstantTime(oldInstantTime).getTime();
return (newTimestamp - oldTimestamp) / 1000;
} catch (ParseException e) {
throw new HoodieException("Get instant time diff with interval [" + oldInstantTime + ", " + newInstantTime + "] error", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,10 @@ void testInitTableIfNotExists() throws IOException {
void testMedianInstantTime() {
String higher = "20210705125921";
String lower = "20210705125806";
String expectedMedianInstant = "20210705125844499";

String median1 = StreamerUtil.medianInstantTime(higher, lower);
assertThat(median1, is("20210705125843"));
assertThat(median1, is(expectedMedianInstant));
// test symmetry
assertThrows(IllegalArgumentException.class,
() -> StreamerUtil.medianInstantTime(lower, higher),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import java.text.SimpleDateFormat
import scala.collection.immutable.Map

object HoodieSqlUtils extends SparkAdapterSupport {
private val defaultDateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
private val defaultDateFormat = new SimpleDateFormat("yyyy-MM-dd")

def isHoodieTable(table: CatalogTable): Boolean = {
Expand Down Expand Up @@ -292,13 +291,14 @@ object HoodieSqlUtils extends SparkAdapterSupport {
* 3、yyyyMMddHHmmss
*/
def formatQueryInstant(queryInstant: String): String = {
if (queryInstant.length == 19) { // for yyyy-MM-dd HH:mm:ss
HoodieActiveTimeline.COMMIT_FORMATTER.format(defaultDateTimeFormat.parse(queryInstant))
} else if (queryInstant.length == 14) { // for yyyyMMddHHmmss
HoodieActiveTimeline.COMMIT_FORMATTER.parse(queryInstant) // validate the format
val instantLength = queryInstant.length
if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd HH:mm:ss[:SSS]
HoodieActiveTimeline.getInstantForDateString(queryInstant)
} else if (instantLength == 14 || instantLength == 17) { // for yyyyMMddHHmmss[SSS]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we use the constant defined above for the instant length (17) instead of hard-coding it here?

HoodieActiveTimeline.parseDateFromInstantTime(queryInstant) // validate the format
queryInstant
} else if (queryInstant.length == 10) { // for yyyy-MM-dd
HoodieActiveTimeline.COMMIT_FORMATTER.format(defaultDateFormat.parse(queryInstant))
HoodieActiveTimeline.getInstantForDate(defaultDateFormat.parse(queryInstant))
} else {
throw new IllegalArgumentException(s"Unsupported query instant time format: $queryInstant,"
+ s"Supported time format are: 'yyyy-MM-dd: HH:mm:ss' or 'yyyy-MM-dd' or 'yyyyMMddHHmmss'")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,10 @@ class HoodieStreamSource(
startOffset match {
case INIT_OFFSET => startOffset.commitTime
case HoodieSourceOffset(commitTime) =>
val time = HoodieActiveTimeline.COMMIT_FORMATTER.parse(commitTime).getTime
val time = HoodieActiveTimeline.parseDateFromInstantTime(commitTime).getTime
// As we consume the data between (start, end], start is not included,
// so we +1s to the start commit time here.
HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(time + 1000))
HoodieActiveTimeline.getInstantForDate(new Date(time + 1000))
case _=> throw new IllegalStateException("UnKnow offset type.")
}
}
Expand Down
Loading