Commit
[Delta] fix and refine Delta UT
YannByron authored and 寻径 committed Dec 14, 2023
1 parent 211fee4 commit f21cba8
Showing 6 changed files with 44 additions and 13 deletions.
@@ -44,8 +44,8 @@ object ScanTransformerFactory {
} else {
ExpressionConverter.transformDynamicPruningExpr(scanExec.partitionFilters, reuseSubquery)
}
- val fileIndex = scanExec.relation.location
- lookupDataSourceScanTransformer(fileIndex.getClass.getName) match {
+ val fileFormat = scanExec.relation.fileFormat
+ lookupDataSourceScanTransformer(fileFormat.getClass.getName) match {
case Some(clz) =>
clz
.getDeclaredConstructor()
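With this change the scan-transformer registry is keyed by the file format class (e.g. org.apache.spark.sql.delta.DeltaParquetFileFormat) instead of the file index class. Below is a minimal, self-contained sketch of that lookup style; the names SimpleScanRegistry, registerProvider and the demo strings are illustrative only and are not part of the Gluten codebase.

import scala.collection.mutable

// Illustrative stand-in for a class-name-keyed registry in the style of ScanTransformerFactory.
object SimpleScanRegistry {
  private val providers = mutable.Map.empty[String, String => String]

  def registerProvider(scanClassName: String, build: String => String): Unit =
    providers.put(scanClassName, build)

  // The factory now resolves a transformer by the class name of relation.fileFormat.
  def lookup(fileFormatClassName: String): Option[String => String] =
    providers.get(fileFormatClassName)
}

object SimpleScanRegistryDemo extends App {
  // Mirrors DeltaScanTransformerProvider.scanClassName from this commit.
  SimpleScanRegistry.registerProvider(
    "org.apache.spark.sql.delta.DeltaParquetFileFormat",
    scan => s"DeltaScanTransformer($scan)")

  val transformer = SimpleScanRegistry
    .lookup("org.apache.spark.sql.delta.DeltaParquetFileFormat")
    .map(_.apply("FileSourceScanExec(delta_table)"))
  println(transformer) // Some(DeltaScanTransformer(FileSourceScanExec(delta_table)))
}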
@@ -20,6 +20,7 @@ package org.apache.spark.sql
* Why we need a GlutenQueryTest when we already have QueryTest?
* 1. We need to modify the way org.apache.spark.sql.CHQueryTest#compare compares double
*/
+ import org.apache.spark.SPARK_VERSION_SHORT
import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans._
@@ -41,6 +42,20 @@ abstract class GlutenQueryTest extends PlanTest {

protected def spark: SparkSession

+ def testWithSpecifiedSparkVersion(
+     testName: String,
+     minSparkVersion: Option[String] = None,
+     maxSparkVersion: Option[String] = None)(testFun: => Any): Unit = {
+   if (
+     minSparkVersion.forall(_ <= SPARK_VERSION_SHORT)
+     && maxSparkVersion.forall(_ >= SPARK_VERSION_SHORT)
+   ) {
+     test(testName) {
+       testFun
+     }
+   }
+ }

/** Runs the plan and makes sure the answer contains all of the keywords. */
def checkKeywordsExist(df: DataFrame, keywords: String*): Unit = {
val outputs = df.collect().map(_.mkString).mkString
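The new helper registers a test only when SPARK_VERSION_SHORT falls within the given bounds; VeloxDeltaSuite below uses it to gate the column-mapping tests. A standalone sketch of the same gate, with an assumed running version of "3.4.1":

object VersionGateDemo extends App {
  // Assumed value for this demo; in GlutenQueryTest it comes from
  // org.apache.spark.SPARK_VERSION_SHORT.
  val SPARK_VERSION_SHORT = "3.4.1"

  // Mirrors the Option.forall checks in testWithSpecifiedSparkVersion:
  // an unset bound always passes, a set bound is compared against the running version.
  def shouldRun(minSparkVersion: Option[String], maxSparkVersion: Option[String]): Boolean =
    minSparkVersion.forall(_ <= SPARK_VERSION_SHORT) &&
      maxSparkVersion.forall(_ >= SPARK_VERSION_SHORT)

  println(shouldRun(Some("3.3.1"), None))  // true: "3.3.1" <= "3.4.1"
  println(shouldRun(None, Some("3.2.0")))  // false: the running version exceeds the max
}

Note that the bounds are compared as plain strings, which is sufficient for the 3.x.y Spark versions used in these suites.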
@@ -14,9 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
- package io.glutenproject.extension
+ package io.glutenproject.execution

- import io.glutenproject.execution.FileSourceScanExecTransformer
import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.sql.catalyst.TableIdentifier
@@ -14,16 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
- package io.glutenproject.extension
-
- import io.glutenproject.execution.{DataSourceScanTransformerRegister, FileSourceScanExecTransformer}
+ package io.glutenproject.execution

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.FileSourceScanExec

class DeltaScanTransformerProvider extends DataSourceScanTransformerRegister {

- override val scanClassName: String = "org.apache.spark.sql.delta.files.TahoeLogFileIndex"
+ override val scanClassName: String = "org.apache.spark.sql.delta.DeltaParquetFileFormat"

override def createDataSourceTransformer(
batchScan: FileSourceScanExec,
@@ -16,7 +16,7 @@
*/
package io.glutenproject.extension

- import io.glutenproject.execution.{FileSourceScanExecTransformer, ProjectExecTransformer}
+ import io.glutenproject.execution.{DeltaScanTransformer, ProjectExecTransformer}
import io.glutenproject.extension.DeltaRewriteTransformerRules.columnMappingRule
import io.glutenproject.extension.columnar.TransformHints

@@ -53,7 +53,7 @@ object DeltaRewriteTransformerRules {
// If it enables Delta Column Mapping(e.g. nameMapping and idMapping),
// transform the metadata of Delta into Parquet's,
// so that gluten can read Delta File using Parquet Reader.
- case p: FileSourceScanExecTransformer
+ case p: DeltaScanTransformer
if isDeltaColumnMappingFileFormat(p.relation.fileFormat) && notAppliedColumnMappingRule(
p) =>
transformColumnMappingPlan(p)
@@ -71,7 +71,7 @@
* transform the metadata of Delta into Parquet's, each plan should only be transformed once.
*/
private def transformColumnMappingPlan(plan: SparkPlan): SparkPlan = plan match {
- case plan: FileSourceScanExecTransformer =>
+ case plan: DeltaScanTransformer =>
val fmt = plan.relation.fileFormat.asInstanceOf[DeltaParquetFileFormat]
// a mapping between the table schemas name to parquet schemas.
val columnNameMapping = mutable.Map.empty[String, String]
@@ -136,7 +136,7 @@
val newRequiredFields = plan.requiredSchema.map {
e => StructField(columnNameMapping(e.name), e.dataType, e.nullable, e.metadata)
}
- val scanExecTransformer = new FileSourceScanExecTransformer(
+ val scanExecTransformer = new DeltaScanTransformer(
newFsRelation,
newOutput,
StructType(newRequiredFields),
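The rule above rewrites a DeltaScanTransformer so that logical column names are replaced with the physical Parquet names before the scan is offloaded. A minimal sketch of that renaming step on a plain schema is shown below; it assumes Delta records the physical name under the field metadata key "delta.columnMapping.physicalName", and the table and column values are made up for illustration.

import scala.collection.mutable

import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructField, StructType}

object ColumnMappingSketch extends App {
  // Physical name recorded by Delta column mapping, falling back to the logical name.
  def physicalName(f: StructField): String =
    if (f.metadata.contains("delta.columnMapping.physicalName"))
      f.metadata.getString("delta.columnMapping.physicalName")
    else f.name

  // Example table schema carrying column-mapping metadata (values are illustrative).
  val tableSchema = StructType(Seq(
    StructField("id", IntegerType, nullable = true,
      metadata = new MetadataBuilder().putString("delta.columnMapping.physicalName", "col-0001").build()),
    StructField("name", StringType, nullable = true,
      metadata = new MetadataBuilder().putString("delta.columnMapping.physicalName", "col-0002").build())
  ))

  // A mapping between the table schema's names and the Parquet (physical) names,
  // as built in transformColumnMappingPlan.
  val columnNameMapping = mutable.Map.empty[String, String]
  tableSchema.foreach(f => columnNameMapping += f.name -> physicalName(f))

  // Rewrite a required schema the same way the rule rewrites plan.requiredSchema.
  val requiredSchema = StructType(Seq(StructField("name", StringType, nullable = true)))
  val newRequiredFields = requiredSchema.map(
    e => StructField(columnNameMapping(e.name), e.dataType, e.nullable, e.metadata))
  println(StructType(newRequiredFields)) // the "name" column is now addressed as "col-0002"
}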
@@ -39,7 +39,8 @@ class VeloxDeltaSuite extends WholeStageTransformerSuite {
.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
}

test("column mapping mode") {
// IdMapping is supported in Delta 2.2 (related to Spark3.3.1)
testWithSpecifiedSparkVersion("column mapping mode = id", Some("3.3.1")) {
spark.sql(s"""
|create table delta_cm1 (id int, name string) using delta
|tblproperties ("delta.columnMapping.mode"= "id")
@@ -56,6 +57,24 @@ class VeloxDeltaSuite extends WholeStageTransformerSuite {
checkAnswer(df2, Row("v2") :: Nil)
}

+ // NameMapping is supported in Delta 2.0 (related to Spark 3.2.0)
+ testWithSpecifiedSparkVersion("column mapping mode = name", Some("3.2.0")) {
+   spark.sql(s"""
+                |create table delta_cm2 (id int, name string) using delta
+                |tblproperties ("delta.columnMapping.mode"= "name")
+                |""".stripMargin)
+   spark.sql(s"""
+                |insert into delta_cm2 values (1, "v1"), (2, "v2")
+                |""".stripMargin)
+   val df1 = runQueryAndCompare("select * from delta_cm2") { _ => }
+   checkLengthAndPlan(df1, 2)
+   checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil)
+
+   val df2 = runQueryAndCompare("select name from delta_cm2 where id = 2") { _ => }
+   checkLengthAndPlan(df2, 1)
+   checkAnswer(df2, Row("v2") :: Nil)
+ }

test("basic test with stats.skipping disabled") {
withSQLConf("spark.databricks.delta.stats.skipping" -> "false") {
spark.sql(s"""
