From eaef6b374f86835bb08b9abf6d09d28aec1da9a8 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 23 Apr 2018 14:29:25 +0900 Subject: [PATCH] Fix --- .../sql/catalyst/JavaTypeInference.scala | 4 +- .../expressions/ObjectExpressionsSuite.scala | 81 +++++++++++++++---- 2 files changed, 65 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala index f7649d85b7734..3ecc137c8cd7f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala @@ -357,9 +357,7 @@ object JavaTypeInference { } } - private[catalyst] def serializerFor( - inputObject: Expression, - typeToken: TypeToken[_]): Expression = { + private def serializerFor(inputObject: Expression, typeToken: TypeToken[_]): Expression = { def toCatalystArray(input: Expression, elementType: TypeToken[_]): Expression = { val (dataType, nullable) = inferDataType(elementType) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala index d1c99f5333256..730b36c32333c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala @@ -21,10 +21,9 @@ import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ import scala.reflect.ClassTag +import scala.reflect.runtime.universe.TypeTag import scala.util.Random -import com.google.common.reflect.TypeToken - import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.sql.{RandomDataGenerator, Row} @@ -475,8 +474,6 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkObjectExprEvaluation(deserializer, expected = data) } -<<<<<<< f3c35bf1540eeb90bd0e75aa34139333ede280c4 -<<<<<<< c48085aa91c60615a4de3b391f019f46f3fcdbe3 test("SPARK-23595 ValidateExternalType should support interpreted execution") { val inputObject = BoundReference(0, ObjectType(classOf[Row]), nullable = true) Seq( @@ -506,10 +503,62 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { "java.lang.Integer is not a valid external type for schema of double") } - private def javaSerializerFor(beanClass: Class[_])(inputObject: Expression): CreateNamedStruct = { - JavaTypeInference.serializerFor(inputObject, TypeToken.of(beanClass)) match { - case e => CreateNamedStruct(Literal("value") :: e :: Nil) + private def javaMapSerializerFor( + keyClazz: Class[_], + valueClazz: Class[_])(inputObject: Expression): Expression = { + + def kvSerializerFor(inputObject: Expression, clazz: Class[_]): Expression = clazz match { + case c if c == classOf[java.lang.Integer] => + Invoke(inputObject, "intValue", IntegerType) + case c if c == classOf[java.lang.String] => + StaticInvoke( + classOf[UTF8String], + StringType, + "fromString", + inputObject :: Nil, + returnNullable = false) + } + + ExternalMapToCatalyst( + inputObject, + ObjectType(keyClazz), + kvSerializerFor(_, keyClazz), + keyNullable = true, + ObjectType(valueClazz), + kvSerializerFor(_, valueClazz), + valueNullable = true + ) + } + + private def scalaMapSerializerFor[T: TypeTag, U: TypeTag](inputObject: Expression): Expression = { + import org.apache.spark.sql.catalyst.ScalaReflection._ + + val curId = new java.util.concurrent.atomic.AtomicInteger() + + def kvSerializerFor[V: TypeTag](inputObject: Expression): Expression = + localTypeOf[V].dealias match { + case t if t <:< localTypeOf[java.lang.Integer] => + Invoke(inputObject, "intValue", IntegerType) + case t if t <:< localTypeOf[String] => + StaticInvoke( + classOf[UTF8String], + StringType, + "fromString", + inputObject :: Nil, + returnNullable = false) + case _ => + inputObject } + + ExternalMapToCatalyst( + inputObject, + dataTypeFor[T], + kvSerializerFor[T], + keyNullable = !localTypeOf[T].typeSymbol.asClass.isPrimitive, + dataTypeFor[U], + kvSerializerFor[U], + valueNullable = !localTypeOf[U].typeSymbol.asClass.isPrimitive + ) } test("SPARK-23589 ExternalMapToCatalyst should support interpreted execution") { @@ -526,14 +575,12 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val expected = CatalystTypeConverters.convertToCatalyst(scalaMap) // Java Map - val serializer1 = GetStructField( - javaSerializerFor(javaMap.getClass)(Literal.fromObject(javaMap)), 0) + val serializer1 = javaMapSerializerFor(classOf[java.lang.Integer], classOf[java.lang.String])( + Literal.fromObject(javaMap)) checkEvaluation(serializer1, expected) // Scala Map - val serializer2 = GetStructField( - ScalaReflection.serializerFor[scala.collection.Map[Int, String]]( - Literal.fromObject(scalaMap)), 0) + val serializer2 = scalaMapSerializerFor[Int, String](Literal.fromObject(scalaMap)) checkEvaluation(serializer2, expected) // NULL key test @@ -547,15 +594,15 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } // Java Map - val serializer3 = GetStructField( - javaSerializerFor(javaMap.getClass)(Literal.fromObject(javaMapHasNullKey)), 0) + val serializer3 = + javaMapSerializerFor(classOf[java.lang.Integer], classOf[java.lang.String])( + Literal.fromObject(javaMapHasNullKey)) checkExceptionInExpression[RuntimeException]( serializer3, EmptyRow, "Cannot use null as map key!") // Scala Map - val serializer4 = GetStructField( - ScalaReflection.serializerFor[scala.collection.Map[java.lang.Integer, String]]( - Literal.fromObject(scalaMapHasNullKey)), 0) + val serializer4 = scalaMapSerializerFor[java.lang.Integer, String]( + Literal.fromObject(scalaMapHasNullKey)) checkExceptionInExpression[RuntimeException]( serializer4, EmptyRow, "Cannot use null as map key!")