From 5be1df1215299758d6e684ec9a5f6ba12693280e Mon Sep 17 00:00:00 2001
From: Ajith
Date: Thu, 30 Aug 2018 09:11:36 +0530
Subject: [PATCH 1/2] [SPARK-25276] Redundant constraints when using alias

---
 .../spark/sql/catalyst/plans/logical/LogicalPlan.scala  | 2 +-
 .../sql/catalyst/plans/ConstraintPropagationSuite.scala | 5 ++---
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 0e4456ac0e6a9..e9dc77588ae78 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -162,7 +162,7 @@ abstract class UnaryNode extends LogicalPlan {
         allConstraints += EqualTo(a.toAttribute, l)
       case a @ Alias(e, _) =>
         // For every alias in `projectList`, replace the reference in constraints by its attribute.
-        allConstraints ++= allConstraints.map(_ transform {
+        allConstraints = allConstraints.map(_ transform {
           case expr: Expression if expr.semanticEquals(e) =>
             a.toAttribute
         })
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index 5ad748b6113d6..102720610d57f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -135,9 +135,8 @@ class ConstraintPropagationSuite extends SparkFunSuite with PlanTest {
       ExpressionSet(Seq(resolveColumn(aliasedRelation.analyze, "x") > 10,
         IsNotNull(resolveColumn(aliasedRelation.analyze, "x")),
         resolveColumn(aliasedRelation.analyze, "b") <=> resolveColumn(aliasedRelation.analyze, "y"),
-        resolveColumn(aliasedRelation.analyze, "z") <=> resolveColumn(aliasedRelation.analyze, "x"),
-        resolveColumn(aliasedRelation.analyze, "z") > 10,
-        IsNotNull(resolveColumn(aliasedRelation.analyze, "z")))))
+        resolveColumn(aliasedRelation.analyze, "z") <=>
+          resolveColumn(aliasedRelation.analyze, "x"))))
 
     val multiAlias = tr.where('a === 'c + 10).select('a.as('x), 'c.as('y))
     verifyConstraints(multiAlias.analyze.constraints,

From fcee767aad6b84bedba767c9deb2bf1b7fd44b01 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Mon, 10 Sep 2018 11:44:10 +0530
Subject: [PATCH 2/2] Added test

---
 .../org/apache/spark/sql/SQLQuerySuite.scala | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 51 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 01dc28d70184e..16b14b1e070ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -22,12 +22,13 @@ import java.net.{MalformedURLException, URL}
 import java.sql.Timestamp
 import java.util.concurrent.atomic.AtomicBoolean
 
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
+
 import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
-import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -35,8 +36,9 @@ import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
 import org.apache.spark.sql.test.SQLTestData._
 import org.apache.spark.sql.types._
 
-class SQLQuerySuite extends QueryTest with SharedSQLContext {
+class SQLQuerySuite extends QueryTest with SharedSQLContext with TimeLimits {
   import testImplicits._
+  implicit val defaultSignaler: Signaler = ThreadSignaler
 
   setupTestData()
 
@@ -2858,6 +2860,53 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     val result = ds.flatMap(_.bar).distinct
     result.rdd.isEmpty
   }
+
+  test("SPARK-25276: Redundant constraints when using alias should not cause GC overhead") {
+    import org.scalatest.time.SpanSugar._
+    failAfter(30 seconds) {
+      withDatabase("spark_25276") {
+        sql("create database spark_25276")
+        sql("use spark_25276")
+
+        val columnCount = 20
+        val tables = Array("tableSpark_25276_0", "tableSpark_25276_1", "tableSpark_25276_2",
+          "tableSpark_25276_3")
+
+        // drop tables
+        tables.foreach(table => sql(s"drop table if exists $table"))
+
+        assert(sql("show tables like 'tableSpark_25276*'").count === 0)
+
+        // create table0
+        sql(s"create table ${tables(0)} (key_id string) using parquet")
+
+        // create table1
+        val table1Columns = (1 to columnCount).map(index => s"a$index string").mkString(",")
+        sql(s"create table ${tables(1)} ($table1Columns) using parquet")
+
+        val otherColumns = (1 to columnCount).map(index =>
+          s"case when a$index is null then '' else cast (a$index as string) end as a$index")
+          .mkString(",")
+        val keyColumnSelection = (1 to columnCount).map(index =>
+          s"case when a$index is null then '' else cast (a$index as string) end , '|~|'")
+          .mkString(",")
+        val keyCol = s"concat($keyColumnSelection)) as key_id"
+        val joinCondition = "left join %s B on A.key_id = B.key_id where B.key_id is null"
+
+        // create table2
+        val createTable2 = "create table %s using parquet as select A.* from " +
+          s"(select $otherColumns, ($keyCol from %s ) A $joinCondition"
+        sql(String.format(createTable2, tables(2), tables(1), tables(0)))
+
+        // create table3
+        val createTable3 = "create table %s using parquet as select A.* from " +
+          s"(select ($keyCol , $otherColumns from %s ) A $joinCondition"
+        sql(String.format(createTable3, tables(3), tables(1), tables(0)))
+
+        assert(sql("show tables like 'tableSpark_25276*'").count === 4)
+      }
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
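
Note (not part of the patches): a minimal, self-contained sketch of the defect
the LogicalPlan.scala hunk in patch 1 fixes. The object name
`ConstraintGrowthSketch` and its string-based "constraints" are illustrative
stand-ins for Catalyst's `ExpressionSet`, not Spark code; the point is only
that rewriting the constraints for each alias and *appending* the results with
`++=` retains every intermediate form, doubling the set once per alias, while
plain assignment keeps only the fully rewritten set.

    object ConstraintGrowthSketch {
      def main(args: Array[String]): Unit = {
        // attribute -> alias, as in SELECT a AS x, b AS y, c AS z
        val aliases = Seq("a" -> "x", "b" -> "y", "c" -> "z")
        // one constraint that references every aliased attribute
        var appended = Set("a + b + c > 10")
        var replaced = appended

        for ((attr, alias) <- aliases) {
          // buggy (++=): keeps the old constraints AND their rewritten
          // copies, so the set doubles once per alias (2^n growth)
          appended ++= appended.map(_.replace(attr, alias))
          // fixed (=): keeps only the rewritten constraints
          replaced = replaced.map(_.replace(attr, alias))
        }

        println(s"with ++= : ${appended.size} constraints") // prints 8
        println(s"with =   : ${replaced.size} constraints") // prints 1
      }
    }

With the 20 aliased case-when columns generated by the test above, the same
per-alias doubling produces on the order of 2^20 constraint expressions during
analysis, which is the GC overhead the 30-second time limit guards against.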