From 436bb4b80f264fc010a21751975a13031ee10487 Mon Sep 17 00:00:00 2001 From: ChiaYung Su Date: Fri, 1 Aug 2014 22:19:57 +0800 Subject: [PATCH 1/7] initial commit --- .../spark/sql/catalyst/types/dataTypes.scala | 15 +++++++++++++++ .../apache/spark/sql/parquet/ParquetTypes.scala | 2 ++ 2 files changed, 17 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index 70c6d06cf2534..320a004c853b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -44,11 +44,17 @@ object DataType extends RegexParsers { "DecimalType" ^^^ DecimalType | "TimestampType" ^^^ TimestampType + protected lazy val arrayType: Parser[DataType] = "ArrayType" ~> "(" ~> dataType ~ "," ~ boolVal <~ ")" ^^ { case tpe ~ _ ~ containsNull => ArrayType(tpe, containsNull) } + protected lazy val fixedLenBinaryType: Parser[DataType] = + "FixedLenBinaryType" ~> "(" ~> intVal <~ ")" ^^ { + case t => FixedLenBinaryType(t) + } + protected lazy val mapType: Parser[DataType] = "MapType" ~> "(" ~> dataType ~ "," ~ dataType ~ "," ~ boolVal <~ ")" ^^ { case t1 ~ _ ~ t2 ~ _ ~ valueContainsNull => MapType(t1, t2, valueContainsNull) @@ -60,6 +66,11 @@ object DataType extends RegexParsers { StructField(name, tpe, nullable = nullable) } + protected lazy val intVal: Parser[Integer] = + "[0-9]+" ^^ { + case t => t.toInt + } + protected lazy val boolVal: Parser[Boolean] = "true" ^^^ true | "false" ^^^ false @@ -151,6 +162,10 @@ case object BinaryType extends DataType with PrimitiveType { def simpleString: String = "binary" } +case class FixedLenBinaryType( length:Int ) extends DataType with PrimitiveType { + type JvmType = Array[Byte] +} + case object BooleanType extends NativeType with PrimitiveType { private[sql] type JvmType = Boolean @transient private[sql] lazy 
val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index 1a52377651737..7370cd800dd0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -59,6 +59,7 @@ private[parquet] object ParquetTypesConverter extends Logging { case ParquetPrimitiveTypeName.INT96 => // TODO: add BigInteger type? TODO(andre) use DecimalType instead???? sys.error("Potential loss of precision: cannot convert INT96") + case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY => FixedLenBinaryType(parquetType.getTypeLength) case _ => sys.error( s"Unsupported parquet datatype $parquetType") } @@ -195,6 +196,7 @@ private[parquet] object ParquetTypesConverter extends Logging { case ShortType => Some(ParquetPrimitiveTypeName.INT32, None) case ByteType => Some(ParquetPrimitiveTypeName.INT32, None) case LongType => Some(ParquetPrimitiveTypeName.INT64, None) + case FixedLenBinaryType(_) => Some(ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, None) case _ => None } From d15d967d064c6189e7decd23c1145b7a3156bd2d Mon Sep 17 00:00:00 2001 From: ChiaYung Su Date: Fri, 1 Aug 2014 22:44:15 +0800 Subject: [PATCH 2/7] properly handle convert --- .../org/apache/spark/sql/catalyst/types/dataTypes.scala | 5 +++-- .../scala/org/apache/spark/sql/parquet/ParquetTypes.scala | 6 +++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index 320a004c853b1..5016f741572d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -42,7 +42,8 @@ object DataType 
extends RegexParsers { "BinaryType" ^^^ BinaryType | "BooleanType" ^^^ BooleanType | "DecimalType" ^^^ DecimalType | - "TimestampType" ^^^ TimestampType + "TimestampType" ^^^ TimestampType | + fixedLenBinaryType protected lazy val arrayType: Parser[DataType] = @@ -67,7 +68,7 @@ object DataType extends RegexParsers { } protected lazy val intVal: Parser[Integer] = - "[0-9]+" ^^ { + "[0-9]+".r ^^ { case t => t.toInt } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index 7370cd800dd0e..729a1c192f9d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -248,9 +248,13 @@ private[parquet] object ParquetTypesConverter extends Logging { if (nullable) Repetition.OPTIONAL else Repetition.REQUIRED } val primitiveType = fromPrimitiveDataType(ctype) + val typeLength = ctype match { + case FixedLenBinaryType(_) => ctype.asInstanceOf[FixedLenBinaryType].length + case _ => 0 + } primitiveType.map { case (primitiveType, originalType) => - new ParquetPrimitiveType(repetition, primitiveType, name, originalType.orNull) + new ParquetPrimitiveType(repetition, primitiveType, typeLength, name, originalType.orNull) }.getOrElse { ctype match { case ArrayType(elementType, false) => { From f319d2f04793b583ae947c26d6e3b542a2c8c90c Mon Sep 17 00:00:00 2001 From: ChiaYung Su Date: Fri, 1 Aug 2014 23:15:31 +0800 Subject: [PATCH 3/7] fixed length array cast to string --- .../org/apache/spark/sql/catalyst/expressions/Cast.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 0ad2b30cf9c1f..269720c71e4a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -42,6 +42,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { // UDFToString private[this] def castToString: Any => Any = child.dataType match { case BinaryType => buildCast[Array[Byte]](_, new String(_, "UTF-8")) + case FixedLenBinaryType(_) => buildCast[Array[Byte]](_, new String(_, "UTF-8")) case TimestampType => buildCast[Timestamp](_, timestampToString) case _ => buildCast[Any](_, _.toString) } @@ -51,6 +52,11 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { case StringType => buildCast[String](_, _.getBytes("UTF-8")) } + // FixedLenBinaryConverter + private[this] def castToFixedLenBinary(x:Int): Any => Any = child.dataType match { + case StringType => buildCast[String](_, _.getBytes("UTF-8")) + } + // UDFToBoolean private[this] def castToBoolean: Any => Any = child.dataType match { case StringType => @@ -256,6 +262,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { case FloatType => castToFloat case LongType => castToLong case DoubleType => castToDouble + case FixedLenBinaryType(_) => castToFixedLenBinary(dataType.asInstanceOf[FixedLenBinaryType].length) } override def eval(input: Row): Any = { From de9c43f036e7c661a558c2f7949a265fd539ecb6 Mon Sep 17 00:00:00 2001 From: ChiaYung Su Date: Sat, 2 Aug 2014 15:16:37 +0800 Subject: [PATCH 4/7] scala style --- .../scala/org/apache/spark/sql/catalyst/expressions/Cast.scala | 3 ++- .../main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 269720c71e4a2..21423c421d87b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -262,7 +262,8 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { case FloatType => castToFloat case LongType => castToLong case DoubleType => castToDouble - case FixedLenBinaryType(_) => castToFixedLenBinary(dataType.asInstanceOf[FixedLenBinaryType].length) + case FixedLenBinaryType(_) => + castToFixedLenBinary(dataType.asInstanceOf[FixedLenBinaryType].length) } override def eval(input: Row): Any = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index 729a1c192f9d3..e7c927cc40cea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -59,7 +59,8 @@ private[parquet] object ParquetTypesConverter extends Logging { case ParquetPrimitiveTypeName.INT96 => // TODO: add BigInteger type? TODO(andre) use DecimalType instead???? 
sys.error("Potential loss of precision: cannot convert INT96") - case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY => FixedLenBinaryType(parquetType.getTypeLength) + case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY => + FixedLenBinaryType(parquetType.getTypeLength) case _ => sys.error( s"Unsupported parquet datatype $parquetType") } From c4bb207776061ae79e8ba5e750f5830ae8d7c175 Mon Sep 17 00:00:00 2001 From: ChiaYung Su Date: Sat, 2 Aug 2014 15:43:21 +0800 Subject: [PATCH 5/7] cast between binary and fixedlenbinary --- .../org/apache/spark/sql/catalyst/expressions/Cast.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 21423c421d87b..295f1e51897cc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -50,11 +50,13 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { // BinaryConverter private[this] def castToBinary: Any => Any = child.dataType match { case StringType => buildCast[String](_, _.getBytes("UTF-8")) + case FixedLenBinaryType(_) => buildCast[Array[Byte]](_, a => a) } // FixedLenBinaryConverter - private[this] def castToFixedLenBinary(x:Int): Any => Any = child.dataType match { - case StringType => buildCast[String](_, _.getBytes("UTF-8")) + private[this] def castToFixedLenBinary(length:Int): Any => Any = child.dataType match { + case StringType => buildCast[String](_, _.getBytes("UTF-8").slice(0 ,length)) + case BinaryType => buildCast[Array[Byte]](_, _.slice(0, length)) } // UDFToBoolean From 7dc32ba0028744daeea06f27c3b96337a3c62295 Mon Sep 17 00:00:00 2001 From: ChiaYung Su Date: Sat, 2 Aug 2014 16:12:50 +0800 Subject: [PATCH 6/7] change type name --- .../spark/sql/catalyst/expressions/Cast.scala | 
8 ++++---- .../apache/spark/sql/catalyst/types/dataTypes.scala | 13 ++++++------- .../org/apache/spark/sql/parquet/ParquetTypes.scala | 6 +++--- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 295f1e51897cc..6a89ee30d1c82 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -42,7 +42,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { // UDFToString private[this] def castToString: Any => Any = child.dataType match { case BinaryType => buildCast[Array[Byte]](_, new String(_, "UTF-8")) - case FixedLenBinaryType(_) => buildCast[Array[Byte]](_, new String(_, "UTF-8")) + case FixedLenByteArrayType(_) => buildCast[Array[Byte]](_, new String(_, "UTF-8")) case TimestampType => buildCast[Timestamp](_, timestampToString) case _ => buildCast[Any](_, _.toString) } @@ -50,7 +50,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { // BinaryConverter private[this] def castToBinary: Any => Any = child.dataType match { case StringType => buildCast[String](_, _.getBytes("UTF-8")) - case FixedLenBinaryType(_) => buildCast[Array[Byte]](_, a => a) + case FixedLenByteArrayType(_) => buildCast[Array[Byte]](_, a => a) } // FixedLenBinaryConverter @@ -264,8 +264,8 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { case FloatType => castToFloat case LongType => castToLong case DoubleType => castToDouble - case FixedLenBinaryType(_) => - castToFixedLenBinary(dataType.asInstanceOf[FixedLenBinaryType].length) + case FixedLenByteArrayType(_) => + castToFixedLenBinary(dataType.asInstanceOf[FixedLenByteArrayType].length) } override def eval(input: Row): Any = { diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index 5016f741572d7..371069d43829e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -43,19 +43,18 @@ object DataType extends RegexParsers { "BooleanType" ^^^ BooleanType | "DecimalType" ^^^ DecimalType | "TimestampType" ^^^ TimestampType | - fixedLenBinaryType + fixedLenByteArrayType + protected lazy val fixedLenByteArrayType: Parser[DataType] = + "FixedLenByteArrayType" ~> "(" ~> intVal <~ ")" ^^ { + case t => FixedLenByteArrayType(t) + } protected lazy val arrayType: Parser[DataType] = "ArrayType" ~> "(" ~> dataType ~ "," ~ boolVal <~ ")" ^^ { case tpe ~ _ ~ containsNull => ArrayType(tpe, containsNull) } - protected lazy val fixedLenBinaryType: Parser[DataType] = - "FixedLenBinaryType" ~> "(" ~> intVal <~ ")" ^^ { - case t => FixedLenBinaryType(t) - } - protected lazy val mapType: Parser[DataType] = "MapType" ~> "(" ~> dataType ~ "," ~ dataType ~ "," ~ boolVal <~ ")" ^^ { case t1 ~ _ ~ t2 ~ _ ~ valueContainsNull => MapType(t1, t2, valueContainsNull) @@ -163,7 +162,7 @@ case object BinaryType extends DataType with PrimitiveType { def simpleString: String = "binary" } -case class FixedLenBinaryType( length:Int ) extends DataType with PrimitiveType { +case class FixedLenByteArrayType( length:Int ) extends DataType with PrimitiveType { type JvmType = Array[Byte] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index e7c927cc40cea..aebc0b145cb1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -60,7 +60,7 @@ private[parquet] object 
ParquetTypesConverter extends Logging { // TODO: add BigInteger type? TODO(andre) use DecimalType instead???? sys.error("Potential loss of precision: cannot convert INT96") case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY => - FixedLenBinaryType(parquetType.getTypeLength) + FixedLenByteArrayType(parquetType.getTypeLength) case _ => sys.error( s"Unsupported parquet datatype $parquetType") } @@ -197,7 +197,7 @@ private[parquet] object ParquetTypesConverter extends Logging { case ShortType => Some(ParquetPrimitiveTypeName.INT32, None) case ByteType => Some(ParquetPrimitiveTypeName.INT32, None) case LongType => Some(ParquetPrimitiveTypeName.INT64, None) - case FixedLenBinaryType(_) => Some(ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, None) + case FixedLenByteArrayType(_) => Some(ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, None) case _ => None } @@ -250,7 +250,7 @@ private[parquet] object ParquetTypesConverter extends Logging { } val primitiveType = fromPrimitiveDataType(ctype) val typeLength = ctype match { - case FixedLenBinaryType(_) => ctype.asInstanceOf[FixedLenBinaryType].length + case FixedLenByteArrayType(_) => ctype.asInstanceOf[FixedLenByteArrayType].length case _ => 0 } primitiveType.map { From f66e658729b3ca84cd3fe62daf7e503a4f40ec7b Mon Sep 17 00:00:00 2001 From: Chia-Yung Su Date: Wed, 6 Aug 2014 00:16:57 +0800 Subject: [PATCH 7/7] compatible with latest version --- .../scala/org/apache/spark/sql/catalyst/types/dataTypes.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index 371069d43829e..f957e2e90a7f9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -164,6 +164,7 @@ case object BinaryType extends DataType with PrimitiveType { case class 
FixedLenByteArrayType( length:Int ) extends DataType with PrimitiveType { type JvmType = Array[Byte] + def simpleString: String = "fixed_len_byte_array(%d)".format(length) } case object BooleanType extends NativeType with PrimitiveType {