Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-5153] Fix the write token name resolution of cdc log file #7128

Merged
merged 1 commit into from Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -279,34 +279,26 @@ protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, Ho

protected HoodieLogFormat.Writer createLogWriter(
Option<FileSlice> fileSlice, String baseCommitTime) throws IOException {
return createLogWriter(fileSlice, baseCommitTime, "");
return createLogWriter(fileSlice, baseCommitTime, null);
}

protected HoodieLogFormat.Writer createLogWriter(
Option<FileSlice> fileSlice, String baseCommitTime, String fileSuffix) throws IOException {
int logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
long logFileSize = 0L;
String logWriteToken = writeToken + fileSuffix;
String rolloverLogWriteToken = writeToken + fileSuffix;
if (fileSlice.isPresent()) {
Option<HoodieLogFile> latestLogFileOpt = fileSlice.get().getLatestLogFile();
if (latestLogFileOpt.isPresent()) {
HoodieLogFile latestLogFile = latestLogFileOpt.get();
logVersion = latestLogFile.getLogVersion();
logFileSize = latestLogFile.getFileSize();
logWriteToken = FSUtils.getWriteTokenFromLogPath(latestLogFile.getPath());
}
}
Option<FileSlice> fileSlice, String baseCommitTime, String suffix) throws IOException {
Option<HoodieLogFile> latestLogFile = fileSlice.isPresent()
? fileSlice.get().getLatestLogFile()
: Option.empty();

return HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId)
.overBaseCommit(baseCommitTime)
.withLogVersion(logVersion)
.withFileSize(logFileSize)
.withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
.withSizeThreshold(config.getLogFileMaxSize())
.withFs(fs)
.withRolloverLogWriteToken(rolloverLogWriteToken)
.withLogWriteToken(logWriteToken)
.withRolloverLogWriteToken(writeToken)
.withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
.withSuffix(suffix)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
}

Expand Down
Expand Up @@ -77,9 +77,10 @@
public class FSUtils {

private static final Logger LOG = LogManager.getLogger(FSUtils.class);
// Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1
// Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1_1-0-1
// Archive log files are of this pattern - .commits_.archive.1_1-0-1
private static final Pattern LOG_FILE_PATTERN =
Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)(_(([0-9]*)-([0-9]*)-([0-9]*)(-cdc)?))?");
Pattern.compile("\\.(.+)_(.*)\\.(.+)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(-cdc)?)?");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought we want to use .cdc instead of the unconventional -cdc?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep. Discussed with @danny0405 , this pr just changes the origin pattern. when this pr is merged, i will change the cdc format in #7042.

private static final String LOG_FILE_PREFIX = ".";
private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
private static final long MIN_CLEAN_TO_KEEP = 10;
Expand Down
Expand Up @@ -141,6 +141,8 @@ class WriterBuilder {
private Path parentPath;
// Log File Write Token
private String logWriteToken;
// optional file suffix
private String suffix;
// Rollover Log file write token
private String rolloverLogWriteToken;

Expand All @@ -164,6 +166,11 @@ public WriterBuilder withLogWriteToken(String logWriteToken) {
return this;
}

public WriterBuilder withSuffix(String suffix) {
this.suffix = suffix;
return this;
}

public WriterBuilder withFs(FileSystem fs) {
this.fs = fs;
return this;
Expand Down Expand Up @@ -250,6 +257,14 @@ public Writer build() throws IOException {
logWriteToken = rolloverLogWriteToken;
}

if (suffix != null) {
// A little hacky to simplify the file name concatenation:
// patch the write token with an optional suffix
// instead of adding a new extension
logWriteToken = logWriteToken + suffix;
rolloverLogWriteToken = rolloverLogWriteToken + suffix;
}

Path logPath = new Path(parentPath,
FSUtils.makeLogFileName(logFileId, fileExtension, instantTime, logVersion, logWriteToken));
LOG.info("HoodieLogFile on path " + logPath);
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
Expand Down Expand Up @@ -250,6 +251,42 @@ public void tesLogFileName() {
assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(rlPath));
}

@Test
public void testCdcLogFileName() {
String partitionPath = "2022/11/04/";
String fileName = UUID.randomUUID().toString();
String logFile = FSUtils.makeLogFileName(fileName, ".log", "100", 2, "1-0-1") + HoodieCDCUtils.CDC_LOGFILE_SUFFIX;
Path path = new Path(new Path(partitionPath), logFile);

assertTrue(FSUtils.isLogFile(path));
assertEquals("log", FSUtils.getFileExtensionFromLog(path));
assertEquals(fileName, FSUtils.getFileIdFromLogPath(path));
assertEquals("100", FSUtils.getBaseCommitTimeFromLogPath(path));
assertEquals(1, FSUtils.getTaskPartitionIdFromLogPath(path));
assertEquals("1-0-1", FSUtils.getWriteTokenFromLogPath(path));
assertEquals(0, FSUtils.getStageIdFromLogPath(path));
assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(path));
assertEquals(2, FSUtils.getFileVersionFromLog(path));
}

@Test
public void testArchiveLogFileName() {
String partitionPath = "2022/11/04/";
String fileName = "commits";
String logFile = FSUtils.makeLogFileName(fileName, ".archive", "", 2, "1-0-1");
Path path = new Path(new Path(partitionPath), logFile);

assertFalse(FSUtils.isLogFile(path));
assertEquals("archive", FSUtils.getFileExtensionFromLog(path));
assertEquals(fileName, FSUtils.getFileIdFromLogPath(path));
assertEquals("", FSUtils.getBaseCommitTimeFromLogPath(path));
assertEquals(1, FSUtils.getTaskPartitionIdFromLogPath(path));
assertEquals("1-0-1", FSUtils.getWriteTokenFromLogPath(path));
assertEquals(0, FSUtils.getStageIdFromLogPath(path));
assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(path));
assertEquals(2, FSUtils.getFileVersionFromLog(path));
}

/**
* Test Log File Comparisons when log files do not have write tokens.
*/
Expand Down