Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-2788] Fixing issues w/ Z-order Layout Optimization #4026

Merged
merged 12 commits into from
Nov 24, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.spark.sql.Row;

/**
* A partitioner that does spartial curve optimization sorting based on specified column values for each RDD partition.
* A partitioner that does spatial curve optimization sorting based on specified column values for each RDD partition.
* support z-curve optimization, hilbert will come soon.
* @param <T> HoodieRecordPayload type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,10 @@ public static Dataset<Row> getMinMaxValue(Dataset<Row> df, List<String> cols) {

SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
int numParallelism = (scanFiles.size() / 3 + 1);
List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos = new ArrayList<>();
List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos;
String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION);
try {
String description = "Listing parquet column statistics";
jsc.setJobDescription(description);
jsc.setJobDescription("Listing parquet column statistics");
colMinMaxInfos = jsc.parallelize(scanFiles, numParallelism).mapPartitions(paths -> {
Configuration conf = serializableConfiguration.value();
ParquetUtils parquetUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
Expand All @@ -209,7 +208,7 @@ public static Dataset<Row> getMinMaxValue(Dataset<Row> df, List<String> cols) {
}

Map<String, List<HoodieColumnRangeMetadata<Comparable>>> fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e -> e.getFilePath()));
JavaRDD<Row> allMetaDataRDD = jsc.parallelize(fileToStatsListMap.values().stream().collect(Collectors.toList()), 1).map(f -> {
JavaRDD<Row> allMetaDataRDD = jsc.parallelize(new ArrayList<>(fileToStatsListMap.values()), 1).map(f -> {
int colSize = f.size();
if (colSize == 0) {
return null;
Expand Down Expand Up @@ -299,50 +298,54 @@ public static void saveStatisticsInfo(Dataset<Row> df, String cols, String index
Dataset<Row> statisticsDF = ZCurveOptimizeHelper.getMinMaxValue(df, cols);
// try to find last validate index table from index path
try {
if (fs.exists(new Path(indexPath))) {
List<String> allIndexTables = Arrays
.stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList());
List<String> candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList());
List<String> residualTables = allIndexTables.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList());
Option<Dataset> latestIndexData = Option.empty();
if (!candidateIndexTables.isEmpty()) {
latestIndexData = Option.of(spark.read().load(new Path(indexPath, candidateIndexTables.get(candidateIndexTables.size() - 1)).toString()));
// clean old index table, keep at most 1 index table.
candidateIndexTables.remove(candidateIndexTables.size() - 1);
candidateIndexTables.forEach(f -> {
try {
fs.delete(new Path(indexPath, f));
} catch (IOException ie) {
throw new HoodieException(ie);
}
});
}
// If there's currently no index, create one
if (!fs.exists(new Path(indexPath))) {
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
return;
}

// clean residualTables
// retried cluster operations at the same instant time is also considered,
// the residual files produced by retried are cleaned up before save statistics
// save statistics info to index table which named commitTime
residualTables.forEach(f -> {
// Otherwise, clean up all indexes but the most recent one

List<String> allIndexTables = Arrays
.stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList());
List<String> candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList());
List<String> residualTables = allIndexTables.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList());
Option<Dataset> latestIndexData = Option.empty();
if (!candidateIndexTables.isEmpty()) {
latestIndexData = Option.of(spark.read().load(new Path(indexPath, candidateIndexTables.get(candidateIndexTables.size() - 1)).toString()));
// clean old index table, keep at most 1 index table.
candidateIndexTables.remove(candidateIndexTables.size() - 1);
candidateIndexTables.forEach(f -> {
try {
fs.delete(new Path(indexPath, f));
} catch (IOException ie) {
throw new HoodieException(ie);
}
});
}

if (latestIndexData.isPresent() && latestIndexData.get().schema().equals(statisticsDF.schema())) {
// update the statistics info
String originalTable = "indexTable_" + java.util.UUID.randomUUID().toString().replace("-", "");
String updateTable = "updateTable_" + java.util.UUID.randomUUID().toString().replace("-", "");
latestIndexData.get().registerTempTable(originalTable);
statisticsDF.registerTempTable(updateTable);
// update table by full out join
List columns = Arrays.asList(statisticsDF.schema().fieldNames());
spark.sql(HoodieSparkUtils$
.MODULE$.createMergeSql(originalTable, updateTable, JavaConversions.asScalaBuffer(columns))).repartition(1).write().save(savePath.toString());
} else {
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
// clean residualTables
// retried cluster operations at the same instant time is also considered,
// the residual files produced by retried are cleaned up before save statistics
// save statistics info to index table which named commitTime
residualTables.forEach(f -> {
try {
fs.delete(new Path(indexPath, f));
} catch (IOException ie) {
throw new HoodieException(ie);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add error message to the exception to get some specific context?
understand that this code existed prior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will actually tackle this on as part of #4060

}
});

if (latestIndexData.isPresent() && latestIndexData.get().schema().equals(statisticsDF.schema())) {
// update the statistics info
String originalTable = "indexTable_" + java.util.UUID.randomUUID().toString().replace("-", "");
String updateTable = "updateTable_" + java.util.UUID.randomUUID().toString().replace("-", "");
latestIndexData.get().registerTempTable(originalTable);
statisticsDF.registerTempTable(updateTable);
// update table by full out join
List columns = Arrays.asList(statisticsDF.schema().fieldNames());
spark.sql(HoodieSparkUtils$
.MODULE$.createMergeSql(originalTable, updateTable, JavaConversions.asScalaBuffer(columns))).repartition(1).write().save(savePath.toString());
} else {
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,8 @@ object HoodieSparkUtils extends SparkAdapterSupport {
*/
def createMergeSql(leftTable: String, rightTable: String, cols: Seq[String]): String = {
var selectsql = ""
for (i <- (0 to cols.size-1)) {
selectsql = selectsql + s" if (${leftTable}.${cols(0)} is null, ${rightTable}.${cols(i)}, ${leftTable}.${cols(i)}) as ${cols(i)} ,"
for (i <- cols.indices) {
selectsql = selectsql + s" if (${leftTable}.${cols(i)} is null, ${rightTable}.${cols(i)}, ${leftTable}.${cols(i)}) as ${cols(i)} ,"
}
"select " + selectsql.dropRight(1) + s" from ${leftTable} full join ${rightTable} on ${leftTable}.${cols(0)} = ${rightTable}.${cols(0)}"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,41 +160,92 @@ case class HoodieFileIndex(
spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
}

private def filterFilesByDataSkippingIndex(dataFilters: Seq[Expression]): Set[String] = {
var allFiles: Set[String] = Set.empty
var candidateFiles: Set[String] = Set.empty
/**
* Computes pruned list of candidate base-files' names based on provided list of {@link dataFilters}
* conditions, by leveraging custom Z-order index (Z-index) bearing "min", "max", "num_nulls" statistic
* for all clustered columns
*
* NOTE: This method has to return complete set of candidate files, since only provided candidates will
* ultimately be scanned as part of query execution. Hence, this method has to maintain the
* invariant of conservatively including every base-file's name, that is NOT referenced in its index.
*
* @param dataFilters list of original data filters passed down from querying engine
* @return list of pruned (data-skipped) candidate base-files' names
*/
private def lookupCandidateFilesNamesInZIndex(dataFilters: Seq[Expression]): Option[Set[String]] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting to drop Z-index in its name? Here we actually read from Z-index (folder name of which is also .zindex) so we'd neutralize those things across the board

val indexPath = metaClient.getZindexPath
val fs = metaClient.getFs
if (fs.exists(new Path(indexPath)) && dataFilters.nonEmpty) {
// try to load latest index table from index path
val candidateIndexTables = fs.listStatus(new Path(indexPath)).filter(_.isDirectory)
.map(_.getPath.getName).filter(f => completedCommits.contains(f)).sortBy(x => x)
if (candidateIndexTables.nonEmpty) {
val dataFrameOpt = try {
Some(spark.read.load(new Path(indexPath, candidateIndexTables.last).toString))
} catch {
case _: Throwable =>
logError("missing index skip data-skipping")
None
}

if (dataFrameOpt.isDefined) {
val indexSchema = dataFrameOpt.get.schema
val indexFiles = DataSkippingUtils.getIndexFiles(spark.sparkContext.hadoopConfiguration, new Path(indexPath, candidateIndexTables.last).toString)
val indexFilter = dataFilters.map(DataSkippingUtils.createZindexFilter(_, indexSchema)).reduce(And)
logInfo(s"index filter condition: $indexFilter")
dataFrameOpt.get.persist()
if (indexFiles.size <= 4) {
allFiles = DataSkippingUtils.readParquetFile(spark, indexFiles)
} else {
allFiles = dataFrameOpt.get.select("file").collect().map(_.getString(0)).toSet
}
candidateFiles = dataFrameOpt.get.filter(new Column(indexFilter)).select("file").collect().map(_.getString(0)).toSet
dataFrameOpt.get.unpersist()
}
}
if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || dataFilters.isEmpty) {
// scalastyle:off return
return Option.empty
// scalastyle:on return
}

// Collect all index tables present in `.zindex` folder
val candidateIndexTables =
fs.listStatus(new Path(indexPath))
.filter(_.isDirectory)
.map(_.getPath.getName)
.filter(f => completedCommits.contains(f))
.sortBy(x => x)

if (candidateIndexTables.isEmpty) {
// scalastyle:off return
return Option.empty
// scalastyle:on return
}

val dataFrameOpt = try {
Some(spark.read.load(new Path(indexPath, candidateIndexTables.last).toString))
} catch {
case t: Throwable =>
logError("Failed to read Z-index; skipping", t)
None
}
allFiles -- candidateFiles

dataFrameOpt.map(df => {
val indexSchema = df.schema
val indexFilter =
dataFilters.map(DataSkippingUtils.createZIndexLookupFilter(_, indexSchema))
.reduce(And)

logInfo(s"Index filter condition: $indexFilter")

df.persist()

val allIndexedFileNames =
df.select("file")
.collect()
.map(_.getString(0))
.toSet

val prunedCandidateFileNames =
df.filter(new Column(indexFilter))
.select("file")
.collect()
.map(_.getString(0))
.toSet

df.unpersist()

// NOTE: Z-index isn't guaranteed to have complete set of statistics for every
// base-file: since it's bound to clustering, which could occur asynchronously
// at arbitrary point in time, and is not likely to touching all of the base files.
//
// To close that gap, we manually compute the difference b/w all indexed (Z-index)
// files and all outstanding base-files, and make sure that all base files not
// represented w/in Z-index are included in the output of this method
val notIndexedFileNames =
lookupFileNamesMissingFromIndex(allIndexedFileNames)

prunedCandidateFileNames ++ notIndexedFileNames
})
}

private def lookupFileNamesMissingFromIndex(allIndexedFileNames: Set[String]) = {
val allBaseFileNames = allFiles.map(f => f.getPath.getName).toSet
allBaseFileNames -- allIndexedFileNames
}

/**
Expand All @@ -206,18 +257,22 @@ case class HoodieFileIndex(
*/
override def listFiles(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
// try to load filterFiles from index
val filterFiles: Set[String] = if (enableDataSkipping()) {
filterFilesByDataSkippingIndex(dataFilters)
} else {
Set.empty
}
// Look up candidate files names in the Z-index, if all of the following conditions are true
// - Data-skipping is enabled
// - Z-index is present
// - List of predicates (filters) is present
val candidateFilesNamesOpt: Option[Set[String]] = lookupCandidateFilesNamesInZIndex(dataFilters)

logDebug(s"Overlapping candidate files (from Z-index): ${candidateFilesNamesOpt.getOrElse(Set.empty)}")

if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table.
val candidateFiles = if (!filterFiles.isEmpty) {
allFiles.filterNot(fileStatus => filterFiles.contains(fileStatus.getPath.getName))
} else {
allFiles
}
// Filter in candidate files based on the Z-index lookup
val candidateFiles =
allFiles.filter(fileStatus =>
// NOTE: This predicate is true when {@code Option} is empty
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So by default - data skipping being off, this is how none of the files in allFiles are filtered out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct

candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName))
)

logInfo(s"Total files : ${allFiles.size}," +
s" candidate files after data skipping: ${candidateFiles.size} " +
s" skipping percent ${if (allFiles.length != 0) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}")
Expand All @@ -236,11 +291,13 @@ case class HoodieFileIndex(
null
}
}).filterNot(_ == null)
val candidateFiles = if (!filterFiles.isEmpty) {
baseFileStatuses.filterNot(fileStatus => filterFiles.contains(fileStatus.getPath.getName))
} else {
baseFileStatuses
}

// Filter in candidate files based on the Z-index lookup
val candidateFiles =
baseFileStatuses.filter(fileStatus =>
// NOTE: This predicate is true when {@code Option} is empty
candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName)))

totalFileSize += baseFileStatuses.size
candidateFileSize += candidateFiles.size
PartitionDirectory(partition.values, candidateFiles)
Expand Down
Loading