Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
shahidki31 committed Nov 27, 2019
1 parent af65eed commit db4269c
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ private[thriftserver] class HiveThriftServer2EventManager(sc: SparkContext) {
}

def onSessionClosed(sessionId: String): Unit = {
postLiveListenerBus(SparkListenerSessionClosed(sessionId, System.currentTimeMillis()))
postLiveListenerBus(SparkListenerThriftServerSessionClosed(sessionId,
System.currentTimeMillis()))
}

def onStatementStart(
Expand Down Expand Up @@ -77,7 +78,7 @@ private[thriftserver] case class SparkListenerThriftServerSessionCreated(
userName: String,
startTime: Long) extends SparkListenerEvent

private[thriftserver] case class SparkListenerSessionClosed(
private[thriftserver] case class SparkListenerThriftServerSessionClosed(
sessionId: String, finishTime: Long) extends SparkListenerEvent

private[thriftserver] case class SparkListenerThriftServerOperationStart(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[thriftserver] class HiveThriftServer2Listener(

// How often to update live entities. -1 means "never update" when replaying applications,
// meaning only the last write will happen. For live applications, this avoids a few
// operations that we can live without when rapidly processing incoming task events.
// operations that we can live without when rapidly processing incoming events.
private val liveUpdatePeriodNs = if (live) sparkConf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L

// Returns true if this listener has no live data. Exposed for tests only.
Expand Down Expand Up @@ -88,11 +88,6 @@ private[thriftserver] class HiveThriftServer2Listener(
}
}

/**
* This method is to handle out of order events. ie. if Job event come after execution end event.
* @param jobId
* @param groupId
*/
private def updateJobDetails(jobId: String, groupId: String): Unit = {
val execList = executionList.values().asScala.filter(_.groupId == groupId).toSeq
if (execList.nonEmpty) {
Expand All @@ -101,7 +96,7 @@ private[thriftserver] class HiveThriftServer2Listener(
updateLiveStore(exec)
}
} else {
// It may possible that event reordering happens such a way that JobStart event come after
// It may possible that event reordering happens, such a way that JobStart event come after
// Execution end event (Refer SPARK-27019). To handle that situation, if occurs in
// Thriftserver, following code will take care. Here will come only if JobStart event comes
// after Execution End event.
Expand All @@ -119,7 +114,7 @@ private[thriftserver] class HiveThriftServer2Listener(
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case e: SparkListenerThriftServerSessionCreated => onSessionCreated(e)
case e: SparkListenerSessionClosed => onSessionClosed(e)
case e: SparkListenerThriftServerSessionClosed => onSessionClosed(e)
case e: SparkListenerThriftServerOperationStart => onOperationStart(e)
case e: SparkListenerThriftServerOperationParsed => onOperationParsed(e)
case e: SparkListenerThriftServerOperationCanceled => onOperationCanceled(e)
Expand All @@ -136,7 +131,7 @@ private[thriftserver] class HiveThriftServer2Listener(
updateLiveStore(session)
}

private def onSessionClosed(e: SparkListenerSessionClosed): Unit = {
private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = {
val session = sessionList.get(e.sessionId)
session.finishTimestamp = e.finishTime
updateStoreWithTriggerEnabled(session)
Expand Down Expand Up @@ -299,7 +294,6 @@ private[thriftserver] class LiveExecutionData(
}
}


private[thriftserver] class LiveSessionData(
val sessionId: String,
val startTimeStamp: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter {
assert(statusStore.getOnlineSessionNum === 1)
}

listener.onOtherEvent(SparkListenerSessionClosed("sessionId", System.currentTimeMillis()))
listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionId",
System.currentTimeMillis()))

if (!live) {
// To update history store
Expand Down Expand Up @@ -96,15 +97,15 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter {
"user", time))

time += 1
listener.onOtherEvent(SparkListenerSessionClosed("sessionId1", time))
listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionId1", time))

time += 1
listener.onOtherEvent(SparkListenerSessionClosed("sessionId2", time))
listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionId2", time))

listener.onOtherEvent(SparkListenerThriftServerSessionCreated("localhost", "sessionId3",
"user", time))
time += 1
listener.onOtherEvent(SparkListenerSessionClosed("sessionId3", 4))
listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionId3", 4))

if (!live) {
kvstore.close(false)
Expand Down Expand Up @@ -134,7 +135,8 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter {
System.currentTimeMillis(),
Nil,
createProperties))
listener.onOtherEvent(SparkListenerSessionClosed("sessionId", System.currentTimeMillis()))
listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionId",
System.currentTimeMillis()))
val exec = statusStore.getExecution("id")
assert(exec.isDefined)
assert(exec.get.jobId === Seq("0"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class ThriftServerPageSuite extends SparkFunSuite with BeforeAndAfter {
System.currentTimeMillis()))
listener.onOtherEvent(SparkListenerThriftServerOperationClosed("id",
System.currentTimeMillis()))
listener.onOtherEvent(SparkListenerSessionClosed("sessionid", System.currentTimeMillis()))
listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionid", System.currentTimeMillis()))

statusStore
}
Expand Down

0 comments on commit db4269c

Please sign in to comment.