Skip to content

Commit

Permalink
Add new FrontendService
Browse files Browse the repository at this point in the history
  • Loading branch information
yaooqinn committed Jul 21, 2020
1 parent eacaa60 commit 4d2ec8a
Show file tree
Hide file tree
Showing 63 changed files with 2,265 additions and 426 deletions.
4 changes: 3 additions & 1 deletion kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import java.util.{Properties, UUID}
import scala.collection.JavaConverters._
import scala.util.{Success, Try}

import org.apache.hadoop.security.UserGroupInformation

private[kyuubi] object Utils extends Logging {

import org.apache.kyuubi.config.KyuubiConf._
Expand Down Expand Up @@ -65,7 +67,6 @@ private[kyuubi] object Utils extends Logging {
}.getOrElse(Map.empty)
}


/**
* Return a well-formed URI for the file described by a user input string.
*
Expand Down Expand Up @@ -121,4 +122,5 @@ private[kyuubi] object Utils extends Logging {
dir
}

def currentUser: String = UserGroupInformation.getCurrentUser.getShortUserName
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.kyuubi.config

import java.time.Duration
import java.time.format.DateTimeParseException

private[kyuubi] case class ConfigBuilder(key: String) {

private[config] var _doc = ""
Expand Down Expand Up @@ -66,6 +69,25 @@ private[kyuubi] case class ConfigBuilder(key: String) {
def stringConf: TypedConfigBuilder[String] = {
new TypedConfigBuilder(this, identity)
}

def timeConf: TypedConfigBuilder[Long] = {
def timeFromStr(str: String): Long = {
try {
Duration.parse(str).toMillis
} catch {
case e: DateTimeParseException =>
throw new IllegalArgumentException(s"The formats accepted are based on the ISO-8601" +
s" duration format `PnDTnHnMn.nS` with days considered to be exactly 24 hours." +
s" $str for $key is not valid", e)
}
}

def timeToStr(v: Long): String = {
Duration.ofMillis(v).toString
}

new TypedConfigBuilder[Long](this, timeFromStr, timeToStr)
}
}

private[kyuubi] case class TypedConfigBuilder[T](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@

package org.apache.kyuubi.config

import java.time.Duration
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.service.authentication.AuthTypes

case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
private val settings = new ConcurrentHashMap[String, String]()
Expand Down Expand Up @@ -72,6 +77,23 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
def unset(entry: ConfigEntry[_]): KyuubiConf = {
unset(entry.key)
}

/** Get all parameters as map */
def getAll: Map[String, String] = {
settings.entrySet().asScala.map(x => (x.getKey, x.getValue)).toMap[String, String]
}

/**
* Retrieve key-value pairs from [[KyuubiConf]] starting with `dropped.remainder`, and put them to
* the result map with the `dropped` of key being dropped.
* @param dropped first part of prefix which will dropped for the new key
* @param remainder second part of the prefix which will be remained in the key
*/
def getAllWithPrefix(dropped: String, remainder: String): Map[String, String] = {
getAll.filter { case (k, _) => k.startsWith(s"$dropped.$remainder")}.map {
case (k, v) => (k.substring(dropped.length), v)
}
}
}

object KyuubiConf {
Expand Down Expand Up @@ -108,4 +130,66 @@ object KyuubiConf {
.stringConf
.createOptional

val OPERATION_IDLE_TIMEOUT: ConfigEntry[Long] = buildConf("operation.idle.timeout")
.doc("Operation will be closed when it's not accessed for this duration of time")
.version("1.0.0")
.timeConf
.createWithDefault(Duration.ofHours(3).toMillis)


/////////////////////////////////////////////////////////////////////////////////////////////////
// Frontend Service Configuration //
/////////////////////////////////////////////////////////////////////////////////////////////////

val FRONTEND_BIND_HOST: OptionalConfigEntry[String] = buildConf("frontend.bind.host")
.doc("Hostname or IP of the machine on which to run the frontend service.")
.version("1.0.0")
.stringConf
.createOptional

val FRONTEND_BIND_PORT: ConfigEntry[Int] = buildConf("frontend.bind.port")
.doc("Port of the machine on which to run the frontend service.")
.version("1.0.0")
.intConf
.checkValue(p => p == 0 || (p > 1024 && p < 65535), "Invalid Port number")
.createWithDefault(10009)

val FRONTEND_MIN_WORKER_THREADS: ConfigEntry[Int] = buildConf("frontend.min.worker.threads")
.doc("Minimum number of threads in the of frontend worker thread pool for the frontend" +
" service")
.version("1.0.0")
.intConf
.createWithDefault(9)

val FRONTEND_MAX_WORKER_THREADS: ConfigEntry[Int] = buildConf("frontend.max.worker.threads")
.doc("Maximum number of threads in the of frontend worker thread pool for the frontend" +
" service")
.version("1.0.0")
.intConf
.createWithDefault(99)

val FRONTEND_WORKER_KEEPALIVE_TIME: ConfigEntry[Long] =
buildConf("frontend.worker.keepalive.time")
.doc("Keep-alive time (in milliseconds) for an idle worker thread")
.version("1.0.0")
.timeConf
.createWithDefault(Duration.ofSeconds(60).toMillis)

val FRONTEND_MAX_MESSAGE_SIZE: ConfigEntry[Int] =
buildConf("frontend.max.message.size")
.doc("Maximum message size in bytes a Kyuubi server will accept.")
.intConf
.createWithDefault(104857600)

val FRONTEND_LOGIN_TIMEOUT: ConfigEntry[Long] =
buildConf("frontend.login.timeout")
.doc("Timeout for Thrift clients during login to the frontend service.")
.timeConf
.createWithDefault(Duration.ofSeconds(20).toMillis)

val FRONTEND_LOGIN_BACKOFF_SLOT_LENGTH: ConfigEntry[Long] =
buildConf("frontend.backoff.slot.length")
.doc("Time to back off during login to the frontend service.")
.timeConf
.createWithDefault(Duration.ofMillis(100).toMillis)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.operation

import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet, TTableSchema}

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationType.OperationType
import org.apache.kyuubi.session.Session

abstract class AbstractOperation(opType: OperationType, session: Session) extends Operation {
import OperationState._
private final val handle = OperationHandle(opType, session.protocol)
private final val operationTimeout: Long = session.conf.get(KyuubiConf.OPERATION_IDLE_TIMEOUT)

@volatile private var state: OperationState = INITIALIZED
@volatile protected var startTime: Long = _
@volatile protected var completedTime: Long = _
@volatile protected var lastAccessTime: Long = System.currentTimeMillis()

@volatile protected var operationException: KyuubiSQLException = _
@volatile protected var hasResultSet: Boolean = false

protected def setHasResultSet(hasResultSet: Boolean): Unit = {
this.hasResultSet = hasResultSet
handle.setHasResultSet(hasResultSet)
}

protected def setOperationException(opEx: KyuubiSQLException): Unit = {
this.operationException = opEx
}

protected def setState(newState: OperationState): Unit = {
OperationState.validateTransition(state, newState)
state = newState

state match {
case RUNNING => startTime = System.currentTimeMillis()
case ERROR | FINISHED | CANCELED => completedTime = System.currentTimeMillis()
case _ =>
}

lastAccessTime = System.currentTimeMillis()
}

protected def isClosedOrCanceled: Boolean = {
state == OperationState.CLOSED || state == OperationState.CANCELED
}

protected def assertState(state: OperationState): Unit = {
if (this.state ne state) {
throw new IllegalStateException(s"Expected state $state, but found ${this.state}")
}
lastAccessTime = System.currentTimeMillis()
}

/**
* Verify if the given fetch orientation is part of the default orientation types.
*/
protected def validateDefaultFetchOrientation(orientation: FetchOrientation): Unit = {
validateFetchOrientation(orientation, Operation.DEFAULT_FETCH_ORIENTATION_SET)
}

/**
* Verify if the given fetch orientation is part of the supported orientation types.
*/
private def validateFetchOrientation(
orientation: FetchOrientation,
supportedOrientations: Set[FetchOrientation]): Unit = {
if (!supportedOrientations.contains(orientation)) {
throw KyuubiSQLException(s"The fetch type $orientation is not supported for this resultset")
}
}

protected def runInternal(): Unit

protected def beforeRun(): Unit

protected def afterRun(): Unit

override def run(): Unit = {
beforeRun()
try {
runInternal()
} finally {
afterRun()
}
}

override def cancel(): Unit

override def close(): Unit

override def getProtocolVersion: TProtocolVersion = handle.protocol

override def getResultSetSchema: TTableSchema

override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet

override def getSession: Session = session

override def getHandle: OperationHandle

override def getStatus: OperationStatus = {
OperationStatus(state, startTime, completedTime, hasResultSet, Option(operationException))
}

override def shouldRunAsync: Boolean

override def isTimedOut: Boolean = {
if (operationTimeout <= 0) {
false
} else {
OperationState.isTerminal(state) &&
lastAccessTime + operationTimeout <= System.currentTimeMillis()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,18 @@ trait Operation {

def getProtocolVersion: TProtocolVersion
def getResultSetSchema: TTableSchema
def getNextRowSet(order: FetchOrientation, rowSetSize: Long): TRowSet
def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet

def getSession: Session
def getHandle: OperationHandle
def getStatus: OperationStatus

def shouldRunAsync: Boolean
def isTimedOut: Boolean

}

object Operation {
val DEFAULT_FETCH_ORIENTATION_SET: Set[FetchOrientation] =
Set(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST)
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ case class OperationHandle(

def setHasResultSet(hasResultSet: Boolean): Unit = _hasResultSet = hasResultSet

def toTOperationHandle: TOperationHandle = {
implicit def toTOperationHandle: TOperationHandle = {
val tOperationHandle = new TOperationHandle
tOperationHandle.setOperationId(identifier.toTHandleIdentifier)
tOperationHandle.setOperationType(OperationType.toTOperationType(typ))
Expand Down Expand Up @@ -70,4 +70,12 @@ object OperationHandle {
def apply(tOperationHandle: TOperationHandle): OperationHandle = {
apply(tOperationHandle, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
}

implicit def toTOperationHandle(handle: OperationHandle): TOperationHandle = {
val tOperationHandle = new TOperationHandle
tOperationHandle.setOperationId(handle.identifier.toTHandleIdentifier)
tOperationHandle.setOperationType(OperationType.toTOperationType(handle.typ))
tOperationHandle.setHasResultSet(handle._hasResultSet)
tOperationHandle
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.kyuubi.operation

import scala.language.implicitConversions

import org.apache.hive.service.rpc.thrift.TOperationState

import org.apache.kyuubi.KyuubiSQLException
Expand All @@ -28,7 +30,7 @@ object OperationState extends Enumeration {

val INITIALIZED, PENDING, RUNNING, FINISHED, TIMEOUT, CANCELED, CLOSED, ERROR, UNKNOWN = Value

def toTOperationState(from: OperationState): TOperationState = from match {
implicit def toTOperationState(from: OperationState): TOperationState = from match {
case INITIALIZED => INITIALIZED_STATE
case PENDING => PENDING_STATE
case RUNNING => RUNNING_STATE
Expand Down

0 comments on commit 4d2ec8a

Please sign in to comment.