From 6ce7e6a62064a3eea6ec495b30ceba5d0f71af9a Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 18 Apr 2016 17:46:56 -0700
Subject: [PATCH 1/3] Fixed StateStoreSuite

---
 .../sql/execution/streaming/state/StateStore.scala |  4 ++++
 .../streaming/state/StateStoreSuite.scala          | 14 +++++++++++++-
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 952150632519e..2e13672ccb9e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -155,6 +155,10 @@ private[state] object StateStore extends Logging {
     loadedProviders.contains(storeId)
   }
 
+  def isMaintenanceRunning: Boolean = loadedProviders.synchronized {
+    maintenanceTask != null
+  }
+
   /** Unload and stop all state store providers */
   def stop(): Unit = loadedProviders.synchronized {
     loadedProviders.clear()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index dd23925716b06..a4f55ed464326 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -47,8 +47,14 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
   private val keySchema = StructType(Seq(StructField("key", StringType, true)))
   private val valueSchema = StructType(Seq(StructField("value", IntegerType, true)))
 
+  before {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
   after {
     StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
   }
 
   test("get, put, remove, commit, and all data iterator") {
@@ -352,7 +358,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     }
   }
 
-  ignore("maintenance") {
+  test("maintenance") {
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test")
@@ -366,20 +372,26 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val provider = new HDFSBackedStateStoreProvider(
       storeId, keySchema, valueSchema, storeConf, hadoopConf)
 
+    quietly {
     withSpark(new SparkContext(conf)) { sc =>
       withCoordinatorRef(sc) { coordinatorRef =>
+        require(!StateStore.isMaintenanceRunning, "StateStore is unexpectedly running")
+
         for (i <- 1 to 20) {
           val store = StateStore.get(
             storeId, keySchema, valueSchema, i - 1, storeConf, hadoopConf)
           put(store, "a", i)
           store.commit()
         }
+
         eventually(timeout(10 seconds)) {
           assert(coordinatorRef.getLocation(storeId).nonEmpty, "active instance was not reported")
         }
 
         // Background maintenance should clean up and generate snapshots
+        assert(StateStore.isMaintenanceRunning, "Maintenance task is not running")
+
         eventually(timeout(10 seconds)) {
           // Earliest delta file should get cleaned up
           assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
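Patch 1's core idea: expose whether the background maintenance task is alive (its task handle is non-null) so every test can demand a clean slate in both the before and after hooks. A minimal, self-contained sketch of that pattern, assuming nothing about Spark's internals (MaintenanceHost and all of its members are illustrative names, not Spark API):

import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// Illustrative stand-in for StateStore's maintenance-task bookkeeping.
object MaintenanceHost {
  private val executor = Executors.newSingleThreadScheduledExecutor()
  private var task: ScheduledFuture[_] = null

  // Lazily start the periodic task, much as StateStore does on first use.
  def start(intervalSecs: Long)(body: => Unit): Unit = synchronized {
    if (task == null) {
      task = executor.scheduleAtFixedRate(
        new Runnable { def run(): Unit = body }, 0, intervalSecs, TimeUnit.SECONDS)
    }
  }

  // Mirrors the patch: "running" simply means the task handle is non-null.
  def isMaintenanceRunning: Boolean = synchronized { task != null }

  // Cancel and forget the task so isMaintenanceRunning flips back to false.
  def stop(): Unit = synchronized {
    if (task != null) {
      task.cancel(false)
      task = null
    }
  }
}

With such a probe, a task leaked by one test trips the next test's before-hook require immediately, instead of surfacing later as an unrelated flaky failure.
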
From a361ea755969c499b949749b4540dcadf569b3e6 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 20 Apr 2016 13:13:43 -0700
Subject: [PATCH 2/3] Handled error

---
 .../streaming/state/StateStore.scala          | 35 +++++++------------
 .../streaming/StreamingAggregationSuite.scala |  9 ++++-
 2 files changed, 21 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 2e13672ccb9e7..617c3ca45b5ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -113,7 +113,7 @@ case class KeyRemoved(key: UnsafeRow) extends StoreUpdate
  * the store is the active instance. Accordingly, it either keeps it loaded and performs
  * maintenance, or unloads the store.
  */
-private[state] object StateStore extends Logging {
+private[sql] object StateStore extends Logging {
 
   val MAINTENANCE_INTERVAL_CONFIG = "spark.streaming.stateStore.maintenanceInterval"
   val MAINTENANCE_INTERVAL_DEFAULT_SECS = 60
@@ -201,35 +201,26 @@ private[state] object StateStore extends Logging {
         }
       } catch {
         case NonFatal(e) =>
-          logWarning(s"Error managing $provider")
+          logWarning(s"Error managing $provider, stopping management thread")
+          stop()
       }
     }
   }
 
   private def reportActiveStoreInstance(storeId: StateStoreId): Unit = {
-    try {
-      val host = SparkEnv.get.blockManager.blockManagerId.host
-      val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
-      coordinatorRef.foreach(_.reportActiveInstance(storeId, host, executorId))
-      logDebug(s"Reported that the loaded instance $storeId is active")
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Error reporting active instance of $storeId")
-    }
+    val host = SparkEnv.get.blockManager.blockManagerId.host
+    val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
+    coordinatorRef.foreach(_.reportActiveInstance(storeId, host, executorId))
+    logDebug(s"Reported that the loaded instance $storeId is active")
+
   }
 
   private def verifyIfStoreInstanceActive(storeId: StateStoreId): Boolean = {
-    try {
-      val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
-      val verified =
-        coordinatorRef.map(_.verifyIfInstanceActive(storeId, executorId)).getOrElse(false)
-      logDebug(s"Verified whether the loaded instance $storeId is active: $verified" )
-      verified
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Error verifying active instance of $storeId")
-        false
-    }
+    val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
+    val verified =
+      coordinatorRef.map(_.verifyIfInstanceActive(storeId, executorId)).getOrElse(false)
+    logDebug(s"Verified whether the loaded instance $storeId is active: $verified" )
+    verified
   }
 
   private def coordinatorRef: Option[StateStoreCoordinatorRef] = synchronized {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index fa3b122f6d2da..64dc6e972b4d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.streaming
 
+import org.scalatest.BeforeAndAfterAll
+
 import org.apache.spark.SparkException
 import org.apache.spark.sql.StreamTest
 import org.apache.spark.sql.catalyst.analysis.Update
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.state.StateStore
 import org.apache.spark.sql.expressions.scala.typed
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
@@ -29,7 +32,11 @@ object FailureSinglton {
   var firstTime = true
 }
 
-class StreamingAggregationSuite extends StreamTest with SharedSQLContext {
+class StreamingAggregationSuite extends StreamTest with SharedSQLContext with BeforeAndAfterAll {
+
+  override def afterAll(): Unit = {
+    StateStore.stop()
+  }
 
   import testImplicits._
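Patch 2 widens StateStore to private[sql] so suites outside the state package can call stop(), and swaps log-and-keep-retrying error handling for log-and-stop: once SparkEnv or the coordinator is gone, retrying every interval only hides the failure. A hedged sketch of that fail-fast shape (FailFastMaintenance, doOneRound, and runRound are illustrative names, not the patch's API):

import scala.util.control.NonFatal

// Sketch of the fail-fast policy: a periodic job that shuts itself down on the
// first non-fatal error instead of logging and silently retrying forever.
trait FailFastMaintenance {
  def doOneRound(): Unit   // the actual maintenance work; may throw
  def stop(): Unit         // cancels the periodic task

  final def runRound(): Unit = {
    try {
      doOneRound()
    } catch {
      case NonFatal(e) =>
        // Same policy as the patch: report once, then stop, so callers (and
        // tests) observe a cleanly terminated task rather than repeated noise.
        System.err.println(s"Error in maintenance, stopping: $e")
        stop()
    }
  }
}
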
From 1d2fb46c7a7e66d1004566cfd495daf4777d74de Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 20 Apr 2016 16:39:53 -0700
Subject: [PATCH 3/3] Fixed bugs

---
 .../streaming/state/StateStore.scala      | 51 +++++++++++--------
 .../streaming/state/StateStoreSuite.scala |  1 +
 2 files changed, 31 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 617c3ca45b5ab..9948292470ff8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -191,36 +191,45 @@ private[sql] object StateStore extends Logging {
    */
   private def doMaintenance(): Unit = {
     logDebug("Doing maintenance")
-    loadedProviders.synchronized { loadedProviders.toSeq }.foreach { case (id, provider) =>
-      try {
-        if (verifyIfStoreInstanceActive(id)) {
-          provider.doMaintenance()
-        } else {
-          unload(id)
-          logInfo(s"Unloaded $provider")
+    if (SparkEnv.get == null) {
+      stop()
+    } else {
+      loadedProviders.synchronized { loadedProviders.toSeq }.foreach { case (id, provider) =>
+        try {
+          if (verifyIfStoreInstanceActive(id)) {
+            provider.doMaintenance()
+          } else {
+            unload(id)
+            logInfo(s"Unloaded $provider")
+          }
+        } catch {
+          case NonFatal(e) =>
+            logWarning(s"Error managing $provider, stopping management thread")
+            stop()
         }
-      } catch {
-        case NonFatal(e) =>
-          logWarning(s"Error managing $provider, stopping management thread")
-          stop()
       }
     }
   }
 
   private def reportActiveStoreInstance(storeId: StateStoreId): Unit = {
-    val host = SparkEnv.get.blockManager.blockManagerId.host
-    val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
-    coordinatorRef.foreach(_.reportActiveInstance(storeId, host, executorId))
-    logDebug(s"Reported that the loaded instance $storeId is active")
-
+    if (SparkEnv.get != null) {
+      val host = SparkEnv.get.blockManager.blockManagerId.host
+      val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
+      coordinatorRef.foreach(_.reportActiveInstance(storeId, host, executorId))
+      logDebug(s"Reported that the loaded instance $storeId is active")
+    }
   }
 
   private def verifyIfStoreInstanceActive(storeId: StateStoreId): Boolean = {
-    val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
-    val verified =
-      coordinatorRef.map(_.verifyIfInstanceActive(storeId, executorId)).getOrElse(false)
-    logDebug(s"Verified whether the loaded instance $storeId is active: $verified" )
-    verified
+    if (SparkEnv.get != null) {
+      val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
+      val verified =
+        coordinatorRef.map(_.verifyIfInstanceActive(storeId, executorId)).getOrElse(false)
+      logDebug(s"Verified whether the loaded instance $storeId is active: $verified")
+      verified
+    } else {
+      false
+    }
   }
 
   private def coordinatorRef: Option[StateStoreCoordinatorRef] = synchronized {
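The maintenance thread can outlive the SparkContext that created it, so patch 3 guards each use of SparkEnv.get with a null check: no env means stop maintenance, skip best-effort reporting, and answer "not active" for verification. A simplified sketch of the guard (Env and Guarded are stand-ins, not Spark classes; snapshotting the volatile reference into a local is a small hardening the patch itself does not do):

// Stand-ins for SparkEnv-style process state that tests may tear down at any time.
class Env(val host: String)
object Env { @volatile var get: Env = null }

object Guarded {
  // Mirrors reportActiveStoreInstance: reporting is best-effort, so a missing
  // env means do nothing rather than throw a NullPointerException.
  def report(storeName: String): Unit = {
    val env = Env.get
    if (env != null) {
      println(s"store $storeName active on host ${env.host}")
    }
  }

  // Mirrors verifyIfStoreInstanceActive: with no env there is no coordinator,
  // so the only safe answer is "not active".
  def verify(storeName: String): Boolean = {
    val env = Env.get
    env != null && env.host.nonEmpty
  }
}
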
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index a4f55ed464326..f8f8bc7d6ff84 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -430,6 +430,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
       require(SparkEnv.get === null)
       eventually(timeout(10 seconds)) {
         assert(!StateStore.isLoaded(storeId))
+        assert(!StateStore.isMaintenanceRunning)
       }
     }
   }
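The added assertion is polled through ScalaTest's eventually because the maintenance task only notices the torn-down SparkEnv on its next scheduled run. A bare-bones sketch of such a polling assertion using only the JDK (PollingAssert.eventuallyWithin is a hypothetical helper, not ScalaTest's eventually):

object PollingAssert {
  // Retry the assertion until it passes or the deadline expires, then rethrow
  // the last AssertionError so the test fails with the real message.
  def eventuallyWithin(timeoutMs: Long, intervalMs: Long = 100)(assertion: => Unit): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (true) {
      try {
        assertion
        return
      } catch {
        case e: AssertionError =>
          if (System.currentTimeMillis() > deadline) throw e
          Thread.sleep(intervalMs)
      }
    }
  }
}

Usage would mirror the test's final check, e.g. PollingAssert.eventuallyWithin(10000) { assert(!StateStore.isLoaded(storeId)); assert(!StateStore.isMaintenanceRunning) }.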