Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
ajithme committed Jan 10, 2020
1 parent 7d0dd19 commit a8e2525
Showing 1 changed file with 33 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import org.apache.spark.util.kvstore.{KVIndex, KVStore}
* no state kept in this class, so it's ok to have multiple instances of it in an application.
*/
class SQLAppStatusStore(
store: KVStore,
val listener: Option[SQLAppStatusListener] = None) {
store: KVStore,
val listener: Option[SQLAppStatusListener] = None) {

def executionsList(): Seq[SQLExecutionUIData] = {
store.view(classOf[SQLExecutionUIData]).asScala.toSeq
Expand Down Expand Up @@ -82,33 +82,33 @@ class SQLAppStatusStore(
}

class SQLExecutionUIData(
@KVIndexParam val executionId: Long,
val description: String,
val details: String,
val physicalPlanDescription: String,
val metrics: Seq[SQLPlanMetric],
val submissionTime: Long,
val completionTime: Option[Date],
@JsonDeserialize(keyAs = classOf[Integer])
val jobs: Map[Int, JobExecutionStatus],
@JsonDeserialize(contentAs = classOf[Integer])
val stages: Set[Int],
/**
* This field is only populated after the execution is finished; it will be null while the
* execution is still running. During execution, aggregate metrics need to be retrieved
* from the SQL listener instance.
*/
@JsonDeserialize(keyAs = classOf[JLong])
val metricValues: Map[Long, String]) {
@KVIndexParam val executionId: Long,
val description: String,
val details: String,
val physicalPlanDescription: String,
val metrics: Seq[SQLPlanMetric],
val submissionTime: Long,
val completionTime: Option[Date],
@JsonDeserialize(keyAs = classOf[Integer])
val jobs: Map[Int, JobExecutionStatus],
@JsonDeserialize(contentAs = classOf[Integer])
val stages: Set[Int],
/**
* This field is only populated after the execution is finished; it will be null while the
* execution is still running. During execution, aggregate metrics need to be retrieved
* from the SQL listener instance.
*/
@JsonDeserialize(keyAs = classOf[JLong])
val metricValues: Map[Long, String]) {

@JsonIgnore @KVIndex("completionTime")
private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)
}

class SparkPlanGraphWrapper(
@KVIndexParam val executionId: Long,
val nodes: Seq[SparkPlanGraphNodeWrapper],
val edges: Seq[SparkPlanGraphEdge]) {
@KVIndexParam val executionId: Long,
val nodes: Seq[SparkPlanGraphNodeWrapper],
val edges: Seq[SparkPlanGraphEdge]) {

def toSparkPlanGraph(): SparkPlanGraph = {
SparkPlanGraph(nodes.map(_.toSparkPlanGraphNode()), edges)
Expand All @@ -117,11 +117,11 @@ class SparkPlanGraphWrapper(
}

class SparkPlanGraphClusterWrapper(
val id: Long,
val name: String,
val desc: String,
val nodes: Seq[SparkPlanGraphNodeWrapper],
val metrics: Seq[SQLPlanMetric]) {
val id: Long,
val name: String,
val desc: String,
val nodes: Seq[SparkPlanGraphNodeWrapper],
val metrics: Seq[SQLPlanMetric]) {

def toSparkPlanGraphCluster(): SparkPlanGraphCluster = {
new SparkPlanGraphCluster(id, name, desc,
Expand All @@ -133,8 +133,8 @@ class SparkPlanGraphClusterWrapper(

/** Only one of the values should be set. */
class SparkPlanGraphNodeWrapper(
val node: SparkPlanGraphNode,
val cluster: SparkPlanGraphClusterWrapper) {
val node: SparkPlanGraphNode,
val cluster: SparkPlanGraphClusterWrapper) {

def toSparkPlanGraphNode(): SparkPlanGraphNode = {
assert(node == null ^ cluster == null, "Exactly one of node, cluster values to be set.")
Expand All @@ -144,6 +144,6 @@ class SparkPlanGraphNodeWrapper(
}

case class SQLPlanMetric(
name: String,
accumulatorId: Long,
metricType: String)
name: String,
accumulatorId: Long,
metricType: String)

0 comments on commit a8e2525

Please sign in to comment.