From 27217416429c2e9a745ea1e9babdb7e82b350604 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 14 Nov 2015 14:17:43 +0800
Subject: [PATCH 1/4] Add UserDefinedType support to RowEncoder.

---
 .../main/scala/org/apache/spark/sql/Row.scala | 14 +++-
 .../sql/catalyst/encoders/RowEncoder.scala    | 24 ++++++-
 .../catalyst/encoders/RowEncoderSuite.scala   | 69 ++++++++++++++++++-
 3 files changed, 101 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
index ed2fdf9f2f7cf..0f0f200122c34 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -152,7 +152,7 @@ trait Row extends Serializable {
    *   BinaryType -> byte array
    *   ArrayType -> scala.collection.Seq (use getList for java.util.List)
    *   MapType -> scala.collection.Map (use getJavaMap for java.util.Map)
-   *   StructType -> org.apache.spark.sql.Row
+   *   StructType -> org.apache.spark.sql.Row (or Product)
    * }}}
    */
   def apply(i: Int): Any = get(i)
@@ -177,7 +177,7 @@ trait Row extends Serializable {
    *   BinaryType -> byte array
    *   ArrayType -> scala.collection.Seq (use getList for java.util.List)
    *   MapType -> scala.collection.Map (use getJavaMap for java.util.Map)
-   *   StructType -> org.apache.spark.sql.Row
+   *   StructType -> org.apache.spark.sql.Row (or Product)
    * }}}
    */
   def get(i: Int): Any
@@ -306,7 +306,15 @@ trait Row extends Serializable {
    *
    * @throws ClassCastException when data type does not match.
    */
-  def getStruct(i: Int): Row = getAs[Row](i)
+  def getStruct(i: Int): Row = {
+    // Both Product and Row are recognized as StructType in a Row
+    val t = get(i)
+    if (t.isInstanceOf[Product]) {
+      Row.fromTuple(t.asInstanceOf[Product])
+    } else {
+      t.asInstanceOf[Row]
+    }
+  }

   /**
    * Returns the value at position i.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala index e0be896bb3548..9bb1602494b68 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala @@ -50,6 +50,14 @@ object RowEncoder { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | BinaryType => inputObject + case udt: UserDefinedType[_] => + val obj = NewInstance( + udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt(), + Nil, + false, + dataType = ObjectType(udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt())) + Invoke(obj, "serialize", udt.sqlType, inputObject :: Nil) + case TimestampType => StaticInvoke( DateTimeUtils, @@ -109,11 +117,16 @@ object RowEncoder { case StructType(fields) => val convertedFields = fields.zipWithIndex.map { case (f, i) => + val method = if (f.dataType.isInstanceOf[StructType]) { + "getStruct" + } else { + "get" + } If( Invoke(inputObject, "isNullAt", BooleanType, Literal(i) :: Nil), Literal.create(null, f.dataType), extractorsFor( - Invoke(inputObject, "get", externalDataTypeFor(f.dataType), Literal(i) :: Nil), + Invoke(inputObject, method, externalDataTypeFor(f.dataType), Literal(i) :: Nil), f.dataType)) } CreateStruct(convertedFields) @@ -137,6 +150,7 @@ object RowEncoder { case _: ArrayType => ObjectType(classOf[scala.collection.Seq[_]]) case _: MapType => ObjectType(classOf[scala.collection.Map[_, _]]) case _: StructType => ObjectType(classOf[Row]) + case udt: UserDefinedType[_] => ObjectType(udt.userClass) } private def constructorFor(schema: StructType): Expression = { @@ -155,6 +169,14 @@ object RowEncoder { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | BinaryType => input + case udt: UserDefinedType[_] => + val obj = NewInstance( + udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt(), + Nil, + false, + dataType = ObjectType(udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt())) + Invoke(obj, "deserialize", ObjectType(udt.userClass), input :: Nil) + case TimestampType => StaticInvoke( DateTimeUtils, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala index e8301e8e06b52..100856bbce182 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala @@ -19,12 +19,59 @@ package org.apache.spark.sql.catalyst.encoders import org.apache.spark.SparkFunSuite import org.apache.spark.sql.{RandomDataGenerator, Row} +import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayData} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +@SQLUserDefinedType(udt = classOf[ExamplePointUDT]) +class ExamplePoint(val x: Double, val y: Double) extends Serializable { + override def hashCode = 41 * (41 + x.toInt) + y.toInt + override def equals(that: Any): Boolean = { + if (that.isInstanceOf[ExamplePoint]) { + val e = that.asInstanceOf[ExamplePoint] + (this.x == e.x || (this.x.isNaN && e.x.isNaN) || (this.x.isInfinity && e.x.isInfinity)) && + (this.y == e.y || (this.y.isNaN && e.y.isNaN) || (this.y.isInfinity && e.y.isInfinity)) + } 
else { + false + } + } +} + +/** + * User-defined type for [[ExamplePoint]]. + */ +class ExamplePointUDT extends UserDefinedType[ExamplePoint] { + + override def sqlType: DataType = ArrayType(DoubleType, false) + + override def pyUDT: String = "pyspark.sql.tests.ExamplePointUDT" + + override def serialize(obj: Any): GenericArrayData = { + obj match { + case p: ExamplePoint => + val output = new Array[Any](2) + output(0) = p.x + output(1) = p.y + new GenericArrayData(output) + } + } + + override def deserialize(datum: Any): ExamplePoint = { + datum match { + case values: ArrayData => + new ExamplePoint(values.getDouble(0), values.getDouble(1)) + } + } + + override def userClass: Class[ExamplePoint] = classOf[ExamplePoint] + + private[spark] override def asNullable: ExamplePointUDT = this +} + class RowEncoderSuite extends SparkFunSuite { private val structOfString = new StructType().add("str", StringType) + private val structOfUDT = new StructType().add("udt", new ExamplePointUDT, false) private val arrayOfString = ArrayType(StringType) private val mapOfString = MapType(StringType, StringType) @@ -41,7 +88,8 @@ class RowEncoderSuite extends SparkFunSuite { .add("string", StringType) .add("binary", BinaryType) .add("date", DateType) - .add("timestamp", TimestampType)) + .add("timestamp", TimestampType) + .add("udt", new ExamplePointUDT, false)) encodeDecodeTest( new StructType() @@ -68,7 +116,24 @@ class RowEncoderSuite extends SparkFunSuite { .add("structOfArray", new StructType().add("array", arrayOfString)) .add("structOfMap", new StructType().add("map", mapOfString)) .add("structOfArrayAndMap", - new StructType().add("array", arrayOfString).add("map", mapOfString))) + new StructType().add("array", arrayOfString).add("map", mapOfString)) + .add("structOfUDT", structOfUDT)) + + test(s"encode/decode: Product") { + val schema = new StructType() + .add("structAsProduct", + new StructType() + .add("int", IntegerType) + .add("string", StringType) + .add("double", DoubleType)) + + val encoder = RowEncoder(schema) + + val input: Row = Row((100, "test", 0.123)) + val row = encoder.toRow(input) + val convertedBack = encoder.fromRow(row) + assert(input.getStruct(0) == convertedBack.getStruct(0)) + } private def encodeDecodeTest(schema: StructType): Unit = { test(s"encode/decode: ${schema.simpleString}") { From bab5c5ba8f388e466b9d56af20b2fb2f8fae43ce Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 14 Nov 2015 15:29:27 +0800 Subject: [PATCH 2/4] Fix scala style. 
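
ExamplePoint.hashCode in the new RowEncoderSuite was declared without an
explicit result type; Spark's Scala style rules require one on public
members, so spell out ": Int".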
--- .../apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala index 100856bbce182..af1b8f75c9d75 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.unsafe.types.UTF8String @SQLUserDefinedType(udt = classOf[ExamplePointUDT]) class ExamplePoint(val x: Double, val y: Double) extends Serializable { - override def hashCode = 41 * (41 + x.toInt) + y.toInt + override def hashCode: Int = 41 * (41 + x.toInt) + y.toInt override def equals(that: Any): Boolean = { if (that.isInstanceOf[ExamplePoint]) { val e = that.asInstanceOf[ExamplePoint] From db644fb2be3a3fd32f7f62993575dc6d8cef594e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 14 Nov 2015 22:42:37 +0800 Subject: [PATCH 3/4] Add arrayOfUDT. --- .../sql/catalyst/expressions/objects.scala | 43 +++++++++++++------ .../catalyst/encoders/RowEncoderSuite.scala | 13 ++++++ 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala index 4f58464221b4b..dde58143feb76 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala @@ -113,7 +113,7 @@ case class Invoke( arguments: Seq[Expression] = Nil) extends Expression { override def nullable: Boolean = true - override def children: Seq[Expression] = targetObject :: Nil + override def children: Seq[Expression] = arguments.+:(targetObject) override def eval(input: InternalRow): Any = throw new UnsupportedOperationException("Only code-generated evaluation is supported.") @@ -343,33 +343,50 @@ case class MapObjects( private lazy val loopAttribute = AttributeReference("loopVar", elementType)() private lazy val completeFunction = function(loopAttribute) + private def itemAccessorMethod(dataType: DataType): String => String = dataType match { + case IntegerType => (i: String) => s".getInt($i)" + case LongType => (i: String) => s".getLong($i)" + case FloatType => (i: String) => s".getFloat($i)" + case DoubleType => (i: String) => s".getDouble($i)" + case ByteType => (i: String) => s".getByte($i)" + case ShortType => (i: String) => s".getShort($i)" + case BooleanType => (i: String) => s".getBoolean($i)" + case StringType => (i: String) => s".getUTF8String($i)" + case s: StructType => (i: String) => s".getStruct($i, ${s.size})" + case a: ArrayType => (i: String) => s".getArray($i)" + case _: MapType => (i: String) => s".getMap($i)" + case udt: UserDefinedType[_] => itemAccessorMethod(udt.sqlType) + } + private lazy val (lengthFunction, itemAccessor, primitiveElement) = inputData.dataType match { case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) => (".size()", (i: String) => s".apply($i)", false) case ObjectType(cls) if cls.isArray => (".length", (i: String) => s"[$i]", false) case ArrayType(s: StructType, _) => - (".numElements()", (i: String) => s".getStruct($i, ${s.size})", false) + (".numElements()", itemAccessorMethod(s), false) case ArrayType(a: ArrayType, _) => - 
(".numElements()", (i: String) => s".getArray($i)", true) + (".numElements()", itemAccessorMethod(a), true) case ArrayType(IntegerType, _) => - (".numElements()", (i: String) => s".getInt($i)", true) + (".numElements()", itemAccessorMethod(IntegerType), true) case ArrayType(LongType, _) => - (".numElements()", (i: String) => s".getLong($i)", true) + (".numElements()", itemAccessorMethod(LongType), true) case ArrayType(FloatType, _) => - (".numElements()", (i: String) => s".getFloat($i)", true) + (".numElements()", itemAccessorMethod(FloatType), true) case ArrayType(DoubleType, _) => - (".numElements()", (i: String) => s".getDouble($i)", true) + (".numElements()", itemAccessorMethod(DoubleType), true) case ArrayType(ByteType, _) => - (".numElements()", (i: String) => s".getByte($i)", true) + (".numElements()", itemAccessorMethod(ByteType), true) case ArrayType(ShortType, _) => - (".numElements()", (i: String) => s".getShort($i)", true) + (".numElements()", itemAccessorMethod(ShortType), true) case ArrayType(BooleanType, _) => - (".numElements()", (i: String) => s".getBoolean($i)", true) + (".numElements()", itemAccessorMethod(BooleanType), true) case ArrayType(StringType, _) => - (".numElements()", (i: String) => s".getUTF8String($i)", false) - case ArrayType(_: MapType, _) => - (".numElements()", (i: String) => s".getMap($i)", false) + (".numElements()", itemAccessorMethod(StringType), false) + case ArrayType(m: MapType, _) => + (".numElements()", itemAccessorMethod(m), false) + case ArrayType(udt: UserDefinedType[_], _) => + (".numElements()", itemAccessorMethod(udt.sqlType), false) } override def nullable: Boolean = true diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala index af1b8f75c9d75..c868ddec1bab2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala @@ -74,6 +74,7 @@ class RowEncoderSuite extends SparkFunSuite { private val structOfUDT = new StructType().add("udt", new ExamplePointUDT, false) private val arrayOfString = ArrayType(StringType) private val mapOfString = MapType(StringType, StringType) + private val arrayOfUDT = ArrayType(new ExamplePointUDT, false) encodeDecodeTest( new StructType() @@ -119,6 +120,18 @@ class RowEncoderSuite extends SparkFunSuite { new StructType().add("array", arrayOfString).add("map", mapOfString)) .add("structOfUDT", structOfUDT)) + test(s"encode/decode: arrayOfUDT") { + val schema = new StructType() + .add("arrayOfUDT", arrayOfUDT) + + val encoder = RowEncoder(schema) + + val input: Row = Row(Seq(new ExamplePoint(0.1, 0.2), new ExamplePoint(0.3, 0.4))) + val row = encoder.toRow(input) + val convertedBack = encoder.fromRow(row) + assert(input.getSeq[ExamplePoint](0) == convertedBack.getSeq[ExamplePoint](0)) + } + test(s"encode/decode: Product") { val schema = new StructType() .add("structAsProduct", From a5fdbcec468926134e32ea7c35d9bfc7a28debfa Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 16 Nov 2015 18:06:16 +0800 Subject: [PATCH 4/4] Merge the branches of ArrayType. 
--- .../sql/catalyst/expressions/objects.scala | 33 +++++-------------- 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala index dde58143feb76..5cd19de68391c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala @@ -363,30 +363,15 @@ case class MapObjects( (".size()", (i: String) => s".apply($i)", false) case ObjectType(cls) if cls.isArray => (".length", (i: String) => s"[$i]", false) - case ArrayType(s: StructType, _) => - (".numElements()", itemAccessorMethod(s), false) - case ArrayType(a: ArrayType, _) => - (".numElements()", itemAccessorMethod(a), true) - case ArrayType(IntegerType, _) => - (".numElements()", itemAccessorMethod(IntegerType), true) - case ArrayType(LongType, _) => - (".numElements()", itemAccessorMethod(LongType), true) - case ArrayType(FloatType, _) => - (".numElements()", itemAccessorMethod(FloatType), true) - case ArrayType(DoubleType, _) => - (".numElements()", itemAccessorMethod(DoubleType), true) - case ArrayType(ByteType, _) => - (".numElements()", itemAccessorMethod(ByteType), true) - case ArrayType(ShortType, _) => - (".numElements()", itemAccessorMethod(ShortType), true) - case ArrayType(BooleanType, _) => - (".numElements()", itemAccessorMethod(BooleanType), true) - case ArrayType(StringType, _) => - (".numElements()", itemAccessorMethod(StringType), false) - case ArrayType(m: MapType, _) => - (".numElements()", itemAccessorMethod(m), false) - case ArrayType(udt: UserDefinedType[_], _) => - (".numElements()", itemAccessorMethod(udt.sqlType), false) + case ArrayType(t, _) => + val (sqlType, primitiveElement) = t match { + case m: MapType => (m, false) + case s: StructType => (s, false) + case s: StringType => (s, false) + case udt: UserDefinedType[_] => (udt.sqlType, false) + case o => (o, true) + } + (".numElements()", itemAccessorMethod(sqlType), primitiveElement) } override def nullable: Boolean = true
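
With the series applied, a Row holding UDT values round-trips through
RowEncoder end to end. A minimal sketch of the new behavior, mirroring
encodeDecodeTest in RowEncoderSuite (ExamplePoint and ExamplePointUDT as
defined there; the sketch is illustrative and not part of the patches):

    val schema = new StructType().add("point", new ExamplePointUDT, false)
    val encoder = RowEncoder(schema)
    // toRow invokes ExamplePointUDT.serialize via the new extractor path;
    // fromRow invokes ExamplePointUDT.deserialize via the new constructor path.
    val internal = encoder.toRow(Row(new ExamplePoint(1.0, 2.0)))
    val external = encoder.fromRow(internal)
    assert(external.getAs[ExamplePoint](0) == new ExamplePoint(1.0, 2.0))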