[SPARK-38846][SQL] Add explicit data mapping between Teradata Numeric Type and Spark DecimalType #36499

Closed · wants to merge 16 commits
```diff
@@ -126,7 +126,8 @@ object DecimalType extends AbstractDataType {

   val MAX_PRECISION = 38
   val MAX_SCALE = 38
-  val SYSTEM_DEFAULT: DecimalType = DecimalType(MAX_PRECISION, 18)
+  val DEFAULT_SCALE = 18
+  val SYSTEM_DEFAULT: DecimalType = DecimalType(MAX_PRECISION, DEFAULT_SCALE)
   val USER_DEFAULT: DecimalType = DecimalType(10, 0)
   val MINIMUM_ADJUSTED_SCALE = 6
```
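For reference, a minimal sketch (not part of this PR) of what the defaults above evaluate to in a Spark shell:

```scala
import org.apache.spark.sql.types.DecimalType

// SYSTEM_DEFAULT is the widest decimal Spark falls back to when a source
// system does not pin down precision and scale.
println(DecimalType.SYSTEM_DEFAULT)  // DecimalType(38,18)
println(DecimalType.USER_DEFAULT)    // DecimalType(10,0)
```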
```diff
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.jdbc

+import java.sql.Types
 import java.util.Locale

 import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc}
@@ -96,4 +97,31 @@ private case object TeradataDialect extends JdbcDialect {
   override def getLimitClause(limit: Integer): String = {
     ""
   }
+
+  override def getCatalystType(
+      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+    sqlType match {
+      case Types.NUMERIC =>
+        if (md == null) {
+          Some(DecimalType.SYSTEM_DEFAULT)
+        } else {
+          val scale = md.build().getLong("scale")
+          // In Teradata, defining a Number without parameters means both precision and
+          // scale are flexible. However, in that case the scale returned from JDBC is 0,
+          // which would lose the fractional part, and the precision returned from JDBC
+          // is 40, which conflicts with DecimalType.MAX_PRECISION.
+          // Handle this special case by converting explicitly to the system default
+          // decimal type.
+          if (size == 40) {
+            if (scale == 0) Some(DecimalType.SYSTEM_DEFAULT)
+            // In Teradata, Number(*, scale) is valid, but in this case the precision
+            // returned from JDBC is also 40, which conflicts with DecimalType.MAX_PRECISION.
+            else Some(DecimalType(DecimalType.MAX_PRECISION, scale.toInt))
+          } else {
+            // Normal case: Number(precision, scale) is explicitly set in Teradata.
+            Some(DecimalType(size, scale.toInt))
+          }
+        }
+      case _ => None
+    }
+  }
 }
```
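A quick sketch of the mappings this override produces (hypothetical JDBC URL and column sizes, chosen only for illustration; the comments assume the Teradata driver reports precision 40 for unparameterized NUMBER, as the code above describes):

```scala
import java.sql.Types
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.MetadataBuilder

val dialect = JdbcDialects.get("jdbc:teradata://host/db")  // resolves to TeradataDialect

// NUMBER        -> JDBC reports (precision = 40, scale = 0) -> DecimalType(38, 18)
// NUMBER(*, 5)  -> JDBC reports (precision = 40, scale = 5) -> DecimalType(38, 5)
// NUMBER(12, 2) -> JDBC reports (precision = 12, scale = 2) -> DecimalType(12, 2)
val md = new MetadataBuilder().putLong("scale", 2)
println(dialect.getCatalystType(Types.NUMERIC, "NUMBER", 12, md))
// Some(DecimalType(12,2))
```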
```diff
@@ -1355,7 +1355,32 @@ class JDBCSuite extends QueryTest
       map(_.databaseTypeDefinition).get == "CHAR(1)")
   }

+  test("SPARK-38846: TeradataDialect catalyst type mapping") {
+    val teradataDialect = JdbcDialects.get("jdbc:teradata")
+    val metadata = new MetadataBuilder().putString("name", "test_column").putLong("scale", 0)
+    // When Number(*)/Number is specified, the default DecimalType should be returned.
+    val flexiblePrecision = 40
+    assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER",
+      flexiblePrecision, metadata) == Some(DecimalType.SYSTEM_DEFAULT))
+    val specifiedScale = 10
+    val specifiedPrecision = 10
+    metadata.putLong("scale", specifiedScale)
+    // Both precision and scale are set explicitly.
+    assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER",
+      specifiedPrecision, metadata) == Some(DecimalType(specifiedPrecision, specifiedScale)))
+    // When precision is not specified, MAX_PRECISION should be used.
+    assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER",
+      flexiblePrecision, metadata) == Some(DecimalType(DecimalType.MAX_PRECISION, specifiedScale)))
+    // When precision and scale are set explicitly and scale is 0.
+    metadata.putLong("scale", 0)
+    assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER",
+      specifiedPrecision, metadata) == Some(DecimalType(specifiedPrecision, 0)))
+    // When the MetadataBuilder is null, the default DecimalType should be returned.
+    assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER",
+      specifiedPrecision, null) == Some(DecimalType.SYSTEM_DEFAULT))
+  }
+
   test("Checking metrics correctness with JDBC") {
     val foobarCnt = spark.table("foobar").count()
     val res = InputOutputMetricsHelper.run(sql("SELECT * FROM foobar").toDF())
     assert(res === (foobarCnt, 0L, foobarCnt) :: Nil)
```