[SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator

### What changes were proposed in this pull request?
Add support for processing/event time based timers with `transformWithState` operator

### Why are the changes needed?
Changes are required to add event-driven, timer-based support for stateful streaming applications built on the arbitrary state API with the `transformWithState` operator.

As part of this change, we introduce a set of functions that users can invoke within their `StatefulProcessor` logic. Using the `StatefulProcessorHandle`, users can do the following:
- register a timer at a given timestamp
- delete a timer at a given timestamp
- list registered timers

Note that all of the above operations are tied to the implicit grouping key. A minimal usage sketch follows below.
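
A minimal sketch of how these APIs might be used from a `StatefulProcessor` (illustration only, not taken from this PR: the `getHandle` accessor, the state/field names, and the 10-second idle interval are assumptions):

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._

// Illustrative sketch: emit a per-key count once the key has been idle for
// ~10 seconds of processing time. Assumes the handle is exposed via getHandle
// and that the operator runs with TimeoutMode.ProcessingTime().
class IdleKeyCountProcessor extends StatefulProcessor[String, String, (String, Long)] {
  @transient private var countState: ValueState[Long] = _

  override def init(outputMode: OutputMode, timeoutMode: TimeoutMode): Unit = {
    countState = getHandle.getValueState[Long]("countState", Encoders.scalaLong)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues,
      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, Long)] = {
    if (expiredTimerInfo.isValid()) {
      // Timer fired for this key: emit the final count and clear state.
      val finalCount = if (countState.exists()) countState.get() else 0L
      countState.clear()
      Iterator((key, finalCount))
    } else {
      val current = if (countState.exists()) countState.get() else 0L
      countState.update(current + inputRows.size)
      // Keep a single idle timer per key: drop any older ones, re-arm 10s out.
      getHandle.listTimers().foreach(ts => getHandle.deleteTimer(ts))
      getHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)
      Iterator.empty
    }
  }
}
```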

In terms of the implementation, we make use of additional column families to support the operations mentioned above. For registered timers, we maintain a primary index (as a col family) that keeps the mapping between the grouping key and expiry timestamp. This col family is used to add and delete timers with direct access to the key and also for listing registered timers for a given grouping key using `prefix scan`. We also maintain a secondary index that inverts the ordering of the timestamp and grouping key. We will incorporate the use of the range scan encoder for this col family in a separate PR.
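
Conceptually, the two indexes can be pictured as follows (illustration only; this is not the actual row encoding used by the state store):

```scala
// Primary index (one col family): direct put/delete per key, plus prefix scan
// over a grouping key to list its registered timers.
//   key   = (groupingKey, expiryTimestampMs)
//   value = empty
//
// Secondary index (another col family): timestamp-first ordering so expired
// timers can be located across all grouping keys.
//   key   = (expiryTimestampMs, groupingKey)
//   value = empty
case class PrimaryTimerKey(groupingKey: Array[Byte], expiryTimestampMs: Long)
case class SecondaryTimerKey(expiryTimestampMs: Long, groupingKey: Array[Byte])
```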

A few additional constraints:
- only registered timers are tracked and occupy storage (locally and remotely)
- col families starting with `_` are reserved and cannot be used as state variables
- timers are checkpointed as before
- users have to provide a `timeoutMode` to the operator. Currently, they can choose not to register timeouts, or to register processing-time or event-time based timeouts. However, this mode has to be declared upfront within the operator arguments (see the wiring sketch below).
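
A hedged wiring sketch of how the mode might be declared on the operator (the exact `transformWithState` signature, the source, and the sink here are assumptions for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, TimeoutMode}

val spark = SparkSession.builder().appName("timer-sketch").getOrCreate()
import spark.implicits._

// Placeholder socket source, illustration only.
val words = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]

// The timeout mode is declared up front as an operator argument, next to the
// processor instance and the output mode.
val counts = words
  .groupByKey(identity)
  .transformWithState(
    new IdleKeyCountProcessor(),   // processor sketched earlier
    TimeoutMode.ProcessingTime(),
    OutputMode.Update())

val query = counts.writeStream
  .outputMode("update")
  .format("console")
  .start()
```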

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
Added unit tests as well as pseudo-integration tests

StatefulProcessorHandleSuite
```
13:58:42.463 WARN org.apache.spark.sql.execution.streaming.state.StatefulProcessorHandleSuite:

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.StatefulProcessorHandleSuite, threads: rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true) =====
[info] Run completed in 4 seconds, 559 milliseconds.
[info] Total number of tests run: 8
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

TransformWithStateSuite
```
13:48:41.858 WARN org.apache.spark.sql.streaming.TransformWithStateSuite:

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.streaming.TransformWithStateSuite, threads: QueryStageCreator-0 (daemon=true), state-store-maintenance-thread-0 (daemon=true), ForkJoinPool.commonPool-worker-4 (daemon=true), state-store-maintenance-thread-1 (daemon=true), QueryStageCreator-1 (daemon=true), rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), QueryStageCreator-2 (daemon=true), QueryStageCreator-3 (daemon=true), state-store-maintenance-task (daemon=true), ForkJoinPool.com...
[info] Run completed in 1 minute, 32 seconds.
[info] Total number of tests run: 20
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 20, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45051 from anishshri-db/task/SPARK-46913.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
anishshri-db authored and HeartSaVioR committed Mar 13, 2024
1 parent ebe9f66 commit 839bc9f
Showing 24 changed files with 1,337 additions and 80 deletions.
18 changes: 18 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
@@ -3491,6 +3491,24 @@
],
"sqlState" : "0A000"
},
"STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_HANDLE_STATE" : {
"message" : [
"Failed to perform stateful processor operation=<operationType> with invalid handle state=<handleState>."
],
"sqlState" : "42802"
},
"STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE" : {
"message" : [
"Failed to perform stateful processor operation=<operationType> with invalid timeoutMode=<timeoutMode>"
],
"sqlState" : "42802"
},
"STATE_STORE_CANNOT_CREATE_COLUMN_FAMILY_WITH_RESERVED_CHARS" : {
"message" : [
"Failed to create column family with unsupported starting character and name=<colFamilyName>."
],
"sqlState" : "42802"
},
"STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME" : {
"message" : [
"Failed to perform column family operation=<operationName> with invalid name=<colFamilyName>. Column family name cannot be empty or include leading/trailing spaces or use the reserved keyword=default"
18 changes: 18 additions & 0 deletions docs/sql-error-conditions.md
@@ -2148,6 +2148,24 @@ The SQL config `<sqlConf>` cannot be found. Please verify that the config exists.

Star (*) is not allowed in a select list when GROUP BY an ordinal position is used.

### STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_HANDLE_STATE

[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Failed to perform stateful processor operation=`<operationType>` with invalid handle state=`<handleState>`.

### STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE

[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Failed to perform stateful processor operation=`<operationType>` with invalid timeoutMode=`<timeoutMode>`

### STATE_STORE_CANNOT_CREATE_COLUMN_FAMILY_WITH_RESERVED_CHARS

[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Failed to create column family with unsupported starting character and name=`<colFamilyName>`.

### STATE_STORE_CANNOT_USE_COLUMN_FAMILY_WITH_INVALID_NAME

[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -34,4 +34,18 @@ public class TimeoutMode {
public static final TimeoutMode NoTimeouts() {
return NoTimeouts$.MODULE$;
}

/**
* Stateful processor that only registers processing time based timers
*/
public static final TimeoutMode ProcessingTime() {
return ProcessingTime$.MODULE$;
}

/**
* Stateful processor that only registers event time based timers
*/
public static final TimeoutMode EventTime() {
return EventTime$.MODULE$;
}
}
@@ -18,5 +18,7 @@ package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.streaming.TimeoutMode

/** Types of timeouts used in tranformWithState operator */
/** Types of timeouts used in transformWithState operator */
case object NoTimeouts extends TimeoutMode
case object ProcessingTime extends TimeoutMode
case object EventTime extends TimeoutMode
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.streaming

import java.io.Serializable

import org.apache.spark.annotation.{Evolving, Experimental}

/**
* Class used to provide access to expired timer's expiry time. These values
* are only relevant if the ExpiredTimerInfo is valid.
*/
@Experimental
@Evolving
private[sql] trait ExpiredTimerInfo extends Serializable {
/**
* Check if provided ExpiredTimerInfo is valid.
*/
def isValid(): Boolean

/**
* Get the expired timer's expiry time as milliseconds in epoch time.
*/
def getExpiryTimeInMs(): Long
}
@@ -40,8 +40,11 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
* Function that will be invoked as the first method that allows for users to
* initialize all their state variables and perform other init actions before handling data.
* @param outputMode - output mode for the stateful processor
* @param timeoutMode - timeout mode for the stateful processor
*/
def init(outputMode: OutputMode): Unit
def init(
outputMode: OutputMode,
timeoutMode: TimeoutMode): Unit

/**
* Function that will allow users to interact with input data rows along with the grouping key
@@ -50,12 +53,15 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
* @param inputRows - iterator of input rows associated with grouping key
* @param timerValues - instance of TimerValues that provides access to current processing/event
* time if available
* @param expiredTimerInfo - instance of ExpiredTimerInfo that provides access to expired timer
* if applicable
* @return - Zero or more output rows
*/
def handleInputRows(
key: K,
inputRows: Iterator[I],
timerValues: TimerValues): Iterator[O]
timerValues: TimerValues,
expiredTimerInfo: ExpiredTimerInfo): Iterator[O]

/**
* Function called as the last method that allows for users to perform
@@ -70,6 +70,29 @@ private[sql] trait StatefulProcessorHandle extends Serializable {
/** Function to return queryInfo for currently running task */
def getQueryInfo(): QueryInfo

/**
* Function to register a processing/event time based timer for given implicit grouping key
* and provided timestamp
* @param expiryTimestampMs - timer expiry timestamp in milliseconds
*/
def registerTimer(expiryTimestampMs: Long): Unit

/**
* Function to delete a processing/event time based timer for given implicit grouping key
* and provided timestamp
* @param expiryTimestampMs - timer expiry timestamp in milliseconds
*/
def deleteTimer(expiryTimestampMs: Long): Unit

/**
* Function to list all the timers registered for given implicit grouping key
* Note: calling listTimers() within the `handleInputRows` method of the StatefulProcessor
* will return all the unprocessed registered timers, including the one being fired within the
* invocation of `handleInputRows`.
* @return - list of all the registered timers for given implicit grouping key
*/
def listTimers(): Iterator[Long]

/**
* Function to delete and purge state variable if defined previously
* @param stateName - name of the state variable
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.streaming.{ExpiredTimerInfo, TimeoutMode}

/**
* Class that provides a concrete implementation that can be used to provide access to expired
* timer's expiry time. These values are only relevant if the ExpiredTimerInfo
* is valid.
* @param isValid - boolean to check if the provided ExpiredTimerInfo is valid
* @param expiryTimeInMsOpt - option to expired timer's expiry time as milliseconds in epoch time
*/
class ExpiredTimerInfoImpl(
isValid: Boolean,
expiryTimeInMsOpt: Option[Long] = None,
timeoutMode: TimeoutMode = TimeoutMode.NoTimeouts()) extends ExpiredTimerInfo {

override def isValid(): Boolean = isValid

override def getExpiryTimeInMs(): Long = expiryTimeInMsOpt.getOrElse(-1L)
}
@@ -22,8 +22,9 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.streaming.state.StateStore
import org.apache.spark.sql.streaming.{ListState, MapState, QueryInfo, StatefulProcessorHandle, ValueState}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.streaming.{ListState, MapState, QueryInfo, StatefulProcessorHandle, TimeoutMode, ValueState}
import org.apache.spark.util.Utils

/**
@@ -45,7 +46,7 @@
*/
object StatefulProcessorHandleState extends Enumeration {
type StatefulProcessorHandleState = Value
val CREATED, INITIALIZED, DATA_PROCESSED, CLOSED = Value
val CREATED, INITIALIZED, DATA_PROCESSED, TIMER_PROCESSED, CLOSED = Value
}

class QueryInfoImpl(
@@ -76,6 +77,7 @@ class StatefulProcessorHandleImpl(
store: StateStore,
runId: UUID,
keyEncoder: ExpressionEncoder[Any],
timeoutMode: TimeoutMode,
isStreaming: Boolean = true)
extends StatefulProcessorHandle with Logging {
import StatefulProcessorHandleState._
@@ -114,28 +116,85 @@ class StatefulProcessorHandleImpl(
def getHandleState: StatefulProcessorHandleState = currState

override def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T] = {
verify(currState == CREATED, s"Cannot create state variable with name=$stateName after " +
"initialization is complete")
verifyStateVarOperations("get_value_state")
val resultState = new ValueStateImpl[T](store, stateName, keyEncoder, valEncoder)
resultState
}

override def getQueryInfo(): QueryInfo = currQueryInfo

private lazy val timerState = new TimerStateImpl(store, timeoutMode, keyEncoder)

private def verifyStateVarOperations(operationType: String): Unit = {
if (currState != CREATED) {
throw StateStoreErrors.cannotPerformOperationWithInvalidHandleState(operationType,
currState.toString)
}
}

private def verifyTimerOperations(operationType: String): Unit = {
if (timeoutMode == NoTimeouts) {
throw StateStoreErrors.cannotPerformOperationWithInvalidTimeoutMode(operationType,
timeoutMode.toString)
}

if (currState < INITIALIZED || currState >= TIMER_PROCESSED) {
throw StateStoreErrors.cannotPerformOperationWithInvalidHandleState(operationType,
currState.toString)
}
}

/**
* Function to register a timer for the given expiryTimestampMs
* @param expiryTimestampMs - timestamp in milliseconds for the timer to expire
*/
override def registerTimer(expiryTimestampMs: Long): Unit = {
verifyTimerOperations("register_timer")
timerState.registerTimer(expiryTimestampMs)
}

/**
* Function to delete a timer for the given expiryTimestampMs
* @param expiryTimestampMs - timestamp in milliseconds for the timer to delete
*/
override def deleteTimer(expiryTimestampMs: Long): Unit = {
verifyTimerOperations("delete_timer")
timerState.deleteTimer(expiryTimestampMs)
}

/**
* Function to retrieve all registered timers for all grouping keys
* @return - iterator of registered timers for all grouping keys
*/
def getExpiredTimers(): Iterator[(Any, Long)] = {
verifyTimerOperations("get_expired_timers")
timerState.getExpiredTimers()
}

/**
* Function to list all the registered timers for given implicit key
* Note: calling listTimers() within the `handleInputRows` method of the StatefulProcessor
* will return all the unprocessed registered timers, including the one being fired within the
* invocation of `handleInputRows`.
* @return - iterator of all the registered timers for given implicit key
*/
def listTimers(): Iterator[Long] = {
verifyTimerOperations("list_timers")
timerState.listTimers()
}

/**
* Function to delete and purge state variable if defined previously
*
* @param stateName - name of the state variable
*/
override def deleteIfExists(stateName: String): Unit = {
verify(currState == CREATED, s"Cannot delete state variable with name=$stateName after " +
"initialization is complete")
verifyStateVarOperations("delete_if_exists")
store.removeColFamilyIfExists(stateName)
}

override def getListState[T](stateName: String, valEncoder: Encoder[T]): ListState[T] = {
verify(currState == CREATED, s"Cannot create state variable with name=$stateName after " +
"initialization is complete")
verifyStateVarOperations("get_list_state")
val resultState = new ListStateImpl[T](store, stateName, keyEncoder, valEncoder)
resultState
}
@@ -144,8 +203,7 @@ class StatefulProcessorHandleImpl(
stateName: String,
userKeyEnc: Encoder[K],
valEncoder: Encoder[V]): MapState[K, V] = {
verify(currState == CREATED, s"Cannot create state variable with name=$stateName after " +
"initialization is complete")
verifyStateVarOperations("get_map_state")
val resultState = new MapStateImpl[K, V](store, stateName, keyEncoder, userKeyEnc, valEncoder)
resultState
}
