Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-2716] InLineFS support for S3FS logs #3977

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.hudi.common.fs.inline;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.ValidationUtils;

import java.io.File;

/**
* Utils to parse InLineFileSystem paths.
Expand All @@ -29,46 +32,58 @@
public class InLineFSUtils {
private static final String START_OFFSET_STR = "start_offset";
private static final String LENGTH_STR = "length";
private static final String PATH_SEPARATOR = "/";
private static final String SCHEME_SEPARATOR = ":";
private static final String EQUALS_STR = "=";
private static final String LOCAL_FILESYSTEM_SCHEME = "file";

/**
* Fetch inline file path from outer path.
* Eg
* Input:
* Path = s3a://file1, origScheme: file, startOffset = 20, length = 40
* Output: "inlinefs:/file1/s3a/?start_offset=20&length=40"
* Get the InlineFS Path for a given scheme and its Path.
* <p>
* Examples:
* Input Path: s3a://file1, origScheme: s3a, startOffset = 20, length = 40
* Output: "inlinefs://file1/s3a/?start_offset=20&length=40"
*
* @param outerPath
* @param origScheme
* @param inLineStartOffset
* @param inLineLength
* @return
* @param outerPath The outer file Path
* @param origScheme The scheme of the outer file
* @param inLineStartOffset Start offset for the inline file
* @param inLineLength Length for the inline file
* @return InlineFS Path for the requested outer path and scheme
*/
public static Path getInlineFilePath(Path outerPath, String origScheme, long inLineStartOffset, long inLineLength) {
String subPath = outerPath.toString().substring(outerPath.toString().indexOf(":") + 1);
final String subPath = new File(outerPath.toString().substring(outerPath.toString().indexOf(":") + 1)).getPath();
return new Path(
InLineFileSystem.SCHEME + "://" + subPath + "/" + origScheme
+ "/" + "?" + START_OFFSET_STR + EQUALS_STR + inLineStartOffset
InLineFileSystem.SCHEME + SCHEME_SEPARATOR + PATH_SEPARATOR + subPath + PATH_SEPARATOR + origScheme
+ PATH_SEPARATOR + "?" + START_OFFSET_STR + EQUALS_STR + inLineStartOffset
+ "&" + LENGTH_STR + EQUALS_STR + inLineLength
);
}

/**
* Inline file format
* "inlinefs://<path_to_outer_file>/<outer_file_scheme>/?start_offset=start_offset>&length=<length>"
* Outer File format
* "<outer_file_scheme>://<path_to_outer_file>"
* InlineFS Path format:
* "inlinefs://path/to/outer/file/outer_file_scheme/?start_offset=<start_offset>&length=<length>"
* <p>
* Eg input : "inlinefs://file1/sa3/?start_offset=20&length=40".
* Output : "sa3://file1"
* Outer File Path format:
* "outer_file_scheme://path/to/outer/file"
* <p>
* Example
* Input: "inlinefs://file1/s3a/?start_offset=20&length=40".
manojpec marked this conversation as resolved.
Show resolved Hide resolved
* Output: "s3a://file1"
*
* @param inlinePath inline file system path
* @return
* @param inlineFSPath InLineFS Path to get the outer file Path
* @return Outer file Path from the InLineFS Path
*/
public static Path getOuterfilePathFromInlinePath(Path inlinePath) {
String scheme = inlinePath.getParent().getName();
Path basePath = inlinePath.getParent().getParent();
return new Path(basePath.toString().replaceFirst(InLineFileSystem.SCHEME, scheme));
public static Path getOuterFilePathFromInlinePath(Path inlineFSPath) {
final String scheme = inlineFSPath.getParent().getName();
final Path basePath = inlineFSPath.getParent().getParent();
ValidationUtils.checkArgument(basePath.toString().contains(SCHEME_SEPARATOR),
"Invalid InLineFSPath: " + inlineFSPath);

final String pathExceptScheme = basePath.toString().substring(basePath.toString().indexOf(SCHEME_SEPARATOR) + 1);
final String fullPath = scheme + SCHEME_SEPARATOR
+ (scheme.equals(LOCAL_FILESYSTEM_SCHEME) ? PATH_SEPARATOR : "")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this work for the hdfs scheme? Doesn't that expect a "/", just like file (LOCAL_FILESYSTEM_SCHEME)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I am not wrong, pathExceptScheme already has the slashes. I had the same doubt when I reviewed the patch and had to clarify.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vinothchandar
Right, hdfs/s3a/s3 and all other non-local file systems need :// as the scheme separator, whereas the local fs needs :/ or :///. A unit test was added to verify the expected inlinefs forward and backward conversions - https://github.com/apache/hudi/pull/3977/files#diff-5b10f493d7ba18b2f58b6c2fe4e544413d47ceb0d24a980ec07b9b1b14dca167R312

+ pathExceptScheme;
return new Path(fullPath);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public String getScheme() {

@Override
public FSDataInputStream open(Path inlinePath, int bufferSize) throws IOException {
Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
Path outerPath = InLineFSUtils.getOuterFilePathFromInlinePath(inlinePath);
FileSystem outerFs = outerPath.getFileSystem(conf);
FSDataInputStream outerStream = outerFs.open(outerPath, bufferSize);
return new InLineFsDataInputStream(InLineFSUtils.startOffset(inlinePath), outerStream, InLineFSUtils.length(inlinePath));
Expand All @@ -80,7 +80,7 @@ public boolean exists(Path f) {

@Override
public FileStatus getFileStatus(Path inlinePath) throws IOException {
Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
Path outerPath = InLineFSUtils.getOuterFilePathFromInlinePath(inlinePath);
FileSystem outerFs = outerPath.getFileSystem(conf);
FileStatus status = outerFs.getFileStatus(outerPath);
FileStatus toReturn = new FileStatus(InLineFSUtils.length(inlinePath), status.isDirectory(), status.getReplication(), status.getBlockSize(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,21 +374,23 @@ private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, int
LOG.info("Number of remaining logblocks to merge " + logBlocks.size());
// poll the element at the bottom of the stack since that's the order it was inserted
HoodieLogBlock lastBlock = logBlocks.pollLast();
switch (lastBlock.getBlockType()) {
case AVRO_DATA_BLOCK:
processDataBlock((HoodieAvroDataBlock) lastBlock, keys);
break;
case HFILE_DATA_BLOCK:
processDataBlock((HoodieHFileDataBlock) lastBlock, keys);
break;
case DELETE_BLOCK:
Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey);
break;
case CORRUPT_BLOCK:
LOG.warn("Found a corrupt block which was not rolled back");
break;
default:
break;
if (lastBlock != null) {
manojpec marked this conversation as resolved.
Show resolved Hide resolved
switch (lastBlock.getBlockType()) {
case AVRO_DATA_BLOCK:
processDataBlock((HoodieAvroDataBlock) lastBlock, keys);
break;
case HFILE_DATA_BLOCK:
processDataBlock((HoodieHFileDataBlock) lastBlock, keys);
break;
case DELETE_BLOCK:
Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey);
break;
case CORRUPT_BLOCK:
LOG.warn("Found a corrupt block which was not rolled back");
break;
default:
break;
}
}
}
// At this step the lastBlocks are consumed. We track approximate progress by number of log-files seen
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private final Schema readerSchema;
private final boolean readBlocksLazily;
private final boolean reverseLogReader;
private final boolean enableInLineReading;
private int bufferSize;

private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
Expand All @@ -62,6 +63,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
this.reverseLogReader = reverseLogReader;
this.bufferSize = bufferSize;
this.prevReadersInOpenState = new ArrayList<>();
this.enableInLineReading = enableInlineReading;
if (logFiles.size() > 0) {
HoodieLogFile nextLogFile = logFiles.remove(0);
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableInlineReading);
Expand Down Expand Up @@ -104,7 +106,8 @@ public boolean hasNext() {
this.prevReadersInOpenState.add(currentReader);
}
this.currentReader =
new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
this.enableInLineReading);
} catch (IOException io) {
throw new HoodieIOException("unable to initialize read with log file ", io);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,64 @@ public void testsetWorkingDirectory() throws IOException {
}, "Should have thrown exception");
}

/**
 * One path-conversion scenario for the InLineFS round-trip test: the outer
 * path fed in, the inline path expected from it, and the outer path expected
 * when that inline path is converted back.
 */
static class TestFSPath {
  /** Outer file path handed to the outer-to-inline conversion. */
  final Path inputPath;
  /** InLineFS path the outer-to-inline conversion is expected to yield. */
  final Path expectedInLineFSPath;
  /** Outer path expected after converting the inline path back. */
  final Path transformedInputPath;

  TestFSPath(final Path input, final Path expectedInline, final Path expectedRoundTrip) {
    inputPath = input;
    expectedInLineFSPath = expectedInline;
    transformedInputPath = expectedRoundTrip;
  }
}

@Test
public void testInLineFSPathConversions() {
final List<TestFSPath> expectedInLinePaths = Arrays.asList(
new TestFSPath(
new Path("/zero/524bae7e-f01d-47ae-b7cd-910400a81336"),
new Path("inlinefs://zero/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"),
new Path("file:/zero/524bae7e-f01d-47ae-b7cd-910400a81336")),
new TestFSPath(
new Path("file:/one/524bae7e-f01d-47ae-b7cd-910400a81336"),
new Path("inlinefs://one/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"),
new Path("file:/one/524bae7e-f01d-47ae-b7cd-910400a81336")),
new TestFSPath(
new Path("file://two/524bae7e-f01d-47ae-b7cd-910400a81336"),
new Path("inlinefs://two/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"),
new Path("file:/two/524bae7e-f01d-47ae-b7cd-910400a81336")),
new TestFSPath(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vinothchandar: here are examples of hdfs path conversion to inline paths.

new Path("hdfs://three/524bae7e-f01d-47ae-b7cd-910400a81336"),
new Path("inlinefs://three/524bae7e-f01d-47ae-b7cd-910400a81336/hdfs/?start_offset=10&length=10"),
new Path("hdfs://three/524bae7e-f01d-47ae-b7cd-910400a81336")),
new TestFSPath(
new Path("s3://four/524bae7e-f01d-47ae-b7cd-910400a81336"),
new Path("inlinefs://four/524bae7e-f01d-47ae-b7cd-910400a81336/s3/?start_offset=10&length=10"),
new Path("s3://four/524bae7e-f01d-47ae-b7cd-910400a81336")),
new TestFSPath(
new Path("s3a://five/524bae7e-f01d-47ae-b7cd-910400a81336"),
new Path("inlinefs://five/524bae7e-f01d-47ae-b7cd-910400a81336/s3a/?start_offset=10&length=10"),
new Path("s3a://five/524bae7e-f01d-47ae-b7cd-910400a81336"))
);

for (TestFSPath entry : expectedInLinePaths) {
final Path inputPath = entry.inputPath;
final Path expectedInLineFSPath = entry.expectedInLineFSPath;
final Path expectedTransformedInputPath = entry.transformedInputPath;

String scheme = "file";
if (inputPath.toString().contains(":")) {
scheme = inputPath.toString().split(":")[0];
}
final Path actualInLineFSPath = InLineFSUtils.getInlineFilePath(inputPath, scheme, 10, 10);
assertEquals(expectedInLineFSPath, actualInLineFSPath);

final Path actualOuterFilePath = InLineFSUtils.getOuterFilePathFromInlinePath(actualInLineFSPath);
assertEquals(expectedTransformedInputPath, actualOuterFilePath);
}
}

@Test
public void testExists() throws IOException {
Path inlinePath = getRandomInlinePath();
Expand Down