diff --git a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/DelayTransformer.scala b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/DelayTransformer.scala index 67dc7bf3b25..5ee2410efae 100644 --- a/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/DelayTransformer.scala +++ b/engine/flink/components/base-unbounded/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/DelayTransformer.scala @@ -15,7 +15,6 @@ import pl.touk.nussknacker.engine.flink.util.keyed.StringKeyOnlyMapper import java.time.Duration import java.util -import java.util.Collections import javax.annotation.Nullable object DelayTransformer extends DelayTransformer @@ -81,7 +80,7 @@ class DelayFunction(nodeCtx: FlinkCustomNodeContext, delay: Duration) val fireTime = ctx.timestamp() + delay.toMillis val currentState = readStateValueOrInitial(fireTime) - currentState.add(0, value.context) + currentState.add(value.context) state.put(fireTime, currentState) ctx.timerService().registerEventTimeTimer(fireTime) @@ -89,7 +88,6 @@ class DelayFunction(nodeCtx: FlinkCustomNodeContext, delay: Duration) override def onTimer(timestamp: Long, funCtx: FlinkTimerCtx, out: Collector[ValueWithContext[AnyRef]]): Unit = { val currentState = readStateValueOrInitial(timestamp) - Collections.reverse(currentState) currentState.forEach(emitValue(out, _)) state.remove(timestamp) }