refine UT framework to promote GPU evaluation
Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
binmahone committed May 21, 2024
1 parent 6921dac commit 2ffcb81
Showing 5 changed files with 69 additions and 94 deletions.
@@ -1485,6 +1485,13 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.stringConf
.createWithDefault(false.toString)

val FOLDABLE_NON_LIT_ALLOWED = conf("spark.rapids.sql.test.isFoldableNonLitAllowed")
.doc("Only to be used in tests. If `true` the foldable expressions that are not literals " +
"will be allowed")
.internal()
.booleanConf
.createWithDefault(false)

val TEST_CONF = conf("spark.rapids.sql.test.enabled")
.doc("Intended to be used by unit tests, if enabled all operations must run on the " +
"GPU or an error happens.")
@@ -2428,6 +2435,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isTestEnabled: Boolean = get(TEST_CONF)

lazy val isFoldableNonLitAllowed: Boolean = get(FOLDABLE_NON_LIT_ALLOWED)

/**
* Convert a string value to the injection configuration OomInjection.
*
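A minimal sketch (not part of the commit) of how the new flag can be read back through RapidsConf's Map-based constructor, which is visible in the hunk above:

    import com.nvidia.spark.rapids.RapidsConf

    // Hypothetical test snippet: build a RapidsConf directly from a Map and
    // read the lazy val added in this commit; it defaults to false.
    val rapidsConf = new RapidsConf(Map(
      "spark.rapids.sql.test.isFoldableNonLitAllowed" -> "true"))
    assert(rapidsConf.isFoldableNonLitAllowed)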
@@ -1108,7 +1108,7 @@ abstract class BaseExprMeta[INPUT <: Expression](
case _ => ExpressionContext.getRegularOperatorContext(this)
}

val isFoldableNonLitAllowed: Boolean = false
val isFoldableNonLitAllowed: Boolean = conf.isFoldableNonLitAllowed

// There are 4 levels of timezone check in GPU plan tag phase:
// Level 1: Check whether an expression is related to timezone. This is achieved by
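For context, a "foldable non-literal" is an expression that ConstantFolding would normally collapse into a Literal before the plugin ever sees it; the UT session in this commit excludes that optimizer rule (see the RapidsTestsTrait changes below), so such expressions now reach the plugin unfolded and the flag lets tests permit them. A hedged illustration, assuming Upper just as an example expression (the commit does not name specific ones):

    import org.apache.spark.sql.catalyst.expressions.{Literal, Upper}

    val e = Upper(Literal("abc")) // all children are literals, so e is foldable
    assert(e.foldable && !e.isInstanceOf[Literal])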
@@ -121,6 +121,8 @@ object RapidsSQLTestsBaseTrait {
"org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback")
.set("spark.sql.warehouse.dir", warehouse)
.set("spark.sql.cache.serializer", "com.nvidia.spark.ParquetCachedBatchSerializer")
.set("spark.sql.session.timeZone", "UTC")
.set("spark.rapids.sql.explain", "ALL")
.setAppName("rapids spark plugin running Vanilla Spark UT")

conf
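The same two settings can also be applied to a live session when reproducing a UT failure by hand; a sketch, assuming a running SparkSession named spark:

    spark.conf.set("spark.sql.session.timeZone", "UTC") // deterministic timestamps on any host
    spark.conf.set("spark.rapids.sql.explain", "ALL")   // log how each operator is placed (GPU vs CPU)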
@@ -19,7 +19,7 @@
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.rapids.utils

import org.apache.spark.sql.rapids.suites.{RapidsCastSuite, RapidsDataFrameAggregateSuite, RapidsJsonFunctionsSuite, RapidsJsonSuite, RapidsMathFunctionsSuite, RapidsRegexpExpressionsSuite, RapidsStringExpressionsSuite, RapidsStringFunctionsSuite}
import org.apache.spark.sql.rapids.suites.{RapidsCastSuite, RapidsDataFrameAggregateSuite, RapidsJsonExpressionsSuite, RapidsJsonFunctionsSuite, RapidsJsonSuite, RapidsMathFunctionsSuite, RapidsRegexpExpressionsSuite, RapidsStringExpressionsSuite, RapidsStringFunctionsSuite}

// Some settings' line length exceeds 100
// scalastyle:off line.size.limit
@@ -41,6 +41,22 @@ class RapidsTestSettings extends BackendTestSettings {
.exclude("SPARK-17641: collect functions should not collect null values", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10772"))
.exclude("SPARK-19471: AggregationIterator does not initialize the generated result projection before using it", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10772"))
.exclude("SPARK-24788: RelationalGroupedDataset.toString with unresolved exprs should not fail", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10801"))
enableSuite[RapidsJsonExpressionsSuite]
.exclude("from_json - invalid data", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10849"))
.exclude("from_json - input=empty array, schema=struct, output=single row with null", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10849"))
.exclude("from_json - input=empty object, schema=struct, output=single row with null", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10849"))
.exclude("SPARK-20549: from_json bad UTF-8", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10849"))
.exclude("from_json with timestamp", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10849"))
.exclude("to_json - array", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10849"))
.exclude("to_json - array with single empty row", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10849"))
.exclude("to_json - empty array", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10849"))
.exclude("to_json with timestamp", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10849"))
.exclude("SPARK-21513: to_json support map[string, struct] to json", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10849"))
.exclude("SPARK-21513: to_json support map[struct, struct] to json", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10849"))
.exclude("SPARK-21513: to_json support map[string, integer] to json", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10849"))
.exclude("to_json - array with maps", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10849"))
.exclude("to_json - array with single map", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10849"))
.exclude("from_json missing fields", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10849"))
enableSuite[RapidsJsonFunctionsSuite]
enableSuite[RapidsJsonSuite]
.exclude("Casting long as timestamp", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10773"))
@@ -58,24 +74,11 @@ class RapidsTestSettings extends BackendTestSettings {
.exclude("SPARK-37360: Timestamp type inference for a mix of TIMESTAMP_NTZ and TIMESTAMP_LTZ", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10773"))
enableSuite[RapidsMathFunctionsSuite]
enableSuite[RapidsRegexpExpressionsSuite]
.exclude("RegexReplace", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10774"))
.exclude("RegexExtract", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10774"))
.exclude("RegexExtractAll", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10774"))
.exclude("SPLIT", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10774"))
enableSuite[RapidsStringExpressionsSuite]
.exclude("SPARK-22498: Concat should not generate codes beyond 64KB", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10775"))
.exclude("SPARK-22549: ConcatWs should not generate codes beyond 64KB", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10775"))
.exclude("SPARK-22550: Elt should not generate codes beyond 64KB", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10775"))
.exclude("StringComparison", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10775"))
.exclude("Substring", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10775"))
.exclude("ascii for string", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10775"))
.exclude("base64/unbase64 for string", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10775"))
.exclude("encode/decode for string", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10775"))
.exclude("SPARK-22603: FormatString should not generate codes beyond 64KB", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10775"))
.exclude("LOCATE", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10775"))
.exclude("LPAD/RPAD", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10775"))
.exclude("REPEAT", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10775"))
.exclude("length for string / binary", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10775"))
.exclude("ParseUrl", KNOWN_ISSUE("https://github.com/NVIDIA/spark-rapids/issues/10775"))
enableSuite[RapidsStringFunctionsSuite]
}
@@ -20,28 +20,29 @@ spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.rapids.utils

import java.io.File
import java.util.TimeZone

import com.nvidia.spark.rapids.{GpuProjectExec, TestStats}
import org.apache.commons.io.{FileUtils => fu}
import org.apache.commons.math3.util.Precision
import org.scalactic.TripleEqualsSupport.Spread
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.{Column, Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.ResolveTimeZone
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, ConvertToLocalRelation, NullPropagation}
import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, ConvertToLocalRelation}
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, MapData, TypeUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.utils.RapidsQueryTestUtil.isNaNOrInf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String


trait RapidsTestsTrait extends RapidsTestsCommonTrait {

val originalTimeZone = TimeZone.getDefault

override def beforeAll(): Unit = {
// prepare working paths
val basePathDir = new File(basePath)
@@ -54,9 +55,12 @@ trait RapidsTestsTrait extends RapidsTestsCommonTrait {
super.beforeAll()
initializeSession()
_spark.sparkContext.setLogLevel("WARN")
TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
}

override def afterAll(): Unit = {
TimeZone.setDefault(originalTimeZone)

try {
super.afterAll()
} finally {
@@ -91,12 +95,17 @@ trait RapidsTestsTrait extends RapidsTestsCommonTrait {
.config(
SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
ConvertToLocalRelation.ruleName +
"," + ConstantFolding.ruleName + "," + NullPropagation.ruleName)
"," + ConstantFolding.ruleName)
.config("spark.rapids.sql.enabled", "true")
.config("spark.plugins", "com.nvidia.spark.SQLPlugin")
.config("spark.sql.queryExecutionListeners",
"org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback")
.config("spark.sql.warehouse.dir", warehouse)
.config("spark.sql.session.timeZone","UTC")
.config("spark.rapids.sql.explain", "ALL")
.config("spark.rapids.sql.test.isFoldableNonLitAllowed", "true")
// uncomment the config below to run in `strict mode`, where a fallback to CPU is treated as a failure
// .config("spark.rapids.sql.test.enabled", "true")
.appName("rapids spark plugin running Vanilla Spark UT")

_spark = sparkBuilder
@@ -115,31 +124,20 @@
val expr = resolver.resolveTimeZones(expression)
assert(expr.resolved)

if (canConvertToDataFrame(inputRow)) {
rapidsCheckExpression(expr, expected, inputRow)
} else {
logWarning(s"The status of this unit test is not guaranteed.")
val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
checkEvaluationWithoutCodegen(expr, catalystValue, inputRow)
checkEvaluationWithMutableProjection(expr, catalystValue, inputRow)
if (GenerateUnsafeProjection.canSupport(expr.dataType)) {
checkEvaluationWithUnsafeProjection(expr, catalystValue, inputRow)
}
checkEvaluationWithOptimization(expr, catalystValue, inputRow)
}
rapidsCheckExpression(expr, expected, inputRow)
}

/**
* Sort map data by key and return the sorted key array and value array.
*
* @param input
* : input map data.
* : input map data.
* @param kt
* : key type.
* : key type.
* @param vt
* : value type.
* : value type.
* @return
* the sorted key array and value array.
* the sorted key array and value array.
*/
private def getSortedArrays(
input: MapData,
@@ -202,7 +200,7 @@ trait RapidsTestsTrait extends RapidsTestsCommonTrait {
case (result: Double, expected: Double) =>
if (
(isNaNOrInf(result) || isNaNOrInf(expected))
|| (result == -0.0) || (expected == -0.0)
|| (result == -0.0) || (expected == -0.0)
) {
java.lang.Double.doubleToRawLongBits(result) ==
java.lang.Double.doubleToRawLongBits(expected)
@@ -221,20 +219,23 @@
RapidsTestConstants.SUPPORTED_DATA_TYPE.acceptsType(expr.dataType)
}

def rapidsCheckExpression(expression: Expression, expected: Any, inputRow: InternalRow): Unit = {
val df = if (inputRow != EmptyRow && inputRow != InternalRow.empty) {
convertInternalRowToDataFrame(inputRow)
} else {
val schema = StructType(StructField("a", IntegerType, nullable = true) :: Nil)
val empData = Seq(Row(1))
_spark.createDataFrame(_spark.sparkContext.parallelize(empData), schema)
def rapidsCheckExpression(origExpr: Expression, expected: Any, inputRow: InternalRow): Unit = {
// Many of the expressions in RAPIDS do not support vectorized
// parameters (e.g. regexp_replace), so we downgrade all expression
// evaluation to use scalar parameters. A follow-up issue will take
// care of the expressions that already support vectorized parameters.
val expression = origExpr.transformUp {
case BoundReference(ordinal, dataType, _) =>
Literal(inputRow.asInstanceOf[GenericInternalRow].get(ordinal, dataType), dataType)
}
val resultDF = df.select(Column(expression))
val resultDF = _spark.range(0, 1).select(Column(expression))
val result = resultDF.collect()
TestStats.testUnitNumber = TestStats.testUnitNumber + 1
if (
checkDataTypeSupported(expression) &&
expression.children.forall(checkDataTypeSupported)
expression.children.forall(checkDataTypeSupported)
) {
val projectTransformer = resultDF.queryExecution.executedPlan.collect {
case p: GpuProjectExec => p
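The transformUp above is the core of the new flow: it freezes the current input row into the expression tree as literals, so the expression no longer needs per-row (vectorized) inputs and can be evaluated through a trivial one-row DataFrame. A standalone sketch of the same rewrite, illustrative only, with Upper standing in for any supported expression:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow, Literal, Upper}
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.unsafe.types.UTF8String

    val row  = InternalRow(UTF8String.fromString("abc"))
    val expr = Upper(BoundReference(0, StringType, nullable = true))
    val scalarized = expr.transformUp {
      case BoundReference(ordinal, dt, _) =>
        Literal(row.asInstanceOf[GenericInternalRow].get(ordinal, dt), dt)
    }
    // scalarized == Upper(Literal("abc")): evaluation no longer touches the row,
    // so spark.range(0, 1).select(Column(scalarized)) can drive it on the GPU.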
@@ -254,13 +255,13 @@
if (
!(checkResult(result.head.get(0), expected, expression.dataType, expression.nullable)
|| checkResult(
CatalystTypeConverters.createToCatalystConverter(expression.dataType)(
result.head.get(0)
), // decimal precision is wrong from value
CatalystTypeConverters.convertToCatalyst(expected),
expression.dataType,
expression.nullable
))
CatalystTypeConverters.createToCatalystConverter(expression.dataType)(
result.head.get(0)
), // decimal precision is wrong from value
CatalystTypeConverters.convertToCatalyst(expected),
expression.dataType,
expression.nullable
))
) {
val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
fail(
@@ -292,44 +293,4 @@
}
true
}

def convertInternalRowToDataFrame(inputRow: InternalRow): DataFrame = {
val structFileSeq = new ArrayBuffer[StructField]()
val values = inputRow match {
case genericInternalRow: GenericInternalRow =>
genericInternalRow.values
case _ => throw new UnsupportedOperationException("Unsupported InternalRow.")
}
values.foreach {
case boolean: java.lang.Boolean =>
structFileSeq.append(StructField("bool", BooleanType, boolean == null))
case short: java.lang.Short =>
structFileSeq.append(StructField("i16", ShortType, short == null))
case byte: java.lang.Byte =>
structFileSeq.append(StructField("i8", ByteType, byte == null))
case integer: java.lang.Integer =>
structFileSeq.append(StructField("i32", IntegerType, integer == null))
case long: java.lang.Long =>
structFileSeq.append(StructField("i64", LongType, long == null))
case float: java.lang.Float =>
structFileSeq.append(StructField("fp32", FloatType, float == null))
case double: java.lang.Double =>
structFileSeq.append(StructField("fp64", DoubleType, double == null))
case utf8String: UTF8String =>
structFileSeq.append(StructField("str", StringType, utf8String == null))
case byteArr: Array[Byte] =>
structFileSeq.append(StructField("vbin", BinaryType, byteArr == null))
case decimal: Decimal =>
structFileSeq.append(
StructField("dec", DecimalType(decimal.precision, decimal.scale), decimal == null))
case null =>
structFileSeq.append(StructField("null", IntegerType, nullable = true))
case unsupported @ _ =>
throw new UnsupportedOperationException(s"Unsupported type: ${unsupported.getClass}")
}
val fields = structFileSeq.toSeq
_spark.internalCreateDataFrame(
_spark.sparkContext.parallelize(Seq(inputRow)),
StructType(fields))
}
}
