Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event Tracking: For statement #767

Merged
merged 21 commits into from
Jul 14, 2021
Merged

Event Tracking: For statement #767

merged 21 commits into from
Jul 14, 2021

Conversation

zhang1002
Copy link
Contributor

@zhang1002 zhang1002 commented Jul 8, 2021

Generate KyuubiStatementInfo. This object includes the following elements:

  1. statement
  2. statementId
  3. appId
  4. sessionId
  5. exception: this statement's exception
  6. physicalPlan
  7. stateTime: contains each state and the time occurrence
  8. sparkUser

Those data was packaged in KyuubiStatementInfo and we store those object in mem.
You can get some summary data from this object.

get each sql's physicalPlan and the starting time
1. Store the relationship between executionId and operationId
2. Store the relationship between operationId and statement
3. Store the relationship between executionId and physicalPlan
4. Store each state and its happen time for this statement: initialized, running, finished
Generate KStatement info. This object includes the following elements:
1. statement
2. statementId
3. appId
4. sessionId
5. executionId
6. physicalPlan
7. stateTime: contains each state and the time occurrence

Those data was packaged in KStatement and we store those object in mem.
You can get some summary data from this object.
spark.sparkContext.applicationId,
session.getTypeInfo.identifier.toString,
OperationState.INITIALIZED.toString,
new Date().getTime)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

System.currentTimeMillis?

2. Get each state by the function: setState
3. Get this statement's physicalPlan in ExecuteStatement
4. Add sparkUser item
5. Remove java code
Add newline in KyuubiStatementInfo
@codecov-commenter
Copy link

codecov-commenter commented Jul 9, 2021

Codecov Report

Merging #767 (3c8d9af) into master (82fd58f) will decrease coverage by 0.14%.
The diff coverage is 43.47%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master     #767      +/-   ##
============================================
- Coverage     79.27%   79.12%   -0.15%     
  Complexity       10       10              
============================================
  Files           130      132       +2     
  Lines          5090     5117      +27     
  Branches        657      658       +1     
============================================
+ Hits           4035     4049      +14     
- Misses          694      707      +13     
  Partials        361      361              
Impacted Files Coverage Δ
.../engine/spark/monitor/KyuubiStatementMonitor.scala 0.00% <0.00%> (ø)
...ine/spark/monitor/entity/KyuubiStatementInfo.scala 0.00% <0.00%> (ø)
...uubi/engine/spark/operation/ExecuteStatement.scala 86.41% <100.00%> (+1.91%) ⬆️
...g/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala 92.30% <0.00%> (+1.39%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 82fd58f...3c8d9af. Read the comment docs.

var physicalPlan: String,
sparkUser: String,
var exception: String,
var stateTimeMap: Map[String, Long])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this field can be modified, we can add into class.

case class {
  var xxx: Option[String];
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and same with stateTimeMap, I'd like move it into class

@@ -58,6 +61,12 @@ class ExecuteStatement(

private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark)

var kyuubiStatementInfo = KyuubiStatementInfo(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var -> private val


override def setOperationException(opEx: KyuubiSQLException): Unit = {
KyuubiStatementMonitor.addOperationExceptionByOperationId(statementId, opEx.toString)
super.setOperationException(opEx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please keep the same order.

super.setOperationException(opEx)
KyuubiStatementMonitor.addOperationExceptionByOperationId(statementId, opEx.toString)

var kyuubiStatementInfo = KyuubiStatementInfo(
statementId, statement, spark.sparkContext.applicationId,
session.getTypeInfo.identifier.toString, null,
spark.sparkContext.sparkUser, null, Map(state.toString->lastAccessTime))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map(state.toString->lastAccessTime) -> Map(state.toString -> lastAccessTime)

@@ -19,7 +19,10 @@ package org.apache.kyuubi.engine.spark.operation

import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}

import org.apache.spark.kyuubi.SQLOperationListener
import scala.collection.mutable.Map
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need mutable Map here

// endStateList.add(OperationState.CANCELED.toString)
// endStateList.add(OperationState.CLOSED.toString)
// endStateList.add(OperationState.ERROR.toString)
// endStateList.add(OperationState.TIMEOUT.toString)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove unused code


def addEachStateTimeForOperationid(operationId: String, state: String, time: Long): Unit = {
operationStatementMap.get(operationId).stateTimeMap.put(state, time)
print(1231)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove unused code


import org.apache.spark.kyuubi.entity.KyuubiStatementInfo

object KyuubiStatementMonitor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's better to add some comment at class description.

2. Add some comment
@zhang1002 zhang1002 closed this Jul 9, 2021
@zhang1002 zhang1002 reopened this Jul 9, 2021
statement: String,
appId: String,
sessionId: String,
var physicalPlan: String,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can track queryExecution directly in memory


import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.kyuubi.entity.KyuubiStatementInfo
import org.apache.kyuubi.entity.KyuubiStatementInfo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

org.apache.kyuubi.engine.spark.monitor.KyuubiStatementInfo

2. code optimizing
@yaooqinn
Copy link
Member

yaooqinn commented Jul 13, 2021

How about adding some unit tests?

statement: String,
appId: String,
sessionId: String,
var physicalPlan: QueryExecution,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var queryExecution: QueryExecutio

appId: String,
sessionId: String,
var physicalPlan: QueryExecution,
sparkUser: String,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this, we can manage this in session info

var physicalPlan: QueryExecution,
sparkUser: String,
var exception: KyuubiSQLException,
stateTimeMap: Map[OperationState, Long])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stateToTime


override def setState(newState: OperationState): Unit = {
super.setState(newState)
kyuubiStatementInfo.stateTimeMap.put(newState, lastAccessTime)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

statementId, statement, spark.sparkContext.applicationId,
session.getTypeInfo.identifier.toString, null,
spark.sparkContext.sparkUser, null, Map(state -> lastAccessTime))
KyuubiStatementMonitor.addStatementDetailForOperationId(statementId, kyuubiStatementInfo)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this lead to memory leak?

statementId: String,
statement: String,
appId: String,
sessionId: String,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use session handle?

2. Add some comment
3. Code optimization
4. Use queue to store kyuubiStatementInfo
2. Add PrivateMethodTester to get private method for unit test
3. Optimize some code
yaooqinn
yaooqinn previously approved these changes Jul 14, 2021
@zhang1002 zhang1002 closed this Jul 14, 2021
@zhang1002 zhang1002 reopened this Jul 14, 2021
<weight>1</weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
</allocations>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need this file ?

import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiStatementInfo

// TODO: Thread Safe need to consider
object KyuubiStatementMonitor extends Logging{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Logging{ -> Logging {

*/
// TODO: Lack size type threshold and time type threshold
def putStatementInfoIntoQueue(kyuubiStatementInfo: KyuubiStatementInfo): Unit = {
if (kyuubiStatementQueue.size() >= 7) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to define a constant:

val STATEMEMT_CAPACITY = 10
val FORCE_DUMP_STATEMENT_THREDHOLD = 7
...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just for test. We will make those configurable soon.

* and dumpping them to a file by threshold.
*/
// TODO: Need ensure those items have finished. If not, we should put them into this queue again.
def removeAndDumpStatementInfoFromQueue(): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private[kyuubi]

*/
case class KyuubiStatementInfo(
statementId: String,
statement: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's say: query: String

appId: String,
sessionId: HandleIdentifier,
var queryExecution: QueryExecution,
var exception: KyuubiSQLException,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move these var inside class ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

var iterator = kyuubiStatementQueue.iterator()
while (iterator.hasNext) {
val kyuubiStatementInfo = iterator.next()
assert(kyuubiStatementInfo.statement !== null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we check the real value directly ? != null seems unreliable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, i will do it

val getQueue = PrivateMethod[
ArrayBlockingQueue[KyuubiStatementInfo]](Symbol("kyuubiStatementQueue"))()
val kyuubiStatementQueue = KyuubiStatementMonitor.invokePrivate(getQueue)
kyuubiStatementQueue.clear()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it better to add a new method reset in KyuubiStatementMonitor ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is just for unit test.
In pro_env, we don't need this method.

@ulysses-you ulysses-you merged commit ff52b20 into apache:master Jul 14, 2021
@ulysses-you
Copy link
Contributor

thanks, merging to master for v1.3.0

@ulysses-you ulysses-you added the kind:feature Feature request label Jul 14, 2021
@ulysses-you ulysses-you added this to the v1.3.0 milestone Jul 14, 2021
@zhang1002 zhang1002 deleted the branch-1.2_spark-monitor branch September 24, 2021 16:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants