Skip to content

Commit

Permalink
[KYUUBI-120]define interface class IKyuubiOperation to allow differen…
Browse files Browse the repository at this point in the history
…t implementation of kyuubiOperation

---

---
fix #181 fix #120
---
Squashed commit of the following:

commit 3e7f81f
Author: hustfeiwang <wangfei3@corp.netease.com>
Date:   Thu May 23 18:32:08 2019 +0800

    fix unit test

commit 73763d7
Author: hustfeiwang <wangfei3@corp.netease.com>
Date:   Thu May 23 09:48:21 2019 +0800

    set statementId to a val

commit 128effa
Author: hustfeiwang <wangfei3@corp.netease.com>
Date:   Wed May 22 16:06:45 2019 +0800

    set isClosedOrCanceled to a protected method

commit 267b55f
Author: hustfeiwang <wangfei3@corp.netease.com>
Date:   Wed May 22 10:10:48 2019 +0800

    KYUUBI-120: define interface class IKyuubiOperation to allow different implementation of kyuubiOperation
  • Loading branch information
hustfeiwang authored and yaooqinn committed May 24, 2019
1 parent 4161226 commit 5ad2235
Show file tree
Hide file tree
Showing 13 changed files with 626 additions and 290 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package yaooqinn.kyuubi.operation

import java.security.PrivilegedExceptionAction
import java.util.UUID
import java.util.concurrent.{Future, RejectedExecutionException}

import scala.util.control.NonFatal

import org.apache.hive.service.cli.thrift.TProtocolVersion

import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
import yaooqinn.kyuubi.cli.FetchOrientation
import yaooqinn.kyuubi.session.KyuubiSession

abstract class AbstractKyuubiOperation(session: KyuubiSession, statement: String)
extends IKyuubiOperation with Logging{

protected var state: OperationState = INITIALIZED
protected val opHandle: OperationHandle =
new OperationHandle(EXECUTE_STATEMENT, session.getProtocolVersion)
protected val operationTimeout: Long
protected var lastAccessTime = System.currentTimeMillis()

protected var hasResultSet: Boolean = false
protected var operationException: KyuubiSQLException = _
protected var backgroundHandle: Future[_] = _
protected val statementId = UUID.randomUUID().toString

protected val DEFAULT_FETCH_ORIENTATION_SET: Set[FetchOrientation] =
Set(FetchOrientation.FETCH_NEXT, FetchOrientation.FETCH_FIRST)

def getBackgroundHandle: Future[_] = backgroundHandle

def setBackgroundHandle(backgroundHandle: Future[_]): Unit = {
this.backgroundHandle = backgroundHandle
}

override def getSession: KyuubiSession = session

override def getHandle: OperationHandle = opHandle

override def getProtocolVersion: TProtocolVersion = opHandle.getProtocolVersion

override def getStatus: OperationStatus = new OperationStatus(state, operationException)

protected def setOperationException(opEx: KyuubiSQLException): Unit = {
this.operationException = opEx
}

@throws[KyuubiSQLException]
protected def setState(newState: OperationState): Unit = {
state.validateTransition(newState)
this.state = newState
this.lastAccessTime = System.currentTimeMillis()
}

protected def checkState(state: OperationState): Boolean = {
this.state == state
}

protected def isClosedOrCanceled: Boolean = {
checkState(CLOSED) || checkState(CANCELED)
}

@throws[KyuubiSQLException]
protected def assertState(state: OperationState): Unit = {
if (this.state ne state) {
throw new KyuubiSQLException("Expected state " + state + ", but found " + this.state)
}
this.lastAccessTime = System.currentTimeMillis()
}

override def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
debug(s"CLOSING $statementId")
cleanup(CLOSED)
}

override def cancel(): Unit = {
info(s"Cancel '$statement' with $statementId")
cleanup(CANCELED)
}

protected def setHasResultSet(hasResultSet: Boolean): Unit = {
this.hasResultSet = hasResultSet
opHandle.setHasResultSet(hasResultSet)
}

/**
* Verify if the given fetch orientation is part of the default orientation types.
*/
@throws[KyuubiSQLException]
protected def validateDefaultFetchOrientation(orientation: FetchOrientation): Unit = {
validateFetchOrientation(orientation, DEFAULT_FETCH_ORIENTATION_SET)
}

/**
* Verify if the given fetch orientation is part of the supported orientation types.
*/
@throws[KyuubiSQLException]
protected def validateFetchOrientation(
orientation: FetchOrientation,
supportedOrientations: Set[FetchOrientation]): Unit = {
if (!supportedOrientations.contains(orientation)) {
throw new KyuubiSQLException(
"The fetch type " + orientation.toString + " is not supported for this resultset", "HY106")
}
}

protected def runInternal(): Unit = {
setState(PENDING)
setHasResultSet(true)

// Runnable impl to call runInternal asynchronously, from a different thread
val backgroundOperation = new Runnable() {
override def run(): Unit = {
try {
session.ugi.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
try {
execute()
} catch {
case e: KyuubiSQLException => setOperationException(e)
}
}
})
} catch {
case e: Exception => setOperationException(new KyuubiSQLException(e))
}
}
}

try {
// This submit blocks if no background threads are available to run this operation
val backgroundHandle =
session.getSessionMgr.submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(ERROR)
throw new KyuubiSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
}
}

protected def execute(): Unit

protected def onStatementError(id: String, message: String, trace: String): Unit = {
error(
s"""
|Error executing query as ${session.getUserName},
|$statement
|Current operation state ${this.state},
|$trace
""".stripMargin)
setState(ERROR)
}

protected def cleanup(state: OperationState) {
if (this.state != CLOSED) {
setState(state)
}
val backgroundHandle = getBackgroundHandle
if (backgroundHandle != null) {
backgroundHandle.cancel(true)
}
}

override def isTimedOut: Boolean = {
if (operationTimeout <= 0) {
false
} else {
// check only when it's in terminal state
state.isTerminal && lastAccessTime + operationTimeout <= System.currentTimeMillis()
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package yaooqinn.kyuubi.operation

import org.apache.hadoop.hive.ql.session.OperationLog
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.sql.types.StructType

import yaooqinn.kyuubi.KyuubiSQLException
import yaooqinn.kyuubi.cli.FetchOrientation
import yaooqinn.kyuubi.schema.RowSet
import yaooqinn.kyuubi.session.KyuubiSession

/**
* Interface class of KyuubiOperation.
*/
trait IKyuubiOperation {

/**
* Get relative IKyuubiSession.
*/
def getSession: KyuubiSession

/**
* Get relative OperationHandle.
*/
def getHandle: OperationHandle

/**
* Get the protocol version of this IKyuubiOperation.
*/
def getProtocolVersion: TProtocolVersion

/**
* Get current status of this IKyuubiOperation.
*/
def getStatus: OperationStatus

/**
* Get operation log.
*/
def getOperationLog: OperationLog

/**
* Run this IKyuubiOperation.
* @throws KyuubiSQLException
*/
@throws[KyuubiSQLException]
def run(): Unit

/**
* Close this IKyuubiOperation.
*/
def close(): Unit

/**
* Cancel this IKyuubiOperation.
*/
def cancel(): Unit

/**
* Get the schema of operation result set.
*/
def getResultSetSchema: StructType

/**
* Get the operation result set.
* @param order the fetch orientation, FETCH_FIRST or FETCH_NEXT.
* @param rowSetSize limit of result set.
* @return
*/
def getNextRowSet(order: FetchOrientation, rowSetSize: Long): RowSet

/**
* Check whether this IKyuubiOperation has run more than the configured timeout duration.
*/
def isTimedOut: Boolean
}

0 comments on commit 5ad2235

Please sign in to comment.