Skip to content

Commit

Permalink
[KYUUBI #903] Add event for statement
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_
- Through eventLog to write statementInfo into file

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request

Closes #903 from zhang1002/branch-1.2_statementEvent.

Closes #903

27cc774 [张宇翔] 1. Add some introduce 2. change som UT
7cf73bd [张宇翔] Merge branch 'master' into branch-1.2_statementEvent
b662a98 [张宇翔] Merge remote-tracking branch 'upstream/master'
fc86860 [张宇翔] resolve conflicts
5dfcf86 [张宇翔] resolve conflicts
b04adee [张宇翔] remove some unused code
4c8f3b8 [张宇翔] Merge remote-tracking branch 'upstream/master'
2a4317a [张宇翔] remove some ut
0c474cc [张宇翔] Add statement event
4e05a39 [张宇翔] Add statement event
5f73e24 [张宇翔] Add statement event
8b68676 [张宇翔] Merge remote-tracking branch 'upstream/master'
cf99e30 [张宇翔] Merge remote-tracking branch 'upstream/master'
0afaa57 [张宇翔] Merge remote-tracking branch 'upstream/master'
b24fea0 [张宇翔] Merge remote-tracking branch 'upstream/master'
e517cfc [张宇翔] Merge remote-tracking branch 'upstream/master'
18aebe7 [张宇翔] Merge remote-tracking branch 'upstream/master'
f248bef [张宇翔] Merge remote-tracking branch 'upstream/master'
5ffb54f [张宇翔] Add kyuubi-spark-monitor module for nightly.yml

Authored-by: 张宇翔 <zhang1002@126.com>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit d0c9c8a)
Signed-off-by: Kent Yao <yao@apache.org>
  • Loading branch information
zhang1002 authored and yaooqinn committed Aug 18, 2021
1 parent bb4d1e8 commit e4c35d5
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.spark.events

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

/**
*
* @param statementId: the identifier of operationHandler
* @param statement: the sql that you execute
* @param appId: application id a.k.a, the unique id for engine
* @param sessionId: the identifier of a session
* @param state: store each state that the sql has
* @param stateTime: the time that the sql's state change
* @param queryExecution: contains logicPlan and physicalPlan
* @param exeception: caught exeception if have
*/
case class StatementEvent(
statementId: String,
statement: String,
appId: String,
sessionId: String,
var state: String,
var stateTime: Long,
var queryExecution: String = "",
var exeception: String = "") extends KyuubiEvent {

override def schema: StructType = Encoders.product[StatementEvent].schema
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,14 @@ package org.apache.kyuubi.engine.spark.operation

import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}

import scala.collection.mutable.Map

import org.apache.spark.kyuubi.SQLOperationListener
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._

import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, KyuubiSparkUtil}
import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor
import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiStatementInfo
import org.apache.kyuubi.engine.spark.events.{EventLoggingService, StatementEvent}
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
Expand Down Expand Up @@ -62,10 +59,10 @@ class ExecuteStatement(

private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark)

private val kyuubiStatementInfo = KyuubiStatementInfo(
val statementEvent: StatementEvent = StatementEvent(
statementId, statement, spark.sparkContext.applicationId,
session.handle.identifier, Map(state -> lastAccessTime))
KyuubiStatementMonitor.putStatementInfoIntoQueue(kyuubiStatementInfo)
session.handle.identifier.toString, state.toString, lastAccessTime)
EventLoggingService.onEvent(statementEvent)

override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
Expand Down Expand Up @@ -95,7 +92,8 @@ class ExecuteStatement(
result = spark.sql(statement)
// TODO( #921): COMPILED need consider eagerly executed commands
setState(OperationState.COMPILED)
kyuubiStatementInfo.queryExecution = result.queryExecution
statementEvent.queryExecution = result.queryExecution.toString()
EventLoggingService.onEvent(statementEvent)
debug(result.queryExecution)
iter = new ArrayFetchIterator(result.collect())
setState(OperationState.FINISHED)
Expand Down Expand Up @@ -171,11 +169,14 @@ class ExecuteStatement(

override def setState(newState: OperationState): Unit = {
super.setState(newState)
kyuubiStatementInfo.stateToTime.put(newState, lastAccessTime)
statementEvent.state = newState.toString
statementEvent.stateTime = lastAccessTime
EventLoggingService.onEvent(statementEvent)
}

override def setOperationException(opEx: KyuubiSQLException): Unit = {
super.setOperationException(opEx)
kyuubiStatementInfo.exception = opEx
statementEvent.exeception = opEx.toString
EventLoggingService.onEvent(statementEvent)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.engine.spark

import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
import java.util.concurrent.ConcurrentHashMap

import org.apache.hive.service.rpc.thrift._
import org.apache.hive.service.rpc.thrift.TCLIService.Iface
Expand All @@ -27,7 +27,7 @@ import org.scalatest.PrivateMethodTester
import org.scalatest.time.SpanSugar._

import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor
import org.apache.kyuubi.engine.spark.monitor.entity.{KyuubiJobInfo, KyuubiStatementInfo}
import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiJobInfo
import org.apache.kyuubi.operation.{HiveJDBCTests, OperationHandle}

class KyuubiStatementMonitorSuite extends WithSparkSQLEngine with HiveJDBCTests
Expand All @@ -36,32 +36,6 @@ class KyuubiStatementMonitorSuite extends WithSparkSQLEngine with HiveJDBCTests
override protected def jdbcUrl: String = getJdbcUrl
override def withKyuubiConf: Map[String, String] = Map.empty

test("add kyuubiStatementInfo into queue") {
var baseSql = "select timestamp'2021-06-0"
val total: Int = 7
// Clear kyuubiStatementQueue first
val getQueue = PrivateMethod[
ArrayBlockingQueue[KyuubiStatementInfo]](Symbol("kyuubiStatementQueue"))()
val kyuubiStatementQueue = KyuubiStatementMonitor.invokePrivate(getQueue)
kyuubiStatementQueue.clear()
withSessionHandle { (client, handle) =>

for ( a <- 1 to total ) {
val sql = baseSql + a + "'"
val req = new TExecuteStatementReq()
req.setSessionHandle(handle)
req.setStatement(sql)
val tExecuteStatementResp = client.ExecuteStatement(req)
val operationHandle = tExecuteStatementResp.getOperationHandle

val kyuubiStatementInfo = kyuubiStatementQueue.poll()
assert(
kyuubiStatementInfo.statementId === OperationHandle(operationHandle).identifier.toString)
assert(sql === kyuubiStatementInfo.statement)
}
}
}

test("add kyuubiJobInfo into queue and remove them when threshold reached") {
val sql = "select timestamp'2021-06-01'"
val getJobMap = PrivateMethod[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ package org.apache.kyuubi.engine.spark.events
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import org.apache.hive.service.rpc.thrift.TExecuteStatementReq
import org.scalatest.time.SpanSugar._

import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, WithSparkSQLEngine}
import org.apache.kyuubi.operation.JDBCTestUtils
import org.apache.kyuubi.operation.{JDBCTestUtils, OperationHandle}

class EventLoggingServiceSuite extends WithSparkSQLEngine with JDBCTestUtils {
import EventLoggerType._
Expand Down Expand Up @@ -83,4 +86,28 @@ class EventLoggingServiceSuite extends WithSparkSQLEngine with JDBCTestUtils {
assert(rs.getInt("totalOperations") === 3)
}
}

test("statementEvent: generate, dump and query") {
val statementEventPath = Paths.get(logRoot.toString, "statement", engine.engineId + ".json")
val sql = "select timestamp'2021-06-01'"
withSessionHandle { (client, handle) =>

val table = statementEventPath.getParent
val req = new TExecuteStatementReq()
req.setSessionHandle(handle)
req.setStatement(sql)
val tExecuteStatementResp = client.ExecuteStatement(req)
val opHandle = tExecuteStatementResp.getOperationHandle
val statementId = OperationHandle(opHandle).identifier.toString

eventually(timeout(60.seconds), interval(5.seconds)) {
val result = spark.sql(s"select * from `json`.`${table}`")
.where(s"statementId = '${statementId}'")

assert(result.select("statementId").first().get(0) === statementId)
assert(result.count() >= 1)
assert(result.select("statement").first().get(0) === sql)
}
}
}
}

0 comments on commit e4c35d5

Please sign in to comment.