Skip to content

Conversation

@sahnib
Copy link
Contributor

@sahnib sahnib commented May 31, 2024

What changes were proposed in this pull request?

This PR removes the TimeMode None from supported time modes in TransformWithState operator. A structured streaming query works in either Processing time mode, or event Time mode depending on whether eventTime has been specified, hence this change aligns TimeMode properly with Streaming query semantics.

Note that if eventTimeColumn is specified for output dataset in transformWithState operator. TimeMode defaults to EventTime.

Why are the changes needed?

These changes are needed to align TimeMode values with how time flows in Streaming query.

Does this PR introduce any user-facing change?

Yes, modifies the TimeMode semantic for transformWithState.

How was this patch tested?

All existing unit test pass.

Was this patch authored or co-authored using generative AI tooling?

No

@sahnib sahnib force-pushed the remove-timeMode-None branch from 4981316 to b00a034 Compare May 31, 2024 21:12
@sahnib sahnib marked this pull request as ready for review May 31, 2024 21:14
@sahnib sahnib changed the title Remove TimeMode None from TransformWithState. [SS] Remove TimeMode None from TransformWithState. May 31, 2024
@HyukjinKwon
Copy link
Member

Mind filing a JIRA please?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When did we add this? If this is already released out, we can't just remove and break it.

Copy link
Contributor Author

@sahnib sahnib Jun 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We added this on April 7, 2024 as part of TTL support for Streaming State with TransformWithState operator. This operator is a new addition to Spark Structured Streaming, and is planned to be released in Spark 4.1. The API is currently in heavy development and evolving. I think its safe to make this change at this point, given its still unreleased in a Spark Major version.

cc: @anishshri-db

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea not released yet. This is only in the 4.0 branch.
@sahnib - while we are here - is there a way to make this package private to sql too on the Java side ?

Copy link
Contributor

@HeartSaVioR HeartSaVioR Jun 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java doesn't support package private with specifying ancestor package. It's always limited to the same package and it won't may not work for our case.
(It might change in recent versions though I haven't heard about such change.)

@sahnib sahnib force-pushed the remove-timeMode-None branch from b00a034 to 05ff15b Compare June 3, 2024 15:02
@sahnib sahnib changed the title [SS] Remove TimeMode None from TransformWithState. [SS][SPARK-48511] Remove TimeMode None from TransformWithState. Jun 3, 2024
@sahnib
Copy link
Contributor Author

sahnib commented Jun 3, 2024

Mind filing a JIRA please?

Filed SPARK-48511. Thanks.

@sahnib
Copy link
Contributor Author

sahnib commented Jun 4, 2024

@HeartSaVioR PTAL, thanks.

@HeartSaVioR HeartSaVioR changed the title [SS][SPARK-48511] Remove TimeMode None from TransformWithState. [SPARK-48511][SS] Remove TimeMode None from TransformWithState. Jun 7, 2024
watermarkPropagator.getInputWatermarkForEviction(tentativeBatchId,
p.stateInfo.get.operatorId))
}.exists(_ == true)
}.contains(true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unrelated change? Please file a new minor PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will remove these minor changes from this PR.

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution. Left comments. General comments are 1) whether we can use any time mode where we could use None, or only ProcessingTime, and 2) why we need manual clock while we just removed out time mode None (doesn't sound to be relevant).

class ExpiredTimerInfoImpl(
isValid: Boolean,
expiryTimeInMsOpt: Option[Long] = None,
timeMode: TimeMode = TimeMode.None()) extends ExpiredTimerInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we assume we don't need to provide either it was from event time semantic vs processing time semantic? What was the rationale to add this and why this could be removed while we just remove out None?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question. TimeMode is passed in the StatefulProcessor.init function to the user. We were also passing it to the ExpiredTimerInfoImpl but the interface ExpiredTimerInfo does not even expose it to the user, and we never set this variable with proper timeMode. Hence, I have removed it. I dont think this was intended to here. cc: @anishshri-db can you confirm?

val lastExecutionRequiresAnotherBatch = noDataBatchesEnabled &&
// need to check the execution plan of the previous batch
execCtx.previousContext.map { plan =>
execCtx.previousContext.exists { plan =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: again, separate minor PR for unrelated change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do.

Dataset<String> transformWithStateMapped = grouped.transformWithState(
new TestStatefulProcessorWithInitialState(),
TimeMode.None(),
TimeMode.ProcessingTime(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: just to be fully sure, either ProcessingTime or EventTime works, do I understand correctly? If either one only works for replacement of None, should be better to be documented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

either ProcessingTime or EventTime works as a replacement.

val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.ProcessingTime())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: same, wanted to know whether this is just a convenience or required.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a convenience.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why the test code change is required given the code change is just to remove TimeMode None. Do we fix some test flakiness as well here, or what is the rationale of manual clock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TransformWithStateExec currently returns true for ProcessingTime timeMode in shouldRunAnotherBatch. (Note that this behavior is same as FMGWS). I had to add a manual clock to ensure that data processing is triggerred before Check/Expect blocks (otherwise AddData does not finish). Let me know if that answers your question.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HeartSaVioR
Copy link
Contributor

HeartSaVioR commented Jun 7, 2024

Also let's revisit the UX. If they use neither timeout nor TTL, they could just do None regardless they have event time column or not. Given we remove None, what is the expectation of UX? Does user have to specify either one based on what they have (event time column is set or not), even though they never use timeout? If that is the case, that doesn't sound like a better UX.

@sahnib sahnib requested a review from HeartSaVioR June 12, 2024 15:38
@sahnib
Copy link
Contributor Author

sahnib commented Jun 12, 2024

Also let's revisit the UX. If they use neither timeout nor TTL, they could just do None regardless they have event time column or not. Given we remove None, what is the expectation of UX? Does user have to specify either one based on what they have (event time column is set or not), even though they never use timeout? If that is the case, that doesn't sound like a better UX.

Even with TimeMode None -> the query has eventTime (if eventTimeColumn is specified), and processingTime (based on driver's clock). I think this makes TimeMode None confusing.
As a general rule of thumb, customer should set TimeMode as EventTime if eventTimeColumn is specified, else use ProcessingTime.

@HeartSaVioR
Copy link
Contributor

HeartSaVioR commented Jun 13, 2024

Just leaving a history here: currently processing time mode triggers the batch continuously which is not acceptable if there is no timer/TTL to check. We are now discussing how to handle the case.

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Sep 22, 2024
@github-actions github-actions bot closed this Sep 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants