From b15ebe4b6721f9533150aa5831986bea081843e2 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Thu, 12 Oct 2017 17:24:56 -0700 Subject: [PATCH 01/10] Fix HadoopMapReduceCommitProtocol based on failures seen with Phoenix output format --- .../io/HadoopMapReduceCommitProtocol.scala | 24 +++++++------- .../spark/rdd/PairRDDFunctionsSuite.scala | 31 +++++++++++++------ 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index a7e6859ef6b64..95c99d29c3a9c 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -20,6 +20,7 @@ package org.apache.spark.internal.io import java.util.{Date, UUID} import scala.collection.mutable +import scala.util.Try import org.apache.hadoop.conf.Configurable import org.apache.hadoop.fs.Path @@ -47,6 +48,16 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) /** OutputCommitter from Hadoop is not serializable so marking it transient. */ @transient private var committer: OutputCommitter = _ + /** + * Checks whether there are files to be committed to a valid output location. + * + * As committing and aborting a job occurs on driver, where `addedAbsPathFiles` is always null, + * it is necessary to check whether a valid output path is specified. + * [[HadoopMapReduceCommitProtocol#path]] need not be a valid [[org.apache.hadoop.fs.Path]] for + * committers not writing to distributed file systems. + */ + private val hasValidPath = Try { new Path(path) }.isSuccess + /** * Tracks files staged by this task for absolute output paths. These outputs are not managed by * the Hadoop OutputCommitter, so we must move these to their final locations on job commit. @@ -60,15 +71,6 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) */ private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId) - /** - * Checks whether there are files to be committed to an absolute output location. - * - * As committing and aborting a job occurs on driver, where `addedAbsPathFiles` is always null, - * it is necessary to check whether the output path is specified. Output path may not be required - * for committers not writing to distributed file systems. - */ - private def hasAbsPathFiles: Boolean = path != null - protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { val format = context.getOutputFormatClass.newInstance() // If OutputFormat is Configurable, we should set conf to it. 
@@ -142,7 +144,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]]) .foldLeft(Map[String, String]())(_ ++ _) logDebug(s"Committing files staged for absolute locations $filesToMove") - if (hasAbsPathFiles) { + if (hasValidPath) { val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration) for ((src, dst) <- filesToMove) { fs.rename(new Path(src), new Path(dst)) @@ -153,7 +155,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) override def abortJob(jobContext: JobContext): Unit = { committer.abortJob(jobContext, JobStatus.State.FAILED) - if (hasAbsPathFiles) { + if (hasValidPath) { val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration) fs.delete(absPathStagingDir, true) } diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 07579c5098014..a2991d1e029f5 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -568,21 +568,34 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { assert(FakeWriterWithCallback.exception.getMessage contains "failed to write") } - test("saveAsNewAPIHadoopDataset should respect empty output directory when " + + test("saveAsNewAPIHadoopDataset should support invalid output paths when " + "there are no files to be committed to an absolute output location") { val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1) - val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration)) - job.setOutputKeyClass(classOf[Integer]) - job.setOutputValueClass(classOf[Integer]) - job.setOutputFormatClass(classOf[NewFakeFormat]) - val jobConfiguration = job.getConfiguration + def saveRddWithPath(path: String): Unit = { + val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration)) + job.setOutputKeyClass(classOf[Integer]) + job.setOutputValueClass(classOf[Integer]) + job.setOutputFormatClass(classOf[NewFakeFormat]) + if (null != path) { + job.getConfiguration.set("mapred.output.dir", path) + } else { + job.getConfiguration.unset("mapred.output.dir") + } + val jobConfiguration = job.getConfiguration + + // just test that the job does not fail with java.lang.IllegalArgumentException. + pairs.saveAsNewAPIHadoopDataset(jobConfiguration) + } - // just test that the job does not fail with - // java.lang.IllegalArgumentException: Can not create a Path from a null string - pairs.saveAsNewAPIHadoopDataset(jobConfiguration) + saveRddWithPath(null) + saveRddWithPath("") + saveRddWithPath("test:") } + // In spark 2.1, only null was supported - not other invalid paths. + // org.apache.hadoop.mapred.FileOutputFormat.getOutputPath fails with IllegalArgumentException + // for non-null invalid paths. test("saveAsHadoopDataset should respect empty output directory when " + "there are no files to be committed to an absolute output location") { val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1) From 3ff766f61afbd09dcc7a73eae02e68a39114ce3f Mon Sep 17 00:00:00 2001 From: Wang Gengliang Date: Thu, 12 Oct 2017 18:47:16 -0700 Subject: [PATCH 02/10] [SPARK-22263][SQL] Refactor deterministic as lazy value ## What changes were proposed in this pull request? The method `deterministic` is frequently called in optimizer. 
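For context, the current definition in `Expression` is a plain `def`, so `children.forall(_.deterministic)` is re-evaluated on every call; a `lazy val` caches the result per expression instance. A sketch of the before/after shape, matching the diff below:

```scala
// Before: recomputed on every access
def deterministic: Boolean = children.forall(_.deterministic)

// After: computed once and cached for the lifetime of the Expression instance
lazy val deterministic: Boolean = children.forall(_.deterministic)
```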
Refactor `deterministic` as lazy value, in order to avoid redundant computations. ## How was this patch tested? Simple benchmark test over TPC-DS queries, run time from query string to optimized plan(continuous 20 runs, and get the average of last 5 results): Before changes: 12601 ms After changes: 11993ms This is 4.8% performance improvement. Also run test with Unit test. Author: Wang Gengliang Closes #19478 from gengliangwang/deterministicAsLazyVal. --- .../sql/catalyst/expressions/CallMethodViaReflection.scala | 2 +- .../apache/spark/sql/catalyst/expressions/Expression.scala | 4 ++-- .../org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala | 2 +- .../spark/sql/catalyst/expressions/aggregate/First.scala | 2 +- .../spark/sql/catalyst/expressions/aggregate/Last.scala | 2 +- .../spark/sql/catalyst/expressions/aggregate/collect.scala | 2 +- .../org/apache/spark/sql/catalyst/expressions/misc.scala | 2 +- .../sql/execution/aggregate/TypedAggregateExpression.scala | 4 ++-- .../scala/org/apache/spark/sql/execution/aggregate/udaf.scala | 2 +- .../org/apache/spark/sql/TypedImperativeAggregateSuite.scala | 2 +- .../src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala | 4 ++-- 11 files changed, 14 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala index cd97304302e48..65bb9a8c642b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala @@ -76,7 +76,7 @@ case class CallMethodViaReflection(children: Seq[Expression]) } } - override def deterministic: Boolean = false + override lazy val deterministic: Boolean = false override def nullable: Boolean = true override val dataType: DataType = StringType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index c058425b4bc36..0e75ac88dc2b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -79,7 +79,7 @@ abstract class Expression extends TreeNode[Expression] { * An example would be `SparkPartitionID` that relies on the partition id returned by TaskContext. * By default leaf expressions are deterministic as Nil.forall(_.deterministic) returns true. */ - def deterministic: Boolean = children.forall(_.deterministic) + lazy val deterministic: Boolean = children.forall(_.deterministic) def nullable: Boolean @@ -265,7 +265,7 @@ trait NonSQLExpression extends Expression { * An expression that is nondeterministic. 
*/ trait Nondeterministic extends Expression { - final override def deterministic: Boolean = false + final override lazy val deterministic: Boolean = false final override def foldable: Boolean = false @transient diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala index 527f1670c25e1..179853032035e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala @@ -49,7 +49,7 @@ case class ScalaUDF( udfDeterministic: Boolean = true) extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression { - override def deterministic: Boolean = udfDeterministic && children.forall(_.deterministic) + override lazy val deterministic: Boolean = udfDeterministic && children.forall(_.deterministic) override def toString: String = s"${udfName.map(name => s"UDF:$name").getOrElse("UDF")}(${children.mkString(", ")})" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala index bfc58c22886cc..4e671e1f3e6eb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala @@ -44,7 +44,7 @@ case class First(child: Expression, ignoreNullsExpr: Expression) override def nullable: Boolean = true // First is not a deterministic function. - override def deterministic: Boolean = false + override lazy val deterministic: Boolean = false // Return data type. override def dataType: DataType = child.dataType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala index 96a6ec08a160a..0ccabb9d98914 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala @@ -44,7 +44,7 @@ case class Last(child: Expression, ignoreNullsExpr: Expression) override def nullable: Boolean = true // Last is not a deterministic function. - override def deterministic: Boolean = false + override lazy val deterministic: Boolean = false // Return data type. override def dataType: DataType = child.dataType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala index 405c2065680f5..be972f006352e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala @@ -44,7 +44,7 @@ abstract class Collect[T <: Growable[Any] with Iterable[Any]] extends TypedImper // Both `CollectList` and `CollectSet` are non-deterministic since their results depend on the // actual order of input rows. 
- override def deterministic: Boolean = false + override lazy val deterministic: Boolean = false override def update(buffer: T, input: InternalRow): T = { val value = child.eval(input) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index ef293ff3f18ea..b86e271fe2958 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -119,7 +119,7 @@ case class CurrentDatabase() extends LeafExpression with Unevaluable { // scalastyle:on line.size.limit case class Uuid() extends LeafExpression { - override def deterministic: Boolean = false + override lazy val deterministic: Boolean = false override def nullable: Boolean = false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala index 717758fdf716f..aab8cc50b9526 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala @@ -127,7 +127,7 @@ case class SimpleTypedAggregateExpression( nullable: Boolean) extends DeclarativeAggregate with TypedAggregateExpression with NonSQLExpression { - override def deterministic: Boolean = true + override lazy val deterministic: Boolean = true override def children: Seq[Expression] = inputDeserializer.toSeq :+ bufferDeserializer @@ -221,7 +221,7 @@ case class ComplexTypedAggregateExpression( inputAggBufferOffset: Int = 0) extends TypedImperativeAggregate[Any] with TypedAggregateExpression with NonSQLExpression { - override def deterministic: Boolean = true + override lazy val deterministic: Boolean = true override def children: Seq[Expression] = inputDeserializer.toSeq diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala index fec1add18cbf2..72aa4adff4e64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala @@ -340,7 +340,7 @@ case class ScalaUDAF( override def dataType: DataType = udaf.dataType - override def deterministic: Boolean = udaf.deterministic + override lazy val deterministic: Boolean = udaf.deterministic override val inputTypes: Seq[DataType] = udaf.inputSchema.map(_.dataType) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala index b76f168220d84..c5fb17345222a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala @@ -268,7 +268,7 @@ object TypedImperativeAggregateSuite { } } - override def deterministic: Boolean = true + override lazy val deterministic: Boolean = true override def children: Seq[Expression] = Seq(child) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index e9bdcf00b9346..68af99ea272a8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -48,7 +48,7 @@ private[hive] case class HiveSimpleUDF( with Logging with UserDefinedExpression { - override def deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic) + override lazy val deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic) override def nullable: Boolean = true @@ -131,7 +131,7 @@ private[hive] case class HiveGenericUDF( override def nullable: Boolean = true - override def deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic) + override lazy val deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic) override def foldable: Boolean = isUDFDeterministic && returnInspector.isInstanceOf[ConstantObjectInspector] From ec122209fb35a65637df42eded64b0203e105aae Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 13 Oct 2017 13:09:35 +0800 Subject: [PATCH 03/10] [SPARK-21165][SQL] FileFormatWriter should handle mismatched attribute ids between logical and physical plan ## What changes were proposed in this pull request? Due to optimizer removing some unnecessary aliases, the logical and physical plan may have different output attribute ids. FileFormatWriter should handle this when creating the physical sort node. ## How was this patch tested? new regression test. Author: Wenchen Fan Closes #19483 from cloud-fan/bug2. --- .../datasources/FileFormatWriter.scala | 7 +++++- .../datasources/FileFormatWriterSuite.scala | 2 +- .../apache/spark/sql/hive/InsertSuite.scala | 22 +++++++++++++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 75b1695fbc275..1fac01a2c26c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -180,8 +180,13 @@ object FileFormatWriter extends Logging { val rdd = if (orderingMatched) { queryExecution.toRdd } else { + // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and + // the physical plan may have different attribute ids due to optimizer removing some + // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch. 
+ val orderingExpr = requiredOrdering + .map(SortOrder(_, Ascending)).map(BindReferences.bindReference(_, allColumns)) SortExec( - requiredOrdering.map(SortOrder(_, Ascending)), + orderingExpr, global = false, child = queryExecution.executedPlan).execute() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala index 6f8767db176aa..13f0e0bca86c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala @@ -32,7 +32,7 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext { } } - test("FileFormatWriter should respect the input query schema") { + test("SPARK-22252: FileFormatWriter should respect the input query schema") { withTable("t1", "t2", "t3", "t4") { spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1") spark.sql("select COL1, COL2 from t1").write.saveAsTable("t2") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala index aa5cae33f5cd9..ab91727049ff5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala @@ -728,4 +728,26 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter assert(e.contains("mismatched input 'ROW'")) } } + + test("SPARK-21165: FileFormatWriter should only rely on attributes from analyzed plan") { + withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) { + withTable("tab1", "tab2") { + Seq(("a", "b", 3)).toDF("word", "first", "length").write.saveAsTable("tab1") + + spark.sql( + """ + |CREATE TABLE tab2 (word string, length int) + |PARTITIONED BY (first string) + """.stripMargin) + + spark.sql( + """ + |INSERT INTO TABLE tab2 PARTITION(first) + |SELECT word, length, cast(first as string) as first FROM tab1 + """.stripMargin) + + checkAnswer(spark.table("tab2"), Row("a", 3, "b")) + } + } + } } From 2f00a71a876321af02865d7cd53ada167e1ce2e3 Mon Sep 17 00:00:00 2001 From: Wang Gengliang Date: Thu, 12 Oct 2017 22:45:19 -0700 Subject: [PATCH 04/10] [SPARK-22257][SQL] Reserve all non-deterministic expressions in ExpressionSet ## What changes were proposed in this pull request? For non-deterministic expressions, they should be considered as not contained in the [[ExpressionSet]]. This is consistent with how we define `semanticEquals` between two expressions. Otherwise, combining expressions will remove non-deterministic expressions which should be reserved. E.g. Combine filters of ```scala testRelation.where(Rand(0) > 0.1).where(Rand(0) > 0.1) ``` should result in ```scala testRelation.where(Rand(0) > 0.1 && Rand(0) > 0.1) ``` ## How was this patch tested? Unit test Author: Wang Gengliang Closes #19475 from gengliangwang/non-deterministic-expressionSet. 
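Concretely, the new semantics can be sketched as follows (a minimal illustration mirroring the `ExpressionSetSuite` cases added below; `ExpressionSet` and `Rand` are the catalyst classes touched by this patch):

```scala
import org.apache.spark.sql.catalyst.expressions.{ExpressionSet, Rand}

// Non-deterministic expressions are never de-duplicated and never reported as contained.
val set = ExpressionSet(Rand(0) :: Rand(0) :: Nil)
assert(set.size == 2)               // both Rand(0) instances are kept in `originals`
assert(!set.contains(Rand(0)))      // membership is always false for non-deterministic exprs
assert((set + Rand(0)).size == 3)   // adding appends another copy
assert((set - Rand(0)).size == 2)   // removal is a no-op for non-deterministic exprs
```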
--- .../catalyst/expressions/ExpressionSet.scala | 23 ++++++--- .../expressions/ExpressionSetSuite.scala | 51 +++++++++++++++---- .../optimizer/FilterPushdownSuite.scala | 15 ++++++ 3 files changed, 72 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala index 305ac90e245b8..7e8e7b8cd5f18 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala @@ -30,8 +30,9 @@ object ExpressionSet { } /** - * A [[Set]] where membership is determined based on a canonical representation of an [[Expression]] - * (i.e. one that attempts to ignore cosmetic differences). See [[Canonicalize]] for more details. + * A [[Set]] where membership is determined based on determinacy and a canonical representation of + * an [[Expression]] (i.e. one that attempts to ignore cosmetic differences). + * See [[Canonicalize]] for more details. * * Internally this set uses the canonical representation, but keeps also track of the original * expressions to ease debugging. Since different expressions can share the same canonical @@ -46,6 +47,10 @@ object ExpressionSet { * set.contains(1 + a) => true * set.contains(a + 2) => false * }}} + * + * For non-deterministic expressions, they are always considered as not contained in the [[Set]]. + * On adding a non-deterministic expression, simply append it to the original expressions. + * This is consistent with how we define `semanticEquals` between two expressions. */ class ExpressionSet protected( protected val baseSet: mutable.Set[Expression] = new mutable.HashSet, @@ -53,7 +58,9 @@ class ExpressionSet protected( extends Set[Expression] { protected def add(e: Expression): Unit = { - if (!baseSet.contains(e.canonicalized)) { + if (!e.deterministic) { + originals += e + } else if (!baseSet.contains(e.canonicalized) ) { baseSet.add(e.canonicalized) originals += e } @@ -74,9 +81,13 @@ class ExpressionSet protected( } override def -(elem: Expression): ExpressionSet = { - val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized) - val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized) - new ExpressionSet(newBaseSet, newOriginals) + if (elem.deterministic) { + val newBaseSet = baseSet.clone().filterNot(_ == elem.canonicalized) + val newOriginals = originals.clone().filterNot(_.canonicalized == elem.canonicalized) + new ExpressionSet(newBaseSet, newOriginals) + } else { + new ExpressionSet(baseSet.clone(), originals.clone()) + } } override def iterator: Iterator[Expression] = originals.iterator diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala index a1000a0e80799..12eddf557109f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala @@ -175,20 +175,14 @@ class ExpressionSetSuite extends SparkFunSuite { aUpper > bUpper || aUpper <= Rand(1L) || aUpper <= 10, aUpper <= Rand(1L) || aUpper <= 10 || aUpper > bUpper) - // Partial reorder case: we don't reorder non-deterministic expressions, - // but we can reorder sub-expressions in deterministic 
AND/OR expressions. - // There are two predicates: - // (aUpper > bUpper || bUpper > 100) => we can reorder sub-expressions in it. - // (aUpper === Rand(1L)) - setTest(1, + // Keep all the non-deterministic expressions even they are semantically equal. + setTest(2, Rand(1L), Rand(1L)) + + setTest(2, (aUpper > bUpper || bUpper > 100) && aUpper === Rand(1L), (bUpper > 100 || aUpper > bUpper) && aUpper === Rand(1L)) - // There are three predicates: - // (Rand(1L) > aUpper) - // (aUpper <= Rand(1L) && aUpper > bUpper) - // (aUpper > 10 && bUpper > 10) => we can reorder sub-expressions in it. - setTest(1, + setTest(2, Rand(1L) > aUpper || (aUpper <= Rand(1L) && aUpper > bUpper) || (aUpper > 10 && bUpper > 10), Rand(1L) > aUpper || (aUpper <= Rand(1L) && aUpper > bUpper) || (bUpper > 10 && aUpper > 10)) @@ -219,4 +213,39 @@ class ExpressionSetSuite extends SparkFunSuite { assert((initialSet ++ setToAddWithSameExpression).size == 2) assert((initialSet ++ setToAddWithOutSameExpression).size == 3) } + + test("add single element to set with non-deterministic expressions") { + val initialSet = ExpressionSet(aUpper + 1 :: Rand(0) :: Nil) + + assert((initialSet + (aUpper + 1)).size == 2) + assert((initialSet + Rand(0)).size == 3) + assert((initialSet + (aUpper + 2)).size == 3) + } + + test("remove single element to set with non-deterministic expressions") { + val initialSet = ExpressionSet(aUpper + 1 :: Rand(0) :: Nil) + + assert((initialSet - (aUpper + 1)).size == 1) + assert((initialSet - Rand(0)).size == 2) + assert((initialSet - (aUpper + 2)).size == 2) + } + + test("add multiple elements to set with non-deterministic expressions") { + val initialSet = ExpressionSet(aUpper + 1 :: Rand(0) :: Nil) + val setToAddWithSameDeterministicExpression = ExpressionSet(aUpper + 1 :: Rand(0) :: Nil) + val setToAddWithOutSameExpression = ExpressionSet(aUpper + 3 :: aUpper + 4 :: Nil) + + assert((initialSet ++ setToAddWithSameDeterministicExpression).size == 3) + assert((initialSet ++ setToAddWithOutSameExpression).size == 4) + } + + test("remove multiple elements to set with non-deterministic expressions") { + val initialSet = ExpressionSet(aUpper + 1 :: Rand(0) :: Nil) + val setToRemoveWithSameDeterministicExpression = ExpressionSet(aUpper + 1 :: Rand(0) :: Nil) + val setToRemoveWithOutSameExpression = ExpressionSet(aUpper + 3 :: aUpper + 4 :: Nil) + + assert((initialSet -- setToRemoveWithSameDeterministicExpression).size == 1) + assert((initialSet -- setToRemoveWithOutSameExpression).size == 2) + } + } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 582b3ead5e54a..de0e7c7ee49ac 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -94,6 +94,21 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("combine redundant deterministic filters") { + val originalQuery = + testRelation + .where(Rand(0) > 0.1 && 'a === 1) + .where(Rand(0) > 0.1 && 'a === 1) + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = + testRelation + .where(Rand(0) > 0.1 && 'a === 1 && Rand(0) > 0.1) + .analyze + + comparePlans(optimized, correctAnswer) + } + test("SPARK-16164: Filter pushdown should keep the ordering in the logical plan") { val originalQuery = 
testRelation From e6e36004afc3f9fc8abea98542248e9de11b4435 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 13 Oct 2017 23:09:12 +0800 Subject: [PATCH 05/10] [SPARK-14387][SPARK-16628][SPARK-18355][SQL] Use Spark schema to read ORC table instead of ORC file schema ## What changes were proposed in this pull request? Before Hive 2.0, ORC File schema has invalid column names like `_col1` and `_col2`. This is a well-known limitation and there are several Apache Spark issues with `spark.sql.hive.convertMetastoreOrc=true`. This PR ignores ORC File schema and use Spark schema. ## How was this patch tested? Pass the newly added test case. Author: Dongjoon Hyun Closes #19470 from dongjoon-hyun/SPARK-18355. --- .../spark/sql/hive/orc/OrcFileFormat.scala | 31 ++++++---- .../sql/hive/execution/SQLQuerySuite.scala | 62 ++++++++++++++++++- 2 files changed, 80 insertions(+), 13 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index c76f0ebb36a60..194e69c93e1a8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -134,12 +134,11 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file // using the given physical schema. Instead, we simply return an empty iterator. - val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf)) - if (maybePhysicalSchema.isEmpty) { + val isEmptyFile = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf)).isEmpty + if (isEmptyFile) { Iterator.empty } else { - val physicalSchema = maybePhysicalSchema.get - OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema) + OrcRelation.setRequiredColumns(conf, dataSchema, requiredSchema) val orcRecordReader = { val job = Job.getInstance(conf) @@ -163,6 +162,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable // Unwraps `OrcStruct`s to `UnsafeRow`s OrcRelation.unwrapOrcStructs( conf, + dataSchema, requiredSchema, Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]), recordsIterator) @@ -272,25 +272,32 @@ private[orc] object OrcRelation extends HiveInspectors { def unwrapOrcStructs( conf: Configuration, dataSchema: StructType, + requiredSchema: StructType, maybeStructOI: Option[StructObjectInspector], iterator: Iterator[Writable]): Iterator[InternalRow] = { val deserializer = new OrcSerde - val mutableRow = new SpecificInternalRow(dataSchema.map(_.dataType)) - val unsafeProjection = UnsafeProjection.create(dataSchema) + val mutableRow = new SpecificInternalRow(requiredSchema.map(_.dataType)) + val unsafeProjection = UnsafeProjection.create(requiredSchema) def unwrap(oi: StructObjectInspector): Iterator[InternalRow] = { - val (fieldRefs, fieldOrdinals) = dataSchema.zipWithIndex.map { - case (field, ordinal) => oi.getStructFieldRef(field.name) -> ordinal + val (fieldRefs, fieldOrdinals) = requiredSchema.zipWithIndex.map { + case (field, ordinal) => + var ref = oi.getStructFieldRef(field.name) + if (ref == null) { + ref = oi.getStructFieldRef("_col" + dataSchema.fieldIndex(field.name)) + } + ref -> ordinal }.unzip - val unwrappers = fieldRefs.map(unwrapperFor) + val unwrappers = 
fieldRefs.map(r => if (r == null) null else unwrapperFor(r)) iterator.map { value => val raw = deserializer.deserialize(value) var i = 0 val length = fieldRefs.length while (i < length) { - val fieldValue = oi.getStructFieldData(raw, fieldRefs(i)) + val fieldRef = fieldRefs(i) + val fieldValue = if (fieldRef == null) null else oi.getStructFieldData(raw, fieldRef) if (fieldValue == null) { mutableRow.setNullAt(fieldOrdinals(i)) } else { @@ -306,8 +313,8 @@ private[orc] object OrcRelation extends HiveInspectors { } def setRequiredColumns( - conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = { - val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer) + conf: Configuration, dataSchema: StructType, requestedSchema: StructType): Unit = { + val ids = requestedSchema.map(a => dataSchema.fieldIndex(a.name): Integer) val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 09c59000b3e3f..94fa43dec7313 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.functions._ -import org.apache.spark.sql.hive.HiveUtils +import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils @@ -2050,4 +2050,64 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } } + + Seq("orc", "parquet").foreach { format => + test(s"SPARK-18355 Read data from a hive table with a new column - $format") { + val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client + + Seq("true", "false").foreach { value => + withSQLConf( + HiveUtils.CONVERT_METASTORE_ORC.key -> value, + HiveUtils.CONVERT_METASTORE_PARQUET.key -> value) { + withTempDatabase { db => + client.runSqlHive( + s""" + |CREATE TABLE $db.t( + | click_id string, + | search_id string, + | uid bigint) + |PARTITIONED BY ( + | ts string, + | hour string) + |STORED AS $format + """.stripMargin) + + client.runSqlHive( + s""" + |INSERT INTO TABLE $db.t + |PARTITION (ts = '98765', hour = '01') + |VALUES (12, 2, 12345) + """.stripMargin + ) + + checkAnswer( + sql(s"SELECT click_id, search_id, uid, ts, hour FROM $db.t"), + Row("12", "2", 12345, "98765", "01")) + + client.runSqlHive(s"ALTER TABLE $db.t ADD COLUMNS (dummy string)") + + checkAnswer( + sql(s"SELECT click_id, search_id FROM $db.t"), + Row("12", "2")) + + checkAnswer( + sql(s"SELECT search_id, click_id FROM $db.t"), + Row("2", "12")) + + checkAnswer( + sql(s"SELECT search_id FROM $db.t"), + Row("2")) + + checkAnswer( + sql(s"SELECT dummy, click_id FROM $db.t"), + Row(null, "12")) + + checkAnswer( + sql(s"SELECT click_id, search_id, uid, dummy, ts, hour FROM $db.t"), + Row("12", "2", 12345, null, "98765", "01")) + } + } + } + } + } } From 6412ea1759d39a2380c572ec24cfd8ae4f2d81f7 Mon Sep 17 00:00:00 2001 From: 
Dongjoon Hyun Date: Sat, 14 Oct 2017 00:35:12 +0800 Subject: [PATCH 06/10] [SPARK-21247][SQL] Type comparison should respect case-sensitive SQL conf ## What changes were proposed in this pull request? This is an effort to reduce the difference between Hive and Spark. Spark supports case-sensitivity in columns. Especially, for Struct types, with `spark.sql.caseSensitive=true`, the following is supported. ```scala scala> sql("select named_struct('a', 1, 'A', 2).a").show +--------------------------+ |named_struct(a, 1, A, 2).a| +--------------------------+ | 1| +--------------------------+ scala> sql("select named_struct('a', 1, 'A', 2).A").show +--------------------------+ |named_struct(a, 1, A, 2).A| +--------------------------+ | 2| +--------------------------+ ``` And vice versa, with `spark.sql.caseSensitive=false`, the following is supported. ```scala scala> sql("select named_struct('a', 1).A, named_struct('A', 1).a").show +--------------------+--------------------+ |named_struct(a, 1).A|named_struct(A, 1).a| +--------------------+--------------------+ | 1| 1| +--------------------+--------------------+ ``` However, types are considered different. For example, SET operations fail. ```scala scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. struct <> struct at the first column of the second table;; 'Union :- Project [named_struct(a, 1) AS named_struct(a, 1)#57] : +- OneRowRelation$ +- Project [named_struct(A, 2) AS named_struct(A, 2)#58] +- OneRowRelation$ ``` This PR aims to support case-insensitive type equality. For example, in Set operation, the above operation succeed when `spark.sql.caseSensitive=false`. ```scala scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show +------------------+ |named_struct(a, 1)| +------------------+ | [1]| | [2]| +------------------+ ``` ## How was this patch tested? Pass the Jenkins with a newly add test case. Author: Dongjoon Hyun Closes #18460 from dongjoon-hyun/SPARK-21247. --- .../sql/catalyst/analysis/TypeCoercion.scala | 10 ++++ .../org/apache/spark/sql/types/DataType.scala | 7 ++- .../catalyst/analysis/TypeCoercionSuite.scala | 52 +++++++++++++++++-- .../org/apache/spark/sql/SQLQuerySuite.scala | 38 ++++++++++++++ 4 files changed, 102 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 9ffe646b5e4ec..532d22dbf2321 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -100,6 +100,16 @@ object TypeCoercion { case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) => Some(TimestampType) + case (t1 @ StructType(fields1), t2 @ StructType(fields2)) if t1.sameType(t2) => + Some(StructType(fields1.zip(fields2).map { case (f1, f2) => + // Since `t1.sameType(t2)` is true, two StructTypes have the same DataType + // except `name` (in case of `spark.sql.caseSensitive=false`) and `nullable`. + // - Different names: use f1.name + // - Different nullabilities: `nullable` is true iff one of them is nullable. 
+ val dataType = findTightestCommonType(f1.dataType, f2.dataType).get + StructField(f1.name, dataType, nullable = f1.nullable || f2.nullable) + })) + case _ => None } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index 30745c6a9d42a..d6e0df12218ad 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -26,6 +26,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.annotation.InterfaceStability import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.Utils /** @@ -80,7 +81,11 @@ abstract class DataType extends AbstractDataType { * (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`). */ private[spark] def sameType(other: DataType): Boolean = - DataType.equalsIgnoreNullability(this, other) + if (SQLConf.get.caseSensitiveAnalysis) { + DataType.equalsIgnoreNullability(this, other) + } else { + DataType.equalsIgnoreCaseAndNullability(this, other) + } /** * Returns the same data type but set all nullability fields are true diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala index d62e3b6dfe34f..793e04f66f0f9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala @@ -131,14 +131,17 @@ class TypeCoercionSuite extends AnalysisTest { widenFunc: (DataType, DataType) => Option[DataType], t1: DataType, t2: DataType, - expected: Option[DataType]): Unit = { + expected: Option[DataType], + isSymmetric: Boolean = true): Unit = { var found = widenFunc(t1, t2) assert(found == expected, s"Expected $expected as wider common type for $t1 and $t2, found $found") // Test both directions to make sure the widening is symmetric. 
- found = widenFunc(t2, t1) - assert(found == expected, - s"Expected $expected as wider common type for $t2 and $t1, found $found") + if (isSymmetric) { + found = widenFunc(t2, t1) + assert(found == expected, + s"Expected $expected as wider common type for $t2 and $t1, found $found") + } } test("implicit type cast - ByteType") { @@ -385,6 +388,47 @@ class TypeCoercionSuite extends AnalysisTest { widenTest(NullType, StructType(Seq()), Some(StructType(Seq()))) widenTest(StringType, MapType(IntegerType, StringType, true), None) widenTest(ArrayType(IntegerType), StructType(Seq()), None) + + widenTest( + StructType(Seq(StructField("a", IntegerType))), + StructType(Seq(StructField("b", IntegerType))), + None) + widenTest( + StructType(Seq(StructField("a", IntegerType, nullable = false))), + StructType(Seq(StructField("a", DoubleType, nullable = false))), + None) + + widenTest( + StructType(Seq(StructField("a", IntegerType, nullable = false))), + StructType(Seq(StructField("a", IntegerType, nullable = false))), + Some(StructType(Seq(StructField("a", IntegerType, nullable = false))))) + widenTest( + StructType(Seq(StructField("a", IntegerType, nullable = false))), + StructType(Seq(StructField("a", IntegerType, nullable = true))), + Some(StructType(Seq(StructField("a", IntegerType, nullable = true))))) + widenTest( + StructType(Seq(StructField("a", IntegerType, nullable = true))), + StructType(Seq(StructField("a", IntegerType, nullable = false))), + Some(StructType(Seq(StructField("a", IntegerType, nullable = true))))) + widenTest( + StructType(Seq(StructField("a", IntegerType, nullable = true))), + StructType(Seq(StructField("a", IntegerType, nullable = true))), + Some(StructType(Seq(StructField("a", IntegerType, nullable = true))))) + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + widenTest( + StructType(Seq(StructField("a", IntegerType))), + StructType(Seq(StructField("A", IntegerType))), + None) + } + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + checkWidenType( + TypeCoercion.findTightestCommonType, + StructType(Seq(StructField("a", IntegerType), StructField("B", IntegerType))), + StructType(Seq(StructField("A", IntegerType), StructField("b", IntegerType))), + Some(StructType(Seq(StructField("a", IntegerType), StructField("B", IntegerType)))), + isSymmetric = false) + } } test("wider common type for decimal and array") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 93a7777b70b46..f0c58e2e5bf45 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2646,6 +2646,44 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } + test("SPARK-21247: Allow case-insensitive type equality in Set operation") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + sql("SELECT struct(1 a) UNION ALL (SELECT struct(2 A))") + sql("SELECT struct(1 a) EXCEPT (SELECT struct(2 A))") + + withTable("t", "S") { + sql("CREATE TABLE t(c struct) USING parquet") + sql("CREATE TABLE S(C struct) USING parquet") + Seq(("c", "C"), ("C", "c"), ("c.f", "C.F"), ("C.F", "c.f")).foreach { + case (left, right) => + checkAnswer(sql(s"SELECT * FROM t, S WHERE t.$left = S.$right"), Seq.empty) + } + } + } + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val m1 = intercept[AnalysisException] { + sql("SELECT struct(1 a) UNION ALL (SELECT struct(2 A))") + }.message + assert(m1.contains("Union can 
only be performed on tables with the compatible column types")) + + val m2 = intercept[AnalysisException] { + sql("SELECT struct(1 a) EXCEPT (SELECT struct(2 A))") + }.message + assert(m2.contains("Except can only be performed on tables with the compatible column types")) + + withTable("t", "S") { + sql("CREATE TABLE t(c struct) USING parquet") + sql("CREATE TABLE S(C struct) USING parquet") + checkAnswer(sql("SELECT * FROM t, S WHERE t.c.f = S.C.F"), Seq.empty) + val m = intercept[AnalysisException] { + sql("SELECT * FROM t, S WHERE c = C") + }.message + assert(m.contains("cannot resolve '(t.`c` = S.`C`)' due to data type mismatch")) + } + } + } + test("SPARK-21335: support un-aliased subquery") { withTempView("v") { Seq(1 -> "a").toDF("i", "j").createOrReplaceTempView("v") From 3823dc88d3816c7d1099f9601426108acc90574c Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 13 Oct 2017 10:49:48 -0700 Subject: [PATCH 07/10] [SPARK-22252][SQL][FOLLOWUP] Command should not be a LeafNode ## What changes were proposed in this pull request? This is a minor folllowup of #19474 . #19474 partially reverted #18064 but accidentally introduced a behavior change. `Command` extended `LogicalPlan` before #18064 , but #19474 made it extend `LeafNode`. This is an internal behavior change as now all `Command` subclasses can't define children, and they have to implement `computeStatistic` method. This PR fixes this by making `Command` extend `LogicalPlan` ## How was this patch tested? N/A Author: Wenchen Fan Closes #19493 from cloud-fan/minor. --- .../org/apache/spark/sql/catalyst/plans/logical/Command.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala index 38f47081b6f55..ec5766e1f67f2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute * commands can be used by parsers to represent DDL operations. Commands, unlike queries, are * eagerly executed. */ -trait Command extends LeafNode { +trait Command extends LogicalPlan { override def output: Seq[Attribute] = Seq.empty + override def children: Seq[LogicalPlan] = Seq.empty } From 1bb8b76045420b61a37806d6f4765af15c4052a7 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Fri, 13 Oct 2017 15:13:06 -0700 Subject: [PATCH 08/10] [MINOR][SS] keyWithIndexToNumValues" -> "keyWithIndexToValue" ## What changes were proposed in this pull request? This PR changes `keyWithIndexToNumValues` to `keyWithIndexToValue`. There will be directories on HDFS named with this `keyWithIndexToNumValues`. So if we ever want to fix this, let's fix it now. ## How was this patch tested? existing unit test cases. Author: Liwei Lin Closes #19435 from lw-lin/keyWithIndex. 
--- .../streaming/state/SymmetricHashJoinStateManager.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala index d256fb578d921..6b386308c79fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala @@ -384,7 +384,7 @@ class SymmetricHashJoinStateManager( } /** A wrapper around a [[StateStore]] that stores [(key, index) -> value]. */ - private class KeyWithIndexToValueStore extends StateStoreHandler(KeyWithIndexToValuesType) { + private class KeyWithIndexToValueStore extends StateStoreHandler(KeyWithIndexToValueType) { private val keyWithIndexExprs = keyAttributes :+ Literal(1L) private val keyWithIndexSchema = keySchema.add("index", LongType) private val indexOrdinalInKeyWithIndexRow = keyAttributes.size @@ -471,7 +471,7 @@ class SymmetricHashJoinStateManager( object SymmetricHashJoinStateManager { def allStateStoreNames(joinSides: JoinSide*): Seq[String] = { - val allStateStoreTypes: Seq[StateStoreType] = Seq(KeyToNumValuesType, KeyWithIndexToValuesType) + val allStateStoreTypes: Seq[StateStoreType] = Seq(KeyToNumValuesType, KeyWithIndexToValueType) for (joinSide <- joinSides; stateStoreType <- allStateStoreTypes) yield { getStateStoreName(joinSide, stateStoreType) } @@ -483,8 +483,8 @@ object SymmetricHashJoinStateManager { override def toString(): String = "keyToNumValues" } - private case object KeyWithIndexToValuesType extends StateStoreType { - override def toString(): String = "keyWithIndexToNumValues" + private case object KeyWithIndexToValueType extends StateStoreType { + override def toString(): String = "keyWithIndexToValue" } private def getStateStoreName(joinSide: JoinSide, storeType: StateStoreType): String = { From 06df34d35ec088277445ef09cfb24bfe996f072e Mon Sep 17 00:00:00 2001 From: Devaraj K Date: Fri, 13 Oct 2017 17:12:50 -0700 Subject: [PATCH 09/10] [SPARK-11034][LAUNCHER][MESOS] Launcher: add support for monitoring Mesos apps ## What changes were proposed in this pull request? Added Launcher support for monitoring Mesos apps in Client mode. SPARK-11033 can handle the support for Mesos/Cluster mode since the Standalone/Cluster and Mesos/Cluster modes use the same code at client side. ## How was this patch tested? I verified it manually by running launcher application, able to launch, stop and kill the mesos applications and also can invoke other launcher API's. Author: Devaraj K Closes #19385 from devaraj-kavali/SPARK-11034. 
--- .../MesosCoarseGrainedSchedulerBackend.scala | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 80c0a041b7322..603c980cb268d 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -32,6 +32,7 @@ import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskStat import org.apache.spark.deploy.mesos.config._ import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.config +import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.rpc.RpcEndpointAddress @@ -89,6 +90,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( // Synchronization protected by stateLock private[this] var stopCalled: Boolean = false + private val launcherBackend = new LauncherBackend() { + override protected def onStopRequest(): Unit = { + stopSchedulerBackend() + setState(SparkAppHandle.State.KILLED) + } + } + // If shuffle service is enabled, the Spark driver will register with the shuffle service. // This is for cleaning up shuffle files reliably. private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) @@ -182,6 +190,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( override def start() { super.start() + if (sc.deployMode == "client") { + launcherBackend.connect() + } val startedBefore = IdHelper.startedBefore.getAndSet(true) val suffix = if (startedBefore) { @@ -202,6 +213,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) + launcherBackend.setState(SparkAppHandle.State.SUBMITTED) startScheduler(driver) } @@ -295,15 +307,21 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( this.mesosExternalShuffleClient.foreach(_.init(appId)) this.schedulerDriver = driver markRegistered() + launcherBackend.setAppId(appId) + launcherBackend.setState(SparkAppHandle.State.RUNNING) } override def sufficientResourcesRegistered(): Boolean = { totalCoreCount.get >= maxCoresOption.getOrElse(0) * minRegisteredRatio } - override def disconnected(d: org.apache.mesos.SchedulerDriver) {} + override def disconnected(d: org.apache.mesos.SchedulerDriver) { + launcherBackend.setState(SparkAppHandle.State.SUBMITTED) + } - override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {} + override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) { + launcherBackend.setState(SparkAppHandle.State.RUNNING) + } /** * Method called by Mesos to offer resources on slaves. 
We respond by launching an executor, @@ -611,6 +629,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( } override def stop() { + stopSchedulerBackend() + launcherBackend.setState(SparkAppHandle.State.FINISHED) + launcherBackend.close() + } + + private def stopSchedulerBackend() { // Make sure we're not launching tasks during shutdown stateLock.synchronized { if (stopCalled) { From e3536406ec6ff65a8b41ba2f2fd40517a760cfd6 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 13 Oct 2017 23:08:17 -0700 Subject: [PATCH 10/10] [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsTracker metrics collection fails if a new file isn't yet visible ## What changes were proposed in this pull request? `BasicWriteTaskStatsTracker.getFileSize()` to catch `FileNotFoundException`, log info and then return 0 as a file size. This ensures that if a newly created file isn't visible due to the store not always having create consistency, the metric collection doesn't cause the failure. ## How was this patch tested? New test suite included, `BasicWriteTaskStatsTrackerSuite`. This not only checks the resilience to missing files, but verifies the existing logic as to how file statistics are gathered. Note that in the current implementation 1. if you call `Tracker..getFinalStats()` more than once, the file size count will increase by size of the last file. This could be fixed by clearing the filename field inside `getFinalStats()` itself. 2. If you pass in an empty or null string to `Tracker.newFile(path)` then IllegalArgumentException is raised, but only in `getFinalStats()`, rather than in `newFile`. There's a test for this behaviour in the new suite, as it verifies that only FNFEs get swallowed. Author: Steve Loughran Closes #18979 from steveloughran/cloud/SPARK-21762-missing-files-in-metrics. 
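In short, the tracker's per-file size probe becomes a defensive lookup of the following shape (a sketch; the actual method, shown in the diff below, lives in `BasicWriteTaskStatsTracker`):

```scala
import java.io.FileNotFoundException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// A file that is not yet visible (eventually consistent object store) yields None
// instead of failing metric collection; its bytes are simply not counted.
def getFileSize(hadoopConf: Configuration, filePath: String): Option[Long] = {
  val path = new Path(filePath)
  val fs = path.getFileSystem(hadoopConf)
  try Some(fs.getFileStatus(path).getLen) catch {
    case _: FileNotFoundException => None
  }
}
```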
--- .../datasources/BasicWriteStatsTracker.scala | 49 +++- .../BasicWriteTaskStatsTrackerSuite.scala | 220 ++++++++++++++++++ .../sql/hive/execution/SQLQuerySuite.scala | 8 + 3 files changed, 265 insertions(+), 12 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala index b8f7d130d569f..11af0aaa7b206 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.execution.datasources +import java.io.FileNotFoundException + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.SparkContext +import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -44,20 +47,32 @@ case class BasicWriteTaskStats( * @param hadoopConf */ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) - extends WriteTaskStatsTracker { + extends WriteTaskStatsTracker with Logging { private[this] var numPartitions: Int = 0 private[this] var numFiles: Int = 0 + private[this] var submittedFiles: Int = 0 private[this] var numBytes: Long = 0L private[this] var numRows: Long = 0L - private[this] var curFile: String = null - + private[this] var curFile: Option[String] = None - private def getFileSize(filePath: String): Long = { + /** + * Get the size of the file expected to have been written by a worker. + * @param filePath path to the file + * @return the file size or None if the file was not found. + */ + private def getFileSize(filePath: String): Option[Long] = { val path = new Path(filePath) val fs = path.getFileSystem(hadoopConf) - fs.getFileStatus(path).getLen() + try { + Some(fs.getFileStatus(path).getLen()) + } catch { + case e: FileNotFoundException => + // may arise against eventually consistent object stores + logDebug(s"File $path is not yet visible", e) + None + } } @@ -70,12 +85,19 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) } override def newFile(filePath: String): Unit = { - if (numFiles > 0) { - // we assume here that we've finished writing to disk the previous file by now - numBytes += getFileSize(curFile) + statCurrentFile() + curFile = Some(filePath) + submittedFiles += 1 + } + + private def statCurrentFile(): Unit = { + curFile.foreach { path => + getFileSize(path).foreach { len => + numBytes += len + numFiles += 1 + } + curFile = None } - curFile = filePath - numFiles += 1 } override def newRow(row: InternalRow): Unit = { @@ -83,8 +105,11 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) } override def getFinalStats(): WriteTaskStats = { - if (numFiles > 0) { - numBytes += getFileSize(curFile) + statCurrentFile() + if (submittedFiles != numFiles) { + logInfo(s"Expected $submittedFiles files, but only saw $numFiles. 
" + + "This could be due to the output format not writing empty files, " + + "or files being not immediately visible in the filesystem.") } BasicWriteTaskStats(numPartitions, numFiles, numBytes, numRows) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala new file mode 100644 index 0000000000000..bf3c8ede9a980 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.nio.charset.Charset + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkFunSuite +import org.apache.spark.util.Utils + +/** + * Test how BasicWriteTaskStatsTracker handles files. + * + * Two different datasets are written (alongside 0), one of + * length 10, one of 3. They were chosen to be distinct enough + * that it is straightforward to determine which file lengths were added + * from the sum of all files added. Lengths like "10" and "5" would + * be less informative. + */ +class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite { + + private val tempDir = Utils.createTempDir() + private val tempDirPath = new Path(tempDir.toURI) + private val conf = new Configuration() + private val localfs = tempDirPath.getFileSystem(conf) + private val data1 = "0123456789".getBytes(Charset.forName("US-ASCII")) + private val data2 = "012".getBytes(Charset.forName("US-ASCII")) + private val len1 = data1.length + private val len2 = data2.length + + /** + * In teardown delete the temp dir. + */ + protected override def afterAll(): Unit = { + Utils.deleteRecursively(tempDir) + } + + /** + * Assert that the stats match that expected. 
+   * @param tracker tracker to check
+   * @param files number of files expected
+   * @param bytes total number of bytes expected
+   */
+  private def assertStats(
+      tracker: BasicWriteTaskStatsTracker,
+      files: Int,
+      bytes: Int): Unit = {
+    val stats = finalStatus(tracker)
+    assert(files === stats.numFiles, "Wrong number of files")
+    assert(bytes === stats.numBytes, "Wrong byte count of file size")
+  }
+
+  private def finalStatus(tracker: BasicWriteTaskStatsTracker): BasicWriteTaskStats = {
+    tracker.getFinalStats().asInstanceOf[BasicWriteTaskStats]
+  }
+
+  test("No files in run") {
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    assertStats(tracker, 0, 0)
+  }
+
+  test("Missing File") {
+    val missing = new Path(tempDirPath, "missing")
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile(missing.toString)
+    assertStats(tracker, 0, 0)
+  }
+
+  test("Empty filename is forwarded") {
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile("")
+    intercept[IllegalArgumentException] {
+      finalStatus(tracker)
+    }
+  }
+
+  test("Null filename is only picked up in final status") {
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile(null)
+    intercept[IllegalArgumentException] {
+      finalStatus(tracker)
+    }
+  }
+
+  test("0 byte file") {
+    val file = new Path(tempDirPath, "file0")
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile(file.toString)
+    touch(file)
+    assertStats(tracker, 1, 0)
+  }
+
+  test("File with data") {
+    val file = new Path(tempDirPath, "file-with-data")
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile(file.toString)
+    write1(file)
+    assertStats(tracker, 1, len1)
+  }
+
+  test("Open file") {
+    val file = new Path(tempDirPath, "file-open")
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile(file.toString)
+    val stream = localfs.create(file, true)
+    try {
+      assertStats(tracker, 1, 0)
+      stream.write(data1)
+      stream.flush()
+      assert(1 === finalStatus(tracker).numFiles, "Wrong number of files")
+    } finally {
+      stream.close()
+    }
+  }
+
+  test("Two files") {
+    val file1 = new Path(tempDirPath, "f-2-1")
+    val file2 = new Path(tempDirPath, "f-2-2")
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile(file1.toString)
+    write1(file1)
+    tracker.newFile(file2.toString)
+    write2(file2)
+    assertStats(tracker, 2, len1 + len2)
+  }
+
+  test("Three files, last one empty") {
+    val file1 = new Path(tempDirPath, "f-3-1")
+    val file2 = new Path(tempDirPath, "f-3-2")
+    val file3 = new Path(tempDirPath, "f-3-3")
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    tracker.newFile(file1.toString)
+    write1(file1)
+    tracker.newFile(file2.toString)
+    write2(file2)
+    tracker.newFile(file3.toString)
+    touch(file3)
+    assertStats(tracker, 3, len1 + len2)
+  }
+
+  test("Three files, one not found") {
+    val file1 = new Path(tempDirPath, "f-4-1")
+    val file2 = new Path(tempDirPath, "f-4-2")
+    val file3 = new Path(tempDirPath, "f-4-3")
+    val tracker = new BasicWriteTaskStatsTracker(conf)
+    // file 1
+    tracker.newFile(file1.toString)
+    write1(file1)
+
+    // file 2 is noted, but not created
+    tracker.newFile(file2.toString)
+
+    // file 3 is noted & then created
+    tracker.newFile(file3.toString)
+    write2(file3)
+
+    // the expected size is file1 + file3; only two files are reported
+    // as found
+    assertStats(tracker, 2, len1 + len2)
+  }
+
+  /**
+   * Write a 0-byte file.
+   * @param file file path
+   */
+  private def touch(file: Path): Unit = {
+    localfs.create(file, true).close()
+  }
+
+  /**
+   * Write a byte array.
+   * @param file path to file
+   * @param data data
+   * @return bytes written
+   */
+  private def write(file: Path, data: Array[Byte]): Integer = {
+    val stream = localfs.create(file, true)
+    try {
+      stream.write(data)
+    } finally {
+      stream.close()
+    }
+    data.length
+  }
+
+  /**
+   * Write a data1 array.
+   * @param file file
+   */
+  private def write1(file: Path): Unit = {
+    write(file, data1)
+  }
+
+  /**
+   * Write a data2 array.
+   *
+   * @param file file
+   */
+  private def write2(file: Path): Unit = {
+    write(file, data2)
+  }
+
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 94fa43dec7313..60935c3e85c43 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2110,4 +2110,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  Seq("orc", "parquet", "csv", "json", "text").foreach { format =>
+    test(s"Writing empty datasets should not fail - $format") {
+      withTempDir { dir =>
+        Seq("str").toDS.limit(0).write.format(format).save(dir.getCanonicalPath + "/tmp")
+      }
+    }
+  }
 }
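As a usage-level illustration of the behaviour the new SQLQuerySuite cases above
exercise: writing an empty Dataset should complete even if the chosen format never
materializes a data file, since the stats tracker now tolerates files that were
submitted but never observed. A minimal stand-alone sketch follows; the object
name, master URL and output path are illustrative assumptions, not taken from the
patch:

```scala
import org.apache.spark.sql.SparkSession

object EmptyWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("empty-write-example")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical output location; any writable directory works for the example.
    val out = "/tmp/empty-write-example"

    // A zero-row Dataset: the writer may register file names that are never
    // written, which is the case the tracker changes above log instead of fail.
    Seq("str").toDS().limit(0).write.mode("overwrite").parquet(out)

    spark.stop()
  }
}
```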