Skip to content

Commit

Permalink
[HUDI-430] Adding InlineFileSystem to support embedding any file form…
Browse files Browse the repository at this point in the history
…at as an InlineFile (#1176)

* Adding InlineFileSystem to support embedding any file format (parquet, hfile, etc). Supports reading the embedded file using respective readers.
  • Loading branch information
nsivabalan committed Mar 28, 2020
1 parent 04449f3 commit ac73bdc
Show file tree
Hide file tree
Showing 12 changed files with 1,460 additions and 7 deletions.
1 change: 1 addition & 0 deletions hudi-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>

<!-- Hoodie - Tests -->
Expand Down
15 changes: 15 additions & 0 deletions hudi-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -177,5 +177,20 @@
</exclusion>
</exclusions>
</dependency>

<!-- HBase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<scope>test</scope>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.inline.fs;

import org.apache.hadoop.fs.Path;

/**
 * Static helpers for building and taking apart {@link InLineFileSystem} paths.
 * <p>
 * An inline path has the textual layout
 * {@code "inlinefs://<path_to_outer_file>/<outer_file_scheme>/?start_offset=<start_offset>&length=<length>"},
 * for example {@code "inlinefs://<path_to_outer_file>/s3a/?start_offset=20&length=40"}.
 */
public class InLineFSUtils {
  private static final String START_OFFSET_STR = "start_offset";
  private static final String LENGTH_STR = "length";
  private static final String EQUALS_STR = "=";

  /**
   * Builds the inline path that addresses a byte range of {@code outerPath}.
   * <p>
   * The string handed to {@link Path} is assembled as: the inline scheme followed by {@code "://"}, then
   * everything that follows the first {@code ':'} of {@code outerPath.toString()}, then
   * {@code "/<origScheme>/?start_offset=<inLineStartOffset>&length=<inLineLength>"}.
   * <p>
   * Eg: outerPath = {@code s3a://file1}, origScheme = {@code s3a}, inLineStartOffset = 20, inLineLength = 40
   * yields a path ending in {@code "file1/s3a/?start_offset=20&length=40"}.
   *
   * @param outerPath         path of the file that physically contains the inlined bytes
   * @param origScheme        scheme of the outer file; embedded as the second-to-last path component
   * @param inLineStartOffset offset of the first inlined byte within the outer file
   * @param inLineLength      number of inlined bytes
   * @return the inline path encoding the outer location, its scheme, the start offset and the length
   */
  public static Path getInlineFilePath(Path outerPath, String origScheme, long inLineStartOffset, long inLineLength) {
    String outer = outerPath.toString();
    // Drop the outer scheme: keep whatever comes after the first ':' (the whole string when there is none).
    String schemeless = outer.substring(outer.indexOf(":") + 1);

    StringBuilder inline = new StringBuilder();
    inline.append(InLineFileSystem.SCHEME).append("://").append(schemeless);
    inline.append("/").append(origScheme).append("/");
    inline.append("?").append(START_OFFSET_STR).append(EQUALS_STR).append(inLineStartOffset);
    inline.append("&").append(LENGTH_STR).append(EQUALS_STR).append(inLineLength);
    return new Path(inline.toString());
  }

  /**
   * Recovers the outer file path from an inline path.
   * <p>
   * The parent of {@code inlinePath} is taken to be the outer-scheme component; its name becomes the scheme and
   * its own parent is the outer location, in whose string form the first occurrence of the inline scheme is
   * replaced by that outer scheme.
   * <p>
   * Eg input : {@code "inlinefs://file1/s3a/?start_offset=20&length=40"}.
   * Output : {@code "s3a://file1"}
   *
   * @param inlinePath inline file system path
   * @return path of the outer file, expressed in the outer file's own scheme
   */
  public static Path getOuterfilePathFromInlinePath(Path inlinePath) {
    Path schemeComponent = inlinePath.getParent();
    String outerScheme = schemeComponent.getName();
    String outerLocation = schemeComponent.getParent().toString();
    return new Path(outerLocation.replaceFirst(InLineFileSystem.SCHEME, outerScheme));
  }

  /**
   * Extracts the start offset encoded in an inline path.
   * <p>
   * Eg input : {@code "inlinefs://file1/s3a/?start_offset=20&length=40"}.
   * Output: 20
   *
   * @param inlinePath inline file system path
   * @return the third-from-last token of the path string when split on {@code ?}, {@code &} and {@code =},
   *         parsed as an {@code int}
   */
  public static int startOffset(Path inlinePath) {
    String[] tokens = tokenize(inlinePath);
    String offsetToken = tokens[tokens.length - 3];
    return Integer.parseInt(offsetToken);
  }

  /**
   * Extracts the length encoded in an inline path.
   * <p>
   * Eg input : {@code "inlinefs:/file1/s3a/?start_offset=20&length=40"}.
   * Output: 40
   *
   * @param inlinePath inline file system path
   * @return the last token of the path string when split on {@code ?}, {@code &} and {@code =},
   *         parsed as an {@code int}
   */
  public static int length(Path inlinePath) {
    String[] tokens = tokenize(inlinePath);
    String lengthToken = tokens[tokens.length - 1];
    return Integer.parseInt(lengthToken);
  }

  /** Splits the string form of {@code inlinePath} on the query delimiters {@code ?}, {@code &} and {@code =}. */
  private static String[] tokenize(Path inlinePath) {
    return inlinePath.toString().split("[?&=]");
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.inline.fs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

import java.io.IOException;
import java.net.URI;

/**
 * Read-only {@link FileSystem} that exposes a byte range of an outer file as if it were an independent file.
 * Only the read path is implemented; every mutating operation throws {@link UnsupportedOperationException}.
 * <p>
 * - Reading an inlined file at a given offset, length, read it out as if it were an independent file of that length
 * - Inlined path is of the form
 *   {@code "inlinefs:///path/to/outer/file/<outer_file_scheme>/?start_offset=<start_offset>&length=<length>"}
 * <p>
 * TODO: The reader/writer may try to use relative paths based on the inlinepath and it may not work. Need to handle
 * this gracefully eg. the parquet summary metadata reading.
 * TODO: If this shows promise, also support directly writing the inlined file to the underneath file without buffer
 */
public class InLineFileSystem extends FileSystem {

  public static final String SCHEME = "inlinefs";
  private Configuration conf = null;

  /**
   * Initializes the base {@link FileSystem} and remembers {@code conf} so the outer file system can be resolved later.
   *
   * @param name URI this file system is being created for
   * @param conf configuration used to look up the outer file system
   * @throws IOException if the superclass initialization fails
   */
  @Override
  public void initialize(URI name, Configuration conf) throws IOException {
    super.initialize(name, conf);
    this.conf = conf;
  }

  /**
   * @return a URI created from the bare scheme string returned by {@link #getScheme()}
   */
  @Override
  public URI getUri() {
    return URI.create(getScheme());
  }

  /**
   * @return the inline file system scheme, {@value #SCHEME}
   */
  public String getScheme() {
    return SCHEME;
  }

  /**
   * Opens the outer file and returns a stream whose position 0 is the first inlined byte.
   * <p>
   * The outer stream is moved to the start offset before it is wrapped, so that sequential reads and
   * {@code getPos()} on the returned stream begin at the inline content rather than at the beginning of the outer
   * file. If anything fails after the outer stream has been opened, it is closed before the failure propagates.
   *
   * @param inlinePath inline path encoding the outer file, start offset and length
   * @param bufferSize buffer size passed through to the outer file system
   * @return stream over the inlined byte range
   * @throws IOException if the outer file cannot be opened or positioned
   */
  @Override
  public FSDataInputStream open(Path inlinePath, int bufferSize) throws IOException {
    Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
    FileSystem outerFs = outerPath.getFileSystem(conf);
    FSDataInputStream outerStream = outerFs.open(outerPath, bufferSize);
    try {
      int startOffset = InLineFSUtils.startOffset(inlinePath);
      int length = InLineFSUtils.length(inlinePath);
      outerStream.seek(startOffset);
      return new InLineFsDataInputStream(startOffset, outerStream, length);
    } catch (IOException | RuntimeException e) {
      // Do not leak the outer stream when the inline stream could not be handed to the caller.
      try {
        outerStream.close();
      } catch (IOException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
  }

  /**
   * Best-effort existence check: any failure while resolving the status is reported as "does not exist".
   *
   * @param f inline path to probe
   * @return {@code true} if {@link #getFileStatus(Path)} succeeds with a non-null status, {@code false} otherwise
   */
  @Override
  public boolean exists(Path f) {
    try {
      return getFileStatus(f) != null;
    } catch (Exception e) {
      // Deliberately swallowed: malformed inline paths and missing outer files both mean "not there".
      return false;
    }
  }

  /**
   * Builds the status of the inline file from the status of its outer file: every attribute is copied from the
   * outer status except the length (taken from the inline path) and the path (the inline path itself).
   *
   * @param inlinePath inline path encoding the outer file, start offset and length
   * @return status describing the inlined byte range
   * @throws IOException if the outer file system cannot provide the outer file's status
   */
  @Override
  public FileStatus getFileStatus(Path inlinePath) throws IOException {
    Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
    FileSystem outerFs = outerPath.getFileSystem(conf);
    FileStatus status = outerFs.getFileStatus(outerPath);
    return new FileStatus(InLineFSUtils.length(inlinePath), status.isDirectory(), status.getReplication(),
        status.getBlockSize(), status.getModificationTime(), status.getAccessTime(), status.getPermission(),
        status.getOwner(), status.getGroup(), inlinePath);
  }

  /**
   * Not supported; this file system is read-only.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i, short i1, long l,
                                   Progressable progressable) throws IOException {
    throw new UnsupportedOperationException("Can't create files");
  }

  /**
   * Not supported; this file system is read-only.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException {
    throw new UnsupportedOperationException("Can't append to files");
  }

  /**
   * Not supported; this file system is read-only.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public boolean rename(Path path, Path path1) throws IOException {
    throw new UnsupportedOperationException("Can't rename files");
  }

  /**
   * Not supported; this file system is read-only.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public boolean delete(Path path, boolean b) throws IOException {
    throw new UnsupportedOperationException("Can't delete files");
  }

  /**
   * Lists an inline path as a single entry: the status of the inline file itself.
   *
   * @param inlinePath inline path encoding the outer file, start offset and length
   * @return one-element array holding {@link #getFileStatus(Path)} of {@code inlinePath}
   * @throws IOException if the status cannot be resolved
   */
  @Override
  public FileStatus[] listStatus(Path inlinePath) throws IOException {
    return new FileStatus[] {getFileStatus(inlinePath)};
  }

  /**
   * Not supported.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void setWorkingDirectory(Path path) {
    throw new UnsupportedOperationException("Can't set working directory");
  }

  /**
   * Not supported.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public Path getWorkingDirectory() {
    throw new UnsupportedOperationException("Can't get working directory");
  }

  /**
   * Not supported; this file system is read-only.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
    throw new UnsupportedOperationException("Can't create directories");
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.inline.fs;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.io.ByteBufferPool;

import java.io.FileDescriptor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;

/**
 * Inline {@link FSDataInputStream}. A startOffset that is passed in is assumed to be the start of the InputStream.
 * All positions accepted and reported by this class are relative to {@code startOffset}, and seeks and positional
 * reads are confined to the {@code length} bytes that follow it, so the stream behaves like a standalone file of
 * that length.
 * <p>
 * The constructor does not reposition {@code outerStream}; a caller that wants sequential reads to begin at the
 * inline content must position the stream (e.g. {@code seek(0)}) before reading.
 * <p>
 * NOTE(review): {@link #read(ByteBuffer)} and {@link #read(ByteBufferPool, int, EnumSet)} delegate straight to the
 * outer stream without consulting {@code length} - confirm whether they also need bounding.
 */
public class InLineFsDataInputStream extends FSDataInputStream {

  private final int startOffset;
  private final FSDataInputStream outerStream;
  private final int length;

  /**
   * @param startOffset offset within the outer stream at which the inline content begins
   * @param outerStream stream over the outer file; all operations are delegated to it
   * @param length      number of bytes of inline content
   */
  public InLineFsDataInputStream(int startOffset, FSDataInputStream outerStream, int length) {
    super(outerStream.getWrappedStream());
    this.startOffset = startOffset;
    this.outerStream = outerStream;
    this.length = length;
  }

  /**
   * Moves to {@code desired}, interpreted relative to the start of the inline content.
   *
   * @param desired target position in {@code [0, length]}
   * @throws IOException if {@code desired} lies outside the inline content or the outer seek fails
   */
  @Override
  public void seek(long desired) throws IOException {
    checkSeekTarget(desired);
    outerStream.seek(startOffset + desired);
  }

  /**
   * @return current position of the outer stream, expressed relative to the start of the inline content
   */
  @Override
  public long getPos() throws IOException {
    return outerStream.getPos() - startOffset;
  }

  /**
   * Positional read that never returns bytes beyond the inline content.
   *
   * @param position position relative to the start of the inline content
   * @param buffer   destination buffer
   * @param offset   offset in {@code buffer} at which to store the bytes
   * @param length   maximum number of bytes to read; clamped to the bytes remaining after {@code position}
   * @return number of bytes read, or -1 when {@code position} is at or past the end of the inline content
   * @throws IOException if {@code position} is negative or the outer read fails
   */
  @Override
  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
    if (position < 0) {
      throw new IOException("Attempting to read before inline content: position " + position);
    }
    long remaining = this.length - position;
    if (length > 0 && remaining <= 0) {
      return -1;
    }
    // Non-positive requests are passed through untouched so the outer stream applies its own argument checks.
    int bounded = (int) Math.min((long) length, Math.max(remaining, 0L));
    return outerStream.read(startOffset + position, buffer, offset, bounded);
  }

  /**
   * Positional read of exactly {@code length} bytes from within the inline content.
   *
   * @param position position relative to the start of the inline content
   * @param buffer   destination buffer
   * @param offset   offset in {@code buffer} at which to store the bytes
   * @param length   number of bytes to read
   * @throws IOException if the requested range is not fully inside the inline content or the outer read fails
   */
  @Override
  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
    if (position < 0 || length > this.length - position) {
      throw new IOException("Attempting to read past inline content: position " + position
          + ", requested " + length + ", inline length " + this.length);
    }
    outerStream.readFully(startOffset + position, buffer, offset, length);
  }

  /**
   * Positional read that fills {@code buffer} completely from within the inline content.
   *
   * @param position position relative to the start of the inline content
   * @param buffer   destination buffer, filled entirely
   * @throws IOException if the requested range is not fully inside the inline content or the outer read fails
   */
  @Override
  public void readFully(long position, byte[] buffer)
      throws IOException {
    readFully(position, buffer, 0, buffer.length);
  }

  /**
   * Delegates to the outer stream's {@code seekToNewSource} after translating {@code targetPos} to an outer position.
   *
   * @param targetPos target position in {@code [0, length]}, relative to the start of the inline content
   * @return whatever the outer stream reports
   * @throws IOException if {@code targetPos} lies outside the inline content or the outer call fails
   */
  @Override
  public boolean seekToNewSource(long targetPos) throws IOException {
    checkSeekTarget(targetPos);
    return outerStream.seekToNewSource(startOffset + targetPos);
  }

  /** Delegates to the outer stream. */
  @Override
  public int read(ByteBuffer buf) throws IOException {
    return outerStream.read(buf);
  }

  /** Delegates to the outer stream. */
  @Override
  public FileDescriptor getFileDescriptor() throws IOException {
    return outerStream.getFileDescriptor();
  }

  /** Delegates to the outer stream. */
  @Override
  public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
    outerStream.setReadahead(readahead);
  }

  /** Delegates to the outer stream. */
  @Override
  public void setDropBehind(Boolean dropBehind) throws IOException, UnsupportedOperationException {
    outerStream.setDropBehind(dropBehind);
  }

  /** Delegates to the outer stream. */
  @Override
  public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
      throws IOException, UnsupportedOperationException {
    return outerStream.read(bufferPool, maxLength, opts);
  }

  /** Delegates to the outer stream. */
  @Override
  public void releaseBuffer(ByteBuffer buffer) {
    outerStream.releaseBuffer(buffer);
  }

  /** Delegates to the outer stream. */
  @Override
  public void unbuffer() {
    outerStream.unbuffer();
  }

  /** Rejects seek targets that fall before the start or after the end of the inline content. */
  private void checkSeekTarget(long target) throws IOException {
    if (target < 0 || target > length) {
      throw new IOException("Attempting to seek outside inline content: position " + target
          + ", inline length " + length);
    }
  }
}

0 comments on commit ac73bdc

Please sign in to comment.