Skip to content

Commit

Permalink
[KYUUBI #3915] Client support detecting ResultSet codec
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_

to close #3915

This pr adds support for jdbc client detecting result set codec

1. in this PR, hints are added in the `TStatus.getInfoMessages()` to return, and the hints were added when the client retrieves the result set schema from the server
2. the hints mechanism is a general extension when we need to change the client behavior, e.g. adding support for  result set compression

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3916 from cfmcgrady/arrow-detect-codec.

Closes #3915

90495c3 [Fu Chen] style
bbeada0 [Fu Chen] address comment
825bc0d [Fu Chen] minor refactor
d0a01ff [Fu Chen] address comment
08d21a1 [Fu Chen] fix ut
690126c [Fu Chen] add hint ut
fd32a31 [Fu Chen] style
a1c2bb6 [Fu Chen] simplify KyuubiConnection
f81336d [Fu Chen] refactor
500e766 [Fu Chen] unused import
221bc92 [Fu Chen] fix ut
cf564d0 [Fu Chen] refactor
4b895e4 [Fu Chen] fix compile
3efcc33 [Fu Chen] clean up
95ea29c [Fu Chen] Client support detecting ResultSet codec

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
  • Loading branch information
cfmcgrady authored and pan3793 committed Dec 13, 2022
1 parent b2831d7 commit 827ae40
Show file tree
Hide file tree
Showing 27 changed files with 242 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.JavaConverters.collectionAsScalaIterableConverter

import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.client.gateway.context.SessionContext
import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet, TTableSchema}

import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.engine.flink.result.ResultSet
Expand Down Expand Up @@ -74,12 +74,15 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
}
}

override def getResultSetSchema: TTableSchema = {
override def getResultSetMetadata: TGetResultSetMetadataResp = {
val tTableSchema = new TTableSchema()
resultSet.getColumns.asScala.zipWithIndex.foreach { case (f, i) =>
tTableSchema.addToColumns(RowSet.toTColumnDesc(f, i))
}
tTableSchema
val resp = new TGetResultSetMetadataResp
resp.setSchema(tTableSchema)
resp.setStatus(OK_STATUS)
resp
}

override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.concurrent.Future

import org.apache.hive.service.cli.operation.{Operation, OperationManager}
import org.apache.hive.service.cli.session.{HiveSession, SessionManager => HiveSessionManager}
import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.hive.session.HiveSessionImpl
Expand Down Expand Up @@ -84,8 +84,12 @@ abstract class HiveOperation(session: Session) extends AbstractOperation(session
Option(status.getOperationException).map(KyuubiSQLException(_)))
}

override def getResultSetSchema: TTableSchema = {
internalHiveOperation.getResultSetSchema.toTTableSchema
override def getResultSetMetadata: TGetResultSetMetadataResp = {
val schema = internalHiveOperation.getResultSetSchema.toTTableSchema
val resp = new TGetResultSetMetadataResp
resp.setSchema(schema)
resp.setStatus(OK_STATUS)
resp
}

override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.kyuubi.engine.jdbc.operation

import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}

import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
Expand Down Expand Up @@ -100,9 +100,13 @@ abstract class JdbcOperation(session: Session) extends AbstractOperation(session
getProtocolVersion)
}

override def getResultSetSchema: TTableSchema = {
override def getResultSetMetadata: TGetResultSetMetadataResp = {
val schemaHelper = dialect.getSchemaHelper()
schemaHelper.toTTTableSchema(schema.columns)
val tTableSchema = schemaHelper.toTTTableSchema(schema.columns)
val resp = new TGetResultSetMetadataResp
resp.setSchema(tTableSchema)
resp.setStatus(OK_STATUS)
resp
}

override def shouldRunAsync: Boolean = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,7 @@ class ExecuteStatement(
df
}
}

override def getResultSetMetadataHints(): Seq[String] =
Seq(s"__kyuubi_operation_result_codec__=$resultCodec")
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark.operation
import java.io.IOException
import java.time.ZoneId

import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
import org.apache.spark.kyuubi.SparkUtilsHelper.redact
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.StructType
Expand All @@ -37,7 +37,7 @@ import org.apache.kyuubi.operation.{AbstractOperation, FetchIterator, OperationS
import org.apache.kyuubi.operation.FetchOrientation._
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.{AbstractSession, Session}
import org.apache.kyuubi.session.Session

abstract class SparkOperation(session: Session)
extends AbstractOperation(session) {
Expand Down Expand Up @@ -160,8 +160,15 @@ abstract class SparkOperation(session: Session)
}
}

override def getResultSetSchema: TTableSchema =
SchemaHelper.toTTableSchema(resultSchema, timeZone.toString)
def getResultSetMetadataHints(): Seq[String] = Seq.empty

override def getResultSetMetadata: TGetResultSetMetadataResp = {
val resp = new TGetResultSetMetadataResp
val schema = SchemaHelper.toTTableSchema(resultSchema, timeZone.toString)
resp.setSchema(schema)
resp.setStatus(okStatusWithHints(getResultSetMetadataHints()))
resp
}

override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet =
withLocalProperties {
Expand Down Expand Up @@ -200,14 +207,16 @@ abstract class SparkOperation(session: Session)
override def shouldRunAsync: Boolean = false

protected def arrowEnabled(): Boolean = {
// normalized config is required, to pass unit test
session.asInstanceOf[AbstractSession].normalizedConf
.getOrElse("kyuubi.operation.result.codec", "simple")
.equalsIgnoreCase("arrow") &&
resultCodec().equalsIgnoreCase("arrow") &&
// TODO: (fchen) make all operation support arrow
getClass.getCanonicalName == classOf[ExecuteStatement].getCanonicalName
}

protected def resultCodec(): String = {
// TODO: respect the config of the operation ExecuteStatement, if it was set.
spark.conf.get("kyuubi.operation.result.codec", "simple")
}

protected def setSessionUserSign(): Unit = {
(
session.conf.get(KYUUBI_SESSION_SIGN_PUBLICKEY),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.kyuubi.engine.spark.operation

import java.sql.Statement

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
import org.apache.kyuubi.operation.SparkDataTypeTests
Expand All @@ -33,15 +35,21 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp

override def resultCodec: String = "arrow"

test("make sure kyuubi.operation.result.codec=arrow") {
test("detect resultSet codec") {
withJdbcStatement() { statement =>
val query =
s"""
|SELECT '$${hivevar:${KyuubiConf.OPERATION_RESULT_CODEC.key}}' AS col
|""".stripMargin
val resultSet = statement.executeQuery(query)
assert(resultSet.next())
assert(resultSet.getString("col") === "arrow")
checkResultSetCodec(statement, "arrow")
statement.executeQuery(s"set ${KyuubiConf.OPERATION_RESULT_CODEC.key}=simple")
checkResultSetCodec(statement, "simple")
}
}

def checkResultSetCodec(statement: Statement, expectCodec: String): Unit = {
val query =
s"""
|SELECT '$${hivevar:${KyuubiConf.OPERATION_RESULT_CODEC.key}}' AS col
|""".stripMargin
val resultSet = statement.executeQuery(query)
assert(resultSet.next())
assert(resultSet.getString("col") === expectCodec)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import java.io.IOException

import io.trino.client.Column
import io.trino.client.StatementClient
import org.apache.hive.service.rpc.thrift.TRowSet
import org.apache.hive.service.rpc.thrift.TTableSchema
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.Utils
Expand All @@ -47,7 +46,13 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio

protected var iter: FetchIterator[List[Any]] = _

override def getResultSetSchema: TTableSchema = SchemaHelper.toTTableSchema(schema)
override def getResultSetMetadata: TGetResultSetMetadataResp = {
val tTableSchema = SchemaHelper.toTTableSchema(schema)
val resp = new TGetResultSetMetadataResp
resp.setSchema(tTableSchema)
resp.setStatus(OK_STATUS)
resp
}

override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
validateDefaultFetchOrientation(order)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package org.apache.kyuubi.operation

import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}

import scala.collection.JavaConverters._

import org.apache.commons.lang3.StringUtils
import org.apache.hive.service.rpc.thrift.{TProgressUpdateResp, TProtocolVersion, TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TRowSet, TStatus, TStatusCode}

import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf.OPERATION_IDLE_TIMEOUT
Expand Down Expand Up @@ -173,7 +175,7 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin

protected def getProtocolVersion: TProtocolVersion = session.protocol

override def getResultSetSchema: TTableSchema
override def getResultSetMetadata: TGetResultSetMetadataResp

override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet

Expand Down Expand Up @@ -227,4 +229,12 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
lastAccessTime + operationTimeout <= System.currentTimeMillis()
}
}

final val OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS)

def okStatusWithHints(hints: Seq[String]): TStatus = {
val ok = new TStatus(TStatusCode.SUCCESS_STATUS)
ok.setInfoMessages(hints.asJava)
ok
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.kyuubi.operation

import java.util.concurrent.Future

import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}

import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.log.OperationLog
Expand All @@ -31,7 +31,7 @@ trait Operation {
def cancel(): Unit
def close(): Unit

def getResultSetSchema: TTableSchema
def getResultSetMetadata: TGetResultSetMetadataResp
def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet

def getSession: Session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
operation.close()
}

final def getOperationResultSetSchema(opHandle: OperationHandle): TTableSchema = {
getOperation(opHandle).getResultSetSchema
final def getOperationResultSetSchema(opHandle: OperationHandle): TGetResultSetMetadataResp = {
getOperation(opHandle).getResultSetMetadata
}

final def getOperationNextRowSet(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.concurrent.{ExecutionException, TimeoutException, TimeUnit}

import scala.concurrent.CancellationException

import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion, TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TGetResultSetMetadataResp, TProtocolVersion, TRowSet}

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.{OperationHandle, OperationStatus}
Expand Down Expand Up @@ -188,7 +188,7 @@ abstract class AbstractBackendService(name: String)
.getOperation(operationHandle).getSession.closeOperation(operationHandle)
}

override def getResultSetMetadata(operationHandle: OperationHandle): TTableSchema = {
override def getResultSetMetadata(operationHandle: OperationHandle): TGetResultSetMetadataResp = {
sessionManager.operationManager
.getOperation(operationHandle).getSession.getResultSetMetadata(operationHandle)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ trait BackendService {
def getOperationStatus(operationHandle: OperationHandle): OperationStatus
def cancelOperation(operationHandle: OperationHandle): Unit
def closeOperation(operationHandle: OperationHandle): Unit
def getResultSetMetadata(operationHandle: OperationHandle): TTableSchema
def getResultSetMetadata(operationHandle: OperationHandle): TGetResultSetMetadataResp
def fetchResults(
operationHandle: OperationHandle,
orientation: FetchOrientation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,17 +507,15 @@ abstract class TFrontendService(name: String)

override def GetResultSetMetadata(req: TGetResultSetMetadataReq): TGetResultSetMetadataResp = {
debug(req.toString)
val resp = new TGetResultSetMetadataResp
try {
val schema = be.getResultSetMetadata(OperationHandle(req.getOperationHandle))
resp.setSchema(schema)
resp.setStatus(OK_STATUS)
be.getResultSetMetadata(OperationHandle(req.getOperationHandle))
} catch {
case e: Exception =>
error("Error getting result set metadata: ", e)
val resp = new TGetResultSetMetadataResp
resp.setStatus(KyuubiSQLException.toTStatus(e))
resp
}
resp
}

override def FetchResults(req: TFetchResultsReq): TFetchResultsResp = {
Expand Down Expand Up @@ -647,4 +645,10 @@ private[kyuubi] object TFrontendService {

def getSessionHandle: SessionHandle = sessionHandle
}

def okStatusWithHint(hint: Seq[String]): TStatus = {
val ok = new TStatus(TStatusCode.SUCCESS_STATUS)
ok.setInfoMessages(hint.asJava)
ok
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ abstract class AbstractSession(
}

override def getResultSetMetadata(
operationHandle: OperationHandle): TTableSchema = withAcquireRelease() {
operationHandle: OperationHandle): TGetResultSetMetadataResp = withAcquireRelease() {
sessionManager.operationManager.getOperationResultSetSchema(operationHandle)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.session

import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion, TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TGetResultSetMetadataResp, TProtocolVersion, TRowSet}

import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationHandle
Expand Down Expand Up @@ -86,7 +86,7 @@ trait Session {

def cancelOperation(operationHandle: OperationHandle): Unit
def closeOperation(operationHandle: OperationHandle): Unit
def getResultSetMetadata(operationHandle: OperationHandle): TTableSchema
def getResultSetMetadata(operationHandle: OperationHandle): TGetResultSetMetadataResp
def fetchResults(
operationHandle: OperationHandle,
orientation: FetchOrientation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.nio.ByteBuffer

import scala.collection.JavaConverters._

import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TPrimitiveTypeEntry, TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TGetResultSetMetadataResp, TPrimitiveTypeEntry, TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId}

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
Expand Down Expand Up @@ -60,7 +60,7 @@ class NoopOperation(session: Session, shouldFail: Boolean = false)
setState(OperationState.CLOSED)
}

override def getResultSetSchema: TTableSchema = {
override def getResultSetMetadata: TGetResultSetMetadataResp = {
val tColumnDesc = new TColumnDesc()
tColumnDesc.setColumnName("noop")
val desc = new TTypeDesc
Expand All @@ -70,7 +70,10 @@ class NoopOperation(session: Session, shouldFail: Boolean = false)
tColumnDesc.setPosition(0)
val schema = new TTableSchema()
schema.addToColumns(tColumnDesc)
schema
val resp = new TGetResultSetMetadataResp
resp.setSchema(schema)
resp.setStatus(OK_STATUS)
resp
}

override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
Expand Down

0 comments on commit 827ae40

Please sign in to comment.