-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-53941][SS] Support AQE in stateless streaming workloads #52642
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6ff7c95
256860b
d342e6f
1764973
658056a
33298a7
db02d0d
f9ec484
3b86678
e373388
34bfd2e
15d99af
55a54b7
0925c92
7797609
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.execution.streaming | ||
|
|
||
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.adaptive.QueryStageExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
|
|
||
/**
 * A utility object hosting methods to traverse the query plan of a streaming query.
 * This is used for patterns of traversal which are repeated in multiple places.
 */
object StreamingQueryPlanTraverseHelper extends Logging {

  /**
   * Collects results from the executed plan after unfolding "wrapper" nodes. Cached plans
   * (`InMemoryTableScanExec`), AQE plans (`AdaptiveSparkPlanExec`) and AQE query stages
   * (`QueryStageExec`) are recursed into, so that `pf` is applied against the actual
   * executed nodes rather than against the wrappers.
   *
   * @param executedPlan the physical plan to traverse
   * @param pf a partial function producing a result for each matching node
   * @return the results collected from matching nodes, in traversal order
   */
  def collectFromUnfoldedPlan[B](
      executedPlan: SparkPlan)(
      pf: PartialFunction[SparkPlan, B]): Seq[B] = {
    executedPlan.flatMap {
      // InMemoryTableScanExec is a node to represent a cached plan. The node has the underlying
      // actual executed plan, which we should traverse to collect the required information.
      case s: InMemoryTableScanExec => collectFromUnfoldedPlan(s.relation.cachedPlan)(pf)

      // AQE physical node contains the executed plan, pick the plan.
      // In most cases, AQE physical node is expected to contain the final plan, which is
      // appropriate for the caller.
      // Even if it does not contain the final plan (for whatever reason), we just provide the
      // plan as best effort, as there is no better way around.
      case a: AdaptiveSparkPlanExec =>
        if (!a.isFinalPlan) {
          // NOTE: the trailing space before the fragment break matters - without it the
          // concatenated message reads "...is notthe final one...".
          logWarning(log"AQE plan is captured, but the executed plan in AQE plan is not " +
            log"the final one. Providing incomplete executed plan. " +
            log"AQE plan: ${MDC(LogKeys.AQE_PLAN, a)}")
        }
        collectFromUnfoldedPlan(a.executedPlan)(pf)

      // There are several AQE-specific leaf nodes which cover shuffle. We should pick the
      // underlying plan of these nodes, since the underlying plan has the actual executed
      // nodes from which we want to collect metrics.
      case e: QueryStageExec => collectFromUnfoldedPlan(e.plan)(pf)

      case p if pf.isDefinedAt(p) => Seq(pf(p))
      case _ => Seq.empty[B]
    }
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, MergingSessi | |
| import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataPartitionReader | ||
| import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike | ||
| import org.apache.spark.sql.execution.python.streaming.{FlatMapGroupsInPandasWithStateExec, TransformWithStateInPySparkExec} | ||
| import org.apache.spark.sql.execution.streaming.StreamingQueryPlanTraverseHelper | ||
| import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, OffsetSeqMetadata} | ||
| import org.apache.spark.sql.execution.streaming.operators.stateful.{SessionWindowStateStoreRestoreExec, SessionWindowStateStoreSaveExec, StatefulOperator, StatefulOperatorStateInfo, StateStoreRestoreExec, StateStoreSaveExec, StateStoreWriter, StreamingDeduplicateExec, StreamingDeduplicateWithinWatermarkExec, StreamingGlobalLimitExec, StreamingLocalLimitExec, UpdateEventTimeColumnExec} | ||
| import org.apache.spark.sql.execution.streaming.operators.stateful.flatmapgroupswithstate.FlatMapGroupsWithStateExec | ||
|
|
@@ -638,10 +639,11 @@ class IncrementalExecution( | |
| def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = { | ||
| val tentativeBatchId = currentBatchId + 1 | ||
| watermarkPropagator.propagate(tentativeBatchId, executedPlan, newMetadata.batchWatermarkMs) | ||
| executedPlan.collect { | ||
| case p: StateStoreWriter => p.shouldRunAnotherBatch( | ||
| watermarkPropagator.getInputWatermarkForEviction(tentativeBatchId, | ||
| p.stateInfo.get.operatorId)) | ||
| }.exists(_ == true) | ||
| StreamingQueryPlanTraverseHelper | ||
| .collectFromUnfoldedPlan(executedPlan) { | ||
| case p: StateStoreWriter => p.shouldRunAnotherBatch( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can StateStoreWriter appear in
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not, but it's more complicated to reason about when we need to unwrap some nodes and when we don't need to. Unless there is a perf issue, I'd love to see us apply the pattern consistently. |
||
| watermarkPropagator.getInputWatermarkForEviction(tentativeBatchId, | ||
| p.stateInfo.get.operatorId)) | ||
| }.exists(_ == true) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,7 @@ import org.apache.spark.internal.LogKeys | |
| import org.apache.spark.internal.LogKeys.BATCH_ID | ||
| import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder | ||
| import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, FileSourceMetadataAttribute, LocalTimestamp} | ||
| import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, GlobalLimit, LeafNode, LocalRelation, LogicalPlan, Project, StreamSourceAwareLogicalPlan} | ||
| import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState, FlatMapGroupsWithState, GlobalLimit, Join, LeafNode, LocalRelation, LogicalPlan, Project, StreamSourceAwareLogicalPlan, TransformWithState, TransformWithStateInPySpark} | ||
| import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream} | ||
| import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE | ||
| import org.apache.spark.sql.catalyst.util.truncatedString | ||
|
|
@@ -344,9 +344,40 @@ class MicroBatchExecution( | |
| setLatestExecutionContext(execCtx) | ||
|
|
||
| populateStartOffsets(execCtx, sparkSessionForStream) | ||
|
|
||
| // SPARK-53941: This code path is executed for the first batch, regardless of whether it's a | ||
| // fresh new run or restart. | ||
| disableAQESupportInStatelessIfUnappropriated(sparkSessionForStream) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do you have any plans to try to either remove this or have a way to opt out, since the other checks should theoretically be enough when choosing whether to insert an adaptive plan? We (sadly) only recently discovered the performance benefits of manually re-enabling AQE inside a FEB and it would be nice to have that work by default, and I think this is still preventing that from happening?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It doesn't prevent you from doing that unless the "streaming part" (before FEB) of your query is stateful. Am I missing something?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No that's the case I'm talking about. A common use case for us is a stateful stream with a FEB to merge into a delta table, and enabling AQE inside the batch helps with the merge performance
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That is very dependent on the DSv1 sink implementation - the FEB sink is somewhat safe because we finalize the RDD plan for the streaming part. If not, it would have been problematic, as I mentioned in the PR description. I'd still leave this to the user's risk - you can still override the AQE config in the FEB sink after this fix to get the same behavior (which I really hate - that the config is overridable in the FEB sink - but...)
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess my question is why isn't https://github.com/apache/spark/pull/52642/files#diff-1473ce928d51533500dcd01054219c3b1fc8e4c69c763a68cb01a22e145ce1dcR59-R67 "enough". Why does the AQE setting also need to be disabled? |
||
|
|
||
| logInfo(log"Stream started from ${MDC(LogKeys.STREAMING_OFFSETS_START, execCtx.startOffsets)}") | ||
| execCtx | ||
| } | ||
|
|
||
| private def disableAQESupportInStatelessIfUnappropriated( | ||
| sparkSessionToRunBatches: SparkSession): Unit = { | ||
| def containsStatefulOperator(p: LogicalPlan): Boolean = { | ||
| p.exists { | ||
| case node: Aggregate if node.isStreaming => true | ||
| case node: Deduplicate if node.isStreaming => true | ||
| case node: DeduplicateWithinWatermark if node.isStreaming => true | ||
| case node: Distinct if node.isStreaming => true | ||
| case node: Join if node.left.isStreaming && node.right.isStreaming => true | ||
| case node: FlatMapGroupsWithState if node.isStreaming => true | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about AIPWS and TWSInPandas ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| case node: FlatMapGroupsInPandasWithState if node.isStreaming => true | ||
| case node: TransformWithState if node.isStreaming => true | ||
| case node: TransformWithStateInPySpark if node.isStreaming => true | ||
| case node: GlobalLimit if node.isStreaming => true | ||
| case _ => false | ||
| } | ||
| } | ||
|
|
||
| if (containsStatefulOperator(analyzedPlan)) { | ||
| // SPARK-53941: We disable AQE for stateful workloads as of now. | ||
| logWarning(log"Disabling AQE since AQE is not supported in stateful workloads.") | ||
| sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do you think it makes sense to make this restriction stronger? In addition to setting this conf here, we could also record AQE being disabled as a property of the query, and perform a conf check every batch. This can prevent users from overriding this config during foreachBatch and save potential support burden. I can add a followup if you think it makes sense
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While I think this direction is great in general, it does not actually prevent the issue completely, because we will check the config change after the change is applied for a single batch. If we want to prevent the issue holistically, we should actually disallow (either ignore with logging, or even fail the query) setting the configs under streaming engine's control in FEB sink. This may need more thought since when user function is executed it's not under streaming engine's control. For example, spark session from given DataFrame in user function should not allow changing a list of configs under streaming engine's control. But even with the above, we can't prevent the case of referencing external spark session in user function. That's more of a fundamental issue, so it'd probably be fine if the solution does not address this. |
||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Repeatedly attempts to run batches as data arrives. | ||
| */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,6 +37,7 @@ import org.apache.spark.sql.connector.catalog.Table | |
| import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, ReportsSinkMetrics, ReportsSourceMetrics, SparkDataStream} | ||
| import org.apache.spark.sql.execution.{QueryExecution, StreamSourceAwareSparkPlan} | ||
| import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress} | ||
| import org.apache.spark.sql.execution.streaming.StreamingQueryPlanTraverseHelper | ||
| import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqMetadata | ||
| import org.apache.spark.sql.execution.streaming.operators.stateful.{EventTimeWatermarkExec, StateStoreWriter} | ||
| import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef | ||
|
|
@@ -443,8 +444,8 @@ abstract class ProgressContext( | |
|
|
||
| val sources = newData.keys.toSet | ||
|
|
||
| val sourceToInputRowsTuples = lastExecution.executedPlan | ||
| .collect { | ||
| val sourceToInputRowsTuples = StreamingQueryPlanTraverseHelper | ||
| .collectFromUnfoldedPlan(lastExecution.executedPlan) { | ||
| case node: StreamSourceAwareSparkPlan if node.getStream.isDefined => | ||
| val numRows = node.metrics.get("numOutputRows").map(_.value).getOrElse(0L) | ||
| node.getStream.get -> numRows | ||
|
|
@@ -502,12 +503,13 @@ abstract class ProgressContext( | |
| // It's possible that multiple DataSourceV2ScanExec instances may refer to the same source | ||
| // (can happen with self-unions or self-joins). This means the source is scanned multiple | ||
| // times in the query, we should count the numRows for each scan. | ||
| val sourceToInputRowsTuples = lastExecution.executedPlan.collect { | ||
| case s: MicroBatchScanExec => | ||
| val numRows = s.metrics.get("numOutputRows").map(_.value).getOrElse(0L) | ||
| val source = s.stream | ||
| source -> numRows | ||
| } | ||
| val sourceToInputRowsTuples = StreamingQueryPlanTraverseHelper | ||
| .collectFromUnfoldedPlan(lastExecution.executedPlan) { | ||
| case s: MicroBatchScanExec => | ||
| val numRows = s.metrics.get("numOutputRows").map(_.value).getOrElse(0L) | ||
| val source = s.stream | ||
| source -> numRows | ||
| } | ||
| logDebug("Source -> # input rows\n\t" + sourceToInputRowsTuples.mkString("\n\t")) | ||
| sumRows(sourceToInputRowsTuples) | ||
| } else { | ||
|
|
@@ -544,7 +546,10 @@ abstract class ProgressContext( | |
| val finalLogicalPlan = unrollCTE(lastExecution.logical) | ||
|
|
||
| val allLogicalPlanLeaves = finalLogicalPlan.collectLeaves() // includes non-streaming | ||
| val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves() | ||
| val allExecPlanLeaves = StreamingQueryPlanTraverseHelper | ||
| .collectFromUnfoldedPlan(lastExecution.executedPlan) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be slow to traverse the plan all the time?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That should be OK, as here we only add unwrapping AQE node. The change here won't be a noticeable overhead. (That said, if we could have a new mechanism which does not traverse the executed plan to collect the accumulators but know accumulators to read after the execution, that would remove out the cost of traversing the plan. That would be non-small change.) |
||
| case p if p.children.isEmpty => p | ||
| } | ||
| if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) { | ||
| val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap { | ||
| case (_, ep: MicroBatchScanExec) => | ||
|
|
@@ -580,10 +585,11 @@ abstract class ProgressContext( | |
| private def extractStateOperatorMetrics( | ||
| lastExecution: IncrementalExecution): Seq[StateOperatorProgress] = { | ||
| assert(lastExecution != null, "lastExecution is not available") | ||
| lastExecution.executedPlan.collect { | ||
| case p if p.isInstanceOf[StateStoreWriter] => | ||
| p.asInstanceOf[StateStoreWriter].getProgress() | ||
| } | ||
| StreamingQueryPlanTraverseHelper | ||
| .collectFromUnfoldedPlan(lastExecution.executedPlan) { | ||
| case p if p.isInstanceOf[StateStoreWriter] => | ||
| p.asInstanceOf[StateStoreWriter].getProgress() | ||
| } | ||
| } | ||
|
|
||
| /** Extracts statistics from the most recent query execution. */ | ||
|
|
@@ -609,8 +615,8 @@ abstract class ProgressContext( | |
| return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp, sinkOutput) | ||
| } | ||
|
|
||
| val eventTimeStats = lastExecution.executedPlan | ||
| .collect { | ||
| val eventTimeStats = StreamingQueryPlanTraverseHelper | ||
| .collectFromUnfoldedPlan(lastExecution.executedPlan) { | ||
| case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => | ||
| val stats = e.eventTimeStats.value | ||
| Map( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we have a dedicated config for streaming AQE? It seems overkill to turn off AQE entirely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm OK with introducing a config to control over streaming AQE - should we do this as layered configs? e.g. If AQE is turned off via spark.sql.adaptive.enabled, we turn off both batch and streaming. The config for streaming AQE takes effect only when the AQE config spark.sql.adaptive.enabled is turned on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
layered config SGTM.