From 5f5be5bcce29acfff2fe84f2e363afbd8d7ffde6 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Wed, 23 Sep 2015 22:13:51 +0800
Subject: [PATCH 1/7] add search keywords in history page ui

---
 .../spark/deploy/history/HistoryPage.scala    | 44 ++++++++++++++-----
 1 file changed, 32 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index b347cb3be69f7..c2250bdfce5e5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -33,9 +33,15 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
     val requestedFirst = (requestedPage - 1) * pageSize
     val requestedIncomplete =
       Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
+    val requestedQuery = Option(request.getParameter("query")).getOrElse("")
 
-    val allApps = parent.getApplicationList()
+    var allApps = parent.getApplicationList()
       .filter(_.attempts.head.completed != requestedIncomplete)
+    if (!requestedQuery.equals("")) {
+      val query = requestedQuery.toLowerCase
+      allApps = allApps.filter(app =>
+        app.id.contains(query) || app.name.toLowerCase.contains(query))
+    }
     val allAppsSize = allApps.size
 
     val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0
@@ -74,19 +80,27 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
       // page, `...` will be displayed.
       if (allAppsSize > 0) {
         val leftSideIndices =
-          rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _, requestedIncomplete)
+          rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _, requestedIncomplete,
+            requestedQuery)
         val rightSideIndices =
           rangeIndices(actualPage + 1 to actualPage + plusOrMinus, _ < pageCount,
-            requestedIncomplete)
+            requestedIncomplete, requestedQuery)
 
+        <form action="" method="get">
+          Search: <input type="text" name="query" value={requestedQuery}/>
+          <input type="submit" value="Submit"/>
+        </form>
         <h4>
           Showing {actualFirst + 1}-{last + 1} of {allAppsSize}
           {if (requestedIncomplete) "(Incomplete applications)"}
           <span style="float: right">
             {
               if (actualPage > 1) {
-                <a href={makePageLink(actualPage - 1, requestedIncomplete)}>&lt;</a>
-                <a href={makePageLink(1, requestedIncomplete)}>1</a>
+                <a href={makePageLink(actualPage - 1, requestedIncomplete, requestedQuery)}>
+                  &lt;
+                </a>
+                <a href={makePageLink(1, requestedIncomplete, requestedQuery)}>1</a>
               }
             }
             {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "}
@@ -96,8 +110,12 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
             {if (actualPage + plusOrMinus < secondPageFromRight) " ... "}
             {
               if (actualPage < pageCount) {
-                <a href={makePageLink(pageCount, requestedIncomplete)}>{pageCount}</a>
-                <a href={makePageLink(actualPage + 1, requestedIncomplete)}>&gt;</a>
+                <a href={makePageLink(pageCount, requestedIncomplete, requestedQuery)}>
+                  {pageCount}
+                </a>
+                <a href={makePageLink(actualPage + 1, requestedIncomplete, requestedQuery)}>
+                  &gt;
+                </a>
               }
             }
           </span>
@@ -115,7 +133,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
 
           }
         }
-        <a href={makePageLink(actualPage, !requestedIncomplete)}>
+        <a href={makePageLink(actualPage, !requestedIncomplete, requestedQuery)}>
           {
             if (requestedIncomplete) {
               "Back to completed applications"
@@ -151,9 +169,10 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
   private def rangeIndices(
       range: Seq[Int],
       condition: Int => Boolean,
-      showIncomplete: Boolean): Seq[Node] = {
+      showIncomplete: Boolean,
+      showQuery: String): Seq[Node] = {
     range.filter(condition).map(nextPage =>
-      <a href={makePageLink(nextPage, showIncomplete)}> {nextPage} </a>)
+      <a href={makePageLink(nextPage, showIncomplete, showQuery)}> {nextPage} </a>)
   }
 
   private def attemptRow(
@@ -217,10 +236,11 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
     info.attempts.drop(1).flatMap(attemptRow(true, info, _, false))
   }
 
-  private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
+  private def makePageLink(linkPage: Int, showIncomplete: Boolean, showQuery: String): String = {
     "/?" + Array(
       "page=" + linkPage,
-      "showIncomplete=" + showIncomplete
+      "showIncomplete=" + showIncomplete,
+      "query=" + showQuery
     ).mkString("&")
   }
 }
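A note on patch 1: the filter lowercases the query but compares it against app.id as-is, so an application id containing uppercase characters can never match; patch 6 below drops id matching entirely. A minimal, self-contained sketch of the matching rule as introduced here (AppInfo is a hypothetical stand-in for ApplicationHistoryInfo):

    object SearchFilterSketch {
      // Stand-in for ApplicationHistoryInfo; only the fields the filter touches.
      case class AppInfo(id: String, name: String)

      def filterApps(apps: Seq[AppInfo], requestedQuery: String): Seq[AppInfo] = {
        if (requestedQuery.isEmpty) {
          apps
        } else {
          val query = requestedQuery.toLowerCase
          // Same rule as the patch: substring match on the id or the lower-cased name.
          apps.filter(app => app.id.contains(query) || app.name.toLowerCase.contains(query))
        }
      }

      def main(args: Array[String]): Unit = {
        val apps = Seq(
          AppInfo("app-20150923000059-0001", "PageRank"),
          AppInfo("app-20150923000135-0002", "WordCount"))
        println(filterApps(apps, "rank").map(_.name))  // List(PageRank)
      }
    }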
From bff49b8f3b6a3e3562253fabfcbf2a11af4d9c88 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Thu, 17 Dec 2015 00:57:28 +0800
Subject: [PATCH 2/7] rename Submit to Search

---
 .../spark/deploy/history/HistoryPage.scala    |   2 +-
 .../org/apache/spark/sql/execution/Sort.scala | 100 ------------------
 2 files changed, 1 insertion(+), 101 deletions(-)
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index c2250bdfce5e5..cc81e62a5ba1a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -90,7 +90,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
           Search: <input type="text" name="query" value={requestedQuery}/>
-          <input type="submit" value="Submit"/>
+          <input type="submit" value="Search"/>
         </form>
         <h4>
           Showing {actualFirst + 1}-{last + 1} of {allAppsSize}
           {if (requestedIncomplete) "(Incomplete applications)"}
           <span style="float: right">
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
deleted file mode 100644
index 24207cb46fd29..0000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.{InternalAccumulator, SparkEnv, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution}
-import org.apache.spark.sql.execution.metric.SQLMetrics
-
-/**
- * Performs (external) sorting.
- *
- * @param global when true performs a global sort of all partitions by shuffling the data first
- *               if necessary.
- * @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
- *                           spill every `frequency` records.
- */
-case class Sort(
-    sortOrder: Seq[SortOrder],
-    global: Boolean,
-    child: SparkPlan,
-    testSpillFrequency: Int = 0)
-  extends UnaryNode {
-
-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-
-  override def output: Seq[Attribute] = child.output
-
-  override def outputOrdering: Seq[SortOrder] = sortOrder
-
-  override def requiredChildDistribution: Seq[Distribution] =
-    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
-
-  override private[sql] lazy val metrics = Map(
-    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    val schema = child.schema
-    val childOutput = child.output
-
-    val dataSize = longMetric("dataSize")
-    val spillSize = longMetric("spillSize")
-
-    child.execute().mapPartitionsInternal { iter =>
-      val ordering = newOrdering(sortOrder, childOutput)
-
-      // The comparator for comparing prefix
-      val boundSortExpression = BindReferences.bindReference(sortOrder.head, childOutput)
-      val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
-
-      // The generator for prefix
-      val prefixProjection = UnsafeProjection.create(Seq(SortPrefix(boundSortExpression)))
-      val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
-        override def computePrefix(row: InternalRow): Long = {
-          prefixProjection.apply(row).getLong(0)
-        }
-      }
-
-      val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
-      val sorter = new UnsafeExternalRowSorter(
-        schema, ordering, prefixComparator, prefixComputer, pageSize)
-      if (testSpillFrequency > 0) {
-        sorter.setTestSpillFrequency(testSpillFrequency)
-      }
-
-      // Remember spill data size of this task before execute this operator so that we can
-      // figure out how many bytes we spilled for this operator.
-      val spillSizeBefore = TaskContext.get().taskMetrics().memoryBytesSpilled
-
-      val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
-
-      dataSize += sorter.getPeakMemoryUsage
-      spillSize += TaskContext.get().taskMetrics().memoryBytesSpilled - spillSizeBefore
-
-      TaskContext.get().internalMetricsToAccumulators(
-        InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.getPeakMemoryUsage)
-      sortedIterator
-    }
-  }
-}
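The Sort.scala deletion above is unrelated to the UI change; patch 3 restores it verbatim. For orientation, the operator it removes and re-adds relies on sort prefixes: each row is summarized as a single Long so most comparisons never touch the full row. An illustrative sketch of that idea only, with simplified stand-in types rather than Spark's:

    object PrefixSortSketch {
      final case class Row(key: Long, payload: String)

      // Compute a cheap Long "prefix" per row once, then let it drive the sort;
      // a real implementation falls back to full-row comparison on prefix ties.
      def sortWithPrefix(rows: Array[Row]): Array[Row] =
        rows.map(r => (r.key, r)).sortBy(_._1).map(_._2)

      def main(args: Array[String]): Unit = {
        val rows = Array(Row(3, "c"), Row(1, "a"), Row(2, "b"))
        println(sortWithPrefix(rows).map(_.payload).mkString)  // abc
      }
    }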
From 560de66e1f1a253d432a37bf10603cdced1c9ae3 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Thu, 17 Dec 2015 01:08:55 +0800
Subject: [PATCH 3/7] Revert "rename Submit to Search"

This reverts commit bff49b8f3b6a3e3562253fabfcbf2a11af4d9c88.

---
 .../spark/deploy/history/HistoryPage.scala    |   2 +-
 .../org/apache/spark/sql/execution/Sort.scala | 100 ++++++++++++++++++
 2 files changed, 101 insertions(+), 1 deletion(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index cc81e62a5ba1a..c2250bdfce5e5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -90,7 +90,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
           Search: <input type="text" name="query" value={requestedQuery}/>
-          <input type="submit" value="Search"/>
+          <input type="submit" value="Submit"/>
         </form>
         <h4>
           Showing {actualFirst + 1}-{last + 1} of {allAppsSize}
           {if (requestedIncomplete) "(Incomplete applications)"}
           <span style="float: right">
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
new file mode 100644
index 0000000000000..24207cb46fd29
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.{InternalAccumulator, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.execution.metric.SQLMetrics
+
+/**
+ * Performs (external) sorting.
+ *
+ * @param global when true performs a global sort of all partitions by shuffling the data first
+ *               if necessary.
+ * @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
+ *                           spill every `frequency` records.
+ */
+case class Sort(
+    sortOrder: Seq[SortOrder],
+    global: Boolean,
+    child: SparkPlan,
+    testSpillFrequency: Int = 0)
+  extends UnaryNode {
+
+  override def outputsUnsafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessSafeRows: Boolean = false
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputOrdering: Seq[SortOrder] = sortOrder
+
+  override def requiredChildDistribution: Seq[Distribution] =
+    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
+
+  override private[sql] lazy val metrics = Map(
+    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
+    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val schema = child.schema
+    val childOutput = child.output
+
+    val dataSize = longMetric("dataSize")
+    val spillSize = longMetric("spillSize")
+
+    child.execute().mapPartitionsInternal { iter =>
+      val ordering = newOrdering(sortOrder, childOutput)
+
+      // The comparator for comparing prefix
+      val boundSortExpression = BindReferences.bindReference(sortOrder.head, childOutput)
+      val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
+
+      // The generator for prefix
+      val prefixProjection = UnsafeProjection.create(Seq(SortPrefix(boundSortExpression)))
+      val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
+        override def computePrefix(row: InternalRow): Long = {
+          prefixProjection.apply(row).getLong(0)
+        }
+      }
+
+      val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
+      val sorter = new UnsafeExternalRowSorter(
+        schema, ordering, prefixComparator, prefixComputer, pageSize)
+      if (testSpillFrequency > 0) {
+        sorter.setTestSpillFrequency(testSpillFrequency)
+      }
+
+      // Remember spill data size of this task before execute this operator so that we can
+      // figure out how many bytes we spilled for this operator.
+      val spillSizeBefore = TaskContext.get().taskMetrics().memoryBytesSpilled
+
+      val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
+
+      dataSize += sorter.getPeakMemoryUsage
+      spillSize += TaskContext.get().taskMetrics().memoryBytesSpilled - spillSizeBefore
+
+      TaskContext.get().internalMetricsToAccumulators(
+        InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.getPeakMemoryUsage)
+      sortedIterator
+    }
+  }
+}
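One thing the series never addresses: makePageLink concatenates the raw query into the URL, so a query containing '&', '=', or spaces produces a malformed link. A sketch of the helper with encoding added — the URLEncoder call is a suggestion, not part of the patch:

    import java.net.URLEncoder

    object PageLinkSketch {
      def makePageLink(linkPage: Int, showIncomplete: Boolean, showQuery: String): String = {
        "/?" + Array(
          "page=" + linkPage,
          "showIncomplete=" + showIncomplete,
          // The patch passes showQuery through unencoded; this variant escapes it.
          "query=" + URLEncoder.encode(showQuery, "UTF-8")
        ).mkString("&")
      }

      def main(args: Array[String]): Unit = {
        println(makePageLink(2, showIncomplete = false, "page rank"))
        // /?page=2&showIncomplete=false&query=page+rank
      }
    }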
From b146c2d9043287776778a139aa85d88312b1879e Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Thu, 17 Dec 2015 01:10:23 +0800
Subject: [PATCH 4/7] fix minor style

---
 .../scala/org/apache/spark/deploy/history/HistoryPage.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index c2250bdfce5e5..5eadd9bc671b4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -90,7 +90,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
           Search: <input type="text" name="query" value={requestedQuery}/>
-          <input type="submit" value="Submit"/>
+          <input type="submit" value="Search"/>
         </form>
         <h4>
           Showing {actualFirst + 1}-{last + 1} of {allAppsSize}
           {if (requestedIncomplete) "(Incomplete applications)"}
           <span style="float: right">
@@ -237,10 +237,10 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
   }
 
   private def makePageLink(linkPage: Int, showIncomplete: Boolean, showQuery: String): String = {
-    "/?" + Array(
+    UIUtils.prependBaseUri("/?" + Array(
       "page=" + linkPage,
       "showIncomplete=" + showIncomplete,
       "query=" + showQuery
-    ).mkString("&")
+    ).mkString("&"))
   }
 }

From 16965b681e85276052abba8f6ffd1ff7dc17e3ea Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Thu, 17 Dec 2015 01:15:02 +0800
Subject: [PATCH 5/7] update minor code

---
 .../scala/org/apache/spark/deploy/history/HistoryPage.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 5eadd9bc671b4..b53b0f7131994 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -180,7 +180,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
       info: ApplicationHistoryInfo,
       attempt: ApplicationAttemptInfo,
       isFirst: Boolean): Seq[Node] = {
-    val uiAddress = HistoryServer.getAttemptURI(info.id, attempt.attemptId)
+    val uiAddress = UIUtils.prependBaseUri(HistoryServer.getAttemptURI(info.id, attempt.attemptId))
     val startTime = UIUtils.formatDate(attempt.startTime)
     val endTime = if (attempt.endTime > 0) UIUtils.formatDate(attempt.endTime) else "-"
     val duration =
@@ -209,8 +209,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
       {
         if (renderAttemptIdColumn) {
           if (info.attempts.size > 1 && attempt.attemptId.isDefined) {
-            <td><a href={HistoryServer.getAttemptURI(info.id, attempt.attemptId)}>
-              {attempt.attemptId.get}</a></td>
+            <td><a href={uiAddress}>{attempt.attemptId.get}</a></td>
           } else {
             <td>&nbsp;</td>
           }
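Patches 4 and 5 route both the page links and the per-attempt links through UIUtils.prependBaseUri, so they keep working when the history server sits behind a reverse proxy that mounts it under a path prefix. A simplified sketch of the behavior — the real helper also consults spark.ui.proxyBase, so treat the details here as assumptions:

    object BaseUriSketch {
      // Simplified: the real UIUtils derives the base from spark.ui.proxyBase or
      // the APPLICATION_WEB_PROXY_BASE environment variable.
      def prependBaseUri(resource: String, basePath: String = ""): String =
        basePath + resource

      def main(args: Array[String]): Unit = {
        println(prependBaseUri("/?page=1&showIncomplete=false&query="))
        // no proxy: unchanged
        println(prependBaseUri("/history/app-1", basePath = "/gateway/spark"))
        // /gateway/spark/history/app-1
      }
    }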
From 28220a666be8984f5b31ae2a21c69108137226d3 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Thu, 17 Dec 2015 09:39:10 +0800
Subject: [PATCH 6/7] address sarutak's comments

---
 .../scala/org/apache/spark/deploy/history/HistoryPage.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index b53b0f7131994..4101a2ad95968 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -39,8 +39,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
       .filter(_.attempts.head.completed != requestedIncomplete)
     if (!requestedQuery.equals("")) {
       val query = requestedQuery.toLowerCase
-      allApps = allApps.filter(app =>
-        app.id.contains(query) || app.name.toLowerCase.contains(query))
+      allApps = allApps.filter(app => app.name.toLowerCase.contains(query))
     }
     val allAppsSize = allApps.size
 
@@ -88,7 +87,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
 
         <form action="" method="get">
-          Search: <input type="text" name="query" value={requestedQuery}/>
+          Application Names: <input type="text" name="query" value={requestedQuery}/>
           <input type="submit" value="Search"/>
         </form>
         <h4>
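Patch 6 follows the review feedback: matching now covers only the application name, and the label says so. The behavioral difference in miniature (AppInfo again a hypothetical stand-in):

    object NameOnlyFilterSketch {
      case class AppInfo(id: String, name: String)

      def matchesBefore(app: AppInfo, query: String): Boolean =
        app.id.contains(query) || app.name.toLowerCase.contains(query)

      def matchesAfter(app: AppInfo, query: String): Boolean =
        app.name.toLowerCase.contains(query)

      def main(args: Array[String]): Unit = {
        val app = AppInfo("app-20151217-0001", "PageRank")
        println(matchesBefore(app, "0001"))  // true: id substrings used to match
        println(matchesAfter(app, "0001"))   // false: only the name is searched now
      }
    }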
From a3f6885a90d970c8d128522a40df496dfc1612a7 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Thu, 17 Dec 2015 09:50:59 +0800
Subject: [PATCH 7/7] fix minor name

---
 .../scala/org/apache/spark/deploy/history/HistoryPage.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 4101a2ad95968..9249b40440529 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -87,7 +87,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
 
         <form action="" method="get">
-          Application Names: <input type="text" name="query" value={requestedQuery}/>
+          Application Name: <input type="text" name="query" value={requestedQuery}/>
           <input type="submit" value="Search"/>
         </form>
         <h4>
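Taken together, the series leaves HistoryPage reading two optional request parameters and filtering before pagination. A condensed sketch of the final control flow, with a simplified app type in place of ApplicationHistoryInfo and assuming the servlet API on the classpath, as HistoryPage itself does:

    import javax.servlet.http.HttpServletRequest

    object HistorySearchFlow {
      case class AppInfo(id: String, name: String, completed: Boolean)

      def visibleApps(request: HttpServletRequest, all: Seq[AppInfo]): Seq[AppInfo] = {
        val requestedIncomplete =
          Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
        val requestedQuery = Option(request.getParameter("query")).getOrElse("")

        // Completed/incomplete toggle first, then the name-only search from patch 6.
        var apps = all.filter(_.completed != requestedIncomplete)
        if (requestedQuery.nonEmpty) {
          val query = requestedQuery.toLowerCase
          apps = apps.filter(_.name.toLowerCase.contains(query))
        }
        apps
      }
    }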