Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
maropu committed Apr 23, 2018
1 parent 10d89ff commit eaef6b3
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,7 @@ object JavaTypeInference {
}
}

private[catalyst] def serializerFor(
inputObject: Expression,
typeToken: TypeToken[_]): Expression = {
private def serializerFor(inputObject: Expression, typeToken: TypeToken[_]): Expression = {

def toCatalystArray(input: Expression, elementType: TypeToken[_]): Expression = {
val (dataType, nullable) = inferDataType(elementType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ import java.sql.{Date, Timestamp}

import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.Random

import com.google.common.reflect.TypeToken

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.sql.{RandomDataGenerator, Row}
Expand Down Expand Up @@ -475,8 +474,6 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkObjectExprEvaluation(deserializer, expected = data)
}

<<<<<<< f3c35bf1540eeb90bd0e75aa34139333ede280c4
<<<<<<< c48085aa91c60615a4de3b391f019f46f3fcdbe3
test("SPARK-23595 ValidateExternalType should support interpreted execution") {
val inputObject = BoundReference(0, ObjectType(classOf[Row]), nullable = true)
Seq(
Expand Down Expand Up @@ -506,10 +503,62 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
"java.lang.Integer is not a valid external type for schema of double")
}

private def javaSerializerFor(beanClass: Class[_])(inputObject: Expression): CreateNamedStruct = {
JavaTypeInference.serializerFor(inputObject, TypeToken.of(beanClass)) match {
case e => CreateNamedStruct(Literal("value") :: e :: Nil)
private def javaMapSerializerFor(
keyClazz: Class[_],
valueClazz: Class[_])(inputObject: Expression): Expression = {

def kvSerializerFor(inputObject: Expression, clazz: Class[_]): Expression = clazz match {
case c if c == classOf[java.lang.Integer] =>
Invoke(inputObject, "intValue", IntegerType)
case c if c == classOf[java.lang.String] =>
StaticInvoke(
classOf[UTF8String],
StringType,
"fromString",
inputObject :: Nil,
returnNullable = false)
}

ExternalMapToCatalyst(
inputObject,
ObjectType(keyClazz),
kvSerializerFor(_, keyClazz),
keyNullable = true,
ObjectType(valueClazz),
kvSerializerFor(_, valueClazz),
valueNullable = true
)
}

private def scalaMapSerializerFor[T: TypeTag, U: TypeTag](inputObject: Expression): Expression = {
import org.apache.spark.sql.catalyst.ScalaReflection._

val curId = new java.util.concurrent.atomic.AtomicInteger()

def kvSerializerFor[V: TypeTag](inputObject: Expression): Expression =
localTypeOf[V].dealias match {
case t if t <:< localTypeOf[java.lang.Integer] =>
Invoke(inputObject, "intValue", IntegerType)
case t if t <:< localTypeOf[String] =>
StaticInvoke(
classOf[UTF8String],
StringType,
"fromString",
inputObject :: Nil,
returnNullable = false)
case _ =>
inputObject
}

ExternalMapToCatalyst(
inputObject,
dataTypeFor[T],
kvSerializerFor[T],
keyNullable = !localTypeOf[T].typeSymbol.asClass.isPrimitive,
dataTypeFor[U],
kvSerializerFor[U],
valueNullable = !localTypeOf[U].typeSymbol.asClass.isPrimitive
)
}

test("SPARK-23589 ExternalMapToCatalyst should support interpreted execution") {
Expand All @@ -526,14 +575,12 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val expected = CatalystTypeConverters.convertToCatalyst(scalaMap)

// Java Map
val serializer1 = GetStructField(
javaSerializerFor(javaMap.getClass)(Literal.fromObject(javaMap)), 0)
val serializer1 = javaMapSerializerFor(classOf[java.lang.Integer], classOf[java.lang.String])(
Literal.fromObject(javaMap))
checkEvaluation(serializer1, expected)

// Scala Map
val serializer2 = GetStructField(
ScalaReflection.serializerFor[scala.collection.Map[Int, String]](
Literal.fromObject(scalaMap)), 0)
val serializer2 = scalaMapSerializerFor[Int, String](Literal.fromObject(scalaMap))
checkEvaluation(serializer2, expected)

// NULL key test
Expand All @@ -547,15 +594,15 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}

// Java Map
val serializer3 = GetStructField(
javaSerializerFor(javaMap.getClass)(Literal.fromObject(javaMapHasNullKey)), 0)
val serializer3 =
javaMapSerializerFor(classOf[java.lang.Integer], classOf[java.lang.String])(
Literal.fromObject(javaMapHasNullKey))
checkExceptionInExpression[RuntimeException](
serializer3, EmptyRow, "Cannot use null as map key!")

// Scala Map
val serializer4 = GetStructField(
ScalaReflection.serializerFor[scala.collection.Map[java.lang.Integer, String]](
Literal.fromObject(scalaMapHasNullKey)), 0)
val serializer4 = scalaMapSerializerFor[java.lang.Integer, String](
Literal.fromObject(scalaMapHasNullKey))

checkExceptionInExpression[RuntimeException](
serializer4, EmptyRow, "Cannot use null as map key!")
Expand Down

0 comments on commit eaef6b3

Please sign in to comment.