Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1597,7 +1597,19 @@ case class GetCurrentDatabase(sessionCatalog: SessionCatalog) extends Rule[Logic
*/
object EmbedSerializerInFilter extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject)) =>
case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject))
// SPARK-15632: Conceptually, filter operator should never introduce schema change. This
// optimization rule also relies on this assumption. However, Dataset typed filter operator
// does introduce schema changes in some cases. Thus, we only enable this optimization when
//
// 1. either input and output schemata are exactly the same, or
// 2. both input and output schemata are single-field schemata whose fields share the same type.
//
// The 2nd case is included because encoders for primitive types always have only a single
// field with hard-coded field name "value".
// TODO: Clean this up after SPARK-15632 is fixed.
if s.schema == d.child.schema || samePrimitiveType(s.schema, d.child.schema) =>

val numObjects = condition.collect {
case a: Attribute if a == d.output.head => a
}.length
Expand All @@ -1622,6 +1634,13 @@ object EmbedSerializerInFilter extends Rule[LogicalPlan] {
Project(objAttrs, filter)
}
}

/**
 * Returns true only when both schemas contain exactly one field and those two fields have
 * equal data types. Field names are not compared. Any schema that does not consist of
 * exactly one field yields false.
 */
def samePrimitiveType(lhs: StructType, rhs: StructType): Boolean = {
  // Extracts the sole field of a single-field schema, or None for any other shape.
  def onlyField(schema: StructType) = schema match {
    case StructType(Array(field)) => Some(field)
    case _ => None
  }

  onlyField(lhs).exists { left =>
    onlyField(rhs).exists(right => left.dataType == right.dataType)
  }
}
}

/**
Expand Down
16 changes: 15 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
val dataset = Seq(1, 2, 3).toDS()
dataset.createOrReplaceTempView("tempView")

// Overrrides the existing temporary view with same name
// Overrides the existing temporary view with same name
// No exception should be thrown here.
dataset.createOrReplaceTempView("tempView")

Expand Down Expand Up @@ -763,6 +763,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {

checkShowString(ds, expected)
}

// Regression test for the EmbedSerializerInFilter optimizer rule: a typed filter over a
// Dataset whose underlying columns are ordered differently from the target class must still
// return correctly decoded objects.
test(
  "SPARK-15112: EmbedSerializerInFilter should not optimize plan fragment that changes schema"
) {
  // The DataFrame columns are declared as ("b", "a"), i.e. the Int column comes first, while
  // ClassData is constructed as (String, Int). The typed view therefore has a different column
  // order than the plan beneath it.
  val ds = Seq(1 -> "foo", 2 -> "bar").toDF("b", "a").as[ClassData]

  // Sanity check: without any filter, rows are decoded into ClassData correctly.
  assertResult(Seq(ClassData("foo", 1), ClassData("bar", 2))) {
    ds.collect().toSeq
  }

  // With a typed filter on top, only the row whose `b` exceeds 1 should remain, and it must
  // still be decoded with the fields in the right positions.
  assertResult(Seq(ClassData("bar", 2))) {
    ds.filter(_.b > 1).collect().toSeq
  }
}
}

case class Generic[T](id: T, value: Double)
Expand Down