From 86f6cb735a5ee7bcec26d90eba1f8321523ed61e Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Mon, 13 Feb 2023 22:24:41 +0800
Subject: [PATCH 01/10] arrow based rowset timestamp as string

---
 .../spark/operation/ExecuteStatement.scala    | 13 ++---
 .../spark/operation/SparkOperation.scala      |  4 ++
 .../kyuubi/engine/spark/schema/RowSet.scala   |  8 ++-
 .../spark/sql/kyuubi/SparkDatasetHelper.scala | 18 +++++-
 .../SparkArrowbasedOperationSuite.scala       | 55 ++++++++++++++++++-
 .../org/apache/kyuubi/config/KyuubiConf.scala |  8 +++
 .../jdbc/hive/KyuubiArrowBasedResultSet.java  |  3 +-
 .../jdbc/hive/KyuubiArrowQueryResultSet.java  | 54 ++++++++++--------
 .../kyuubi/jdbc/hive/KyuubiStatement.java     | 16 +++++-
 .../hive/arrow/ArrowColumnarBatchRow.java     | 16 ++++--
 10 files changed, 149 insertions(+), 46 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index fac90f7ead1..2b90525c1ec 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -162,17 +162,12 @@ class ExecuteStatement(
     }
   }

-  // TODO:(fchen) make this configurable
-  val kyuubiBeelineConvertToString = true
-
   def convertComplexType(df: DataFrame): DataFrame = {
-    if (kyuubiBeelineConvertToString) {
-      SparkDatasetHelper.convertTopLevelComplexTypeToHiveString(df)
-    } else {
-      df
-    }
+    SparkDatasetHelper.convertTopLevelComplexTypeToHiveString(df, timestampAsString)
   }

   override def getResultSetMetadataHints(): Seq[String] =
-    Seq(s"__kyuubi_operation_result_format__=$resultFormat")
+    Seq(
+      s"__kyuubi_operation_result_format__=$resultFormat",
+      s"__kyuubi_operation_result_arrow_timestampAsString__=$timestampAsString")
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 06884534d9c..4edae989873 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -279,6 +279,10 @@ abstract class SparkOperation(session: Session)
     spark.conf.get("kyuubi.operation.result.format", "thrift")
   }

+  protected def timestampAsString: Boolean = {
+    spark.conf.get("kyuubi.operation.result.arrow.timestampAsString", "true").toBoolean
+  }
+
   protected def setSessionUserSign(): Unit = {
     (
       session.conf.get(KYUUBI_SESSION_SIGN_PUBLICKEY),
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
index 7be70403d5d..204dcc98c66 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
@@ -25,15 +25,19 @@ import scala.collection.JavaConverters._
 import org.apache.hive.service.rpc.thrift._
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.HiveResult
+import org.apache.spark.sql.execution.HiveResult.TimeFormatters
 import org.apache.spark.sql.types._

 import org.apache.kyuubi.util.RowSetUtils._

 object RowSet {

-  def toHiveString(valueAndType: (Any, DataType), nested: Boolean = false): String = {
+  def toHiveString(
+      valueAndType: (Any, DataType),
+      nested: Boolean = false,
+      timeFormatters: TimeFormatters = HiveResult.getTimeFormatters): String = {
     // compatible w/ Spark 3.1 and above
-    val timeFormatters = HiveResult.getTimeFormatters
+//    val timeFormatters = HiveResult.getTimeFormatters
     HiveResult.toHiveString(valueAndType, nested, timeFormatters)
   }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 46c3bce4d73..19373beea2d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -21,6 +21,8 @@ import java.time.ZoneId

 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
+import org.apache.spark.sql.execution.HiveResult.TimeFormatters
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._

@@ -31,12 +33,14 @@ object SparkDatasetHelper {
     ds.toArrowBatchRdd
   }

-  def convertTopLevelComplexTypeToHiveString(df: DataFrame): DataFrame = {
+  def convertTopLevelComplexTypeToHiveString(
+      df: DataFrame,
+      timestampAsString: Boolean): DataFrame = {
     val timeZone = ZoneId.of(df.sparkSession.sessionState.conf.sessionLocalTimeZone)

     val quotedCol = (name: String) => col(quoteIfNeeded(name))

-    // an udf to call `RowSet.toHiveString` on complex types(struct/array/map).
+    // a UDF to call `RowSet.toHiveString` on complex types (struct/array/map) and timestamp type.
     val toHiveStringUDF = udf[String, Row, String]((row, schemaDDL) => {
       val dt = DataType.fromDDL(schemaDDL)
       dt match {
@@ -46,6 +50,8 @@ object SparkDatasetHelper {
           RowSet.toHiveString((row.toSeq.head, at), nested = true)
         case StructType(Array(StructField(_, mt: MapType, _, _))) =>
           RowSet.toHiveString((row.toSeq.head, mt), nested = true)
+        case StructType(Array(StructField(_, tt: TimestampType, _, _))) =>
+          RowSet.toHiveString((row.toSeq.head, tt), nested = true, getTimeFormatters(timeZone))
         case _ =>
           throw new UnsupportedOperationException
       }
@@ -56,6 +62,8 @@ object SparkDatasetHelper {
         toHiveStringUDF(quotedCol(name), lit(sf.toDDL)).as(name)
       case sf @ StructField(name, _: MapType | _: ArrayType, _, _) =>
         toHiveStringUDF(struct(quotedCol(name)), lit(sf.toDDL)).as(name)
+      case sf @ StructField(name, _: TimestampType, _, _) if timestampAsString =>
+        toHiveStringUDF(struct(quotedCol(name)), lit(sf.toDDL)).as(name)
       case StructField(name, _, _, _) => quotedCol(name)
     }
     df.select(cols: _*)
@@ -72,4 +80,10 @@ object SparkDatasetHelper {
       s"`${part.replace("`", "``")}`"
     }
   }
+
+  private def getTimeFormatters(timeZone: ZoneId): TimeFormatters = {
+    val dateFormatter = DateFormatter()
+    val timestampFormatter = TimestampFormatter.getFractionFormatter(timeZone)
+    TimeFormatters(dateFormatter, timestampFormatter)
+  }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
index e464569147c..5d8758f2a49 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
@@ -43,7 +43,48 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
     }
   }

-  def checkResultSetFormat(statement: Statement, expectFormat: String): Unit = {
+  test("Spark session timezone format") {
+    withJdbcStatement() { statement =>
+      def check(expect: String): Unit = {
+        val query =
+          """
+            |SELECT
+            |  from_utc_timestamp(
+            |    from_unixtime(
+            |      1670404535000 / 1000, 'yyyy-MM-dd HH:mm:ss'
+            |    ),
+            |    'GMT+08:00'
+            |  )
+            |""".stripMargin
+        val resultSet = statement.executeQuery(query)
+        assert(resultSet.next())
+        assert(resultSet.getString(1) == expect)
+      }
+
+      def setTimeZone(timeZone: String): Unit = {
+        val rs = statement.executeQuery(s"set spark.sql.session.timeZone=$timeZone")
+        assert(rs.next())
+      }
+
+      // timestampAsString = true
+      statement.executeQuery(s"set ${KyuubiConf.ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING.key}=true")
+      checkArrowBasedRowSetTimestampAsString(statement, "true")
+      setTimeZone("UTC")
+      check("2022-12-07 17:15:35.0")
+      setTimeZone("GMT+8")
+      check("2022-12-08 01:15:35.0")
+
+      // timestampAsString = false
+      statement.executeQuery(s"set ${KyuubiConf.ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING.key}=false")
+      checkArrowBasedRowSetTimestampAsString(statement, "false")
+      setTimeZone("UTC")
+      check("2022-12-08 01:15:35.0")
+      setTimeZone("GMT+8")
+      check("2022-12-08 01:15:35.0")
+    }
+  }
+
+  private def checkResultSetFormat(statement: Statement, expectFormat: String): Unit = {
     val query =
       s"""
          |SELECT '$${hivevar:${KyuubiConf.OPERATION_RESULT_FORMAT.key}}' AS col
          |""".stripMargin
     val resultSet = statement.executeQuery(query)
     assert(resultSet.next())
     assert(resultSet.getString("col") === expectFormat)
   }
+
+  private def checkArrowBasedRowSetTimestampAsString(
+      statement: Statement,
+      expect: String): Unit = {
+    val query =
+      s"""
+         |SELECT '$${hivevar:${KyuubiConf.ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING.key}}' AS col
+         |""".stripMargin
+    val resultSet = statement.executeQuery(query)
+    assert(resultSet.next())
+    assert(resultSet.getString("col") === expect)
+  }
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 01bd46bd384..58cdcf7a495 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1676,6 +1676,14 @@ object KyuubiConf {
       .transform(_.toLowerCase(Locale.ROOT))
       .createWithDefault("thrift")

+  val ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING: ConfigEntry[Boolean] =
+    buildConf("kyuubi.operation.result.arrow.timestampAsString")
+      .doc("When true, arrow-based rowsets will convert columns of type timestamp to strings for" +
+        " transmission.")
+      .version("1.7.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val SERVER_OPERATION_LOG_DIR_ROOT: ConfigEntry[String] =
     buildConf("kyuubi.operation.log.dir.root")
       .doc("Root directory for query operation log at server-side.")
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java
index c3e75c0ea0e..43b1a11ae81 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java
@@ -50,6 +50,7 @@ public abstract class KyuubiArrowBasedResultSet implements SQLResultSet {
   protected Schema arrowSchema;
   protected VectorSchemaRoot root;
   protected ArrowColumnarBatchRow row;
+  protected boolean timestampAsString = true;

   protected BufferAllocator allocator;

@@ -312,7 +313,7 @@ private Object getColumnValue(int columnIndex) throws SQLException {
       if (wasNull) {
         return null;
       } else {
-        return row.get(columnIndex - 1, columnType);
+        return row.get(columnIndex - 1, columnType, timestampAsString);
       }
     } catch (Exception e) {
       e.printStackTrace();
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowQueryResultSet.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowQueryResultSet.java
index 1f2af29dc16..fda70f463e9 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowQueryResultSet.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowQueryResultSet.java
@@ -58,9 +58,6 @@ public class KyuubiArrowQueryResultSet extends KyuubiArrowBasedResultSet {
   private boolean isScrollable = false;
   private boolean fetchFirst = false;

-  // TODO:(fchen) make this configurable
-  protected boolean convertComplexTypeToString = true;
-
   private final TProtocolVersion protocol;

   public static class Builder {
@@ -87,6 +84,8 @@ public static class Builder {
     private boolean isScrollable = false;
     private ReentrantLock transportLock = null;

+    private boolean timestampAsString = true;
+
     public Builder(Statement statement) throws SQLException {
       this.statement = statement;
       this.connection = statement.getConnection();
@@ -153,6 +152,11 @@ public Builder setScrollable(boolean setScrollable) {
       return this;
     }

+    public Builder setTimestampAsString(boolean timestampAsString) {
+      this.timestampAsString = timestampAsString;
+      return this;
+    }
+
     public Builder setTransportLock(ReentrantLock transportLock) {
       this.transportLock = transportLock;
       return this;
     }
@@ -189,10 +193,10 @@ protected KyuubiArrowQueryResultSet(Builder builder) throws SQLException {
       this.maxRows = builder.maxRows;
     }
     this.isScrollable = builder.isScrollable;
+    this.timestampAsString = builder.timestampAsString;
     this.protocol = builder.getProtocolVersion();
     arrowSchema =
-        ArrowUtils.toArrowSchema(
-            columnNames, convertComplexTypeToStringType(columnTypes), columnAttributes);
+        ArrowUtils.toArrowSchema(columnNames, convertToStringType(columnTypes), columnAttributes);
     if (allocator == null) {
       initArrowSchemaAndAllocator();
     }
@@ -271,8 +275,7 @@ private void retrieveSchema() throws SQLException {
         columnAttributes.add(getColumnAttributes(primitiveTypeEntry));
       }
       arrowSchema =
-          ArrowUtils.toArrowSchema(
-              columnNames, convertComplexTypeToStringType(columnTypes), columnAttributes);
+          ArrowUtils.toArrowSchema(columnNames, convertToStringType(columnTypes), columnAttributes);
     } catch (SQLException eS) {
       throw eS; // rethrow the SQLException as is
     } catch (Exception ex) {
@@ -480,22 +483,25 @@ public boolean isClosed() {
     return isClosed;
   }

-  private List<TTypeId> convertComplexTypeToStringType(List<TTypeId> colTypes) {
-    if (convertComplexTypeToString) {
-      return colTypes.stream()
-          .map(
-              type -> {
-                if (type == TTypeId.ARRAY_TYPE
-                    || type == TTypeId.MAP_TYPE
-                    || type == TTypeId.STRUCT_TYPE) {
-                  return TTypeId.STRING_TYPE;
-                } else {
-                  return type;
-                }
-              })
-          .collect(Collectors.toList());
-    } else {
-      return colTypes;
-    }
+  /**
+   * 1. The complex types (map/array/struct) are always converted to string type for transport.
+   * 2. If the user sets `timestampAsString = true`, the timestamp type is converted to string
+   * type as well.
+   */
+  private List<TTypeId> convertToStringType(List<TTypeId> colTypes) {
+    return colTypes.stream()
+        .map(
+            type -> {
+              if ((type == TTypeId.ARRAY_TYPE
+                      || type == TTypeId.MAP_TYPE
+                      || type == TTypeId.STRUCT_TYPE) // complex type (map/array/struct)
+                  // timestamp type
+                  || (type == TTypeId.TIMESTAMP_TYPE && timestampAsString)) {
+                return TTypeId.STRING_TYPE;
+              } else {
+                return type;
+              }
+            })
+        .collect(Collectors.toList());
+  }
 }
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
index ab7c06a5589..6d3bf1fa09b 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
@@ -37,6 +37,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
   public static final Logger LOG = LoggerFactory.getLogger(KyuubiStatement.class.getName());
   public static final int DEFAULT_FETCH_SIZE = 1000;
   public static final String DEFAULT_RESULT_FORMAT = "thrift";
+  public static final String DEFAULT_ARROW_TIMESTAMP_AS_STRING = "true";
   private final KyuubiConnection connection;
   private TCLIService.Iface client;
   private TOperationHandle stmtHandle = null;
@@ -45,7 +46,8 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
   private int fetchSize = DEFAULT_FETCH_SIZE;
   private boolean isScrollableResultset = false;
   private boolean isOperationComplete = false;
-  private Map<String, String> properties = new HashMap<>();
+
+  private Map<String, String> properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
   /**
    * We need to keep a reference to the result set to support the following:
    * statement.execute(String sql);
@@ -213,6 +215,11 @@ private boolean executeWithConfOverlay(String sql, Map<String, String> confOverl
     LOG.info("kyuubi.operation.result.format: " + resultFormat);
     switch (resultFormat) {
       case "arrow":
+        boolean timestampAsString =
+            Boolean.parseBoolean(
+                properties.getOrDefault(
+                    "__kyuubi_operation_result_arrow_timestampAsString__",
+                    DEFAULT_ARROW_TIMESTAMP_AS_STRING));
         resultSet =
             new KyuubiArrowQueryResultSet.Builder(this)
                 .setClient(client)
@@ -222,6 +229,7 @@ private boolean executeWithConfOverlay(String sql, Map<String, String> confOverl
                 .setFetchSize(fetchSize)
                 .setScrollable(isScrollableResultset)
                 .setSchema(columnNames, columnTypes, columnAttributes)
+                .setTimestampAsString(timestampAsString)
                 .build();
         break;
       default:
@@ -270,6 +278,11 @@ public boolean executeAsync(String sql) throws SQLException {
     LOG.info("kyuubi.operation.result.format: " + resultFormat);
     switch (resultFormat) {
       case "arrow":
+        boolean timestampAsString =
+            Boolean.parseBoolean(
+                properties.getOrDefault(
+                    "__kyuubi_operation_result_arrow_timestampAsString__",
+                    DEFAULT_ARROW_TIMESTAMP_AS_STRING));
         resultSet =
             new KyuubiArrowQueryResultSet.Builder(this)
                 .setClient(client)
@@ -279,6 +292,7 @@ public boolean executeAsync(String sql) throws SQLException {
                 .setFetchSize(fetchSize)
                 .setScrollable(isScrollableResultset)
                 .setSchema(columnNames, columnTypes, columnAttributes)
+                .setTimestampAsString(timestampAsString)
                 .build();
         break;
       default:
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
index fa914ce5d7d..4caadcd29d6 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
@@ -104,7 +104,7 @@ public Object getMap(int ordinal) {
     throw new UnsupportedOperationException();
   }

-  public Object get(int ordinal, TTypeId dataType) {
+  public Object get(int ordinal, TTypeId dataType, boolean timestampAsString) {
     long seconds;
     long milliseconds;
     long microseconds;
@@ -131,11 +131,15 @@ public Object get(int ordinal, TTypeId dataType) {
       case STRING_TYPE:
         return getString(ordinal);
       case TIMESTAMP_TYPE:
-        microseconds = getLong(ordinal);
-        nanos = (int) (microseconds % 1000000) * 1000;
-        Timestamp timestamp = new Timestamp(microseconds / 1000);
-        timestamp.setNanos(nanos);
-        return timestamp;
+        if (timestampAsString) {
+          return Timestamp.valueOf(getString(ordinal));
+        } else {
+          microseconds = getLong(ordinal);
+          nanos = (int) (microseconds % 1000000) * 1000;
+          Timestamp timestamp = new Timestamp(microseconds / 1000);
+          timestamp.setNanos(nanos);
+          return timestamp;
+        }
       case DATE_TYPE:
         return DateUtils.internalToDate(getInt(ordinal));
       case INTERVAL_DAY_TIME_TYPE:

From 87c6f9ef80b89a6f25e3b6b384e9eb7f3056c001 Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Tue, 14 Feb 2023 16:22:12 +0800
Subject: [PATCH 02/10] update docs

---
 docs/deployment/settings.md | 33 +++++++++++++++++----------------
 1 file changed, 17 insertions(+), 16 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 7391f424169..c3c90d47c5c 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -429,22 +429,23 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 ### Operation

-| Key | Default | Meaning | Type | Since |
-|-----------------------------------------|---------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------|----------|-------|
-| kyuubi.operation.idle.timeout | PT3H | Operation will be closed when it's not accessed for this duration of time | duration | 1.0.0 |
-| kyuubi.operation.interrupt.on.cancel | true | When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished. | boolean | 1.2.0 |
-| kyuubi.operation.language | SQL | Choose a programing language for the following inputs • SQL: (Default) Run all following statements as SQL queries. • SCALA: Run all following input as scala codes • PYTHON: (Experimental) Run all following input as Python codes with Spark engine | string | 1.5.0 |
-| kyuubi.operation.log.dir.root | server_operation_logs | Root directory for query operation log at server-side. | string | 1.4.0 |
-| kyuubi.operation.plan.only.excludes | ResetCommand,SetCommand,SetNamespaceCommand,UseStatement,SetCatalogAndNamespace | Comma-separated list of query plan names, in the form of simple class names, i.e, for `SET abc=xyz`, the value will be `SetCommand`. For those auxiliary plans, such as `switch databases`, `set properties`, or `create temporary view` etc., which are used for setup evaluating environments for analyzing actual queries, we can use this config to exclude them and let them take effect. See also kyuubi.operation.plan.only.mode. | seq | 1.5.0 |
-| kyuubi.operation.plan.only.mode | none | Configures the statement performed mode, The value can be 'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution', or 'none', when it is 'none', indicate to the statement will be fully executed, otherwise only way without executing the query. different engines currently support different modes, the Spark engine supports all modes, and the Flink engine supports 'parse', 'physical', and 'execution', other engines do not support planOnly currently. | string | 1.4.0 |
-| kyuubi.operation.plan.only.output.style | plain | Configures the planOnly output style. The value can be 'plain' or 'json', and the default value is 'plain'. This configuration supports only the output styles of the Spark engine | string | 1.7.0 |
-| kyuubi.operation.progress.enabled | false | Whether to enable the operation progress. When true, the operation progress will be returned in `GetOperationStatus`. | boolean | 1.6.0 |
-| kyuubi.operation.query.timeout | <undefined> | Timeout for query executions at server-side, take effect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take full control of whether the query should timeout or not. If set, client-side timeout is capped at this point. To cancel the queries right away without waiting for task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together. | duration | 1.2.0 |
-| kyuubi.operation.result.format | thrift | Specify the result format, available configs are: • THRIFT: the result will convert to TRow at the engine driver side. • ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now. | string | 1.7.0 |
-| kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit. | int | 1.6.0 |
-| kyuubi.operation.scheduler.pool | <undefined> | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 |
-| kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. | boolean | 1.6.0 |
-| kyuubi.operation.status.polling.timeout | PT5S | Timeout(ms) for long polling asynchronous running sql query's status | duration | 1.0.0 |
+| Key | Default | Meaning | Type | Since |
+|-------------------------------------------------|---------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------|----------|-------|
+| kyuubi.operation.idle.timeout | PT3H | Operation will be closed when it's not accessed for this duration of time | duration | 1.0.0 |
+| kyuubi.operation.interrupt.on.cancel | true | When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished. | boolean | 1.2.0 |
+| kyuubi.operation.language | SQL | Choose a programing language for the following inputs • SQL: (Default) Run all following statements as SQL queries. • SCALA: Run all following input as scala codes • PYTHON: (Experimental) Run all following input as Python codes with Spark engine | string | 1.5.0 |
+| kyuubi.operation.log.dir.root | server_operation_logs | Root directory for query operation log at server-side. | string | 1.4.0 |
+| kyuubi.operation.plan.only.excludes | ResetCommand,SetCommand,SetNamespaceCommand,UseStatement,SetCatalogAndNamespace | Comma-separated list of query plan names, in the form of simple class names, i.e, for `SET abc=xyz`, the value will be `SetCommand`. For those auxiliary plans, such as `switch databases`, `set properties`, or `create temporary view` etc., which are used for setup evaluating environments for analyzing actual queries, we can use this config to exclude them and let them take effect. See also kyuubi.operation.plan.only.mode. | seq | 1.5.0 |
+| kyuubi.operation.plan.only.mode | none | Configures the statement performed mode, The value can be 'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution', or 'none', when it is 'none', indicate to the statement will be fully executed, otherwise only way without executing the query. different engines currently support different modes, the Spark engine supports all modes, and the Flink engine supports 'parse', 'physical', and 'execution', other engines do not support planOnly currently. | string | 1.4.0 |
+| kyuubi.operation.plan.only.output.style | plain | Configures the planOnly output style. The value can be 'plain' or 'json', and the default value is 'plain'. This configuration supports only the output styles of the Spark engine | string | 1.7.0 |
+| kyuubi.operation.progress.enabled | false | Whether to enable the operation progress. When true, the operation progress will be returned in `GetOperationStatus`. | boolean | 1.6.0 |
+| kyuubi.operation.query.timeout | <undefined> | Timeout for query executions at server-side, take effect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take full control of whether the query should timeout or not. If set, client-side timeout is capped at this point. To cancel the queries right away without waiting for task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together. | duration | 1.2.0 |
+| kyuubi.operation.result.arrow.timestampAsString | true | When true, arrow-based rowsets will convert columns of type timestamp to strings for transmission. | boolean | 1.7.0 |
+| kyuubi.operation.result.format | thrift | Specify the result format, available configs are: • THRIFT: the result will convert to TRow at the engine driver side. • ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now. | string | 1.7.0 |
+| kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit. | int | 1.6.0 |
+| kyuubi.operation.scheduler.pool | <undefined> | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 |
+| kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. | boolean | 1.6.0 |
+| kyuubi.operation.status.polling.timeout | PT5S | Timeout(ms) for long polling asynchronous running sql query's status | duration | 1.0.0 |

 ### Server

From b8e4b2886fc19f8f53ccc3a54be07896e72e9da2 Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Wed, 15 Feb 2023 16:00:28 +0800
Subject: [PATCH 03/10] fix ut

---
 docs/deployment/settings.md                          |  2 +-
 .../engine/spark/operation/SparkOperation.scala      |  4 ++--
 .../apache/kyuubi/engine/spark/schema/RowSet.scala   |  7 ++-----
 .../spark/sql/kyuubi/SparkDatasetHelper.scala        | 13 +------------
 .../scala/org/apache/kyuubi/config/KyuubiConf.scala  |  3 ++-
 5 files changed, 8 insertions(+), 21 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index c3c90d47c5c..9f173b3b039 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -440,7 +440,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.operation.plan.only.output.style | plain | Configures the planOnly output style. The value can be 'plain' or 'json', and the default value is 'plain'. This configuration supports only the output styles of the Spark engine | string | 1.7.0 |
 | kyuubi.operation.progress.enabled | false | Whether to enable the operation progress. When true, the operation progress will be returned in `GetOperationStatus`. | boolean | 1.6.0 |
 | kyuubi.operation.query.timeout | <undefined> | Timeout for query executions at server-side, take effect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take full control of whether the query should timeout or not. If set, client-side timeout is capped at this point. To cancel the queries right away without waiting for task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together. | duration | 1.2.0 |
-| kyuubi.operation.result.arrow.timestampAsString | true | When true, arrow-based rowsets will convert columns of type timestamp to strings for transmission. | boolean | 1.7.0 |
+| kyuubi.operation.result.arrow.timestampAsString | true | When true, arrow-based rowsets will convert columns of type timestamp to strings for transmission. Note that the timestamp column has a different behavior compared to thrift when it's false, but it brings better transfer performance. | boolean | 1.7.0 |
 | kyuubi.operation.result.format | thrift | Specify the result format, available configs are: • THRIFT: the result will convert to TRow at the engine driver side. • ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now. | string | 1.7.0 |
 | kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit. | int | 1.6.0 |
 | kyuubi.operation.scheduler.pool | <undefined> | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 |
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 4edae989873..0c178bb4777 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -24,7 +24,7 @@ import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressU
 import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener}
 import org.apache.spark.kyuubi.SparkUtilsHelper.redact
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.types.StructType

 import org.apache.kyuubi.{KyuubiSQLException, Utils}
@@ -136,7 +136,7 @@ abstract class SparkOperation(session: Session)
     spark.sparkContext.setLocalProperty

   protected def withLocalProperties[T](f: => T): T = {
-    SQLConf.withExistingConf(spark.sessionState.conf) {
+    SQLExecution.withSQLConfPropagated(spark) {
       val originalSession = SparkSession.getActiveSession
       try {
         SparkSession.setActiveSession(spark)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
index 204dcc98c66..ec39cc178c8 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
 import org.apache.hive.service.rpc.thrift._
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.HiveResult
-import org.apache.spark.sql.execution.HiveResult.TimeFormatters
 import org.apache.spark.sql.types._

 import org.apache.kyuubi.util.RowSetUtils._
@@ -34,11 +33,9 @@ object RowSet {

   def toHiveString(
       valueAndType: (Any, DataType),
-      nested: Boolean = false,
-      timeFormatters: TimeFormatters = HiveResult.getTimeFormatters): String = {
+      nested: Boolean = false): String = {
     // compatible w/ Spark 3.1 and above
-//    val timeFormatters = HiveResult.getTimeFormatters
-    HiveResult.toHiveString(valueAndType, nested, timeFormatters)
+    HiveResult.toHiveString(valueAndType, nested, HiveResult.getTimeFormatters)
   }

   def toTRowSet(
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 19373beea2d..1a542937338 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -17,12 +17,8 @@

 package org.apache.spark.sql.kyuubi

-import java.time.ZoneId
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
-import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
-import org.apache.spark.sql.execution.HiveResult.TimeFormatters
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._

@@ -36,7 +32,6 @@ object SparkDatasetHelper {
   def convertTopLevelComplexTypeToHiveString(
       df: DataFrame,
       timestampAsString: Boolean): DataFrame = {
-    val timeZone = ZoneId.of(df.sparkSession.sessionState.conf.sessionLocalTimeZone)

     val quotedCol = (name: String) => col(quoteIfNeeded(name))

@@ -51,7 +46,7 @@ object SparkDatasetHelper {
       case StructType(Array(StructField(_, mt: MapType, _, _))) =>
         RowSet.toHiveString((row.toSeq.head, mt), nested = true)
       case StructType(Array(StructField(_, tt: TimestampType, _, _))) =>
-        RowSet.toHiveString((row.toSeq.head, tt), nested = true, getTimeFormatters(timeZone))
+        RowSet.toHiveString((row.toSeq.head, tt), nested = true)
       case _ =>
         throw new UnsupportedOperationException
@@ -80,10 +75,4 @@ object SparkDatasetHelper {
       s"`${part.replace("`", "``")}`"
     }
   }
-
-  private def getTimeFormatters(timeZone: ZoneId): TimeFormatters = {
-    val dateFormatter = DateFormatter()
-    val timestampFormatter = TimestampFormatter.getFractionFormatter(timeZone)
-    TimeFormatters(dateFormatter, timestampFormatter)
-  }
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 58cdcf7a495..3693e60ffd6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1679,7 +1679,8 @@ object KyuubiConf {
   val ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING: ConfigEntry[Boolean] =
     buildConf("kyuubi.operation.result.arrow.timestampAsString")
       .doc("When true, arrow-based rowsets will convert columns of type timestamp to strings for" +
-        " transmission.")
+        " transmission. Note that the timestamp column has a different behavior compared to" +
+        " thrift when it's false, but it brings better transfer performance.")
       .version("1.7.0")
       .booleanConf
       .createWithDefault(true)

From f5601356e06c8cc8d980072313a86d6a9a1ef16c Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Thu, 16 Feb 2023 16:12:18 +0800
Subject: [PATCH 04/10] debug info

---
 .../spark/operation/SparkArrowbasedOperationSuite.scala | 7 +++++++
 .../kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java   | 9 +++++----
 .../org/apache/kyuubi/jdbc/hive/arrow/ArrowUtils.java   | 1 +
 3 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
index 5d8758f2a49..5324ceb6997 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
@@ -35,6 +35,13 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp

   override def resultFormat: String = "arrow"

+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    withJdbcStatement() { statement =>
+      checkResultSetFormat(statement, "arrow")
+    }
+  }
+
   test("detect resultSet format") {
     withJdbcStatement() { statement =>
       checkResultSetFormat(statement, "arrow")
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
index 4caadcd29d6..cd8a0aec056 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
@@ -135,8 +135,9 @@ public Object get(int ordinal, TTypeId dataType, boolean timestampAsString) {
         return Timestamp.valueOf(getString(ordinal));
       } else {
         microseconds = getLong(ordinal);
-        nanos = (int) (microseconds % 1000000) * 1000;
-        Timestamp timestamp = new Timestamp(microseconds / 1000);
+        System.out.println("microseconds: " + microseconds);
+        nanos = (int) (microseconds % 1_000_000) * 1000;
+        Timestamp timestamp = new Timestamp(microseconds / 1_000);
         timestamp.setNanos(nanos);
         return timestamp;
       }
@@ -144,8 +145,8 @@ public Object get(int ordinal, TTypeId dataType, boolean timestampAsString) {
       return DateUtils.internalToDate(getInt(ordinal));
     case INTERVAL_DAY_TIME_TYPE:
       microseconds = getLong(ordinal);
-      seconds = microseconds / 1000000;
-      nanos = (int) (microseconds % 1000000) * 1000;
+      seconds = microseconds / 1_000_000;
+      nanos = (int) (microseconds % 1_000_000) * 1_000;
       return new HiveIntervalDayTime(seconds, nanos);
     case INTERVAL_YEAR_MONTH_TYPE:
       return new HiveIntervalYearMonth(getInt(ordinal));
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowUtils.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowUtils.java
index 9a777d4c240..26189dc242b 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowUtils.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowUtils.java
@@ -86,6 +86,7 @@ public static ArrowType toArrowType(TTypeId ttype, JdbcColumnAttributes jdbcColu
       return new ArrowType.Date(DateUnit.DAY);
     case TIMESTAMP_TYPE:
       if (jdbcColumnAttributes != null) {
+        System.out.println("session timeZone: " + jdbcColumnAttributes.timeZone);
         return new ArrowType.Timestamp(TimeUnit.MICROSECOND, jdbcColumnAttributes.timeZone);
       } else {
         throw new IllegalStateException("Missing timezoneId where it is mandatory.");

From 78b7cabadef63712804206333c8b95e6bb0ff99e Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Thu, 16 Feb 2023 16:48:27 +0800
Subject: [PATCH 05/10] fix

---
 .../SparkArrowbasedOperationSuite.scala       |  2 +-
 .../jdbc/hive/KyuubiArrowBasedResultSet.java  |  8 ++++++-
 .../hive/arrow/ArrowColumnarBatchRow.java     | 21 ++++++++++++-------
 3 files changed, 22 insertions(+), 9 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
index 5324ceb6997..277a6bd1698 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
@@ -85,7 +85,7 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
       statement.executeQuery(s"set ${KyuubiConf.ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING.key}=false")
       checkArrowBasedRowSetTimestampAsString(statement, "false")
       setTimeZone("UTC")
-      check("2022-12-08 01:15:35.0")
+      check("2022-12-07 17:15:35.0")
       setTimeZone("GMT+8")
       check("2022-12-08 01:15:35.0")
     }
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java
index 43b1a11ae81..1336bf84074 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java
@@ -312,8 +312,14 @@ private Object getColumnValue(int columnIndex) throws SQLException {
       wasNull = row.isNullAt(columnIndex - 1);
       if (wasNull) {
         return null;
+      } else if (columnType == TTypeId.TIMESTAMP_TYPE) {
+        return row.get(
+            columnIndex - 1,
+            columnType,
+            columnAttributes.get(columnIndex - 1).timeZone,
+            timestampAsString);
       } else {
-        return row.get(columnIndex - 1, columnType, timestampAsString);
+        return row.get(columnIndex - 1, columnType, null, timestampAsString);
       }
     } catch (Exception e) {
       e.printStackTrace();
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
index cd8a0aec056..3033c87403a 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
@@ -19,6 +19,11 @@

 import java.math.BigDecimal;
 import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+import org.apache.arrow.vector.util.DateUtility;
 import org.apache.hive.service.rpc.thrift.TTypeId;
 import org.apache.kyuubi.jdbc.hive.common.DateUtils;
 import org.apache.kyuubi.jdbc.hive.common.HiveIntervalDayTime;
@@ -104,7 +109,7 @@ public Object getMap(int ordinal) {
     throw new UnsupportedOperationException();
   }

-  public Object get(int ordinal, TTypeId dataType, boolean timestampAsString) {
+  public Object get(int ordinal, TTypeId dataType, String timeZone, boolean timestampAsString) {
     long seconds;
     long milliseconds;
     long microseconds;
@@ -134,12 +139,14 @@ public Object get(int ordinal, TTypeId dataType, boolean timestampAsString) {
       if (timestampAsString) {
         return Timestamp.valueOf(getString(ordinal));
       } else {
-        microseconds = getLong(ordinal);
-        System.out.println("microseconds: " + microseconds);
-        nanos = (int) (microseconds % 1_000_000) * 1000;
-        Timestamp timestamp = new Timestamp(microseconds / 1_000);
-        timestamp.setNanos(nanos);
-        return timestamp;
+        LocalDateTime localDateTime =
+            DateUtility.getLocalDateTimeFromEpochMicro(getLong(ordinal), timeZone);
+        long millis = TimeUnit.MICROSECONDS.toMillis(getLong(ordinal));
+        TimeZone zone = TimeZone.getTimeZone(timeZone);
+        localDateTime =
+            localDateTime.minus(
+                zone.getOffset(millis) - zone.getOffset(millis), ChronoUnit.MILLIS);
+        return Timestamp.valueOf(localDateTime);
       }
     case DATE_TYPE:
       return DateUtils.internalToDate(getInt(ordinal));

From 289b60074fce9c3b87d5d2e9928fefa431f175da Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Fri, 17 Feb 2023 10:07:57 +0800
Subject: [PATCH 06/10] timestampAsString = false by default

---
 docs/deployment/settings.md                             |  2 +-
 .../spark/operation/SparkOperation.scala                |  2 +-
 .../SparkArrowbasedOperationSuite.scala                 | 24 +++++++------------
 .../org/apache/kyuubi/config/KyuubiConf.scala           |  5 ++--
 .../kyuubi/jdbc/hive/KyuubiStatement.java               |  2 +-
 .../kyuubi/jdbc/hive/arrow/ArrowUtils.java              |  1 -
 6 files changed, 14 insertions(+), 22 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 9f173b3b039..75fc3d91b14 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -440,7 +440,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.operation.plan.only.output.style | plain | Configures the planOnly output style. The value can be 'plain' or 'json', and the default value is 'plain'. This configuration supports only the output styles of the Spark engine | string | 1.7.0 |
 | kyuubi.operation.progress.enabled | false | Whether to enable the operation progress. When true, the operation progress will be returned in `GetOperationStatus`. | boolean | 1.6.0 |
 | kyuubi.operation.query.timeout | <undefined> | Timeout for query executions at server-side, take effect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take full control of whether the query should timeout or not. If set, client-side timeout is capped at this point. To cancel the queries right away without waiting for task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together. | duration | 1.2.0 |
-| kyuubi.operation.result.arrow.timestampAsString | true | When true, arrow-based rowsets will convert columns of type timestamp to strings for transmission. Note that the timestamp column has a different behavior compared to thrift when it's false, but it brings better transfer performance. | boolean | 1.7.0 |
+| kyuubi.operation.result.arrow.timestampAsString | false | When true, arrow-based rowsets will convert columns of type timestamp to strings for transmission. | boolean | 1.7.0 |
 | kyuubi.operation.result.format | thrift | Specify the result format, available configs are: • THRIFT: the result will convert to TRow at the engine driver side. • ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now. | string | 1.7.0 |
 | kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit. | int | 1.6.0 |
 | kyuubi.operation.scheduler.pool | <undefined> | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 |
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 0c178bb4777..a6a7fc896af 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -280,7 +280,7 @@ abstract class SparkOperation(session: Session)
   }

   protected def timestampAsString: Boolean = {
-    spark.conf.get("kyuubi.operation.result.arrow.timestampAsString", "true").toBoolean
+    spark.conf.get("kyuubi.operation.result.arrow.timestampAsString", "false").toBoolean
   }

   protected def setSessionUserSign(): Unit = {
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
index 277a6bd1698..60cc528912d 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
@@ -73,21 +73,15 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
         assert(rs.next())
       }

-      // timestampAsString = true
-      statement.executeQuery(s"set ${KyuubiConf.ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING.key}=true")
-      checkArrowBasedRowSetTimestampAsString(statement, "true")
-      setTimeZone("UTC")
-      check("2022-12-07 17:15:35.0")
-      setTimeZone("GMT+8")
-      check("2022-12-08 01:15:35.0")
-
-      // timestampAsString = false
-      statement.executeQuery(s"set ${KyuubiConf.ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING.key}=false")
-      checkArrowBasedRowSetTimestampAsString(statement, "false")
-      setTimeZone("UTC")
-      check("2022-12-07 17:15:35.0")
-      setTimeZone("GMT+8")
-      check("2022-12-08 01:15:35.0")
+      Seq("true", "false").foreach { timestampAsString =>
+        statement.executeQuery(
+          s"set ${KyuubiConf.ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING.key}=$timestampAsString")
+        checkArrowBasedRowSetTimestampAsString(statement, timestampAsString)
+        setTimeZone("UTC")
+        check("2022-12-07 17:15:35.0")
+        setTimeZone("GMT+8")
+        check("2022-12-08 01:15:35.0")
+      }
     }
   }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 3693e60ffd6..0c6a30c24e2 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1679,11 +1679,10 @@ object KyuubiConf {
   val ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING: ConfigEntry[Boolean] =
     buildConf("kyuubi.operation.result.arrow.timestampAsString")
       .doc("When true, arrow-based rowsets will convert columns of type timestamp to strings for" +
-        " transmission. Note that the timestamp column has a different behavior compared to" +
-        " thrift when it's false, but it brings better transfer performance.")
+        " transmission.")
       .version("1.7.0")
       .booleanConf
-      .createWithDefault(true)
+      .createWithDefault(false)
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
index 6d3bf1fa09b..b452ca6aa36 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
@@ -37,7 +37,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
   public static final Logger LOG = LoggerFactory.getLogger(KyuubiStatement.class.getName());
   public static final int DEFAULT_FETCH_SIZE = 1000;
   public static final String DEFAULT_RESULT_FORMAT = "thrift";
-  public static final String DEFAULT_ARROW_TIMESTAMP_AS_STRING = "true";
+  public static final String DEFAULT_ARROW_TIMESTAMP_AS_STRING = "false";
   private final KyuubiConnection connection;
   private TCLIService.Iface client;
   private TOperationHandle stmtHandle = null;
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowUtils.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowUtils.java
index 26189dc242b..9a777d4c240 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowUtils.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowUtils.java
@@ -86,7 +86,6 @@ public static ArrowType toArrowType(TTypeId ttype, JdbcColumnAttributes jdbcColu
       return new ArrowType.Date(DateUnit.DAY);
     case TIMESTAMP_TYPE:
       if (jdbcColumnAttributes != null) {
-        System.out.println("session timeZone: " + jdbcColumnAttributes.timeZone);
         return new ArrowType.Timestamp(TimeUnit.MICROSECOND, jdbcColumnAttributes.timeZone);
       } else {
         throw new IllegalStateException("Missing timezoneId where it is mandatory.");

From 6c4eb507227c56e478abd9255c7644fb813f8d15 Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Fri, 17 Feb 2023 10:27:05 +0800
Subject: [PATCH 07/10] minor

---
 .../kyuubi/jdbc/hive/JdbcColumnAttributes.java      |  2 +-
 .../kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java | 13 +++++++------
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumnAttributes.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumnAttributes.java
index 06fb398999a..b0257cfff09 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumnAttributes.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumnAttributes.java
@@ -20,7 +20,7 @@
 public class JdbcColumnAttributes {
   public int precision = 0;
   public int scale = 0;
-  public String timeZone = "";
+  public String timeZone = null;

   public JdbcColumnAttributes() {}

diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java
index 1336bf84074..ef5008503aa 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java
@@ -312,18 +312,19 @@ private Object getColumnValue(int columnIndex) throws SQLException {
       wasNull = row.isNullAt(columnIndex - 1);
       if (wasNull) {
         return null;
-      } else if (columnType == TTypeId.TIMESTAMP_TYPE) {
+      } else {
+        JdbcColumnAttributes attributes = columnAttributes.get(columnIndex - 1);
         return row.get(
             columnIndex - 1,
             columnType,
-            columnAttributes.get(columnIndex - 1).timeZone,
+            attributes == null ? null : attributes.timeZone,
             timestampAsString);
-      } else {
-        return row.get(columnIndex - 1, columnType, null, timestampAsString);
       }
     } catch (Exception e) {
-      e.printStackTrace();
-      throw new KyuubiSQLException("Unrecognized column type:", e);
+      throw new KyuubiSQLException(
+          String.format(
+              "Error getting row of type %s at column index %d", columnType, columnIndex - 1),
+          e);
     }
   }

From b714b3eedade21624d75a137068b325826dc4b27 Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Fri, 17 Feb 2023 10:31:58 +0800
Subject: [PATCH 08/10] revert
 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala

---
 .../org/apache/kyuubi/engine/spark/schema/RowSet.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
index ec39cc178c8..7be70403d5d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
@@ -31,11 +31,10 @@ import org.apache.kyuubi.util.RowSetUtils._

 object RowSet {

-  def toHiveString(
-      valueAndType: (Any, DataType),
-      nested: Boolean = false): String = {
+  def toHiveString(valueAndType: (Any, DataType), nested: Boolean = false): String = {
     // compatible w/ Spark 3.1 and above
-    HiveResult.toHiveString(valueAndType, nested, HiveResult.getTimeFormatters)
+    val timeFormatters = HiveResult.getTimeFormatters
+    HiveResult.toHiveString(valueAndType, nested, timeFormatters)
   }

   def toTRowSet(

From d864db000053a5b086be6a2be8e8714f6d9e6156 Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Fri, 17 Feb 2023 18:46:03 +0800
Subject: [PATCH 09/10] address comment

---
 .../apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
index 3033c87403a..b0321c7556c 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
@@ -141,11 +141,6 @@ public Object get(int ordinal, TTypeId dataType, String timeZone, boolean timest
       } else {
         LocalDateTime localDateTime =
             DateUtility.getLocalDateTimeFromEpochMicro(getLong(ordinal), timeZone);
-        long millis = TimeUnit.MICROSECONDS.toMillis(getLong(ordinal));
-        TimeZone zone = TimeZone.getTimeZone(timeZone);
-        localDateTime =
-            localDateTime.minus(
-                zone.getOffset(millis) - zone.getOffset(millis), ChronoUnit.MILLIS);
         return Timestamp.valueOf(localDateTime);
       }
     case DATE_TYPE:

From 38c7fc9b7d62b911cedf1ff5836a3c6466c5ffb1 Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Fri, 17 Feb 2023 19:00:55 +0800
Subject: [PATCH 10/10] fix style

---
 .../apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
index b0321c7556c..373867069b4 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
@@ -20,9 +20,6 @@
 import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.time.LocalDateTime;
-import java.time.temporal.ChronoUnit;
-import java.util.TimeZone;
-import java.util.concurrent.TimeUnit;
 import org.apache.arrow.vector.util.DateUtility;
 import org.apache.hive.service.rpc.thrift.TTypeId;
 import org.apache.kyuubi.jdbc.hive.common.DateUtils;
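
A minimal client-side sketch of exercising the switch this series introduces; it is not part of the patch set. The endpoint `jdbc:hive2://localhost:10009/default` and the anonymous credentials are illustrative assumptions, and it presumes the `kyuubi-hive-jdbc` driver from this repository is on the classpath; only the two `set` keys and the `arrow` result format come from the patches themselves.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ArrowTimestampAsStringSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical endpoint; adjust host/port/credentials for a real deployment.
    String url = "jdbc:hive2://localhost:10009/default";
    try (Connection conn = DriverManager.getConnection(url, "anonymous", "");
        Statement stmt = conn.createStatement()) {
      // Ask the engine for arrow-encoded rowsets. The engine echoes the effective
      // values back as __kyuubi_operation_result_*__ hints, which KyuubiStatement
      // reads before building the arrow result set.
      stmt.executeQuery("set kyuubi.operation.result.format=arrow");
      // The switch added by this series. true: timestamps are rendered to Hive
      // strings on the engine side; false (the default after PATCH 06): they
      // travel as arrow TIMESTAMP vectors and the driver rebuilds
      // java.sql.Timestamp from the column's session time zone.
      stmt.executeQuery("set kyuubi.operation.result.arrow.timestampAsString=false");
      try (ResultSet rs = stmt.executeQuery("SELECT current_timestamp() AS ts")) {
        while (rs.next()) {
          System.out.println("ts = " + rs.getString("ts"));
        }
      }
    }
  }
}
```

Toggling the key per statement like this mirrors what `SparkArrowbasedOperationSuite` does after PATCH 06 (`Seq("true", "false").foreach { ... }`), varying `spark.sql.session.timeZone` between `UTC` and `GMT+8` to pin down the expected rendering in both modes.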