1. Small file size handling for inserts into log files. In summary, the total size of the log file is compared with the parquet max file size, and if there is still scope to add inserts, then they are added.
n3nash committed Sep 4, 2018
1 parent 2eaa42a commit 704bd86
Showing 21 changed files with 824 additions and 123 deletions.
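The mechanics of the change, in rough form: a MergeOnRead file slice whose log files are still small gets treated the way a small parquet file is treated for copy-on-write, so new inserts are routed into it instead of spawning a new file group. The sketch below is illustrative only — the class, the compression-ratio parameter, and the sizing arithmetic are assumptions, not the code in this commit.

// Illustrative sketch (not Hudi API): estimate how many inserts still fit
// into a file slice, given its current log size and the parquet size cap.
public class LogFileSizingSketch {

  static long insertsThatFit(long maxParquetFileSize, long totalLogFileSize,
      double logToParquetSizeRatio, long averageRecordSize) {
    // Project the log bytes onto an equivalent parquet footprint, then
    // measure the headroom left under the configured max parquet file size.
    long projectedSize = (long) (totalLogFileSize * logToParquetSizeRatio);
    long headroom = maxParquetFileSize - projectedSize;
    return headroom > 0 ? headroom / averageRecordSize : 0;
  }

  public static void main(String[] args) {
    // e.g. 120 MB cap, 30 MB of log data, 1 KB average record size
    System.out.println(insertsThatFit(120L << 20, 30L << 20, 0.35, 1024));
  }
}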
@@ -82,7 +82,8 @@ public String showCommits(@CliOption(key = {
     Collections.reverse(commits);
     for (int i = 0; i < commits.size(); i++) {
       HoodieInstant commit = commits.get(i);
-      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get());
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
       rows.add(new Comparable[]{commit.getTimestamp(),
           commitMetadata.fetchTotalBytesWritten(),
           commitMetadata.fetchTotalFilesInsert(),
@@ -160,7 +161,8 @@ public String showCommitPartitions(
     if (!timeline.containsInstant(commitInstant)) {
       return "Commit " + commitTime + " not found in Commits " + timeline;
     }
-    HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get());
+    HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get(),
+        HoodieCommitMetadata.class);
     List<Comparable[]> rows = new ArrayList<>();
     for (Map.Entry<String, List<HoodieWriteStat>> entry : meta.getPartitionToWriteStats().entrySet()) {
       String path = entry.getKey();
@@ -221,7 +223,8 @@ public String showCommitFiles(
     if (!timeline.containsInstant(commitInstant)) {
       return "Commit " + commitTime + " not found in Commits " + timeline;
     }
-    HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get());
+    HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get(),
+        HoodieCommitMetadata.class);
     List<Comparable[]> rows = new ArrayList<>();
     for (Map.Entry<String, List<HoodieWriteStat>> entry : meta.getPartitionToWriteStats().entrySet()) {
       String path = entry.getKey();
@@ -75,7 +75,8 @@ public String writeAmplificationStats(
     DecimalFormat df = new DecimalFormat("#.00");
     for (HoodieInstant commitTime : timeline.getInstants().collect(Collectors.toList())) {
       String waf = "0";
-      HoodieCommitMetadata commit = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitTime).get());
+      HoodieCommitMetadata commit = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitTime).get(),
+          HoodieCommitMetadata.class);
       if (commit.fetchTotalUpdateRecordsWritten() > 0) {
         waf = df.format((float) commit.fetchTotalRecordsWritten() / commit.fetchTotalUpdateRecordsWritten());
       }
@@ -30,7 +30,8 @@ public static long countNewRecords(HoodieTableMetaClient target, List<String> co
     HoodieTimeline timeline = target.getActiveTimeline().reload().getCommitTimeline().filterCompletedInstants();
     for (String commit : commitsToCatchup) {
       HoodieCommitMetadata c = HoodieCommitMetadata.fromBytes(
-          timeline.getInstantDetails(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commit)).get());
+          timeline.getInstantDetails(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commit)).get(),
+          HoodieCommitMetadata.class);
       totalNew += c.fetchTotalRecordsWritten() - c.fetchTotalUpdateRecordsWritten();
     }
     return totalNew;
45 changes: 42 additions & 3 deletions hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -32,6 +32,8 @@
 import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.model.HoodieRollingStat;
+import com.uber.hoodie.common.model.HoodieRollingStatMetadata;
 import com.uber.hoodie.common.model.HoodieWriteStat;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
@@ -501,9 +503,8 @@ private boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,
         writeStatus.getPartitionPath(), writeStatus.getStat())).collect();
 
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
-    for (Tuple2<String, HoodieWriteStat> stat : stats) {
-      metadata.addWriteStat(stat._1(), stat._2());
-    }
+    updateMetadataAndRollingStats(actionType, metadata, stats);
+
 
     // Finalize write
     final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
@@ -1256,4 +1257,42 @@ private Optional<String> forceCompact(Optional<Map<String, String>> extraMetadat
     });
     return compactionInstantTimeOpt;
   }
+
+  private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetadata metadata, List<Tuple2<String,
+      HoodieWriteStat>> stats) {
+    // TODO : make sure we cannot rollback / archive last commit file
+    try {
+      // Create a Hoodie table which encapsulates the commits and files visible
+      HoodieTable table = HoodieTable.getHoodieTable(
+          new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
+      // 0. All of the rolling stat management is only done by the DELTA commit for MOR and COMMIT for COW, otherwise
+      // there may be race conditions
+      HoodieRollingStatMetadata rollingStatMetadata = new HoodieRollingStatMetadata(actionType);
+      // 1. Look up the previous compaction/commit and get the HoodieCommitMetadata from there.
+      // 2. Now, first read the existing rolling stats and merge with the result of current metadata.
+
+      // Need to do this on every commit (delta or commit) to support COW and MOR.
+      for (Tuple2<String, HoodieWriteStat> stat : stats) {
+        metadata.addWriteStat(stat._1(), stat._2());
+        HoodieRollingStat hoodieRollingStat = new HoodieRollingStat(stat._2().getFileId(),
+            stat._2().getNumWrites() - (stat._2().getNumUpdateWrites() - stat._2().getNumDeletes()),
+            stat._2().getNumUpdateWrites(), stat._2().getNumDeletes(), stat._2().getTotalWriteBytes());
+        rollingStatMetadata.addRollingStat(stat._1(), hoodieRollingStat);
+      }
+      // The last rolling stat should be present in the completed timeline
+      Optional<HoodieInstant> lastInstant = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
+          .lastInstant();
+      if (lastInstant.isPresent()) {
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+            .fromBytes(table.getActiveTimeline().getInstantDetails(lastInstant
+                .get()).get(), HoodieCommitMetadata.class);
+        rollingStatMetadata = rollingStatMetadata
+            .merge(HoodieCommitMetadata.fromBytes(commitMetadata.getExtraMetadata()
+                .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(), HoodieRollingStatMetadata.class));
+      }
+      metadata.addMetadata(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, rollingStatMetadata.toJsonString());
+    } catch (IOException io) {
+      throw new HoodieCommitException("Unable to save rolling stats");
+    }
+  }
 }
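For intuition about what updateMetadataAndRollingStats accumulates: the previous commit's rolling stats are read back from the timeline and merged per file with the stats of the current write. A toy version of that merge, using plain maps of counts instead of the HoodieRollingStatMetadata API, might look like this:

import java.util.HashMap;
import java.util.Map;

// Toy model of the rolling-stat merge: running per-file totals from the
// previous commit are combined with counts from the current commit. The
// map-of-longs representation is a simplification, not the real types.
class RollingStatMergeSketch {

  static Map<String, Long> merge(Map<String, Long> previous, Map<String, Long> current) {
    Map<String, Long> merged = new HashMap<>(previous);
    // Sum the current commit's per-file counts into the running totals.
    current.forEach((fileId, writes) -> merged.merge(fileId, writes, Long::sum));
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Long> prev = new HashMap<>(Map.of("file-1", 100L));
    Map<String, Long> curr = new HashMap<>(Map.of("file-1", 40L, "file-2", 10L));
    System.out.println(merge(prev, curr)); // {file-1=140, file-2=10}
  }
}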
@@ -45,7 +45,11 @@ public class InMemoryHashIndex<T extends HoodieRecordPayload> extends HoodieInde
 
   public InMemoryHashIndex(HoodieWriteConfig config) {
     super(config);
-    recordLocationMap = new ConcurrentHashMap<>();
+    synchronized (InMemoryHashIndex.class) {
+      if (recordLocationMap == null) {
+        recordLocationMap = new ConcurrentHashMap<>();
+      }
+    }
   }
 
   @Override
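The synchronized null-check only has a point if recordLocationMap is shared by every index instance (i.e., a static field). The conventional shape of that lazy shared initialization, with a volatile field and an outer check to skip the lock on the hot path, is shown below as a general sketch, not as the Hudi code:

import java.util.concurrent.ConcurrentHashMap;

// General double-checked-locking sketch for lazily creating a map shared
// across instances. Field and class names are illustrative.
class SharedIndexState {
  private static volatile ConcurrentHashMap<String, String> recordLocations;

  SharedIndexState() {
    if (recordLocations == null) {              // cheap unlocked check
      synchronized (SharedIndexState.class) {   // one initializer at a time
        if (recordLocations == null) {
          recordLocations = new ConcurrentHashMap<>();
        }
      }
    }
  }
}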
@@ -75,6 +75,8 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
   private long recordsWritten = 0;
   // Total number of records deleted during an append
   private long recordsDeleted = 0;
+  // Total number of records updated during an append
+  private long updatedRecordsWritten = 0;
   // Average record size for a HoodieRecord. This size is updated at the end of every log block flushed to disk
   private long averageRecordSize = 0;
   private HoodieLogFile currentLogFile;
@@ -89,6 +91,8 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
   private int maxBlockSize = config.getLogFileDataBlockMaxSize();
   // Header metadata for a log block
   private Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+  // Total number of new records inserted into the delta file
+  private long insertRecordsWritten = 0;
 
   public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
       String fileId, Iterator<HoodieRecord<T>> recordItr) {
@@ -111,6 +115,7 @@ private void init(HoodieRecord record) {
     // extract some information from the first record
     Optional<FileSlice> fileSlice = fileSystemView.getLatestFileSlices(partitionPath)
         .filter(fileSlice1 -> fileSlice1.getFileId().equals(fileId)).findFirst();
+    // Set the base commit time as the current commitTime for new inserts into log files
     String baseInstantTime = commitTime;
     if (fileSlice.isPresent()) {
       baseInstantTime = fileSlice.get().getBaseInstantTime();
@@ -156,6 +161,12 @@ private Optional<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
           hoodieRecord.getPartitionPath(), fileId);
       HoodieAvroUtils
          .addCommitMetadataToRecord((GenericRecord) avroRecord.get(), commitTime, seqId);
+      // If currentLocation is present, then this is an update
+      if (hoodieRecord.getCurrentLocation() != null) {
+        updatedRecordsWritten++;
+      } else {
+        insertRecordsWritten++;
+      }
       recordsWritten++;
     } else {
       recordsDeleted++;
@@ -238,6 +249,8 @@ public WriteStatus close() {
       writeStatus.getStat().setPrevCommit(commitTime);
       writeStatus.getStat().setFileId(this.fileId);
       writeStatus.getStat().setNumWrites(recordsWritten);
+      writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten);
+      writeStatus.getStat().setNumInserts(insertRecordsWritten);
       writeStatus.getStat().setNumDeletes(recordsDeleted);
       writeStatus.getStat().setTotalWriteBytes(estimatedNumberOfBytesWritten);
       writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size());
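The bookkeeping added to HoodieAppendHandle boils down to one rule: a record that already has a location in the index is an update, otherwise it is an insert, and deletes are tracked separately. A stripped-down restatement of that classification, with the real HoodieRecord replaced by booleans:

// Stripped-down version of the append-handle counters above; the two
// booleans stand in for hoodieRecord.getCurrentLocation() != null and
// for a delete payload.
class AppendCountersSketch {
  long recordsWritten, updatedRecordsWritten, insertRecordsWritten, recordsDeleted;

  void account(boolean hasCurrentLocation, boolean isDelete) {
    if (isDelete) {
      recordsDeleted++;
      return;
    }
    if (hasCurrentLocation) {
      updatedRecordsWritten++;  // previously indexed -> counted as an update
    } else {
      insertRecordsWritten++;   // new record routed into the log file
    }
    recordsWritten++;
  }
}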
@@ -31,6 +31,7 @@
 import com.uber.hoodie.common.model.ActionType;
 import com.uber.hoodie.common.model.HoodieArchivedLogFile;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
+import com.uber.hoodie.common.model.HoodieRollingStatMetadata;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.log.HoodieLogFormat;
@@ -55,6 +56,7 @@
 import java.util.stream.Stream;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -273,7 +275,7 @@ private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline,
       }
       case COMMIT_ACTION: {
         HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-            .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get());
+            .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
         archivedMetaWrapper.setHoodieCommitMetadata(commitMetadataConverter(commitMetadata));
         archivedMetaWrapper.setActionType(ActionType.commit.name());
         break;
@@ -294,7 +296,7 @@
       }
       case HoodieTimeline.DELTA_COMMIT_ACTION: {
         HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-            .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get());
+            .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
         archivedMetaWrapper.setHoodieCommitMetadata(commitMetadataConverter(commitMetadata));
         archivedMetaWrapper.setActionType(ActionType.commit.name());
         break;
@@ -312,6 +314,8 @@ private com.uber.hoodie.avro.model.HoodieCommitMetadata commitMetadataConverter(
     mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     com.uber.hoodie.avro.model.HoodieCommitMetadata avroMetaData = mapper
         .convertValue(hoodieCommitMetadata, com.uber.hoodie.avro.model.HoodieCommitMetadata.class);
+    // Do not archive Rolling Stats, cannot set to null since AVRO will throw null pointer
+    avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, StringUtils.EMPTY);
     return avroMetaData;
   }
 }
@@ -28,6 +28,7 @@
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordLocation;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.model.HoodieRollingStatMetadata;
 import com.uber.hoodie.common.model.HoodieWriteStat;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
@@ -643,12 +644,18 @@ class UpsertPartitioner extends Partitioner {
    */
   private HashMap<Integer, BucketInfo> bucketInfoMap;
 
+  /**
+   * Rolling stats for files
+   */
+  protected HoodieRollingStatMetadata rollingStatMetadata;
+  protected long averageRecordSize;
 
   UpsertPartitioner(WorkloadProfile profile) {
     updateLocationToBucket = new HashMap<>();
     partitionPathToInsertBuckets = new HashMap<>();
     bucketInfoMap = new HashMap<>();
     globalStat = profile.getGlobalStat();
 
+    rollingStatMetadata = getRollingStats();
     assignUpdates(profile);
     assignInserts(profile);
 
@@ -792,15 +799,15 @@ protected List<SmallFile> getSmallFiles(String partitionPath) {
    * Obtains the average record size based on records written during last commit. Used for
    * estimating how many records pack into one file.
    */
-  private long averageBytesPerRecord() {
+  protected long averageBytesPerRecord() {
     long avgSize = 0L;
     HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getCommitTimeline()
         .filterCompletedInstants();
     try {
       if (!commitTimeline.empty()) {
         HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
         HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-            .fromBytes(commitTimeline.getInstantDetails(latestCommitTime).get());
+            .fromBytes(commitTimeline.getInstantDetails(latestCommitTime).get(), HoodieCommitMetadata.class);
         avgSize = (long) Math.ceil(
             (1.0 * commitMetadata.fetchTotalBytesWritten()) / commitMetadata
                 .fetchTotalRecordsWritten());
@@ -852,4 +859,8 @@ public int getPartition(Object key) {
       }
     }
   }
+
+  protected HoodieRollingStatMetadata getRollingStats() {
+    return null;
+  }
 }
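averageBytesPerRecord is widened from private to protected so a partitioner subclass (e.g., for MergeOnRead) can reuse it, and getRollingStats is the hook such a subclass overrides. The estimate itself is just the last completed commit's bytes-per-record ratio; restated standalone:

// Standalone restatement of the averageBytesPerRecord arithmetic above:
// average record size = ceil(total bytes written / total records written),
// taken from the last completed commit's metadata.
class AverageRecordSizeSketch {
  static long averageBytesPerRecord(long totalBytesWritten, long totalRecordsWritten) {
    return totalRecordsWritten > 0
        ? (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten)
        : 0L;
  }

  public static void main(String[] args) {
    System.out.println(averageBytesPerRecord(1_048_576L, 1000L)); // 1049
  }
}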
