Skip to content

Commit dcc71b3

Browse files
Nick-0723 and ulysses-you
authored and committed
[KYUUBI #2207] Support newly added Spark data types: TimestampNTZType
### _Why are the changes needed?_ Support newly added data types: TimestampNTZType, introduced since Spark 3.3.0. ### _How was this patch tested?_ - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [X] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #2224 from Nick-0723/TimestampNTZType. Closes #2207 56dfb2d [Nick Song] replace to getSimpleName 10b12f3 [Nick Song] fix 90218ca [Nick Song] fix 348d7d0 [Nick Song] fix 54ef78d [Nick Song] support DayTimeIntervalType Authored-by: Nick Song <chun2184@163.com> Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent e16c728 commit dcc71b3

File tree

4 files changed

+40
-2
lines changed

4 files changed

+40
-2
lines changed

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import org.apache.hive.service.rpc.thrift._
2929
import org.apache.spark.sql.Row
3030
import org.apache.spark.sql.types._
3131

32+
import org.apache.kyuubi.engine.spark.schema.SchemaHelper.TIMESTAMP_NTZ
3233
import org.apache.kyuubi.util.RowSetUtils._
3334

3435
object RowSet {
@@ -241,6 +242,9 @@ object RowSet {
241242
case (t: Timestamp, TimestampType) =>
242243
formatTimestamp(t)
243244

245+
case (t: LocalDateTime, ntz) if ntz.getClass.getName.equals(TIMESTAMP_NTZ) =>
246+
formatLocalDateTime(t)
247+
244248
case (i: Instant, TimestampType) =>
245249
formatInstant(i, Option(timeZone))
246250

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ import org.apache.spark.sql.types._
2626

2727
object SchemaHelper {
2828

29+
/**
30+
* Spark 3.3.0 DataType TimestampNTZType's class name.
31+
*/
32+
final val TIMESTAMP_NTZ = "TimestampNTZType$"
33+
2934
def toTTypeId(typ: DataType): TTypeId = typ match {
3035
case NullType => TTypeId.NULL_TYPE
3136
case BooleanType => TTypeId.BOOLEAN_TYPE
@@ -39,6 +44,7 @@ object SchemaHelper {
3944
case _: DecimalType => TTypeId.DECIMAL_TYPE
4045
case DateType => TTypeId.DATE_TYPE
4146
case TimestampType => TTypeId.TIMESTAMP_TYPE
47+
case ntz if ntz.getClass.getSimpleName.equals(TIMESTAMP_NTZ) => TTypeId.TIMESTAMP_TYPE
4248
case BinaryType => TTypeId.BINARY_TYPE
4349
case CalendarIntervalType => TTypeId.STRING_TYPE
4450
case dt if dt.getClass.getSimpleName.equals("DayTimeIntervalType") =>
@@ -104,6 +110,7 @@ object SchemaHelper {
104110
case _: DecimalType => java.sql.Types.DECIMAL
105111
case DateType => java.sql.Types.DATE
106112
case TimestampType => java.sql.Types.TIMESTAMP
113+
case ntz if ntz.getClass.getSimpleName.equals(TIMESTAMP_NTZ) => java.sql.Types.TIMESTAMP
107114
case BinaryType => java.sql.Types.BINARY
108115
case _: ArrayType => java.sql.Types.ARRAY
109116
case _: MapType => java.sql.Types.JAVA_OBJECT
@@ -118,6 +125,7 @@ object SchemaHelper {
118125
* For array, map, string, and binaries, the column size is variable, return null as unknown.
119126
*/
120127
def getColumnSize(sparkType: DataType): Option[Int] = sparkType match {
128+
case ntz if ntz.getClass.getSimpleName.equals(TIMESTAMP_NTZ) => Some(ntz.defaultSize)
121129
case dt @ (BooleanType | _: NumericType | DateType | TimestampType |
122130
CalendarIntervalType | NullType) =>
123131
Some(dt.defaultSize)
@@ -145,6 +153,7 @@ object SchemaHelper {
145153
case DoubleType => Some(15)
146154
case d: DecimalType => Some(d.scale)
147155
case TimestampType => Some(6)
156+
case ntz if ntz.getClass.getSimpleName.equals(TIMESTAMP_NTZ) => Some(6)
148157
case _ => None
149158
}
150159

externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
3131
import org.apache.spark.sql.types._
3232

3333
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
34+
import org.apache.kyuubi.engine.spark.schema.SchemaHelper.TIMESTAMP_NTZ
3435
import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
3536
import org.apache.kyuubi.operation.{HiveMetadataTests, SparkQueryTests}
3637
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
@@ -76,6 +77,12 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
7677
.add("c16", "binary", nullable = false, "16")
7778
.add("c17", "struct<X: string>", nullable = true, "17")
7879

80+
// since spark3.3.0
81+
if (SPARK_ENGINE_MAJOR_MINOR_VERSION._1 > 3 ||
82+
(SPARK_ENGINE_MAJOR_MINOR_VERSION._1 == 3 && SPARK_ENGINE_MAJOR_MINOR_VERSION._2 >= 3)) {
83+
schema.add("c18", "timestamp_ntz", nullable = true, "18")
84+
}
85+
7986
val ddl =
8087
s"""
8188
|CREATE TABLE IF NOT EXISTS $defaultSchema.$tableName (
@@ -110,7 +117,8 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
110117
TIMESTAMP,
111118
STRUCT,
112119
BINARY,
113-
STRUCT)
120+
STRUCT,
121+
TIMESTAMP)
114122

115123
var pos = 0
116124

@@ -137,6 +145,8 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
137145
case FloatType => assert(decimalDigits === 7)
138146
case DoubleType => assert(decimalDigits === 15)
139147
case TimestampType => assert(decimalDigits === 6)
148+
case ntz if ntz.getClass.getSimpleName.equals(TIMESTAMP_NTZ) =>
149+
assert(decimalDigits === 6)
140150
case _ => assert(decimalDigits === 0) // nulls
141151
}
142152

@@ -154,7 +164,7 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
154164
pos += 1
155165
}
156166

157-
assert(pos === 18, "all columns should have been verified")
167+
assert(pos === schema.length, "all columns should have been verified")
158168
}
159169

160170
val rowSet = metaData.getColumns(null, "*", "not_exist", "not_exist")

kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,21 @@ trait SparkQueryTests extends HiveJDBCTestHelper {
174174
}
175175
}
176176

177+
test("execute statement - select timestamp_ntz") {
178+
assume(SPARK_ENGINE_MAJOR_MINOR_VERSION._1 >= 3
179+
&& SPARK_ENGINE_MAJOR_MINOR_VERSION._2 > 2)
180+
withJdbcStatement() { statement =>
181+
val resultSet = statement.executeQuery(
182+
"SELECT make_timestamp_ntz(2022, 03, 24, 18, 08, 31.800) AS col")
183+
assert(resultSet.next())
184+
assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2022-03-24 18:08:31.800"))
185+
val metaData = resultSet.getMetaData
186+
assert(metaData.getColumnType(1) === java.sql.Types.TIMESTAMP)
187+
assert(metaData.getPrecision(1) === 29)
188+
assert(metaData.getScale(1) === 9)
189+
}
190+
}
191+
177192
test("execute statement - select daytime interval") {
178193
withJdbcStatement() { statement =>
179194
Map(

0 commit comments

Comments
 (0)