From ab65fa115551c608696fb592310765c03ce68458 Mon Sep 17 00:00:00 2001 From: Joshi Date: Mon, 6 Jul 2015 22:49:06 -0700 Subject: [PATCH 01/10] History Server: some attempt completed to work with showIncomplete --- .../spark/deploy/history/HistoryPage.scala | 55 +++++++++++++------ 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 0830cc1ba1245..b42f92212692e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -35,7 +35,6 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean val allApps = parent.getApplicationList() - .filter(_.attempts.head.completed != requestedIncomplete) val allAppsSize = allApps.size val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0 @@ -51,9 +50,15 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") val hasMultipleAttempts = appsToShow.exists(_.attempts.size > 1) val appTable = if (hasMultipleAttempts) { - UIUtils.listingTable(appWithAttemptHeader, appWithAttemptRow, appsToShow) + UIUtils.listingTable( + appWithAttemptHeader, + appWithAttemptRow(_, requestedIncomplete), + appsToShow) } else { - UIUtils.listingTable(appHeader, appRow, appsToShow) + UIUtils.listingTable( + appHeader, + appRow(_, requestedIncomplete), + appsToShow) } val providerConfig = parent.getProviderConfig() @@ -157,7 +162,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") renderAttemptIdColumn: Boolean, info: ApplicationHistoryInfo, attempt: ApplicationAttemptInfo, - isFirst: Boolean): Seq[Node] = { + isFirst: Boolean, + requestedIncomplete: Boolean): Seq[Node] = { val uiAddress = HistoryServer.getAttemptURI(info.id, attempt.attemptId) val startTime = UIUtils.formatDate(attempt.startTime) val endTime = if (attempt.endTime > 0) UIUtils.formatDate(attempt.endTime) else "-" @@ -168,6 +174,10 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") "-" } val lastUpdated = UIUtils.formatDate(attempt.lastUpdated) + var someAttemptCompleted = false + info.attempts.foreach{ attempt => + if (attempt.completed) someAttemptCompleted = true + } { if (isFirst) { @@ -185,7 +195,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") } } { - if (renderAttemptIdColumn) { + if (renderAttemptIdColumn && + (requestedIncomplete || (!requestedIncomplete && someAttemptCompleted))) { if (info.attempts.size > 1 && attempt.attemptId.isDefined) { {attempt.attemptId.get} @@ -196,22 +207,34 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") Nil } } - {startTime} - {endTime} - - {duration} - {attempt.sparkUser} - {lastUpdated} + { + if (requestedIncomplete || (!requestedIncomplete && someAttemptCompleted)) { + {startTime} + {endTime} + + {duration} + {attempt.sparkUser} + {lastUpdated} + } else { + Nil + } + } } - private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { - attemptRow(false, info, info.attempts.head, true) + + + private def appRow( + info: ApplicationHistoryInfo, + requestedIncomplete: Boolean): Seq[Node] = { + attemptRow(false, info, info.attempts.head, true, requestedIncomplete) } - private def appWithAttemptRow(info: ApplicationHistoryInfo): Seq[Node] = { - attemptRow(true, info, info.attempts.head, true) ++ - info.attempts.drop(1).flatMap(attemptRow(true, info, _, false)) + private def appWithAttemptRow( + info: ApplicationHistoryInfo, + requestedIncomplete: Boolean): Seq[Node] = { + attemptRow(true, info, info.attempts.head, true, requestedIncomplete) ++ + info.attempts.drop(1).flatMap(attemptRow(true, info, _, false, requestedIncomplete)) } private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = { From a41ac4bda10cd83018e651aaeae123ef4056a579 Mon Sep 17 00:00:00 2001 From: Joshi Date: Wed, 8 Jul 2015 12:05:42 -0700 Subject: [PATCH 02/10] History Server: updated order for multiple attempts --- .../deploy/history/FsHistoryProvider.scala | 7 +- .../spark/deploy/history/HistoryPage.scala | 187 ++++++++---------- 2 files changed, 86 insertions(+), 108 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index db383b9823d3c..f83f243ffec8f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -414,7 +414,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) /** * Comparison function that defines the sort order for application attempts within the same * application. Order is: running attempts before complete attempts, running attempts sorted - * by start time, completed attempts sorted by end time. + * by start time showing whichever started first, + * completed attempts sorted by end time showing whichever ended first. * * Normally applications should have a single running attempt; but failure to call sc.stop() * may cause multiple running attempts to show up. @@ -425,9 +426,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) a1: FsApplicationAttemptInfo, a2: FsApplicationAttemptInfo): Boolean = { if (a1.completed == a2.completed) { - if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime + if (a1.completed) a1.endTime <= a2.endTime else a1.startTime <= a2.startTime } else { - !a1.completed + a1.completed } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index b42f92212692e..dd12898723348 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -35,6 +35,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean val allApps = parent.getApplicationList() + .filter(_.attempts.head.completed != requestedIncomplete) val allAppsSize = allApps.size val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0 @@ -50,15 +51,9 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") val hasMultipleAttempts = appsToShow.exists(_.attempts.size > 1) val appTable = if (hasMultipleAttempts) { - UIUtils.listingTable( - appWithAttemptHeader, - appWithAttemptRow(_, requestedIncomplete), - appsToShow) + UIUtils.listingTable(appWithAttemptHeader, appWithAttemptRow, appsToShow) } else { - UIUtils.listingTable( - appHeader, - appRow(_, requestedIncomplete), - appsToShow) + UIUtils.listingTable(appHeader, appRow, appsToShow) } val providerConfig = parent.getProviderConfig() @@ -69,61 +64,61 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {providerConfig.map { case (k, v) =>
  • {k}: {v}
  • }} { - // This displays the indices of pages that are within `plusOrMinus` pages of - // the current page. Regardless of where the current page is, this also links - // to the first and last page. If the current page +/- `plusOrMinus` is greater - // than the 2nd page from the first page or less than the 2nd page from the last - // page, `...` will be displayed. - if (allAppsSize > 0) { - val leftSideIndices = - rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _, requestedIncomplete) - val rightSideIndices = - rangeIndices(actualPage + 1 to actualPage + plusOrMinus, _ < pageCount, - requestedIncomplete) - -

    - Showing {actualFirst + 1}-{last + 1} of {allAppsSize} - {if (requestedIncomplete) "(Incomplete applications)"} - - { - if (actualPage > 1) { - < - 1 - } - } - {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "} - {leftSideIndices} - {actualPage} - {rightSideIndices} - {if (actualPage + plusOrMinus < secondPageFromRight) " ... "} - { - if (actualPage < pageCount) { - {pageCount} - > - } - } - -

    ++ + // This displays the indices of pages that are within `plusOrMinus` pages of + // the current page. Regardless of where the current page is, this also links + // to the first and last page. If the current page +/- `plusOrMinus` is greater + // than the 2nd page from the first page or less than the 2nd page from the last + // page, `...` will be displayed. + if (allAppsSize > 0) { + val leftSideIndices = + rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _, requestedIncomplete) + val rightSideIndices = + rangeIndices(actualPage + 1 to actualPage + plusOrMinus, _ < pageCount, + requestedIncomplete) + +

    + Showing {actualFirst + 1}-{last + 1} of {allAppsSize} + {if (requestedIncomplete) "(Incomplete applications)"} + + { + if (actualPage > 1) { + < + 1 + } + } + {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "} + {leftSideIndices} + {actualPage} + {rightSideIndices} + {if (actualPage + plusOrMinus < secondPageFromRight) " ... "} + { + if (actualPage < pageCount) { + {pageCount} + > + } + } + +

    ++ appTable - } else if (requestedIncomplete) { -

    No incomplete applications found!

    - } else { -

    No completed applications found!

    ++ + } else if (requestedIncomplete) { +

    No incomplete applications found!

    + } else { +

    No completed applications found!

    ++

    Did you specify the correct logging directory? Please verify your setting of spark.history.fs.logDirectory and whether you have the permissions to access it.
    It is also possible that your application did not run to completion or did not stop the SparkContext.

    - } + } } { - if (requestedIncomplete) { - "Back to completed applications" - } else { - "Show incomplete applications" - } + if (requestedIncomplete) { + "Back to completed applications" + } else { + "Show incomplete applications" + } } @@ -151,19 +146,18 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") "Last Updated") private def rangeIndices( - range: Seq[Int], - condition: Int => Boolean, - showIncomplete: Boolean): Seq[Node] = { + range: Seq[Int], + condition: Int => Boolean, + showIncomplete: Boolean): Seq[Node] = { range.filter(condition).map(nextPage => {nextPage} ) } private def attemptRow( - renderAttemptIdColumn: Boolean, - info: ApplicationHistoryInfo, - attempt: ApplicationAttemptInfo, - isFirst: Boolean, - requestedIncomplete: Boolean): Seq[Node] = { + renderAttemptIdColumn: Boolean, + info: ApplicationHistoryInfo, + attempt: ApplicationAttemptInfo, + isFirst: Boolean): Seq[Node] = { val uiAddress = HistoryServer.getAttemptURI(info.id, attempt.attemptId) val startTime = UIUtils.formatDate(attempt.startTime) val endTime = if (attempt.endTime > 0) UIUtils.formatDate(attempt.endTime) else "-" @@ -174,67 +168,50 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") "-" } val lastUpdated = UIUtils.formatDate(attempt.lastUpdated) - var someAttemptCompleted = false - info.attempts.foreach{ attempt => - if (attempt.completed) someAttemptCompleted = true - } { - if (isFirst) { - if (info.attempts.size > 1 || renderAttemptIdColumn) { - - {info.id} + if (isFirst) { + if (info.attempts.size > 1 || renderAttemptIdColumn) { + + {info.id} {info.name} - } else { - {info.id} - {info.name} - } } else { - Nil + {info.id} + {info.name} } + } else { + Nil } - { - if (renderAttemptIdColumn && - (requestedIncomplete || (!requestedIncomplete && someAttemptCompleted))) { - if (info.attempts.size > 1 && attempt.attemptId.isDefined) { - - {attempt.attemptId.get} - } else { -   - } - } else { - Nil - } } { - if (requestedIncomplete || (!requestedIncomplete && someAttemptCompleted)) { - {startTime} - {endTime} - - {duration} - {attempt.sparkUser} - {lastUpdated} + if (renderAttemptIdColumn) { + if (info.attempts.size > 1 && attempt.attemptId.isDefined) { + + {attempt.attemptId.get} } else { - Nil +   } + } else { + Nil + } } + {startTime} + {endTime} + + {duration} + {attempt.sparkUser} + {lastUpdated} } - - - private def appRow( - info: ApplicationHistoryInfo, - requestedIncomplete: Boolean): Seq[Node] = { - attemptRow(false, info, info.attempts.head, true, requestedIncomplete) + private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { + attemptRow(false, info, info.attempts.head, true) } - private def appWithAttemptRow( - info: ApplicationHistoryInfo, - requestedIncomplete: Boolean): Seq[Node] = { - attemptRow(true, info, info.attempts.head, true, requestedIncomplete) ++ - info.attempts.drop(1).flatMap(attemptRow(true, info, _, false, requestedIncomplete)) + private def appWithAttemptRow(info: ApplicationHistoryInfo): Seq[Node] = { + attemptRow(true, info, info.attempts.head, true) ++ + info.attempts.drop(1).flatMap(attemptRow(true, info, _, false)) } private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = { From 85024e8898480b75f31518b538c89d6758ae0801 Mon Sep 17 00:00:00 2001 From: Joshi Date: Wed, 8 Jul 2015 12:17:18 -0700 Subject: [PATCH 03/10] History Server: updated order for multiple attempts --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index f83f243ffec8f..698fb62620a01 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -413,7 +413,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) /** * Comparison function that defines the sort order for application attempts within the same - * application. Order is: running attempts before complete attempts, running attempts sorted + * application. Order is: completed attempts before running attempts, running attempts sorted * by start time showing whichever started first, * completed attempts sorted by end time showing whichever ended first. * From 304cb0b67c94b60e47e427b90c43680fd3d4fee1 Mon Sep 17 00:00:00 2001 From: Joshi Date: Wed, 8 Jul 2015 13:22:27 -0700 Subject: [PATCH 04/10] History Server: updated order for multiple attempts(reverted HistoryPage) --- .../spark/deploy/history/HistoryPage.scala | 138 +++++++++--------- 1 file changed, 69 insertions(+), 69 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index dd12898723348..0830cc1ba1245 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -64,61 +64,61 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {providerConfig.map { case (k, v) =>
  • {k}: {v}
  • }} { - // This displays the indices of pages that are within `plusOrMinus` pages of - // the current page. Regardless of where the current page is, this also links - // to the first and last page. If the current page +/- `plusOrMinus` is greater - // than the 2nd page from the first page or less than the 2nd page from the last - // page, `...` will be displayed. - if (allAppsSize > 0) { - val leftSideIndices = - rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _, requestedIncomplete) - val rightSideIndices = - rangeIndices(actualPage + 1 to actualPage + plusOrMinus, _ < pageCount, - requestedIncomplete) - -

    - Showing {actualFirst + 1}-{last + 1} of {allAppsSize} - {if (requestedIncomplete) "(Incomplete applications)"} - - { - if (actualPage > 1) { - < - 1 - } - } - {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "} - {leftSideIndices} - {actualPage} - {rightSideIndices} - {if (actualPage + plusOrMinus < secondPageFromRight) " ... "} - { - if (actualPage < pageCount) { - {pageCount} - > - } - } - -

    ++ + // This displays the indices of pages that are within `plusOrMinus` pages of + // the current page. Regardless of where the current page is, this also links + // to the first and last page. If the current page +/- `plusOrMinus` is greater + // than the 2nd page from the first page or less than the 2nd page from the last + // page, `...` will be displayed. + if (allAppsSize > 0) { + val leftSideIndices = + rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _, requestedIncomplete) + val rightSideIndices = + rangeIndices(actualPage + 1 to actualPage + plusOrMinus, _ < pageCount, + requestedIncomplete) + +

    + Showing {actualFirst + 1}-{last + 1} of {allAppsSize} + {if (requestedIncomplete) "(Incomplete applications)"} + + { + if (actualPage > 1) { + < + 1 + } + } + {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "} + {leftSideIndices} + {actualPage} + {rightSideIndices} + {if (actualPage + plusOrMinus < secondPageFromRight) " ... "} + { + if (actualPage < pageCount) { + {pageCount} + > + } + } + +

    ++ appTable - } else if (requestedIncomplete) { -

    No incomplete applications found!

    - } else { -

    No completed applications found!

    ++ + } else if (requestedIncomplete) { +

    No incomplete applications found!

    + } else { +

    No completed applications found!

    ++

    Did you specify the correct logging directory? Please verify your setting of spark.history.fs.logDirectory and whether you have the permissions to access it.
    It is also possible that your application did not run to completion or did not stop the SparkContext.

    - } + } } { - if (requestedIncomplete) { - "Back to completed applications" - } else { - "Show incomplete applications" - } + if (requestedIncomplete) { + "Back to completed applications" + } else { + "Show incomplete applications" + } } @@ -146,18 +146,18 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") "Last Updated") private def rangeIndices( - range: Seq[Int], - condition: Int => Boolean, - showIncomplete: Boolean): Seq[Node] = { + range: Seq[Int], + condition: Int => Boolean, + showIncomplete: Boolean): Seq[Node] = { range.filter(condition).map(nextPage => {nextPage} ) } private def attemptRow( - renderAttemptIdColumn: Boolean, - info: ApplicationHistoryInfo, - attempt: ApplicationAttemptInfo, - isFirst: Boolean): Seq[Node] = { + renderAttemptIdColumn: Boolean, + info: ApplicationHistoryInfo, + attempt: ApplicationAttemptInfo, + isFirst: Boolean): Seq[Node] = { val uiAddress = HistoryServer.getAttemptURI(info.id, attempt.attemptId) val startTime = UIUtils.formatDate(attempt.startTime) val endTime = if (attempt.endTime > 0) UIUtils.formatDate(attempt.endTime) else "-" @@ -170,31 +170,31 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") val lastUpdated = UIUtils.formatDate(attempt.lastUpdated) { - if (isFirst) { - if (info.attempts.size > 1 || renderAttemptIdColumn) { - - {info.id} + if (isFirst) { + if (info.attempts.size > 1 || renderAttemptIdColumn) { + + {info.id} {info.name} - } else { - {info.id} + } else { + {info.id} {info.name} + } + } else { + Nil } - } else { - Nil - } } { - if (renderAttemptIdColumn) { - if (info.attempts.size > 1 && attempt.attemptId.isDefined) { - - {attempt.attemptId.get} + if (renderAttemptIdColumn) { + if (info.attempts.size > 1 && attempt.attemptId.isDefined) { + + {attempt.attemptId.get} + } else { +   + } } else { -   + Nil } - } else { - Nil - } } {startTime} {endTime} From cc0fda7dc565a8b2a9a2704301a034a36538b987 Mon Sep 17 00:00:00 2001 From: Joshi Date: Wed, 8 Jul 2015 15:07:41 -0700 Subject: [PATCH 05/10] History Server: updated order for multiple attempts(updated test) --- .../history/FsHistoryProviderSuite.scala | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index d3a6db5f260d6..b9511eb8b82b9 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -251,13 +251,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc appListAfterRename.size should be (1) } - test("apps with multiple attempts") { + test("apps with multiple attempts with order") { val provider = new FsHistoryProvider(createTestConf()) - val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false) + val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true) writeFile(attempt1, true, None, - SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")), - SparkListenerApplicationEnd(2L) + SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")) ) updateAndCheck(provider) { list => @@ -267,27 +266,26 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true) writeFile(attempt2, true, None, - SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")) + SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2")) ) updateAndCheck(provider) { list => list.size should be (1) list.head.attempts.size should be (2) - list.head.attempts.head.attemptId should be (Some("attempt2")) + list.head.attempts.head.attemptId should be (Some("attempt1")) } - val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false) - attempt2.delete() - writeFile(attempt2, true, None, - SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")), + val attempt3 = newLogFile("app1", Some("attempt3"), inProgress = false) + writeFile(attempt3, true, None, + SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt3")), SparkListenerApplicationEnd(4L) ) updateAndCheck(provider) { list => list should not be (null) list.size should be (1) - list.head.attempts.size should be (2) - list.head.attempts.head.attemptId should be (Some("attempt2")) + list.head.attempts.size should be (3) + list.head.attempts.head.attemptId should be (Some("attempt3")) } val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false) @@ -299,7 +297,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc updateAndCheck(provider) { list => list.size should be (2) list.head.attempts.size should be (1) - list.last.attempts.size should be (2) + list.last.attempts.size should be (3) list.head.attempts.head.attemptId should be (Some("attempt1")) list.foreach { case app => @@ -343,7 +341,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc updateAndCheck(provider) { list => list.size should be (1) list.head.attempts.size should be (1) - list.head.attempts.head.attemptId should be (Some("attempt2")) + list.head.attempts.head.attemptId should be (Some("attempt1")) } assert(!log1.exists()) From b0fc922fafcf8fecc83e935c4792af19bf1f9c83 Mon Sep 17 00:00:00 2001 From: Joshi Date: Thu, 9 Jul 2015 16:30:14 -0700 Subject: [PATCH 06/10] History Server: updated order for multiple attempts(updated comment) --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 698fb62620a01..250c38228ad7d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -414,8 +414,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) /** * Comparison function that defines the sort order for application attempts within the same * application. Order is: completed attempts before running attempts, running attempts sorted - * by start time showing whichever started first, - * completed attempts sorted by end time showing whichever ended first. + * by ascending start time,completed attempts sorted by ascending end time. * * Normally applications should have a single running attempt; but failure to call sc.stop() * may cause multiple running attempts to show up. From 83306a8bc1749974fd60edf441843ea9a93c3520 Mon Sep 17 00:00:00 2001 From: Joshi Date: Sun, 12 Jul 2015 12:27:27 -0700 Subject: [PATCH 07/10] History Server: updated order for multiple attempts(descending start time) --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 6 +++--- .../spark/deploy/history/FsHistoryProviderSuite.scala | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 250c38228ad7d..051b5e40d20b4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -413,8 +413,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) /** * Comparison function that defines the sort order for application attempts within the same - * application. Order is: completed attempts before running attempts, running attempts sorted - * by ascending start time,completed attempts sorted by ascending end time. + * application. Order is: completed attempts before running attempts, if both completed + * or both running attempts sorted by descending start time. * * Normally applications should have a single running attempt; but failure to call sc.stop() * may cause multiple running attempts to show up. @@ -425,7 +425,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) a1: FsApplicationAttemptInfo, a2: FsApplicationAttemptInfo): Boolean = { if (a1.completed == a2.completed) { - if (a1.completed) a1.endTime <= a2.endTime else a1.startTime <= a2.startTime + a1.startTime >= a2.startTime } else { a1.completed } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index b9511eb8b82b9..12ec126832b04 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -272,7 +272,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc updateAndCheck(provider) { list => list.size should be (1) list.head.attempts.size should be (2) - list.head.attempts.head.attemptId should be (Some("attempt1")) + list.head.attempts.head.attemptId should be (Some("attempt2")) } val attempt3 = newLogFile("app1", Some("attempt3"), inProgress = false) @@ -289,7 +289,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false) - writeFile(attempt2, true, None, + writeFile(attempt1, true, None, SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")), SparkListenerApplicationEnd(6L) ) From 548c7532f7facc1a177fa3117b8e5c104349afe7 Mon Sep 17 00:00:00 2001 From: Joshi Date: Mon, 13 Jul 2015 16:34:57 -0700 Subject: [PATCH 08/10] History Server: updated order for multiple attempts(descending start time works everytime) --- .../spark/deploy/history/FsHistoryProvider.scala | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 051b5e40d20b4..4b873c3c5989b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -413,8 +413,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) /** * Comparison function that defines the sort order for application attempts within the same - * application. Order is: completed attempts before running attempts, if both completed - * or both running attempts sorted by descending start time. + * application. Order is: later attempts before earlier attempts. + * The most recent attempt state matches with current state of the app. * * Normally applications should have a single running attempt; but failure to call sc.stop() * may cause multiple running attempts to show up. @@ -424,11 +424,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private def compareAttemptInfo( a1: FsApplicationAttemptInfo, a2: FsApplicationAttemptInfo): Boolean = { - if (a1.completed == a2.completed) { - a1.startTime >= a2.startTime - } else { - a1.completed - } + a1.startTime >= a2.startTime } /** From 716e0b1d4e92b89106031fd0eb0be9c503f71e61 Mon Sep 17 00:00:00 2001 From: Joshi Date: Mon, 13 Jul 2015 16:40:57 -0700 Subject: [PATCH 09/10] History Server: updated order for multiple attempts(descending start time works everytime) --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 4b873c3c5989b..7d68e37de2a96 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -413,8 +413,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) /** * Comparison function that defines the sort order for application attempts within the same - * application. Order is: later attempts before earlier attempts. - * The most recent attempt state matches with current state of the app. + * application. Order is: attempts are sorted by descending start time. + * Most recent attempt state matches with current state of the app. * * Normally applications should have a single running attempt; but failure to call sc.stop() * may cause multiple running attempts to show up. From 874dd8007850649a57037346f743d3d0578d0a4c Mon Sep 17 00:00:00 2001 From: Joshi Date: Tue, 14 Jul 2015 13:54:50 -0700 Subject: [PATCH 10/10] History Server: updated order for multiple attempts(logcleaner) --- .../apache/spark/deploy/history/FsHistoryProviderSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 12ec126832b04..0c8dc25bec969 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -341,7 +341,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc updateAndCheck(provider) { list => list.size should be (1) list.head.attempts.size should be (1) - list.head.attempts.head.attemptId should be (Some("attempt1")) + list.head.attempts.head.attemptId should be (Some("attempt2")) } assert(!log1.exists())