[SPARK-27871][SQL] LambdaVariable should use per-query unique IDs instead of globally unique IDs

## What changes were proposed in this pull request?

For simplicity, all `LambdaVariable`s are currently globally unique, which avoids any potential conflicts. However, this causes a performance problem: encoder expressions that deal with collections contain `LambdaVariable`s, so they can never hit the codegen cache.
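
The cache-miss effect can be illustrated with a toy model. The sketch below assumes a compile cache keyed by the generated source text and a code generator that embeds the variable ID in a generated name; `CompileCache`, `generateSource`, and `compilationsFor` are hypothetical names for illustration, not Spark APIs.

```scala
import scala.collection.mutable

// Toy compile cache keyed by generated source text (an assumption for this sketch).
final class CompileCache {
  private val cache = mutable.Map.empty[String, String]
  var compilations: Int = 0

  // Compiles only when this exact source text has not been seen before.
  def compile(source: String): String =
    cache.getOrElseUpdate(source, {
      compilations += 1
      s"compiled#$compilations"
    })
}

object CodegenCacheDemo {
  // Toy codegen: the lambda variable's ID ends up in the generated variable name.
  def generateSource(lambdaVarId: Long): String =
    s"int value_$lambdaVarId = input[i]; output[i] = value_$lambdaVarId + 1;"

  // Number of real compilations for a series of logically identical encoders,
  // given the lambda variable ID each one was assigned.
  def compilationsFor(ids: Seq[Long]): Int = {
    val cache = new CompileCache
    ids.foreach(id => cache.compile(generateSource(id)))
    cache.compilations
  }

  def main(args: Array[String]): Unit = {
    // Globally unique IDs: every encoder produces different source text.
    println(compilationsFor(Seq(0L, 1L, 2L)))
    // IDs restarted per query: the source text repeats and the cache is hit.
    println(compilationsFor(Seq(0L, 0L, 0L)))
  }
}
```

In this model, three encoders with distinct IDs trigger three compilations, while three encoders that share the same renumbered ID trigger one.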

To overcome this problem, `LambdaVariable` should have per-query unique IDs. This PR does two things:
1. Refactor `LambdaVariable` to carry an ID, so that the ID is easier to change.
2. Add an optimizer rule that reassigns `LambdaVariable` IDs so that they are unique per query.
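
The two steps above can be sketched on a toy expression tree. This is a minimal illustration, not the actual `ReassignLambdaVariableID` rule: the `Expr` types and the `reassign` function are hypothetical, and numbering from 0 in first-seen order is an assumption made for the example.

```scala
import scala.collection.mutable

object ReassignIdsSketch {
  // Toy expression tree; illustrative types only, not Spark's classes.
  sealed trait Expr
  final case class LambdaVar(id: Long) extends Expr
  final case class Literal(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class MapElements(loopVar: LambdaVar, body: Expr) extends Expr

  // Renumbers lambda variables from 0 in first-seen order. Each old ID maps to
  // exactly one new ID, so all references to the same variable stay consistent.
  def reassign(root: Expr): Expr = {
    val oldToNew = mutable.LinkedHashMap.empty[Long, Long]

    def renumber(v: LambdaVar): LambdaVar =
      LambdaVar(oldToNew.getOrElseUpdate(v.id, oldToNew.size.toLong))

    def go(e: Expr): Expr = e match {
      case v: LambdaVar => renumber(v)
      case l: Literal => l
      case Add(l, r) => Add(go(l), go(r))
      case MapElements(v, body) =>
        val newVar = renumber(v) // renumber the loop variable before its uses
        MapElements(newVar, go(body))
    }

    go(root)
  }
}
```

Two trees that differ only in their original IDs, such as `MapElements(LambdaVar(41), Add(LambdaVar(41), Literal(1)))` and the same shape built with ID 97, become equal after `reassign`, which is what lets a cache keyed on the resulting expression or its generated code be hit.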

## How was this patch tested?

new tests

Closes #24735 from cloud-fan/dataset.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
cloud-fan authored and gatorsmile committed Jun 27, 2019
1 parent a7e1619 commit cded421
Showing 12 changed files with 342 additions and 207 deletions.
@@ -21,13 +21,13 @@ import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.{typeTag, TypeTag}

 import org.apache.spark.sql.Encoder
-import org.apache.spark.sql.catalyst.{InternalRow, JavaTypeInference, ScalaReflection, WalkedTypePath}
+import org.apache.spark.sql.catalyst.{InternalRow, JavaTypeInference, ScalaReflection}
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, GetColumnByOrdinal, SimpleAnalyzer, UnresolvedAttribute, UnresolvedExtractValue}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, GenerateUnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, InitializeJavaBean, Invoke, NewInstance}
-import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts
-import org.apache.spark.sql.catalyst.plans.logical.{CatalystSerde, DeserializeToObject, LocalRelation}
+import org.apache.spark.sql.catalyst.optimizer.{ReassignLambdaVariableID, SimplifyCasts}
+import org.apache.spark.sql.catalyst.plans.logical.{CatalystSerde, DeserializeToObject, LeafNode, LocalRelation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ObjectType, StringType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
@@ -301,13 +301,25 @@ case class ExpressionEncoder[T](
   }

   @transient
-  private lazy val extractProjection = GenerateUnsafeProjection.generate(serializer)
+  private lazy val extractProjection = GenerateUnsafeProjection.generate({
+    // When using `ExpressionEncoder` directly, we will skip the normal query processing steps
+    // (analyzer, optimizer, etc.). Here we apply the ReassignLambdaVariableID rule, as it's
+    // important to codegen performance.
+    val optimizedPlan = ReassignLambdaVariableID.apply(DummyExpressionHolder(serializer))
+    optimizedPlan.asInstanceOf[DummyExpressionHolder].exprs
+  })

   @transient
   private lazy val inputRow = new GenericInternalRow(1)

   @transient
-  private lazy val constructProjection = SafeProjection.create(deserializer :: Nil)
+  private lazy val constructProjection = SafeProjection.create({
+    // When using `ExpressionEncoder` directly, we will skip the normal query processing steps
+    // (analyzer, optimizer, etc.). Here we apply the ReassignLambdaVariableID rule, as it's
+    // important to codegen performance.
+    val optimizedPlan = ReassignLambdaVariableID.apply(DummyExpressionHolder(Seq(deserializer)))
+    optimizedPlan.asInstanceOf[DummyExpressionHolder].exprs
+  })

   /**
    * Returns a new set (with unique ids) of [[NamedExpression]] that represent the serialized form
@@ -371,3 +383,9 @@ case class ExpressionEncoder[T](

   override def toString: String = s"class[$schemaString]"
 }
+
+// A dummy logical plan that can hold expressions and go through optimizer rules.
+case class DummyExpressionHolder(exprs: Seq[Expression]) extends LeafNode {
+  override lazy val resolved = true
+  override def output: Seq[Attribute] = Nil
+}
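
The holder pattern in this hunk (wrap bare expressions in a leaf plan, run a plan-level rule, then unwrap) can be sketched in isolation. The types below are toy stand-ins, and `shiftIds` is a hypothetical rule invented for the example; none of them are Spark classes.

```scala
object HolderSketch {
  // Toy expression and plan types; illustrative only, not Spark's classes.
  sealed trait Expr
  final case class Var(id: Long) extends Expr
  final case class Neg(child: Expr) extends Expr

  sealed trait Plan { def mapExpressions(f: Expr => Expr): Plan }

  // Leaf node whose only job is to carry expressions through plan-level rules.
  final case class ExprHolder(exprs: Seq[Expr]) extends Plan {
    def mapExpressions(f: Expr => Expr): Plan = ExprHolder(exprs.map(f))
  }

  // A hypothetical plan-level rule: shifts every variable ID down by `offset`.
  def shiftIds(offset: Long): Plan => Plan = { plan =>
    def go(e: Expr): Expr = e match {
      case Var(id) => Var(id - offset)
      case Neg(c) => Neg(go(c))
    }
    plan.mapExpressions(go)
  }

  // Adapter: wrap the expressions, run the rule, and unwrap the result.
  def applyToExpressions(rule: Plan => Plan, exprs: Seq[Expr]): Seq[Expr] =
    rule(ExprHolder(exprs)) match {
      case ExprHolder(rewritten) => rewritten
    }
}
```

The design choice is that the rule keeps a single plan-to-plan signature, and callers that only have expressions pay the small cost of a throwaway wrapper node instead of needing a second, expression-only entry point.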