Closed
39 commits
cff0ff5
enable adaptive query execution default
Dec 9, 2019
5d44f3e
fix the failed unit tests
Dec 26, 2019
626a448
fix ALSSuite ut
Dec 26, 2019
82973df
fix the NPE in HiveCompatibilitySuite.semijoin
Dec 27, 2019
3b354ac
fix the failed ut and resolve the comments
Dec 29, 2019
6a3e12d
fix compile error
Dec 29, 2019
ef2e571
code style
Dec 29, 2019
afbc4c1
fix failed uts
Dec 29, 2019
f8a9cc0
fix ALSSuite
Dec 29, 2019
b222471
disable aqe in SQLQueryTestSuite
Dec 29, 2019
55c3db5
disable aqe in SQLMetricsSuite
Dec 30, 2019
07e7fb3
resolve the comments
Dec 30, 2019
08828b4
resolve the comments and fix the failed ut
Dec 31, 2019
da7bd9f
disable LocalReader when repartition
Jan 3, 2020
19748db
fix the failed unit test
Jan 4, 2020
b3ebac8
fix the failed pyspark ut and resolve the comments
Jan 7, 2020
c21d0db
rebase and remove the unnecessary import
Jan 7, 2020
a91d719
compile error
Jan 7, 2020
7c50070
resolve the compile error in pyspark
Jan 7, 2020
115f940
resolve the comments
Jan 7, 2020
ceb72c9
fix the failed unit test and resolve the comments
Jan 8, 2020
920de79
import order issue
Jan 8, 2020
1915f82
fix the failed unit test and resolve the comments
Jan 9, 2020
d6bb22a
resolve comment and fix compile issue
Jan 9, 2020
6e24639
update the check in containSubQuery
Jan 9, 2020
98d19b4
fix compile issue
Jan 9, 2020
987395c
fix the compile error and resolve comments
Jan 9, 2020
9ad3a79
change the order of case match in containShuffle
Jan 9, 2020
9478a89
remove the change in AdaptiveQueryExecSuite
Jan 9, 2020
3554cae
fix the failed ut
Jan 9, 2020
fa0d1be
fix the column.py
Jan 10, 2020
ffe8e3e
fix the failed python unit tests
Jan 11, 2020
253e6fa
compile error
Jan 11, 2020
c559b80
python code style
Jan 11, 2020
5d4152f
check style
Jan 11, 2020
5d1af66
fix the failed test in window.py
Jan 11, 2020
82a8906
small fix
Jan 11, 2020
18e00de
fix the failed sparkR unit test
Jan 12, 2020
8b5e744
disable AQE
Jan 13, 2020
3 changes: 2 additions & 1 deletion R/pkg/tests/fulltests/test_mllib_recommendation.R
@@ -31,7 +31,8 @@ test_that("spark.als", {
stats <- summary(model)
expect_equal(stats$rank, 10)
test <- createDataFrame(list(list(0, 2), list(1, 0), list(2, 0)), c("user", "item"))
predictions <- collect(predict(model, test))
result <- predict(model, test)
predictions <- collect(arrange(result, desc(result$item), result$user))

expect_equal(predictions$prediction, c(0.6324540, 3.6218479, -0.4568263),
tolerance = 1e-4)
@@ -661,11 +661,12 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging {
(ex, act) =>
ex.userFactors.first().getSeq[Float](1) === act.userFactors.first().getSeq[Float](1)
} { (ex, act, df, enc) =>
// With AQE on/off, the order of the result may be different. Sort the result here.
val expected = ex.transform(df).selectExpr("prediction")
.first().getFloat(0)
.sort("prediction").first().getFloat(0)
testTransformerByGlobalCheckFunc(df, act, "prediction") {
case rows: Seq[Row] =>
expected ~== rows.head.getFloat(0) absTol 1e-6
expected ~== rows.sortBy(_.getFloat(0)).head.getFloat(0) absTol 1e-6
}(enc)
}
}
@@ -696,7 +697,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging {
val model = als.fit(df)
def testTransformIdExceedsIntRange[A : Encoder](dataFrame: DataFrame): Unit = {
val e1 = intercept[SparkException] {
model.transform(dataFrame).first
model.transform(dataFrame).collect()
}
TestUtils.assertExceptionMsg(e1, msg)
val e2 = intercept[StreamingQueryException] {
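The test updates above (and the R and Python test changes elsewhere in this PR) follow one pattern: with adaptive execution enabled, post-shuffle partitions may be coalesced, so the order of collected rows is no longer stable and order-sensitive assertions must sort first. A minimal, self-contained Scala sketch of that pattern (a local SparkSession and illustrative data, not taken from the diff):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object AqeOrderingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("aqe-ordering-sketch")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()

    // A grouped aggregation introduces a shuffle; under AQE the number of
    // post-shuffle partitions (and hence the order of collect()) can change.
    val grouped = spark.range(0, 100)
      .selectExpr("id % 7 AS key", "id AS value")
      .groupBy("key")
      .count()

    // Order-sensitive checks sort explicitly instead of trusting collect() order.
    val keys = grouped.orderBy(col("key")).collect().map(_.getLong(0)).toSeq
    assert(keys == keys.sorted)

    spark.stop()
  }
}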
3 changes: 2 additions & 1 deletion python/pyspark/sql/column.py
@@ -669,8 +669,9 @@ def over(self, window):
>>> window = Window.partitionBy("name").orderBy("age") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
>>> from pyspark.sql.functions import rank, min
>>> from pyspark.sql.functions import desc
>>> df.withColumn("rank", rank().over(window)) \
.withColumn("min", min('age').over(window)).show()
.withColumn("min", min('age').over(window)).sort(desc("age")).show()
+---+-----+----+---+
|age| name|rank|min|
+---+-----+----+---+
12 changes: 7 additions & 5 deletions python/pyspark/sql/dataframe.py
@@ -1016,7 +1016,8 @@ def alias(self, alias):
>>> df_as1 = df.alias("df_as1")
>>> df_as2 = df.alias("df_as2")
>>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
>>> joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").collect()
>>> joined_df.select("df_as1.name", "df_as2.name", "df_as2.age") \
.sort(desc("df_as1.name")).collect()
[Row(name=u'Bob', name=u'Bob', age=5), Row(name=u'Alice', name=u'Alice', age=2)]
"""
assert isinstance(alias, basestring), "alias should be a string"
@@ -1057,11 +1058,12 @@ def join(self, other, on=None, how=None):
``anti``, ``leftanti`` and ``left_anti``.

The following performs a full outer join between ``df1`` and ``df2``.
>>> from pyspark.sql.functions import desc
>>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height) \
.sort(desc("name")).collect()
[Row(name=u'Bob', height=85), Row(name=u'Alice', height=None), Row(name=None, height=80)]

>>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
[Row(name=None, height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)]

>>> df.join(df2, 'name', 'outer').select('name', 'height').collect()
>>> df.join(df2, 'name', 'outer').select('name', 'height').sort(desc("name")).collect()
[Row(name=u'Tom', height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)]

>>> cond = [df.name == df3.name, df.age == df3.age]
26 changes: 14 additions & 12 deletions python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
@@ -319,16 +319,18 @@ def test_complex_groupby(self):
expected4 = df.groupby(plus_one(df.id)).agg(sum(df.v))

# groupby one scalar pandas UDF
result5 = df.groupby(plus_two(df.id)).agg(sum_udf(df.v))
expected5 = df.groupby(plus_two(df.id)).agg(sum(df.v))
result5 = df.groupby(plus_two(df.id)).agg(sum_udf(df.v)).sort('sum(v)')
expected5 = df.groupby(plus_two(df.id)).agg(sum(df.v)).sort('sum(v)')

# groupby one expression and one python UDF
result6 = df.groupby(df.v % 2, plus_one(df.id)).agg(sum_udf(df.v))
expected6 = df.groupby(df.v % 2, plus_one(df.id)).agg(sum(df.v))

# groupby one expression and one scalar pandas UDF
result7 = df.groupby(df.v % 2, plus_two(df.id)).agg(sum_udf(df.v)).sort('sum(v)')
expected7 = df.groupby(df.v % 2, plus_two(df.id)).agg(sum(df.v)).sort('sum(v)')
result7 = (df.groupby(df.v % 2, plus_two(df.id))
.agg(sum_udf(df.v)).sort(['sum(v)', 'plus_two(id)']))
expected7 = (df.groupby(df.v % 2, plus_two(df.id))
.agg(sum(df.v)).sort(['sum(v)', 'plus_two(id)']))

assert_frame_equal(expected1.toPandas(), result1.toPandas())
assert_frame_equal(expected2.toPandas(), result2.toPandas())
@@ -354,8 +356,8 @@ def test_complex_expressions(self):
sum_udf(col('v2')) + 5,
plus_one(sum_udf(col('v1'))),
sum_udf(plus_one(col('v2'))))
.sort('id')
.toPandas())
.sort(['id', '(v % 2)'])
.toPandas().sort_index(by=['id', '(v % 2)']))

expected1 = (df.withColumn('v1', df.v + 1)
.withColumn('v2', df.v + 2)
@@ -365,8 +367,8 @@
sum(col('v2')) + 5,
plus_one(sum(col('v1'))),
sum(plus_one(col('v2'))))
.sort('id')
.toPandas())
.sort(['id', '(v % 2)'])
.toPandas().sort_index(by=['id', '(v % 2)']))

# Test complex expressions with sql expression, scala pandas UDF and
# group aggregate pandas UDF
@@ -378,8 +380,8 @@
sum_udf(col('v2')) + 5,
plus_two(sum_udf(col('v1'))),
sum_udf(plus_two(col('v2'))))
.sort('id')
.toPandas())
.sort(['id', '(v % 2)'])
.toPandas().sort_index(by=['id', '(v % 2)']))

expected2 = (df.withColumn('v1', df.v + 1)
.withColumn('v2', df.v + 2)
@@ -389,8 +391,8 @@
sum(col('v2')) + 5,
plus_two(sum(col('v1'))),
sum(plus_two(col('v2'))))
.sort('id')
.toPandas())
.sort(['id', '(v % 2)'])
.toPandas().sort_index(by=['id', '(v % 2)']))

# Test sequential groupby aggregate
result3 = (df.groupby('id')
3 changes: 2 additions & 1 deletion python/pyspark/sql/udf.py
@@ -430,7 +430,8 @@ def registerJavaUDAF(self, name, javaClassName):
>>> spark.udf.registerJavaUDAF("javaUDAF", "test.org.apache.spark.sql.MyDoubleAvg")
>>> df = spark.createDataFrame([(1, "a"),(2, "b"), (3, "a")],["id", "name"])
>>> df.createOrReplaceTempView("df")
>>> spark.sql("SELECT name, javaUDAF(id) as avg from df group by name").collect()
>>> spark.sql("SELECT name, javaUDAF(id) as avg from df group by name order by name desc") \
.collect()
[Row(name=u'b', avg=102.0), Row(name=u'a', avg=102.0)]
"""

16 changes: 8 additions & 8 deletions python/pyspark/sql/window.py
@@ -109,16 +109,16 @@ def rowsBetween(start, end):
>>> tup = [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")]
>>> df = sqlContext.createDataFrame(tup, ["id", "category"])
>>> window = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)
>>> df.withColumn("sum", func.sum("id").over(window)).show()
>>> df.withColumn("sum", func.sum("id").over(window)).sort("id", "category", "sum").show()
+---+--------+---+
| id|category|sum|
+---+--------+---+
| 1| b| 3|
| 2| b| 5|
| 3| b| 3|
| 1| a| 2|
| 1| a| 3|
| 1| b| 3|
| 2| a| 2|
| 2| b| 5|
| 3| b| 3|
+---+--------+---+

:param start: boundary start, inclusive.
@@ -168,16 +168,16 @@ def rangeBetween(start, end):
>>> tup = [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")]
>>> df = sqlContext.createDataFrame(tup, ["id", "category"])
>>> window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
>>> df.withColumn("sum", func.sum("id").over(window)).show()
>>> df.withColumn("sum", func.sum("id").over(window)).sort("id", "category").show()
+---+--------+---+
| id|category|sum|
+---+--------+---+
| 1| b| 3|
| 2| b| 5|
| 3| b| 3|
| 1| a| 4|
| 1| a| 4|
| 1| b| 3|
| 2| a| 2|
| 2| b| 5|
| 3| b| 3|
+---+--------+---+

:param start: boundary start, inclusive.
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK

@@ -80,12 +81,20 @@ class CacheManager extends Logging {
} else {
val sparkSession = query.sparkSession
val qe = sparkSession.sessionState.executePlan(planToCache)
val inMemoryRelation = InMemoryRelation(
sparkSession.sessionState.conf.useCompression,
sparkSession.sessionState.conf.columnBatchSize, storageLevel,
qe.executedPlan,
tableName,
optimizedPlan = qe.optimizedPlan)
val originalValue = sparkSession.sessionState.conf.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)
val inMemoryRelation = try {
// To avoid changing the output partitioning, disable AQE here.
sparkSession.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, false)
InMemoryRelation(
sparkSession.sessionState.conf.useCompression,
sparkSession.sessionState.conf.columnBatchSize, storageLevel,
qe.executedPlan,
tableName,
optimizedPlan = qe.optimizedPlan)
} finally {
sparkSession.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, originalValue)
}

this.synchronized {
if (lookupCachedData(planToCache).nonEmpty) {
logWarning("Data has already been cached.")
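The CacheManager change above briefly turns spark.sql.adaptive.enabled off while the InMemoryRelation is built and restores the caller's value in a finally block, so an exception cannot leak the override. A minimal Scala sketch of that save-set-restore idiom on its own (MutableFlag and withFlagDisabled are hypothetical names, not Spark APIs):

// Hypothetical stand-in for a mutable session setting such as a SQLConf entry.
final class MutableFlag(private var value: Boolean) {
  def get: Boolean = value
  def set(v: Boolean): Unit = { value = v }
}

object ConfRestoreSketch {
  // Run `body` with the flag forced to false, always restoring the old value.
  def withFlagDisabled[T](flag: MutableFlag)(body: => T): T = {
    val original = flag.get
    flag.set(false)
    try body
    finally flag.set(original)
  }

  def main(args: Array[String]): Unit = {
    val adaptiveEnabled = new MutableFlag(true)
    val seenInsideBody = withFlagDisabled(adaptiveEnabled) { adaptiveEnabled.get }
    assert(!seenInsideBody)      // the flag was off while the body ran
    assert(adaptiveEnabled.get)  // and the caller's setting is restored afterwards
  }
}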
@@ -176,7 +176,8 @@ case class AdaptiveSparkPlanExec(
stage.resultOption = Some(res)
case StageFailure(stage, ex) =>
errors.append(
new SparkException(s"Failed to materialize query stage: ${stage.treeString}", ex))
new SparkException(s"Failed to materialize query stage: ${stage.treeString}." +
s" and the cause is ${ex.getMessage}", ex))
}

// In case of errors, we cancel all running stages and throw exception.
@@ -506,7 +507,8 @@ case class AdaptiveSparkPlanExec(
}
} finally {
val ex = new SparkException(
"Adaptive execution failed due to stage materialization failures.", errors.head)
"Adaptive execution failed due to stage materialization failures." +
s" and the cause is ${errors.head.getMessage}", errors.head)
errors.tail.foreach(ex.addSuppressed)
cancelErrors.foreach(ex.addSuppressed)
throw ex
@@ -60,7 +60,7 @@ trait AdaptiveSparkPlanHelper {
* node in this tree in a preorder traversal.
* @param f the function to be applied.
*/
def map[A](p: SparkPlan)(f: SparkPlan => A): Seq[A] = {
def mapPlans[A](p: SparkPlan)(f: SparkPlan => A): Seq[A] = {
val ret = new collection.mutable.ArrayBuffer[A]()
foreach(p)(ret += f(_))
ret
@@ -127,4 +127,12 @@ trait AdaptiveSparkPlanHelper {
case s: QueryStageExec => Seq(s.plan)
case _ => p.children
}
}

/**
* Strip the executePlan of AdaptiveSparkPlanExec leaf node.
*/
def stripAQEPlan(p: SparkPlan): SparkPlan = p match {
case a: AdaptiveSparkPlanExec => a.executedPlan
case other => other
}
}
@@ -29,7 +29,8 @@ import org.apache.spark.sql.internal.SQLConf
case class DemoteBroadcastHashJoin(conf: SQLConf) extends Rule[LogicalPlan] {

private def shouldDemote(plan: LogicalPlan): Boolean = plan match {
case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.isDefined =>
case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.isDefined
&& stage.resultOption.get != null =>
val mapOutputStatistics = stage.resultOption.get.asInstanceOf[MapOutputStatistics]
val partitionCnt = mapOutputStatistics.bytesByPartitionId.length
val nonZeroCnt = mapOutputStatistics.bytesByPartitionId.count(_ > 0)
@@ -20,12 +20,14 @@ package org.apache.spark.sql.execution.adaptive
import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, DynamicPruningSubquery, ListQuery, Literal}
import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, DynamicPruningSubquery, ListQuery, Literal, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.internal.SQLConf

/**
@@ -39,11 +41,26 @@ case class InsertAdaptiveSparkPlan(

private val conf = adaptiveExecutionContext.session.sessionState.conf

def containShuffle(plan: SparkPlan): Boolean = {
plan.find {
case _: Exchange => true
Contributor:
Do you remember which case would fail because of this? We should not see an Exchange without a logical link in sparkPlan, right? If there is one, we need to find out the reason and fix it.

cloud-fan (Contributor), Jan 30, 2020:
it's not about "no logical plan link", it's just: we should do AQE if there are already exchanges in the physical plan before EnsureRequirements

Contributor:
If it's test failure related, we need to look into it, otherwise can we pull these changes out into a separate PR, and control the bypass with a flag, so at least we can still cover this code path in testing?

case s: SparkPlan => !s.requiredChildDistribution.forall(_ == UnspecifiedDistribution)
}.isDefined
}

def containSubQuery(plan: SparkPlan): Boolean = {
plan.find(_.expressions.exists(_.find {
Contributor:
It is absolutely ridiculous to do so here. We deliberately added SubqueryAdaptiveNotSupportedException to make sure that it's all-in or all-out for the main query and all its subqueries in order to ensure subquery reuse in AQE.

Contributor:
It's still all-in or all-out. The complete check is isSubquery || containShuffle(plan) || containSubQuery(plan), so we only call containSubQuery for the main query.

case _: SubqueryExpression => true
case _ => false
}.isDefined)).isDefined
}

override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false)

private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
case _: ExecutedCommandExec => plan
case _ if conf.adaptiveExecutionEnabled && supportAdaptive(plan) =>
case _ if conf.adaptiveExecutionEnabled && supportAdaptive(plan)
&& (isSubquery || containShuffle(plan) || containSubQuery(plan)) =>
try {
// Plan sub-queries recursively and pass in the shared stage cache for exchange reuse. Fall
// back to non-adaptive mode if adaptive execution is supported in any of the sub-queries.
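Per the review discussion above, the rule only wraps a plan in AdaptiveSparkPlanExec when it is a subquery, contains a shuffle (an Exchange or an operator with a required distribution), or contains a subquery expression; simple plans are left untouched. A quick way to see the effect from a Scala Spark shell (a sketch that assumes an existing SparkSession named spark; the exact explain output depends on the Spark version):

import org.apache.spark.sql.functions.expr

spark.conf.set("spark.sql.adaptive.enabled", "true")

// No exchange and no subquery: AQE leaves this plan alone.
spark.range(10).selectExpr("id + 1 AS v").explain()

// The aggregation adds a shuffle, so the plan gets an AdaptiveSparkPlan
// wrapper (visible at the top of the explain output).
spark.range(10).groupBy(expr("id % 3")).count().explain()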
@@ -136,9 +136,10 @@ object OptimizeLocalShuffleReader {
}
}

def canUseLocalShuffleReader(plan: SparkPlan): Boolean = {
plan.isInstanceOf[ShuffleQueryStageExec] ||
plan.isInstanceOf[CoalescedShuffleReaderExec]
def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
case s: ShuffleQueryStageExec => s.shuffle.canChangeNumPartitions
case CoalescedShuffleReaderExec(s: ShuffleQueryStageExec, _) => s.shuffle.canChangeNumPartitions
case _ => false
}
}

3 changes: 3 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/explain.sql
@@ -1,3 +1,6 @@
--SET spark.sql.codegen.wholeStage = true
--SET spark.sql.adaptive.enabled = false

-- Test tables
CREATE table explain_temp1 (key int, val int) USING PARQUET;
CREATE table explain_temp2 (key int, val int) USING PARQUET;
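The two --SET directives added to explain.sql above pin whole-stage codegen on and adaptive execution off so the golden explain output stays deterministic. The session-level equivalent, sketched against an existing SparkSession named spark (the table mirrors the one the test file creates):

spark.conf.set("spark.sql.codegen.wholeStage", "true")
spark.conf.set("spark.sql.adaptive.enabled", "false")

spark.sql("CREATE TABLE IF NOT EXISTS explain_temp1 (key INT, val INT) USING PARQUET")
spark.sql("EXPLAIN SELECT key, val FROM explain_temp1 WHERE key > 10").show(truncate = false)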