
Conversation


@jingz-db jingz-db commented Sep 16, 2024

What changes were proposed in this pull request?

This PR adds support for defining an event time column in the output dataset of the TransformWithStateInPandas operator. The new event time column will be used to evaluate watermark expressions in downstream operators.

Why are the changes needed?

This change pairs with the Scala implementation of operator chaining. Scala PR: #45376

Does this PR introduce any user-facing change?

Yes. Users can now specify an event time column as:

df.groupBy("id")
  .transformWithStateInPandas(
      statefulProcessor=stateful_processor,
      outputStructType=output_schema,
      outputMode="Update",
      timeMode=timeMode,
      eventTimeColumnName="outputTimestamp"
  )
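For context, here is a hedged sketch of the chaining this enables: a windowed count downstream of the operator, evaluated against the new event time column. Everything here (`stateful_processor`, `output_schema`, the `eventTime` input column, and the window sizes) is illustrative and assumes a streaming DataFrame `df` and a live Spark session - it is not runnable standalone.

```python
from pyspark.sql import functions as F

# Sketch only: assumes `df` is a streaming DataFrame with an "eventTime" column
# and that `stateful_processor` / `output_schema` are defined as in this PR's tests.
result = (
    df.withWatermark("eventTime", "5 seconds")
    .groupBy("id")
    .transformWithStateInPandas(
        statefulProcessor=stateful_processor,
        outputStructType=output_schema,          # must include "outputTimestamp"
        outputMode="Append",
        timeMode="eventTime",
        eventTimeColumnName="outputTimestamp",   # new parameter from this PR
    )
    # Downstream stateful operator: its watermark expression is evaluated
    # against the re-declared "outputTimestamp" event time column.
    .groupBy(F.window("outputTimestamp", "10 seconds"))
    .count()
)
```

Without `eventTimeColumnName`, the downstream aggregation would have no event time column to evaluate its watermark against, which is what previously blocked chaining stateful operators after this one.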

How was this patch tested?

Integration tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@jingz-db jingz-db changed the title [SS] Add Support for Chaining of Operators in transformWithStateInPandas API [SPARK-49676][SS][PYTHON] Add Support for Chaining of Operators in transformWithStateInPandas API Sep 16, 2024
@jingz-db jingz-db marked this pull request as ready for review September 16, 2024 20:21
Contributor

nit: why wait for 10?

Contributor Author

Thanks for leaving comments on the state-v2 PR! Our test suite is not very large, and 10 seconds should be quite sufficient for it to finish. I am also following the same setup as the rest of the suite - you may find other test cases also use 10.

Contributor

Since this PR is fairly lightweight, should we include that check in the same PR? If not, please create a Jira to track the TODO and link it here.

Contributor Author

This is also how it is on the Scala side here. Let me check whether there is a specific reason for Bhuwan to leave this as a TODO.

Contributor Author

Confirmed with Anish - the way we have it today, we'll keep constructing new batches because a future timer may expire. I filed a SPARK JIRA to track this issue: https://issues.apache.org/jira/browse/SPARK-50180 and will update the comments in both Scala and Python.

Contributor Author

Since this PR is fairly lightweight, should we include that check in the same PR?

Fixing the issue may require additional testing of the AvailableNow trigger mode, which exceeds the scope of this PR. I'll leave it as is for today, and let's track the fix in SPARK-50180.

Contributor

Could you elaborate more on why we need two entries for TransformWithStateInPandasExec? Also, can we add TransformWithStateInPandasExec-specific comments? Thanks!

Contributor Author

We don't. Deleted a case class and will add more comments.

@jingz-db jingz-db requested a review from bogao007 October 30, 2024 23:59
Contributor

@bogao007 bogao007 left a comment

LGTM

Contributor

@HeartSaVioR HeartSaVioR left a comment

First pass. I haven't looked into the tests in detail, as addressing the review comments should simplify the changeset. I'll revisit the tests once the PR is updated.

Contributor

EliminateEventTimeWatermark is no longer needed, as we now have this as an Analyzer rule.

But maybe we could deal with this in a separate PR to address other cases as well.

Contributor Author

EliminateEventTimeWatermark is no longer needed, as we now have this as an Analyzer rule.

Is this already done? I saw we still keep the TODO here:

* TODO: add this rule into analyzer rule list.

Contributor

Yeah I forgot to remove the TODO. My bad. It's added.

Contributor

I don't see any caller for this. Did you miss adding the caller?

Contributor Author

@jingz-db jingz-db Nov 12, 2024

Yeah... It got lost while resolving merge conflicts. I just added it back.

Contributor

nit: the linter will probably complain?

Contributor Author

I ran the linter check and it doesn't seem to complain... If it's OK with you, I'll leave it here and see if it passes on CI.

Contributor

Instead of doing this in all places, shall we find a way for pytest to perform the same with before() / after()?

Also, since we are reading from files, why not just use Trigger.AvailableNow(), which will terminate the query automatically, instead of doing this? sleep(10) won't be necessary either.

Contributor Author

Trigger.AvailableNow() will also keep making new batches... I had a discussion with Bo previously here: #48124 (comment)
Not sure if we can do this in the after(), as the query will keep making new batches because shouldRunAnotherBatch will always return True for processingTime mode. The Scala side doesn't have this issue because we are using the testStream / CheckAnswer framework, which shuts down the query explicitly.

Contributor

This only happens with the processing-time timer mode, and I don't expect many tests to use that.

But I'm fine either way. I'm OK to use the same approach for all timer modes.

Contributor

Wow, I wonder how we could miss this... Nice catch! I was about to ask for a new test covering this, but I'll revisit once you update the test, as your new test seems to also cover this part.

Contributor Author

Included a check on eviction of late events in the test case.

Contributor

I wonder why a Project is necessary between UpdateEventTimeColumn and TransformWithStateInPandas - could you please check the value of projectList? If that value is the same as the output of TransformWithStateInPandasExec, the optimizer might be able to remove the Project (in the future). If that's the case, we may want to address the pattern of not having a Project in between as well.

Contributor Author

@jingz-db jingz-db Nov 12, 2024

The value of the projectList variable here is the output event time column:

projectList: List(outputTimestamp#15)

I really appreciate you bringing up the optimization here. After reading your comments, I tried to figure out where this extra projection comes from, and it took me quite a while: we did not add this projection anywhere in the event time related changes. It most likely comes from the projection pushed down from the chained operator. In the test case, a groupBy is chained after TWS, and it only aggregates on some of the output columns of the TWS output. I am not very familiar with the optimizer, but I think this projection ultimately comes from the pushdown from the aggregation operator, which is why we see the extra layer of Project here. That being said, a single match case that expects the Project is not sufficient.
I added another match case above, and another test case covering the situation without the Project.
I did a quick test on the Scala side and it doesn't have this issue - the TWS node is wrapped by SerializeFromObjectExec, which probably does not match the optimizer rule.

But I think the exact match on ProjectExec is a bit hacky - there are probably more optimization rules I am not aware of. Do you think we should do a recursive match here, in case the TWS node is (recursively) wrapped by operators other than Project?

Contributor

It's probably easier to not allow pushdown between UpdateEventTimeColumnExec and TransformWithStateExec.

I see filters are not allowed to be pushed down below UpdateEventTimeColumnExec, so that part is OK. This is likely a case of column pruning -

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1036-L1045

This could insert the Project down into children nodes; we can disallow this for UpdateEventTimeColumnExec, like the following:

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1031-L1032

Could you give this a try and check that the Projection no longer appears? If that works, we can simply assume UpdateEventTimeColumnExec and TransformWithStateExec are always adjacent.

Contributor Author

Thanks for all the code pointers! I added a line in the Optimizer, and the column is no longer pruned, so UpdateEventTimeColumnExec and TransformWithStateInPandasExec are now placed together. I left the rule inside the Optimizer, and IncrementalExecution now matches only the case where UpdateEventTimeColumnExec and TransformWithStateInPandasExec are adjacent, with no extra layer in between.

@jingz-db jingz-db requested a review from HeartSaVioR November 12, 2024 02:58
@jingz-db
Contributor Author

Hey @HeartSaVioR, could you take another look? Thanks!

Contributor

@HeartSaVioR HeartSaVioR left a comment

I'll review the test code once the issue about the output mode is addressed.

.count()
.writeStream.queryName("chaining_ops_query")
.foreachBatch(check_results)
.outputMode("update")
Contributor

update mode? We don't support multiple stateful operators with update mode. If the query does not break, we are missing a check on whether the operator is stateful or not. We should check UnsupportedOperator.

case p @ Project(_, _: LeafNode) => p

// Can't prune the columns on UpdateEventTimeWatermarkColumn
case p@Project(_, _: UpdateEventTimeWatermarkColumn) => p
Contributor

nit: space between p and @, and P

case d: Deduplicate if d.isStreaming && d.keys.exists(hasEventTimeCol) => true
case d: DeduplicateWithinWatermark if d.isStreaming => true
case t: TransformWithState if t.isStreaming => true
case t: TransformWithStateInPandas if t.isStreaming => true
Contributor

Thanks for adding this. It is lucky we figured this out before releasing the feature. :)

Contributor Author

I did a comparison with the TWS Scala base PR and added a blocker for batch queries. Thanks for the reminder!

import pyspark.sql.functions as f

input_path = tempfile.mkdtemp()
self._prepare_test_resource1(input_path)
Contributor

nit: We should remove this. It is a no-op because the file is overwritten later, but it is definitely confusing.

Contributor Author

Fixed in my new commits! The test suite was still in progress when you made the comments. Feel free to skip the older commits and review the newest change.

self._test_transform_with_state_in_pandas_chaining_ops(
StatefulProcessorChainingOps(), check_results, "eventTime"
)
self._test_transform_with_state_in_pandas_chaining_ops(
Contributor

nit: either add some data for another id along with another check_results, or remove this. This is not an exhaustive test anyway, as it doesn't really test that we group by id as well.

Contributor Author

@jingz-db jingz-db Nov 26, 2024

The new test case now includes inputs with different ids. The purpose of this test is to check that column pruning is set correctly and that the case class in IncrementalExecution matches all logical plans.

Contributor

OK, so it's not our main interest to verify that the groupBy will produce the aggregated result per grouping key.

Contributor Author

Yeah, it is not our main interest, as we are testing that the event time watermark is propagated properly for TWS here. I believe we already have abundant tests for aggregation. I am keeping this for safety because previously, without the column pruning change, a groupBy on a single column and on multiple columns would match different case classes in IncrementalExecution.

@jingz-db jingz-db requested a review from HeartSaVioR November 26, 2024 01:04
throwError("dropDuplicatesWithinWatermark is not supported with batch " +
"DataFrames/DataSets")(d)

case t: TransformWithStateInPandas =>
Contributor

Let's file a JIRA ticket with TODO like this.

        // TODO(SPARK-40443): support applyInPandasWithState in batch query
        throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3176")

I see my inconsistency here - the above is in BasicOperators in SparkStrategies.

case _: FlatMapGroupsInPandasWithState =>
// TODO(SPARK-40443): support applyInPandasWithState in batch query
throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3176")

Shall we move this to BasicOperators, adding a TODO comment like the above? Let's reserve this place for operations that we have a reason not to support and won't support in the future.

}
assert batch_df.isEmpty()
elif batch_id == 1:
# watermark is 25 - 5 = 20, no more event for eventTime=10
Contributor

@HeartSaVioR HeartSaVioR Nov 26, 2024

No, it's not.

  • The watermark for "late events" in batch ID "N" is the watermark for "eviction" in batch "N - 1". (default: 0)
  • The watermark for "eviction" in batch "N" is "calculated" based on the input data in batch "N - 1".

That said, the watermark for "eviction" in batch 1 is 15 (max event time from batch 0) - 5 = 10. The watermark for "late events" in batch 0 is 0.

With the same logic, the watermark for "eviction" in batch 2 is 25 - 5 = 20. The watermark for "late events" in batch 2 is the watermark for "eviction" in batch 1, hence 10.

Please update the code comment with the correction. I see both comments in batch 1 and 2 are incorrect.
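To make the two-watermark rule above concrete, here is a small hedged model in plain Python (not Spark code; the `watermarks` helper and its integer-second inputs are illustrative only):

```python
def watermarks(batch_max_event_times, delay):
    """Return (late-events watermark, eviction watermark) per batch.

    Models the rule above: the eviction watermark for batch N is derived
    from the input of batch N - 1 (max event time minus the delay), and the
    late-events watermark for batch N is the eviction watermark of batch
    N - 1. Both default to 0.
    """
    results = []
    eviction = 0
    for n in range(len(batch_max_event_times)):
        late = eviction  # eviction watermark of the previous batch
        if n > 0:
            eviction = max(eviction, batch_max_event_times[n - 1] - delay)
        results.append((late, eviction))
    return results

# Reproduces the example in this thread: max event times 15, 25, 30 per batch
# with a delay of 5 -> batch 1 evicts at 10; batch 2 treats events below 10
# as late and evicts at 20.
print(watermarks([15, 25, 30], 5))  # [(0, 0), (0, 10), (10, 20)]
```

This is why the code comment quoted above is off by one batch: the "25 - 5 = 20" eviction watermark applies to batch 2, not batch 1.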

Contributor

@HeartSaVioR HeartSaVioR left a comment

Only one comment.

throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3176")
case _: TransformWithStateInPandas =>
// TODO(SPARK-50428): support TransformWithStateInPandas in batch query
throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3176")
Contributor

nit: sorry, you need to define another error class; this one says applyInPandasWithState. It's OK to add another temporary error class, as we want to fix it anyway.

Contributor Author

@jingz-db jingz-db Nov 27, 2024

Contributor

@HeartSaVioR HeartSaVioR left a comment

+1 pending CI. Thanks for the patience.

I know it's a tech debt to leave this to temp error class, so let's file a JIRA ticket to change the error class to the non-temp one if we can't find the time to address the support of batch query.

@jingz-db
Contributor Author

jingz-db commented Nov 27, 2024

+1 pending CI. Thanks for the patience.

I know it's a tech debt to leave this to temp error class, so let's file a JIRA ticket to change the error class to the non-temp one if we can't find the time to address the support of batch query.

Created here: https://issues.apache.org/jira/browse/SPARK-50429.
cc: @bogao007

@HeartSaVioR
Contributor

Thanks! Merging to master.

