Implement reliable log file management for Merge on read, which is fa… #64

Merged · merged 1 commit on Jan 26, 2017
39 changes: 39 additions & 0 deletions hoodie-client/pom.xml
@@ -64,6 +64,45 @@
<artifactId>hoodie-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<classifier>tests</classifier>
<!-- Need these exclusions to make sure JavaSparkContext can be set up. https://issues.apache.org/jira/browse/SPARK-1693 -->
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<classifier>tests</classifier>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.uber.hoodie</groupId>
<artifactId>hoodie-common</artifactId>
25 changes: 25 additions & 0 deletions hoodie-common/pom.xml
@@ -90,6 +90,21 @@
<version>1.10.19</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -99,5 +114,15 @@
<artifactId>kryo</artifactId>
<scope>test</scope>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.avro</groupId>-->
<!--<artifactId>avro-mapred</artifactId>-->
<!--<exclusions>-->
<!--<exclusion>-->
<!--<groupId>org.mortbay.jetty</groupId>-->
<!--<artifactId>*</artifactId>-->
<!--</exclusion>-->
<!--</exclusions>-->
<!--</dependency>-->
</dependencies>
</project>
@@ -0,0 +1,228 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.uber.hoodie.common.table.log;

import com.uber.hoodie.common.util.FSUtils;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;

/**
* Configuration for appending to a Hoodie log file
*/
public class HoodieLogAppendConfig {
private final static Logger log = LogManager.getLogger(HoodieLogAppendConfig.class);
private static final long DEFAULT_SIZE_THRESHOLD = 32 * 1024 * 1024L;

private final int bufferSize;
private final short replication;
private final long blockSize;
private final HoodieLogFile logFile;
private boolean isAutoFlush;
private final Schema schema;
private final FileSystem fs;
private final long sizeThreshold;

private HoodieLogAppendConfig(FileSystem fs, HoodieLogFile logFile, Schema schema, Integer bufferSize,
Short replication, Long blockSize, boolean isAutoFlush, Long sizeThreshold) {
this.fs = fs;
this.logFile = logFile;
this.schema = schema;
this.bufferSize = bufferSize;
this.replication = replication;
this.blockSize = blockSize;
this.isAutoFlush = isAutoFlush;
this.sizeThreshold = sizeThreshold;
}

public int getBufferSize() {
return bufferSize;
}

public short getReplication() {
return replication;
}

public long getBlockSize() {
return blockSize;
}

public Schema getSchema() {
return schema;
}

public FileSystem getFs() {
return fs;
}

public HoodieLogFile getLogFile() {
return logFile;
}

public long getSizeThreshold() {
return sizeThreshold;
}

public boolean isAutoFlush() {
return isAutoFlush;
}

public static Builder newBuilder() {
return new Builder();
}

public HoodieLogAppendConfig withLogFile(HoodieLogFile newFile) {
return new HoodieLogAppendConfig(fs, newFile, schema, bufferSize, replication, blockSize,
isAutoFlush, sizeThreshold);
}

public static class Builder {
// Auto-flush. If set to true, the Avro block is flushed after every append
private boolean isAutoFlush = true;
// Buffer size in the Avro writer
private Integer bufferSize;
// Replication for the log file
private Short replication;
// Block size for the Avro log file (useful if auto-flush is set to false)
private Long blockSize;
// Schema for the log file
private Schema schema;
// FileSystem
private FileSystem fs;
// Size threshold for the log file. Useful when used with a rolling log appender
private Long sizeThreshold;
// Log file extension, e.g. .avro.delta or .avro.commits
private String logFileExtension;
// File ID
private String fileId;
// Version number for this log file. If not specified, the current version is computed
private Integer fileVersion;
// Partition path for the log file
private Path partitionPath;

public Builder withBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}

public Builder withReplication(short replication) {
this.replication = replication;
return this;
}

public Builder withBlockSize(long blockSize) {
this.blockSize = blockSize;
return this;
}

public Builder withSchema(Schema schema) {
this.schema = schema;
return this;
}

public Builder withFs(FileSystem fs) {
this.fs = fs;
return this;
}

public Builder withAutoFlush(boolean autoFlush) {
this.isAutoFlush = autoFlush;
return this;
}

public Builder withSizeThreshold(long sizeThreshold) {
this.sizeThreshold = sizeThreshold;
return this;
}

public Builder withLogFileExtension(String logFileExtension) {
this.logFileExtension = logFileExtension;
return this;
}

public Builder withFileId(String fileId) {
this.fileId = fileId;
return this;
}

public Builder withFileVersion(int version) {
this.fileVersion = version;
return this;
}

public Builder onPartitionPath(Path path) {
this.partitionPath = path;
return this;
}

public HoodieLogAppendConfig build() throws IOException {
log.info("Building HoodieLogAppendConfig");
if (schema == null) {
throw new IllegalArgumentException("Schema for log is not specified");
}
if (fs == null) {
fs = FSUtils.getFs();
}

if (fileId == null) {
throw new IllegalArgumentException("FileID is not specified");
}
if (logFileExtension == null) {
throw new IllegalArgumentException("File extension is not specified");
}
if (partitionPath == null) {
throw new IllegalArgumentException("Partition path is not specified");
}
if (fileVersion == null) {
log.info("Computing the next log version for " + fileId + " in " + partitionPath);
fileVersion =
FSUtils.getCurrentLogVersion(fs, partitionPath, fileId, logFileExtension);
log.info(
"Computed the next log version for " + fileId + " in " + partitionPath + " as "
+ fileVersion);
}

Path logPath = new Path(partitionPath,
FSUtils.makeLogFileName(fileId, logFileExtension, fileVersion));
log.info("LogConfig created on path " + logPath);
HoodieLogFile logFile = new HoodieLogFile(logPath);

if (bufferSize == null) {
bufferSize = FSUtils.getDefaultBufferSize(fs);
}
if (replication == null) {
replication = FSUtils.getDefaultReplication(fs, partitionPath);
}
if (blockSize == null) {
blockSize = FSUtils.getDefaultBlockSize(fs, partitionPath);
}
if (sizeThreshold == null) {
sizeThreshold = DEFAULT_SIZE_THRESHOLD;
}

return new HoodieLogAppendConfig(fs, logFile, schema, bufferSize, replication, blockSize,
isAutoFlush, sizeThreshold);
}
}
}
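
As a reading aid, here is a minimal usage sketch of the builder above (the file ID, partition path, schema source and size threshold are hypothetical values, not taken from this change):

// Hypothetical usage sketch; the file ID, partition path and size
// threshold below are illustrative values, not taken from this PR.
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;

import com.uber.hoodie.common.table.log.HoodieLogAppendConfig;

public class LogConfigExample {
  public static HoodieLogAppendConfig buildConfig(Schema schema) throws IOException {
    return HoodieLogAppendConfig.newBuilder()
        .withSchema(schema)                                   // required
        .withFileId("file-0001")                              // required; hypothetical ID
        .withLogFileExtension(".avro.delta")                  // required
        .onPartitionPath(new Path("/tmp/hoodie/2017/01/26"))  // required; hypothetical path
        .withSizeThreshold(64 * 1024 * 1024L)                 // overrides the 32MB default
        .build(); // fs falls back to FSUtils.getFs(); fileVersion is computed when unset
  }
}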
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.uber.hoodie.common.table.log;

import com.uber.hoodie.common.table.log.avro.AvroLogAppender;
import com.uber.hoodie.common.table.log.avro.RollingAvroLogAppender;

import java.io.IOException;
import java.util.List;

/**
* Interface for implementations supporting appending records to a log file
*
* @param <R> type of record to be appended
* @see AvroLogAppender
* @see RollingAvroLogAppender
*/
public interface HoodieLogAppender<R> {
/**
* Append a batch of records (written as a single block/unit to the underlying log)
*
* @param records records to append as one block
* @throws IOException if the write fails
* @throws InterruptedException if the append is interrupted
*/
void append(List<R> records) throws IOException, InterruptedException;

/**
* Syncs the log manually when auto-flush is not set in HoodieLogAppendConfig. If auto-flush is
* set, the appender flushes automatically after every append call.
*
* @throws IOException if the sync fails
*/
void sync() throws IOException;

/**
* Close the appender and release any resources held
*
* @throws IOException if the close fails
*/
void close() throws IOException;

/**
* Gets the current size (offset) of the log. This is usually used to mark the start of a block
* in metadata and is passed to the HoodieLogReader
*
* @return the current size of the log in bytes
* @throws IOException if the size cannot be determined
*/
long getCurrentSize() throws IOException;
}
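
For context, a sketch of how a caller might drive an implementation of this interface; the enclosing class and parameter values are illustrative, and only the getCurrentSize/append/sync/close sequence reflects the contract documented above:

// Hypothetical caller sketch; the surrounding class and parameters are
// illustrative, only the call sequence follows the interface contract.
import java.io.IOException;
import java.util.List;

import org.apache.avro.generic.GenericRecord;

import com.uber.hoodie.common.table.log.HoodieLogAppender;

public class LogAppendExample {
  public static long appendBlock(HoodieLogAppender<GenericRecord> appender,
      List<GenericRecord> records) throws IOException, InterruptedException {
    long blockStart = appender.getCurrentSize(); // mark the block start for the reader
    appender.append(records);                    // each call writes one block to the log
    appender.sync();                             // redundant when auto-flush is enabled
    appender.close();                            // release underlying resources
    return blockStart;                           // e.g. recorded in block metadata
  }
}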