From 60b23b979739601f413cdc30da1eae0c18fcb5e6 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 24 Nov 2021 10:10:28 -0800 Subject: [PATCH] [HUDI-2788] Fixing issues w/ Z-order Layout Optimization (#4026)

* Simplifying, tidying up * Fixed packaging for `TestOptimizeTable` * Cleaned up `HoodieFileIndex` file filtering sequence; Removed optimization that manually read the Parquet table, circumventing Spark * Refactored `DataSkippingUtils`: - Fixed checks to validate all statistics cols are present - Fixed some predicates being constructed incorrectly - Rewrote comments for easier comprehension, added more notes - Tidying up * Tidying up tests * `lint` * Fixing compilation * `TestOptimizeTable` > `TestTableLayoutOptimization`; Added assertions to test data skipping paths * Fixed tests to properly hit data-skipping path * Fixed pruned file candidates lookup sequence to conservatively include all non-indexed files * Added javadoc * Fixed compilation --- ...atialCurveOptimizationSortPartitioner.java | 2 +- .../apache/spark/ZCurveOptimizeHelper.java | 81 ++++----- .../org/apache/hudi/HoodieSparkUtils.scala | 4 +- .../org/apache/hudi/HoodieFileIndex.scala | 151 +++++++++++----- .../spark/sql/hudi/DataSkippingUtils.scala | 163 +++++++++++------- .../TestTableLayoutOptimization.scala} | 71 +++++--- 6 files changed, 290 insertions(+), 182 deletions(-) rename hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/{TestOptimizeTable.scala => functional/TestTableLayoutOptimization.scala} (83%) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java index fa12159eeac6..b809f424342b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java @@ -39,7 +39,7 @@ import org.apache.spark.sql.Row; /** - * A partitioner that does spartial curve optimization sorting based on specified column values for each RDD partition. + * A partitioner that does spatial curve optimization sorting based on specified column values for each RDD partition. * support z-curve optimization, hilbert will come soon. 
* @param HoodieRecordPayload type */ diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/ZCurveOptimizeHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/ZCurveOptimizeHelper.java index d2bd257f1ba4..72274c248105 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/ZCurveOptimizeHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/ZCurveOptimizeHelper.java @@ -189,11 +189,10 @@ public static Dataset getMinMaxValue(Dataset df, List cols) { SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration()); int numParallelism = (scanFiles.size() / 3 + 1); - List> colMinMaxInfos = new ArrayList<>(); + List> colMinMaxInfos; String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION); try { - String description = "Listing parquet column statistics"; - jsc.setJobDescription(description); + jsc.setJobDescription("Listing parquet column statistics"); colMinMaxInfos = jsc.parallelize(scanFiles, numParallelism).mapPartitions(paths -> { Configuration conf = serializableConfiguration.value(); ParquetUtils parquetUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); @@ -209,7 +208,7 @@ public static Dataset getMinMaxValue(Dataset df, List cols) { } Map>> fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e -> e.getFilePath())); - JavaRDD allMetaDataRDD = jsc.parallelize(fileToStatsListMap.values().stream().collect(Collectors.toList()), 1).map(f -> { + JavaRDD allMetaDataRDD = jsc.parallelize(new ArrayList<>(fileToStatsListMap.values()), 1).map(f -> { int colSize = f.size(); if (colSize == 0) { return null; @@ -299,50 +298,54 @@ public static void saveStatisticsInfo(Dataset df, String cols, String index Dataset statisticsDF = ZCurveOptimizeHelper.getMinMaxValue(df, cols); // try to find last validate index table from index path try { - if (fs.exists(new Path(indexPath))) { - List allIndexTables = Arrays - .stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList()); - List candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList()); - List residualTables = allIndexTables.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList()); - Option latestIndexData = Option.empty(); - if (!candidateIndexTables.isEmpty()) { - latestIndexData = Option.of(spark.read().load(new Path(indexPath, candidateIndexTables.get(candidateIndexTables.size() - 1)).toString())); - // clean old index table, keep at most 1 index table. 
- candidateIndexTables.remove(candidateIndexTables.size() - 1); - candidateIndexTables.forEach(f -> { - try { - fs.delete(new Path(indexPath, f)); - } catch (IOException ie) { - throw new HoodieException(ie); - } - }); - } + // If there's currently no index, create one + if (!fs.exists(new Path(indexPath))) { + statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString()); + return; + } - // clean residualTables - // retried cluster operations at the same instant time is also considered, - // the residual files produced by retried are cleaned up before save statistics - // save statistics info to index table which named commitTime - residualTables.forEach(f -> { + // Otherwise, clean up all indexes but the most recent one + + List allIndexTables = Arrays + .stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList()); + List candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList()); + List residualTables = allIndexTables.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList()); + Option latestIndexData = Option.empty(); + if (!candidateIndexTables.isEmpty()) { + latestIndexData = Option.of(spark.read().load(new Path(indexPath, candidateIndexTables.get(candidateIndexTables.size() - 1)).toString())); + // clean old index table, keep at most 1 index table. + candidateIndexTables.remove(candidateIndexTables.size() - 1); + candidateIndexTables.forEach(f -> { try { fs.delete(new Path(indexPath, f)); } catch (IOException ie) { throw new HoodieException(ie); } }); + } - if (latestIndexData.isPresent() && latestIndexData.get().schema().equals(statisticsDF.schema())) { - // update the statistics info - String originalTable = "indexTable_" + java.util.UUID.randomUUID().toString().replace("-", ""); - String updateTable = "updateTable_" + java.util.UUID.randomUUID().toString().replace("-", ""); - latestIndexData.get().registerTempTable(originalTable); - statisticsDF.registerTempTable(updateTable); - // update table by full out join - List columns = Arrays.asList(statisticsDF.schema().fieldNames()); - spark.sql(HoodieSparkUtils$ - .MODULE$.createMergeSql(originalTable, updateTable, JavaConversions.asScalaBuffer(columns))).repartition(1).write().save(savePath.toString()); - } else { - statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString()); + // clean residualTables + // retried cluster operations at the same instant time is also considered, + // the residual files produced by retried are cleaned up before save statistics + // save statistics info to index table which named commitTime + residualTables.forEach(f -> { + try { + fs.delete(new Path(indexPath, f)); + } catch (IOException ie) { + throw new HoodieException(ie); } + }); + + if (latestIndexData.isPresent() && latestIndexData.get().schema().equals(statisticsDF.schema())) { + // update the statistics info + String originalTable = "indexTable_" + java.util.UUID.randomUUID().toString().replace("-", ""); + String updateTable = "updateTable_" + java.util.UUID.randomUUID().toString().replace("-", ""); + latestIndexData.get().registerTempTable(originalTable); + statisticsDF.registerTempTable(updateTable); + // update table by full out join + List columns = Arrays.asList(statisticsDF.schema().fieldNames()); + spark.sql(HoodieSparkUtils$ + .MODULE$.createMergeSql(originalTable, updateTable, 
JavaConversions.asScalaBuffer(columns))).repartition(1).write().save(savePath.toString()); } else { statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString()); } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index f0217f800471..360a080899ad 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -298,8 +298,8 @@ object HoodieSparkUtils extends SparkAdapterSupport { */ def createMergeSql(leftTable: String, rightTable: String, cols: Seq[String]): String = { var selectsql = "" - for (i <- (0 to cols.size-1)) { - selectsql = selectsql + s" if (${leftTable}.${cols(0)} is null, ${rightTable}.${cols(i)}, ${leftTable}.${cols(i)}) as ${cols(i)} ," + for (i <- cols.indices) { + selectsql = selectsql + s" if (${leftTable}.${cols(i)} is null, ${rightTable}.${cols(i)}, ${leftTable}.${cols(i)}) as ${cols(i)} ," } "select " + selectsql.dropRight(1) + s" from ${leftTable} full join ${rightTable} on ${leftTable}.${cols(0)} = ${rightTable}.${cols(0)}" } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index bc6867453686..bc4557f9cf95 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -160,41 +160,92 @@ case class HoodieFileIndex( spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean } - private def filterFilesByDataSkippingIndex(dataFilters: Seq[Expression]): Set[String] = { - var allFiles: Set[String] = Set.empty - var candidateFiles: Set[String] = Set.empty + /** + * Computes pruned list of candidate base-files' names based on provided list of {@link dataFilters} + * conditions, by leveraging custom Z-order index (Z-index) bearing "min", "max", "num_nulls" statistic + * for all clustered columns + * + * NOTE: This method has to return complete set of candidate files, since only provided candidates will + * ultimately be scanned as part of query execution. Hence, this method has to maintain the + * invariant of conservatively including every base-file's name, that is NOT referenced in its index. 
+ * + * @param dataFilters list of original data filters passed down from querying engine + * @return list of pruned (data-skipped) candidate base-files' names + */ + private def lookupCandidateFilesNamesInZIndex(dataFilters: Seq[Expression]): Option[Set[String]] = { val indexPath = metaClient.getZindexPath val fs = metaClient.getFs - if (fs.exists(new Path(indexPath)) && dataFilters.nonEmpty) { - // try to load latest index table from index path - val candidateIndexTables = fs.listStatus(new Path(indexPath)).filter(_.isDirectory) - .map(_.getPath.getName).filter(f => completedCommits.contains(f)).sortBy(x => x) - if (candidateIndexTables.nonEmpty) { - val dataFrameOpt = try { - Some(spark.read.load(new Path(indexPath, candidateIndexTables.last).toString)) - } catch { - case _: Throwable => - logError("missing index skip data-skipping") - None - } - if (dataFrameOpt.isDefined) { - val indexSchema = dataFrameOpt.get.schema - val indexFiles = DataSkippingUtils.getIndexFiles(spark.sparkContext.hadoopConfiguration, new Path(indexPath, candidateIndexTables.last).toString) - val indexFilter = dataFilters.map(DataSkippingUtils.createZindexFilter(_, indexSchema)).reduce(And) - logInfo(s"index filter condition: $indexFilter") - dataFrameOpt.get.persist() - if (indexFiles.size <= 4) { - allFiles = DataSkippingUtils.readParquetFile(spark, indexFiles) - } else { - allFiles = dataFrameOpt.get.select("file").collect().map(_.getString(0)).toSet - } - candidateFiles = dataFrameOpt.get.filter(new Column(indexFilter)).select("file").collect().map(_.getString(0)).toSet - dataFrameOpt.get.unpersist() - } - } + if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || dataFilters.isEmpty) { + // scalastyle:off return + return Option.empty + // scalastyle:on return + } + + // Collect all index tables present in `.zindex` folder + val candidateIndexTables = + fs.listStatus(new Path(indexPath)) + .filter(_.isDirectory) + .map(_.getPath.getName) + .filter(f => completedCommits.contains(f)) + .sortBy(x => x) + + if (candidateIndexTables.isEmpty) { + // scalastyle:off return + return Option.empty + // scalastyle:on return + } + + val dataFrameOpt = try { + Some(spark.read.load(new Path(indexPath, candidateIndexTables.last).toString)) + } catch { + case t: Throwable => + logError("Failed to read Z-index; skipping", t) + None } - allFiles -- candidateFiles + + dataFrameOpt.map(df => { + val indexSchema = df.schema + val indexFilter = + dataFilters.map(DataSkippingUtils.createZIndexLookupFilter(_, indexSchema)) + .reduce(And) + + logInfo(s"Index filter condition: $indexFilter") + + df.persist() + + val allIndexedFileNames = + df.select("file") + .collect() + .map(_.getString(0)) + .toSet + + val prunedCandidateFileNames = + df.filter(new Column(indexFilter)) + .select("file") + .collect() + .map(_.getString(0)) + .toSet + + df.unpersist() + + // NOTE: Z-index isn't guaranteed to have a complete set of statistics for every + // base-file: since it's bound to clustering, which could occur asynchronously + // at an arbitrary point in time, and is not likely to touch all of the base files. 
+ // + // To close that gap, we manually compute the difference b/w all indexed (Z-index) + // files and all outstanding base-files, and make sure that all base files not + // represented w/in Z-index are included in the output of this method + val notIndexedFileNames = + lookupFileNamesMissingFromIndex(allIndexedFileNames) + + prunedCandidateFileNames ++ notIndexedFileNames + }) + } + + private def lookupFileNamesMissingFromIndex(allIndexedFileNames: Set[String]) = { + val allBaseFileNames = allFiles.map(f => f.getPath.getName).toSet + allBaseFileNames -- allIndexedFileNames } /** @@ -206,18 +257,22 @@ case class HoodieFileIndex( */ override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { - // try to load filterFiles from index - val filterFiles: Set[String] = if (enableDataSkipping()) { - filterFilesByDataSkippingIndex(dataFilters) - } else { - Set.empty - } + // Look up candidate files names in the Z-index, if all of the following conditions are true + // - Data-skipping is enabled + // - Z-index is present + // - List of predicates (filters) is present + val candidateFilesNamesOpt: Option[Set[String]] = lookupCandidateFilesNamesInZIndex(dataFilters) + + logDebug(s"Overlapping candidate files (from Z-index): ${candidateFilesNamesOpt.getOrElse(Set.empty)}") + if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table. - val candidateFiles = if (!filterFiles.isEmpty) { - allFiles.filterNot(fileStatus => filterFiles.contains(fileStatus.getPath.getName)) - } else { - allFiles - } + // Filter in candidate files based on the Z-index lookup + val candidateFiles = + allFiles.filter(fileStatus => + // NOTE: This predicate is true when {@code Option} is empty + candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName)) + ) + logInfo(s"Total files : ${allFiles.size}," + s" candidate files after data skipping: ${candidateFiles.size} " + s" skipping percent ${if (allFiles.length != 0) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}") @@ -236,11 +291,13 @@ case class HoodieFileIndex( null } }).filterNot(_ == null) - val candidateFiles = if (!filterFiles.isEmpty) { - baseFileStatuses.filterNot(fileStatus => filterFiles.contains(fileStatus.getPath.getName)) - } else { - baseFileStatuses - } + + // Filter in candidate files based on the Z-index lookup + val candidateFiles = + baseFileStatuses.filter(fileStatus => + // NOTE: This predicate is true when {@code Option} is empty + candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName))) + totalFileSize += baseFileStatuses.size candidateFileSize += candidateFiles.size PartitionDirectory(partition.values, candidateFiles) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala index d26b669dd151..90980395aebf 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala @@ -36,120 +36,153 @@ import scala.collection.JavaConverters._ object DataSkippingUtils { /** - * create z_index filter and push those filters to index table to filter all candidate scan files. - * @param condition origin filter from query. - * @param indexSchema schema from index table. - * @return filters for index table. 
- */ - def createZindexFilter(condition: Expression, indexSchema: StructType): Expression = { - def buildExpressionInternal(colName: Seq[String], statisticValue: String): Expression = { - val appendColName = UnresolvedAttribute(colName).name + statisticValue - col(appendColName).expr - } - - def reWriteCondition(colName: Seq[String], conditionExpress: Expression): Expression = { - val appendColName = UnresolvedAttribute(colName).name + "_minValue" - if (indexSchema.exists(p => p.name == appendColName)) { + * Translates provided {@link filterExpr} into corresponding filter-expression for Z-index index table + * to filter out candidate files that would hold records matching the original filter + * + * @param filterExpr original filter from query + * @param indexSchema index table schema + * @return filter for Z-index table + */ + def createZIndexLookupFilter(filterExpr: Expression, indexSchema: StructType): Expression = { + + def rewriteCondition(colName: Seq[String], conditionExpress: Expression): Expression = { + val stats = Set.apply( + UnresolvedAttribute(colName).name + "_minValue", + UnresolvedAttribute(colName).name + "_maxValue", + UnresolvedAttribute(colName).name + "_num_nulls" + ) + + if (stats.forall(stat => indexSchema.exists(_.name == stat))) { conditionExpress } else { Literal.TrueLiteral } } - val minValue = (colName: Seq[String]) => buildExpressionInternal(colName, "_minValue") - val maxValue = (colName: Seq[String]) => buildExpressionInternal(colName, "_maxValue") - val num_nulls = (colName: Seq[String]) => buildExpressionInternal(colName, "_num_nulls") + def refColExpr(colName: Seq[String], statisticValue: String): Expression = + col(UnresolvedAttribute(colName).name + statisticValue).expr + + def minValue(colName: Seq[String]) = refColExpr(colName, "_minValue") + def maxValue(colName: Seq[String]) = refColExpr(colName, "_maxValue") + def numNulls(colName: Seq[String]) = refColExpr(colName, "_num_nulls") - condition match { - // query filter "colA = b" convert it to "colA_minValue <= b and colA_maxValue >= b" for index table + def colContainsValuesEqualToLiteral(colName: Seq[String], value: Literal) = + And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value)) + + def colContainsValuesEqualToLiterals(colName: Seq[String], list: Seq[Literal]) = + list.map { lit => colContainsValuesEqualToLiteral(colName, lit) }.reduce(Or) + + filterExpr match { + // Filter "colA = b" + // Translates to "colA_minValue <= b AND colA_maxValue >= b" condition for index lookup case EqualTo(attribute: AttributeReference, value: Literal) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))) - // query filter "b = colA" convert it to "colA_minValue <= b and colA_maxValue >= b" for index table + rewriteCondition(colName, colContainsValuesEqualToLiteral(colName, value)) + // Filter "b = colA" + // Translates to "colA_minValue <= b AND colA_maxValue >= b" condition for index lookup case EqualTo(value: Literal, attribute: AttributeReference) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))) - // query filter "colA = null" convert it to "colA_num_nulls = null" for index table + rewriteCondition(colName, colContainsValuesEqualToLiteral(colName, value)) + // Filter "colA = null" + // Translates to "colA_num_nulls = null" for index 
lookup case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ Literal(null, _)) => val colName = getTargetColNameParts(equalNullSafe.left) - reWriteCondition(colName, EqualTo(num_nulls(colName), equalNullSafe.right)) - // query filter "colA < b" convert it to "colA_minValue < b" for index table + rewriteCondition(colName, EqualTo(numNulls(colName), equalNullSafe.right)) + // Filter "colA < b" + // Translates to "colA_minValue < b" for index lookup case LessThan(attribute: AttributeReference, value: Literal) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName,LessThan(minValue(colName), value)) - // query filter "b < colA" convert it to "colA_maxValue > b" for index table + rewriteCondition(colName, LessThan(minValue(colName), value)) + // Filter "b < colA" + // Translates to "b < colA_maxValue" for index lookup case LessThan(value: Literal, attribute: AttributeReference) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, GreaterThan(maxValue(colName), value)) - // query filter "colA > b" convert it to "colA_maxValue > b" for index table + rewriteCondition(colName, GreaterThan(maxValue(colName), value)) + // Filter "colA > b" + // Translates to "colA_maxValue > b" for index lookup case GreaterThan(attribute: AttributeReference, value: Literal) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, GreaterThan(maxValue(colName), value)) - // query filter "b > colA" convert it to "colA_minValue < b" for index table + rewriteCondition(colName, GreaterThan(maxValue(colName), value)) + // Filter "b > colA" + // Translates to "b > colA_minValue" for index lookup case GreaterThan(value: Literal, attribute: AttributeReference) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, LessThan(minValue(colName), value)) - // query filter "colA <= b" convert it to "colA_minValue <= b" for index table + rewriteCondition(colName, LessThan(minValue(colName), value)) + // Filter "colA <= b" + // Translates to "colA_minValue <= b" for index lookup case LessThanOrEqual(attribute: AttributeReference, value: Literal) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, LessThanOrEqual(minValue(colName), value)) - // query filter "b <= colA" convert it to "colA_maxValue >= b" for index table + rewriteCondition(colName, LessThanOrEqual(minValue(colName), value)) + // Filter "b <= colA" + // Translates to "b <= colA_maxValue" for index lookup case LessThanOrEqual(value: Literal, attribute: AttributeReference) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), value)) - // query filter "colA >= b" convert it to "colA_maxValue >= b" for index table + rewriteCondition(colName, GreaterThanOrEqual(maxValue(colName), value)) + // Filter "colA >= b" + // Translates to "colA_maxValue >= b" for index lookup case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), right)) - // query filter "b >= colA" convert it to "colA_minValue <= b" for index table + rewriteCondition(colName, GreaterThanOrEqual(maxValue(colName), right)) + // Filter "b >= colA" + // Translates to "b >= colA_minValue" for index lookup case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, LessThanOrEqual(minValue(colName), value)) - // query 
filter "colA is null" convert it to "colA_num_nulls > 0" for index table + rewriteCondition(colName, LessThanOrEqual(minValue(colName), value)) + // Filter "colA is null" + // Translates to "colA_num_nulls > 0" for index lookup case IsNull(attribute: AttributeReference) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, GreaterThan(num_nulls(colName), Literal(0))) - // query filter "colA is not null" convert it to "colA_num_nulls = 0" for index table + rewriteCondition(colName, GreaterThan(numNulls(colName), Literal(0))) + // Filter "colA is not null" + // Translates to "colA_num_nulls = 0" for index lookup case IsNotNull(attribute: AttributeReference) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, EqualTo(num_nulls(colName), Literal(0))) - // query filter "colA in (a,b)" convert it to " (colA_minValue <= a and colA_maxValue >= a) or (colA_minValue <= b and colA_maxValue >= b) " for index table + rewriteCondition(colName, EqualTo(numNulls(colName), Literal(0))) + // Filter "colA in (a, b, ...)" + // Translates to "(colA_minValue <= a AND colA_maxValue >= a) OR (colA_minValue <= b AND colA_maxValue >= b)" for index lookup case In(attribute: AttributeReference, list: Seq[Literal]) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, list.map { lit => - And(LessThanOrEqual(minValue(colName), lit), GreaterThanOrEqual(maxValue(colName), lit)) - }.reduce(Or)) - // query filter "colA like xxx" convert it to " (colA_minValue <= xxx and colA_maxValue >= xxx) or (colA_min start with xxx or colA_max start with xxx) " for index table + rewriteCondition(colName, colContainsValuesEqualToLiterals(colName, list)) + // Filter "colA like xxx" + // Translates to "colA_minValue <= xxx AND colA_maxValue >= xxx" for index lookup + // NOTE: That this operator only matches string prefixes, and this is + // essentially equivalent to "colA = b" expression case StartsWith(attribute, v @ Literal(_: UTF8String, _)) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, Or(And(LessThanOrEqual(minValue(colName), v), GreaterThanOrEqual(maxValue(colName), v)) , - Or(StartsWith(minValue(colName), v), StartsWith(maxValue(colName), v)))) - // query filter "colA not in (a, b)" convert it to " (not( colA_minValue = a and colA_maxValue = a)) and (not( colA_minValue = b and colA_maxValue = b)) " for index table + rewriteCondition(colName, colContainsValuesEqualToLiteral(colName, v)) + // Filter "colA not in (a, b, ...)" + // Translates to "(colA_minValue > a OR colA_maxValue < a) AND (colA_minValue > b OR colA_maxValue < b)" for index lookup + // NOTE: This is an inversion of `in (a, b, ...)` expr case Not(In(attribute: AttributeReference, list: Seq[Literal])) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, list.map { lit => - Not(And(EqualTo(minValue(colName), lit), EqualTo(maxValue(colName), lit))) - }.reduce(And)) - // query filter "colA != b" convert it to "not ( colA_minValue = b and colA_maxValue = b )" for index table + rewriteCondition(colName, Not(colContainsValuesEqualToLiterals(colName, list))) + // Filter "colA != b" + // Translates to "colA_minValue > b OR colA_maxValue < b" (which is an inversion of expr for "colA = b") for index lookup + // NOTE: This is an inversion of `colA = b` expr case Not(EqualTo(attribute: AttributeReference, value: Literal)) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, Not(And(EqualTo(minValue(colName), value), 
EqualTo(maxValue(colName), value)))) - // query filter "b != colA" convert it to "not ( colA_minValue = b and colA_maxValue = b )" for index table + rewriteCondition(colName, Not(colContainsValuesEqualToLiteral(colName, value))) + // Filter "b != colA" + // Translates to "colA_minValue > b OR colA_maxValue < b" (which is an inversion of expr for "colA = b") for index lookup + // NOTE: This is an inversion of `colA != b` expr case Not(EqualTo(value: Literal, attribute: AttributeReference)) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, Not(And(EqualTo(minValue(colName), value), EqualTo(maxValue(colName), value)))) - // query filter "colA not like xxxx" convert it to "not ( colA_minValue startWith xxx and colA_maxValue startWith xxx)" for index table + rewriteCondition(colName, Not(colContainsValuesEqualToLiteral(colName, value))) + // Filter "colA not like xxx" + // Translates to "!(colA_minValue <= xxx AND colA_maxValue >= xxx)" for index lookup + // NOTE: This is a inversion of "colA like xxx" assuming that colA is a string-based type case Not(StartsWith(attribute, value @ Literal(_: UTF8String, _))) => val colName = getTargetColNameParts(attribute) - reWriteCondition(colName, Not(And(StartsWith(minValue(colName), value), StartsWith(maxValue(colName), value)))) + rewriteCondition(colName, Not(colContainsValuesEqualToLiteral(colName, value))) + case or: Or => - val resLeft = createZindexFilter(or.left, indexSchema) - val resRight = createZindexFilter(or.right, indexSchema) + val resLeft = createZIndexLookupFilter(or.left, indexSchema) + val resRight = createZIndexLookupFilter(or.right, indexSchema) Or(resLeft, resRight) case and: And => - val resLeft = createZindexFilter(and.left, indexSchema) - val resRight = createZindexFilter(and.right, indexSchema) + val resLeft = createZIndexLookupFilter(and.left, indexSchema) + val resRight = createZIndexLookupFilter(and.right, indexSchema) And(resLeft, resRight) case expr: Expression => diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestOptimizeTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala similarity index 83% rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestOptimizeTable.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala index b0fcbec27d07..2fddefb32180 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestOptimizeTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala @@ -18,28 +18,30 @@ package org.apache.hudi.functional -import java.sql.{Date, Timestamp} - import org.apache.hadoop.fs.Path import org.apache.hudi.common.model.HoodieFileFormat import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig} import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.common.util.{BaseFileUtils, ParquetUtils} +import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig} import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} import org.apache.spark.ZCurveOptimizeHelper import org.apache.spark.sql._ import org.apache.spark.sql.types._ import org.junit.jupiter.api.Assertions.assertEquals -import 
org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import java.sql.{Date, Timestamp} import scala.collection.JavaConversions._ import scala.util.Random -class TestOptimizeTable extends HoodieClientTestBase { - var spark: SparkSession = null +@Tag("functional") +class TestTableLayoutOptimization extends HoodieClientTestBase { + var spark: SparkSession = _ val commonOpts = Map( "hoodie.insert.shuffle.parallelism" -> "4", @@ -67,11 +69,13 @@ class TestOptimizeTable extends HoodieClientTestBase { @ParameterizedTest @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) - def testOptimizewithClustering(tableType: String): Unit = { + def testOptimizeWithClustering(tableType: String): Unit = { + val targetRecordsCount = 10000 // Bulk Insert Operation - val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).toList - val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) - inputDF1.write.format("org.apache.hudi") + val records = recordsToStrings(dataGen.generateInserts("001", targetRecordsCount)).toList + val writeDf: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2)) + + writeDf.write.format("org.apache.hudi") .options(commonOpts) .option("hoodie.compact.inline", "false") .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) @@ -83,30 +87,41 @@ class TestOptimizeTable extends HoodieClientTestBase { .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824") .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600") .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString) - .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 *1024 * 1024L)) + .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L)) .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true") .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon") .mode(SaveMode.Overwrite) .save(basePath) - assertEquals(1000, spark.read.format("hudi").load(basePath).count()) - // use unsorted col as filter. - assertEquals(spark.read - .format("hudi").load(basePath).where("end_lat >= 0 and rider != '1' and weight > 0.0").count(), - spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true") - .format("hudi").load(basePath).where("end_lat >= 0 and rider != '1' and weight > 0.0").count()) - // use sorted col as filter. 
- assertEquals(spark.read.format("hudi").load(basePath) - .where("begin_lat >= 0.49 and begin_lat < 0.51 and begin_lon >= 0.49 and begin_lon < 0.51").count(), - spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true") - .format("hudi").load(basePath) - .where("begin_lat >= 0.49 and begin_lat < 0.51 and begin_lon >= 0.49 and begin_lon < 0.51").count()) - // use sorted cols and unsorted cols as filter - assertEquals(spark.read.format("hudi").load(basePath) - .where("begin_lat >= 0.49 and begin_lat < 0.51 and end_lat > 0.56").count(), - spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true") - .format("hudi").load(basePath) - .where("begin_lat >= 0.49 and begin_lat < 0.51 and end_lat > 0.56").count()) + val readDf = + spark.read + .format("hudi") + .load(basePath) + + val readDfSkip = + spark.read + .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true") + .format("hudi") + .load(basePath) + + assertEquals(targetRecordsCount, readDf.count()) + assertEquals(targetRecordsCount, readDfSkip.count()) + + readDf.createOrReplaceTempView("hudi_snapshot_raw") + readDfSkip.createOrReplaceTempView("hudi_snapshot_skipping") + + def select(tableName: String) = + spark.sql(s"SELECT * FROM $tableName WHERE begin_lat >= 0.49 AND begin_lat < 0.51 AND begin_lon >= 0.49 AND begin_lon < 0.51") + + assertRowsMatch( + select("hudi_snapshot_raw"), + select("hudi_snapshot_skipping") + ) + } + + def assertRowsMatch(one: DataFrame, other: DataFrame) = { + val rows = one.count() + assert(rows == other.count() && one.intersect(other).count() == rows) } @Test
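A note on the `createMergeSql` change above: the old loop always tested `${leftTable}.${cols(0)}` for null, so every merged column fell back based on the first column only, while the fix tests `cols(i)` per column. The sketch below mirrors that fixed string-building logic in a standalone object (restructured slightly for brevity) so the generated statement is easy to inspect; the object name, table aliases and column list are made up for illustration and are not part of the patch.

// Illustrative sketch, not part of the patch: mirrors the fixed
// HoodieSparkUtils.createMergeSql logic to show the SQL it generates.
object MergeSqlSketch {
  // For every column, prefer the left (existing index) value and fall back to
  // the right (freshly computed statistics) value; join the two tables on cols(0).
  def createMergeSql(leftTable: String, rightTable: String, cols: Seq[String]): String = {
    val selects = cols.map(c => s"if ($leftTable.$c is null, $rightTable.$c, $leftTable.$c) as $c")
    s"select ${selects.mkString(", ")} from $leftTable full join $rightTable " +
      s"on $leftTable.${cols.head} = $rightTable.${cols.head}"
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical index-table columns: the file name plus min/max statistics.
    val cols = Seq("file", "begin_lat_minValue", "begin_lat_maxValue")
    println(createMergeSql("indexTable_abc", "updateTable_def", cols))
    // select if (indexTable_abc.file is null, updateTable_def.file, indexTable_abc.file) as file, ...
    //   from indexTable_abc full join updateTable_def on indexTable_abc.file = updateTable_def.file
  }
}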
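The `HoodieFileIndex` changes rest on one invariant: a base file that has no row in the Z-index must never be skipped, and when no usable index exists at all the lookup returns `None`, which the `Option.forall(...)` predicate in `listFiles` treats as "keep everything". The following self-contained sketch models that behaviour with plain `Set[String]` values standing in for the index DataFrame; the object and parameter names are invented for the example and are not the actual `HoodieFileIndex` API.

// Illustrative sketch with simplified types, not the actual HoodieFileIndex API.
object ZIndexPruningSketch {
  // Candidate set = indexed files whose statistics match the translated filter
  //                 ++ files that have no row in the Z-index at all.
  def candidateFiles(allBaseFileNames: Set[String],
                     indexedFileNames: Set[String],
                     matchingIndexedFileNames: Set[String]): Set[String] = {
    val notIndexed = allBaseFileNames -- indexedFileNames
    matchingIndexedFileNames ++ notIndexed
  }

  def main(args: Array[String]): Unit = {
    val allBaseFiles = Set("f1.parquet", "f2.parquet", "f3.parquet")
    val indexed      = Set("f1.parquet", "f2.parquet") // f3 was written after the last clustering
    val matching     = Set("f1.parquet")               // only f1 overlaps the query predicate

    val candidatesOpt: Option[Set[String]] = Some(candidateFiles(allBaseFiles, indexed, matching))
    // f3.parquet is retained even though the filter did not select it, because it is not indexed.
    println(allBaseFiles.filter(f => candidatesOpt.forall(_.contains(f)))) // Set(f1.parquet, f3.parquet)

    // With no usable index the Option is empty and forall(...) is vacuously true: nothing is skipped.
    val noIndex: Option[Set[String]] = None
    println(allBaseFiles.filter(f => noIndex.forall(_.contains(f)))) // all three files
  }
}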
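The rewritten `DataSkippingUtils` comments spell out each translation, for instance "colA = b" becomes "colA_minValue <= b AND colA_maxValue >= b" and "colA is null" becomes "colA_num_nulls > 0", with a fallback to `TrueLiteral` whenever one of the `_minValue`/`_maxValue`/`_num_nulls` columns is missing from the index schema. Below is a simplified, DataFrame-free model of the equality, IN and null-check rules over per-file statistics, just to make the containment reasoning concrete; `FileStats` and the helper functions are hypothetical and do not exist in Hudi.

// Illustrative model only: plain values instead of Catalyst expressions.
object MinMaxSkippingSketch {
  // One Z-index row: per-file min/max/null-count statistics for a single column.
  final case class FileStats(file: String, min: Double, max: Double, numNulls: Long)

  // "colA = v"  ->  keep the file iff minValue <= v && maxValue >= v
  def mayContain(s: FileStats, v: Double): Boolean = s.min <= v && s.max >= v

  // "colA in (v1, v2, ...)"  ->  OR of the per-literal containment checks
  def mayContainAny(s: FileStats, vs: Seq[Double]): Boolean = vs.exists(mayContain(s, _))

  // "colA is null"  ->  num_nulls > 0
  def mayContainNull(s: FileStats): Boolean = s.numNulls > 0

  def main(args: Array[String]): Unit = {
    val stats = Seq(
      FileStats("f1.parquet", min = 0.10, max = 0.30, numNulls = 0),
      FileStats("f2.parquet", min = 0.45, max = 0.75, numNulls = 2)
    )
    println(stats.filter(mayContain(_, 0.50)).map(_.file))             // List(f2.parquet)
    println(stats.filter(mayContainAny(_, Seq(0.2, 0.5))).map(_.file)) // both files survive
    println(stats.filter(mayContainNull).map(_.file))                  // List(f2.parquet)
  }
}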