
[SPARK-10816][SS] SessionWindow support for Structure Streaming #31570

Closed
wants to merge 34 commits

Conversation

viirya
Member

@viirya viirya commented Feb 16, 2021

What changes were proposed in this pull request?

This patch proposes to add the session window feature to Structured Streaming.

The code is based on previous PR #22583. Credit goes to the original authors, and they should be listed as major authors in the commit.

Changes:

  • Remove the unnecessary partial aggregation.
  • Configurable state store format: in case we want to use a different state format for session windows in future releases, we should make it configurable, like aggregation. Currently the format config value is 1.
  • Replaced state store format: session windows per key are no longer stored as a single long array. Instead, we store state in two state stores: one for key -> list of start times, and one for (key, start time) -> state value.
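The two-store layout described above can be modeled with a minimal in-memory sketch. This is illustrative only (class and method names here are mine, not the PR's actual API): one map holds key -> sorted list of session start times, and a second map holds (key, start) -> the session's aggregated value, so looking up all sessions for a key walks the start-time list.

```python
# Minimal in-memory model of the proposed two-store state layout.
# All names here are illustrative, not the PR's actual API.

class SessionState:
    def __init__(self):
        self.starts = {}   # key -> sorted list of session start times
        self.values = {}   # (key, start time) -> aggregated session value

    def put(self, key, start, value):
        lst = self.starts.setdefault(key, [])
        if start not in lst:
            lst.append(start)
            lst.sort()     # keep start times ordered per key
        self.values[(key, start)] = value

    def get_sessions(self, key):
        # Walk the start-time list, then look up each session's value.
        return [(s, self.values[(key, s)]) for s in self.starts.get(key, [])]

    def remove(self, key, start):
        self.starts[key].remove(start)
        del self.values[(key, start)]

state = SessionState()
state.put("user1", 100, {"count": 3})
state.put("user1", 50, {"count": 1})
print(state.get_sessions("user1"))
# -> [(50, {'count': 1}), (100, {'count': 3})]
```

Splitting the state this way means a single session can be updated or evicted without rewriting one long array holding every session for the key.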

Why are the changes needed?

Support the session window feature in Structured Streaming. This is a well-known feature in stream processing. Based on previous discussions and new customer requirements, there seems to be a solid reason to have this in Spark.

Does this PR introduce any user-facing change?

Yes, users will be able to use the session window feature in Structured Streaming.
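For illustration only, a user-facing query might look like the following PySpark sketch. Hedged: this mirrors the `session_window` API shape that was ultimately added to Spark 3.2, not necessarily this PR's exact surface; the rate source and column names are placeholders, and running it requires a Spark runtime.

```python
# Hypothetical usage sketch of a session window in Structured Streaming.
# Requires a running Spark (3.2+) environment; source and columns are
# placeholders, not taken from this PR.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("session-window-demo").getOrCreate()

# A toy stream: the rate source emits (timestamp, value); derive a key.
events = (spark.readStream.format("rate").load()
          .withColumn("user", F.col("value") % 10))

# Group events per user into sessions that close after 5 seconds of
# inactivity, counting events per session.
sessions = (events
            .withWatermark("timestamp", "10 seconds")
            .groupBy(F.session_window("timestamp", "5 seconds"), "user")
            .count())

query = (sessions.writeStream
         .outputMode("append")
         .format("console")
         .start())
```

Unlike a fixed time window, each session's end time is data-driven: it extends while events keep arriving within the gap duration.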

How was this patch tested?

Unit tests currently. We may put it through staging tests internally.

liyuanjian and others added 17 commits September 28, 2018 14:21
Change-Id: Ie9669ee7827f0e52d2837ddd8a840a07543850ba
1. acu-bsc-21, SessionWindowStateStoreRestore by huangtengfei02 <huangtengfei02@baidu.com>
2. acu-bsc-22, SessionWindowStateStoreSave of complete / append mode
3. acu-bsc-18, new Strategy for Streaming query with SessionWindow
4. acu-bsc-17, generate new aggregate physical plan in AggUtils
…tate in append mode

Change-Id: I93bb8435c6ff667ee025a0659cfec2e925e984dd
acu-bsc-75, fix bug in SessionWindowStateStoreSaveExec, when update state in append mode
Change-Id: I564062ba02947b133d9d850977d38470dc84ef5e
Fix performance problem in SessionWindow.

Change-Id: I0f7e420049f7c1da45257505e6fea4804c8d9fd3
@SparkQA

SparkQA commented Feb 16, 2021

Test build #135160 has started for PR 31570 at commit a5efe1b.

Comment on lines +266 to +269
* Plans a streaming aggregation with Session Window using the following progression:
* - (Shuffle + Session Window Assignment, see `SessionWindowExec`)
* - Partial Aggregation (now there is at most 1 tuple per group)
* - SessionStateStoreRestore (now there is 1 tuple from this batch + optionally one from
Member Author

@viirya viirya Feb 16, 2021


This process was simplified compared with the original PR. It removes the unnecessary PartialMerge, so the session window streaming aggregation has the same physical structure as a general streaming aggregation.

Member Author


As it removes the unnecessary PartialMerge, I think this could improve performance.

Member


Yes. If we see a bottleneck in the future, we can add the pre-shuffle PartialMerge back.

Contributor


As @xuanyuanking stated, we can say that's a simplification, but it is arguable that PartialMerge is "unnecessary". The sort operation needed to aggregate before shuffling makes it a trade-off, but it's known that shuffling less data brings performance benefits.

@viirya viirya changed the title [NOT-MERGE][SPARK-10816][SS] SessionWindow support for Structure Streaming [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming Feb 16, 2021
@viirya
Member Author

viirya commented Feb 16, 2021

cc @xuanyuanking


@viirya
Member Author

viirya commented Feb 16, 2021

retest this please

@SparkQA

SparkQA commented Mar 17, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40734/

@SparkQA

SparkQA commented Mar 17, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40734/

@SparkQA

SparkQA commented Mar 17, 2021

Test build #136153 has finished for PR 31570 at commit cf65c2a.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
  • case class WindowRecord(start: Long, end: Long, isNew: Boolean, row: UnsafeRow)

@viirya viirya changed the title [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming \[SPARK-10816][SS] SessionWindow support for Structure Streaming Mar 17, 2021
@viirya viirya changed the title \[SPARK-10816][SS] SessionWindow support for Structure Streaming [SPARK-10816][SS] SessionWindow support for Structure Streaming Mar 17, 2021
@viirya
Member Author

viirya commented Mar 17, 2021

@xuanyuanking @HeartSaVioR The state store format config was added. The state store format was revised to use two stores instead of one long array of all states together. I'd appreciate it if you have time to look at this.

cc @gaborgsomogyi @zsxwing @tdas @Ngone51

@xuanyuanking
Member

I'm reviewing this. Thank you @viirya !!

@HeartSaVioR
Contributor

Thanks for taking up the missing piece in SS! I'm happy to see we continue the investment in this feature. Let me take a look.

Member

@xuanyuanking xuanyuanking left a comment


Finished a rough round. Will take a detailed look at StreamingSessionWindowStateManager.

trait StateStoreType

/** Helper trait for invoking common functionalities of a state store. */
abstract class StateStoreHandler extends Logging {
Member


It would be great if we could describe a little more how we reuse the original streaming join's StateStoreHandler. It seems to be an important implementation detail.

*
* Overridden by concrete implementations of SparkPlan.
*/
override protected def doExecute(): RDD[InternalRow] = {
Member


This is another difference from @HeartSaVioR's version. We use the pattern of the SQL window function, while Jungtaek's is a subclass of the aggregation iterator.
Pros:
Less code change. Doesn't mess with the aggregation logic.
Cons:
We keep the original rows for a while. Might involve more memory cost.

Contributor


That may not just bring additional memory cost. That may bring spill, which is something we'd like to avoid at all costs. There's a trade-off: complexity vs. optimization.

I'm OK with moving forward to make it work and evaluating the value of the trade-off. Except for the state format, we can change everything afterwards, so OK with that.

Contributor

@HeartSaVioR HeartSaVioR Mar 18, 2021


And if I understand correctly, this needs to copy all input rows because of the buffering. Now I can reload the context of my patch a bit: MergingSessionsIterator only copies the rows which are the first row of a session, as it only needs to retain the last session to compare with the current input row. That was the reason I chose such complexity: performance, and also there were concerns on the JIRA issue about in-flight memory usage.

Contributor

@HeartSaVioR HeartSaVioR Mar 18, 2021


I think it's valid to compare this with UpdatingSessionIterator, which is used to support aggregation with one distinct and technically does the same thing as this. The major difference between the two looks to be that UpdatingSessionIterator doesn't try to memorize entire rows; it only memorizes the value part of each row, as the key part should be the same as the current session, and when the session is closed, it restores the memorized rows with the key and value parts.

I guess there are pros and cons between the two, but none that outstanding (memory usage may be better for UpdatingSessionIterator, but we have to pay the cost of restoring), and since this is at least simpler, I'm OK to pick this up as the replacement for UpdatingSessionIterator. I feel MergingSessionsIterator is something we should revisit, but as I said, if we feel that's a blocker to moving forward, I'll take it up after this lands in the codebase.

Member Author


That's right. It is a trade-off. For the session window use case, I imagine that in most cases the buffer should not hold too many rows. It's hard to imagine a session window having enough rows to cause a serious problem. Sounds like a rare case to me.

I think I understand the design thinking here. The approach MergingSessionsIterator takes increases complexity by mixing the aggregation and session window operations. If we don't mix with aggregation, I currently have no idea how to avoid buffering. I'd rather not worry too much about this before we have real customer complaints about the issue.

Contributor

@HeartSaVioR HeartSaVioR Mar 19, 2021


That is one of the concerns. Another concern is that to buffer a row you need to "copy" it, which means every input row going through the buffer gets copied. (It doesn't matter how many rows are buffered at a specific time.) I see there are multiple physical ops that buffer rows, which makes me wonder about the performance and resource usage.

I'll need to check whether the performance is really on par with mine. I think the major complexity of mine was introduced by the linked-list state format. Migrating the state format to the one agreed on here would reduce the complexity significantly, so after applying that change to mine, we can re-evaluate both properly.


// Data should be sorted, so we can merge session window directly.
// TODO: use this requirement for simplicity, not necessary to sort the whole dataset,
// try better way later.
Member


Seems we can already get rid of this ordering, since the session windows in the state store are in order.

Contributor


If I remember correctly, we still have to sort to handle new events that are earlier than the sessions stored in the state store. "Event time processing" means events can be "out of order".
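The point about out-of-order events can be shown with a small sketch. This is a hypothetical merge routine (names are mine, not the PR's code), assuming events are sorted by time first and then folded into sessions: a session covers [start, last event + gap), and a late event can only join its session if sorting has placed it back in time order.

```python
# Illustrative sketch (not the PR's code) of why input must be sorted
# before merging sessions. A session spans [start, end), where end already
# includes the gap past the last event, matching the state format note above.

def merge_into_sessions(event_times, gap=10):
    sessions = []  # list of [start, end] pairs
    for t in sorted(event_times):  # sorting is the crucial step
        if sessions and t < sessions[-1][1]:
            # Event falls inside the previous session's gap window: extend it.
            sessions[-1][1] = max(sessions[-1][1], t + gap)
        else:
            sessions.append([t, t + gap])
    return sessions

# A "late" event (t=5) appears after t=30 in arrival order; without the
# sort it could not be merged into the first session.
events = [30, 1, 5, 12, 50]
print(merge_into_sessions(events))  # -> [[1, 22], [30, 40], [50, 60]]
```

The same argument applies to sessions restored from the state store: they are ordered among themselves, but a new batch's events can still predate them, so a global sort (or equivalent) is needed before merging.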


* as the current Aggregate's new child. It will throw [[AnalysisException]] while
* [[SessionWindowExpression]] is the only column in group by.
*/
object ResolveSessionWindow extends Rule[LogicalPlan] {
Contributor


I know the logic in TimeWindowing is a bit long and you may not want to add additional complexity there, but I feel this still has to be consolidated with TimeWindowing (that said, TimeWindowing needs a bit of refactoring after adding the session window logic). There are lots of similarities, and the limitations apply to both together, like allowing only one window per query, which should cover time windows and session windows alike.

Contributor


Or, at least, counting windows should consider both.

}.isDefined
}

private final val WINDOW_COL_NAME = "session_window"
Contributor


I see that how the meta fields (start, end) are dealt with is slightly different from the time window. Would the difference be a problem when we consolidate the two into one? I have no specific preference, but I'd like both to be consistent so that we don't deal with differences in maintenance.

That said, let's change both (OK to do it in a follow-up PR), or have the session window follow the time window's current approach.

private final val WINDOW_COL_NAME = "session_window"

def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case p @ Aggregate(groupingExpr, aggregateExpr, _) if hasWindowFunction(groupingExpr) =>
Contributor


hasWindowFunction and windowExpressions should count the time window as well. I guess it'll be simpler to resolve when we consolidate, but that would depend on how much more complicated it becomes.

// check partitionExpression in groupingExpr
val partitionExpression = groupingExpr.filterNot(hasWindowFunction)
if (partitionExpression.isEmpty) {
p.failAnalysis("Cannot use session_window as the only group by column.")
Contributor


I feel this is less clear; it's probably better to mention that additional key column(s) are required, like "Cannot use session_window without additional key column(s)", or, if we assume end users know the concept of "global aggregation", "Cannot apply session_window on global aggregation".

@@ -94,7 +94,47 @@ case class TimeWindow(
}
}

object TimeWindow {
case class SessionWindowExpression(
Contributor


I feel it's OK to just say SessionWindow, as we do for TimeWindow; but if we don't feel that's clear, we'd need to rename TimeWindow to TimeWindowExpression as well for consistency.

@@ -673,6 +673,19 @@ case class Window(
def windowOutputSet: AttributeSet = AttributeSet(windowExpressions.map(_.toAttribute))
}

case class SessionWindow(
Contributor


Ah, OK, this conflicts with SessionWindowExpression when renaming. OK to leave it as is.


override def outputPartitioning: Partitioning = child.outputPartitioning

override def outputOrdering: Seq[SortOrder] = child.outputOrdering
Contributor


This made me stop and think, and it looks like this is guaranteed per micro-batch. It's probably ideal to leave a brief explanation of how it is guaranteed.


override def outputOrdering: Seq[SortOrder] = child.outputOrdering

/**
Contributor


The comment doesn't seem to be needed.

* The physical plan for a streaming query; merges session windows after restoring from the state store.
* Note: the end time of a window restored from the state store already contains the session gap.
*
* @param windowExpressions
Contributor


Add an explanation for each param, or simply remove them.

@@ -348,7 +348,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
rewrittenResultExpressions,
stateVersion,
planLater(child))

Contributor


nit: unnecessary change

Contributor

@HeartSaVioR HeartSaVioR left a comment


Still reviewing. Please consider the files I haven't commented on as not yet reviewed.




test("session window in SQL with single key as session window key") {
withTempTable { table =>
val a = spark.sql(
Contributor


dead code?

numOutputRows += 1
Seq(row)
} else {
val outputs = savedState :+ row
Contributor


As I said, I don't think this retains the overall order, hence an additional sort is required. savedState can be injected anywhere among the input rows.

HeartSaVioR added a commit to HeartSaVioR/spark that referenced this pull request Mar 19, 2021
HeartSaVioR added a commit to HeartSaVioR/spark that referenced this pull request Mar 20, 2021
@HeartSaVioR
Contributor

HeartSaVioR commented Mar 20, 2021

I redid some performance tests from before, and I observed an outstanding difference between my revised PR (my PR + the state format used here) and this PR.

revised my PR (versioned as 3.2.0-SPARK-10816-heartsavior)

https://github.com/HeartSaVioR/spark/tree/SPARK-10816-heartsavior-rebase-apply-PR-31570-versioned

this PR (versioned as 3.2.0-PR-31570)

https://github.com/HeartSaVioR/spark/tree/PR-31570-versioned

benchmark code

https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/tree/benchmarking-SPARK-10816

I built the benchmark code against locally installed Spark artifacts for both (that said, I built the benchmark code once per version).

It's simple: change build.sbt to update the Spark version to the custom one, and run sbt clean assembly.

machine to run benchmark

  • AMD Ryzen 5600X (no overclock, 3.7 Ghz to 4.6 Ghz, 6 physical cores, 12 logical cores)
  • DDR4 3200Mhz 16 GB * 2
  • Ubuntu 20.04

Using local[*] showed performance instability, so I fixed the value to 8. There aren't many physical cores, so I reduced the number of partitions down to 5 as well.

plenty of rows in session

./bin/spark-submit --master "local[8]" --conf spark.sql.shuffle.partitions=5 --driver-memory 16g --class com.hortonworks.spark.benchmark.streaming.sessionwindow.plenty_of_rows_in_session.BenchmarkSessionWindowListenerWordCountSessionFunctionAppendMode ./iot-trucking-app-spark-structured-streaming-<version>.jar --query-status-file /tmp/a.json --rate-row-per-second 200000 --rate-ramp-up-time-second 10

plenty-of-rows-in-session-append-mode-mine-rate-200000-v1.txt

plenty-of-rows-in-session-append-mode-PR-31570-rate-200000-v1.txt

  • mine showed 160,000+ on processedRowsPerSecond.
  • PR-31570 didn't reach 60,000 on processedRowsPerSecond.

Mine was 150% (2.5x) to 200% (3x) faster.

plenty of keys

./bin/spark-submit --master "local[8]" --conf spark.sql.shuffle.partitions=5 --driver-memory 16g --class com.hortonworks.spark.benchmark.streaming.sessionwindow.plenty_of_keys.BenchmarkSessionWindowListenerWordCountSessionFunctionAppendMode ./iot-trucking-app-spark-structured-streaming-<version>.jar --query-status-file /tmp/b.json --rate-row-per-second 15000000 --rate-ramp-up-time-second 10

plenty-of-keys-append-mode-mine-rate-15000000-v1.txt

plenty-of-keys-append-mode-PR-31570-rate-15000000-v1.txt

  • mine showed "over" 13,000,000 on processedRowsPerSecond. (max peak exceeds 15,000,000)
  • PR-31570 didn't reach 9,000,000 on processedRowsPerSecond.

Mine was around 50% (1.5x) faster.

It'd be appreciated if any reviewer could take the chance to run the performance test on their side and share the result. I'd love to see results contradicting my perf test (either my tests under a different env/config, or new tests), but if no one disproves mine, I guess we all know we need to put our effort in the right direction.

@viirya
Member Author

viirya commented Mar 22, 2021

Thanks for re-evaluating two approaches. It is valuable.

Basically, by leveraging the new state store format, the two previous efforts are now pretty close, except for how they handle session merging. I can easily replace some exec nodes here with ones from the other PR.

No worries. The precondition for picking the simpler approach was that the two approaches have similar performance. I remember this was claimed in the JIRA. The re-evaluation gives us a different number.

I ran the benchmark locally. Due to the difference in machines I cannot get the same numbers, but I can see there is a significant difference between the two approaches, i.e., 1) merging then aggregating vs. 2) merging while aggregating.

I think we have a few options.

  1. Replace with merging while aggregating (MergingSessionsIterator). I'm doing it locally to see if we can get a similar number. It'd also be good if @HeartSaVioR would like to create a PR against this, so it is easier to incorporate authored commits from all parties. It is also fine if @HeartSaVioR wants to work on it after this is merged.
  2. Switch to the other previous effort + new state store format.

Either works for me. Actually, the two options are basically the same logic to me, except for some cosmetic differences.

@xuanyuanking WDYT?

@HeartSaVioR
Contributor

HeartSaVioR commented Mar 22, 2021

I can provide a new PR based on mine plus the state format proposed here. (For sure, I'm willing to add both of you as co-authors.)

Now I'm dealing with update mode (required) and possibly supporting global aggregation on streaming queries (optional), which might take a couple of days, but I can raise a WIP PR and continue working on it as well.

@xuanyuanking
Member

xuanyuanking commented Mar 22, 2021

@viirya @HeartSaVioR Agree with both of you. I'm also running the local benchmark. My only concern is about the MergingSessionsIterator details; it would be great if we could separate the code in the first stage and maybe refactor later.

I'm dealing with update mode (required)

If it takes too much time, I think append mode and complete mode (maybe also optional) are good enough.

@viirya
Member Author

viirya commented Mar 22, 2021

@HeartSaVioR @xuanyuanking Okay, sounds good to me. I also think append and complete modes might be enough, and we can work on update mode later if it takes longer.

@HeartSaVioR
Contributor

Just submitted mine: #31937.
Update mode is addressed. Global aggregation for streaming queries is not supported yet, though it's still supported for batch queries, so it probably doesn't matter.

@xuanyuanking
Member

Just verified the benchmark. Similar results on my side: the plenty-of-rows-in-session test shows about 2x (70000 vs 37000). I can see many spilling logs for ExternalAppendOnlyMap. Maybe I can do some tuning and performance analysis later; will review #31937 first.

@SparkQA

SparkQA commented Jul 2, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45109/

@SparkQA

SparkQA commented Jul 2, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45109/

@viirya viirya closed this Jul 13, 2021
@viirya viirya deleted the SPARK-10816-pr22583 branch December 27, 2023 18:27