[SPARK-47052][SS] Separate state tracking variables from MicroBatchExecution/StreamExecution #45109
Conversation
Only style nits; the rationale makes total sense and the code change looks great overall. Thanks for making this change!
currentStatus = currentStatus.copy(message = message)

/** Extracts observed metrics from the most recent query execution. */
private def extractObservedMetrics(
  lastExecution: QueryExecution): Map[String, Row] = {
nit: 2 more spaces
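For context, the "2 more spaces" nit refers to the Scala convention Spark follows for multi-line declarations: continuation parameters are indented 4 spaces, not 2. A minimal sketch of the before/after (hypothetical, simplified signatures — not the PR's actual types):

```scala
object IndentationNitExample {
  // Flagged in review: continuation parameter indented only 2 spaces
  private def flagged(
    lastExecution: String): Map[String, Int] = Map.empty

  // Requested: 4-space indent for parameters in multi-line declarations
  private def requested(
      lastExecution: String): Map[String, Int] = Map.empty

  // Both compile identically; only the indentation differs.
  def demo(): Boolean = flagged("qe") == requested("qe")
}
```

The extra indentation visually separates the parameter list from the method body, which starts at a 2-space indent.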
val timeTaken = math.max(endTime - startTime, 0)

/** Extracts statistics from the most recent query execution. */
private def extractExecutionStats(
  hasNewData: Boolean,
nit: 2 more spaces
return Map.empty

/** Extract statistics about stateful operators from the executed query plan. */
private def extractStateOperatorMetrics(
  lastExecution: IncrementalExecution): Seq[StateOperatorProgress] = {
nit: 2 more spaces
if (!hasNewData) {
  return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)

private def warnIfFinishTriggerTakesTooLong(
  triggerEndTimestamp: Long,
nit: 2 more spaces
 * Override of finishTrigger to extract the map from IncrementalExecution.
 */
def finishTrigger(
  hasNewData: Boolean,
nit: 2 more spaces
  to: StreamProgress,
  latest: StreamProgress): Unit = {

def recordTriggerOffsets(
  from: StreamProgress,
nit: 2 more spaces
 * during the execution lifecycle of a batch that is being processed by the streaming query
 */
abstract class ProgressContext(
  id: UUID,
nit: 2 more spaces
}

def updateIdleness(
  id: UUID,
nit: 2 more spaces
private def getStartOffset(dataStream: SparkDataStream): OffsetV2 = {
  val startOffsetOpt = availableOffsets.get(dataStream)

private def getStartOffset(
  execCtx: MicroBatchExecutionContext,
nit: 2 more spaces
private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
  sinkCommitProgress = None

protected def populateStartOffsets(
  execCtx: MicroBatchExecutionContext,
nit: 2 more spaces
Could you please rebase the master branch of your fork with the latest from the OSS repo, and rebase your PR against the latest master as well? That may resolve the linter failures the CI is showing.
Force-pushed the …ion/StreamExecution branch from 0e3b8ab to 45ae190.
@HeartSaVioR thanks for your review! I have addressed your comments. PTAL!
+1 pending CI.
The CI only failed in pyspark-connect, and the failures look to be unrelated (not related to streaming).
Thanks! Merging to master.
What changes were proposed in this pull request?
To improve code clarity and maintainability, I propose that we move all the variables that track mutable state and metrics for a streaming query into a separate class. With this refactor, it will be easier to track and find all the mutable state a microbatch can have.
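As a rough illustration of the idea (a minimal sketch with hypothetical, simplified names and signatures — `MicroBatchContext` and the `Map[String, Long]` offsets here only mirror, and do not reproduce, the PR's actual `ProgressContext`/`MicroBatchExecutionContext` and `StreamProgress` types):

```scala
import java.util.UUID

// Sketch: per-batch mutable tracking state lives in one context object
// instead of being scattered across the execution class itself.
class MicroBatchContext(val id: UUID, val batchId: Long) {
  // All mutable per-batch state is declared here, in one place.
  var startOffsets: Map[String, Long] = Map.empty
  var endOffsets: Map[String, Long] = Map.empty
  var isNewDataAvailable: Boolean = false

  /** Records the offset range this batch processed. */
  def recordTriggerOffsets(from: Map[String, Long], to: Map[String, Long]): Unit = {
    startOffsets = from
    endOffsets = to
    isNewDataAvailable = from != to
  }
}
```

With this shape, adding a new piece of per-batch state means adding one field to the context class, rather than hunting for the right spot among the execution class's other members.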
Why are the changes needed?
To improve code clarity and maintainability. All the state and metrics needed for the execution lifecycle of a microbatch are consolidated into one class. If we decide to modify or add state to a streaming query, it will be easier to determine 1) where to add it and 2) what existing state there is.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing tests should suffice
Was this patch authored or co-authored using generative AI tooling?
No