[SPARK-38846][SQL] Add explicit data mapping between Teradata Numeric Type and Spark DecimalType #36499

Closed · wants to merge 16 commits · Changes from 15 commits
5 changes: 5 additions & 0 deletions docs/sql-migration-guide.md
@@ -22,6 +22,11 @@ license: |
* Table of contents
{:toc}

## Upgrading from Spark SQL 3.3 to 3.4

Member:

Is this note related to this change? The second one is.

Contributor (Author):

Thanks for pointing it out! I just removed the first bullet, which was merged by mistake when resolving the doc conflicts.

- Since Spark 3.4, Spark disables `hive.stats.autogather` by default, which means Hive tables won't automatically update statistics that can be consumed by Hive (not Spark). To restore the behavior before Spark 3.4, you can set `spark.hadoop.hive.stats.autogather` to `true`.
- Since Spark 3.4, Number or Number(\*) from Teradata will be treated as Decimal(38,18). In Spark 3.3 or earlier, Number or Number(\*) from Teradata is treated as Decimal(38, 0), in which case the fractional part is removed.

## Upgrading from Spark SQL 3.2 to 3.3

- Since Spark 3.3, the `histogram_numeric` function in Spark SQL returns an output type of an array of structs (x, y), where the type of the 'x' field in the return value is propagated from the input values consumed in the aggregate function. In Spark 3.2 or earlier, 'x' always had double type. Optionally, use the configuration `spark.sql.legacy.histogramNumericPropagateInputType` since Spark 3.3 to revert back to the previous behavior.
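For illustration (not part of the patch), here is a minimal sketch of what the new default mapping means when reading a Teradata table over JDBC. The connection URL, credentials, table, and column are hypothetical, and the Teradata JDBC driver is assumed to be on the classpath:

```scala
import org.apache.spark.sql.SparkSession

object TeradataNumberReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("teradata-number-sketch").getOrCreate()

    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:teradata://host/DATABASE=mydb") // hypothetical connection
      .option("dbtable", "my_schema.measurements")          // assumed to contain a NUMBER column
      .option("user", "user")
      .option("password", "password")
      .load()

    // The NUMBER column surfaces as decimal(38,18) on Spark 3.4+; on 3.3 and earlier it was
    // decimal(38,0), which dropped the fractional part.
    df.printSchema()
    spark.stop()
  }
}
```

The only behavioral difference relative to Spark 3.3 is the inferred scale: the fractional digits of an unparameterized NUMBER column are now retained.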
@@ -126,7 +126,8 @@ object DecimalType extends AbstractDataType

val MAX_PRECISION = 38
val MAX_SCALE = 38
val SYSTEM_DEFAULT: DecimalType = DecimalType(MAX_PRECISION, 18)
val DEFAULT_SCALE = 18
val SYSTEM_DEFAULT: DecimalType = DecimalType(MAX_PRECISION, DEFAULT_SCALE)
val USER_DEFAULT: DecimalType = DecimalType(10, 0)
val MINIMUM_ADJUSTED_SCALE = 6

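As a quick sanity check of the refactoring above (again, not part of the patch), the system default decimal type is now expressed through the two named constants and still resolves to Decimal(38, 18), i.e. 20 integer digits plus 18 fractional digits; for example, in spark-shell:

```scala
import org.apache.spark.sql.types.DecimalType

// DecimalType(MAX_PRECISION = 38, DEFAULT_SCALE = 18)
assert(DecimalType.SYSTEM_DEFAULT == DecimalType(38, 18))
```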
@@ -17,6 +17,7 @@

package org.apache.spark.sql.jdbc

import java.sql.Types
import java.util.Locale

import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc}
@@ -96,4 +97,31 @@ private case object TeradataDialect extends JdbcDialect {
  override def getLimitClause(limit: Integer): String = {
    ""
  }

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    sqlType match {
      case Types.NUMERIC =>
        if (md == null) {
          Some(DecimalType.SYSTEM_DEFAULT)
        } else {
          val scale = md.build().getLong("scale")
          // In Teradata, defining a Number without parameters means both precision and scale
          // are flexible. However, in that case the scale returned from JDBC is 0, which would
          // drop the fractional part, and the precision returned from JDBC is 40, which
          // conflicts with DecimalType.MAX_PRECISION.
          // Handle this special case by converting explicitly to the system default decimal type.
          if (size == 40) {
            if (scale == 0) Some(DecimalType.SYSTEM_DEFAULT)
            // In Teradata, Number(*, scale) is valid, but the precision returned from JDBC
            // is also 40, which conflicts with DecimalType.MAX_PRECISION.
            else Some(DecimalType(DecimalType.MAX_PRECISION, scale.toInt))
          } else {
            // Normal case: Number(precision, scale) is explicitly set in Teradata.
            Some(DecimalType(size, scale.toInt))
          }
        }
      case _ => None
    }
  }
}
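For reference, a small sketch (separate from the patch, e.g. in spark-shell) of how these rules can be exercised through the dialect registry. Per the code comments above, the Teradata JDBC driver reports precision 40 with scale 0 for an unparameterized NUMBER, and precision 40 with the declared scale for NUMBER(*, scale); the URL below is hypothetical:

```scala
import java.sql.Types

import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.MetadataBuilder

val dialect = JdbcDialects.get("jdbc:teradata://host/DATABASE=mydb") // hypothetical URL

// NUMBER / NUMBER(*): reported as precision 40, scale 0 -> Some(DecimalType(38, 18))
dialect.getCatalystType(Types.NUMERIC, "NUMBER", 40, new MetadataBuilder().putLong("scale", 0))

// NUMBER(*, 5): reported as precision 40, scale 5 -> Some(DecimalType(38, 5))
dialect.getCatalystType(Types.NUMERIC, "NUMBER", 40, new MetadataBuilder().putLong("scale", 5))

// NUMBER(20, 5): both explicit -> Some(DecimalType(20, 5))
dialect.getCatalystType(Types.NUMERIC, "NUMBER", 20, new MetadataBuilder().putLong("scale", 5))
```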
@@ -1356,7 +1356,32 @@ class JDBCSuite extends QueryTest
map(_.databaseTypeDefinition).get == "CHAR(1)")
}

test("Checking metrics correctness with JDBC") {
test("SPARK-38846: TeradataDialect catalyst type mapping") {
val teradataDialect = JdbcDialects.get("jdbc:teradata")
val metadata = new MetadataBuilder().putString("name", "test_column").putLong("scale", 0)
// When Number(*)/Number is specified, default DecimalType should be returned
val flexiblePrecision = 40
assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER",
flexiblePrecision, metadata) == Some(DecimalType.SYSTEM_DEFAULT))
val specifiedScale = 10
val specifiedPrecision = 10
metadata.putLong("scale", specifiedScale)
// Both precision and scale is set explicitly
assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER",
specifiedPrecision, metadata) == Some(DecimalType(specifiedPrecision, specifiedScale)))
// When precision is not specified, MAX_PRECISION should be used
assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER",
flexiblePrecision, metadata) == Some(DecimalType(DecimalType.MAX_PRECISION, specifiedScale)))
// When precision and scale is set explicitly and scale is 0
metadata.putLong("scale", 0)
assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER",
specifiedPrecision, metadata) == Some(DecimalType(specifiedPrecision, 0)))
// When MetadataBuilder is null, default DecimalType should be returned
assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER",
specifiedPrecision, null) == Some(DecimalType.SYSTEM_DEFAULT))
}

test("Checking metrics correctness with JDBC") {
val foobarCnt = spark.table("foobar").count()
val res = InputOutputMetricsHelper.run(sql("SELECT * FROM foobar").toDF())
assert(res === (foobarCnt, 0L, foobarCnt) :: Nil)