Skip to content

Commit d0c92ca

Browse files
Nick-0723 authored and yaooqinn committed
[KYUUBI #2207] Support newly added spark data types: DayTimeIntervalType/YearMonthIntervalType
### _Why are the changes needed?_ #2207 Support newly added datatypes spark3.2 : DayTimeIntervalType/YearMonthIntervalType ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2212 from Nick-0723/2207. Closes #2207 c08ee99 [Nick Song] revert DayTimeIntervalType's part abc56be [Nick Song] support newly data types DayTimeIntervalType/YearMonthIntervalType/TimestampNTZType Authored-by: Nick Song <chun2184@163.com> Signed-off-by: Kent Yao <yao@apache.org>
1 parent fb3e123 commit d0c92ca

File tree

5 files changed

+105
-91
lines changed

5 files changed

+105
-91
lines changed

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/IntervalQualifier.scala

Lines changed: 0 additions & 66 deletions
This file was deleted.

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala

Lines changed: 4 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark.schema
2020
import java.nio.ByteBuffer
2121
import java.nio.charset.StandardCharsets
2222
import java.sql.Timestamp
23-
import java.time.{Duration, Instant, LocalDate, ZoneId}
23+
import java.time._
2424
import java.util.Date
2525

2626
import scala.collection.JavaConverters._
@@ -29,7 +29,6 @@ import org.apache.hive.service.rpc.thrift._
2929
import org.apache.spark.sql.Row
3030
import org.apache.spark.sql.types._
3131

32-
import org.apache.kyuubi.engine.spark.schema.IntervalQualifier.{DAY, HOUR, MINUTE, SECOND}
3332
import org.apache.kyuubi.util.RowSetUtils._
3433

3534
object RowSet {
@@ -255,16 +254,9 @@ object RowSet {
255254
// Only match string in nested type values
256255
"\"" + s + "\""
257256

258-
case (d: Duration, dt) =>
259-
if (dt.simpleString == DAY.toString()) {
260-
IntervalQualifier.toDayTimeIntervalString(d, DAY)
261-
} else if (dt.simpleString == HOUR.toString()) {
262-
IntervalQualifier.toDayTimeIntervalString(d, HOUR)
263-
} else if (dt.simpleString == MINUTE.toString()) {
264-
IntervalQualifier.toDayTimeIntervalString(d, MINUTE)
265-
} else {
266-
IntervalQualifier.toDayTimeIntervalString(d, SECOND)
267-
}
257+
case (d: Duration, _) => toDayTimeIntervalString(d)
258+
259+
case (p: Period, _) => toYearMonthIntervalString(p)
268260

269261
case (seq: scala.collection.Seq[_], ArrayType(typ, _)) =>
270262
seq.map(v => (v, typ)).map(e => toHiveString(e, timeZone)).mkString("[", ",", "]")

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -41,8 +41,10 @@ object SchemaHelper {
4141
case TimestampType => TTypeId.TIMESTAMP_TYPE
4242
case BinaryType => TTypeId.BINARY_TYPE
4343
case CalendarIntervalType => TTypeId.STRING_TYPE
44-
case dt if dt.simpleString.startsWith("interval day") =>
44+
case dt if dt.getClass.getSimpleName.equals("DayTimeIntervalType") =>
4545
TTypeId.INTERVAL_DAY_TIME_TYPE
46+
case ym if ym.getClass.getSimpleName.equals("YearMonthIntervalType") =>
47+
TTypeId.INTERVAL_YEAR_MONTH_TYPE
4648
case _: ArrayType => TTypeId.ARRAY_TYPE
4749
case _: MapType => TTypeId.MAP_TYPE
4850
case _: StructType => TTypeId.STRUCT_TYPE

kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala

Lines changed: 29 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,19 +19,24 @@ package org.apache.kyuubi.util
1919

2020
import java.nio.ByteBuffer
2121
import java.sql.Timestamp
22-
import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
22+
import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period, ZoneId}
2323
import java.time.chrono.IsoChronology
2424
import java.time.format.DateTimeFormatter
2525
import java.time.format.DateTimeFormatterBuilder
2626
import java.time.temporal.ChronoField
2727
import java.util.{Date, Locale}
28+
import java.util.concurrent.TimeUnit
2829

2930
import scala.language.implicitConversions
3031

3132
import org.apache.commons.lang3.time.FastDateFormat
3233

3334
private[kyuubi] object RowSetUtils {
3435

36+
final private val SECOND_PER_MINUTE: Long = 60L
37+
final private val SECOND_PER_HOUR: Long = SECOND_PER_MINUTE * 60L
38+
final private val SECOND_PER_DAY: Long = SECOND_PER_HOUR * 24L
39+
3540
private lazy val dateFormatter = {
3641
createDateTimeFormatterBuilder().appendPattern("yyyy-MM-dd")
3742
.toFormatter(Locale.US)
@@ -79,4 +84,27 @@ private[kyuubi] object RowSetUtils {
7984
implicit def bitSetToBuffer(bitSet: java.util.BitSet): ByteBuffer = {
8085
ByteBuffer.wrap(bitSet.toByteArray)
8186
}
87+
88+
def toDayTimeIntervalString(d: Duration): String = {
89+
var rest = d.getSeconds
90+
var sign = ""
91+
if (d.getSeconds < 0) {
92+
sign = "-"
93+
rest = -rest
94+
}
95+
val days = TimeUnit.SECONDS.toDays(rest)
96+
rest %= SECOND_PER_DAY
97+
val hours = TimeUnit.SECONDS.toHours(rest)
98+
rest %= SECOND_PER_HOUR
99+
val minutes = TimeUnit.SECONDS.toMinutes(rest)
100+
val seconds = rest % SECOND_PER_MINUTE
101+
f"$sign$days $hours%02d:$minutes%02d:$seconds%02d.${d.getNano}%09d"
102+
}
103+
104+
def toYearMonthIntervalString(d: Period): String = {
105+
val years = d.getYears
106+
val months = d.getMonths
107+
val sign = if (years < 0 || months < 0) "-" else ""
108+
s"$sign${Math.abs(years)}-${Math.abs(months)}"
109+
}
82110
}

kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala

Lines changed: 69 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -24,10 +24,12 @@ import scala.collection.JavaConverters._
2424
import org.apache.commons.lang3.StringUtils
2525
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsReq, TOpenSessionReq, TStatusCode}
2626

27-
import org.apache.kyuubi.KYUUBI_VERSION
27+
import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
2828

2929
trait SparkQueryTests extends HiveJDBCTestHelper {
3030

31+
protected lazy val SPARK_ENGINE_MAJOR_MINOR_VERSION: (Int, Int) = sparkEngineMajorMinorVersion
32+
3133
test("execute statement - select null") {
3234
withJdbcStatement() { statement =>
3335
val resultSet = statement.executeQuery("SELECT NULL AS col")
@@ -175,25 +177,70 @@ trait SparkQueryTests extends HiveJDBCTestHelper {
175177
test("execute statement - select daytime interval") {
176178
withJdbcStatement() { statement =>
177179
Map(
178-
"-interval 2 day" -> "-2 00:00:00.000000000",
179-
"-interval 200 day" -> "-200 00:00:00.000000000",
180-
"interval 1 day 1 hour" -> "1 01:00:00.000000000",
181-
"interval 1 day 1 hour -60 minutes" -> "1 00:00:00.000000000",
182-
"interval 1 day 1 hour -60 minutes 30 seconds" -> "1 00:00:30.000000000",
180+
"interval 1 day 1 hour -60 minutes 30 seconds" ->
181+
Tuple2("1 00:00:30.000000000", "1 days 30 seconds"),
182+
"interval 30 seconds 12345 milliseconds" ->
183+
Tuple2("0 00:00:42.345000000", "42.345 seconds"),
184+
"interval 1 hour 59 minutes 30 seconds 12345 milliseconds" ->
185+
Tuple2("0 01:59:42.345000000", "1 hours 59 minutes 42.345 seconds"),
186+
"-interval 2 day" -> Tuple2("-2 00:00:00.000000000", "-2 days"),
187+
"interval 59 minutes 30 seconds 12345 milliseconds" ->
188+
Tuple2("0 00:59:42.345000000", "59 minutes 42.345 seconds"),
189+
"interval 25 hour" -> Tuple2("1 01:00:00.000000000", "25 hours"),
190+
"interval 1 hour 62 minutes" -> Tuple2("0 02:02:00.000000000", "2 hours 2 minutes"),
183191
"interval 1 day 1 hour 59 minutes 30 seconds 12345 milliseconds" ->
184-
"1 01:59:42.345000000").foreach { kv => // value -> result pair
192+
Tuple2("1 01:59:42.345000000", "1 days 1 hours 59 minutes 42.345 seconds"),
193+
"interval 1 day 1 hour -60 minutes" -> Tuple2("1 00:00:00.000000000", "1 days"),
194+
"INTERVAL 30 SECONDS" -> Tuple2("0 00:00:30.000000000", "30 seconds"),
195+
"interval -60 minutes 30 seconds" ->
196+
Tuple2("-0 00:59:30.000000000", "-59 minutes -30 seconds"),
197+
"-interval 200 day" -> Tuple2("-200 00:00:00.000000000", "-200 days"),
198+
"interval 1 hour -60 minutes 30 seconds" -> Tuple2("0 00:00:30.000000000", "30 seconds"),
199+
"interval 62 minutes" -> Tuple2("0 01:02:00.000000000", "1 hours 2 minutes"),
200+
"interval 1 day 1 hour" -> Tuple2("1 01:00:00.000000000", "1 days 1 hours")).foreach {
201+
kv => // value -> result pair
202+
val resultSet = statement.executeQuery(s"SELECT ${kv._1} AS col")
203+
assert(resultSet.next())
204+
val result = resultSet.getString("col")
205+
val metaData = resultSet.getMetaData
206+
if (SPARK_ENGINE_MAJOR_MINOR_VERSION._1 == 3
207+
&& SPARK_ENGINE_MAJOR_MINOR_VERSION._2 < 2) {
208+
// for spark 3.1 and backwards
209+
assert(result === kv._2._2)
210+
assert(metaData.getPrecision(1) === Int.MaxValue)
211+
assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.VARCHAR)
212+
} else {
213+
assert(result === kv._2._1)
214+
assert(metaData.getPrecision(1) === 29)
215+
assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.OTHER)
216+
}
217+
assert(metaData.getScale(1) === 0)
218+
}
219+
}
220+
}
221+
222+
test("execute statement - select year/month interval") {
223+
withJdbcStatement() { statement =>
224+
Map(
225+
"INTERVAL 2022 YEAR" -> Tuple2("2022-0", "2022 years"),
226+
"INTERVAL '2021-07' YEAR TO MONTH" -> Tuple2("2021-7", "2021 years 7 months"),
227+
"INTERVAL 3 MONTH" -> Tuple2("0-3", "3 months"),
228+
"INTERVAL 241 MONTH" -> Tuple2("20-1", "20 years 1 months"),
229+
"INTERVAL -1 year -25 MONTH" -> Tuple2("-3-1", "-3 years -1 months"),
230+
"INTERVAL 3 year -25 MONTH" -> Tuple2("0-11", "11 months")).foreach { kv =>
185231
val resultSet = statement.executeQuery(s"SELECT ${kv._1} AS col")
186232
assert(resultSet.next())
187233
val result = resultSet.getString("col")
188234
val metaData = resultSet.getMetaData
189-
if (result.contains("days")) {
235+
if (SPARK_ENGINE_MAJOR_MINOR_VERSION._1 == 3
236+
&& SPARK_ENGINE_MAJOR_MINOR_VERSION._2 < 2) {
190237
// for spark 3.1 and backwards
191-
assert(result.split("days").head.trim === kv._2.split(" ").head)
238+
assert(result === kv._2._2)
192239
assert(metaData.getPrecision(1) === Int.MaxValue)
193240
assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.VARCHAR)
194241
} else {
195-
assert(result === kv._2)
196-
assert(metaData.getPrecision(1) === 29)
242+
assert(result === kv._2._1)
243+
assert(metaData.getPrecision(1) === 11)
197244
assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.OTHER)
198245
}
199246
assert(metaData.getScale(1) === 0)
@@ -545,4 +592,15 @@ trait SparkQueryTests extends HiveJDBCTestHelper {
545592
assert(foundOperationLangItem)
546593
}
547594
}
595+
596+
def sparkEngineMajorMinorVersion: (Int, Int) = {
597+
var sparkRuntimeVer = ""
598+
withJdbcStatement() { stmt =>
599+
val result = stmt.executeQuery("SELECT version()")
600+
assert(result.next())
601+
sparkRuntimeVer = result.getString(1)
602+
assert(!result.next())
603+
}
604+
Utils.majorMinorVersion(sparkRuntimeVer)
605+
}
548606
}

0 commit comments

Comments (0)