[WIP] [skip ci] Fuzz testing in Spark SQL #7625

Closed · wants to merge 87 commits
Commits (87)
6f2b909
Fix SPARK-9292.
JoshRosen Jul 24, 2015
03120d5
Check condition type in resolved()
JoshRosen Jul 24, 2015
e1f462e
Initial commit for SQL expression fuzzing harness
JoshRosen Jul 11, 2015
f8daec7
Apply implicit casts (in a hacky way for now)
JoshRosen Jul 11, 2015
df00e7a
More messy WIP prototyping on expression fuzzing
JoshRosen Jul 11, 2015
2dcbc10
Add some comments; speed up classpath search
JoshRosen Jul 11, 2015
c20a679
Move dummy type coercion to a helper method
JoshRosen Jul 11, 2015
95860de
More code cleanup and comments
JoshRosen Jul 11, 2015
abaed51
Use non-mutable interpreted projection.
JoshRosen Jul 11, 2015
129ad6c
Log expression after coercion
JoshRosen Jul 11, 2015
e1f91df
Run tests in deterministic order
JoshRosen Jul 13, 2015
adc3c7f
Test with random inputs of all types
JoshRosen Jul 23, 2015
ae5e151
Ignore BinaryType for now, since it led to some spurious failures.
JoshRosen Jul 23, 2015
a354208
Begin to add a DataFrame API fuzzer.
JoshRosen Jul 24, 2015
13f8c56
Don't puts nulls into the DataFrame
JoshRosen Jul 24, 2015
dd16f4d
Print logical plans.
JoshRosen Jul 24, 2015
7f2b771
Fuzzer improvements.
JoshRosen Jul 24, 2015
326d759
Fix SPARK-9293
JoshRosen Jul 24, 2015
4a2c684
Merge branch 'SPARK-9293' into fuzz-test
JoshRosen Jul 24, 2015
37e4ce8
Support methods that take varargs Column parameters.
JoshRosen Jul 24, 2015
558f04a
Merge remote-tracking branch 'origin/master' into fuzz-test
JoshRosen Jul 24, 2015
2f1b802
Add analysis rule to detect sorting on unsupported column types (SPAR…
JoshRosen Jul 24, 2015
c0889c0
Merge branch 'SPARK-9295' into fuzz-test
JoshRosen Jul 24, 2015
d7a3535
[SPARK-9303] Decimal should use java.math.Decimal directly instead of…
JoshRosen Jul 24, 2015
74bbc8c
Merge branch 'SPARK-9303' into fuzz-test
JoshRosen Jul 24, 2015
bfe1451
Update to allow sorting by null literals
JoshRosen Jul 24, 2015
7a7ec4d
Merge branch 'SPARK-9295' into fuzz-test
JoshRosen Jul 24, 2015
55221fa
Shouldn't use SortMergeJoin when joining on unsortable columns.
viirya Jul 24, 2015
a240707
Use forall instead of exists for readability.
viirya Jul 24, 2015
dc94314
Merge remote-tracking branch 'origin/pr/7645/head' into fuzz-test
JoshRosen Jul 24, 2015
16dfac9
Merge remote-tracking branch 'origin/master' into fuzz-test
JoshRosen Jul 24, 2015
a3f30bf
Merge remote-tracking branch 'origin/master' into fuzz-test
JoshRosen Jul 24, 2015
68c0e97
Commit some outstanding changes.
JoshRosen Jul 24, 2015
2d4ed76
Move to fuzzing package.
JoshRosen Jul 24, 2015
ac8dd74
Begin to clean up random DF generator
JoshRosen Jul 25, 2015
0b3938b
Add basic backtracking to improve chance of generating executable plan.
JoshRosen Jul 26, 2015
c836884
Hacky approach to try to execute child plans first.
JoshRosen Jul 26, 2015
fb1a666
Merge remote-tracking branch 'origin/master' into fuzz-test
JoshRosen Jul 26, 2015
396c235
Enable Unsafe by default
JoshRosen Jul 21, 2015
71fe0bb
Ignore failing ScalaUDFSuite test.
JoshRosen Jul 21, 2015
04fbe3e
Do not use UnsafeExternalSort operator if codegen is disabled
JoshRosen Jul 21, 2015
a7979dc
Disable unsafe Exchange path when RangePartitioning is used
JoshRosen Jul 21, 2015
4fcae4a
Reduce page size to make HiveCompatibilitySuite pass.
JoshRosen Jul 21, 2015
7e49d0e
Fix use-after-free bug in UnsafeExternalSorter.
JoshRosen Jul 26, 2015
0b95b71
Merge branch 'unsafe-by-default' into fuzz-test
JoshRosen Jul 26, 2015
454c921
Hack to enable join types to be tested
JoshRosen Jul 26, 2015
11f80a3
[SPARK-9368][SQL] Support get(ordinal, dataType) generic getter in Un…
rxin Jul 27, 2015
9989064
JoinedRow.
rxin Jul 27, 2015
24a3e46
Added support for DateType/TimestampType.
rxin Jul 27, 2015
fb6ca30
Support BinaryType.
rxin Jul 27, 2015
0f57c55
Reset the changes in ExpressionEvalHelper.
rxin Jul 27, 2015
3063788
Reset the change for real this time.
rxin Jul 27, 2015
5e3e266
Merge remote-tracking branch 'origin/pr/7682/head' into fuzz-test
JoshRosen Jul 27, 2015
6214682
Fixes to null handling in UnsafeRow
JoshRosen Jul 28, 2015
4c09a78
Enable Unsafe by default
JoshRosen Jul 21, 2015
54579b1
Disable unsafe Exchange path when RangePartitioning is used
JoshRosen Jul 21, 2015
601fcbd
Reduce page size to make HiveCompatibilitySuite pass.
JoshRosen Jul 21, 2015
e5f7464
Add task completion callback to avoid leak in limit after sort
JoshRosen Jul 27, 2015
c8eb2ee
Fix test in UnsafeRowConverterSuite
JoshRosen Jul 28, 2015
ef1c62d
Also match TungstenProject in checkNumProjects
JoshRosen Jul 28, 2015
203f1d8
Use TaskAttemptIds to track unroll memory
JoshRosen Jul 28, 2015
d7a2788
Use TaskAttemptIds to track shuffle memory
JoshRosen Jul 28, 2015
b38e70f
Roll back fix in PySpark, which is no longer necessary
JoshRosen Jul 28, 2015
d8bd892
Fix capitalization
JoshRosen Jul 28, 2015
56edb41
Move Executor's cleanup into Task so that TaskContext is defined when…
JoshRosen Jul 28, 2015
f4f5859
More thread -> task changes
JoshRosen Jul 28, 2015
e2b69c9
Fix ShuffleMemoryManagerSuite
JoshRosen Jul 28, 2015
63492c4
Fix long line.
JoshRosen Jul 28, 2015
6dc34f4
Merge branch 'unsafe-row-null-fixes' into unsafe-by-default
JoshRosen Jul 28, 2015
6ac2d82
Merge branch 'unsafe-by-default' into fuzz-test
JoshRosen Jul 29, 2015
18615a6
Merge remote-tracking branch 'origin/master' into fuzz-test
JoshRosen Jul 30, 2015
704abc1
Merge remote-tracking branch 'origin/master' into fuzz-test
JoshRosen Aug 14, 2015
b549b3e
Merge remote-tracking branch 'origin/master' into fuzz-test
JoshRosen Aug 16, 2015
ca8168a
Fix compilation with latest master.
JoshRosen Aug 16, 2015
0c7e9d0
Update to ignore some new analysis exceptions.
JoshRosen Aug 16, 2015
fb0671f
Move RandomDataFrameGenerator to own file.
JoshRosen Aug 16, 2015
78a71af
WIP
JoshRosen Aug 17, 2015
3b06849
Filter failing BinaryType array test.
JoshRosen Aug 17, 2015
574130b
Merge remote-tracking branch 'origin/master' into fuzz-test
JoshRosen Aug 23, 2015
a4c9b33
WIP
JoshRosen Aug 24, 2015
d36f8f5
Merge remote-tracking branch 'origin/master' into fuzz-test
JoshRosen May 27, 2016
ae5055a
Also ignore BRound expression.
JoshRosen May 27, 2016
dfdab5e
Fix serializability.
JoshRosen May 27, 2016
bb4cc2a
Merge remote-tracking branch 'origin/master' into fuzz-test
JoshRosen Jul 29, 2016
e60f231
More input type validation.
JoshRosen Jul 29, 2016
94087cb
Updates for DataSet API.
JoshRosen Jul 29, 2016
d1d3d53
Merge remote-tracking branch 'origin/master' into fuzz-test
JoshRosen Aug 16, 2016
6 changes: 6 additions & 0 deletions pom.xml
@@ -699,6 +699,12 @@
<artifactId>scalap</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scalaz</groupId>
<artifactId>scalaz-core_2.10</artifactId>
<version>7.1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
@@ -245,6 +245,12 @@ trait CheckAnalysis extends PredicateHelper {
aggregateExprs.foreach(checkValidAggregateExpression)
groupingExprs.foreach(checkValidGroupingExprs)

case s @ SetOperation(left, right) if left.output.length != right.output.length =>
failAnalysis(
s"${s.nodeName} can only be performed on tables with the same number of columns, " +
s"but the left table has ${left.output.length} columns and the right has " +
s"${right.output.length}")

case Sort(orders, _, _) =>
orders.foreach { order =>
if (!RowOrdering.isOrderable(order.dataType)) {
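
For context, here is a minimal sketch (not part of the patch) of a query that the new SetOperation arity check above rejects, assuming a running SparkSession named `spark`:

// Union of relations with different column counts now fails analysis up front.
val left = spark.range(10).selectExpr("id", "id * 2 AS doubled")  // two columns
val right = spark.range(10).selectExpr("id")                      // one column
left.union(right)
// org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the
// same number of columns, but the left table has 2 columns and the right has 1
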
@@ -209,7 +209,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
}

private[this] def decimalToTimestamp(d: Decimal): Long = {
(d.toBigDecimal * 1000000L).longValue()
d.toJavaBigDecimal.multiply(java.math.BigDecimal.valueOf(1000000L)).longValue()
}
private[this] def doubleToTimestamp(d: Double): Any = {
if (d.isNaN || d.isInfinite) null else (d * 1000000L).toLong
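
The rewritten `decimalToTimestamp` keeps the whole conversion in `java.math.BigDecimal` instead of detouring through Scala's BigDecimal. As a rough illustration (not from the patch), the multiply-by-1,000,000 step turns a decimal number of seconds into microseconds:

// The same arithmetic in isolation, using only java.math.BigDecimal:
val d = new java.math.BigDecimal("1.5")
d.multiply(java.math.BigDecimal.valueOf(1000000L)).longValue()  // 1500000L (microseconds)
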
@@ -98,13 +98,13 @@ trait ExtractValue extends Expression
/**
* Returns the value of fields in the Struct `child`.
*
* No need to do type checking since it is handled by [[ExtractValue]].
*
* Note that we can pass in the field name directly to keep case preserving in `toString`.
* For example, when getting the field `yEAr` from `<year: int, month: int>`, we should pass in `yEAr`.
*/
case class GetStructField(child: Expression, ordinal: Int, name: Option[String] = None)
extends UnaryExpression with ExtractValue {
extends UnaryExpression with ExtractValue with ExpectsInputTypes {

override def inputTypes: Seq[AbstractDataType] = Seq(StructType)

lazy val childSchema = child.dataType.asInstanceOf[StructType]

@@ -144,16 +144,15 @@ case class GetStructField(child: Expression, ordinal: Int, name: Option[String]
/**
* For a child whose data type is an array of structs, extracts the `ordinal`-th fields of all array
* elements, and returns them as a new array.
*
* No need to do type checking since it is handled by [[ExtractValue]].
*/
case class GetArrayStructFields(
child: Expression,
field: StructField,
ordinal: Int,
numFields: Int,
containsNull: Boolean) extends UnaryExpression with ExtractValue {
containsNull: Boolean) extends UnaryExpression with ExtractValue with ExpectsInputTypes {

override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
override def dataType: DataType = ArrayType(field.dataType, containsNull)
override def toString: String = s"$child.${field.name}"
override def sql: String = s"${child.sql}.${quoteIdentifier(field.name)}"
@@ -215,8 +214,7 @@ case class GetArrayStructFields(
case class GetArrayItem(child: Expression, ordinal: Expression)
extends BinaryExpression with ExpectsInputTypes with ExtractValue {

// We have done type checking for child in `ExtractValue`, so only need to check the `ordinal`.
override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegralType)
override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, IntegralType)

override def toString: String = s"$child[$ordinal]"
override def sql: String = s"${child.sql}[${ordinal.sql}]"
@@ -264,8 +262,7 @@ case class GetMapValue(child: Expression, key: Expression)

private def keyType = child.dataType.asInstanceOf[MapType].keyType

// We have done type checking for child in `ExtractValue`, so only need to check the `key`.
override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, keyType)
override def inputTypes: Seq[AbstractDataType] = Seq(MapType, keyType)

override def toString: String = s"$child[$key]"
override def sql: String = s"${child.sql}[${key.sql}]"
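
Declaring concrete input types here matters mainly when these extractors are constructed directly (as the expression fuzzer does) rather than through `ExtractValue`, which used to be the only place the child's type was checked. A rough sketch of the effect (illustrative, not from the patch):

// A directly-constructed GetArrayItem with a non-array child now fails its input type check
// during analysis instead of blowing up at execution time:
GetArrayItem(Literal(1), Literal(0)).checkInputDataTypes()
// => TypeCheckFailure (the first argument requires array type, but the child is an int)
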
@@ -23,11 +23,11 @@ import org.apache.spark.sql.types._

/**
* Return the unscaled Long value of a Decimal, assuming it fits in a Long.
* Note: this expression is internal and created only by the optimizer,
* we don't need to do type check for it.
* Note: this expression is internal and created only by the optimizer.
*/
case class UnscaledValue(child: Expression) extends UnaryExpression {
case class UnscaledValue(child: Expression) extends UnaryExpression with ExpectsInputTypes {

override def inputTypes: Seq[AbstractDataType] = Seq(DecimalType)
override def dataType: DataType = LongType
override def toString: String = s"UnscaledValue($child)"

@@ -41,11 +41,15 @@ case class UnscaledValue(child: Expression) extends UnaryExpression

/**
* Create a Decimal from an unscaled Long value.
* Note: this expression is internal and created only by the optimizer,
* we don't need to do type check for it.
* Note: this expression is internal and created only by the optimizer.
*/
case class MakeDecimal(child: Expression, precision: Int, scale: Int) extends UnaryExpression {
case class MakeDecimal(
child: Expression,
precision: Int,
scale: Int)
extends UnaryExpression with ExpectsInputTypes {

override def inputTypes: Seq[AbstractDataType] = Seq(LongType)
override def dataType: DataType = DecimalType(precision, scale)
override def nullable: Boolean = true
override def toString: String = s"MakeDecimal($child,$precision,$scale)"
@@ -80,7 +84,12 @@ case class PromotePrecision(child: Expression) extends UnaryExpression {
* Rounds the decimal to given scale and check whether the decimal can fit in provided precision
* or not, returns null if not.
*/
case class CheckOverflow(child: Expression, dataType: DecimalType) extends UnaryExpression {
case class CheckOverflow(
child: Expression,
dataType: DecimalType)
extends UnaryExpression with ExpectsInputTypes {

override def inputTypes: Seq[AbstractDataType] = Seq(DecimalType)

override def nullable: Boolean = true

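
For orientation, an illustrative sketch using Catalyst's internal helpers (not part of the patch): `UnscaledValue` and `MakeDecimal` are inverses on the unscaled representation, which is why `DecimalType` and `LongType` are the natural declared input types.

UnscaledValue(Literal(Decimal(BigDecimal("1.23")))).eval()   // 123L (precision 3, scale 2)
MakeDecimal(Literal(123L), precision = 3, scale = 2).eval()  // Decimal 1.23
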
@@ -21,6 +21,15 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Attribute

object JoinType {

val supportedJoinTypes = Seq(
"inner",
"outer", "full", "fullouter",
"leftouter", "left",
"rightouter", "right",
"leftsemi",
"leftanti")

def apply(typ: String): JoinType = typ.toLowerCase.replace("_", "") match {
case "inner" => Inner
case "outer" | "full" | "fullouter" => FullOuter
@@ -29,16 +38,8 @@ object JoinType {
case "leftsemi" => LeftSemi
case "leftanti" => LeftAnti
case _ =>
val supported = Seq(
"inner",
"outer", "full", "fullouter",
"leftouter", "left",
"rightouter", "right",
"leftsemi",
"leftanti")

throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
"Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
"Supported join types include: " + supportedJoinTypes.mkString("'", "', '", "'") + ".")
}
}

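
Factoring the list into `supportedJoinTypes` only changes where the names live; the error path behaves as before. A quick illustration (not from the patch):

JoinType("sideways")
// java.lang.IllegalArgumentException: Unsupported join type 'sideways'. Supported join types
// include: 'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left', 'rightouter', 'right',
// 'leftsemi', 'leftanti'.
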
12 changes: 12 additions & 0 deletions sql/core/pom.xml
@@ -127,6 +127,18 @@
<artifactId>xbean-asm5-shaded</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.clapper</groupId>
<artifactId>classutil_${scala.binary.version}</artifactId>
<version>1.0.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalaz</groupId>
<artifactId>scalaz-core_${scala.binary.version}</artifactId>
<version>7.2.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
3 changes: 2 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -150,7 +150,8 @@ class UDFSuite extends QueryTest with SharedSQLContext {
assert(result.count() === 2)
}

test("UDFs everywhere") {
// Temporarily ignored until we implement code generation for ScalaUDF.
ignore("UDFs everywhere") {
spark.udf.register("groupFunction", (n: Int) => { n > 10 })
spark.udf.register("havingFilter", (n: Long) => { n > 2000 })
spark.udf.register("whereFilter", (n: Int) => { n < 150 })
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.fuzzing

import scala.util.Random
import scala.util.control.NonFatal

import org.apache.spark.{SharedSparkContext, SparkFunSuite}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

object DataFrameFuzzingUtils {

def randomChoice[T](values: Seq[T]): T = {
values(Random.nextInt(values.length))
}

/**
* Build a list of column names and types for the given StructType, taking nesting into account.
* For nested struct fields, this will emit both the column for the struct field itself and
* fields for the nested struct's fields. This process will be performed recursively in order to
* handle deeply-nested structs.
*/
def getColumnsAndTypes(struct: StructType): Seq[(String, DataType)] = {
struct.flatMap { field =>
val nestedFieldInfos: Seq[(String, DataType)] = field.dataType match {
case nestedStruct: StructType =>
Seq((field.name, field.dataType)) ++ getColumnsAndTypes(nestedStruct).map {
case (nestedColName, dataType) => (field.name + "." + nestedColName, dataType)
}
case _ => Seq.empty
}
Seq((field.name, field.dataType)) ++ nestedFieldInfos
}
}
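
// Illustrative note (added for readability; not in the original patch): for a schema like
//   new StructType().add("a", IntegerType).add("s", new StructType().add("x", StringType))
// this yields entries such as ("a", IntegerType), ("s", <struct>) and ("s.x", StringType),
// so callers can refer either to whole struct columns or to nested fields by dotted name.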

def getRandomColumnName(
df: DataFrame,
condition: DataType => Boolean = _ => true): Option[String] = {
val columnsWithTypes = getColumnsAndTypes(df.schema)
val candidateColumns = columnsWithTypes.filter(c => condition(c._2))
if (candidateColumns.isEmpty) {
None
} else {
Some(randomChoice(candidateColumns)._1)
}
}
}


/**
* This test suite generates random data frames, then applies random sequences of operations to
* them in order to construct random queries. We don't have a source of truth for these random
* queries, but they are still useful for testing that we don't crash in bad ways.
*/
class DataFrameFuzzingSuite extends QueryTest with SharedSparkContext {


override protected def spark: SparkSession = sqlContext.sparkSession

val tempDir = Utils.createTempDir()

private var sqlContext: SQLContext = _
private var dataGenerator: RandomDataFrameGenerator = _

override def beforeAll(): Unit = {
super.beforeAll()
sqlContext = new SQLContext(sc)
dataGenerator = new RandomDataFrameGenerator(123, sqlContext)
sqlContext.conf.setConf(SQLConf.SHUFFLE_PARTITIONS, 10)
}

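// Runs the DataFrame to completion (via rdd.count()); if execution fails, prints the
// QueryExecution so the offending plan can be inspected, then rethrows.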
def tryToExecute(df: DataFrame): DataFrame = {
try {
df.rdd.count()
df
} catch {
case NonFatal(e) =>
// scalastyle:off println
println(df.queryExecution)
// scalastyle:on println
throw e
}
}

// TODO: make these regexes.
val ignoredAnalysisExceptionMessages = Seq(
// TODO: filter only for binary type:
"cannot sort data type array<",
"cannot be used in grouping expression",
"cannot be used in join condition",
"can only be performed on tables with the same number of columns",
"number of columns doesn't match",
"unsupported join type",
"is neither present in the group by, nor is it an aggregate function",
"is ambiguous, could be:",
"unresolved operator 'Project", // TODO
"unresolved operator 'Union", // TODO: disabled to let me find new errors
"unresolved operator 'Except", // TODO: disabled to let me find new errors
"unresolved operator 'Intersect", // TODO: disabled to let me find new errors
"Cannot resolve column name" // TODO: only ignore for join?
)

def getRandomTransformation(df: DataFrame): DataFrameTransformation = {
(1 to 1000).iterator.map(_ => ReflectiveFuzzing.getTransformation(df)).flatten.next()
}

def applyRandomTransform(df: DataFrame): DataFrame = {
val tf = getRandomTransformation(df)
// scalastyle:off println
println(" " + tf)
// scalastyle:on println
tf.apply(df)
}

def resetConfs(): Unit = {
sqlContext.conf.getAllDefinedConfs.foreach { case (key, defaultValue, doc) =>
sqlContext.conf.setConfString(key, defaultValue)
}
sqlContext.conf.setConfString("spark.sql.crossJoin.enabled", "true")
sqlContext.conf.setConfString("spark.sql.autoBroadcastJoinThreshold", "-1")
}

private val configurations = Seq(
"default" -> Seq(),
"no optimization" -> Seq(SQLConf.OPTIMIZER_MAX_ITERATIONS.key -> "0"),
"disable-wholestage-codegen" -> Seq(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false"),
"disable-exchange-reuse" -> Seq(SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false")
)

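// Wraps the same logical plan in a fresh Dataset so that analysis and physical planning are
// redone under whatever SQL configuration is currently set.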
def replan(df: DataFrame): DataFrame = {
new Dataset[Row](sqlContext.sparkSession, df.logicalPlan, RowEncoder(df.schema))
}

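// Overall strategy: generate a small random DataFrame, apply a few random transformations
// (executing after each step so failures surface early), record the result under the default
// configuration, then re-plan and re-run the same logical plan under each alternative
// configuration and check that the answers match.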
test("fuzz test") {
for (i <- 1 to 1000) {
// scalastyle:off println
println(s"Iteration $i")
// scalastyle:on println
try {
resetConfs()
var df = dataGenerator.randomDataFrame(
numCols = Random.nextInt(2) + 1,
numRows = 20,
allowComplexTypes = false)
var depth = 3
while (depth > 0) {
df = tryToExecute(applyRandomTransform(df))
depth -= 1
}
val defaultResult = replan(df).collect()
configurations.foreach { case (confName, confsToSet) =>
resetConfs()
withClue(s"configuration = $confName") {
confsToSet.foreach { case (key, value) =>
sqlContext.conf.setConfString(key, value)
}
checkAnswer(replan(df), defaultResult)
}
}
println(s"Finished all tests successfully for plan:\n${df.logicalPlan}")
} catch {
case e: UnresolvedException[_] =>
// println("skipped due to unresolved")
case e: Exception
if ignoredAnalysisExceptionMessages.exists {
m => Option(e.getMessage).getOrElse("").toLowerCase.contains(m.toLowerCase)
} =>
// println("Skipped due to expected AnalysisException " + e)
}
}
}
}