Skip to content

Commit

Permalink
fix #153 hive serde orc file reader has a thread pool for parallel sp…
Browse files Browse the repository at this point in the history
…lits gathering which causes token issues in kerberized clusters (#155)

* fix #153 hive serde orc file reader has a threadpool for parallel splits gathering which causes token issues in kerberized clusters

* ut

*  fix ut
  • Loading branch information
yaooqinn committed Feb 28, 2019
1 parent 51e7a5d commit 185d60a
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,15 @@ object KyuubiSparkUtil extends Logging {
val MULTIPLE_CONTEXTS: String = SPARK_PREFIX + DRIVER_PREFIX + "allowMultipleContexts"
val MULTIPLE_CONTEXTS_DEFAULT = "true"

// Spark SQL
val CATALOG_IMPL: String = SPARK_PREFIX + SQL_PREFIX + "catalogImplementation"
val CATALOG_IMPL_DEFAULT = "hive"
val CONVERT_METASTORE_ORC: String =
SPARK_PREFIX + SQL_PREFIX + HIVE_PREFIX + "convertMetastoreOrc"
val ORC_IMPL: String = SPARK_PREFIX + SQL_PREFIX + "orc.impl"
val ORC_IMPL_DEFAULT = "native"
val ORC_VECTORIZED_READER_ENABLED: String =
SPARK_PREFIX + SQL_PREFIX + "orc.enableVectorizedReader"

val DEPLOY_MODE: String = SPARK_PREFIX + "submit.deployMode"
val DEPLOY_MODE_DEFAULT = "client"
Expand Down Expand Up @@ -289,6 +296,20 @@ object KyuubiSparkUtil extends Logging {

conf.setIfMissing(SPARK_LOCAL_DIR, conf.get(KyuubiConf.BACKEND_SESSION_LOCAL_DIR.key))
conf.setIfMissing(GC_INTERVAL, GC_INTERVAL_DEFAULT)
if (UserGroupInformation.isSecurityEnabled) {
// By default Spark SQL does not use its internal ORC implementation to read/write Hive ORC
// tables; it uses the Hive SerDe instead, which causes token expiration issues while getting
// file splits. So we force CONVERT_METASTORE_ORC to true when Kyuubi is used with kerberized
// Hadoop clusters.
// see https://github.com/yaooqinn/kyuubi/issues/153
conf.set(CONVERT_METASTORE_ORC, "true")
} else {
// If the cluster is not secured, we first respect the user's choice if it is explicitly set,
// and otherwise take true as the default value for better performance.
conf.setIfMissing(CONVERT_METASTORE_ORC, "true")
}
conf.setIfMissing(ORC_IMPL, ORC_IMPL_DEFAULT)
conf.setIfMissing(ORC_VECTORIZED_READER_ENABLED, "true")

if (UserGroupInformation.isSecurityEnabled) {
conf.setIfMissing(HDFS_CLIENT_CACHE, HDFS_CLIENT_CACHE_DEFAULT)
Expand Down
27 changes: 6 additions & 21 deletions kyuubi-server/src/main/scala/yaooqinn/kyuubi/cli/Handle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package yaooqinn.kyuubi.cli

import java.util.Objects

import org.apache.hive.service.cli.thrift.THandleIdentifier

abstract class Handle(val handleId: HandleIdentifier) {
Expand All @@ -25,29 +27,12 @@ abstract class Handle(val handleId: HandleIdentifier) {

def getHandleIdentifier: HandleIdentifier = handleId

override def hashCode: Int = {
val prime = 31
var result = 1
result = prime * result + (if (handleId == null) 0 else handleId.hashCode)
result
}
override def hashCode: Int = 31 * 1 + Objects.hashCode(handleId)

override def equals(obj: Any): Boolean = {
if (obj == null) return false

if (!obj.isInstanceOf[Handle]) return false

val other = obj.asInstanceOf[Handle]
if (this eq other) return true

if (handleId == null) {
if (other.handleId != null) {
return false
}
} else if (handleId != other.handleId) {
return false
obj match {
case o: Handle => Objects.equals(handleId, o.handleId)
case _ => false
}

true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package yaooqinn.kyuubi.cli

import java.nio.ByteBuffer
import java.util.UUID
import java.util.{Objects, UUID}

import org.apache.hive.service.cli.thrift.THandleIdentifier

class HandleIdentifier(val publicId: UUID, val secretId: UUID) {
case class HandleIdentifier(publicId: UUID, secretId: UUID) {

def this() = this(UUID.randomUUID(), UUID.randomUUID())

Expand Down Expand Up @@ -56,28 +56,11 @@ class HandleIdentifier(val publicId: UUID, val secretId: UUID) {
}

override def equals(obj: Any): Boolean = {
if (obj == null) return false
if (!obj.isInstanceOf[HandleIdentifier]) return false

val other = obj.asInstanceOf[HandleIdentifier]
if (this eq other) return true

if (publicId == null) {
if (other.publicId != null) {
return false
}
} else if (!(publicId == other.publicId)) {
return false
}

if (secretId == null) {
if (other.secretId != null) {
return false
}
} else if (!(secretId == other.secretId)) {
return false
obj match {
case HandleIdentifier(pid, sid)
if Objects.equals(publicId, pid) && Objects.equals(secretId, sid) => true
case _ => false
}
true
}

override def toString: String = Option(publicId).map(_.toString).getOrElse("")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@

package yaooqinn.kyuubi.operation

import java.util.Objects

import org.apache.hive.service.cli.thrift.{TOperationHandle, TProtocolVersion}

import yaooqinn.kyuubi.cli.{Handle, HandleIdentifier}

class OperationHandle private(
val opType: OperationType,
opType: OperationType,
protocol: TProtocolVersion,
handleId: HandleIdentifier) extends Handle(handleId) {

private[this] var hasResultSet: Boolean = false
private var hasResultSet: Boolean = false

def this(opType: OperationType, protocol: TProtocolVersion) =
this(opType, protocol, new HandleIdentifier)
Expand Down Expand Up @@ -60,22 +62,13 @@ class OperationHandle private(

def getProtocolVersion: TProtocolVersion = protocol

override def hashCode: Int = {
val prime = 31
var result = super.hashCode
result = prime * result + (if (opType == null) 0 else opType.hashCode)
result
}
override def hashCode: Int = 31 * super.hashCode + Objects.hashCode(opType)

override def equals(obj: Any): Boolean = {
if (!super.equals(obj)) return false
if (!obj.isInstanceOf[OperationHandle]) return false
val other = obj.asInstanceOf[OperationHandle]
if (this eq other) return true
if (opType != other.opType) {
return false
obj match {
case o: OperationHandle if opType == o.getOperationType && super.equals(o) => true
case _ => false
}
true
}

override def toString: String =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,114 @@ package yaooqinn.kyuubi.operation
import org.apache.hive.service.cli.thrift.{TOperationType, TProtocolVersion}
import org.apache.spark.SparkFunSuite

import yaooqinn.kyuubi.cli.HandleIdentifier

class OperationHandleSuite extends SparkFunSuite {

import TProtocolVersion._

test("operation handle basic tests") {
val handle1 = new OperationHandle(
EXECUTE_STATEMENT, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8)
val handle1 = new OperationHandle(EXECUTE_STATEMENT, HIVE_CLI_SERVICE_PROTOCOL_V8)
assert(!handle1.isHasResultSet)
handle1.setHasResultSet(true)
assert(handle1.isHasResultSet)
assert(handle1.toTOperationHandle.isHasResultSet)
assert(handle1.opType === EXECUTE_STATEMENT)
assert(handle1.getOperationType === EXECUTE_STATEMENT)
assert(handle1.getProtocolVersion === TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8)
assert(handle1.getProtocolVersion === HIVE_CLI_SERVICE_PROTOCOL_V8)

val handle2 = new OperationHandle(
handle1.toTOperationHandle, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8)
handle1.toTOperationHandle, HIVE_CLI_SERVICE_PROTOCOL_V8)
assert(handle2.isHasResultSet)
assert(handle2.toTOperationHandle.isHasResultSet)
assert(handle2.opType === EXECUTE_STATEMENT)
assert(handle2.getOperationType === EXECUTE_STATEMENT)
assert(handle2.getProtocolVersion === TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8)
assert(handle1 === handle2)
assert(handle2.getProtocolVersion === HIVE_CLI_SERVICE_PROTOCOL_V8)
assert(handle1 equals handle2)

val handle3 = new OperationHandle(handle2.toTOperationHandle)
assert(handle3.isHasResultSet)
assert(handle3.toTOperationHandle.isHasResultSet)
assert(handle3.opType === EXECUTE_STATEMENT)
assert(handle3.getOperationType === EXECUTE_STATEMENT)
assert(handle3.getProtocolVersion === TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
assert(handle3.getProtocolVersion === HIVE_CLI_SERVICE_PROTOCOL_V1)
assert(handle3 === handle2)

assert(handle1.toTOperationHandle.getOperationType === TOperationType.EXECUTE_STATEMENT)
assert(handle1.toString === handle3.toString)
assert(handle1.toTOperationHandle.getOperationId ===
handle1.getHandleIdentifier.toTHandleIdentifier)
}

// Checks the textual form of a handle: it must begin with the class's simple name and
// embed both the operation type and the handle identifier.
test("operation handle to string") {
val opType = EXECUTE_STATEMENT
val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8
val handle = new OperationHandle(opType, protocol)
val hStr = handle.toString
assert(hStr.startsWith(classOf[OperationHandle].getSimpleName))
// The type is checked both via the local value and via the accessor.
assert(hStr.contains(opType.toString))
assert(hStr.contains(handle.getOperationType.toString))
assert(hStr.contains(handle.getHandleIdentifier.toString))
}

// Exercises OperationHandle.equals: an unrelated object, the handle itself, a separately
// constructed handle, handles rebuilt from the thrift form, and a handle that differs only
// in operation type.
test("operation handle equals") {
val opType = EXECUTE_STATEMENT
val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8
val handle = new OperationHandle(opType, protocol)
// Never equal to an unrelated object; always equal to itself.
assert(!handle.equals(new Object()))
assert(handle.equals(handle))
// A second handle built from the same type and protocol is expected to be a different handle.
val handle2 = new OperationHandle(opType, protocol)
assert(!handle.equals(handle2))
// Rebuilding from the thrift representation is expected to yield an equal handle.
val handle3 = new OperationHandle(handle.toTOperationHandle)
assert(handle.equals(handle3))
// Same thrift handle with a different protocol version (V7 vs V8) is still asserted equal.
val handle4 = new OperationHandle(handle.toTOperationHandle,
HIVE_CLI_SERVICE_PROTOCOL_V7)
assert(handle.equals(handle4))
// Reach the three-argument constructor reflectively so a handle can be built that reuses
// the same identifier but carries a different operation type.
val ctor = classOf[OperationHandle].getDeclaredConstructor(
classOf[OperationType], classOf[TProtocolVersion], classOf[HandleIdentifier])
ctor.setAccessible(true)
val handle5 = ctor.newInstance(GET_TYPE_INFO, protocol, handle.getHandleIdentifier)
assert(handle5.isInstanceOf[OperationHandle])
// Only the operation type differs; identifier and protocol match the original handle.
assert(handle5.getOperationType !== handle.getOperationType)
assert(handle5.getHandleIdentifier === handle.getHandleIdentifier)
assert(handle5.getProtocolVersion === handle.getProtocolVersion)
// A differing operation type alone must make the handles unequal.
assert(!handle.equals(handle5))
}

// Pins the hash code to the formula 31 * (31 * 1 + identifierHash) + opTypeHash.
test("operation handle hash code") {
val opType = EXECUTE_STATEMENT
val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8
val handle = new OperationHandle(opType, protocol)
val prime = 31
// Inner term combines the handle identifier's hash; outer term adds the operation type's hash.
assert(handle.hashCode ===
prime * (prime * 1 + handle.getHandleIdentifier.hashCode) + opType.hashCode())
}

// Checks which protocol version a handle reports for each way of constructing it.
test("operation handle get protocol version") {
val opType = EXECUTE_STATEMENT
val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8
val handle = new OperationHandle(opType, protocol)
// The version passed at construction is reported back unchanged.
assert(handle.getProtocolVersion === protocol)
// Built from a thrift handle without an explicit version: V1 is expected.
assert(new OperationHandle(handle.toTOperationHandle).getProtocolVersion ===
HIVE_CLI_SERVICE_PROTOCOL_V1)
// Built from a thrift handle with an explicit version: that version is expected.
assert(new OperationHandle(handle.toTOperationHandle,
HIVE_CLI_SERVICE_PROTOCOL_V1).getProtocolVersion === HIVE_CLI_SERVICE_PROTOCOL_V1)
}

// Checks the hasResultSet flag: false on a new handle, and it follows setHasResultSet.
test("operation handle has result set") {
val opType = EXECUTE_STATEMENT
val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8
val handle = new OperationHandle(opType, protocol)
assert(!handle.isHasResultSet)
// Setting false on an already-false flag must leave it false.
handle.setHasResultSet(false)
assert(!handle.isHasResultSet)
handle.setHasResultSet(true)
assert(handle.isHasResultSet)
}

// Checks that the thrift handle produced by toTOperationHandle carries the current
// hasResultSet flag.
// NOTE(review): the test name mentions tOperationType, but the assertions only exercise
// toTOperationHandle.isHasResultSet — confirm whether the name is intended.
test("operation handle to tOperationType") {
val opType = EXECUTE_STATEMENT
val protocol = HIVE_CLI_SERVICE_PROTOCOL_V8
val handle = new OperationHandle(opType, protocol)
assert(!handle.toTOperationHandle.isHasResultSet)
handle.setHasResultSet(true)
assert(handle.toTOperationHandle.isHasResultSet)
}
}

0 comments on commit 185d60a

Please sign in to comment.