From 6d208f5e3bed657bedd5838595efa8d14ff3be33 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Mon, 10 Jul 2017 17:00:53 -0700
Subject: [PATCH 1/3] create distinction between read-only and read-write state stores
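
Stores now start in INITIALIZED and only move to UPDATING on the first
put/remove; the verify() calls accept both states through isUpdatable.
A rough lifecycle sketch of what this enables (here `provider`, `key`,
and `value` are stand-ins for a configured HDFSBackedStateStoreProvider
and two UnsafeRow instances; they are not something this patch adds):

    val rw = provider.getStore(0)  // state = INITIALIZED
    rw.put(key, value)             // first write flips state to UPDATING
    rw.commit()                    // state = COMMITTED

    val ro = provider.getStore(1)  // state = INITIALIZED
    ro.get(key)                    // get() never changes the state
    ro.abort()                     // nothing was staged, so no cleanup runs

Because abort() now only cleans up when state == UPDATING, a store that
served nothing but reads no longer closes and deletes a temp delta file
it never wrote (tempDeltaFileStream is lazy, so it was never opened).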
---
 .../state/HDFSBackedStateStoreProvider.scala | 51 ++++++++++---------
 1 file changed, 28 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index bae7a15165e43..500b8ca3a8958 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -82,6 +82,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
     /** Trait and classes representing the internal state of the store */
     trait STATE
+    case object INITIALIZED extends STATE
     case object UPDATING extends STATE
     case object COMMITTED extends STATE
     case object ABORTED extends STATE
@@ -89,7 +90,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     private val newVersion = version + 1
     private val tempDeltaFile = new Path(baseDir, s"temp-${Random.nextLong}")
     private lazy val tempDeltaFileStream = compressStream(fs.create(tempDeltaFile, true))
-    @volatile private var state: STATE = UPDATING
+    @volatile private var state: STATE = INITIALIZED
     @volatile private var finalDeltaFile: Path = null
 
     override def id: StateStoreId = HDFSBackedStateStoreProvider.this.stateStoreId
@@ -98,8 +99,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
       mapToUpdate.get(key)
     }
 
+    private def isUpdatable: Boolean = state == INITIALIZED || state == UPDATING
+
     override def put(key: UnsafeRow, value: UnsafeRow): Unit = {
-      verify(state == UPDATING, "Cannot put after already committed or aborted")
+      verify(isUpdatable, "Cannot put after already committed or aborted")
+      state = UPDATING
       val keyCopy = key.copy()
       val valueCopy = value.copy()
       mapToUpdate.put(keyCopy, valueCopy)
@@ -107,7 +111,8 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     }
 
     override def remove(key: UnsafeRow): Unit = {
-      verify(state == UPDATING, "Cannot remove after already committed or aborted")
+      verify(isUpdatable, "Cannot remove after already committed or aborted")
+      state = UPDATING
       val prevValue = mapToUpdate.remove(key)
       if (prevValue != null) {
         writeRemoveToDeltaFile(tempDeltaFileStream, key)
@@ -117,14 +122,13 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     override def getRange(
         start: Option[UnsafeRow],
        end: Option[UnsafeRow]): Iterator[UnsafeRowPair] = {
-      verify(state == UPDATING, "Cannot getRange after already committed or aborted")
+      verify(isUpdatable, "Cannot getRange after already committed or aborted")
       iterator()
     }
 
     /** Commit all the updates that have been made to the store, and return the new version. */
     override def commit(): Long = {
-      verify(state == UPDATING, "Cannot commit after already committed or aborted")
-
+      verify(isUpdatable, "Cannot commit after already committed or aborted")
       try {
         finalizeDeltaFile(tempDeltaFileStream)
         finalDeltaFile = commitUpdates(newVersion, mapToUpdate, tempDeltaFile)
@@ -140,25 +144,26 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
     /** Abort all the updates made on this store. This store will not be usable any more. */
     override def abort(): Unit = {
-      verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
-      try {
-        state = ABORTED
-        if (tempDeltaFileStream != null) {
-          tempDeltaFileStream.close()
-        }
-        if (tempDeltaFile != null) {
-          fs.delete(tempDeltaFile, true)
+      if (state == UPDATING) {
+        try {
+          state = ABORTED
+          if (tempDeltaFileStream != null) {
+            tempDeltaFileStream.close()
+          }
+          if (tempDeltaFile != null) {
+            fs.delete(tempDeltaFile, true)
+          }
+        } catch {
+          case c: ClosedChannelException =>
+            // This can happen when underlying file output stream has been closed before the
+            // compression stream.
+            logDebug(s"Error aborting version $newVersion into $this", c)
+
+          case e: Exception =>
+            logWarning(s"Error aborting version $newVersion into $this", e)
         }
-      } catch {
-        case c: ClosedChannelException =>
-          // This can happen when underlying file output stream has been closed before the
-          // compression stream.
-          logDebug(s"Error aborting version $newVersion into $this", c)
-
-        case e: Exception =>
-          logWarning(s"Error aborting version $newVersion into $this", e)
+        logInfo(s"Aborted version $newVersion for $this")
       }
-      logInfo(s"Aborted version $newVersion for $this")
     }
 
     /**

From 33ec508f380916e059c48ae06cb0e4019be78e01 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Mon, 10 Jul 2017 17:36:13 -0700
Subject: [PATCH 2/3] add test
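
The STATE objects move out of the inner class into the HDFSBackedStateStore
object so the test can name the type from outside. The new test then peeks
at the private `state` var through ScalaTest's PrivateMethodTester, roughly
like this (a sketch assuming the suite mixes in PrivateMethodTester and
`store` is an HDFSBackedStateStore; a Scala var compiles to an accessor
method, which is what gets invoked):

    val getState = PrivateMethod[provider.HDFSBackedStateStore.STATE]('state)
    store invokePrivate getState()  // INITIALIZED, UPDATING, COMMITTED or ABORTED

abort() on a store that never reached UPDATING now lands in ABORTED via the
new else branch instead of staying INITIALIZED, which makes the read-only
path observable in the test.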
of the store */ + trait STATE + case object INITIALIZED extends STATE + case object UPDATING extends STATE + case object COMMITTED extends STATE + case object ABORTED extends STATE + } + /** Get the state store for making updates to create a new `version` of the store. */ override def getStore(version: Long): StateStore = synchronized { require(version >= 0, "Version cannot be less than 0") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index c2087ec219e57..8adb841bb6f8d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -117,6 +117,35 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] assert(getData(provider) === Set("a" -> 20), "snapshotting messed up the data") } + test("state changes") { + val provider = newStoreProvider(opId = Random.nextInt, partition = 0, minDeltasForSnapshot = 5) + val getState = PrivateMethod[provider.HDFSBackedStateStore.STATE]('state) + + // read-write store + val store1 = provider.getStore(0) + assert(store1.invokePrivate(getState()) === provider.HDFSBackedStateStore.INITIALIZED) + put(store1, "a", 0) + assert(store1.invokePrivate(getState()) === provider.HDFSBackedStateStore.UPDATING) + store1.commit() + assert(store1.invokePrivate(getState()) === provider.HDFSBackedStateStore.COMMITTED) + + // read-only store + val store2 = provider.getStore(1) + assert(store2.invokePrivate(getState()) === provider.HDFSBackedStateStore.INITIALIZED) + assert(get(store2, "a") === 0) + assert(store2.invokePrivate(getState()) === provider.HDFSBackedStateStore.INITIALIZED) + store2.abort() + assert(store2.invokePrivate(getState()) === provider.HDFSBackedStateStore.ABORTED) + + // read-write store + val store3 = provider.getStore(1) + assert(store3.invokePrivate(getState()) === provider.HDFSBackedStateStore.INITIALIZED) + remove(store3, _ == "a") + assert(store3.invokePrivate(getState()) === provider.HDFSBackedStateStore.UPDATING) + store3.commit() + assert(store3.invokePrivate(getState()) === provider.HDFSBackedStateStore.COMMITTED) + } + test("cleaning") { val provider = newStoreProvider(opId = Random.nextInt, partition = 0, minDeltasForSnapshot = 5) From 95e04fa04c44c6ed13f91b8e06bf39ffe83719e0 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 10 Jul 2017 17:43:10 -0700 Subject: [PATCH 3/3] fix test --- .../spark/sql/execution/streaming/state/StateStoreSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index 8adb841bb6f8d..8b1ddfbddaafb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -132,7 +132,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] // read-only store val store2 = provider.getStore(1) assert(store2.invokePrivate(getState()) === provider.HDFSBackedStateStore.INITIALIZED) - assert(get(store2, "a") === 0) + assert(get(store2, "a").get === 0) assert(store2.invokePrivate(getState()) === provider.HDFSBackedStateStore.INITIALIZED) 
---
 .../spark/sql/execution/streaming/state/StateStoreSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 8adb841bb6f8d..8b1ddfbddaafb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -132,7 +132,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     // read-only store
     val store2 = provider.getStore(1)
     assert(store2.invokePrivate(getState()) === provider.HDFSBackedStateStore.INITIALIZED)
-    assert(get(store2, "a") === 0)
+    assert(get(store2, "a").get === 0)
     assert(store2.invokePrivate(getState()) === provider.HDFSBackedStateStore.INITIALIZED)
     store2.abort()
     assert(store2.invokePrivate(getState()) === provider.HDFSBackedStateStore.ABORTED)