Using BufferedFsInputStream to wrap FSInputStream for FSDataInputStream
n3nash authored and vinothchandar committed Apr 18, 2018
1 parent 720e42f commit c3c205f
Showing 15 changed files with 222 additions and 117 deletions.
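In essence, per the commit title, the FSInputStream underneath an FSDataInputStream is wrapped in a Hadoop BufferedFSInputStream, sized by the new hoodie.memory.dfs.buffer.max.size property introduced below, and re-wrapped as an FSDataInputStream. A minimal sketch of that wrapping (the helper name and pass-through fallback are illustrative assumptions, not code from this commit):

    import java.io.InputStream;
    import org.apache.hadoop.fs.BufferedFSInputStream;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSInputStream;

    final class BufferedStreamSketch {
      static FSDataInputStream wrap(FSDataInputStream in, int bufferSize) {
        InputStream inner = in.getWrappedStream();
        if (inner instanceof FSInputStream) {
          // BufferedFSInputStream is Seekable and PositionedReadable, so it can back
          // an FSDataInputStream while buffering reads in bufferSize-byte chunks.
          return new FSDataInputStream(
              new BufferedFSInputStream((FSInputStream) inner, bufferSize));
        }
        return in; // wrapped stream is not an FSInputStream (see the LocalFs note in the tests)
      }
    }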
@@ -92,6 +92,9 @@ public String showLogFileCommits(
if (n instanceof HoodieCorruptBlock) {
try {
instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
if (instantTime == null) {
throw new Exception("Invalid instant time " + instantTime);
}
} catch (Exception e) {
numCorruptBlocks++;
instantTime = "corrupt_block_" + numCorruptBlocks;
@@ -172,7 +175,8 @@ public String showLogFileRecords(@CliOption(key = {
.getTimestamp(),
Long.valueOf(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES),
Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED));
Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
Integer.valueOf(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE));
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
Optional<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
if (allRecords.size() >= limit) {
@@ -45,6 +45,9 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
public static final String MAX_MEMORY_FOR_MERGE_PROP = "hoodie.memory.merge.max.size";
// Property to set the max memory for compaction
public static final String MAX_MEMORY_FOR_COMPACTION_PROP = "hoodie.memory.compaction.max.size";
// Property to set the max size of the DFS input stream buffer
public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size";
public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 16 * 1024 * 1024; // 16MB


private HoodieMemoryConfig(Properties props) {
@@ -86,6 +89,12 @@ public Builder withMaxMemoryFractionPerCompaction(long maxMemoryFractionPerCompa
return this;
}

public Builder withMaxDFSStreamBufferSize(int maxStreamBufferSize) {
props.setProperty(MAX_DFS_STREAM_BUFFER_SIZE_PROP,
String.valueOf(maxStreamBufferSize));
return this;
}

/**
* Dynamic calculation of max memory to use for spillable map. user.available.memory = spark.executor.memory *
* (1 - spark.memory.fraction) spillable.available.memory = user.available.memory * hoodie.memory.fraction. Anytime
@@ -143,6 +152,9 @@ public HoodieMemoryConfig build() {
!props.containsKey(MAX_MEMORY_FOR_COMPACTION_PROP),
MAX_MEMORY_FOR_COMPACTION_PROP, String.valueOf(
getMaxMemoryAllowedForMerge(props.getProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP))));
setDefaultOnCondition(props,
!props.containsKey(MAX_DFS_STREAM_BUFFER_SIZE_PROP),
MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE));
return config;
}
}
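A usage sketch for the new knob (variable name illustrative); when the property is left unset, the setDefaultOnCondition call in build() above falls back to the 16MB default:

    HoodieMemoryConfig memoryConfig = HoodieMemoryConfig.newBuilder()
        .withMaxDFSStreamBufferSize(4 * 1024 * 1024) // e.g. 4MB instead of the default 16MB
        .build();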
@@ -369,6 +369,12 @@ public Long getMaxMemoryPerCompaction() {
props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION_PROP));
}

public int getMaxDFSStreamBufferSize() {
return Integer.valueOf(props.getProperty(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP));
}

public static class Builder {

private final Properties props = new Properties();
@@ -469,6 +475,12 @@ public Builder withMetricsConfig(HoodieMetricsConfig metricsConfig) {
return this;
}

public Builder withMemoryConfig(HoodieMemoryConfig memoryConfig) {
props.putAll(memoryConfig.getProps());
isMemoryConfigSet = true;
return this;
}

public Builder withAutoCommit(boolean autoCommit) {
props.setProperty(HOODIE_AUTO_COMMIT_PROP, String.valueOf(autoCommit));
return this;
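A sketch of how the new getMaxDFSStreamBufferSize accessor and withMemoryConfig builder method compose (withPath and the rest of the chain are assumed from the existing builder pattern, not shown in this diff):

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath) // hypothetical base path
        .withMemoryConfig(HoodieMemoryConfig.newBuilder()
            .withMaxDFSStreamBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
            .build())
        .build();
    int bufferSize = writeConfig.getMaxDFSStreamBufferSize(); // 16 * 1024 * 1024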
@@ -105,7 +105,7 @@ private List<WriteStatus> compact(HoodieTable hoodieTable, HoodieWriteConfig con
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs,
metaClient.getBasePath(), operation.getDeltaFilePaths(), readerSchema, maxInstantTime,
config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(),
config.getCompactionReverseLogReadEnabled());
config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize());
if (!scanner.iterator().hasNext()) {
return Lists.<WriteStatus>newArrayList();
}
@@ -38,6 +38,7 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -116,9 +117,14 @@ public static GenericRecord generateGenericRecord(String rowKey, String riderNam
}

public static void createCommitFile(String basePath, String commitTime) throws IOException {
createCommitFile(basePath, commitTime, HoodieTestUtils.getDefaultHadoopConf());
}

public static void createCommitFile(String basePath, String commitTime, Configuration configuration)
throws IOException {
Path commitFile = new Path(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCommitFileName(commitTime));
FileSystem fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf());
FileSystem fs = FSUtils.getFs(basePath, configuration);
FSDataOutputStream os = fs.create(commitFile, true);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
try {
@@ -130,9 +136,14 @@ public static void createCommitFile(String basePath, String commitTime) throws I
}

public static void createSavepointFile(String basePath, String commitTime) throws IOException {
createSavepointFile(basePath, commitTime, HoodieTestUtils.getDefaultHadoopConf());
}

public static void createSavepointFile(String basePath, String commitTime, Configuration configuration)
throws IOException {
Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME
+ "/" + HoodieTimeline.makeSavePointFileName(commitTime));
FileSystem fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf());
FileSystem fs = FSUtils.getFs(basePath, configuration);
FSDataOutputStream os = fs.create(commitFile, true);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
try {
@@ -22,14 +22,14 @@
import com.google.common.collect.Sets;
import com.uber.hoodie.avro.model.HoodieArchivedMetaEntry;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.minicluster.HdfsTestService;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import java.io.IOException;
@@ -42,23 +42,57 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestHoodieCommitArchiveLog {

private String basePath;
private FileSystem fs;
private Configuration hadoopConf;
// NOTE: Be careful when using DFS (FileSystem.class) vs LocalFs (RawLocalFileSystem.class).
// The implementation and guarantees of many APIs differ; for example, check rename(src, dst).
// We need to use DFS here instead of LocalFs since, on LocalFs, FSDataInputStream.getWrappedStream()
// returns an FSDataInputStream instead of an FSInputStream and thus throws java.lang.ClassCastException:
// org.apache.hadoop.fs.FSDataInputStream cannot be cast to org.apache.hadoop.fs.FSInputStream
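// For illustration only (not from this diff), the failing pattern that note describes:
//   FSDataInputStream in = localFs.open(path);
//   FSInputStream fsIn = (FSInputStream) in.getWrappedStream(); // ClassCastException on LocalFs
// Against the MiniDFSCluster used below, the wrapped stream is an FSInputStream and
// the BufferedFSInputStream wrapping succeeds.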
private static MiniDFSCluster dfsCluster;
private static DistributedFileSystem dfs;
private static HdfsTestService hdfsTestService;

@AfterClass
public static void cleanUp() throws Exception {
if (hdfsTestService != null) {
hdfsTestService.stop();
dfsCluster.shutdown();
}
// Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS are used in the
// same JVM
FileSystem.closeAll();
}

@BeforeClass
public static void setUpDFS() throws IOException {
// Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS are used in the
// same JVM
FileSystem.closeAll();
if (hdfsTestService == null) {
hdfsTestService = new HdfsTestService();
dfsCluster = hdfsTestService.start(true);
// Get the filesystem handle from the running MiniDFSCluster
dfs = dfsCluster.getFileSystem();
}
}
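// Illustrative note (not part of this commit): FileSystem.get caches instances by
// scheme and authority, so without the closeAll() above a LocalFs handle cached by
// an earlier test could be handed back where this class expects the DFS instance.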

@Before
public void init() throws Exception {
TemporaryFolder folder = new TemporaryFolder();
folder.create();
basePath = folder.getRoot().getAbsolutePath();
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
fs = FSUtils.getFs(basePath, hadoopConf);
hadoopConf = dfs.getConf();
HoodieTestUtils.init(hadoopConf, basePath);
}

@@ -68,7 +102,7 @@ public void testArchiveEmptyDataset() throws IOException {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").build();
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
new HoodieTableMetaClient(fs.getConf(), cfg.getBasePath(), true));
new HoodieTableMetaClient(dfs.getConf(), cfg.getBasePath(), true));
boolean result = archiveLog.archiveIfRequired();
assertTrue(result);
}
@@ -81,26 +115,26 @@ public void testArchiveDatasetWithArchival() throws IOException {
HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4).build())
.forTable("test-trip-table").build();
HoodieTestUtils.init(hadoopConf, basePath);
HoodieTestDataGenerator.createCommitFile(basePath, "100");
HoodieTestDataGenerator.createCommitFile(basePath, "101");
HoodieTestDataGenerator.createCommitFile(basePath, "102");
HoodieTestDataGenerator.createCommitFile(basePath, "103");
HoodieTestDataGenerator.createCommitFile(basePath, "104");
HoodieTestDataGenerator.createCommitFile(basePath, "105");

HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "104", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf());

HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();

assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());

HoodieTestUtils.createCleanFiles(basePath, "100");
HoodieTestUtils.createInflightCleanFiles(basePath, "101");
HoodieTestUtils.createCleanFiles(basePath, "101");
HoodieTestUtils.createCleanFiles(basePath, "102");
HoodieTestUtils.createCleanFiles(basePath, "103");
HoodieTestUtils.createCleanFiles(basePath, "104");
HoodieTestUtils.createCleanFiles(basePath, "105");
HoodieTestUtils.createInflightCleanFiles(basePath, "106", "107");
HoodieTestUtils.createCleanFiles(basePath, "100", dfs.getConf());
HoodieTestUtils.createInflightCleanFiles(basePath, dfs.getConf(), "101");
HoodieTestUtils.createCleanFiles(basePath, "101", dfs.getConf());
HoodieTestUtils.createCleanFiles(basePath, "102", dfs.getConf());
HoodieTestUtils.createCleanFiles(basePath, "103", dfs.getConf());
HoodieTestUtils.createCleanFiles(basePath, "104", dfs.getConf());
HoodieTestUtils.createCleanFiles(basePath, "105", dfs.getConf());
HoodieTestUtils.createInflightCleanFiles(basePath, dfs.getConf(), "106", "107");

// reload the timeline and get all the commits before archive
timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
@@ -112,7 +146,7 @@ public void testArchiveDatasetWithArchival() throws IOException {
verifyInflightInstants(metaClient, 3);

HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
new HoodieTableMetaClient(fs.getConf(), basePath, true));
new HoodieTableMetaClient(dfs.getConf(), basePath, true));

assertTrue(archiveLog.archiveIfRequired());

@@ -121,7 +155,7 @@
originalCommits.removeAll(timeline.getInstants().collect(Collectors.toList()));

//read the file
HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs,
HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(dfs,
new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1")),
HoodieArchivedMetaEntry.getClassSchema());

@@ -156,12 +190,12 @@ public void testArchiveDatasetWithNoArchival() throws IOException {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100");
HoodieTestDataGenerator.createCommitFile(basePath, "101");
HoodieTestDataGenerator.createCommitFile(basePath, "102");
HoodieTestDataGenerator.createCommitFile(basePath, "103");
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf());

HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals("Loaded 4 commits and the count should match", 4, timeline.countInstants());
@@ -177,14 +211,14 @@ public void testArchiveCommitSafety() throws IOException {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100");
HoodieTestDataGenerator.createCommitFile(basePath, "101");
HoodieTestDataGenerator.createCommitFile(basePath, "102");
HoodieTestDataGenerator.createCommitFile(basePath, "103");
HoodieTestDataGenerator.createCommitFile(basePath, "104");
HoodieTestDataGenerator.createCommitFile(basePath, "105");
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "104", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf());

HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
@@ -203,15 +237,15 @@ public void testArchiveCommitSavepointNoHole() throws IOException {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100");
HoodieTestDataGenerator.createCommitFile(basePath, "101");
HoodieTestDataGenerator.createSavepointFile(basePath, "101");
HoodieTestDataGenerator.createCommitFile(basePath, "102");
HoodieTestDataGenerator.createCommitFile(basePath, "103");
HoodieTestDataGenerator.createCommitFile(basePath, "104");
HoodieTestDataGenerator.createCommitFile(basePath, "105");
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createSavepointFile(basePath, "101", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "103", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "104", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf());

HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
@@ -32,6 +32,7 @@
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieMemoryConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex;
@@ -97,6 +98,7 @@ private HoodieWriteConfig.Builder getConfigBuilder() {
.withParallelism(2, 2).withCompactionConfig(
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).withInlineCompaction(false)
.build()).withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.withMemoryConfig(HoodieMemoryConfig.newBuilder().withMaxDFSStreamBufferSize(1 * 1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());
}
