[SPARK-38846][SQL] Add explicit data mapping between Teradata Numeric Type and Spark DecimalType

### What changes were proposed in this pull request?
 - Implement the `getCatalystType` method in `TeradataDialect`
 - Handle `Types.NUMERIC` explicitly

### Why are the changes needed?
When loading a table from Teradata, a column of type `Number` is converted to `DecimalType(38,0)`, which loses the fractional part of the original data.
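
For illustration, a minimal sketch of the behavior in question, assuming a hypothetical Teradata table `sales` with a column declared as plain `NUMBER` and a placeholder JDBC URL (neither is part of this patch):

```scala
// Hypothetical setup: table `sales` has a column `amount NUMBER` holding values like 12.34.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:teradata://teradata-host/DATABASE=mydb") // placeholder host/database
  .option("dbtable", "sales")
  .option("user", "dbc")     // placeholder credentials
  .option("password", "dbc")
  .load()

df.printSchema()
// Spark 3.3 and earlier: amount maps to decimal(38,0), so 12.34 comes back as 12.
// With this change:      amount maps to decimal(38,18), preserving the fractional part.
```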

### Does this PR introduce _any_ user-facing change?
Yes. The `Number` type is now converted to `DecimalType(38,18)` when the reported scale is 0, so the fractional part is preserved.
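
A condensed sketch of the new mapping, mirroring the added unit test; note that `JdbcDialects` and `getCatalystType` are internal/developer APIs, so this is illustrative rather than a supported user-facing call:

```scala
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.{DecimalType, MetadataBuilder}

val dialect = JdbcDialects.get("jdbc:teradata")

// Plain NUMBER / NUMBER(*): the Teradata JDBC driver reports precision 40 and scale 0,
// which now maps to DecimalType(38,18) instead of DecimalType(38,0).
val flexible = new MetadataBuilder().putLong("scale", 0)
dialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER", 40, flexible)
// => Some(DecimalType(38,18))

// NUMBER(12,2): an explicitly declared precision and scale are used as-is.
val declared = new MetadataBuilder().putLong("scale", 2)
dialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER", 12, declared)
// => Some(DecimalType(12,2))
```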

### How was this patch tested?
A unit test is added to `JDBCSuite.scala`.

Closes #36499 from Eugene-Mark/teradata-loading.

Lead-authored-by: Eugene-Mark <eugene.ma.twenty@gmail.com>
Co-authored-by: Eugene <eugene.ma@intel.com>
Co-authored-by: Eugene <eugene.ma.twenty@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2 people authored and srowen committed Jun 20, 2022
1 parent e500121 commit e31d072
Showing 4 changed files with 60 additions and 2 deletions.
docs/sql-migration-guide.md (4 additions, 0 deletions)
@@ -22,6 +22,10 @@ license: |
* Table of contents
{:toc}

## Upgrading from Spark SQL 3.3 to 3.4

- Since Spark 3.4, Number or Number(\*) from Teradata will be treated as Decimal(38,18). In Spark 3.3 or earlier, Number or Number(\*) from Teradata will be treated as Decimal(38, 0), in which case the fractional part will be removed.

## Upgrading from Spark SQL 3.2 to 3.3

- Since Spark 3.3, the `histogram_numeric` function in Spark SQL returns an output type of an array of structs (x, y), where the type of the 'x' field in the return value is propagated from the input values consumed in the aggregate function. In Spark 3.2 or earlier, 'x' always had double type. Optionally, use the configuration `spark.sql.legacy.histogramNumericPropagateInputType` since Spark 3.3 to revert back to the previous behavior.
sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala
@@ -126,7 +126,8 @@ object DecimalType extends AbstractDataType {

  val MAX_PRECISION = 38
  val MAX_SCALE = 38
  val SYSTEM_DEFAULT: DecimalType = DecimalType(MAX_PRECISION, 18)
  val DEFAULT_SCALE = 18
  val SYSTEM_DEFAULT: DecimalType = DecimalType(MAX_PRECISION, DEFAULT_SCALE)
  val USER_DEFAULT: DecimalType = DecimalType(10, 0)
  val MINIMUM_ADJUSTED_SCALE = 6

sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
@@ -17,6 +17,7 @@

package org.apache.spark.sql.jdbc

import java.sql.Types
import java.util.Locale

import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc}
@@ -96,4 +97,31 @@ private case object TeradataDialect extends JdbcDialect {
  override def getLimitClause(limit: Integer): String = {
    ""
  }

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    sqlType match {
      case Types.NUMERIC =>
        if (md == null) {
          Some(DecimalType.SYSTEM_DEFAULT)
        } else {
          val scale = md.build().getLong("scale")
          // In Teradata, defining a Number column without parameters means its precision and
          // scale are flexible. However, in that case the scale reported through JDBC is 0,
          // which would lose the fractional part, and the reported precision is 40, which
          // conflicts with DecimalType.MAX_PRECISION.
          // Handle this special case with an explicit conversion to the system default
          // decimal type.
          if (size == 40) {
            if (scale == 0) Some(DecimalType.SYSTEM_DEFAULT)
            // In Teradata, Number(*, scale) is valid, but in this case the precision
            // reported through JDBC is also 40, which conflicts with DecimalType.MAX_PRECISION.
            else Some(DecimalType(DecimalType.MAX_PRECISION, scale.toInt))
          } else {
            // Normal case: Number(precision, scale) is set explicitly in Teradata.
            Some(DecimalType(size, scale.toInt))
          }
        }
      case _ => None
    }
  }
}
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1356,7 +1356,32 @@ class JDBCSuite extends QueryTest
      map(_.databaseTypeDefinition).get == "CHAR(1)")
  }

test("Checking metrics correctness with JDBC") {
test("SPARK-38846: TeradataDialect catalyst type mapping") {
val teradataDialect = JdbcDialects.get("jdbc:teradata")
val metadata = new MetadataBuilder().putString("name", "test_column").putLong("scale", 0)
// When Number(*)/Number is specified, default DecimalType should be returned
val flexiblePrecision = 40
assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER",
flexiblePrecision, metadata) == Some(DecimalType.SYSTEM_DEFAULT))
val specifiedScale = 10
val specifiedPrecision = 10
metadata.putLong("scale", specifiedScale)
// Both precision and scale is set explicitly
assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER",
specifiedPrecision, metadata) == Some(DecimalType(specifiedPrecision, specifiedScale)))
// When precision is not specified, MAX_PRECISION should be used
assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER",
flexiblePrecision, metadata) == Some(DecimalType(DecimalType.MAX_PRECISION, specifiedScale)))
// When precision and scale is set explicitly and scale is 0
metadata.putLong("scale", 0)
assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER",
specifiedPrecision, metadata) == Some(DecimalType(specifiedPrecision, 0)))
// When MetadataBuilder is null, default DecimalType should be returned
assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER",
specifiedPrecision, null) == Some(DecimalType.SYSTEM_DEFAULT))
}

test("Checking metrics correctness with JDBC") {
val foobarCnt = spark.table("foobar").count()
val res = InputOutputMetricsHelper.run(sql("SELECT * FROM foobar").toDF())
assert(res === (foobarCnt, 0L, foobarCnt) :: Nil)
