diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/StatsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/StatsCommand.java index 212e2d277372..cc139315d048 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/StatsCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/StatsCommand.java @@ -143,9 +143,8 @@ public String fileSizeStats( Snapshot s = globalHistogram.getSnapshot(); rows.add(printFileSizeHistogram("ALL", s)); - Function converterFunction = entry -> { - return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString()))); - }; + Function converterFunction = entry -> + NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString()))); Map> fieldNameToConverterMap = new HashMap<>(); fieldNameToConverterMap.put("Min", converterFunction); fieldNameToConverterMap.put("10th", converterFunction); diff --git a/hoodie-cli/src/main/scala/com/uber/hoodie/cli/DedupeSparkJob.scala b/hoodie-cli/src/main/scala/com/uber/hoodie/cli/DedupeSparkJob.scala index 0058043ed638..1e6f003dba56 100644 --- a/hoodie-cli/src/main/scala/com/uber/hoodie/cli/DedupeSparkJob.scala +++ b/hoodie-cli/src/main/scala/com/uber/hoodie/cli/DedupeSparkJob.scala @@ -77,7 +77,7 @@ class DedupeSparkJob(basePath: String, val metadata = new HoodieTableMetaClient(fs.getConf, basePath) - val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"${basePath}/${duplicatedPartitionPath}")) + val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"$basePath/$duplicatedPartitionPath")) val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles) val latestFiles: java.util.List[HoodieDataFile] = fsView.getLatestDataFiles().collect(Collectors.toList[HoodieDataFile]()) val filteredStatuses = latestFiles.map(f => f.getPath) @@ -92,8 +92,8 @@ class DedupeSparkJob(basePath: String, val dupeDataSql = s""" SELECT `_hoodie_record_key`, `_hoodie_partition_path`, `_hoodie_file_name`, `_hoodie_commit_time` - FROM ${tmpTableName} h - JOIN ${dedupeTblName} d + FROM $tmpTableName h + JOIN $dedupeTblName d ON h.`_hoodie_record_key` = d.dupe_key """ val dupeMap = sqlContext.sql(dupeDataSql).collectAsList().groupBy(r => r.getString(0)) @@ -101,8 +101,7 @@ class DedupeSparkJob(basePath: String, // Mark all files except the one with latest commits for deletion dupeMap.foreach(rt => { - val key = rt._1 - val rows = rt._2 + val (key, rows) = rt var maxCommit = -1L rows.foreach(r => { @@ -129,7 +128,7 @@ class DedupeSparkJob(basePath: String, def fixDuplicates(dryRun: Boolean = true) = { val metadata = new HoodieTableMetaClient(fs.getConf, basePath) - val allFiles = fs.listStatus(new Path(s"${basePath}/${duplicatedPartitionPath}")) + val allFiles = fs.listStatus(new Path(s"$basePath/$duplicatedPartitionPath")) val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles) val latestFiles: java.util.List[HoodieDataFile] = fsView.getLatestDataFiles().collect(Collectors.toList[HoodieDataFile]()) @@ -138,30 +137,28 @@ class DedupeSparkJob(basePath: String, val dupeFixPlan = planDuplicateFix() // 1. 
Copy all latest files into the temp fix path - fileNameToPathMap.foreach { case (fileName, filePath) => { + fileNameToPathMap.foreach { case (fileName, filePath) => val badSuffix = if (dupeFixPlan.contains(fileName)) ".bad" else "" - val dstPath = new Path(s"${repairOutputPath}/${filePath.getName}${badSuffix}") - LOG.info(s"Copying from ${filePath} to ${dstPath}") + val dstPath = new Path(s"$repairOutputPath/${filePath.getName}$badSuffix") + LOG.info(s"Copying from $filePath to $dstPath") FileUtil.copy(fs, filePath, fs, dstPath, false, true, fs.getConf) } - } // 2. Remove duplicates from the bad files - dupeFixPlan.foreach { case (fileName, keysToSkip) => { + dupeFixPlan.foreach { case (fileName, keysToSkip) => val commitTime = FSUtils.getCommitTime(fileNameToPathMap(fileName).getName) - val badFilePath = new Path(s"${repairOutputPath}/${fileNameToPathMap(fileName).getName}.bad") - val newFilePath = new Path(s"${repairOutputPath}/${fileNameToPathMap(fileName).getName}") + val badFilePath = new Path(s"$repairOutputPath/${fileNameToPathMap(fileName).getName}.bad") + val newFilePath = new Path(s"$repairOutputPath/${fileNameToPathMap(fileName).getName}") LOG.info(" Skipping and writing new file for : " + fileName) SparkHelpers.skipKeysAndWriteNewFile(commitTime, fs, badFilePath, newFilePath, dupeFixPlan(fileName)) fs.delete(badFilePath, false) } - } // 3. Check that there are no duplicates anymore. - val df = sqlContext.read.parquet(s"${repairOutputPath}/*.parquet") + val df = sqlContext.read.parquet(s"$repairOutputPath/*.parquet") df.registerTempTable("fixedTbl") val dupeKeyDF = getDupeKeyDF("fixedTbl") - val dupeCnt = dupeKeyDF.count(); + val dupeCnt = dupeKeyDF.count() if (dupeCnt != 0) { dupeKeyDF.show() throw new HoodieException("Still found some duplicates!!.. Inspect output") @@ -169,7 +166,7 @@ class DedupeSparkJob(basePath: String, // 4. Additionally ensure no record keys are left behind. val sourceDF = sparkHelper.getDistinctKeyDF(fileNameToPathMap.map(t => t._2.toString).toList) - val fixedDF = sparkHelper.getDistinctKeyDF(fileNameToPathMap.map(t => s"${repairOutputPath}/${t._2.getName}").toList) + val fixedDF = sparkHelper.getDistinctKeyDF(fileNameToPathMap.map(t => s"$repairOutputPath/${t._2.getName}").toList) val missedRecordKeysDF = sourceDF.except(fixedDF) val missedCnt = missedRecordKeysDF.count() if (missedCnt != 0) { @@ -180,17 +177,16 @@ class DedupeSparkJob(basePath: String, println("No duplicates found & counts are in check!!!! ") // 4. Prepare to copy the fixed files back. - fileNameToPathMap.foreach { case (fileName, filePath) => { - val srcPath = new Path(s"${repairOutputPath}/${filePath.getName}") - val dstPath = new Path(s"${basePath}/${duplicatedPartitionPath}/${filePath.getName}") + fileNameToPathMap.foreach { case (_, filePath) => + val srcPath = new Path(s"$repairOutputPath/${filePath.getName}") + val dstPath = new Path(s"$basePath/$duplicatedPartitionPath/${filePath.getName}") if (dryRun) { - LOG.info(s"[JUST KIDDING!!!] Copying from ${srcPath} to ${dstPath}") + LOG.info(s"[JUST KIDDING!!!] Copying from $srcPath to $dstPath") } else { // for real - LOG.info(s"[FOR REAL!!!] Copying from ${srcPath} to ${dstPath}") + LOG.info(s"[FOR REAL!!!] 
Copying from $srcPath to $dstPath") FileUtil.copy(fs, srcPath, fs, dstPath, false, true, fs.getConf) } } - } } } diff --git a/hoodie-cli/src/main/scala/com/uber/hoodie/cli/SparkHelpers.scala b/hoodie-cli/src/main/scala/com/uber/hoodie/cli/SparkHelpers.scala index f383e5fba4fa..81db77053274 100644 --- a/hoodie-cli/src/main/scala/com/uber/hoodie/cli/SparkHelpers.scala +++ b/hoodie-cli/src/main/scala/com/uber/hoodie/cli/SparkHelpers.scala @@ -94,12 +94,9 @@ class SparkHelper(sqlContext: SQLContext, fs: FileSystem) { * @return */ def isFileContainsKey(rowKey: String, file: String): Boolean = { - println(s"Checking ${file} for key ${rowKey}") - val ff = getRowKeyDF(file).filter(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}` = '${rowKey}'") - if (ff.count() > 0) - return true - else - return false + println(s"Checking $file for key $rowKey") + val ff = getRowKeyDF(file).filter(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}` = '$rowKey'") + if (ff.count() > 0) true else false } /** @@ -109,7 +106,7 @@ class SparkHelper(sqlContext: SQLContext, fs: FileSystem) { * @param sqlContext */ def getKeyCount(file: String, sqlContext: org.apache.spark.sql.SQLContext) = { - println(getRowKeyDF(file).collect().size) + println(getRowKeyDF(file).collect().length) } @@ -128,8 +125,7 @@ class SparkHelper(sqlContext: SQLContext, fs: FileSystem) { val bf = new com.uber.hoodie.common.BloomFilter(bfStr) val foundCount = sqlContext.parquetFile(file) .select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`") - .collect(). - filter(r => !bf.mightContain(r.getString(0))).size + .collect().count(r => !bf.mightContain(r.getString(0))) val totalCount = getKeyCount(file, sqlContext) s"totalCount: ${totalCount}, foundCount: ${foundCount}" totalCount == foundCount diff --git a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java index 2f6495f0540b..6a43c9c88da5 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java @@ -238,8 +238,8 @@ protected static List> getRenamingActionsToAl fileSystemView.getLatestMergedFileSlicesBeforeOrOn(op.getPartitionPath(), lastInstant.getTimestamp()) .filter(fs -> fs.getFileId().equals(op.getFileId())).findFirst().get(); final int maxVersion = - op.getDeltaFilePaths().stream().map(lf -> FSUtils.getFileVersionFromLog(new Path(lf))) - .reduce((x, y) -> x > y ? x : y).map(x -> x).orElse(0); + op.getDeltaFilePaths().stream().map(lf -> FSUtils.getFileVersionFromLog(new Path(lf))) + .reduce((x, y) -> x > y ? x : y).orElse(0); List logFilesToBeMoved = merged.getLogFiles().filter(lf -> lf.getLogVersion() > maxVersion).collect(Collectors.toList()); return logFilesToBeMoved.stream().map(lf -> { @@ -322,8 +322,7 @@ private ValidationOpResult validateCompactionOperation(HoodieTableMetaClient met Set diff = logFilesInFileSlice.stream().filter(lf -> !logFilesInCompactionOp.contains(lf)) .collect(Collectors.toSet()); - Preconditions.checkArgument(diff.stream() - .filter(lf -> !lf.getBaseCommitTime().equals(compactionInstant)).count() == 0, + Preconditions.checkArgument(diff.stream().allMatch(lf -> lf.getBaseCommitTime().equals(compactionInstant)), "There are some log-files which are neither specified in compaction plan " + "nor present after compaction request instant. 
Some of these :" + diff); } else { @@ -438,14 +437,14 @@ public List> getRenamingActionsForUnschedulin fileSystemView.getLatestMergedFileSlicesBeforeOrOn(operation.getPartitionPath(), lastInstant.getTimestamp()) .filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst().get(); List logFilesToRepair = - merged.getLogFiles().filter(lf -> lf.getBaseCommitTime().equals(compactionInstant)) - .collect(Collectors.toList()); - logFilesToRepair.sort(HoodieLogFile.getBaseInstantAndLogVersionComparator().reversed()); + merged.getLogFiles().filter(lf -> lf.getBaseCommitTime().equals(compactionInstant)) + .sorted(HoodieLogFile.getBaseInstantAndLogVersionComparator().reversed()) + .collect(Collectors.toList()); FileSlice fileSliceForCompaction = fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(), operation.getBaseInstantTime()) .filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst().get(); int maxUsedVersion = - fileSliceForCompaction.getLogFiles().findFirst().map(lf -> lf.getLogVersion()) + fileSliceForCompaction.getLogFiles().findFirst().map(HoodieLogFile::getLogVersion) .orElse(HoodieLogFile.LOGFILE_BASE_VERSION - 1); String logExtn = fileSliceForCompaction.getLogFiles().findFirst().map(lf -> "." + lf.getFileExtension()) .orElse(HoodieLogFile.DELTA_EXTENSION); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 7389cf6e0b3d..3693b55bf929 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -440,7 +440,7 @@ private JavaRDD upsertRecordsInternal(JavaRDD> prep } else { return hoodieTable.handleInsertPartition(commitTime, partition, recordItr, partitioner); } - }, true).flatMap(writeStatuses -> writeStatuses.iterator()); + }, true).flatMap(List::iterator); return updateIndexAndCommitIfNeeded(writeStatusRDD, hoodieTable, commitTime); } @@ -469,7 +469,7 @@ private JavaRDD> partition(JavaRDD> dedupedRecor Partitioner partitioner) { return dedupedRecords.mapToPair(record -> new Tuple2<>( new Tuple2<>(record.getKey(), Option.apply(record.getCurrentLocation())), record)) - .partitionBy(partitioner).map(tuple -> tuple._2()); + .partitionBy(partitioner).map(Tuple2::_2); } /** @@ -499,7 +499,7 @@ private boolean commit(String commitTime, JavaRDD writeStatuses, HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieCommitMetadata metadata = new HoodieCommitMetadata(); - List stats = writeStatuses.map(status -> status.getStat()).collect(); + List stats = writeStatuses.map(WriteStatus::getStat).collect(); updateMetadataAndRollingStats(actionType, metadata, stats); @@ -522,7 +522,7 @@ private boolean commit(String commitTime, JavaRDD writeStatuses, // add in extra metadata if (extraMetadata.isPresent()) { - extraMetadata.get().forEach((k, v) -> metadata.addMetadata(k, v)); + extraMetadata.get().forEach(metadata::addMetadata); } try { @@ -806,7 +806,7 @@ private void rollback(List commits) { }); List pendingCompactionToRollback = - commits.stream().filter(c -> pendingCompactions.contains(c)).collect(Collectors.toList()); + commits.stream().filter(pendingCompactions::contains).collect(Collectors.toList()); List commitsToRollback = commits.stream().filter(c -> !pendingCompactions.contains(c)).collect(Collectors.toList()); @@ -837,12 +837,12 @@ private void rollback(List commits) { } // Remove interleaving pending compactions before rolling back 
commits - pendingCompactionToRollback.stream().forEach(this::deletePendingCompaction); + pendingCompactionToRollback.forEach(this::deletePendingCompaction); List stats = table.rollback(jsc, commitsToRollback); // cleanup index entries - commitsToRollback.stream().forEach(s -> { + commitsToRollback.forEach(s -> { if (!index.rollbackCommit(s)) { throw new HoodieRollbackException("Rollback index changes failed, for time :" + s); } @@ -1076,7 +1076,7 @@ JavaRDD> deduplicateRecords(JavaRDD> records, // everything // so pick it from one of the records. return new HoodieRecord(rec1.getKey(), reducedData); - }, parallelism).map(recordTuple -> recordTuple._2()); + }, parallelism).map(Tuple2::_2); } /** @@ -1099,7 +1099,7 @@ private HoodieTable getTableAndInitCtx() { // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable.getHoodieTable( new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); - if (table.getMetaClient().getCommitActionType() == HoodieTimeline.COMMIT_ACTION) { + if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) { writeContext = metrics.getCommitCtx(); } else { writeContext = metrics.getDeltaCommitCtx(); @@ -1214,7 +1214,7 @@ private void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTab private HoodieCommitMetadata doCompactionCommit(JavaRDD writeStatuses, HoodieTableMetaClient metaClient, String compactionCommitTime, Optional> extraMetadata) { - List updateStatusMap = writeStatuses.map(writeStatus -> writeStatus.getStat()) + List updateStatusMap = writeStatuses.map(WriteStatus::getStat) .collect(); HoodieCommitMetadata metadata = new HoodieCommitMetadata(true); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java index 6efa70b0d4ab..5746e8d16725 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieMemoryConfig.java @@ -117,10 +117,9 @@ private long getMaxMemoryAllowedForMerge(String maxMemoryFraction) { if (SparkEnv.get() != null) { // 1 GB is the default conf used by Spark, look at SparkContext.scala - long executorMemoryInBytes = Long.valueOf( - Utils.memoryStringToMb(SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_PROP, + long executorMemoryInBytes = Utils.memoryStringToMb(SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_PROP, DEFAULT_SPARK_EXECUTOR_MEMORY_MB)) * 1024 - * 1024L); + * 1024L; // 0.6 is the default value used by Spark, // look at {@link // https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkConf.scala#L507} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java index 050958159035..fcbd90f0b3d2 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java @@ -187,13 +187,11 @@ private int autoComputeParallelism(final Map recordsPerPartition, for (String partitionPath : recordsPerPartition.keySet()) { long numRecords = recordsPerPartition.get(partitionPath); long numFiles = - filesPerPartition.containsKey(partitionPath) ? 
filesPerPartition.get(partitionPath) - : 1L; + filesPerPartition.getOrDefault(partitionPath, 1L); totalComparisons += numFiles * numRecords; totalFiles += - filesPerPartition.containsKey(partitionPath) ? filesPerPartition.get(partitionPath) - : 0L; + filesPerPartition.getOrDefault(partitionPath, 0L); totalRecords += numRecords; } logger.info("TotalRecords: " + totalRecords + ", TotalFiles: " + totalFiles @@ -340,7 +338,7 @@ JavaPairRDD> explodeRecordRDDWithFileCompariso } } return recordComparisons; - }).flatMapToPair(t -> t.iterator()); + }).flatMapToPair(List::iterator); } /** @@ -369,7 +367,7 @@ JavaPairRDD findMatchingFilesForRecordKeys( return fileSortedTripletRDD.mapPartitionsWithIndex( new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true) - .flatMap(indexLookupResults -> indexLookupResults.iterator()) + .flatMap(List::iterator) .filter(lookupResult -> lookupResult.getMatchingRecordKeys().size() > 0) .flatMapToPair(lookupResult -> { List> vals = new ArrayList<>(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java index 49c5fd3fc105..db1f76f32867 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java @@ -104,7 +104,7 @@ JavaPairRDD> explodeRecordRDDWithFileCompariso } } return recordComparisons; - }).flatMapToPair(t -> t.iterator()); + }).flatMapToPair(List::iterator); } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java b/hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java index 43913089e73d..0171e07b83a1 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java @@ -97,7 +97,7 @@ public List check(int maxAttempts, long initalDelayMs) { .filter(p -> !fileNames.contains(new Path(basePath, p).getName())) .collect(Collectors.toList()); }) - .flatMap(itr -> itr.iterator()).collect(); + .flatMap(List::iterator).collect(); if (remainingPaths.size() == 0) { break; // we are done. 
} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index 5434c576630d..452d111eb56e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -65,9 +65,9 @@ public class HoodieAppendHandle extends HoodieIOH private final WriteStatus writeStatus; private final String fileId; // Buffer for holding records in memory before they are flushed to disk - List recordList = new ArrayList<>(); + private List recordList = new ArrayList<>(); // Buffer for holding records (to be deleted) in memory before they are flushed to disk - List keysToDelete = new ArrayList<>(); + private List keysToDelete = new ArrayList<>(); private TableFileSystemView.RealtimeView fileSystemView; private String partitionPath; private Iterator> recordItr; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java index 0d991642d7b4..537bbedaf89b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java @@ -168,8 +168,7 @@ private List getFilesToCleanKeepingLatestCommits(String partitionPath) // The window of commit retain == max query run time. So a query could be running which // still // uses this file. - if (fileCommitTime.equals(lastVersion) || (lastVersionBeforeEarliestCommitToRetain != null - && fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) { + if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) { // move on to the next file continue; } @@ -180,9 +179,7 @@ private List getFilesToCleanKeepingLatestCommits(String partitionPath) .compareTimestamps(earliestCommitToRetain.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) { // this is a commit, that should be cleaned. 
- if (aFile.isPresent()) { - deletePaths.add(aFile.get().getFileStatus().getPath().toString()); - } + aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileStatus().getPath().toString())); if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { // If merge on read, then clean the log files for the commits as well deletePaths.addAll(aSlice.getLogFiles().map(file -> file.getPath().toString()) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java index a5dff6b90c14..f8647e5cbd68 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java @@ -195,12 +195,11 @@ private boolean deleteArchivedInstants(List archivedInstants) thr // Remove older meta-data from auxiliary path too Optional latestCommitted = - archivedInstants.stream() - .filter(i -> { - return i.isCompleted() - && (i.getAction().equals(COMMIT_ACTION) || (i.getAction().equals(DELTA_COMMIT_ACTION))); - }) - .sorted(Comparator.comparing(HoodieInstant::getTimestamp).reversed()).findFirst(); + archivedInstants.stream() + .filter(i -> { + return i.isCompleted() + && (i.getAction().equals(COMMIT_ACTION) || (i.getAction().equals(DELTA_COMMIT_ACTION))); + }).max(Comparator.comparing(HoodieInstant::getTimestamp)); if (latestCommitted.isPresent()) { success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get()); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index bf575d416913..210f311bcc46 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -254,9 +254,7 @@ public void write(GenericRecord oldRecord) { public WriteStatus close() { try { // write out any pending records (this can happen when inserts are turned into updates) - Iterator pendingRecordsItr = keyToNewRecords.keySet().iterator(); - while (pendingRecordsItr.hasNext()) { - String key = pendingRecordsItr.next(); + for (String key : keyToNewRecords.keySet()) { if (!writtenRecordKeys.contains(key)) { HoodieRecord hoodieRecord = keyToNewRecords.get(key); writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(schema)); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index 4d47e76055a1..ae74a58f36b2 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -90,7 +90,7 @@ public JavaRDD compact(JavaSparkContext jsc, log.info("Compactor compacting " + operations + " files"); return jsc.parallelize(operations, operations.size()) .map(s -> compact(table, metaClient, config, s, compactionInstantTime)) - .flatMap(writeStatusesItr -> writeStatusesItr.iterator()); + .flatMap(List::iterator); } private List compact(HoodieCopyOnWriteTable hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient, @@ -141,7 +141,7 @@ private List compact(HoodieCopyOnWriteTable hoodieCopyOnWriteTable, } Iterable> resultIterable = () -> result; return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream) - 
.map(s -> { + .peek(s -> { s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog()); s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles()); s.getStat().setTotalLogRecords(scanner.getTotalLogRecords()); @@ -154,7 +154,6 @@ private List compact(HoodieCopyOnWriteTable hoodieCopyOnWriteTable, RuntimeStats runtimeStats = new RuntimeStats(); runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks()); s.getStat().setRuntimeStats(runtimeStats); - return s; }).collect(toList()); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java index fa5db25278b9..d7444818ca3a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java @@ -74,7 +74,7 @@ public Map captureMetrics(HoodieWriteConfig writeConfig, Optiona metrics.put(TOTAL_IO_WRITE_MB, totalIOWrite.doubleValue()); metrics.put(TOTAL_IO_MB, totalIO.doubleValue()); metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize.doubleValue()); - metrics.put(TOTAL_LOG_FILES, Double.valueOf(logFiles.size())); + metrics.put(TOTAL_LOG_FILES, (double) logFiles.size()); return metrics; } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java index 57c32311b381..5ada7c4cb01b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java @@ -139,13 +139,8 @@ private FSDataOutputStream wrapOutputStream(final Path path, return fsDataOutputStream; } - SizeAwareFSDataOutputStream os = new SizeAwareFSDataOutputStream(fsDataOutputStream, - new Runnable() { - @Override - public void run() { - openStreams.remove(path.getName()); - } - }); + SizeAwareFSDataOutputStream os = new SizeAwareFSDataOutputStream( + fsDataOutputStream, () -> openStreams.remove(path.getName())); openStreams.put(path.getName(), os); return os; } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java b/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java index aa9afa6fc3b0..5adf45fe06e6 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java @@ -145,7 +145,7 @@ public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) { if (config.isMetricsOn()) { logger.info(String - .format("Sending rollback metrics (duration=%d, numFilesDeleted=$d)", durationInMs, + .format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted)); registerGauge(getMetricsName("rollback", "duration"), durationInMs); registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted); @@ -180,12 +180,7 @@ String getMetricsName(String action, String metric) { void registerGauge(String metricName, final long value) { try { MetricRegistry registry = Metrics.getInstance().getRegistry(); - registry.register(metricName, new Gauge() { - @Override - public Long getValue() { - return value; - } - }); + registry.register(metricName, (Gauge) () -> value); } catch (Exception e) { // Here we catch 
all exception, so the major upsert pipeline will not be affected if the // metrics system diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 0a7eeaecab6e..b9d7c18d3105 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -201,17 +201,15 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle ups "Error in finding the old file path at commit " + commitTime + " for fileId: " + fileId); } else { AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema()); - ParquetReader reader = AvroParquetReader.builder(upsertHandle.getOldFilePath()) - .withConf(getHadoopConf()).build(); BoundedInMemoryExecutor wrapper = null; - try { + try (ParquetReader reader = AvroParquetReader.builder(upsertHandle.getOldFilePath()) + .withConf(getHadoopConf()).build()) { wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader), - new UpdateHandler(upsertHandle), x -> x); + new UpdateHandler(upsertHandle), x -> x); wrapper.execute(); } catch (Exception e) { throw new HoodieException(e); } finally { - reader.close(); upsertHandle.close(); if (null != wrapper) { wrapper.shutdownNow(); @@ -480,7 +478,7 @@ private List cleanPartitionPaths(List partitionsToClean .merge(e2)).collect(); Map partitionCleanStatsMap = partitionCleanStats.stream() - .collect(Collectors.toMap(e -> e._1(), e -> e._2())); + .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2)); HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config); // Return PartitionCleanStat for each partition passed. diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index ad065fbfcadf..849dbae45caf 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -52,16 +52,7 @@ import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -185,105 +176,103 @@ public List rollback(JavaSparkContext jsc, List comm .getTimelineOfActions(Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, HoodieActiveTimeline.DELTA_COMMIT_ACTION, HoodieActiveTimeline.COMPACTION_ACTION)).getInstants() .filter(i -> commits.contains(i.getTimestamp())) - .collect(Collectors.toMap(i -> i.getTimestamp(), i -> i)); + .collect(Collectors.toMap(HoodieInstant::getTimestamp, i -> i)); // Atomically un-publish all non-inflight commits - commitsAndCompactions.entrySet().stream().map(entry -> entry.getValue()) + commitsAndCompactions.entrySet().stream().map(Map.Entry::getValue) .filter(i -> !i.isInflight()).forEach(this.getActiveTimeline()::revertToInflight); logger.info("Unpublished " + commits); Long startTime = System.currentTimeMillis(); List allRollbackStats = jsc.parallelize(FSUtils 
.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning())) - .map((Function>) partitionPath -> { - return commits.stream().map(commit -> { - HoodieInstant instant = commitsAndCompactions.get(commit); - HoodieRollbackStat hoodieRollbackStats = null; - // Need to put the path filter here since Filter is not serializable - // PathFilter to get all parquet files and log files that need to be deleted - PathFilter filter = (path) -> { - if (path.toString().contains(".parquet")) { - String fileCommitTime = FSUtils.getCommitTime(path.getName()); - return commits.contains(fileCommitTime); - } else if (path.toString().contains(".log")) { - // Since the baseCommitTime is the only commit for new log files, it's okay here - String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path); - return commits.contains(fileCommitTime); + .map((Function>) partitionPath -> commits.stream().map(commit -> { + HoodieInstant instant = commitsAndCompactions.get(commit); + HoodieRollbackStat hoodieRollbackStats = null; + // Need to put the path filter here since Filter is not serializable + // PathFilter to get all parquet files and log files that need to be deleted + PathFilter filter = (path) -> { + if (path.toString().contains(".parquet")) { + String fileCommitTime = FSUtils.getCommitTime(path.getName()); + return commits.contains(fileCommitTime); + } else if (path.toString().contains(".log")) { + // Since the baseCommitTime is the only commit for new log files, it's okay here + String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path); + return commits.contains(fileCommitTime); + } + return false; + }; + + switch (instant.getAction()) { + case HoodieTimeline.COMMIT_ACTION: + case HoodieTimeline.COMPACTION_ACTION: + try { + Map results = super + .deleteCleanedFiles(partitionPath, Collections.singletonList(commit)); + hoodieRollbackStats = HoodieRollbackStat.newBuilder() + .withPartitionPath(partitionPath).withDeletedFileResults(results).build(); + break; + } catch (IOException io) { + throw new UncheckedIOException("Failed to rollback for commit " + commit, io); } - return false; - }; - - switch (instant.getAction()) { - case HoodieTimeline.COMMIT_ACTION: - case HoodieTimeline.COMPACTION_ACTION: - try { - Map results = super - .deleteCleanedFiles(partitionPath, Arrays.asList(commit)); - hoodieRollbackStats = HoodieRollbackStat.newBuilder() - .withPartitionPath(partitionPath).withDeletedFileResults(results).build(); - break; - } catch (IOException io) { - throw new UncheckedIOException("Failed to rollback for commit " + commit, io); + case HoodieTimeline.DELTA_COMMIT_ACTION: + // -------------------------------------------------------------------------------------------------- + // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal + // -------------------------------------------------------------------------------------------------- + // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In + // this scenario we would want to delete these log files. + // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario, + // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks. + // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is + // being reverted. 
In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and + // and hence will end up deleting these log files. This is done so there are no orphan log files + // lying around. + // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions + // taken in this scenario is a combination of (A.2) and (A.3) + // --------------------------------------------------------------------------------------------------- + // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal + // --------------------------------------------------------------------------------------------------- + // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries. + // In this scenario, we delete all the parquet files written for the failed commit. + // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In + // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. + // (B.3) Rollback triggered for first commit - Same as (B.1) + // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files + // as well if the base parquet file gets deleted. + try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + metaClient.getCommitTimeline().getInstantDetails( + new HoodieInstant(true, instant.getAction(), instant.getTimestamp())) + .get(), HoodieCommitMetadata.class); + + // read commit file and (either append delete blocks or delete file) + final Map filesToDeletedStatus = new HashMap<>(); + Map filesToNumBlocksRollback = new HashMap<>(); + + // In case all data was inserts and the commit failed, delete the file belonging to that commit + // We do not know fileIds for inserts (first inserts are either log files or parquet files), + // delete all files for the corresponding failed commit, if present (same as COW) + super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter); + final Set deletedFiles = filesToDeletedStatus.entrySet().stream() + .map(entry -> { + Path filePath = entry.getKey().getPath(); + return FSUtils.getFileIdFromFilePath(filePath); + }).collect(Collectors.toSet()); + + // append rollback blocks for updates + if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { + hoodieRollbackStats = rollback(index, partitionPath, commit, commitMetadata, filesToDeletedStatus, + filesToNumBlocksRollback, deletedFiles); } - case HoodieTimeline.DELTA_COMMIT_ACTION: - // -------------------------------------------------------------------------------------------------- - // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal - // -------------------------------------------------------------------------------------------------- - // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In - // this scenario we would want to delete these log files. - // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario, - // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks. - // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is - // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and - // and hence will end up deleting these log files. 
This is done so there are no orphan log files - // lying around. - // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions - // taken in this scenario is a combination of (A.2) and (A.3) - // --------------------------------------------------------------------------------------------------- - // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal - // --------------------------------------------------------------------------------------------------- - // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries. - // In this scenario, we delete all the parquet files written for the failed commit. - // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In - // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. - // (B.3) Rollback triggered for first commit - Same as (B.1) - // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files - // as well if the base parquet file gets deleted. - try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - metaClient.getCommitTimeline().getInstantDetails( - new HoodieInstant(true, instant.getAction(), instant.getTimestamp())) - .get(), HoodieCommitMetadata.class); - - // read commit file and (either append delete blocks or delete file) - final Map filesToDeletedStatus = new HashMap<>(); - Map filesToNumBlocksRollback = new HashMap<>(); - - // In case all data was inserts and the commit failed, delete the file belonging to that commit - // We do not know fileIds for inserts (first inserts are either log files or parquet files), - // delete all files for the corresponding failed commit, if present (same as COW) - super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter); - final Set deletedFiles = filesToDeletedStatus.entrySet().stream() - .map(entry -> { - Path filePath = entry.getKey().getPath(); - return FSUtils.getFileIdFromFilePath(filePath); - }).collect(Collectors.toSet()); - - // append rollback blocks for updates - if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { - hoodieRollbackStats = rollback(index, partitionPath, commit, commitMetadata, filesToDeletedStatus, - filesToNumBlocksRollback, deletedFiles); - } - break; - } catch (IOException io) { - throw new UncheckedIOException("Failed to rollback for commit " + commit, io); - } - default: break; - } - return hoodieRollbackStats; - }).collect(Collectors.toList()); - }).flatMap(x -> x.iterator()).filter(x -> x != null).collect(); + } catch (IOException io) { + throw new UncheckedIOException("Failed to rollback for commit " + commit, io); + } + default: + break; + } + return hoodieRollbackStats; + }).collect(Collectors.toList())).flatMap(List::iterator).filter(Objects::nonNull).collect(); commitsAndCompactions.entrySet().stream().map( entry -> new HoodieInstant(true, entry.getValue().getAction(), @@ -312,9 +301,8 @@ protected HoodieRollingStatMetadata getRollingStats() { Optional lastRollingStat = Optional.ofNullable(commitMetadata.getExtraMetadata() .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY)); if (lastRollingStat.isPresent()) { - HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata + return HoodieCommitMetadata .fromBytes(lastRollingStat.get().getBytes(), HoodieRollingStatMetadata.class); - return rollingStatMetadata; } } return null; @@ -411,8 +399,7 @@ 
private long getTotalFileSize(String partitionPath, FileSlice fileSlice) { HoodieRollingStat rollingStatForFile = partitionRollingStats.get(fileSlice.getFileId()); if (rollingStatForFile != null) { long inserts = rollingStatForFile.getInserts(); - long totalSize = averageRecordSize * inserts; - return totalSize; + return averageRecordSize * inserts; } } } @@ -427,10 +414,7 @@ private long getTotalFileSize(String partitionPath, FileSlice fileSlice) { private boolean isSmallFile(String partitionPath, FileSlice fileSlice) { long totalSize = getTotalFileSize(partitionPath, fileSlice); - if (totalSize < config.getParquetMaxFileSize()) { - return true; - } - return false; + return totalSize < config.getParquetMaxFileSize(); } // TODO (NA) : Make this static part of utility @@ -470,11 +454,8 @@ private HoodieRollbackStat rollback(HoodieIndex hoodieIndex, String partitionPat commitMetadata.getPartitionToWriteStats().get(partitionPath).stream() .filter(wStat -> { // Filter out stats without prevCommit since they are all inserts - if (wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT && wStat.getPrevCommit() != null - && !deletedFiles.contains(wStat.getFileId())) { - return true; - } - return false; + return wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT && wStat.getPrevCommit() != null + && !deletedFiles.contains(wStat.getFileId()); }).forEach(wStat -> { HoodieLogFormat.Writer writer = null; String baseCommitTime = wStat.getPrevCommit(); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileGroup.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileGroup.java index 1f30cfad4611..01aac41272d5 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileGroup.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileGroup.java @@ -21,10 +21,7 @@ import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import java.io.Serializable; -import java.util.Comparator; -import java.util.List; -import java.util.Optional; -import java.util.TreeMap; +import java.util.*; import java.util.stream.Stream; /** @@ -33,10 +30,7 @@ public class HoodieFileGroup implements Serializable { public static Comparator getReverseCommitTimeComparator() { - return (o1, o2) -> { - // reverse the order - return o2.compareTo(o1); - }; + return Comparator.reverseOrder(); } /** @@ -127,7 +121,7 @@ private boolean isFileSliceCommitted(FileSlice slice) { * Get all the the file slices including in-flight ones as seen in underlying file-system */ public Stream getAllFileSlicesIncludingInflight() { - return fileSlices.entrySet().stream().map(sliceEntry -> sliceEntry.getValue()); + return fileSlices.entrySet().stream().map(Map.Entry::getValue); } /** @@ -143,8 +137,8 @@ public Optional getLatestFileSlicesIncludingInflight() { public Stream getAllFileSlices() { if (!timeline.empty()) { return fileSlices.entrySet().stream() - .map(sliceEntry -> sliceEntry.getValue()) - .filter(slice -> isFileSliceCommitted(slice)); + .map(Map.Entry::getValue) + .filter(this::isFileSliceCommitted); } return Stream.empty(); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java index 9f159d6cae51..88acc4ae6b68 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java +++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java @@ -103,7 +103,7 @@ public static Comparator getBaseInstantAndLogVersionComparator() return new Integer(o2.getLogVersion()).compareTo(o1.getLogVersion()); } // reverse the order by base-commits - return new Integer(baseInstantTime2.compareTo(baseInstantTime1)); + return baseInstantTime2.compareTo(baseInstantTime1); }; } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/storage/StorageSchemes.java b/hoodie-common/src/main/java/com/uber/hoodie/common/storage/StorageSchemes.java index f42238ed6721..346c31903bbd 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/storage/StorageSchemes.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/storage/StorageSchemes.java @@ -53,14 +53,13 @@ public boolean supportsAppend() { } public static boolean isSchemeSupported(String scheme) { - return Arrays.stream(values()).filter(s -> s.getScheme().equals(scheme)).count() > 0; + return Arrays.stream(values()).anyMatch(s -> s.getScheme().equals(scheme)); } public static boolean isAppendSupported(String scheme) { if (!isSchemeSupported(scheme)) { throw new IllegalArgumentException("Unsupported scheme :" + scheme); } - return Arrays.stream(StorageSchemes.values()) - .filter(s -> s.supportsAppend() && s.scheme.equals(scheme)).count() > 0; + return Arrays.stream(StorageSchemes.values()).anyMatch(s -> s.supportsAppend() && s.scheme.equals(scheme)); } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java index 007a0f825ed4..345f2906e611 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java @@ -95,27 +95,24 @@ public static void createHoodieProperties(FileSystem fs, Path metadataFolder, fs.mkdirs(metadataFolder); } Path propertyPath = new Path(metadataFolder, HOODIE_PROPERTIES_FILE); - FSDataOutputStream outputStream = fs.create(propertyPath); - try { + try (FSDataOutputStream outputStream = fs.create(propertyPath)) { if (!properties.containsKey(HOODIE_TABLE_NAME_PROP_NAME)) { throw new IllegalArgumentException( - HOODIE_TABLE_NAME_PROP_NAME + " property needs to be specified"); + HOODIE_TABLE_NAME_PROP_NAME + " property needs to be specified"); } if (!properties.containsKey(HOODIE_TABLE_TYPE_PROP_NAME)) { properties.setProperty(HOODIE_TABLE_TYPE_PROP_NAME, DEFAULT_TABLE_TYPE.name()); } if (properties.getProperty(HOODIE_TABLE_TYPE_PROP_NAME) == HoodieTableType.MERGE_ON_READ - .name() - && !properties.containsKey(HOODIE_PAYLOAD_CLASS_PROP_NAME)) { + .name() + && !properties.containsKey(HOODIE_PAYLOAD_CLASS_PROP_NAME)) { properties.setProperty(HOODIE_PAYLOAD_CLASS_PROP_NAME, DEFAULT_PAYLOAD_CLASS); } if (!properties.containsKey(HOODIE_ARCHIVELOG_FOLDER_PROP_NAME)) { properties.setProperty(HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, DEFAULT_ARCHIVELOG_FOLDER); } properties - .store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis())); - } finally { - outputStream.close(); + .store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis())); } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieCorruptBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieCorruptBlock.java index fc0402f08d5f..e6d87ca5c6a9 100644 --- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieCorruptBlock.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieCorruptBlock.java @@ -57,7 +57,7 @@ public static HoodieLogBlock getBlock(HoodieLogFile logFile, long blockSize, long blockEndPos, Map header, - Map footer) throws IOException { + Map footer) { return new HoodieCorruptBlock(corruptedBytes, inputStream, readBlockLazily, Optional.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)), diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java index 91520b7252ba..cf5cb9eecd7b 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java @@ -91,10 +91,10 @@ public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, List pendingCompactionInstants = metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList()); this.fileIdToPendingCompaction = ImmutableMap.copyOf( - CompactionUtils.getAllPendingCompactionOperations(metaClient).entrySet().stream().map(entry -> { - return Pair.of(entry.getKey(), Pair.of(entry.getValue().getKey(), - CompactionOperation.convertFromAvroRecordInstance(entry.getValue().getValue()))); - }).collect(Collectors.toMap(Pair::getKey, Pair::getValue))); + CompactionUtils.getAllPendingCompactionOperations(metaClient).entrySet().stream() + .map(entry -> Pair.of(entry.getKey(), Pair.of(entry.getValue().getKey(), + CompactionOperation.convertFromAvroRecordInstance(entry.getValue().getValue())))) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue))); } /** @@ -152,10 +152,10 @@ private List addFilesToView(FileStatus[] statuses) { String fileId = pair.getValue(); HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), fileId, visibleActiveTimeline); if (dataFiles.containsKey(pair)) { - dataFiles.get(pair).forEach(dataFile -> group.addDataFile(dataFile)); + dataFiles.get(pair).forEach(group::addDataFile); } if (logFiles.containsKey(pair)) { - logFiles.get(pair).forEach(logFile -> group.addLogFile(logFile)); + logFiles.get(pair).forEach(group::addLogFile); } if (fileIdToPendingCompaction.containsKey(fileId)) { // If there is no delta-commit after compaction request, this step would ensure a new file-slice appears @@ -219,9 +219,7 @@ public Stream getLatestDataFiles(final String partitionPath) { @Override public Stream getLatestDataFiles() { return fileGroupMap.values().stream() - .map(fileGroup -> { - return fileGroup.getAllDataFiles().filter(df -> !isDataFileDueToPendingCompaction(df)).findFirst(); - }) + .map(fileGroup -> fileGroup.getAllDataFiles().filter(df -> !isDataFileDueToPendingCompaction(df)).findFirst()) .filter(Optional::isPresent) .map(Optional::get); } @@ -230,15 +228,13 @@ public Stream getLatestDataFiles() { public Stream getLatestDataFilesBeforeOrOn(String partitionPath, String maxCommitTime) { return getAllFileGroups(partitionPath) - .map(fileGroup -> { - return fileGroup.getAllDataFiles() - .filter(dataFile -> - HoodieTimeline.compareTimestamps(dataFile.getCommitTime(), - maxCommitTime, - HoodieTimeline.LESSER_OR_EQUAL)) - .filter(df -> !isDataFileDueToPendingCompaction(df)) - .findFirst(); - }) + .map(fileGroup -> fileGroup.getAllDataFiles() + .filter(dataFile -> + 
HoodieTimeline.compareTimestamps(dataFile.getCommitTime(), + maxCommitTime, + HoodieTimeline.LESSER_OR_EQUAL)) + .filter(df -> !isDataFileDueToPendingCompaction(df)) + .findFirst()) .filter(Optional::isPresent) .map(Optional::get); } @@ -246,12 +242,10 @@ public Stream getLatestDataFilesBeforeOrOn(String partitionPath, @Override public Stream getLatestDataFilesInRange(List commitsToReturn) { return fileGroupMap.values().stream() - .map(fileGroup -> { - return fileGroup.getAllDataFiles() - .filter(dataFile -> commitsToReturn.contains(dataFile.getCommitTime()) - && !isDataFileDueToPendingCompaction(dataFile)) - .findFirst(); - }) + .map(fileGroup -> fileGroup.getAllDataFiles() + .filter(dataFile -> commitsToReturn.contains(dataFile.getCommitTime()) + && !isDataFileDueToPendingCompaction(dataFile)) + .findFirst()) .filter(Optional::isPresent) .map(Optional::get); } @@ -259,15 +253,13 @@ public Stream getLatestDataFilesInRange(List commitsToRe @Override public Stream getLatestDataFilesOn(String partitionPath, String instantTime) { return getAllFileGroups(partitionPath) - .map(fileGroup -> { - return fileGroup.getAllDataFiles() - .filter(dataFile -> - HoodieTimeline.compareTimestamps(dataFile.getCommitTime(), - instantTime, - HoodieTimeline.EQUAL)) - .filter(df -> !isDataFileDueToPendingCompaction(df)) - .findFirst(); - }) + .map(fileGroup -> fileGroup.getAllDataFiles() + .filter(dataFile -> + HoodieTimeline.compareTimestamps(dataFile.getCommitTime(), + instantTime, + HoodieTimeline.EQUAL)) + .filter(df -> !isDataFileDueToPendingCompaction(df)) + .findFirst()) .filter(Optional::isPresent) .map(Optional::get); } @@ -275,7 +267,7 @@ public Stream getLatestDataFilesOn(String partitionPath, String @Override public Stream getAllDataFiles(String partitionPath) { return getAllFileGroups(partitionPath) - .map(fileGroup -> fileGroup.getAllDataFiles()) + .map(HoodieFileGroup::getAllDataFiles) .flatMap(dataFileList -> dataFileList) .filter(df -> !isDataFileDueToPendingCompaction(df)); } @@ -283,7 +275,7 @@ public Stream getAllDataFiles(String partitionPath) { @Override public Stream getLatestFileSlices(String partitionPath) { return getAllFileGroups(partitionPath) - .map(fileGroup -> fileGroup.getLatestFileSlice()) + .map(HoodieFileGroup::getLatestFileSlice) .filter(Optional::isPresent) .map(Optional::get) .map(this::filterDataFileAfterPendingCompaction); @@ -312,11 +304,8 @@ public Stream getLatestUnCompactedFileSlices(String partitionPath) { */ private boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) { Pair compactionWithInstantTime = fileIdToPendingCompaction.get(fileSlice.getFileId()); - if ((null != compactionWithInstantTime) - && fileSlice.getBaseInstantTime().equals(compactionWithInstantTime.getKey())) { - return true; - } - return false; + return (null != compactionWithInstantTime) + && fileSlice.getBaseInstantTime().equals(compactionWithInstantTime.getKey()); } /** @@ -330,7 +319,7 @@ private FileSlice filterDataFileAfterPendingCompaction(FileSlice fileSlice) { // Data file is filtered out of the file-slice as the corresponding compaction // instant not completed yet. 
FileSlice transformed = new FileSlice(fileSlice.getBaseInstantTime(), fileSlice.getFileId()); - fileSlice.getLogFiles().forEach(lf -> transformed.addLogFile(lf)); + fileSlice.getLogFiles().forEach(transformed::addLogFile); return transformed; } return fileSlice; @@ -358,8 +347,8 @@ private static FileSlice mergeCompactionPendingFileSlices(FileSlice lastSlice, F merged.setDataFile(penultimateSlice.getDataFile().get()); } // Add Log files from penultimate and last slices - penultimateSlice.getLogFiles().forEach(lf -> merged.addLogFile(lf)); - lastSlice.getLogFiles().forEach(lf -> merged.addLogFile(lf)); + penultimateSlice.getLogFiles().forEach(merged::addLogFile); + lastSlice.getLogFiles().forEach(merged::addLogFile); return merged; } @@ -409,7 +398,7 @@ public Stream getLatestFileSliceInRange(List commitsToReturn) @Override public Stream getAllFileSlices(String partitionPath) { return getAllFileGroups(partitionPath) - .map(group -> group.getAllFileSlices()) + .map(HoodieFileGroup::getAllFileSlices) .flatMap(sliceList -> sliceList); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java index 636bec583ec9..a112a6de29f0 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java @@ -80,10 +80,8 @@ public static Set filterParquetRowKeys(Configuration configuration, Path Schema readSchema = HoodieAvroUtils.getRecordKeySchema(); AvroReadSupport.setAvroReadSchema(conf, readSchema); AvroReadSupport.setRequestedProjection(conf, readSchema); - ParquetReader reader = null; Set rowKeys = new HashSet<>(); - try { - reader = AvroParquetReader.builder(filePath).withConf(conf).build(); + try (ParquetReader reader = AvroParquetReader.builder(filePath).withConf(conf).build()) { Object obj = reader.read(); while (obj != null) { if (obj instanceof GenericRecord) { @@ -97,15 +95,8 @@ public static Set filterParquetRowKeys(Configuration configuration, Path } catch (IOException e) { throw new HoodieIOException("Failed to read row keys from Parquet " + filePath, e); - } finally { - if (reader != null) { - try { - reader.close(); - } catch (IOException e) { - // ignore - } - } } + // ignore return rowKeys; } @@ -212,10 +203,7 @@ static class RecordKeysFilterFunction implements Function { @Override public Boolean apply(String recordKey) { - if (candidateKeys.contains(recordKey)) { - return true; - } - return false; + return candidateKeys.contains(recordKey); } } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ReflectionUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ReflectionUtils.java index 54348d5f56c7..9beaa24dc17a 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ReflectionUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ReflectionUtils.java @@ -83,7 +83,7 @@ public static Object loadClass(String clazz, Class[] constructorArgTypes, Obj */ public static Object loadClass(String clazz, Object... 
constructorArgs) { Class[] constructorArgTypes = Arrays.stream(constructorArgs) - .map(arg -> arg.getClass()).toArray(Class[]::new); + .map(Object::getClass).toArray(Class[]::new); return loadClass(clazz, constructorArgTypes, constructorArgs); } diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java index e2bd10b8ddbe..b48f891467c6 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -131,9 +131,9 @@ protected static String arrayWritableToString(ArrayWritable writable) { builder.append(String.format("(Size: %s)[", values.length)); for (Writable w : values) { if (w instanceof ArrayWritable) { - builder.append(arrayWritableToString((ArrayWritable) w) + " "); + builder.append(arrayWritableToString((ArrayWritable) w)).append(" "); } else { - builder.append(w + " "); + builder.append(w).append(" "); } } builder.append("]"); @@ -186,7 +186,7 @@ public static Schema generateProjectionSchema(Schema writeSchema, List f if (field == null) { throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! " + "Derived Schema Fields: " - + schemaFieldsMap.keySet().stream().collect(Collectors.toList())); + + new ArrayList<>(schemaFieldsMap.keySet())); } projectedFields .add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue())); diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala index 0dd03ee15f3e..75d13e7cfbd7 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala @@ -58,7 +58,7 @@ object AvroConversionUtils { def createConverterToAvro(dataType: DataType, structName: String, - recordNamespace: String): (Any) => Any = { + recordNamespace: String): Any => Any = { dataType match { case BinaryType => (item: Any) => item match { diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala index 7e4f8f03efc0..5df7118bd4dc 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala @@ -79,7 +79,7 @@ object DataSourceWriteOptions { val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert" val INSERT_OPERATION_OPT_VAL = "insert" val UPSERT_OPERATION_OPT_VAL = "upsert" - val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL; + val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL /** * The storage type for the underlying data, for this write. 
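Reviewer note — the generateProjectionSchema change above swaps `schemaFieldsMap.keySet().stream().collect(Collectors.toList())` for `new ArrayList<>(schemaFieldsMap.keySet())` when building the error message. The two forms are interchangeable here: both copy the key set into a new list in iteration order, and the copy constructor skips the stream pipeline. A minimal, self-contained sketch of that equivalence (the map contents below are made up for illustration):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class KeySetCopyDemo {
  public static void main(String[] args) {
    Map<String, Integer> schemaFieldsMap = new LinkedHashMap<>();
    schemaFieldsMap.put("_row_key", 0);
    schemaFieldsMap.put("timestamp", 1);
    schemaFieldsMap.put("rider", 2);

    // Old form: stream the key set and collect it into a list.
    List<String> viaStream = schemaFieldsMap.keySet().stream().collect(Collectors.toList());
    // New form: copy the key set directly; same elements, same order, no stream pipeline.
    List<String> viaCopy = new ArrayList<>(schemaFieldsMap.keySet());

    System.out.println(viaStream.equals(viaCopy)); // prints: true
  }
}
```

Both produce an independent copy, so later changes to the map do not affect a message that was already built.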
diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala index 019a76e235e1..1f7b47682abc 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/IncrementalRelation.scala @@ -56,7 +56,7 @@ class IncrementalRelation(val sqlContext: SQLContext, // TODO : Figure out a valid HoodieWriteConfig val hoodieTable = HoodieTable.getHoodieTable(metaClient, HoodieWriteConfig.newBuilder().withPath(basePath).build(), sqlContext.sparkContext) - val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants(); + val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants() if (commitTimeline.empty()) { throw new HoodieException("No instants to incrementally pull") } @@ -90,7 +90,7 @@ class IncrementalRelation(val sqlContext: SQLContext, } // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view // will filter out all the files incorrectly. - sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class"); + sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class") val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path")) sqlContext.read.options(sOpts) .schema(latestSchema) // avoid AnalysisException for empty input diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java index a91e8f396bd5..189c97a7cf9a 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java @@ -99,7 +99,7 @@ public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDi Path outputPath = new Path(outputDir); if (fs.exists(outputPath)) { logger.warn( - String.format("The output path %targetBasePath already exists, deleting", outputPath)); + String.format("The output path %s already exists, deleting", outputPath)); fs.delete(new Path(outputDir), true); } @@ -155,7 +155,7 @@ public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDi } if (fs.exists(targetFilePath)) { logger.error(String.format( - "The target output commit file (%targetBasePath) already exists.", targetFilePath)); + "The target output commit file (%s) already exists.", targetFilePath)); } FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false, fs.getConf()); } @@ -166,7 +166,8 @@ public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDi // Create the _SUCCESS tag Path successTagPath = new Path(outputDir + "/_SUCCESS"); if (!fs.exists(successTagPath)) { - logger.info("Creating _SUCCESS under targetBasePath: " + outputDir); + logger.info(String.format( + "Creating _SUCCESS under targetBasePath: %s", outputDir)); fs.createNewFile(successTagPath); } } @@ -175,7 +176,7 @@ public static void main(String[] args) throws IOException { // Take input configs final Config cfg = new Config(); new JCommander(cfg, args); - logger.info(String.format("Snapshot hoodie table from %targetBasePath to %targetBasePath", + logger.info(String.format("Snapshot hoodie table from %s to %s", cfg.basePath, cfg.outputPath)); // Create a spark job to do the snapshot copy diff --git 
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java index 7cd040dfed98..b5008cf31aab 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java @@ -95,14 +95,8 @@ public static String parseSchema(FileSystem fs, String schemaFile) throws Except } long len = fs.getFileStatus(p).getLen(); ByteBuffer buf = ByteBuffer.allocate((int) len); - FSDataInputStream inputStream = null; - try { - inputStream = fs.open(p); + try (FSDataInputStream inputStream = fs.open(p)) { inputStream.readFully(0, buf.array(), 0, buf.array().length); - } finally { - if (inputStream != null) { - inputStream.close(); - } } return new String(buf.array()); } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java index 3228fdeb5e3e..f6a0c90e3855 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java @@ -28,6 +28,7 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.TimeZone; import org.apache.avro.generic.GenericRecord; @@ -73,7 +74,7 @@ public TimestampBasedKeyGenerator(TypedProperties config) { if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) { DataSourceUtils - .checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); + .checkRequiredProperties(config, Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); this.inputDateFormat = new SimpleDateFormat( config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); this.inputDateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java index d4df829c4360..4629d4b40aaf 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java @@ -23,7 +23,8 @@ import com.uber.hoodie.common.util.TypedProperties; import com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; -import java.util.Arrays; +import java.util.Collections; + import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -52,7 +53,7 @@ public static class Config { public FilebasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) { super(props, jssc); - DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.SOURCE_SCHEMA_FILE_PROP)); + DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP)); this.fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), jssc.hadoopConfiguration()); try { this.sourceSchema = new Schema.Parser().parse( diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaRegistryProvider.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaRegistryProvider.java index 
e77a67bed335..3312db5aae8f 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaRegistryProvider.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaRegistryProvider.java @@ -25,7 +25,8 @@ import com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; import java.net.URL; -import java.util.Arrays; +import java.util.Collections; + import org.apache.avro.Schema; import org.apache.spark.api.java.JavaSparkContext; @@ -55,7 +56,7 @@ private String fetchSchemaFromRegistry(String registryUrl) throws IOException { public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) { super(props, jssc); - DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.SCHEMA_REGISTRY_URL_PROP)); + DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SCHEMA_REGISTRY_URL_PROP)); String registryUrl = props.getString(Config.SCHEMA_REGISTRY_URL_PROP); try { this.schema = new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl)); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroConvertor.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroConvertor.java index e8abfba79180..feb06d5b362a 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroConvertor.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroConvertor.java @@ -87,7 +87,7 @@ public Schema getSchema() { } - public GenericRecord fromAvroBinary(byte[] avroBinary) throws IOException { + public GenericRecord fromAvroBinary(byte[] avroBinary) { initSchema(); initInjection(); return recordInjection.invert(avroBinary).get(); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroKafkaSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroKafkaSource.java index a4d01d1b9c75..4e14714134bf 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroKafkaSource.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroKafkaSource.java @@ -39,9 +39,8 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Sch @Override protected JavaRDD toAvroRDD(OffsetRange[] offsetRanges, AvroConvertor avroConvertor) { - JavaRDD recordRDD = KafkaUtils + return KafkaUtils .createRDD(sparkContext, String.class, Object.class, StringDecoder.class, KafkaAvroDecoder.class, kafkaParams, offsetRanges).values().map(obj -> (GenericRecord) obj); - return recordRDD; } } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java index 2d4d07851b2d..6d962b276d57 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java @@ -26,10 +26,7 @@ import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.utilities.schema.SchemaProvider; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.stream.Collectors; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileStatus; @@ -58,7 +55,7 @@ static class Config { public DFSSource(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) { super(props, sparkContext, schemaProvider); - 
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.ROOT_INPUT_PATH_PROP)); + DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.ROOT_INPUT_PATH_PROP)); this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), sparkContext.hadoopConfiguration()); } @@ -75,16 +72,14 @@ public Pair>, String> fetchNewData( new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), true); while (fitr.hasNext()) { LocatedFileStatus fileStatus = fitr.next(); - if (fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream().filter( - pfx -> fileStatus.getPath().getName().startsWith(pfx)).count() > 0) { + if (fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream() + .anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) { continue; } eligibleFiles.add(fileStatus); } // sort them by modification time. - eligibleFiles.sort((FileStatus f1, FileStatus f2) -> Long.valueOf(f1.getModificationTime()) - .compareTo(Long.valueOf( - f2.getModificationTime()))); + eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime)); // Filter based on checkpoint & input size, if needed long currentBytes = 0; @@ -110,8 +105,7 @@ public Pair>, String> fetchNewData( // no data to read if (filteredFiles.size() == 0) { return new ImmutablePair<>(Optional.empty(), - lastCheckpointStr.isPresent() ? lastCheckpointStr.get() - : String.valueOf(Long.MIN_VALUE)); + lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE))); } // read the files out. diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java index faf6d0966785..3d6af40fab58 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java @@ -76,7 +76,7 @@ static class Config { public HiveIncrPullSource(TypedProperties props, JavaSparkContext sparkContext, SchemaProvider schemaProvider) { super(props, sparkContext, schemaProvider); - DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.ROOT_INPUT_PATH_PROP)); + DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.ROOT_INPUT_PATH_PROP)); this.incrPullRootPath = props.getString(Config.ROOT_INPUT_PATH_PROP); this.fs = FSUtils.getFs(incrPullRootPath, sparkContext.hadoopConfiguration()); } @@ -121,7 +121,7 @@ public Pair>, String> fetchNewData( if (!commitToPull.isPresent()) { return new ImmutablePair<>(Optional.empty(), - lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : ""); + lastCheckpointStr.orElse("")); } // read the files out. 
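Reviewer note — the checkpoint handling above in DFSSource and HiveIncrPullSource replaces the `lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : <fallback>` pattern with `orElseGet` and `orElse`. The practical difference is when the fallback is computed: `orElse` always evaluates its argument, while `orElseGet` invokes its supplier only for an empty Optional. A small sketch of that distinction (the `fallback()` helper is hypothetical, added only to make the evaluation order visible):

```java
import java.util.Optional;

public class CheckpointFallbackDemo {
  private static String fallback() {
    System.out.println("computing fallback");
    return String.valueOf(Long.MIN_VALUE);
  }

  public static void main(String[] args) {
    Optional<String> lastCheckpointStr = Optional.of("00001");

    // orElse: the fallback expression runs even though a checkpoint is present.
    String eager = lastCheckpointStr.orElse(fallback()); // prints "computing fallback"

    // orElseGet: the supplier is skipped because the Optional has a value.
    String lazy = lastCheckpointStr.orElseGet(CheckpointFallbackDemo::fallback); // prints nothing

    System.out.println(eager + " " + lazy); // prints: 00001 00001
  }
}
```

For the constant `""` fallback the eager `orElse` is harmless; DFSSource keeps the supplier form because its fallback is a method call rather than a constant.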
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/JsonDFSSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/JsonDFSSource.java index 3167dbe592ce..6b1018e152b9 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/JsonDFSSource.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/JsonDFSSource.java @@ -35,6 +35,6 @@ public JsonDFSSource(TypedProperties props, JavaSparkContext sparkContext, Schem @Override protected JavaRDD fromFiles(AvroConvertor convertor, String pathStr) { - return sparkContext.textFile(pathStr).map((String j) -> convertor.fromJson(j)); + return sparkContext.textFile(pathStr).map(convertor::fromJson); } } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/JsonKafkaSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/JsonKafkaSource.java index 6dd111654b21..b271e370494a 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/JsonKafkaSource.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/JsonKafkaSource.java @@ -40,6 +40,6 @@ public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext protected JavaRDD toAvroRDD(OffsetRange[] offsetRanges, AvroConvertor avroConvertor) { return KafkaUtils.createRDD(sparkContext, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, offsetRanges) - .values().map(jsonStr -> avroConvertor.fromJson(jsonStr)); + .values().map(avroConvertor::fromJson); } } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/KafkaSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/KafkaSource.java index 5e0328016fa7..4699fcaf32cc 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/KafkaSource.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/KafkaSource.java @@ -25,11 +25,8 @@ import com.uber.hoodie.exception.HoodieNotSupportedException; import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException; import com.uber.hoodie.utilities.schema.SchemaProvider; -import java.util.Arrays; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Optional; + +import java.util.*; import java.util.stream.Collectors; import kafka.common.TopicAndPartition; import org.apache.avro.generic.GenericRecord; @@ -85,7 +82,7 @@ public static HashMap strToOffsets */ public static String offsetsToStr(OffsetRange[] ranges) { StringBuilder sb = new StringBuilder(); - // atleast 1 partition will be present. + // at least 1 partition will be present. sb.append(ranges[0].topic() + ","); sb.append(Arrays.stream(ranges) .map(r -> String.format("%s:%d", r.partition(), r.untilOffset())) @@ -106,8 +103,7 @@ public static OffsetRange[] computeOffsetRanges( HashMap toOffsetMap, long numEvents) { - Comparator byPartition = (OffsetRange o1, OffsetRange o2) -> - Integer.valueOf(o1.partition()).compareTo(Integer.valueOf(o2.partition())); + Comparator byPartition = Comparator.comparing(OffsetRange::partition); // Create initial offset ranges for each 'to' partition, with from = to offsets. 
OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()]; @@ -144,7 +140,7 @@ public static OffsetRange[] computeOffsetRanges( } public static long totalNewMessages(OffsetRange[] ranges) { - return Arrays.asList(ranges).stream().mapToLong(r -> r.count()).sum(); + return Arrays.stream(ranges).mapToLong(OffsetRange::count).sum(); } } @@ -166,13 +162,20 @@ public static java.util.Map toJavaMap(Map m) { } } + /** + * Kafka reset offset strategies + */ + enum KafkaResetOffsetStrategies { + LARGEST, + SMALLEST + } /** * Configs to be passed for this source. All standard Kafka consumer configs are also respected */ static class Config { private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic"; - private static final String DEFAULT_AUTO_RESET_OFFSET = "largest"; + private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LARGEST; } @@ -187,7 +190,7 @@ public KafkaSource(TypedProperties props, JavaSparkContext sparkContext, SchemaP for (Object prop : props.keySet()) { kafkaParams.put(prop.toString(), props.getString(prop.toString())); } - DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.KAFKA_TOPIC_NAME)); + DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME)); topicName = props.getString(Config.KAFKA_TOPIC_NAME); } @@ -200,7 +203,7 @@ public Pair>, String> fetchNewData( // Obtain current metadata for the topic KafkaCluster cluster = new KafkaCluster(ScalaHelpers.toScalaMap(kafkaParams)); Either, Set> either = cluster.getPartitions( - ScalaHelpers.toScalaSet(new HashSet<>(Arrays.asList(topicName)))); + ScalaHelpers.toScalaSet(new HashSet<>(Collections.singletonList(topicName)))); if (either.isLeft()) { // log errors. and bail out. throw new HoodieDeltaStreamerException("Error obtaining partition metadata", @@ -213,17 +216,20 @@ public Pair>, String> fetchNewData( if (lastCheckpointStr.isPresent()) { fromOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); } else { - String autoResetValue = props - .getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET); - if (autoResetValue.equals("smallest")) { - fromOffsets = new HashMap(ScalaHelpers.toJavaMap( - cluster.getEarliestLeaderOffsets(topicPartitions).right().get())); - } else if (autoResetValue.equals("largest")) { - fromOffsets = new HashMap( - ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get())); - } else { - throw new HoodieNotSupportedException( - "Auto reset value must be one of 'smallest' or 'largest' "); + KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies.valueOf( + props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase()); + switch (autoResetValue) { + case SMALLEST: + fromOffsets = new HashMap(ScalaHelpers.toJavaMap( + cluster.getEarliestLeaderOffsets(topicPartitions).right().get())); + break; + case LARGEST: + fromOffsets = new HashMap( + ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get())); + break; + default: + throw new HoodieNotSupportedException( + "Auto reset value must be one of 'smallest' or 'largest' "); } } @@ -236,7 +242,7 @@ public Pair>, String> fetchNewData( OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents); long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges); if (totalNewMsgs <= 0) { - return new ImmutablePair<>(Optional.empty(), lastCheckpointStr.isPresent() ? 
lastCheckpointStr.get() : ""); + return new ImmutablePair<>(Optional.empty(), lastCheckpointStr.orElse("")); } else { log.info("About to read " + totalNewMsgs + " from Kafka for topic :" + topicName); } diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java index f4c589a00269..57369de33cbf 100644 --- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java @@ -71,7 +71,7 @@ private GenericRecord toGenericRecord(HoodieRecord hoodieRecord) { public Pair>, String> fetchNewData(Optional lastCheckpointStr, long sourceLimit) { - int nextCommitNum = lastCheckpointStr.isPresent() ? Integer.parseInt(lastCheckpointStr.get()) + 1 : 0; + int nextCommitNum = lastCheckpointStr.map(s -> Integer.parseInt(s) + 1).orElse(0); String commitTime = String.format("%05d", nextCommitNum); // No new data. if (sourceLimit <= 0) { diff --git a/pom.xml b/pom.xml index f4255a0782ae..88196084c9a4 100644 --- a/pom.xml +++ b/pom.xml @@ -197,6 +197,29 @@ + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <version>1.0.0</version> + <configuration> + <verbose>false</verbose> + <failOnViolation>true</failOnViolation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <failOnWarning>false</failOnWarning> + <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory> + <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory> + <configLocation>style/scalastyle-config.xml</configLocation> + <outputEncoding>UTF-8</outputEncoding> + </configuration> + <executions> + <execution> + <phase>compile</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> @@ -634,11 +657,6 @@ <version>1.9.13</version> </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>${junit.version}</version> - </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> diff --git a/style/scalastyle-config.xml b/style/scalastyle-config.xml new file mode 100644 index 000000000000..e7e41a123ae9 --- /dev/null +++ b/style/scalastyle-config.xml @@ -0,0 +1,98 @@ + <scalastyle> + <name>Scalastyle standard configuration</name>
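Reviewer note — the KafkaSource change above replaces string comparison of `auto.offset.reset` with the new `KafkaResetOffsetStrategies` enum and a `switch`. One caveat: `Enum.valueOf` throws `IllegalArgumentException` for any unrecognized value before the `switch` runs, so the `default` branch carrying the "must be one of 'smallest' or 'largest'" message is effectively unreachable for bad configuration. A standalone sketch of the parse-then-switch pattern and one way to keep the friendly error (class and method names here are illustrative, not the Hudi ones):

```java
public class OffsetResetDemo {
  enum ResetStrategy { LARGEST, SMALLEST }

  static ResetStrategy parse(String configured) {
    try {
      // Case-insensitive parse, mirroring valueOf(props.getString(...).toUpperCase()) in the diff.
      return ResetStrategy.valueOf(configured.toUpperCase());
    } catch (IllegalArgumentException e) {
      // Without this catch, a bad value surfaces as a bare IllegalArgumentException
      // instead of the domain-specific message the switch's default branch intended.
      throw new IllegalArgumentException(
          "auto.offset.reset must be one of 'smallest' or 'largest', got: " + configured, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(parse("smallest")); // SMALLEST
    System.out.println(parse("Largest"));  // LARGEST
    System.out.println(parse("earliest")); // throws IllegalArgumentException with the friendly message
  }
}
```

This is only a suggestion for a possible follow-up; the diff as written still behaves correctly for the two supported values.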