From fbaf9d9c1a0efa7d191cadab46fdced5134042eb Mon Sep 17 00:00:00 2001
From: Ajith
Date: Tue, 12 Mar 2019 20:12:19 +0530
Subject: [PATCH 01/15] expose sql tab via rest interface for monitoring

---
 .../status/api/v1/ApiSqlRootResource.scala    | 27 +++++++
 .../spark/status/api/v1/SqlListResource.scala | 77 +++++++++++++++++++
 2 files changed, 104 insertions(+)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala
new file mode 100644
index 0000000000000..7bc92406822df
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.{Path, PathParam}
+
+@Path("/v1")
+private[v1] class ApiSqlRootResource extends ApiRequestContext {
+
+  @Path("applications/{appId}/SQL")
+  def sqlList(): Class[SqlListResource] = classOf[SqlListResource]
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala
new file mode 100644
index 0000000000000..0f6ec89b06fe9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1
+
+import java.util.Date
+import javax.ws.rs.{GET, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.sql.execution.ui.SQLAppStatusStore
+import org.apache.spark.ui.UIUtils
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class SqlListResource extends BaseAppResource {
+
+  @GET
+  def sqlList(): ExecutionSummary = {
+    withUI { ui =>
+      val sqlStore = new SQLAppStatusStore(ui.store.store)
+      var executions = List[ExecutionData]()
+
+      sqlStore.executionsList().foreach { exec =>
+        val running = exec.jobs
+          .filter {case(_, status) => status == JobExecutionStatus.RUNNING }
+          .keys.toSeq
+        val completed = exec.jobs
+          .filter {case(_, status) => status == JobExecutionStatus.SUCCEEDED }
+          .keys.toSeq
+        val failed = exec.jobs
+          .filter {case(_, status) => status == JobExecutionStatus.FAILED }
+          .keys.toSeq
+        val status = if (exec.jobs.size == completed.size) {
+          "COMPLETED"
+        } else if (failed.length > 0) {
+          "FAILED"
+        } else {
+          "RUNNING"
+        }
+        val duration = UIUtils.formatDuration(
+          exec.completionTime.getOrElse(new Date()).getTime - exec.submissionTime)
+        executions = executions.+:(new ExecutionData(exec.executionId,
+          status, exec.description, UIUtils.formatDate(exec.submissionTime),
+          duration, running, completed, failed))
+      }
+      if (executions.size > 0) {
+        executions = executions.sortBy(x => x.id)
+      }
+      new ExecutionSummary(executions)
+    }
+  }
+}
+
+class ExecutionData (val id : Long,
+                     val status: String,
+                     val description: String,
+                     val submissionTime: String,
+                     val duration: String,
+                     val runningJobs: Seq[Int],
+                     val successJobs: Seq[Int],
+                     val failedJobs: Seq[Int])
+
+class ExecutionSummary (val sql: Seq[ExecutionData])

From 89905fcea5fb3ca9cf19fbac8ca09f48b851e639 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Tue, 12 Mar 2019 20:16:03 +0530
Subject: [PATCH 02/15] update

---
 .../org/apache/spark/status/api/v1/ApiSqlRootResource.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala
index 7bc92406822df..86d02d193659d 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.status.api.v1
 
-import javax.ws.rs.{Path, PathParam}
+import javax.ws.rs.Path
 
 @Path("/v1")
 private[v1] class ApiSqlRootResource extends ApiRequestContext {

From 9f085c382cd6ad89355cc21ef85447e1ff9b4c02 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Wed, 13 Mar 2019 09:01:34 +0530
Subject: [PATCH 03/15] refactor

---
 .../spark/status/api/v1/SqlListResource.scala | 24 ++++++++++++-------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala
index 0f6ec89b06fe9..54c2d0c8cf40b 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala
@@ -35,15 +35,20 @@ private[v1] class SqlListResource extends BaseAppResource {
       var executions = List[ExecutionData]()
 
       sqlStore.executionsList().foreach { exec =>
-        val running = exec.jobs
-          .filter {case(_, status) => status == JobExecutionStatus.RUNNING }
-          .keys.toSeq
-        val completed = exec.jobs
-          .filter {case(_, status) => status == JobExecutionStatus.SUCCEEDED }
-          .keys.toSeq
-        val failed = exec.jobs
-          .filter {case(_, status) => status == JobExecutionStatus.FAILED }
-          .keys.toSeq
+        var running = Seq[Int]()
+        var completed = Seq[Int]()
+        var failed = Seq[Int]()
+
+        exec.jobs.map { job => job match {
+            case (id, status) if status == JobExecutionStatus.RUNNING =>
+              running = running :+ id
+            case (id, status) if status == JobExecutionStatus.SUCCEEDED =>
+              completed = completed :+ id
+            case (id, status) if status == JobExecutionStatus.FAILED =>
+              failed = failed :+ id
+          }
+        }
+
         val status = if (exec.jobs.size == completed.size) {
           "COMPLETED"
         } else if (failed.length > 0) {
@@ -51,6 +56,7 @@ private[v1] class SqlListResource extends BaseAppResource {
         } else {
           "RUNNING"
         }
+
         val duration = UIUtils.formatDuration(
           exec.completionTime.getOrElse(new Date()).getTime - exec.submissionTime)
         executions = executions.+:(new ExecutionData(exec.executionId,

From 566e55130edd32f3eb6ac86140ee21d655a54566 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Wed, 13 Mar 2019 09:18:14 +0530
Subject: [PATCH 04/15] refactored

---
 .../org/apache/spark/status/api/v1/SqlListResource.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala
index 54c2d0c8cf40b..d5e60139d84f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala
@@ -29,7 +29,7 @@ import org.apache.spark.ui.UIUtils
 private[v1] class SqlListResource extends BaseAppResource {
 
   @GET
-  def sqlList(): ExecutionSummary = {
+  def sqlList(): Seq[ExecutionData] = {
     withUI { ui =>
       val sqlStore = new SQLAppStatusStore(ui.store.store)
       var executions = List[ExecutionData]()
@@ -66,7 +66,7 @@ private[v1] class SqlListResource extends BaseAppResource {
       if (executions.size > 0) {
         executions = executions.sortBy(x => x.id)
       }
-      new ExecutionSummary(executions)
+      executions
     }
   }
 }
@@ -79,5 +79,3 @@ class ExecutionData (val id : Long,
                      val runningJobs: Seq[Int],
                      val successJobs: Seq[Int],
                      val failedJobs: Seq[Int])
-
-class ExecutionSummary (val sql: Seq[ExecutionData])

From 8cff44f19afb0d0c07553253d57340d672fc519c Mon Sep 17 00:00:00 2001
From: Ajith
Date: Wed, 13 Mar 2019 18:03:44 +0530
Subject: [PATCH 05/15] update to introduce individual api for sql

---
 .../status/api/v1/ApiSqlRootResource.scala    |  2 +-
 .../spark/status/api/v1/SqlListResource.scala | 93 +++++++++++--------
 2 files changed, 55 insertions(+), 40 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala
index 86d02d193659d..a184d918c8ec9 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala
@@ -22,6 +22,6 @@ import javax.ws.rs.Path
 @Path("/v1")
 private[v1] class ApiSqlRootResource extends ApiRequestContext {
 
-  @Path("applications/{appId}/SQL")
+  @Path("applications/{appId}/sql")
   def sqlList(): Class[SqlListResource] = classOf[SqlListResource]
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala
index d5e60139d84f3..858569187153a 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala
@@ -18,11 +18,11 @@
 package org.apache.spark.status.api.v1
 
 import java.util.Date
-import javax.ws.rs.{GET, Produces}
+import javax.ws.rs.{GET, Path, PathParam, Produces}
 import javax.ws.rs.core.MediaType
 
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.sql.execution.ui.SQLAppStatusStore
+import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData}
 import org.apache.spark.ui.UIUtils
 
 @Produces(Array(MediaType.APPLICATION_JSON))
@@ -32,50 +32,65 @@ private[v1] class SqlListResource extends BaseAppResource {
   def sqlList(): Seq[ExecutionData] = {
     withUI { ui =>
       val sqlStore = new SQLAppStatusStore(ui.store.store)
-      var executions = List[ExecutionData]()
-
-      sqlStore.executionsList().foreach { exec =>
-        var running = Seq[Int]()
-        var completed = Seq[Int]()
-        var failed = Seq[Int]()
-
-        exec.jobs.map { job => job match {
-            case (id, status) if status == JobExecutionStatus.RUNNING =>
-              running = running :+ id
-            case (id, status) if status == JobExecutionStatus.SUCCEEDED =>
-              completed = completed :+ id
-            case (id, status) if status == JobExecutionStatus.FAILED =>
-              failed = failed :+ id
-          }
-        }
-
-        val status = if (exec.jobs.size == completed.size) {
-          "COMPLETED"
-        } else if (failed.length > 0) {
-          "FAILED"
-        } else {
-          "RUNNING"
-        }
-
-        val duration = UIUtils.formatDuration(
-          exec.completionTime.getOrElse(new Date()).getTime - exec.submissionTime)
-        executions = executions.+:(new ExecutionData(exec.executionId,
-          status, exec.description, UIUtils.formatDate(exec.submissionTime),
-          duration, running, completed, failed))
-      }
+      var executions = sqlStore.executionsList()
+        .map(exec => prepareExecutionData(exec))
       if (executions.size > 0) {
         executions = executions.sortBy(x => x.id)
       }
       executions
     }
   }
+
+  @GET
+  @Path("{executionId:\\d+}")
+  def sql(@PathParam("executionId") execId: Long): Seq[ExecutionData] = {
+    withUI { ui =>
+      val sqlStore = new SQLAppStatusStore(ui.store.store)
+
+      sqlStore.executionsList()
+        .filter(execution => execution.executionId == execId)
+        .map(exec => prepareExecutionData(exec))
+    }
+  }
+
+  def prepareExecutionData(exec: SQLExecutionUIData): ExecutionData = {
+    var running = Seq[Int]()
+    var completed = Seq[Int]()
+    var failed = Seq[Int]()
+
+    exec.jobs.map { job =>
+      job match {
+        case (id, status) if status == JobExecutionStatus.RUNNING =>
+          running = running :+ id
+        case (id, status) if status == JobExecutionStatus.SUCCEEDED =>
+          completed = completed :+ id
+        case (id, status) if status == JobExecutionStatus.FAILED =>
+          failed = failed :+ id
+      }
+    }
+
+    val status = if (exec.jobs.size == completed.size) {
+      "COMPLETED"
+    } else if (failed.length > 0) {
+      "FAILED"
+    } else {
+      "RUNNING"
+    }
+
+    val duration = UIUtils.formatDuration(
+      exec.completionTime.getOrElse(new Date()).getTime - exec.submissionTime)
+    new ExecutionData(exec.executionId,
+      status, exec.description, UIUtils.formatDate(exec.submissionTime),
+      duration, running, completed, failed)
+  }
 }
 
 class ExecutionData (val id : Long,
-                     val status: String,
-                     val description: String,
-                     val submissionTime: String,
-                     val duration: String,
-                     val runningJobs: Seq[Int],
-                     val successJobs: Seq[Int],
-                     val failedJobs: Seq[Int])
+    val status: String,
+    val description: String,
+    val submissionTime: String,
+    val duration: String,
+    val runningJobs: Seq[Int],
+    val successJobs: Seq[Int],
+    val failedJobs: Seq[Int])

From e1893ad22e3b5960318da425cdabcc9d925701bd Mon Sep 17 00:00:00 2001
From: Ajith
Date: Wed, 13 Mar 2019 22:05:53 +0530
Subject: [PATCH 06/15] update as per review comments

---
 .../org/apache/spark/status/api/v1/api.scala      | 10 ++++++++++
 .../spark/status/api/v1/ApiSqlRootResource.scala  |  2 +-
 .../{SqlListResource.scala => SqlResource.scala}  | 15 +++------------
 3 files changed, 14 insertions(+), 13 deletions(-)
 rename sql/core/src/main/scala/org/apache/spark/status/api/v1/{SqlListResource.scala => SqlResource.scala} (88%)

diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 5ec9b36393764..da69df971daf9 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -405,3 +405,13 @@ case class ThreadStackTrace(
     val blockedByThreadId: Option[Long],
     val blockedByLock: String,
     val holdingLocks: Seq[String])
+
+
+class ExecutionData (val id : Long,
+    val status: String,
+    val description: String,
+    val submissionTime: String,
+    val duration: String,
+    val runningJobs: Seq[Int],
+    val successJobs: Seq[Int],
+    val failedJobs: Seq[Int])
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala
index a184d918c8ec9..def4f03d9f3e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala
@@ -23,5 +23,5 @@
 private[v1] class ApiSqlRootResource extends ApiRequestContext {
 
   @Path("applications/{appId}/sql")
-  def sqlList(): Class[SqlListResource] = classOf[SqlListResource]
+  def sqlList(): Class[SqlResource] = classOf[SqlResource]
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala
similarity index 88%
rename from sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala
rename to sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala
index 858569187153a..be8c7f47231f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlListResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData}
 import org.apache.spark.ui.UIUtils
 
 @Produces(Array(MediaType.APPLICATION_JSON))
-private[v1] class SqlListResource extends BaseAppResource {
+private[v1] class SqlResource extends BaseAppResource {
 
   @GET
   def sqlList(): Seq[ExecutionData] = {
@@ -35,7 +35,7 @@ private[v1] class SqlResource extends BaseAppResource {
 
       var executions = sqlStore.executionsList()
         .map(exec => prepareExecutionData(exec))
-      if (executions.size > 0) {
+      if (executions.nonEmpty) {
         executions = executions.sortBy(x => x.id)
       }
       executions
@@ -72,7 +72,7 @@ private[v1] class SqlResource extends BaseAppResource {
 
     val status = if (exec.jobs.size == completed.size) {
       "COMPLETED"
-    } else if (failed.length > 0) {
+    } else if (failed.nonEmpty) {
       "FAILED"
     } else {
       "RUNNING"
@@ -85,12 +85,3 @@ private[v1] class SqlResource extends BaseAppResource {
       duration, running, completed, failed)
   }
 }
-
-class ExecutionData (val id : Long,
-    val status: String,
-    val description: String,
-    val submissionTime: String,
-    val duration: String,
-    val runningJobs: Seq[Int],
-    val successJobs: Seq[Int],
-    val failedJobs: Seq[Int])

From 0cc87c0f9580ba10fb9552460a0bea7f11db2a92 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Wed, 13 Mar 2019 22:07:52 +0530
Subject: [PATCH 07/15] update

---
 core/src/main/scala/org/apache/spark/status/api/v1/api.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index da69df971daf9..ecfc8e14fd778 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -406,7 +406,6 @@ case class ThreadStackTrace(
     val blockedByLock: String,
     val holdingLocks: Seq[String])
 
-
 class ExecutionData (val id : Long,
     val status: String,
     val description: String,

From f95b00b3a47b1de5c5438a73ba7626218a1bf716 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 21 Mar 2019 14:24:02 +0800
Subject: [PATCH 08/15] minor refactor

---
 .../org/apache/spark/status/api/v1/api.scala  |  8 ------
 .../spark/status/api/v1/SqlResource.scala     | 22 ++++++++--------
 .../org/apache/spark/status/api/v1/api.scala  | 26 +++++++++++++++++++
 3 files changed, 37 insertions(+), 19 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala

diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index ecfc8e14fd778..7b989d787332f 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -406,11 +406,3 @@ case class ThreadStackTrace(
     val blockedByLock: String,
     val holdingLocks: Seq[String])
 
-class ExecutionData (val id : Long,
-    val status: String,
-    val description: String,
-    val submissionTime: String,
-    val duration: String,
-    val runningJobs: Seq[Int],
-    val successJobs: Seq[Int],
-    val failedJobs: Seq[Int])
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala
index be8c7f47231f6..21e771c9671cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala
@@ -48,9 +48,10 @@ private[v1] class SqlResource extends BaseAppResource {
     withUI { ui =>
       val sqlStore = new SQLAppStatusStore(ui.store.store)
 
-      sqlStore.executionsList()
-        .filter(execution => execution.executionId == execId)
+      sqlStore
+        .execution(execId)
         .map(exec => prepareExecutionData(exec))
+        .toSeq
     }
   }
 
@@ -59,15 +60,14 @@ private[v1] class SqlResource extends BaseAppResource {
     var completed = Seq[Int]()
     var failed = Seq[Int]()
 
-    exec.jobs.map { job =>
-      job match {
-        case (id, status) if status == JobExecutionStatus.RUNNING =>
-          running = running :+ id
-        case (id, status) if status == JobExecutionStatus.SUCCEEDED =>
-          completed = completed :+ id
-        case (id, status) if status == JobExecutionStatus.FAILED =>
-          failed = failed :+ id
-      }
+    exec.jobs.foreach {
+      case (id, JobExecutionStatus.RUNNING) =>
+        running = running :+ id
+      case (id, JobExecutionStatus.SUCCEEDED ) =>
+        completed = completed :+ id
+      case (id, JobExecutionStatus.FAILED) =>
+        failed = failed :+ id
+      case _ =>
     }
 
     val status = if (exec.jobs.size == completed.size) {
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
new file mode 100644
index 0000000000000..8133166bdd255
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+class ExecutionData (val id : Long,
+    val status: String,
+    val description: String,
+    val submissionTime: String,
+    val duration: String,
+    val runningJobIds: Seq[Int],
+    val successJobIds: Seq[Int],
+    val failedJobIds: Seq[Int])

From ae5413c9abab8bb380a9eb4e5fb88487f83eca10 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Tue, 26 Mar 2019 11:18:16 +0530
Subject: [PATCH 09/15] update

---
 core/src/main/scala/org/apache/spark/status/api/v1/api.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 7b989d787332f..5ec9b36393764 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -405,4 +405,3 @@ case class ThreadStackTrace(
     val blockedByThreadId: Option[Long],
     val blockedByLock: String,
     val holdingLocks: Seq[String])
-

From 640e1a7864f1a0b4b2548565d0b1bc597b9589e6 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Fri, 9 Aug 2019 15:03:29 +0530
Subject: [PATCH 10/15] Updated as per review comments

---
 .../apache/spark/status/api/v1/SqlResource.scala | 15 ++++-----------
 .../org/apache/spark/status/api/v1/api.scala     |  3 ++-
 2 files changed, 6 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala
index 21e771c9671cb..26129efd8861d 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala
@@ -32,26 +32,19 @@ private[v1] class SqlResource extends BaseAppResource {
   def sqlList(): Seq[ExecutionData] = {
     withUI { ui =>
       val sqlStore = new SQLAppStatusStore(ui.store.store)
-
-      var executions = sqlStore.executionsList()
-        .map(exec => prepareExecutionData(exec))
-      if (executions.nonEmpty) {
-        executions = executions.sortBy(x => x.id)
-      }
-      executions
+      sqlStore.executionsList().map(prepareExecutionData)
     }
   }
 
   @GET
   @Path("{executionId:\\d+}")
-  def sql(@PathParam("executionId") execId: Long): Seq[ExecutionData] = {
+  def sql(@PathParam("executionId") execId: Long): ExecutionData = {
     withUI { ui =>
       val sqlStore = new SQLAppStatusStore(ui.store.store)
       sqlStore
         .execution(execId)
-        .map(exec => prepareExecutionData(exec))
-        .toSeq
+        .map(prepareExecutionData)
+        .getOrElse(throw new NotFoundException("unknown id: " + execId))
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 8133166bdd255..c6053f6d03e94 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -16,7 +16,8 @@
  */
 package org.apache.spark.status.api.v1
 
-class ExecutionData (val id : Long,
+class ExecutionData (
+    val id : Long,
     val status: String,
     val description: String,
     val submissionTime: String,

From 1c2419aba0f1debfef2df7a05f802cb06d5b2f6a Mon Sep 17 00:00:00 2001
From: Ajith
Date: Thu, 26 Dec 2019 19:48:03 +0530
Subject: [PATCH 11/15] Add option to get the planDescription and Metrics
 details

---
 .../spark/status/api/v1/SqlResource.scala    | 26 +++++++++++++------
 .../org/apache/spark/status/api/v1/api.scala |  2 ++
 2 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala
index 26129efd8861d..7dbda46ec6a6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala
@@ -18,37 +18,44 @@
 package org.apache.spark.status.api.v1
 
 import java.util.Date
-import javax.ws.rs.{GET, Path, PathParam, Produces}
+import javax.ws.rs.{DefaultValue, GET, Path, PathParam, Produces, QueryParam}
 import javax.ws.rs.core.MediaType
 
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData}
+import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData, SQLPlanMetric}
 import org.apache.spark.ui.UIUtils
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class SqlResource extends BaseAppResource {
 
   @GET
-  def sqlList(): Seq[ExecutionData] = {
+  def sqlList(@QueryParam("details") @DefaultValue("false") details: Boolean):
+    Seq[ExecutionData] = {
     withUI { ui =>
       val sqlStore = new SQLAppStatusStore(ui.store.store)
-      sqlStore.executionsList().map(prepareExecutionData)
+      sqlStore.executionsList().map(prepareExecutionData(_, details))
     }
   }
 
   @GET
   @Path("{executionId:\\d+}")
-  def sql(@PathParam("executionId") execId: Long): ExecutionData = {
+  def sql(@PathParam("executionId") execId: Long,
+      @QueryParam("details") @DefaultValue("false") details: Boolean): ExecutionData = {
     withUI { ui =>
       val sqlStore = new SQLAppStatusStore(ui.store.store)
       sqlStore
         .execution(execId)
-        .map(prepareExecutionData)
+        .map(prepareExecutionData(_, details))
         .getOrElse(throw new NotFoundException("unknown id: " + execId))
     }
   }
 
-  def prepareExecutionData(exec: SQLExecutionUIData): ExecutionData = {
+  def printableMetrics(metrics: Seq[SQLPlanMetric],
+      metricValues: Map[Long, String]): Seq[(String, String)] = {
+    metrics.map(metric => (metric.name, metricValues.get(metric.accumulatorId).getOrElse("")))
+  }
+
+  def prepareExecutionData(exec: SQLExecutionUIData, details: Boolean): ExecutionData = {
     var running = Seq[Int]()
     var completed = Seq[Int]()
     var failed = Seq[Int]()
@@ -73,8 +80,11 @@ private[v1] class SqlResource extends BaseAppResource {
 
     val duration = UIUtils.formatDuration(
       exec.completionTime.getOrElse(new Date()).getTime - exec.submissionTime)
+    val planDetails = if (details) exec.physicalPlanDescription else ""
+    val metrics = if (details) printableMetrics(exec.metrics, exec.metricValues) else Seq.empty
     new ExecutionData(exec.executionId,
-      status, exec.description, UIUtils.formatDate(exec.submissionTime),
+      status, exec.description, planDetails, metrics,
+      UIUtils.formatDate(exec.submissionTime),
       duration, running, completed, failed)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index c6053f6d03e94..d6e6f421818a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -20,6 +20,8 @@ class ExecutionData (
     val id : Long,
     val status: String,
     val description: String,
+    val planDescription: String,
+    val metrics: Seq[(String, String)],
     val submissionTime: String,
     val duration: String,
     val runningJobIds: Seq[Int],

From 31bba41d7b0814833b6c73e76dc8e6f97bf12102 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Wed, 8 Jan 2020 09:54:59 +0530
Subject: [PATCH 12/15] Updated as per review comments

---
 .../sql/execution/ui/SQLAppStatusStore.scala  | 18 ++++----
 .../spark/status/api/v1/SqlResource.scala     | 43 ++++++++++++-------
 .../org/apache/spark/status/api/v1/api.scala  | 14 +++---
 3 files changed, 46 insertions(+), 29 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index c6e7f3978469d..1b7f39ef5fa63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -34,14 +34,16 @@ import org.apache.spark.util.kvstore.{KVIndex, KVStore}
 * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's
 * no state kept in this class, so it's ok to have multiple instances of it in an application.
 */
-class SQLAppStatusStore(
-    store: KVStore,
-    val listener: Option[SQLAppStatusListener] = None) {
+class SQLAppStatusStore(store: KVStore, val listener: Option[SQLAppStatusListener] = None) {
 
   def executionsList(): Seq[SQLExecutionUIData] = {
     store.view(classOf[SQLExecutionUIData]).asScala.toSeq
   }
 
+  def executionsList(offset: Int, length: Int): Seq[SQLExecutionUIData] = {
+    store.view(classOf[SQLExecutionUIData]).skip(offset).max(length).asScala.toSeq
+  }
+
   def execution(executionId: Long): Option[SQLExecutionUIData] = {
     try {
       Some(store.read(classOf[SQLExecutionUIData], executionId))
@@ -120,7 +122,10 @@ class SparkPlanGraphClusterWrapper(
     val metrics: Seq[SQLPlanMetric]) {
 
   def toSparkPlanGraphCluster(): SparkPlanGraphCluster = {
-    new SparkPlanGraphCluster(id, name, desc,
+    new SparkPlanGraphCluster(
+      id,
+      name,
+      desc,
       new ArrayBuffer() ++ nodes.map(_.toSparkPlanGraphNode()),
       metrics)
   }
@@ -139,7 +144,4 @@ class SparkPlanGraphNodeWrapper(
 
 }
 
-case class SQLPlanMetric(
-    name: String,
-    accumulatorId: Long,
-    metricType: String)
+case class SQLPlanMetric(name: String, accumulatorId: Long, metricType: String)
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala
index 7dbda46ec6a6f..2e30bd668a71e 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala
@@ -29,18 +29,21 @@ private[v1] class SqlResource extends BaseAppResource {
 
   @GET
-  def sqlList(@QueryParam("details") @DefaultValue("false") details: Boolean):
-    Seq[ExecutionData] = {
+  def sqlList(
+      @DefaultValue("false") @QueryParam("details") details: Boolean,
+      @DefaultValue("0") @QueryParam("offset") offset: Int,
+      @DefaultValue("20") @QueryParam("length") length: Int): Seq[ExecutionData] = {
     withUI { ui =>
       val sqlStore = new SQLAppStatusStore(ui.store.store)
-      sqlStore.executionsList().map(prepareExecutionData(_, details))
+      sqlStore.executionsList(offset, length).map(prepareExecutionData(_, details))
     }
   }
 
   @GET
   @Path("{executionId:\\d+}")
-  def sql(@PathParam("executionId") execId: Long,
-      @QueryParam("details") @DefaultValue("false") details: Boolean): ExecutionData = {
+  def sql(
+      @PathParam("executionId") execId: Long,
+      @DefaultValue("false") @QueryParam("details") details: Boolean): ExecutionData = {
     withUI { ui =>
       val sqlStore = new SQLAppStatusStore(ui.store.store)
       sqlStore
@@ -50,12 +53,14 @@ private[v1] class SqlResource extends BaseAppResource {
     }
   }
 
-  def printableMetrics(metrics: Seq[SQLPlanMetric],
-      metricValues: Map[Long, String]): Seq[(String, String)] = {
-    metrics.map(metric => (metric.name, metricValues.get(metric.accumulatorId).getOrElse("")))
+  private def printableMetrics(
+      metrics: Seq[SQLPlanMetric],
+      metricValues: Map[Long, String]): Seq[Metrics] = {
+    metrics.map(metric =>
+      Metrics(metric.name, metricValues.get(metric.accumulatorId).getOrElse("")))
   }
 
-  def prepareExecutionData(exec: SQLExecutionUIData, details: Boolean): ExecutionData = {
+  private def prepareExecutionData(exec: SQLExecutionUIData, details: Boolean): ExecutionData = {
     var running = Seq[Int]()
     var completed = Seq[Int]()
     var failed = Seq[Int]()
@@ -63,7 +68,7 @@ private[v1] class SqlResource extends BaseAppResource {
     exec.jobs.foreach {
       case (id, JobExecutionStatus.RUNNING) =>
         running = running :+ id
-      case (id, JobExecutionStatus.SUCCEEDED ) =>
+      case (id, JobExecutionStatus.SUCCEEDED) =>
        completed = completed :+ id
       case (id, JobExecutionStatus.FAILED) =>
         failed = failed :+ id
@@ -78,13 +83,19 @@ private[v1] class SqlResource extends BaseAppResource {
       "RUNNING"
     }
 
-    val duration = UIUtils.formatDuration(
-      exec.completionTime.getOrElse(new Date()).getTime - exec.submissionTime)
+    val duration = exec.completionTime.getOrElse(new Date()).getTime - exec.submissionTime
     val planDetails = if (details) exec.physicalPlanDescription else ""
     val metrics = if (details) printableMetrics(exec.metrics, exec.metricValues) else Seq.empty
-    new ExecutionData(exec.executionId,
-      status, exec.description, planDetails, metrics,
-      UIUtils.formatDate(exec.submissionTime),
-      duration, running, completed, failed)
+    new ExecutionData(
+      exec.executionId,
+      status,
+      exec.description,
+      planDetails,
+      metrics,
+      new Date(exec.submissionTime),
+      duration,
+      running,
+      completed,
+      failed)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index d6e6f421818a7..210642b291a80 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -16,14 +16,18 @@
  */
 package org.apache.spark.status.api.v1
 
-class ExecutionData (
-    val id : Long,
+import java.util.Date
+
+class ExecutionData private[spark] (
+    val id: Long,
     val status: String,
     val description: String,
     val planDescription: String,
-    val metrics: Seq[(String, String)],
-    val submissionTime: String,
-    val duration: String,
+    val metrics: Seq[Metrics],
+    val submissionTime: Date,
+    val duration: Long,
     val runningJobIds: Seq[Int],
     val successJobIds: Seq[Int],
     val failedJobIds: Seq[Int])
+
+case class Metrics private[spark] (metricName: String, metricValue: String)

From 7d0dd19c9477f142cbbe662acd5112f3bbe88ad6 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Sat, 11 Jan 2020 02:34:54 +0530
Subject: [PATCH 13/15] run scalafmt on modified block only and move to sql
 package to handle review comment

---
 .../sql/execution/ui/SQLAppStatusStore.scala  | 70 ++++++++++---------
 .../api/v1/{ => sql}/ApiSqlRootResource.scala |  4 +-
 .../status/api/v1/{ => sql}/SqlResource.scala |  7 +-
 .../spark/status/api/v1/{ => sql}/api.scala   |  2 +-
 4 files changed, 44 insertions(+), 39 deletions(-)
 rename sql/core/src/main/scala/org/apache/spark/status/api/v1/{ => sql}/ApiSqlRootResource.scala (90%)
 rename sql/core/src/main/scala/org/apache/spark/status/api/v1/{ => sql}/SqlResource.scala (95%)
 rename sql/core/src/main/scala/org/apache/spark/status/api/v1/{ => sql}/api.scala (96%)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index 1b7f39ef5fa63..f3d1168e9c900 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -34,7 +34,9 @@ import org.apache.spark.util.kvstore.{KVIndex, KVStore}
 * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's
 * no state kept in this class, so it's ok to have multiple instances of it in an application.
 */
-class SQLAppStatusStore(store: KVStore, val listener: Option[SQLAppStatusListener] = None) {
+class SQLAppStatusStore(
+  store: KVStore,
+  val listener: Option[SQLAppStatusListener] = None) {
 
   def executionsList(): Seq[SQLExecutionUIData] = {
     store.view(classOf[SQLExecutionUIData]).asScala.toSeq
   }
@@ -80,33 +82,33 @@ class SQLAppStatusStore(
 }
 
 class SQLExecutionUIData(
-    @KVIndexParam val executionId: Long,
-    val description: String,
-    val details: String,
-    val physicalPlanDescription: String,
-    val metrics: Seq[SQLPlanMetric],
-    val submissionTime: Long,
-    val completionTime: Option[Date],
-    @JsonDeserialize(keyAs = classOf[Integer])
-    val jobs: Map[Int, JobExecutionStatus],
-    @JsonDeserialize(contentAs = classOf[Integer])
-    val stages: Set[Int],
-    /**
-     * This field is only populated after the execution is finished; it will be null while the
-     * execution is still running. During execution, aggregate metrics need to be retrieved
-     * from the SQL listener instance.
-     */
-    @JsonDeserialize(keyAs = classOf[JLong])
-    val metricValues: Map[Long, String]) {
+  @KVIndexParam val executionId: Long,
+  val description: String,
+  val details: String,
+  val physicalPlanDescription: String,
+  val metrics: Seq[SQLPlanMetric],
+  val submissionTime: Long,
+  val completionTime: Option[Date],
+  @JsonDeserialize(keyAs = classOf[Integer])
+  val jobs: Map[Int, JobExecutionStatus],
+  @JsonDeserialize(contentAs = classOf[Integer])
+  val stages: Set[Int],
+  /**
+   * This field is only populated after the execution is finished; it will be null while the
+   * execution is still running. During execution, aggregate metrics need to be retrieved
+   * from the SQL listener instance.
*/ class SparkPlanGraphNodeWrapper( - val node: SparkPlanGraphNode, - val cluster: SparkPlanGraphClusterWrapper) { + val node: SparkPlanGraphNode, + val cluster: SparkPlanGraphClusterWrapper) { def toSparkPlanGraphNode(): SparkPlanGraphNode = { assert(node == null ^ cluster == null, "Exactly one of node, cluster values to be set.") @@ -144,4 +143,7 @@ class SparkPlanGraphNodeWrapper( } -case class SQLPlanMetric(name: String, accumulatorId: Long, metricType: String) +case class SQLPlanMetric( + name: String, + accumulatorId: Long, + metricType: String) diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala similarity index 90% rename from sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala rename to sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala index def4f03d9f3e3..5fc7123c9097b 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/ApiSqlRootResource.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala @@ -15,10 +15,12 @@ * limitations under the License. */ -package org.apache.spark.status.api.v1 +package org.apache.spark.status.api.v1.sql import javax.ws.rs.Path +import org.apache.spark.status.api.v1.ApiRequestContext + @Path("/v1") private[v1] class ApiSqlRootResource extends ApiRequestContext { diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala similarity index 95% rename from sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala rename to sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala index 2e30bd668a71e..a378e875445c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/SqlResource.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala @@ -15,15 +15,16 @@ * limitations under the License. */ -package org.apache.spark.status.api.v1 +package org.apache.spark.status.api.v1.sql import java.util.Date -import javax.ws.rs.{DefaultValue, GET, Path, PathParam, Produces, QueryParam} +import javax.ws.rs._ import javax.ws.rs.core.MediaType import org.apache.spark.JobExecutionStatus + import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData, SQLPlanMetric} -import org.apache.spark.ui.UIUtils +import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException} @Produces(Array(MediaType.APPLICATION_JSON)) private[v1] class SqlResource extends BaseAppResource { diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala similarity index 96% rename from sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala rename to sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala index 210642b291a80..7ace66ffb06e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
 */
-package org.apache.spark.status.api.v1
+package org.apache.spark.status.api.v1.sql
 
 import java.util.Date

From a8e25256d26b0ef38440dd1a7fb1d9e18bbb4b09 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Sat, 11 Jan 2020 02:37:55 +0530
Subject: [PATCH 14/15] Update

---
 .../sql/execution/ui/SQLAppStatusStore.scala  | 66 +++++++++----------
 1 file changed, 33 insertions(+), 33 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index f3d1168e9c900..a90f37a80d525 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -35,8 +35,8 @@ import org.apache.spark.util.kvstore.{KVIndex, KVStore}
 * no state kept in this class, so it's ok to have multiple instances of it in an application.
 */
 class SQLAppStatusStore(
-  store: KVStore,
-  val listener: Option[SQLAppStatusListener] = None) {
+    store: KVStore,
+    val listener: Option[SQLAppStatusListener] = None) {
 
   def executionsList(): Seq[SQLExecutionUIData] = {
     store.view(classOf[SQLExecutionUIData]).asScala.toSeq
   }
@@ -82,33 +82,33 @@ class SQLAppStatusStore(
 }
 
 class SQLExecutionUIData(
-  @KVIndexParam val executionId: Long,
-  val description: String,
-  val details: String,
-  val physicalPlanDescription: String,
-  val metrics: Seq[SQLPlanMetric],
-  val submissionTime: Long,
-  val completionTime: Option[Date],
-  @JsonDeserialize(keyAs = classOf[Integer])
-  val jobs: Map[Int, JobExecutionStatus],
-  @JsonDeserialize(contentAs = classOf[Integer])
-  val stages: Set[Int],
-  /**
-   * This field is only populated after the execution is finished; it will be null while the
-   * execution is still running. During execution, aggregate metrics need to be retrieved
-   * from the SQL listener instance.
-   */
-  @JsonDeserialize(keyAs = classOf[JLong])
-  val metricValues: Map[Long, String]) {
+    @KVIndexParam val executionId: Long,
+    val description: String,
+    val details: String,
+    val physicalPlanDescription: String,
+    val metrics: Seq[SQLPlanMetric],
+    val submissionTime: Long,
+    val completionTime: Option[Date],
+    @JsonDeserialize(keyAs = classOf[Integer])
+    val jobs: Map[Int, JobExecutionStatus],
+    @JsonDeserialize(contentAs = classOf[Integer])
+    val stages: Set[Int],
+    /**
+     * This field is only populated after the execution is finished; it will be null while the
+     * execution is still running. During execution, aggregate metrics need to be retrieved
+     * from the SQL listener instance.
+     */
+    @JsonDeserialize(keyAs = classOf[JLong])
+    val metricValues: Map[Long, String]) {
 
   @JsonIgnore @KVIndex("completionTime")
   private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)
 }
 
 class SparkPlanGraphWrapper(
-  @KVIndexParam val executionId: Long,
-  val nodes: Seq[SparkPlanGraphNodeWrapper],
-  val edges: Seq[SparkPlanGraphEdge]) {
+    @KVIndexParam val executionId: Long,
+    val nodes: Seq[SparkPlanGraphNodeWrapper],
+    val edges: Seq[SparkPlanGraphEdge]) {
 
   def toSparkPlanGraph(): SparkPlanGraph = {
     SparkPlanGraph(nodes.map(_.toSparkPlanGraphNode()), edges)
@@ -117,11 +117,11 @@ class SparkPlanGraphWrapper(
 }
 
 class SparkPlanGraphClusterWrapper(
-  val id: Long,
-  val name: String,
-  val desc: String,
-  val nodes: Seq[SparkPlanGraphNodeWrapper],
-  val metrics: Seq[SQLPlanMetric]) {
+    val id: Long,
+    val name: String,
+    val desc: String,
+    val nodes: Seq[SparkPlanGraphNodeWrapper],
+    val metrics: Seq[SQLPlanMetric]) {
 
   def toSparkPlanGraphCluster(): SparkPlanGraphCluster = {
     new SparkPlanGraphCluster(id, name, desc,
@@ -133,8 +133,8 @@ class SparkPlanGraphClusterWrapper(
 
 /** Only one of the values should be set. */
 class SparkPlanGraphNodeWrapper(
-  val node: SparkPlanGraphNode,
-  val cluster: SparkPlanGraphClusterWrapper) {
+    val node: SparkPlanGraphNode,
+    val cluster: SparkPlanGraphClusterWrapper) {
 
   def toSparkPlanGraphNode(): SparkPlanGraphNode = {
     assert(node == null ^ cluster == null, "Exactly one of node, cluster values to be set.")
@@ -144,6 +144,6 @@ class SparkPlanGraphNodeWrapper(
 }
 
 case class SQLPlanMetric(
-  name: String,
-  accumulatorId: Long,
-  metricType: String)
+    name: String,
+    accumulatorId: Long,
+    metricType: String)

From 4560df966b767336998462faae7c3d5ab90063ed Mon Sep 17 00:00:00 2001
From: Ajith
Date: Sat, 11 Jan 2020 02:58:57 +0530
Subject: [PATCH 15/15] Import

---
 .../scala/org/apache/spark/status/api/v1/sql/SqlResource.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala
index a378e875445c7..346e07f2bef15 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala
@@ -22,7 +22,6 @@ import javax.ws.rs._
 import javax.ws.rs.core.MediaType
 
 import org.apache.spark.JobExecutionStatus
-
 import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData, SQLPlanMetric}
 import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException}
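
Note: taken together, the series exposes two REST endpoints: /api/v1/applications/{appId}/sql (listing, with "details", "offset" and "length" query parameters defaulting to false, 0 and 20) and /api/v1/applications/{appId}/sql/{executionId}. The sketch below is not part of the patch; it is a minimal client-side example, assuming a locally running application whose UI listens on localhost:4040 — the application ID and execution ID used are placeholder values.

import scala.io.Source

object SqlRestApiExample {
  def main(args: Array[String]): Unit = {
    // Placeholders: substitute the UI address and the ID of a real application.
    val appId = "app-20200111023700-0000"
    val base = s"http://localhost:4040/api/v1/applications/$appId/sql"

    // List executions; details=true also includes the physical plan description
    // and metrics (PATCH 11), paged via the offset/length parameters (PATCH 12).
    println(Source.fromURL(s"$base?details=true&offset=0&length=20").mkString)

    // Fetch a single execution by ID; an unknown ID yields HTTP 404 through the
    // NotFoundException thrown in SqlResource.
    println(Source.fromURL(s"$base/0?details=false").mkString)
  }
}

Each returned JSON object carries the ExecutionData fields defined in api.scala: id, status, description, planDescription, metrics, submissionTime, duration, runningJobIds, successJobIds and failedJobIds.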