Skip to content

Commit

Permalink
Generate a call to specialized method for Dataset.filter()
Browse files Browse the repository at this point in the history
  • Loading branch information
kiszk committed Mar 10, 2017
1 parent 1fb2933 commit 35ba2c6
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

object CatalystSerde {
def deserialize[T : Encoder](child: LogicalPlan): DeserializeToObject = {
Expand Down Expand Up @@ -210,13 +211,48 @@ case class TypedFilter(
def typedCondition(input: Expression): Expression = {
val (funcClass, methodName) = func match {
case m: FilterFunction[_] => classOf[FilterFunction[_]] -> "call"
case _ => classOf[Any => Boolean] -> "apply"
case _ => FunctionUtils.getFunctionOneName(BooleanType, input.dataType)
}
val funcObj = Literal.create(func, ObjectType(funcClass))
Invoke(funcObj, methodName, BooleanType, input :: Nil)
}
}

object FunctionUtils {
private def getMethodType(dt: DataType, isOutput: Boolean): Option[String] = {
dt match {
case BooleanType if isOutput => Some("Z")
case IntegerType => Some("I")
case LongType => Some("J")
case FloatType => Some("F")
case DoubleType => Some("D")
case _ => None
}
}

def getFunctionOneName(outputDT: DataType, inputDT: DataType): (Class[_], String) = {
// load "scala.Function1" using Java API to avoid requirements of type parameters
Utils.classForName("scala.Function1") -> {
// if a pair of an argument and return types is one of specific types
// whose specialized method (apply$mc..$sp) is generated by scalac,
// Catalyst generated a direct method call to the specialized method.
// The followings are references for this specialization:
// http://www.scala-lang.org/api/2.12.0/scala/Function1.html
// https://github.com/scala/scala/blob/2.11.x/src/compiler/scala/tools/nsc/transform/
// SpecializeTypes.scala
// http://www.cakesolutions.net/teamblogs/scala-dissection-functions
// http://axel22.github.io/2013/11/03/specialization-quirks.html
val inputType = getMethodType(inputDT, false)
val outputType = getMethodType(outputDT, true)
if (inputType.isDefined && outputType.isDefined) {
s"apply$$mc${outputType.get}${inputType.get}$$sp"
} else {
"apply"
}
}
}
}

/** Factory for constructing new `AppendColumn` nodes. */
object AppendColumns {
def apply[T : Encoder, U : Encoder](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.catalyst.plans.logical.FunctionUtils
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState
Expand Down Expand Up @@ -217,39 +218,10 @@ case class MapElementsExec(
child.asInstanceOf[CodegenSupport].produce(ctx, this)
}

private def getMethodType(dt: DataType, isOutput: Boolean): String = {
dt match {
case BooleanType if isOutput => "Z"
case IntegerType => "I"
case LongType => "J"
case FloatType => "F"
case DoubleType => "D"
case _ => null
}
}

override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
val (funcClass, methodName) = func match {
case m: MapFunction[_, _] => classOf[MapFunction[_, _]] -> "call"
// load "scala.Function1" using Java API to avoid requirements of type parameters
case _ => Utils.classForName("scala.Function1") -> {
// if a pair of an argument and return types is one of specific types
// whose specialized method (apply$mc..$sp) is generated by scalac,
// Catalyst generated a direct method call to the specialized method.
// The followings are references for this specialization:
// http://www.scala-lang.org/api/2.12.0/scala/Function1.html
// https://github.com/scala/scala/blob/2.11.x/src/compiler/scala/tools/nsc/transform/
// SpecializeTypes.scala
// http://www.cakesolutions.net/teamblogs/scala-dissection-functions
// http://axel22.github.io/2013/11/03/specialization-quirks.html
val inputType = getMethodType(child.output(0).dataType, false)
val outputType = getMethodType(outputObjAttr.dataType, true)
if (inputType != null && outputType != null) {
s"apply$$mc$outputType$inputType$$sp"
} else {
"apply"
}
}
case _ => FunctionUtils.getFunctionOneName(outputObjAttr.dataType, child.output(0).dataType)
}
val funcObj = Literal.create(func, ObjectType(funcClass))
val callFunc = Invoke(funcObj, methodName, outputObjAttr.dataType, child.output)
Expand Down

0 comments on commit 35ba2c6

Please sign in to comment.