Skip to content

Commit

Permalink
add parameter for getexpirytimers()
Browse files Browse the repository at this point in the history
  • Loading branch information
jingz-db committed Mar 26, 2024
1 parent 1f128d5 commit b34b66c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,13 @@ class StatefulProcessorHandleImpl(

/**
* Function to retrieve all registered timers for all grouping keys
* @param expiryTimestampMs - threshold for expired timestamp in milliseconds, this function
* will return every timers that has (strictly) smaller timestamp
* @return - iterator of registered timers for all grouping keys
*/
def getExpiredTimers(): Iterator[(Any, Long)] = {
def getExpiredTimers(expiryTimestampMs: Long): Iterator[(Any, Long)] = {
verifyTimerOperations("get_expired_timers")
timerState.getExpiredTimers()
timerState.getExpiredTimers(expiryTimestampMs)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,17 +187,20 @@ class TimerStateImpl(

/**
* Function to get all the registered timers for all grouping keys
* @param expiryTimestampMs - threshold for expired timestamp in milliseconds, this function
* will return every timers that is before this threshold
* @return - iterator of all the registered timers for all grouping keys
*/
def getExpiredTimers(): Iterator[(Any, Long)] = {
def getExpiredTimers(expiryTimestampMs: Long): Iterator[(Any, Long)] = {
// this iter is increasingly sorted on timestamp
val iter = store.iterator(tsToKeyCFName)

new NextIterator[(Any, Long)] {
override protected def getNext(): (Any, Long) = {
if (iter.hasNext) {
val rowPair = iter.next()
val keyRow = rowPair.key
val result = getTimerRowFromSecIndex(keyRow)
val rowPair = if (iter.hasNext) iter.next() else null
val result: (Any, Long) =
if (rowPair != null) getTimerRowFromSecIndex(rowPair.key) else null
if (result != null && result._2 < expiryTimestampMs) {
result
} else {
finished = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,22 +160,18 @@ case class TransformWithStateExec(
case ProcessingTime =>
assert(batchTimestampMs.isDefined)
val batchTimestamp = batchTimestampMs.get
val procTimeIter = processorHandle.getExpiredTimers()
procTimeIter
// procTimeIter are sorted increasingly on timestamp
.takeWhile { case (_, expiryTimestampMs) => expiryTimestampMs < batchTimestamp }
processorHandle.getExpiredTimers()
.flatMap { case (keyObj, expiryTimestampMs) =>
handleTimerRows(keyObj, expiryTimestampMs, processorHandle)
}

case EventTime =>
assert(eventTimeWatermarkForEviction.isDefined)
val watermark = eventTimeWatermarkForEviction.get
val eventTimeIter = processorHandle.getExpiredTimers()
eventTimeIter
// procTimeIter are sorted increasingly on timestamp
.takeWhile { case (_, expiryTimestampMs) => expiryTimestampMs < watermark }
processorHandle.getExpiredTimers()
.flatMap { case (keyObj, expiryTimestampMs) =>
handleTimerRows(keyObj, expiryTimestampMs, processorHandle)
}

case _ => Iterator.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class TimerSuite extends StateVariableSuiteBase {
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
timerState.registerTimer(1L * 1000)
assert(timerState.listTimers().toSet === Set(1000L))
assert(timerState.getExpiredTimers().toSet === Set(("test_key", 1000L)))
assert(timerState.getExpiredTimers(Long.MaxValue).toSeq === Seq(("test_key", 1000L)))
assert(timerState.getExpiredTimers(Long.MinValue).toSeq === Seq.empty[Long])

timerState.registerTimer(20L * 1000)
assert(timerState.listTimers().toSet === Set(20000L, 1000L))
Expand All @@ -69,8 +70,10 @@ class TimerSuite extends StateVariableSuiteBase {
timerState1.registerTimer(1L * 1000)
timerState2.registerTimer(15L * 1000)
assert(timerState1.listTimers().toSet === Set(15000L, 1000L))
assert(timerState1.getExpiredTimers().toSet ===
Set(("test_key", 15000L), ("test_key", 1000L)))
assert(timerState1.getExpiredTimers(Long.MaxValue).toSeq ===
Seq(("test_key", 1000L), ("test_key", 15000L)))
// if timestamp equals to expiryTimestampsMs, will not considered expired
assert(timerState1.getExpiredTimers(15000L).toSeq === Seq(("test_key", 1000L)))
assert(timerState1.listTimers().toSet === Set(15000L, 1000L))

timerState1.registerTimer(20L * 1000)
Expand Down Expand Up @@ -99,15 +102,16 @@ class TimerSuite extends StateVariableSuiteBase {
ImplicitGroupingKeyTracker.removeImplicitKey()

ImplicitGroupingKeyTracker.setImplicitKey("test_key1")
assert(timerState1.getExpiredTimers().toSet ===
Set(("test_key2", 15000L), ("test_key1", 2000L), ("test_key1", 1000L)))
assert(timerState1.getExpiredTimers(Long.MaxValue).toSeq ===
Seq(("test_key1", 1000L), ("test_key1", 2000L), ("test_key2", 15000L)))
assert(timerState1.getExpiredTimers(10000L).toSeq ===
Seq(("test_key1", 1000L), ("test_key1", 2000L)))
assert(timerState1.listTimers().toSet === Set(1000L, 2000L))
ImplicitGroupingKeyTracker.removeImplicitKey()

ImplicitGroupingKeyTracker.setImplicitKey("test_key2")
assert(timerState2.listTimers().toSet === Set(15000L))
assert(timerState2.getExpiredTimers().toSet ===
Set(("test_key2", 15000L), ("test_key1", 2000L), ("test_key1", 1000L)))
assert(timerState2.getExpiredTimers(1500L).toSeq === Seq(("test_key1", 1000L)))
}
}

Expand All @@ -122,8 +126,10 @@ class TimerSuite extends StateVariableSuiteBase {
val timerTimerstamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L)
// register/put unordered timestamp into rocksDB
timerTimerstamps.foreach(timerState.registerTimer)
// getExpiredTimers() is calling iterator() from rocksDB
assert(timerState.getExpiredTimers().toSeq.map(_._2) === timerTimerstamps.sorted)
assert(timerState.getExpiredTimers(Long.MaxValue).toSeq.map(_._2) === timerTimerstamps.sorted)
assert(timerState.getExpiredTimers(4200L).toSeq.map(_._2) ===
timerTimerstamps.sorted.takeWhile(_ < 4200L))
assert(timerState.getExpiredTimers(Long.MinValue).toSeq === Seq.empty)
ImplicitGroupingKeyTracker.removeImplicitKey()
}
}
Expand Down Expand Up @@ -153,8 +159,11 @@ class TimerSuite extends StateVariableSuiteBase {
ImplicitGroupingKeyTracker.removeImplicitKey()

ImplicitGroupingKeyTracker.setImplicitKey("test_key1")
assert(timerState1.getExpiredTimers().toSeq.map(_._2) ===
assert(timerState1.getExpiredTimers(Long.MaxValue).toSeq.map(_._2) ===
(timerTimestamps1 ++ timerTimestamps2 ++ timerTimerStamps3).sorted)
assert(timerState1.getExpiredTimers(Long.MinValue).toSeq === Seq.empty)
assert(timerState1.getExpiredTimers(8000L).toSeq.map(_._2) ===
(timerTimestamps1 ++ timerTimestamps2 ++ timerTimerStamps3).sorted.takeWhile(_ < 8000L))
ImplicitGroupingKeyTracker.removeImplicitKey()
}
}
Expand Down

0 comments on commit b34b66c

Please sign in to comment.