Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-11290][STREAMING] Basic implementation of trackStateByKey #9256

Closed
wants to merge 27 commits into from

Conversation

tdas
Copy link
Contributor

@tdas tdas commented Oct 23, 2015

Current updateStateByKey provides stateful processing in Spark Streaming. It allows the user to maintain per-key state and manage that state using an updateFunction. The updateFunction is called for each key, and it uses new data and existing state of the key, to generate an updated state. However, based on community feedback, we have learnt the following lessons.

  • Need for more optimized state management that does not scan every key
  • Need to make it easier to implement common use cases - (a) timeout of idle data, (b) returning items other than state

The high level idea that of this PR

  • Introduce a new API trackStateByKey that, allows the user to update per-key state, and emit arbitrary records. The new API is necessary as this will have significantly different semantics than the existing updateStateByKey API. This API will have direct support for timeouts.
  • Internally, the system will keep the state data as a map/list within the partitions of the state RDDs. The new data RDDs will be partitioned appropriately, and for all the key-value data, it will lookup the map/list in the state RDD partition and create a new list/map of updated state data. The new state RDD partition will be created based on the update data and if necessary, with old data.
    Here is the detailed design doc. Please take a look and provide feedback as comments.
    https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#heading=h.ph3w0clkd4em

This is still WIP. Major things left to be done.

  • Implement basic functionality of state tracking, with initial RDD and timeouts
  • Unit tests for state tracking
  • Unit tests for initial RDD and timeout
  • Unit tests for TrackStateRDD
    • state creating, updating, removing
    • emitting
    • checkpointing
  • Misc unit tests for State, TrackStateSpec, etc.
  • Update docs and experimental tags

@tdas tdas changed the title [SPARK-2629][STREAMING] Basic implementation of trackStateByKey [SPARK-2629][STREAMING] Basic implementation of trackStateByKey [WIP] Oct 23, 2015
@tdas
Copy link
Contributor Author

tdas commented Oct 23, 2015

PR is out - @pwendell @rxin @mateiz

@SparkQA
Copy link

SparkQA commented Oct 23, 2015

Test build #44266 has finished for PR 9256 at commit bd9cd94.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister\n * class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext)\n * sealed abstract class State[S]\n * case class TrackStateSpecImpl[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](\n * case class StateInfo[S](\n * class LimitMarker(val num: Int) extends Serializable\n

@inline final def getOption(): Option[S] = Option(get())

/** Get the state if it exists, otherwise return the default value */
@inline final def getOrElse[S1 >: S](default: => S1): S1 = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure is this "call-by-name" parameter Java friendly? Assuming this State should also be used in Java code :).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that is probably a valid concern. If users cannot call it from Java, this could be a Scala-only thing.

@SparkQA
Copy link

SparkQA commented Nov 2, 2015

Test build #44807 has finished for PR 9256 at commit be8cffc.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * sealed abstract class State[S]\n * case class TrackStateSpecImpl[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](\n * abstract class EmittedRecordsDStream[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](\n * case class StateInfo[S](\n * class LimitMarker(val num: Int) extends Serializable\n

@SparkQA
Copy link

SparkQA commented Nov 2, 2015

Test build #44808 has finished for PR 9256 at commit 6c02f44.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * sealed abstract class State[S]\n * case class TrackStateSpecImpl[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](\n * abstract class EmittedRecordsDStream[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](\n * case class StateInfo[S](\n * class LimitMarker(val num: Int) extends Serializable\n

@SparkQA
Copy link

SparkQA commented Nov 3, 2015

Test build #44911 has finished for PR 9256 at commit df927ba.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * sealed abstract class State[S]\n * case class TrackStateSpecImpl[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](\n * abstract class EmittedRecordsDStream[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](\n * case class StateInfo[S](\n * class LimitMarker(val num: Int) extends Serializable\n

@SparkQA
Copy link

SparkQA commented Nov 3, 2015

Test build #44913 has finished for PR 9256 at commit 6a75966.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * sealed abstract class State[S]\n * case class TrackStateSpecImpl[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](\n * abstract class EmittedRecordsDStream[K: ClassTag, V: ClassTag, S: ClassTag, T: ClassTag](\n * case class StateInfo[S](\n * class LimitMarker(val num: Int) extends Serializable\n

trackingFunction: (KeyType, Option[ValueType], State[StateType]) => Option[EmittedType]
): StateSpec[KeyType, ValueType, StateType, EmittedType] = {
new StateSpecImpl[KeyType, ValueType, StateType, EmittedType](trackingFunction)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO don't have apply(), just create()

@SparkQA
Copy link

SparkQA commented Nov 8, 2015

Test build #45310 has finished for PR 9256 at commit a78130d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * sealed abstract class State[S]\n * sealed abstract class StateSpec[K, V, S, T] extends Serializable\n * case class StateSpecImpl[K, V, S, T](\n * sealed abstract class EmittedRecordsDStream[K, S, T: ClassTag](\n * case class StateInfo[S](\n * class LimitMarker(val num: Int) extends Serializable\n

@SparkQA
Copy link

SparkQA commented Nov 10, 2015

Test build #45527 has finished for PR 9256 at commit fb5a296.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * sealed abstract class State[S]\n * sealed abstract class StateSpec[KeyType, ValueType, StateType, EmittedType] extends Serializable\n * case class StateSpecImpl[K, V, S, T](\n * sealed abstract class TrackStateDStream[KeyType, StateType, EmittedType: ClassTag](\n * class InternalTrackStateDStream[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](\n * case class StateInfo[S](\n * class LimitMarker(val num: Int) extends Serializable\n

@SparkQA
Copy link

SparkQA commented Nov 10, 2015

Test build #45545 has finished for PR 9256 at commit f1a6696.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * sealed abstract class State[S]\n * sealed abstract class StateSpec[KeyType, ValueType, StateType, EmittedType] extends Serializable\n * case class StateSpecImpl[K, V, S, T](\n * sealed abstract class TrackStateDStream[KeyType, StateType, EmittedType: ClassTag](\n * class InternalTrackStateDStream[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](\n * case class StateInfo[S](\n * class LimitMarker(val num: Int) extends Serializable\n

*/
def function[ValueType, StateType, EmittedType](
trackingFunction: (Option[ValueType], State[StateType]) => EmittedType
): StateSpec[Any, ValueType, StateType, EmittedType] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StateSpec[Any, ValueType, StateType, EmittedType] will require the user to cast the type if he wants to use the key type. Right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a valid point. Somehow the unit test that covers this API did not fail on this. This will be problem. Now if we take KeyType as part of the types in function, then scala compiler will not be able to infer the key-type (as its not used in the function), and the user will have to explicitly specify the types. This is a little cumbersome.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But without fixing this, it become more complicated for Java users. So its best to fix this. Also I tested, the type inference works when the StateSpec is defined inline with the trackStateByKey

@SparkQA
Copy link

SparkQA commented Nov 10, 2015

Test build #45550 has finished for PR 9256 at commit 77c9a66.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * sealed abstract class State[S]\n * sealed abstract class StateSpec[KeyType, ValueType, StateType, EmittedType] extends Serializable\n * case class StateSpecImpl[K, V, S, T](\n * sealed abstract class TrackStateDStream[KeyType, StateType, EmittedType: ClassTag](\n * class InternalTrackStateDStream[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](\n * case class StateInfo[S](\n * class LimitMarker(val num: Int) extends Serializable\n

@SparkQA
Copy link

SparkQA commented Nov 11, 2015

Test build #45572 has finished for PR 9256 at commit ae64786.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):\n * sealed abstract class State[S]\n * sealed abstract class StateSpec[KeyType, ValueType, StateType, EmittedType] extends Serializable\n * case class StateSpecImpl[K, V, S, T](\n * sealed abstract class TrackStateDStream[KeyType, ValueType, StateType, EmittedType: ClassTag](\n * class InternalTrackStateDStream[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](\n * case class StateInfo[S](\n * class LimitMarker(val num: Int) extends Serializable\n

private var prevStateRDD: RDD[TrackStateRDDRecord[K, S, T]],
private var partitionedDataRDD: RDD[(K, V)],
trackingFunction: (Time, K, Option[V], State[S]) => Option[T],
batchTime: Time, timeoutThresholdTime: Option[Long]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: timeoutThresholdTime should be in a new line.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no doc for timeoutThresholdTime

@zsxwing
Copy link
Member

zsxwing commented Nov 11, 2015

@tdas, looks good. My comments are minor. There are some unnecessary Java APIs in this PR. But I can fix them while I'm adding the Java APIs.

@tdas
Copy link
Contributor Author

tdas commented Nov 11, 2015

All right. I am merging this for now. Can you take care of these last two comments in your follow up PR on the Java API? Also, I have a few more changes, refactorings and more unit tests that I am working on, but that can be done later as well.

Merging to master and 1.6. Thank you very much @mateiz, @rxin, @pwendell and @zsxwing for reviewing this massive PR.

@tdas tdas changed the title [SPARK-2629][STREAMING] Basic implementation of trackStateByKey [WIP] [SPARK-2629][STREAMING] Basic implementation of trackStateByKey Nov 11, 2015
@tdas tdas changed the title [SPARK-2629][STREAMING] Basic implementation of trackStateByKey [SPARK-11290][STREAMING] Basic implementation of trackStateByKey Nov 11, 2015
@asfgit asfgit closed this in 99f5f98 Nov 11, 2015
asfgit pushed a commit that referenced this pull request Nov 11, 2015
Current updateStateByKey provides stateful processing in Spark Streaming. It allows the user to maintain per-key state and manage that state using an updateFunction. The updateFunction is called for each key, and it uses new data and existing state of the key, to generate an updated state. However, based on community feedback, we have learnt the following lessons.
* Need for more optimized state management that does not scan every key
* Need to make it easier to implement common use cases - (a) timeout of idle data, (b) returning items other than state

The high level idea that of this PR
* Introduce a new API trackStateByKey that, allows the user to update per-key state, and emit arbitrary records. The new API is necessary as this will have significantly different semantics than the existing updateStateByKey API. This API will have direct support for timeouts.
* Internally, the system will keep the state data as a map/list within the partitions of the state RDDs. The new data RDDs will be partitioned appropriately, and for all the key-value data, it will lookup the map/list in the state RDD partition and create a new list/map of updated state data. The new state RDD partition will be created based on the update data and if necessary, with old data.
Here is the detailed design doc. Please take a look and provide feedback as comments.
https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#heading=h.ph3w0clkd4em

This is still WIP. Major things left to be done.
- [x] Implement basic functionality of state tracking, with initial RDD and timeouts
- [x] Unit tests for state tracking
- [x] Unit tests for initial RDD and timeout
- [ ] Unit tests for TrackStateRDD
       - [x] state creating, updating, removing
       - [ ] emitting
       - [ ] checkpointing
- [x] Misc unit tests for State, TrackStateSpec, etc.
- [x] Update docs and experimental tags

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #9256 from tdas/trackStateByKey.

(cherry picked from commit 99f5f98)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
asfgit pushed a commit that referenced this pull request Nov 12, 2015
Should not create SparkContext in the constructor of `TrackStateRDDSuite`. This is a follow up PR for #9256 to fix the test for maven build.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #9668 from zsxwing/hotfix.

(cherry picked from commit f0d3b58)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
asfgit pushed a commit that referenced this pull request Nov 12, 2015
Should not create SparkContext in the constructor of `TrackStateRDDSuite`. This is a follow up PR for #9256 to fix the test for maven build.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #9668 from zsxwing/hotfix.
dskrvk pushed a commit to dskrvk/spark that referenced this pull request Nov 13, 2015
Should not create SparkContext in the constructor of `TrackStateRDDSuite`. This is a follow up PR for apache#9256 to fix the test for maven build.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#9668 from zsxwing/hotfix.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
8 participants