[SPARK-32526][SQL] Fix some test cases of sql/catalyst module in scala 2.13

### What changes were proposed in this pull request?
This PR partially resolves [SPARK-32526](https://issues.apache.org/jira/browse/SPARK-32526). A total of 88 failed test cases and 2 aborted suites are fixed; the related suites are as follows:

- `DataSourceV2AnalysisBaseSuite` related test cases (71 FAILED -> Pass)
- `TreeNodeSuite` (1 FAILED -> Pass)
- `MetadataSuite` (1 FAILED -> Pass)
- `InferFiltersFromConstraintsSuite` (3 FAILED -> Pass)
- `StringExpressionsSuite` (1 FAILED -> Pass)
- `JacksonParserSuite` (1 FAILED -> Pass)
- `HigherOrderFunctionsSuite` (1 FAILED -> Pass)
- `ExpressionParserSuite` (1 FAILED -> Pass)
- `CollectionExpressionsSuite` (6 FAILED -> Pass)
- `SchemaUtilsSuite` (2 FAILED -> Pass)
- `ExpressionSetSuite` (ABORTED -> Pass)
- `ArrayDataIndexedSeqSuite` (ABORTED -> Pass)

The main changes of this PR are as follows:

- `Optimizer` and `Analyzer` are changed so that they compile: `ArrayBuffer` is not a `Seq` in Scala 2.13, so `toSeq` is called explicitly to stay compatible with Scala 2.12.

- The `m.mapValues().view.force` pattern returns a `Map` in Scala 2.12 but an `IndexedSeq` in Scala 2.13, so `toMap` is called explicitly to stay compatible with Scala 2.12. `TreeNode` is changed accordingly so that the `DataSourceV2AnalysisBaseSuite`-related test cases and the failing `TreeNodeSuite` case pass.

- Call `toMap` in the `case map` branch of the `Metadata#hash` method, because `map.mapValues` returns a `Map` in Scala 2.12 but a `MapView` in Scala 2.13.

- Implement the `concat` method of `ExpressionSet` in the Scala 2.13 version, following `ExpressionSet` in the Scala 2.12 version, so that the `++` method conforms to `ExpressionSet` semantics.

- `GenericArrayData` does not accept `ArrayBuffer` input, so call `toSeq` when constructing a `GenericArrayData` from an `ArrayBuffer`, for Scala version compatibility.

- Call `toSeq` in the `RandomDataGenerator#randomRow` method to ensure that the contents of `fields` are `Seq`, not `ArrayBuffer`.

- Call `toSeq` so that `JacksonParser#parse` still returns a `Seq`, because the check method of `JacksonParserSuite#"skipping rows using pushdown filters"` depends on the `Seq` type.

- Call `toSeq` in `AstBuilder#visitFunctionCall`; otherwise `ctx.argument.asScala.map(expression)` is a `Buffer` in Scala 2.13.

- Add a `LongType` match to `ArraySetLike.nullValueHolder`

- Add `sorted` so that the `duplicateColumns` string in the error message of the `SchemaUtils.checkColumnNameDuplication` method has a deterministic order.
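
As a rough illustration of the `toSeq` changes listed above, here is a minimal sketch that should compile under both Scala 2.12 and 2.13. `ToSeqSketch`, `total`, and `collect` are hypothetical names and not Spark code; the only point is that an `ArrayBuffer` is converted explicitly before it is used where a `Seq` is expected.

```scala
import scala.collection.mutable.ArrayBuffer

object ToSeqSketch {
  // Hypothetical stand-in for any API whose parameter type is `Seq[Int]`.
  def total(xs: Seq[Int]): Int = xs.sum

  // Accumulates 1..n in a mutable buffer, then hands it out as a `Seq`.
  def collect(n: Int): Seq[Int] = {
    val buf = ArrayBuffer.empty[Int]
    var i = 1
    while (i <= n) {
      buf += i
      i += 1
    }
    // Returning `buf` directly type-checks on 2.12 only; on 2.13 `Seq` means
    // `immutable.Seq`, which `ArrayBuffer` does not extend. `toSeq` works on both.
    buf.toSeq
  }
}
```

On 2.12 `toSeq` returns the buffer itself, while on 2.13 it copies the elements into an immutable sequence, so the explicit call has a small cost on the newer version.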

### Why are the changes needed?
We need to support a Scala 2.13 build.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Scala 2.12: Pass the Jenkins or GitHub Actions checks

- Scala 2.13: Do the following:

```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests  -pl sql/catalyst -Pscala-2.13 -am
mvn test -pl sql/catalyst -Pscala-2.13
```

**Before**
```
Tests: succeeded 3853, failed 103, canceled 0, ignored 6, pending 0
*** 3 SUITES ABORTED ***
*** 103 TESTS FAILED ***
```

**After**

```
Tests: succeeded 4035, failed 17, canceled 0, ignored 6, pending 0
*** 1 SUITE ABORTED ***
*** 15 TESTS FAILED ***
```

Closes #29370 from LuciferYang/fix-DataSourceV2AnalysisBaseSuite.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
LuciferYang authored and srowen committed Aug 13, 2020
1 parent 0c850c7 commit 6ae2cb2
Showing 13 changed files with 39 additions and 25 deletions.
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.expressions

-import scala.collection.mutable
+import scala.collection.{mutable, View}
import scala.collection.mutable.ArrayBuffer

object ExpressionSet {
@@ -86,6 +86,12 @@ class ExpressionSet protected(
}
}

+override def concat(elems: IterableOnce[Expression]): Set[Expression] = {
+val newSet = new ExpressionSet(baseSet.clone(), originals.clone())
+elems.iterator.foreach(newSet.add)
+newSet
+}
+
override def iterator: Iterator[Expression] = originals.iterator

/**

@@ -1281,20 +1281,20 @@ class Analyzer(
}

if (attrMapping.isEmpty) {
-newPlan -> attrMapping
+newPlan -> attrMapping.toSeq
} else {
assert(!attrMapping.groupBy(_._1.exprId)
.exists(_._2.map(_._2.exprId).distinct.length > 1),
"Found duplicate rewrite attributes")
-val attributeRewrites = AttributeMap(attrMapping)
+val attributeRewrites = AttributeMap(attrMapping.toSeq)
// Using attrMapping from the children plans to rewrite their parent node.
// Note that we shouldn't rewrite a node using attrMapping from its sibling nodes.
newPlan.transformExpressions {
case a: Attribute =>
dedupAttr(a, attributeRewrites)
case s: SubqueryExpression =>
s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
-} -> attrMapping
+} -> attrMapping.toSeq
}
}
}

@@ -3050,6 +3050,7 @@ trait ArraySetLike {
@transient protected lazy val nullValueHolder = et match {
case ByteType => "(byte) 0"
case ShortType => "(short) 0"
+case LongType => "(long) 0"
case _ => "0"
}

@@ -3155,7 +3156,7 @@ case class ArrayDistinct(child: Expression)
}
}
}
-new GenericArrayData(arrayBuffer)
+new GenericArrayData(arrayBuffer.toSeq)
}
}

@@ -3313,7 +3314,7 @@ case class ArrayUnion(left: Expression, right: Expression) extends ArrayBinaryLi
i += 1
}
}
-new GenericArrayData(arrayBuffer)
+new GenericArrayData(arrayBuffer.toSeq)
} else {
(array1, array2) =>
val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
@@ -3344,7 +3345,7 @@ case class ArrayUnion(left: Expression, right: Expression) extends ArrayBinaryLi
arrayBuffer += elem
}
}))
-new GenericArrayData(arrayBuffer)
+new GenericArrayData(arrayBuffer.toSeq)
}
}

@@ -3476,7 +3477,7 @@ object ArrayUnion {
arrayBuffer += elem
}
}))
-new GenericArrayData(arrayBuffer)
+new GenericArrayData(arrayBuffer.toSeq)
}
}

@@ -3538,7 +3539,7 @@ case class ArrayIntersect(left: Expression, right: Expression) extends ArrayBina
}
i += 1
}
-new GenericArrayData(arrayBuffer)
+new GenericArrayData(arrayBuffer.toSeq)
} else {
new GenericArrayData(Array.emptyObjectArray)
}
@@ -3586,7 +3587,7 @@ case class ArrayIntersect(left: Expression, right: Expression) extends ArrayBina
}
i += 1
}
-new GenericArrayData(arrayBuffer)
+new GenericArrayData(arrayBuffer.toSeq)
} else {
new GenericArrayData(Array.emptyObjectArray)
}
@@ -3777,7 +3778,7 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL
}
i += 1
}
-new GenericArrayData(arrayBuffer)
+new GenericArrayData(arrayBuffer.toSeq)
} else {
(array1, array2) =>
val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
@@ -3822,7 +3823,7 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL
}
i += 1
}
-new GenericArrayData(arrayBuffer)
+new GenericArrayData(arrayBuffer.toSeq)
}
}


@@ -501,7 +501,7 @@ case class ArrayFilter(
}
i += 1
}
-new GenericArrayData(buffer)
+new GenericArrayData(buffer.toSeq)
}

override def prettyName: String = "filter"

@@ -2370,8 +2370,8 @@ case class Sentences(
widx = wi.current
if (Character.isLetterOrDigit(word.charAt(0))) words += UTF8String.fromString(word)
}
-result += new GenericArrayData(words)
+result += new GenericArrayData(words.toSeq)
}
-new GenericArrayData(result)
+new GenericArrayData(result.toSeq)
}
}

@@ -465,7 +465,7 @@ class JacksonParser(
case null => None
case _ => rootConverter.apply(parser) match {
case null => throw new RuntimeException("Root converter returned null")
-case rows => rows
+case rows => rows.toSeq
}
}
}

@@ -971,7 +971,7 @@ object CombineUnions extends Rule[LogicalPlan] {
flattened += child
}
}
-union.copy(children = flattened)
+union.copy(children = flattened.toSeq)
}
}


@@ -1612,13 +1612,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
// Create the function call.
val name = ctx.functionName.getText
val isDistinct = Option(ctx.setQuantifier()).exists(_.DISTINCT != null)
-val arguments = ctx.argument.asScala.map(expression) match {
+// Call `toSeq`, otherwise `ctx.argument.asScala.map(expression)` is `Buffer` in Scala 2.13
+val arguments = ctx.argument.asScala.map(expression).toSeq match {
case Seq(UnresolvedStar(None))
if name.toLowerCase(Locale.ROOT) == "count" && !isDistinct =>
// Transform COUNT(*) into COUNT(1).
Seq(Literal(1))
case expressions =>
-expressions.toSeq
+expressions
}
val filter = Option(ctx.where).map(expression(_))
val function = UnresolvedFunction(

@@ -275,8 +275,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
case s: Seq[_] =>
s.map(mapChild)
case m: Map[_, _] =>
+// `map.mapValues().view.force` return `Map` in Scala 2.12 but return `IndexedSeq` in Scala
+// 2.13, call `toMap` method manually to compatible with Scala 2.12 and Scala 2.13
// `mapValues` is lazy and we need to force it to materialize
-m.mapValues(mapChild).view.force
+m.mapValues(mapChild).view.force.toMap
case arg: TreeNode[_] if containsChild(arg) => mapTreeNode(arg)
case Some(child) => Some(mapChild(child))
case nonChild: AnyRef => nonChild
@@ -411,6 +413,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
} else {
Some(arg)
}
+// `map.mapValues().view.force` return `Map` in Scala 2.12 but return `IndexedSeq` in Scala
+// 2.13, call `toMap` method manually to compatible with Scala 2.12 and Scala 2.13
case m: Map[_, _] => m.mapValues {
case arg: TreeNode[_] if containsChild(arg) =>
val newChild = f(arg.asInstanceOf[BaseType])
@@ -421,7 +425,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
arg
}
case other => other
-}.view.force // `mapValues` is lazy and we need to force it to materialize
+}.view.force.toMap // `mapValues` is lazy and we need to force it to materialize
case d: DataType => d // Avoid unpacking Structs
case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
case args: Iterable[_] => args.map(mapChild)

@@ -202,8 +202,10 @@ object Metadata {
/** Computes the hash code for the types we support. */
private def hash(obj: Any): Int = {
obj match {
+// `map.mapValues` return `Map` in Scala 2.12 and return `MapView` in Scala 2.13, call
+// `toMap` for Scala version compatibility.
case map: Map[_, _] =>
-map.mapValues(hash).##
+map.mapValues(hash).toMap.##
case arr: Array[_] =>
// Seq.empty[T] has the same hashCode regardless of T.
arr.toSeq.map(hash).##
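
The `mapValues(...).toMap` pattern used in this hunk and in `TreeNode` can be shown in isolation. The following is a minimal sketch, assuming a hypothetical helper `strictMapValues` that is not part of Spark; it relies only on `mapValues` and `toMap` from the standard library. On Scala 2.13 `mapValues` is deprecated, so this is expected to compile with a warning there.

```scala
object MapValuesSketch {
  // Hypothetical helper: applies `f` to every value and returns a strict Map.
  def strictMapValues[K, A, B](m: Map[K, A])(f: A => B): Map[K, B] =
    // `mapValues` gives a lazy wrapper typed as Map on 2.12 and a MapView on 2.13.
    // `toMap` materializes an immutable Map on both, so equality and `##`
    // are computed on the same kind of collection in either version.
    m.mapValues(f).toMap
}
```

Because the result is an ordinary immutable `Map`, comparing it with `==` or hashing it with `##` should behave the same way regardless of which Scala version built the code.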

@@ -118,7 +118,7 @@ private[spark] object SchemaUtils {
case (x, ys) if ys.length > 1 => s"`$x`"
}
throw new AnalysisException(
-s"Found duplicate column(s) $colType: ${duplicateColumns.mkString(", ")}")
+s"Found duplicate column(s) $colType: ${duplicateColumns.toSeq.sorted.mkString(", ")}")
}
}
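
To see why `sorted` matters here: the duplicates come out of a `groupBy`, whose result is a `Map` with no guaranteed iteration order, so the joined message could differ between Scala versions. Below is a minimal sketch under that assumption; `DuplicateColumnsSketch` and `duplicateMessage` are hypothetical names and this is not Spark's implementation.

```scala
object DuplicateColumnsSketch {
  // Hypothetical helper: reports duplicated names in a stable, sorted order.
  def duplicateMessage(columnNames: Seq[String]): Option[String] = {
    val duplicates = columnNames
      .groupBy(identity) // Map[String, Seq[String]]; iteration order is unspecified
      .collect { case (name, occurrences) if occurrences.length > 1 => s"`$name`" }
      .toSeq
      .sorted // makes the message independent of the Map's iteration order
    if (duplicates.isEmpty) None
    else Some(s"Found duplicate column(s): ${duplicates.mkString(", ")}")
  }
}
```

A test that compares against the exact message then only needs to sort its expected names the same way, which is what the `SchemaUtilsSuite` change in this commit does.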


@@ -360,7 +360,7 @@ object RandomDataGenerator {
arr += gen()
i += 1
}
-arr
+arr.toSeq
}
fields += data
case StructType(children) =>

@@ -41,7 +41,7 @@ class SchemaUtilsSuite extends SparkFunSuite {
test(s"Check column name duplication in $testType cases") {
def checkExceptionCases(schemaStr: String, duplicatedColumns: Seq[String]): Unit = {
val expectedErrorMsg = "Found duplicate column(s) in SchemaUtilsSuite: " +
-duplicatedColumns.map(c => s"`${c.toLowerCase(Locale.ROOT)}`").mkString(", ")
+duplicatedColumns.sorted.map(c => s"`${c.toLowerCase(Locale.ROOT)}`").mkString(", ")
val schema = StructType.fromDDL(schemaStr)
var msg = intercept[AnalysisException] {
SchemaUtils.checkSchemaColumnNameDuplication(