Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34893][SS] Support session window natively #33081

Conversation

HeartSaVioR
Copy link
Contributor

@HeartSaVioR HeartSaVioR commented Jun 25, 2021

Introduction: this PR is the last part of SPARK-10816 (EventTime based sessionization (session window)). Please refer #31937 to see the overall view of the code change. (Note that code diff could be diverged a bit.)

What changes were proposed in this pull request?

This PR proposes to support native session window. Please refer the comments/design doc in SPARK-10816 for more details on the rationalization and design (could be outdated a bit compared to the PR).

The definition of the boundary of "session window" is [the timestamp of start event ~ the timestamp of last event + gap duration). That said, unlike time window, session window is a dynamic window which can expand if new input row is added to the session. To handle expansion of session window, Spark defines session window per input row, and "merge" windows if they can be merged (boundaries are overlapped).

This PR leverages two different approaches on merging session windows:

  1. merging session windows with Spark's aggregation logic (a variant of sort aggregation)
  2. updating session window for all rows bound to the same session, and applying aggregation logic afterwards

First one is preferable as it outperforms compared to the second one, though it can be only used if merging session window can be applied altogether with aggregation. It is not applicable on all the cases, so second one is used to cover the remaining cases.

This PR also applies the optimization on merging input rows and existing sessions with retaining the order (group keys + start timestamp of session window), leveraging the fact the number of existing sessions per group key won't be huge.

The state format is versioned, so that we can bring a new state format if we find a better one.

Why are the changes needed?

For now, to deal with sessionization, Spark requires end users to play with (flat)MapGroupsWithState directly which has a couple of major drawbacks:

  1. (flat)MapGroupsWithState is lower level API and end users have to code everything in details for defining session window and merging windows
  2. built-in aggregate functions cannot be used and end users have to deal with aggregation by themselves
  3. (flat)MapGroupsWithState is only available in Scala/Java.

With native support of session window, end users simply use "session_window" like they use "window" for tumbling/sliding window, and leverage built-in aggregate functions as well as UDAFs to simply define aggregations.

Quoting the query example from test suite:

    val inputData = MemoryStream[(String, Long)]

    // Split the lines into words, treat words as sessionId of events
    val events = inputData.toDF()
      .select($"_1".as("value"), $"_2".as("timestamp"))
      .withColumn("eventTime", $"timestamp".cast("timestamp"))
      .selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")
      .withWatermark("eventTime", "30 seconds")

    val sessionUpdates = events
      .groupBy(session_window($"eventTime", "10 seconds") as 'session, 'sessionId)
      .agg(count("*").as("numEvents"))
      .selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
        "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
        "numEvents")

which is same as StructuredSessionization (native session window is shorter and clearer even ignoring model classes).

// Split the lines into words, treat words as sessionId of events
val events = lines
.as[(String, Timestamp)]
.flatMap { case (line, timestamp) =>
line.split(" ").map(word => Event(sessionId = word, timestamp))
}
// Sessionize the events. Track number of events, start and end timestamps of session,
// and report session updates.
val sessionUpdates = events
.groupByKey(event => event.sessionId)
.mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>
// If timed out, then remove session and send final update
if (state.hasTimedOut) {
val finalUpdate =
SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
state.remove()
finalUpdate
} else {
// Update start and end timestamps in session
val timestamps = events.map(_.timestamp.getTime).toSeq
val updatedSession = if (state.exists) {
val oldSession = state.get
SessionInfo(
oldSession.numEvents + timestamps.size,
oldSession.startTimestampMs,
math.max(oldSession.endTimestampMs, timestamps.max))
} else {
SessionInfo(timestamps.size, timestamps.min, timestamps.max)
}
state.update(updatedSession)
// Set timeout such that the session will be expired if no data received for 10 seconds
state.setTimeoutDuration("10 seconds")
SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = false)
}
}

(Worth noting that the code in StructuredSessionization only works with processing time. The code doesn't consider old event can update the start time of old session.)

Does this PR introduce any user-facing change?

Yes. This PR brings the new feature to support session window on both batch and streaming query, which adds a new function "session_window" which usage is similar with "window".

How was this patch tested?

New test suites. Also tested with benchmark code.

@@ -181,7 +181,8 @@ class UpdatingSessionsIterator(

private val valueProj = GenerateUnsafeProjection.generate(valuesExpressions, inputSchema)
private val restoreProj = GenerateUnsafeProjection.generate(inputSchema,
groupingExpressions.map(_.toAttribute) ++ valuesExpressions.map(_.toAttribute))
groupingWithoutSession.map(_.toAttribute) ++ Seq(sessionExpression.toAttribute) ++
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a case session column is not placed at the end of the key. This line addresses such case.

@HeartSaVioR
Copy link
Contributor Author

Same here; I marked this as draft as other PRs has to be reviewed and merged earlier. I'll rebase this PR once all other PRs are merged.

@SparkQA
Copy link

SparkQA commented Jun 25, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44846/

@SparkQA
Copy link

SparkQA commented Jun 25, 2021

Test build #140314 has finished for PR 33081 at commit af538b9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class SessionWindow(timeColumn: Expression, gapDuration: Long) extends UnaryExpression
  • case class SessionWindowStateStoreRestoreExec(
  • case class SessionWindowStateStoreSaveExec(

@HeartSaVioR HeartSaVioR force-pushed the SPARK-34893-SPARK-10816-PR-31570-part-5 branch from af538b9 to 0af2fd1 Compare July 14, 2021 05:15
@SparkQA
Copy link

SparkQA commented Jul 14, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45513/

@SparkQA
Copy link

SparkQA commented Jul 14, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45513/

@SparkQA
Copy link

SparkQA commented Jul 14, 2021

Test build #140999 has finished for PR 33081 at commit 0af2fd1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 14, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45527/

@SparkQA
Copy link

SparkQA commented Jul 14, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45529/

@SparkQA
Copy link

SparkQA commented Jul 14, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45530/

@SparkQA
Copy link

SparkQA commented Jul 14, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45527/

@SparkQA
Copy link

SparkQA commented Jul 14, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45530/

@SparkQA
Copy link

SparkQA commented Jul 14, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45529/

@SparkQA
Copy link

SparkQA commented Jul 14, 2021

Test build #141015 has finished for PR 33081 at commit 7b74ee8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 14, 2021

Test build #141013 has finished for PR 33081 at commit 11b44bb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 14, 2021

Test build #141022 has finished for PR 33081 at commit e01a366.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 14, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45537/

@HeartSaVioR
Copy link
Contributor Author

cc. @viirya @xuanyuanking This is the last PR for session window feature. Please take a look. Thanks!

@SparkQA
Copy link

SparkQA commented Jul 15, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45557/

@SparkQA
Copy link

SparkQA commented Jul 15, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45557/

@SparkQA
Copy link

SparkQA commented Jul 15, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45565/

@SparkQA
Copy link

SparkQA commented Jul 16, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45623/

@SparkQA
Copy link

SparkQA commented Jul 16, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45623/

@viirya
Copy link
Member

viirya commented Jul 16, 2021

I will take another looks tonight.

@HeartSaVioR HeartSaVioR force-pushed the SPARK-34893-SPARK-10816-PR-31570-part-5 branch from 4a6cff3 to e7a2a37 Compare July 16, 2021 03:13
@HeartSaVioR
Copy link
Contributor Author

Rebased with master branch to see whether the pyspark lint is incurred from other PR and being fixed or not.

@SparkQA
Copy link

SparkQA commented Jul 16, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45642/

@SparkQA
Copy link

SparkQA commented Jul 16, 2021

Test build #141110 has finished for PR 33081 at commit 4a6cff3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Comment on lines 45 to 47
val providerOptions = Seq(
classOf[HDFSBackedStateStoreProvider].getCanonicalName).map { value =>
(SQLConf.STATE_STORE_PROVIDER_CLASS.key, value.stripSuffix("$"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want to test against RocksDBStateStoreProvider?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing spot. Nice finding!

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only one question about test coverage for RocksDB. Otherwise lgtm. Thanks for the work!

@SparkQA
Copy link

SparkQA commented Jul 16, 2021

Test build #141131 has finished for PR 33081 at commit e7a2a37.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 16, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45657/

@SparkQA
Copy link

SparkQA commented Jul 16, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45657/

@HeartSaVioR
Copy link
Contributor Author

GA passed. Thanks! Merging to master/3.2!

HeartSaVioR added a commit that referenced this pull request Jul 16, 2021
Introduction: this PR is the last part of SPARK-10816 (EventTime based sessionization (session window)). Please refer #31937 to see the overall view of the code change. (Note that code diff could be diverged a bit.)

### What changes were proposed in this pull request?

This PR proposes to support native session window. Please refer the comments/design doc in SPARK-10816 for more details on the rationalization and design (could be outdated a bit compared to the PR).

The definition of the boundary of "session window" is [the timestamp of start event ~ the timestamp of last event + gap duration). That said, unlike time window, session window is a dynamic window which can expand if new input row is added to the session. To handle expansion of session window, Spark defines session window per input row, and "merge" windows if they can be merged (boundaries are overlapped).

This PR leverages two different approaches on merging session windows:

1. merging session windows with Spark's aggregation logic (a variant of sort aggregation)
2. updating session window for all rows bound to the same session, and applying aggregation logic afterwards

First one is preferable as it outperforms compared to the second one, though it can be only used if merging session window can be applied altogether with aggregation. It is not applicable on all the cases, so second one is used to cover the remaining cases.

This PR also applies the optimization on merging input rows and existing sessions with retaining the order (group keys + start timestamp of session window), leveraging the fact the number of existing sessions per group key won't be huge.

The state format is versioned, so that we can bring a new state format if we find a better one.

### Why are the changes needed?

For now, to deal with sessionization, Spark requires end users to play with (flat)MapGroupsWithState directly which has a couple of major drawbacks:

1. (flat)MapGroupsWithState is lower level API and end users have to code everything in details for defining session window and merging windows
2. built-in aggregate functions cannot be used and end users have to deal with aggregation by themselves
3. (flat)MapGroupsWithState is only available in Scala/Java.

With native support of session window, end users simply use "session_window" like they use "window" for tumbling/sliding window, and leverage built-in aggregate functions as well as UDAFs to simply define aggregations.

Quoting the query example from test suite:

```
    val inputData = MemoryStream[(String, Long)]

    // Split the lines into words, treat words as sessionId of events
    val events = inputData.toDF()
      .select($"_1".as("value"), $"_2".as("timestamp"))
      .withColumn("eventTime", $"timestamp".cast("timestamp"))
      .selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")
      .withWatermark("eventTime", "30 seconds")

    val sessionUpdates = events
      .groupBy(session_window($"eventTime", "10 seconds") as 'session, 'sessionId)
      .agg(count("*").as("numEvents"))
      .selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
        "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
        "numEvents")
```

which is same as StructuredSessionization (native session window is shorter and clearer even ignoring model classes).

https://github.com/apache/spark/blob/39542bb81f8570219770bb6533c077f44f6cbd2a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala#L66-L105

(Worth noting that the code in StructuredSessionization only works with processing time. The code doesn't consider old event can update the start time of old session.)

### Does this PR introduce _any_ user-facing change?

Yes. This PR brings the new feature to support session window on both batch and streaming query, which adds a new function "session_window" which usage is similar with "window".

### How was this patch tested?

New test suites. Also tested with benchmark code.

Closes #33081 from HeartSaVioR/SPARK-34893-SPARK-10816-PR-31570-part-5.

Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit f2bf8b0)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
@HeartSaVioR
Copy link
Contributor Author

Thanks all for reviewing! I merged this into master/3.2.

I filed https://issues.apache.org/jira/browse/SPARK-36172 to deal with documentation - probably need to explain about session window and how to use it in structured streaming guide doc.

@SparkQA
Copy link

SparkQA commented Jul 16, 2021

Test build #141145 has finished for PR 33081 at commit bbade35.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member

viirya commented Jul 16, 2021

Thank you @HeartSaVioR and @xuanyuanking!

flyrain pushed a commit to flyrain/spark that referenced this pull request Sep 21, 2021
Introduction: this PR is the last part of SPARK-10816 (EventTime based sessionization (session window)). Please refer apache#31937 to see the overall view of the code change. (Note that code diff could be diverged a bit.)

This PR proposes to support native session window. Please refer the comments/design doc in SPARK-10816 for more details on the rationalization and design (could be outdated a bit compared to the PR).

The definition of the boundary of "session window" is [the timestamp of start event ~ the timestamp of last event + gap duration). That said, unlike time window, session window is a dynamic window which can expand if new input row is added to the session. To handle expansion of session window, Spark defines session window per input row, and "merge" windows if they can be merged (boundaries are overlapped).

This PR leverages two different approaches on merging session windows:

1. merging session windows with Spark's aggregation logic (a variant of sort aggregation)
2. updating session window for all rows bound to the same session, and applying aggregation logic afterwards

First one is preferable as it outperforms compared to the second one, though it can be only used if merging session window can be applied altogether with aggregation. It is not applicable on all the cases, so second one is used to cover the remaining cases.

This PR also applies the optimization on merging input rows and existing sessions with retaining the order (group keys + start timestamp of session window), leveraging the fact the number of existing sessions per group key won't be huge.

The state format is versioned, so that we can bring a new state format if we find a better one.

For now, to deal with sessionization, Spark requires end users to play with (flat)MapGroupsWithState directly which has a couple of major drawbacks:

1. (flat)MapGroupsWithState is lower level API and end users have to code everything in details for defining session window and merging windows
2. built-in aggregate functions cannot be used and end users have to deal with aggregation by themselves
3. (flat)MapGroupsWithState is only available in Scala/Java.

With native support of session window, end users simply use "session_window" like they use "window" for tumbling/sliding window, and leverage built-in aggregate functions as well as UDAFs to simply define aggregations.

Quoting the query example from test suite:

```
    val inputData = MemoryStream[(String, Long)]

    // Split the lines into words, treat words as sessionId of events
    val events = inputData.toDF()
      .select($"_1".as("value"), $"_2".as("timestamp"))
      .withColumn("eventTime", $"timestamp".cast("timestamp"))
      .selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")
      .withWatermark("eventTime", "30 seconds")

    val sessionUpdates = events
      .groupBy(session_window($"eventTime", "10 seconds") as 'session, 'sessionId)
      .agg(count("*").as("numEvents"))
      .selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
        "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
        "numEvents")
```

which is same as StructuredSessionization (native session window is shorter and clearer even ignoring model classes).

https://github.com/apache/spark/blob/39542bb81f8570219770bb6533c077f44f6cbd2a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala#L66-L105

(Worth noting that the code in StructuredSessionization only works with processing time. The code doesn't consider old event can update the start time of old session.)

Yes. This PR brings the new feature to support session window on both batch and streaming query, which adds a new function "session_window" which usage is similar with "window".

New test suites. Also tested with benchmark code.

Closes apache#33081 from HeartSaVioR/SPARK-34893-SPARK-10816-PR-31570-part-5.

Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants