diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index ba450828ad9..7f7bee400b2 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -18,6 +18,7 @@ package org.apache.kyuubi.engine.flink.operation
 
 import java.io.IOException
+import java.time.ZoneId
 
 import scala.collection.JavaConverters.collectionAsScalaIterableConverter
 
@@ -95,9 +96,15 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
       case FETCH_FIRST => resultSet.getData.fetchAbsolute(0);
     }
     val token = resultSet.getData.take(rowSetSize)
+    val timeZone = Option(flinkSession.getSessionConfig.get("table.local-time-zone"))
+    val zoneId = timeZone match {
+      case Some(tz) => ZoneId.of(tz)
+      case None => ZoneId.systemDefault()
+    }
     val resultRowSet = RowSet.resultSetToTRowSet(
       token.toList,
       resultSet,
+      zoneId,
       getProtocolVersion)
     resultRowSet.setStartRowOffset(resultSet.getData.getPosition)
     resultRowSet
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
index 13cf5e717ae..c446396d5bb 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
@@ -21,7 +21,9 @@ import java.{lang, util}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
-import java.time.{LocalDate, LocalDateTime}
+import java.time.{Instant, LocalDate, LocalDateTime, ZonedDateTime, ZoneId}
+import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, TextStyle}
+import java.time.temporal.ChronoField
 import java.util.Collections
 
 import scala.collection.JavaConverters._
@@ -42,15 +44,16 @@ object RowSet {
   def resultSetToTRowSet(
       rows: Seq[Row],
       resultSet: ResultSet,
+      zoneId: ZoneId,
       protocolVersion: TProtocolVersion): TRowSet = {
     if (protocolVersion.getValue < TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
-      toRowBaseSet(rows, resultSet)
+      toRowBaseSet(rows, resultSet, zoneId)
     } else {
-      toColumnBasedSet(rows, resultSet)
+      toColumnBasedSet(rows, resultSet, zoneId)
     }
   }
 
-  def toRowBaseSet(rows: Seq[Row], resultSet: ResultSet): TRowSet = {
+  def toRowBaseSet(rows: Seq[Row], resultSet: ResultSet, zoneId: ZoneId): TRowSet = {
     val rowSize = rows.size
     val tRows = new util.ArrayList[TRow](rowSize)
     var i = 0
@@ -60,7 +63,7 @@ object RowSet {
       val columnSize = row.getArity
       var j = 0
       while (j < columnSize) {
-        val columnValue = toTColumnValue(j, row, resultSet)
+        val columnValue = toTColumnValue(j, row, resultSet, zoneId)
         tRow.addToColVals(columnValue)
         j += 1
       }
@@ -71,14 +74,14 @@ object RowSet {
     new TRowSet(0, tRows)
   }
 
-  def toColumnBasedSet(rows: Seq[Row], resultSet: ResultSet): TRowSet = {
+  def toColumnBasedSet(rows: Seq[Row], resultSet: ResultSet, zoneId: ZoneId): TRowSet = {
     val size = rows.length
     val tRowSet = new TRowSet(0, new util.ArrayList[TRow](size))
     val columnSize = resultSet.getColumns.size()
     var i = 0
     while (i < columnSize) {
       val field = resultSet.getColumns.get(i)
-      val tColumn = toTColumn(rows, i, field.getDataType.getLogicalType)
+      val tColumn = toTColumn(rows, i, field.getDataType.getLogicalType, zoneId)
       tRowSet.addToColumns(tColumn)
       i += 1
     }
@@ -88,7 +91,8 @@ object RowSet {
   private def toTColumnValue(
       ordinal: Int,
       row: Row,
-      resultSet: ResultSet): TColumnValue = {
+      resultSet: ResultSet,
+      zoneId: ZoneId): TColumnValue = {
 
     val column = resultSet.getColumns.get(ordinal)
     val logicalType = column.getDataType.getLogicalType
@@ -153,6 +157,12 @@ object RowSet {
             s"for type ${t.getClass}.")
         }
         TColumnValue.stringVal(tStringValue)
+      case _: LocalZonedTimestampType =>
+        val tStringValue = new TStringValue
+        val fieldValue = row.getField(ordinal)
+        tStringValue.setValue(TIMESTAMP_LZT_FORMATTER.format(
+          ZonedDateTime.ofInstant(fieldValue.asInstanceOf[Instant], zoneId)))
+        TColumnValue.stringVal(tStringValue)
       case t =>
         val tStringValue = new TStringValue
         if (row.getField(ordinal) != null) {
@@ -166,7 +176,11 @@ object RowSet {
     ByteBuffer.wrap(bitSet.toByteArray)
   }
 
-  private def toTColumn(rows: Seq[Row], ordinal: Int, logicalType: LogicalType): TColumn = {
+  private def toTColumn(
+      rows: Seq[Row],
+      ordinal: Int,
+      logicalType: LogicalType,
+      zoneId: ZoneId): TColumn = {
     val nulls = new java.util.BitSet()
     // for each column, determine the conversion class by sampling the first non-value value
     // if there's no row, set the entire column empty
@@ -211,6 +225,12 @@ object RowSet {
             s"for type ${t.getClass}.")
         }
         TColumn.stringVal(new TStringColumn(values, nulls))
+      case _: LocalZonedTimestampType =>
+        val values = getOrSetAsNull[Instant](rows, ordinal, nulls, Instant.EPOCH)
+          .toArray().map(v =>
+            TIMESTAMP_LZT_FORMATTER.format(
+              ZonedDateTime.ofInstant(v.asInstanceOf[Instant], zoneId)))
+        TColumn.stringVal(new TStringColumn(values.toList.asJava, nulls))
       case _ =>
         var i = 0
         val rowSize = rows.length
@@ -303,13 +323,14 @@ object RowSet {
       case _: DecimalType => TTypeId.DECIMAL_TYPE
       case _: DateType => TTypeId.DATE_TYPE
       case _: TimestampType => TTypeId.TIMESTAMP_TYPE
+      case _: LocalZonedTimestampType => TTypeId.TIMESTAMPLOCALTZ_TYPE
      case _: ArrayType => TTypeId.ARRAY_TYPE
       case _: MapType => TTypeId.MAP_TYPE
       case _: RowType => TTypeId.STRUCT_TYPE
       case _: BinaryType => TTypeId.BINARY_TYPE
       case _: VarBinaryType => TTypeId.BINARY_TYPE
       case _: TimeType => TTypeId.STRING_TYPE
-      case t @ (_: ZonedTimestampType | _: LocalZonedTimestampType | _: MultisetType |
+      case t @ (_: ZonedTimestampType | _: MultisetType |
           _: YearMonthIntervalType | _: DayTimeIntervalType) =>
         throw new IllegalArgumentException(
           "Flink data type `%s` is not supported currently".format(t.asSummaryString()),
@@ -377,4 +398,26 @@ object RowSet {
         other.toString
     }
   }
+
+  /** should stay in sync with org.apache.kyuubi.jdbc.hive.common.TimestampTZUtil */
+  var TIMESTAMP_LZT_FORMATTER: DateTimeFormatter = {
+    val builder = new DateTimeFormatterBuilder
+    // Date part
+    builder.append(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
+    // Time part
+    builder
+      .optionalStart
+      .appendLiteral(" ")
+      .append(DateTimeFormatter.ofPattern("HH:mm:ss"))
+      .optionalStart
+      .appendFraction(ChronoField.NANO_OF_SECOND, 1, 9, true)
+      .optionalEnd
+      .optionalEnd
+
+    // Zone part
+    builder.optionalStart.appendLiteral(" ").optionalEnd
+    builder.optionalStart.appendZoneText(TextStyle.NARROW).optionalEnd
+
+    builder.toFormatter
+  }
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 6c9ad5134a6..4fa9c280e9d 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -756,6 +756,23 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
     }
   }
 
+  test("execute statement - select timestamp with local time zone") {
+    withJdbcStatement() { statement =>
+      statement.executeQuery("CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3)")
+      statement.executeQuery("SET 'table.local-time-zone' = 'UTC'")
+      val resultSetUTC = statement.executeQuery("SELECT * FROM T1")
+      val metaData = resultSetUTC.getMetaData
+      assert(metaData.getColumnType(1) === java.sql.Types.OTHER)
+      assert(resultSetUTC.next())
+      assert(resultSetUTC.getString(1) === "1970-01-01 00:00:04.001 UTC")
+
+      statement.executeQuery("SET 'table.local-time-zone' = 'America/Los_Angeles'")
+      val resultSetPST = statement.executeQuery("SELECT * FROM T1")
+      assert(resultSetPST.next())
+      assert(resultSetPST.getString(1) === "1969-12-31 16:00:04.001 America/Los_Angeles")
+    }
+  }
+
   test("execute statement - select time") {
     withJdbcStatement() { statement =>
       val resultSet =
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/result/ResultSetSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/result/ResultSetSuite.scala
index 9190456b32b..9ee5c658bc9 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/result/ResultSetSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/result/ResultSetSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.kyuubi.engine.flink.result
 
+import java.time.ZoneId
+
 import org.apache.flink.table.api.{DataTypes, ResultKind}
 import org.apache.flink.table.catalog.Column
 import org.apache.flink.table.data.StringData
@@ -44,9 +46,10 @@ class ResultSetSuite extends KyuubiFunSuite {
       .data(rowsNew)
       .build
 
-    assert(RowSet.toRowBaseSet(rowsNew, resultSetNew)
-      === RowSet.toRowBaseSet(rowsOld, resultSetOld))
-    assert(RowSet.toColumnBasedSet(rowsNew, resultSetNew)
-      === RowSet.toColumnBasedSet(rowsOld, resultSetOld))
+    val timeZone = ZoneId.of("America/Los_Angeles")
+    assert(RowSet.toRowBaseSet(rowsNew, resultSetNew, timeZone)
+      === RowSet.toRowBaseSet(rowsOld, resultSetOld, timeZone))
+    assert(RowSet.toColumnBasedSet(rowsNew, resultSetNew, timeZone)
+      === RowSet.toColumnBasedSet(rowsOld, resultSetOld, timeZone))
   }
 }
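Reviewer note (not part of the patch): a minimal standalone sketch of what the new formatting path is meant to produce. It feeds the same instant as TO_TIMESTAMP_LTZ(4001, 3) through the TIMESTAMP_LZT_FORMATTER that this change adds to RowSet, once per session time zone exercised by the new FlinkOperationSuite test. It assumes the patch is applied and kyuubi-flink-sql-engine is on the classpath; the object name TimestampLtzFormatDemo is made up for illustration.

// Illustrative only: mirrors the formatting done for LocalZonedTimestampType
// in RowSet.toTColumnValue / toTColumn after this change.
import java.time.{Instant, ZonedDateTime, ZoneId}

import org.apache.kyuubi.engine.flink.schema.RowSet

object TimestampLtzFormatDemo {
  def main(args: Array[String]): Unit = {
    // TO_TIMESTAMP_LTZ(4001, 3) denotes 4.001 seconds after the epoch.
    val instant = Instant.ofEpochMilli(4001L)

    // Expected (per the new FlinkOperationSuite test): 1970-01-01 00:00:04.001 UTC
    println(RowSet.TIMESTAMP_LZT_FORMATTER.format(
      ZonedDateTime.ofInstant(instant, ZoneId.of("UTC"))))

    // Expected: 1969-12-31 16:00:04.001 America/Los_Angeles
    println(RowSet.TIMESTAMP_LZT_FORMATTER.format(
      ZonedDateTime.ofInstant(instant, ZoneId.of("America/Los_Angeles"))))
  }
}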