[SPARK-40437][SS][PYTHON] Support string representation of durationMs in GroupState.setTimeoutDuration#56178
Conversation
1b06aaa to
d063818
Compare
|
Could a committer please review this? It extends |
|
I think @HyukjinKwon and @HeartSaVioR should have more context as per the discussion in https://issues.apache.org/jira/browse/SPARK-40437 |
| def setTimeoutDuration(self, durationMs: Union[int, str]) -> None: | ||
| """ | ||
| Set the timeout duration in ms for this key. | ||
| Processing time timeout must be enabled. |
There was a problem hiding this comment.
shall we add a versionchanged to doc that str is supported?
There was a problem hiding this comment.
Good point, added! A versionchanged note is useful here because this is a behavioral change to an existing method — users upgrading from an older Spark version would not know that string durations are now accepted unless the API docs call it out explicitly.
…GroupState.setTimeoutDuration Allow `setTimeoutDuration` to accept a Spark interval string (e.g. '5 seconds', '1 hour 30 minutes') in addition to an integer millisecond value, matching the Scala-side overload. A Python parser converts supported time units (weeks, days, hours, minutes, seconds, milliseconds, microseconds) to milliseconds; month/year units and invalid strings raise PySparkValueError. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
d063818 to
646a31c
Compare
What changes were proposed in this pull request?
GroupState.setTimeoutDurationpreviously accepted only an integer milliseconds value. This PRextends it to also accept a Spark interval string (e.g.
"5 minutes","1 hour 30 minutes","1.5 seconds"), matching the behaviour of the Scala API'sGroupStateImpl.setTimeoutDuration(String)overload.Changes:
_parse_timeout_duration(duration: str) -> inthelper inpython/pyspark/sql/streaming/state.pythat converts a Spark interval string to milliseconds.Parsing behaviour mirrors Scala's
IntervalUtils.stringToIntervalandIntervalUtils.getDuration(31 days/month convention for structured streaming watermarks).
setTimeoutDurationto acceptUnion[int, str]and call the helper when a string ispassed.
INVALID_TIMEOUT_DURATION_STRINGerror class topython/pyspark/errors/error-conditions.json.python/pyspark/sql/tests/streaming/test_state.pywith 27 unit tests covering: allsupported units, months/years (31-day convention), negative component offsets, fractional seconds,
leading-dot decimals (
.5 seconds), explicit+/-signs, whitespace between sign andquantity, the
intervalkeyword prefix, compound durations, case-insensitivity, and variousinvalid-input cases.
Why are the changes needed?
The Scala API supports both
setTimeoutDuration(long durationMs)andsetTimeoutDuration(String duration). The Python implementation only supported the integer form,leaving users unable to use human-readable interval strings as described in SPARK-40437.
Does this PR introduce any user-facing change?
Yes.
GroupState.setTimeoutDurationnow also accepts a Spark interval string such as"5 minutes"or"1 hour 30 minutes". The integer form continues to work unchanged.This change is relative to the unreleased master branch.
How was this patch tested?
27 new pure-Python unit tests in
python/pyspark/sql/tests/streaming/test_state.py, coveringboth positive cases (all units, compound durations, fractional seconds, edge-case signs and
whitespace) and negative cases (invalid strings, non-positive durations, wrong timeout mode).
Tests can be run without a full Spark build:
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude (Anthropic)