From 5f9da94fc89aa268781674909d5c3d049b6859af Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 1 May 2015 16:51:40 +0800 Subject: [PATCH 1/2] Set up Decimal's precision and scale according to table schema instead of returned BigDecimal. --- .../main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala index 2f6ba48dbc3d9..11dd17ae8c557 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala @@ -294,7 +294,7 @@ private[sql] class JDBCRDD( abstract class JDBCConversion case object BooleanConversion extends JDBCConversion case object DateConversion extends JDBCConversion - case object DecimalConversion extends JDBCConversion + case class DecimalConversion(precisionInfo: Option[(Int, Int)]) extends JDBCConversion case object DoubleConversion extends JDBCConversion case object FloatConversion extends JDBCConversion case object IntegerConversion extends JDBCConversion @@ -311,8 +311,8 @@ private[sql] class JDBCRDD( schema.fields.map(sf => sf.dataType match { case BooleanType => BooleanConversion case DateType => DateConversion - case DecimalType.Unlimited => DecimalConversion - case DecimalType.Fixed(d) => DecimalConversion + case DecimalType.Unlimited => DecimalConversion(None) + case DecimalType.Fixed(d) => DecimalConversion(Some(d)) case DoubleType => DoubleConversion case FloatType => FloatConversion case IntegerType => IntegerConversion @@ -363,7 +363,9 @@ private[sql] class JDBCRDD( case BooleanConversion => mutableRow.setBoolean(i, rs.getBoolean(pos)) case DateConversion => mutableRow.update(i, DateUtils.fromJavaDate(rs.getDate(pos))) - case DecimalConversion => mutableRow.update(i, rs.getBigDecimal(pos)) + case DecimalConversion(d) if d != None => + mutableRow.update(i, Decimal(rs.getBigDecimal(pos), d.map(_._1).get, d.map(_._2).get)) + case DecimalConversion(n) => mutableRow.update(i, Decimal(rs.getBigDecimal(pos))) case DoubleConversion => mutableRow.setDouble(i, rs.getDouble(pos)) case FloatConversion => mutableRow.setFloat(i, rs.getFloat(pos)) case IntegerConversion => mutableRow.setInt(i, rs.getInt(pos)) From 928f864c494265f1c0bcbc501c87738cb2c421ff Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 2 May 2015 15:37:51 +0800 Subject: [PATCH 2/2] Add comments. --- .../scala/org/apache/spark/sql/jdbc/JDBCRDD.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala index 11dd17ae8c557..661549a599203 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala @@ -363,9 +363,17 @@ private[sql] class JDBCRDD( case BooleanConversion => mutableRow.setBoolean(i, rs.getBoolean(pos)) case DateConversion => mutableRow.update(i, DateUtils.fromJavaDate(rs.getDate(pos))) - case DecimalConversion(d) if d != None => - mutableRow.update(i, Decimal(rs.getBigDecimal(pos), d.map(_._1).get, d.map(_._2).get)) - case DecimalConversion(n) => mutableRow.update(i, Decimal(rs.getBigDecimal(pos))) + // When connecting with Oracle DB through JDBC, the precision and scale of BigDecimal + // object returned by ResultSet.getBigDecimal is not correctly matched to the table + // schema reported by ResultSetMetaData.getPrecision and ResultSetMetaData.getScale. + // If inserting values like 19999 into a column with NUMBER(12, 2) type, you get through + // a BigDecimal object with scale as 0. But the dataframe schema has correct type as + // DecimalType(12, 2). Thus, after saving the dataframe into parquet file and then + // retrieve it, you will get wrong result 199.99. + // So it is needed to set precision and scale for Decimal based on JDBC metadata. + case DecimalConversion(Some((p, s))) => + mutableRow.update(i, Decimal(rs.getBigDecimal(pos), p, s)) + case DecimalConversion(None) => mutableRow.update(i, Decimal(rs.getBigDecimal(pos))) case DoubleConversion => mutableRow.setDouble(i, rs.getDouble(pos)) case FloatConversion => mutableRow.setFloat(i, rs.getFloat(pos)) case IntegerConversion => mutableRow.setInt(i, rs.getInt(pos))