Skip to content

Commit

Permalink
[Delta] make delta UTs compatible with multi spark versions
Browse files Browse the repository at this point in the history
  • Loading branch information
寻径 committed Dec 6, 2023
1 parent 0b74898 commit b0ca735
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql
 * Why do we need a GlutenQueryTest when we already have QueryTest?
 * 1. We need to modify the way org.apache.spark.sql.CHQueryTest#compare compares doubles.
 */
import org.apache.spark.SPARK_VERSION_SHORT
import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans._
Expand All @@ -41,6 +42,20 @@ abstract class GlutenQueryTest extends PlanTest {

protected def spark: SparkSession

/**
 * Registers `testFun` as a ScalaTest test only when the current Spark version
 * (SPARK_VERSION_SHORT) falls within [minSparkVersion, maxSparkVersion], both
 * bounds inclusive and optional.
 *
 * @param testName name passed through to ScalaTest's `test`
 * @param minSparkVersion inclusive lower bound, e.g. Some("3.3.1"); None = no lower bound
 * @param maxSparkVersion inclusive upper bound; None = no upper bound
 * @param testFun the test body to run when the version check passes
 */
def testWithSpecifiedSparkVersion(
    testName: String,
    minSparkVersion: Option[String] = None,
    maxSparkVersion: Option[String] = None)(testFun: => Any): Unit = {
  // Compare versions numerically, component by component. A plain String
  // comparison is lexicographic and would order "3.10.0" before "3.3.1",
  // silently skipping (or wrongly running) tests on double-digit versions.
  def parts(v: String): Seq[Int] = v.split('.').toSeq.map(_.toInt)
  // True iff version `a` <= version `b`, padding the shorter version with
  // zeros so "3.3" and "3.3.0" compare equal.
  def lte(a: String, b: String): Boolean = {
    val (pa, pb) = (parts(a), parts(b))
    val len = math.max(pa.length, pb.length)
    val diff = pa.padTo(len, 0).zip(pb.padTo(len, 0)).find { case (x, y) => x != y }
    // No differing component means the versions are equal, which satisfies <=.
    diff.forall { case (x, y) => x < y }
  }
  if (
    minSparkVersion.forall(min => lte(min, SPARK_VERSION_SHORT))
    && maxSparkVersion.forall(max => lte(SPARK_VERSION_SHORT, max))
  ) {
    test(testName) {
      testFun
    }
  }
}

/** Runs the plan and makes sure the answer contains all of the keywords. */
def checkKeywordsExist(df: DataFrame, keywords: String*): Unit = {
val outputs = df.collect().map(_.mkString).mkString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class VeloxDeltaSuite extends WholeStageTransformerSuite {
.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
}

test("column mapping mode") {
// IdMapping is supported in Delta 2.2 (related to Spark 3.3.1)
testWithSpecifiedSparkVersion("column mapping mode = id", Some("3.3.1")) {
spark.sql(s"""
|create table delta_cm1 (id int, name string) using delta
|tblproperties ("delta.columnMapping.mode"= "id")
Expand All @@ -56,6 +57,24 @@ class VeloxDeltaSuite extends WholeStageTransformerSuite {
checkAnswer(df2, Row("v2") :: Nil)
}

// NameMapping arrived in Delta 2.0 (paired with Spark 3.2.0), so only run
// this test on Spark 3.2.0 and later.
testWithSpecifiedSparkVersion("column mapping mode = name", Some("3.2.0")) {
  spark.sql(s"""
               |create table delta_cm2 (id int, name string) using delta
               |tblproperties ("delta.columnMapping.mode"= "name")
               |""".stripMargin)
  spark.sql(s"""
               |insert into delta_cm2 values (1, "v1"), (2, "v2")
               |""".stripMargin)

  // Full scan through the Gluten-transformed plan should return both rows.
  val fullScan = runQueryAndCompare("select * from delta_cm2") { _ => }
  checkLengthAndPlan(fullScan, 2)
  checkAnswer(fullScan, Row(1, "v1") :: Row(2, "v2") :: Nil)

  // Projecting the name-mapped column with a filter should return one row.
  val filtered = runQueryAndCompare("select name from delta_cm2 where id = 2") { _ => }
  checkLengthAndPlan(filtered, 1)
  checkAnswer(filtered, Row("v2") :: Nil)
}

test("basic test with stats.skipping disabled") {
withSQLConf("spark.databricks.delta.stats.skipping" -> "false") {
spark.sql(s"""
Expand Down

0 comments on commit b0ca735

Please sign in to comment.