Skip to content

Commit

Permalink
Optimize Delay component
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrp committed Mar 27, 2024
1 parent 4872f8a commit e04ca8f
Showing 1 changed file with 1 addition and 3 deletions.
Expand Up @@ -15,7 +15,6 @@ import pl.touk.nussknacker.engine.flink.util.keyed.StringKeyOnlyMapper

import java.time.Duration
import java.util
import java.util.Collections
import javax.annotation.Nullable

object DelayTransformer extends DelayTransformer
Expand Down Expand Up @@ -81,15 +80,14 @@ class DelayFunction(nodeCtx: FlinkCustomNodeContext, delay: Duration)
val fireTime = ctx.timestamp() + delay.toMillis

val currentState = readStateValueOrInitial(fireTime)
currentState.add(0, value.context)
currentState.add(value.context)
state.put(fireTime, currentState)

ctx.timerService().registerEventTimeTimer(fireTime)
}

override def onTimer(timestamp: Long, funCtx: FlinkTimerCtx, out: Collector[ValueWithContext[AnyRef]]): Unit = {
val currentState = readStateValueOrInitial(timestamp)
Collections.reverse(currentState)
currentState.forEach(emitValue(out, _))
state.remove(timestamp)
}
Expand Down

0 comments on commit e04ca8f

Please sign in to comment.