General enhancements
artem0 authored and vinothchandar committed Dec 18, 2018
1 parent 30c5f8b commit 6946dd7
Showing 46 changed files with 400 additions and 371 deletions.
@@ -143,9 +143,8 @@ public String fileSizeStats(
Snapshot s = globalHistogram.getSnapshot();
rows.add(printFileSizeHistogram("ALL", s));

Function<Object, String> converterFunction = entry -> {
return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
};
Function<Object, String> converterFunction = entry ->
NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put("Min", converterFunction);
fieldNameToConverterMap.put("10th", converterFunction);
42 changes: 19 additions & 23 deletions hoodie-cli/src/main/scala/com/uber/hoodie/cli/DedupeSparkJob.scala
@@ -77,7 +77,7 @@ class DedupeSparkJob(basePath: String,

val metadata = new HoodieTableMetaClient(fs.getConf, basePath)

val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"${basePath}/${duplicatedPartitionPath}"))
val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"$basePath/$duplicatedPartitionPath"))
val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
val latestFiles: java.util.List[HoodieDataFile] = fsView.getLatestDataFiles().collect(Collectors.toList[HoodieDataFile]())
val filteredStatuses = latestFiles.map(f => f.getPath)
@@ -92,17 +92,16 @@ class DedupeSparkJob(basePath: String,
val dupeDataSql =
s"""
SELECT `_hoodie_record_key`, `_hoodie_partition_path`, `_hoodie_file_name`, `_hoodie_commit_time`
FROM ${tmpTableName} h
JOIN ${dedupeTblName} d
FROM $tmpTableName h
JOIN $dedupeTblName d
ON h.`_hoodie_record_key` = d.dupe_key
"""
val dupeMap = sqlContext.sql(dupeDataSql).collectAsList().groupBy(r => r.getString(0))
val fileToDeleteKeyMap = new HashMap[String, HashSet[String]]()

// Mark all files except the one with latest commits for deletion
dupeMap.foreach(rt => {
val key = rt._1
val rows = rt._2
val (key, rows) = rt
var maxCommit = -1L

rows.foreach(r => {
@@ -129,7 +128,7 @@ class DedupeSparkJob(basePath: String,
def fixDuplicates(dryRun: Boolean = true) = {
val metadata = new HoodieTableMetaClient(fs.getConf, basePath)

val allFiles = fs.listStatus(new Path(s"${basePath}/${duplicatedPartitionPath}"))
val allFiles = fs.listStatus(new Path(s"$basePath/$duplicatedPartitionPath"))
val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)

val latestFiles: java.util.List[HoodieDataFile] = fsView.getLatestDataFiles().collect(Collectors.toList[HoodieDataFile]())
@@ -138,38 +137,36 @@ class DedupeSparkJob(basePath: String,
val dupeFixPlan = planDuplicateFix()

// 1. Copy all latest files into the temp fix path
fileNameToPathMap.foreach { case (fileName, filePath) => {
fileNameToPathMap.foreach { case (fileName, filePath) =>
val badSuffix = if (dupeFixPlan.contains(fileName)) ".bad" else ""
val dstPath = new Path(s"${repairOutputPath}/${filePath.getName}${badSuffix}")
LOG.info(s"Copying from ${filePath} to ${dstPath}")
val dstPath = new Path(s"$repairOutputPath/${filePath.getName}$badSuffix")
LOG.info(s"Copying from $filePath to $dstPath")
FileUtil.copy(fs, filePath, fs, dstPath, false, true, fs.getConf)
}
}

// 2. Remove duplicates from the bad files
dupeFixPlan.foreach { case (fileName, keysToSkip) => {
dupeFixPlan.foreach { case (fileName, keysToSkip) =>
val commitTime = FSUtils.getCommitTime(fileNameToPathMap(fileName).getName)
val badFilePath = new Path(s"${repairOutputPath}/${fileNameToPathMap(fileName).getName}.bad")
val newFilePath = new Path(s"${repairOutputPath}/${fileNameToPathMap(fileName).getName}")
val badFilePath = new Path(s"$repairOutputPath/${fileNameToPathMap(fileName).getName}.bad")
val newFilePath = new Path(s"$repairOutputPath/${fileNameToPathMap(fileName).getName}")
LOG.info(" Skipping and writing new file for : " + fileName)
SparkHelpers.skipKeysAndWriteNewFile(commitTime, fs, badFilePath, newFilePath, dupeFixPlan(fileName))
fs.delete(badFilePath, false)
}
}

// 3. Check that there are no duplicates anymore.
val df = sqlContext.read.parquet(s"${repairOutputPath}/*.parquet")
val df = sqlContext.read.parquet(s"$repairOutputPath/*.parquet")
df.registerTempTable("fixedTbl")
val dupeKeyDF = getDupeKeyDF("fixedTbl")
val dupeCnt = dupeKeyDF.count();
val dupeCnt = dupeKeyDF.count()
if (dupeCnt != 0) {
dupeKeyDF.show()
throw new HoodieException("Still found some duplicates!!.. Inspect output")
}

// 4. Additionally ensure no record keys are left behind.
val sourceDF = sparkHelper.getDistinctKeyDF(fileNameToPathMap.map(t => t._2.toString).toList)
val fixedDF = sparkHelper.getDistinctKeyDF(fileNameToPathMap.map(t => s"${repairOutputPath}/${t._2.getName}").toList)
val fixedDF = sparkHelper.getDistinctKeyDF(fileNameToPathMap.map(t => s"$repairOutputPath/${t._2.getName}").toList)
val missedRecordKeysDF = sourceDF.except(fixedDF)
val missedCnt = missedRecordKeysDF.count()
if (missedCnt != 0) {
@@ -180,17 +177,16 @@ class DedupeSparkJob(basePath: String,

println("No duplicates found & counts are in check!!!! ")
// 4. Prepare to copy the fixed files back.
fileNameToPathMap.foreach { case (fileName, filePath) => {
val srcPath = new Path(s"${repairOutputPath}/${filePath.getName}")
val dstPath = new Path(s"${basePath}/${duplicatedPartitionPath}/${filePath.getName}")
fileNameToPathMap.foreach { case (_, filePath) =>
val srcPath = new Path(s"$repairOutputPath/${filePath.getName}")
val dstPath = new Path(s"$basePath/$duplicatedPartitionPath/${filePath.getName}")
if (dryRun) {
LOG.info(s"[JUST KIDDING!!!] Copying from ${srcPath} to ${dstPath}")
LOG.info(s"[JUST KIDDING!!!] Copying from $srcPath to $dstPath")
} else {
// for real
LOG.info(s"[FOR REAL!!!] Copying from ${srcPath} to ${dstPath}")
LOG.info(s"[FOR REAL!!!] Copying from $srcPath to $dstPath")
FileUtil.copy(fs, srcPath, fs, dstPath, false, true, fs.getConf)
}
}
}
}
}
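
For readers following the dedupe logic above, here is a rough, self-contained Java sketch of the planning step: group the duplicated rows by record key, keep the copy written by the latest commit, and record which keys to delete from every other file. The Row class and sample values are invented; the real job drives this from the Spark SQL query shown in the diff.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class DedupePlanSketch {

  // Hypothetical row shape: (recordKey, fileName, commitTime) as returned by the duplicate-key query
  static class Row {
    final String recordKey;
    final String fileName;
    final long commitTime;

    Row(String recordKey, String fileName, long commitTime) {
      this.recordKey = recordKey;
      this.fileName = fileName;
      this.commitTime = commitTime;
    }
  }

  public static void main(String[] args) {
    List<Row> duplicates = new ArrayList<>();
    duplicates.add(new Row("key-1", "file-a.parquet", 100L));
    duplicates.add(new Row("key-1", "file-b.parquet", 200L));

    // file name -> record keys that must be removed from that file
    Map<String, Set<String>> fileToDeleteKeys = new HashMap<>();

    duplicates.stream().collect(Collectors.groupingBy(r -> r.recordKey)).forEach((key, rows) -> {
      // Keep the copy written by the latest commit, schedule every other copy for deletion
      long maxCommit = rows.stream().mapToLong(r -> r.commitTime).max().orElse(-1L);
      rows.stream()
          .filter(r -> r.commitTime < maxCommit) // ties are not handled here; the real job keeps exactly one copy
          .forEach(r -> fileToDeleteKeys.computeIfAbsent(r.fileName, f -> new HashSet<>()).add(key));
    });

    System.out.println(fileToDeleteKeys); // {file-a.parquet=[key-1]}: drop key-1 from the older file
  }
}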
14 changes: 5 additions & 9 deletions hoodie-cli/src/main/scala/com/uber/hoodie/cli/SparkHelpers.scala
@@ -94,12 +94,9 @@ class SparkHelper(sqlContext: SQLContext, fs: FileSystem) {
* @return
*/
def isFileContainsKey(rowKey: String, file: String): Boolean = {
println(s"Checking ${file} for key ${rowKey}")
val ff = getRowKeyDF(file).filter(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}` = '${rowKey}'")
if (ff.count() > 0)
return true
else
return false
println(s"Checking $file for key $rowKey")
val ff = getRowKeyDF(file).filter(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}` = '$rowKey'")
if (ff.count() > 0) true else false
}
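
The Scala change above folds explicit return true/false branches into a single expression; the same cleanup applies in Java, where a boolean condition can simply be returned. A small illustration with a hypothetical helper:

import java.util.Arrays;
import java.util.List;

public class BooleanReturnSketch {

  // Verbose: the branches only restate the condition
  static boolean containsKeyVerbose(List<String> keys, String key) {
    if (keys.contains(key)) {
      return true;
    } else {
      return false;
    }
  }

  // Equivalent: return the condition directly
  static boolean containsKey(List<String> keys, String key) {
    return keys.contains(key);
  }

  public static void main(String[] args) {
    List<String> keys = Arrays.asList("key1", "key2");
    System.out.println(containsKeyVerbose(keys, "key1")); // true
    System.out.println(containsKey(keys, "key3"));        // false
  }
}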

/**
@@ -109,7 +106,7 @@ class SparkHelper(sqlContext: SQLContext, fs: FileSystem) {
* @param sqlContext
*/
def getKeyCount(file: String, sqlContext: org.apache.spark.sql.SQLContext) = {
println(getRowKeyDF(file).collect().size)
println(getRowKeyDF(file).collect().length)
}


@@ -128,8 +125,7 @@ class SparkHelper(sqlContext: SQLContext, fs: FileSystem) {
val bf = new com.uber.hoodie.common.BloomFilter(bfStr)
val foundCount = sqlContext.parquetFile(file)
.select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`")
.collect().
filter(r => !bf.mightContain(r.getString(0))).size
.collect().count(r => !bf.mightContain(r.getString(0)))
val totalCount = getKeyCount(file, sqlContext)
s"totalCount: ${totalCount}, foundCount: ${foundCount}"
totalCount == foundCount
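
The bloom-filter check above now counts matches with count(predicate) instead of filter(...).size. The Java stream analogue avoids materializing the intermediate collection in the same way; the HashSet below is only a stand-in for a bloom filter:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class CountVsCollectSketch {
  public static void main(String[] args) {
    List<String> recordKeys = Arrays.asList("k1", "k2", "k3", "k4");
    Set<String> filter = new HashSet<>(Arrays.asList("k1", "k3")); // stand-in for the bloom filter

    // Builds a throwaway list just to read its size
    int missingViaCollect = recordKeys.stream()
        .filter(k -> !filter.contains(k))
        .collect(Collectors.toList())
        .size();

    // Counts directly, no intermediate collection
    long missing = recordKeys.stream()
        .filter(k -> !filter.contains(k))
        .count();

    System.out.println(missingViaCollect + " == " + missing); // 2 == 2
  }
}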
@@ -238,8 +238,8 @@ protected static List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsToAl
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(op.getPartitionPath(), lastInstant.getTimestamp())
.filter(fs -> fs.getFileId().equals(op.getFileId())).findFirst().get();
final int maxVersion =
op.getDeltaFilePaths().stream().map(lf -> FSUtils.getFileVersionFromLog(new Path(lf)))
.reduce((x, y) -> x > y ? x : y).map(x -> x).orElse(0);
op.getDeltaFilePaths().stream().map(lf -> FSUtils.getFileVersionFromLog(new Path(lf)))
.reduce((x, y) -> x > y ? x : y).orElse(0);
List<HoodieLogFile> logFilesToBeMoved =
merged.getLogFiles().filter(lf -> lf.getLogVersion() > maxVersion).collect(Collectors.toList());
return logFilesToBeMoved.stream().map(lf -> {
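
The reduce above previously routed through an identity map(x -> x) before orElse; dropping it changes nothing. A minimal sketch of the max-version reduction, with made-up version numbers:

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class MaxVersionSketch {
  public static void main(String[] args) {
    List<Integer> logVersions = Arrays.asList(1, 4, 2); // hypothetical versions parsed from log files

    // Reduce to the maximum; the identity map(x -> x) that was dropped in the diff added nothing
    int maxViaReduce = logVersions.stream()
        .reduce((x, y) -> x > y ? x : y)
        .orElse(0);

    // An equivalent, arguably clearer spelling
    int maxViaMax = logVersions.stream()
        .max(Comparator.naturalOrder())
        .orElse(0);

    System.out.println(maxViaReduce + " " + maxViaMax); // 4 4
  }
}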
@@ -322,8 +322,7 @@ private ValidationOpResult validateCompactionOperation(HoodieTableMetaClient met
Set<HoodieLogFile> diff =
logFilesInFileSlice.stream().filter(lf -> !logFilesInCompactionOp.contains(lf))
.collect(Collectors.toSet());
Preconditions.checkArgument(diff.stream()
.filter(lf -> !lf.getBaseCommitTime().equals(compactionInstant)).count() == 0,
Preconditions.checkArgument(diff.stream().allMatch(lf -> lf.getBaseCommitTime().equals(compactionInstant)),
"There are some log-files which are neither specified in compaction plan "
+ "nor present after compaction request instant. Some of these :" + diff);
} else {
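
Replacing filter(...).count() == 0 with allMatch states the invariant directly and also short-circuits on the first violation. A small sketch with hypothetical instant strings:

import java.util.Arrays;
import java.util.List;

public class AllMatchSketch {
  public static void main(String[] args) {
    String compactionInstant = "20181218010203"; // hypothetical instant
    List<String> baseCommitTimes = Arrays.asList("20181218010203", "20181218010203");

    // Old shape: count the violations and compare with zero
    boolean okViaCount = baseCommitTimes.stream()
        .filter(t -> !t.equals(compactionInstant))
        .count() == 0;

    // New shape: state the invariant directly
    boolean ok = baseCommitTimes.stream().allMatch(t -> t.equals(compactionInstant));

    System.out.println(okViaCount + " " + ok); // true true
  }
}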
@@ -438,14 +437,14 @@ public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulin
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(operation.getPartitionPath(), lastInstant.getTimestamp())
.filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst().get();
List<HoodieLogFile> logFilesToRepair =
merged.getLogFiles().filter(lf -> lf.getBaseCommitTime().equals(compactionInstant))
.collect(Collectors.toList());
logFilesToRepair.sort(HoodieLogFile.getBaseInstantAndLogVersionComparator().reversed());
merged.getLogFiles().filter(lf -> lf.getBaseCommitTime().equals(compactionInstant))
.sorted(HoodieLogFile.getBaseInstantAndLogVersionComparator().reversed())
.collect(Collectors.toList());
FileSlice fileSliceForCompaction =
fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(), operation.getBaseInstantTime())
.filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst().get();
int maxUsedVersion =
fileSliceForCompaction.getLogFiles().findFirst().map(lf -> lf.getLogVersion())
fileSliceForCompaction.getLogFiles().findFirst().map(HoodieLogFile::getLogVersion)
.orElse(HoodieLogFile.LOGFILE_BASE_VERSION - 1);
String logExtn = fileSliceForCompaction.getLogFiles().findFirst().map(lf -> "." + lf.getFileExtension())
.orElse(HoodieLogFile.DELTA_EXTENSION);
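
The repair-path change above folds the separate sort into the stream pipeline and uses a method reference for the version getter. A toy sketch of the same shape; the file names and the version-parsing function are invented for illustration:

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class SortInPipelineSketch {
  public static void main(String[] args) {
    // Invented log-file names of the form <fileId>.log.<version>
    List<String> logFiles = Arrays.asList("f1.log.2", "f1.log.1", "f1.log.3");

    // Stand-in for HoodieLogFile::getLogVersion
    Function<String, Integer> logVersion =
        name -> Integer.parseInt(name.substring(name.lastIndexOf('.') + 1));

    // Sort inside the pipeline instead of collecting first and sorting the list afterwards
    List<String> newestFirst = logFiles.stream()
        .sorted(Comparator.comparing(logVersion).reversed())
        .collect(Collectors.toList());

    System.out.println(newestFirst); // [f1.log.3, f1.log.2, f1.log.1]
  }
}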
20 changes: 10 additions & 10 deletions hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -440,7 +440,7 @@ private JavaRDD<WriteStatus> upsertRecordsInternal(JavaRDD<HoodieRecord<T>> prep
} else {
return hoodieTable.handleInsertPartition(commitTime, partition, recordItr, partitioner);
}
}, true).flatMap(writeStatuses -> writeStatuses.iterator());
}, true).flatMap(List::iterator);

return updateIndexAndCommitIfNeeded(writeStatusRDD, hoodieTable, commitTime);
}
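
List::iterator is simply the method-reference spelling of writeStatuses -> writeStatuses.iterator(); both satisfy the functional interface that flatMap expects. A Spark-free sketch of the equivalence:

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class MethodReferenceSketch {
  public static void main(String[] args) {
    List<String> writeStatuses = Arrays.asList("ok", "ok", "failed");

    // Same behaviour, two spellings
    Function<List<String>, Iterator<String>> viaLambda = statuses -> statuses.iterator();
    Function<List<String>, Iterator<String>> viaReference = List::iterator;

    System.out.println(viaLambda.apply(writeStatuses).next());    // ok
    System.out.println(viaReference.apply(writeStatuses).next()); // ok
  }
}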
@@ -469,7 +469,7 @@ private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecor
Partitioner partitioner) {
return dedupedRecords.mapToPair(record -> new Tuple2<>(
new Tuple2<>(record.getKey(), Option.apply(record.getCurrentLocation())), record))
.partitionBy(partitioner).map(tuple -> tuple._2());
.partitionBy(partitioner).map(Tuple2::_2);
}

/**
@@ -499,7 +499,7 @@ private boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = new HoodieCommitMetadata();

List<HoodieWriteStat> stats = writeStatuses.map(status -> status.getStat()).collect();
List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();

updateMetadataAndRollingStats(actionType, metadata, stats);

@@ -522,7 +522,7 @@ private boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,

// add in extra metadata
if (extraMetadata.isPresent()) {
extraMetadata.get().forEach((k, v) -> metadata.addMetadata(k, v));
extraMetadata.get().forEach(metadata::addMetadata);
}

try {
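
metadata::addMetadata works here because the referenced method takes the same (key, value) pair that Map.forEach hands to its BiConsumer. A sketch of the shape using a plain target map; the checkpoint entry is made up:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ForEachMethodRefSketch {
  public static void main(String[] args) {
    Optional<Map<String, String>> extraMetadata = Optional.of(new HashMap<>());
    extraMetadata.get().put("checkpoint", "000123"); // hypothetical key/value

    Map<String, String> commitMetadata = new HashMap<>();

    // (k, v) -> commitMetadata.put(k, v) and commitMetadata::put are the same BiConsumer
    extraMetadata.ifPresent(extra -> extra.forEach(commitMetadata::put));

    System.out.println(commitMetadata); // {checkpoint=000123}
  }
}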
@@ -806,7 +806,7 @@ private void rollback(List<String> commits) {
});

List<String> pendingCompactionToRollback =
commits.stream().filter(c -> pendingCompactions.contains(c)).collect(Collectors.toList());
commits.stream().filter(pendingCompactions::contains).collect(Collectors.toList());
List<String> commitsToRollback =
commits.stream().filter(c -> !pendingCompactions.contains(c)).collect(Collectors.toList());

@@ -837,12 +837,12 @@ private void rollback(List<String> commits) {
}

// Remove interleaving pending compactions before rolling back commits
pendingCompactionToRollback.stream().forEach(this::deletePendingCompaction);
pendingCompactionToRollback.forEach(this::deletePendingCompaction);

List<HoodieRollbackStat> stats = table.rollback(jsc, commitsToRollback);

// cleanup index entries
commitsToRollback.stream().forEach(s -> {
commitsToRollback.forEach(s -> {
if (!index.rollbackCommit(s)) {
throw new HoodieRollbackException("Rollback index changes failed, for time :" + s);
}
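
Two small cleanups appear in this method: filtering with a bound method reference (pendingCompactions::contains) and calling forEach straight on the collection, since Iterable.forEach needs no .stream() when nothing is mapped or filtered. A compact sketch with made-up commit identifiers:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class StreamlessForEachSketch {
  public static void main(String[] args) {
    List<String> commits = Arrays.asList("001", "002", "003");
    Set<String> pendingCompactions = new HashSet<>(Arrays.asList("002"));

    // A bound method reference works wherever the lambda c -> pendingCompactions.contains(c) does
    List<String> compactionsToRollback = commits.stream()
        .filter(pendingCompactions::contains)
        .collect(Collectors.toList());

    // Iterable.forEach needs no intermediate stream when there is nothing to map or filter
    compactionsToRollback.forEach(System.out::println); // 002
  }
}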
@@ -1076,7 +1076,7 @@ JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records,
// everything
// so pick it from one of the records.
return new HoodieRecord<T>(rec1.getKey(), reducedData);
}, parallelism).map(recordTuple -> recordTuple._2());
}, parallelism).map(Tuple2::_2);
}

/**
@@ -1099,7 +1099,7 @@ private HoodieTable getTableAndInitCtx() {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
if (table.getMetaClient().getCommitActionType() == HoodieTimeline.COMMIT_ACTION) {
if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
writeContext = metrics.getCommitCtx();
} else {
writeContext = metrics.getDeltaCommitCtx();
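
The == to equals change above is more than style: == compares String references, so the old check only appeared to work when both sides happened to be the same interned constant. A minimal demonstration:

public class StringEqualitySketch {
  public static void main(String[] args) {
    String commitAction = "commit";               // stands in for a constant like COMMIT_ACTION
    String actionType = new String("commit");     // a value produced at runtime

    System.out.println(actionType == commitAction);      // false: different String objects
    System.out.println(actionType.equals(commitAction)); // true: same character content
  }
}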
@@ -1214,7 +1214,7 @@ private void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTab

private HoodieCommitMetadata doCompactionCommit(JavaRDD<WriteStatus> writeStatuses,
HoodieTableMetaClient metaClient, String compactionCommitTime, Optional<Map<String, String>> extraMetadata) {
List<HoodieWriteStat> updateStatusMap = writeStatuses.map(writeStatus -> writeStatus.getStat())
List<HoodieWriteStat> updateStatusMap = writeStatuses.map(WriteStatus::getStat)
.collect();

HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
@@ -117,10 +117,9 @@ private long getMaxMemoryAllowedForMerge(String maxMemoryFraction) {

if (SparkEnv.get() != null) {
// 1 GB is the default conf used by Spark, look at SparkContext.scala
long executorMemoryInBytes = Long.valueOf(
Utils.memoryStringToMb(SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_PROP,
long executorMemoryInBytes = Utils.memoryStringToMb(SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_PROP,
DEFAULT_SPARK_EXECUTOR_MEMORY_MB)) * 1024
* 1024L);
* 1024L;
// 0.6 is the default value used by Spark,
// look at {@link
// https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkConf.scala#L507}
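
Dropping the Long.valueOf wrapper above is safe because the trailing 1024L already widens the arithmetic to long. The long literal is doing real work, though: with int operands only, the multiplication can overflow before assignment. A small check; the 4096 MB figure is arbitrary:

public class MemoryMathSketch {
  public static void main(String[] args) {
    int executorMemoryMb = 4096; // e.g. what a memoryStringToMb("4g")-style helper would return

    // The long literal 1024L widens the whole expression; no Long.valueOf wrapper is needed
    long executorMemoryBytes = executorMemoryMb * 1024 * 1024L;

    // Caution: with only int operands the multiplication overflows before the assignment
    long overflowed = executorMemoryMb * 1024 * 1024;

    System.out.println(executorMemoryBytes); // 4294967296
    System.out.println(overflowed);          // 0 (wrapped around in int arithmetic)
  }
}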
@@ -187,13 +187,11 @@ private int autoComputeParallelism(final Map<String, Long> recordsPerPartition,
for (String partitionPath : recordsPerPartition.keySet()) {
long numRecords = recordsPerPartition.get(partitionPath);
long numFiles =
filesPerPartition.containsKey(partitionPath) ? filesPerPartition.get(partitionPath)
: 1L;
filesPerPartition.getOrDefault(partitionPath, 1L);

totalComparisons += numFiles * numRecords;
totalFiles +=
filesPerPartition.containsKey(partitionPath) ? filesPerPartition.get(partitionPath)
: 0L;
filesPerPartition.getOrDefault(partitionPath, 0L);
totalRecords += numRecords;
}
logger.info("TotalRecords: " + totalRecords + ", TotalFiles: " + totalFiles
@@ -340,7 +338,7 @@ JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileCompariso
}
}
return recordComparisons;
}).flatMapToPair(t -> t.iterator());
}).flatMapToPair(List::iterator);
}

/**
@@ -369,7 +367,7 @@ JavaPairRDD<String, String> findMatchingFilesForRecordKeys(

return fileSortedTripletRDD.mapPartitionsWithIndex(
new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true)
.flatMap(indexLookupResults -> indexLookupResults.iterator())
.flatMap(List::iterator)
.filter(lookupResult -> lookupResult.getMatchingRecordKeys().size() > 0)
.flatMapToPair(lookupResult -> {
List<Tuple2<String, String>> vals = new ArrayList<>();
@@ -104,7 +104,7 @@ JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileCompariso
}
}
return recordComparisons;
}).flatMapToPair(t -> t.iterator());
}).flatMapToPair(List::iterator);
}

}
@@ -97,7 +97,7 @@ public List<String> check(int maxAttempts, long initalDelayMs) {
.filter(p -> !fileNames.contains(new Path(basePath, p).getName()))
.collect(Collectors.toList());
})
.flatMap(itr -> itr.iterator()).collect();
.flatMap(List::iterator).collect();
if (remainingPaths.size() == 0) {
break; // we are done.
}
@@ -65,9 +65,9 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
private final WriteStatus writeStatus;
private final String fileId;
// Buffer for holding records in memory before they are flushed to disk
List<IndexedRecord> recordList = new ArrayList<>();
private List<IndexedRecord> recordList = new ArrayList<>();
// Buffer for holding records (to be deleted) in memory before they are flushed to disk
List<String> keysToDelete = new ArrayList<>();
private List<String> keysToDelete = new ArrayList<>();
private TableFileSystemView.RealtimeView fileSystemView;
private String partitionPath;
private Iterator<HoodieRecord<T>> recordItr;
