diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index c6e7f3978469d..a90f37a80d525 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -42,6 +42,10 @@ class SQLAppStatusStore( store.view(classOf[SQLExecutionUIData]).asScala.toSeq } + def executionsList(offset: Int, length: Int): Seq[SQLExecutionUIData] = { + store.view(classOf[SQLExecutionUIData]).skip(offset).max(length).asScala.toSeq + } + def execution(executionId: Long): Option[SQLExecutionUIData] = { try { Some(store.read(classOf[SQLExecutionUIData], executionId)) diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala new file mode 100644 index 0000000000000..5fc7123c9097b --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/ApiSqlRootResource.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.api.v1.sql + +import javax.ws.rs.Path + +import org.apache.spark.status.api.v1.ApiRequestContext + +@Path("/v1") +private[v1] class ApiSqlRootResource extends ApiRequestContext { + + @Path("applications/{appId}/sql") + def sqlList(): Class[SqlResource] = classOf[SqlResource] +} diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala new file mode 100644 index 0000000000000..346e07f2bef15 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.api.v1.sql + +import java.util.Date +import javax.ws.rs._ +import javax.ws.rs.core.MediaType + +import org.apache.spark.JobExecutionStatus +import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData, SQLPlanMetric} +import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException} + +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class SqlResource extends BaseAppResource { + + @GET + def sqlList( + @DefaultValue("false") @QueryParam("details") details: Boolean, + @DefaultValue("0") @QueryParam("offset") offset: Int, + @DefaultValue("20") @QueryParam("length") length: Int): Seq[ExecutionData] = { + withUI { ui => + val sqlStore = new SQLAppStatusStore(ui.store.store) + sqlStore.executionsList(offset, length).map(prepareExecutionData(_, details)) + } + } + + @GET + @Path("{executionId:\\d+}") + def sql( + @PathParam("executionId") execId: Long, + @DefaultValue("false") @QueryParam("details") details: Boolean): ExecutionData = { + withUI { ui => + val sqlStore = new SQLAppStatusStore(ui.store.store) + sqlStore + .execution(execId) + .map(prepareExecutionData(_, details)) + .getOrElse(throw new NotFoundException("unknown id: " + execId)) + } + } + + private def printableMetrics( + metrics: Seq[SQLPlanMetric], + metricValues: Map[Long, String]): Seq[Metrics] = { + metrics.map(metric => + Metrics(metric.name, metricValues.get(metric.accumulatorId).getOrElse(""))) + } + + private def prepareExecutionData(exec: SQLExecutionUIData, details: Boolean): ExecutionData = { + var running = Seq[Int]() + var completed = Seq[Int]() + var failed = Seq[Int]() + + exec.jobs.foreach { + case (id, JobExecutionStatus.RUNNING) => + running = running :+ id + case (id, JobExecutionStatus.SUCCEEDED) => + completed = completed :+ id + case (id, JobExecutionStatus.FAILED) => + failed = failed :+ id + case _ => + } + + val status = if (exec.jobs.size == completed.size) { + "COMPLETED" + } else if (failed.nonEmpty) { + "FAILED" + } else { + "RUNNING" + } + + val duration = exec.completionTime.getOrElse(new Date()).getTime - exec.submissionTime + val planDetails = if (details) exec.physicalPlanDescription else "" + val metrics = if (details) printableMetrics(exec.metrics, exec.metricValues) else Seq.empty + new ExecutionData( + exec.executionId, + status, + exec.description, + planDetails, + metrics, + new Date(exec.submissionTime), + duration, + running, + completed, + failed) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala new file mode 100644 index 0000000000000..7ace66ffb06e1 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status.api.v1.sql + +import java.util.Date + +class ExecutionData private[spark] ( + val id: Long, + val status: String, + val description: String, + val planDescription: String, + val metrics: Seq[Metrics], + val submissionTime: Date, + val duration: Long, + val runningJobIds: Seq[Int], + val successJobIds: Seq[Int], + val failedJobIds: Seq[Int]) + +case class Metrics private[spark] (metricName: String, metricValue: String)