[KYUUBI #1601] Align SparkStatementEvent to KyuubiOperationEvent

### _Why are the changes needed?_

Align SparkStatementEvent to KyuubiOperationEvent
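
For context, the server-side `KyuubiOperationEvent` that this change aligns to has roughly the following shape. This is a paraphrase from memory of the kyuubi-server module, so treat the field list as an approximation rather than the authoritative definition:

```scala
// Approximate sketch of the server-side KyuubiOperationEvent (paraphrased,
// not copied from the repository). The engine-side SparkOperationEvent
// introduced below mirrors these fields and adds a Spark-specific
// queryExecution string.
case class KyuubiOperationEvent(
    statementId: String,          // unique operation identifier
    statement: String,            // the SQL text
    shouldRunAsync: Boolean,      // asynchronous execution flag
    state: String,                // current operation state
    eventTime: Long,              // time the event was created/logged
    createTime: Long,             // time the operation was created
    startTime: Long,              // time the operation started running
    completeTime: Long,           // completion time, -1 while unfinished
    exception: Option[Throwable], // failure cause, if any
    sessionId: String,            // parent session identifier
    sessionUser: String)          // authenticated client user
```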

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1601 from yaooqinn/soe.

Closes #1601

090dee8 [Kent Yao] Merge branch 'master' of github.com:apache/incubator-kyuubi into soe
a61ce3b [Kent Yao] Align SparkStatementEvent to KyuubiOperationEvent
efbbdf7 [Kent Yao] Align SparkStatementEvent to KyuubiOperationEvent

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
yaooqinn authored and ulysses-you committed Dec 23, 2021
1 parent 152e394 commit 1b48b18
Showing 14 changed files with 257 additions and 150 deletions.
@@ -570,31 +570,31 @@ object DataGenerator {
$"web_tax_percentage" .decimal(5, 2))
// format: on

-Seq(
-  store_sales,
-  store_returns,
-  catalog_sales,
-  catalog_returns,
-  web_sales,
-  web_returns,
-  inventory,
-  catalog_page,
-  call_center,
-  customer,
-  customer_address,
-  customer_demographics,
-  household_demographics,
-  store,
-  warehouse,
-  item,
-  income_band,
-  web_site,
-  web_page,
-  date_dim,
-  time_dim,
-  ship_mode,
-  reason,
-  promotion)
+Seq(
+  store_sales,
+  store_returns,
+  catalog_sales,
+  catalog_returns,
+  web_sales,
+  web_returns,
+  inventory,
+  catalog_page,
+  call_center,
+  customer,
+  customer_address,
+  customer_demographics,
+  household_demographics,
+  store,
+  warehouse,
+  item,
+  income_band,
+  web_site,
+  web_page,
+  date_dim,
+  time_dim,
+  ship_mode,
+  reason,
+  promotion)
}

def run(config: RunConfig): Unit = {
@@ -136,10 +136,10 @@ abstract class FlinkOperation(
setOperationException(KyuubiSQLException(errMsg))
warn(s"Ignore exception in terminal state with $statementId: $errMsg")
} else {
-setState(OperationState.ERROR)
error(s"Error operating $opType: $errMsg", e)
val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
setOperationException(ke)
+setState(OperationState.ERROR)
throw ke
}
}
@@ -92,23 +92,23 @@ class EngineEventsStore(conf: KyuubiConf) {
/**
* store all statements events.
*/
-val statements = new ConcurrentHashMap[String, SparkStatementEvent]
+val statements = new ConcurrentHashMap[String, SparkOperationEvent]

/**
* get all statement events order by startTime
*/
-def getStatementList: Seq[SparkStatementEvent] = {
+def getStatementList: Seq[SparkOperationEvent] = {
statements.values().asScala.toSeq.sortBy(_.createTime)
}

-def getStatement(statementId: String): Option[SparkStatementEvent] = {
+def getStatement(statementId: String): Option[SparkOperationEvent] = {
Option(statements.get(statementId))
}

/**
* save statement events and check the capacity threshold
*/
-def saveStatement(statementEvent: SparkStatementEvent): Unit = {
+def saveStatement(statementEvent: SparkOperationEvent): Unit = {
statements.put(statementEvent.statementId, statementEvent)
checkStatementCapacity()
}
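A minimal usage sketch of the updated store API, with hypothetical identifiers throughout; the constructor arguments follow the SparkOperationEvent case class defined in the new file below:

```scala
// Hypothetical sketch: save an operation event and read it back.
// EngineEventsStore(conf: KyuubiConf) and the methods used here come
// from the hunk above; all values are made up.
val store = new EngineEventsStore(KyuubiConf())

val now = System.currentTimeMillis()
val event = SparkOperationEvent(
  statementId = "stmt-0001",
  statement = "SELECT 1",
  shouldRunAsync = false,
  state = "FINISHED",
  eventTime = now,
  createTime = now - 100L,
  startTime = now - 90L,
  completeTime = now,
  exception = None,
  sessionId = "session-0001",
  sessionUser = "anonymous",
  queryExecution = null)

store.saveStatement(event)                        // keyed by statementId
assert(store.getStatement("stmt-0001").isDefined)
assert(store.getStatementList.exists(_.statementId == "stmt-0001"))
```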
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.spark.events

import org.apache.spark.sql.{DataFrame, Encoders}
import org.apache.spark.sql.types.StructType

import org.apache.kyuubi.Utils
import org.apache.kyuubi.engine.spark.operation.SparkOperation

/**
* A [[SparkOperationEvent]] used to track the lifecycle of an operation on the Spark SQL engine side.
* <ul>
* <li>Operation Basis</li>
* <li>Operation Live Status</li>
* <li>Parent Session Id</li>
* </ul>
*
* @param statementId the unique identifier of a single operation
* @param statement the SQL statement being executed
* @param shouldRunAsync the flag indicating whether the statement runs asynchronously
* @param state the current operation state
* @param eventTime the time when the event was created and logged
* @param createTime the time the operation was created
* @param startTime the time the operation started running
* @param completeTime the time the operation completed
* @param exception the exception caught during execution, if any
* @param sessionId the identifier of the parent session
* @param sessionUser the authenticated client user
* @param queryExecution the query execution of this operation
*/
case class SparkOperationEvent(
statementId: String,
statement: String,
shouldRunAsync: Boolean,
state: String,
eventTime: Long,
createTime: Long,
startTime: Long,
completeTime: Long,
exception: Option[Throwable],
sessionId: String,
sessionUser: String,
queryExecution: String) extends KyuubiSparkEvent {

override def schema: StructType = Encoders.product[SparkOperationEvent].schema
override def partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(createTime)) :: Nil

def duration: Long = {
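// A completeTime of -1 marks an operation that has not finished yet,
// so fall back to wall-clock time elapsed since creation.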
if (completeTime == -1L) {
System.currentTimeMillis - createTime
} else {
completeTime - createTime
}
}
}

object SparkOperationEvent {
def apply(operation: SparkOperation, result: Option[DataFrame] = None): SparkOperationEvent = {
val session = operation.getSession
val status = operation.getStatus
new SparkOperationEvent(
operation.statementId,
operation.statement,
operation.shouldRunAsync,
status.state.name(),
status.lastModified,
status.create,
status.start,
status.completed,
status.exception,
session.handle.identifier.toString,
session.user,
result.map(_.queryExecution.toString).orNull)
}
}
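
As a small worked example of the duration semantics above, reusing the hypothetical `event` value from the store sketch earlier:

```scala
// Finished operation: duration is simply completeTime - createTime.
val done = event.copy(createTime = 1000L, completeTime = 1500L)
assert(done.duration == 500L)

// Unfinished operation (completeTime == -1L): duration is measured
// against the current wall clock.
val running = event.copy(
  createTime = System.currentTimeMillis() - 200L,
  completeTime = -1L)
assert(running.duration >= 200L)
```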

This file was deleted. (The old SparkStatementEvent, superseded by the SparkOperationEvent above.)

@@ -27,7 +27,7 @@ import org.apache.spark.sql.types._

import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
-import org.apache.kyuubi.engine.spark.events.{EventLoggingService, SparkStatementEvent}
+import org.apache.kyuubi.engine.spark.events.{EventLoggingService, SparkOperationEvent}
import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, OperationState, OperationType}
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
@@ -49,16 +49,7 @@ class ExecuteStatement(

private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark)

-val statementEvent: SparkStatementEvent = SparkStatementEvent(
-  session.user,
-  statementId,
-  statement,
-  spark.sparkContext.applicationId,
-  session.handle.identifier.toString,
-  lastAccessTime,
-  state.toString,
-  lastAccessTime)
-EventLoggingService.onEvent(statementEvent)
+EventLoggingService.onEvent(SparkOperationEvent(this))

override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
@@ -87,7 +78,6 @@
spark.sparkContext.addSparkListener(operationListener)
result = spark.sql(statement)
// TODO #921: COMPILED need consider eagerly executed commands
-statementEvent.queryExecution = result.queryExecution.toString()
setState(OperationState.COMPILED)
debug(result.queryExecution)
iter =
@@ -156,17 +146,6 @@

override def setState(newState: OperationState): Unit = {
super.setState(newState)
-statementEvent.state = newState.toString
-statementEvent.eventTime = lastAccessTime
-if (newState == OperationState.ERROR || newState == OperationState.FINISHED) {
-  statementEvent.completeTime = System.currentTimeMillis()
-}
-EventLoggingService.onEvent(statementEvent)
-}
-
-override def setOperationException(opEx: KyuubiSQLException): Unit = {
-  super.setOperationException(opEx)
-  statementEvent.exception = opEx.toString
-  EventLoggingService.onEvent(statementEvent)
+EventLoggingService.onEvent(SparkOperationEvent(this, Option(result)))
}
}
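
Condensing the ExecuteStatement hunks above into a sketch (simplified, not a drop-in patch): the old code mutated one long-lived `statementEvent` field by field and re-logged it, while the new code rebuilds an immutable snapshot of the operation on every state change, so a single `setState` override covers what previously needed both `setState` and `setOperationException`.

```scala
// Before (simplified): mutate the shared mutable event, then re-log it.
statementEvent.state = newState.toString
statementEvent.eventTime = lastAccessTime
EventLoggingService.onEvent(statementEvent)

// After (simplified): snapshot the whole operation into a fresh,
// immutable event on each transition.
EventLoggingService.onEvent(SparkOperationEvent(this, Option(result)))
```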
@@ -130,10 +130,10 @@ abstract class SparkOperation(opType: OperationType, session: Session)
setOperationException(KyuubiSQLException(errMsg))
warn(s"Ignore exception in terminal state with $statementId: $errMsg")
} else {
-setState(OperationState.ERROR)
error(s"Error operating $opType: $errMsg", e)
val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
setOperationException(ke)
+setState(OperationState.ERROR)
throw ke
}
}
@@ -30,7 +30,7 @@ import org.apache.kyuubi.Logging
import org.apache.kyuubi.Utils.stringifyException
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
-import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent, SparkStatementEvent}
+import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent, SparkOperationEvent}
import org.apache.kyuubi.service.{Serverable, ServiceState}

/**
@@ -119,7 +119,7 @@ class SparkSQLEngineListener(
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case e: SessionEvent => updateSessionStore(e)
-case e: SparkStatementEvent => updateStatementStore(e)
+case e: SparkOperationEvent => updateStatementStore(e)
case _ => // Ignore
}
}
@@ -128,7 +128,7 @@
store.saveSession(event)
}

-private def updateStatementStore(event: SparkStatementEvent): Unit = {
+private def updateStatementStore(event: SparkOperationEvent): Unit = {
store.saveStatement(event)
}
}
