Skip to content

Commit

Permalink
Support DecimalType.
Browse files Browse the repository at this point in the history
  • Loading branch information
yhuai committed Apr 14, 2015
1 parent 2379eeb commit c9373c8
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.execution

import java.io._
import java.math.{BigDecimal, BigInteger}
import java.nio.ByteBuffer
import java.sql.Timestamp

Expand Down Expand Up @@ -143,7 +144,6 @@ private[sql] object SparkSqlSerializer2 {
case array: ArrayType => return false
case map: MapType => return false
case struct: StructType => return false
case decimal: DecimalType => return false
case _ =>
}
i += 1
Expand Down Expand Up @@ -223,6 +223,21 @@ private[sql] object SparkSqlSerializer2 {
out.writeDouble(row.getDouble(i))
}

case decimal: DecimalType =>
if (row.isNullAt(i)) {
out.writeByte(NULL)
} else {
out.writeByte(NOT_NULL)
val value = row.apply(i).asInstanceOf[Decimal]
val javaBigDecimal = value.toJavaBigDecimal
// First, write out the unscaled value.
val bytes: Array[Byte] = javaBigDecimal.unscaledValue().toByteArray
out.writeInt(bytes.length)
out.write(bytes)
// Then, write out the scale.
out.writeInt(javaBigDecimal.scale())
}

case DateType =>
if (row.isNullAt(i)) {
out.writeByte(NULL)
Expand Down Expand Up @@ -334,6 +349,21 @@ private[sql] object SparkSqlSerializer2 {
mutableRow.setDouble(i, in.readDouble())
}

case decimal: DecimalType =>
if (in.readByte() == NULL) {
mutableRow.setNullAt(i)
} else {
// First, read in the unscaled value.
val length = in.readInt()
val bytes = new Array[Byte](length)
in.readFully(bytes)
val unscaledVal = new BigInteger(bytes)
// Then, read the scale.
val scale = in.readInt()
// Finally, create the Decimal object and set it in the row.
mutableRow.update(i, Decimal(new BigDecimal(unscaledVal, scale)))
}

case DateType =>
if (in.readByte() == NULL) {
mutableRow.setNullAt(i)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,9 @@ class SparkSqlSerializer2DataTypeSuite extends FunSuite {
checkSupported(TimestampType, isSupported = true)
checkSupported(StringType, isSupported = true)
checkSupported(BinaryType, isSupported = true)
checkSupported(DecimalType(10, 5), isSupported = true)
checkSupported(DecimalType.Unlimited, isSupported = true)

// Because at the runtime we accepts three kinds of Decimals
// (Java BigDecimal, Scala BigDecimal, and Spark SQL's Decimal), we do support DecimalType
// right now. We will support it once we fixed the internal type.
checkSupported(DecimalType(10, 5), isSupported = false)
checkSupported(DecimalType.Unlimited, isSupported = false)
// For now, ArrayType, MapType, and StructType are not supported.
checkSupported(ArrayType(DoubleType, true), isSupported = false)
checkSupported(ArrayType(StringType, false), isSupported = false)
Expand All @@ -84,7 +81,8 @@ abstract class SparkSqlSerializer2Suite extends QueryTest with BeforeAndAfterAll
val supportedTypes =
Seq(StringType, BinaryType, NullType, BooleanType,
ByteType, ShortType, IntegerType, LongType,
FloatType, DoubleType, DateType, TimestampType)
FloatType, DoubleType, DecimalType.Unlimited, DecimalType(6, 5),
DateType, TimestampType)

val fields = supportedTypes.zipWithIndex.map { case (dataType, index) =>
StructField(s"col$index", dataType, true)
Expand All @@ -103,9 +101,11 @@ abstract class SparkSqlSerializer2Suite extends QueryTest with BeforeAndAfterAll
i.toByte,
i.toShort,
i,
i.toLong,
Long.MaxValue - i.toLong,
(i + 0.25).toFloat,
(i + 0.75),
BigDecimal(Long.MaxValue.toString + ".12345"),
new java.math.BigDecimal(s"${i % 9 + 1}" + ".23456"),
new Date(i),
new Timestamp(i))
}
Expand Down Expand Up @@ -159,7 +159,7 @@ abstract class SparkSqlSerializer2Suite extends QueryTest with BeforeAndAfterAll
checkSerializer(df.queryExecution.executedPlan, serializerClass)
checkAnswer(
df,
Row(1000, 1000, 0, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000))
Row(1000, 1000, 0, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000))
}
}

Expand Down

0 comments on commit c9373c8

Please sign in to comment.