diff --git a/eventbus/driver/nats.go b/eventbus/driver/nats.go index b940bc5d3a..7be2b7b9d9 100644 --- a/eventbus/driver/nats.go +++ b/eventbus/driver/nats.go @@ -190,7 +190,7 @@ func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connecti log.Debugf("finished evicting %v cached IDs, time cost: %v ms", num, (time.Now().UnixNano()-now)/1000/1000) case <-resetConditionsCh: log.Info("reset conditions") - msgHolder.setLastResetTime(time.Now().Unix()) + msgHolder.setLastResetTime(time.Now()) } } }() @@ -260,7 +260,7 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc } // Acknowledge any old messages that occurred before the last reset (standard reset after trigger or conditional reset) - if m.Timestamp/1e9 <= msgHolder.getLastResetTime() { + if m.Timestamp <= msgHolder.getLastResetTime().UnixNano() { if depName != "" { msgHolder.reset(depName) } @@ -326,7 +326,7 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc log.Infow("trigger conditions not met", zap.Any("meetDependencies", meetDeps), zap.Any("meetEvents", meetMsgIds)) return } - msgHolder.setLastResetTime(now) + msgHolder.setLastResetTime(time.Now()) // Trigger actions messages := make(map[string]cloudevents.Event) for k, v := range msgHolder.msgs { @@ -353,7 +353,7 @@ type eventSourceMessage struct { type eventSourceMessageHolder struct { // time that resets conditions, usually the time all conditions meet, // or the time getting an external signal to reset. - lastResetTime int64 + lastResetTime time.Time // if we reach this time, we reset everything (occurs 60 seconds after lastResetTime) resetTimeout int64 expr *govaluate.EvaluableExpression @@ -394,7 +394,7 @@ func newEventSourceMessageHolder(logger *zap.SugaredLogger, dependencyExpr strin } return &eventSourceMessageHolder{ - lastResetTime: lastResetTime.Unix(), + lastResetTime: lastResetTime, expr: expression, depNames: deps, sourceDepMap: srcDepMap, @@ -406,19 +406,19 @@ func newEventSourceMessageHolder(logger *zap.SugaredLogger, dependencyExpr strin }, nil } -func (mh *eventSourceMessageHolder) getLastResetTime() int64 { +func (mh *eventSourceMessageHolder) getLastResetTime() time.Time { mh.lock.RLock() defer mh.lock.RUnlock() return mh.lastResetTime } -func (mh *eventSourceMessageHolder) setLastResetTime(t int64) { +func (mh *eventSourceMessageHolder) setLastResetTime(t time.Time) { { mh.lock.Lock() // since this can be called asyncronously as part of a ConditionReset, we neeed to lock this code defer mh.lock.Unlock() mh.lastResetTime = t } - mh.setResetTimeout(t + 60) // failsafe condition: determine if we for some reason we haven't acknowledged all dependencies within 60 seconds of the lastResetTime + mh.setResetTimeout(t.Add(time.Second * 60).Unix()) // failsafe condition: determine if we for some reason we haven't acknowledged all dependencies within 60 seconds of the lastResetTime } func (mh *eventSourceMessageHolder) setResetTimeout(t int64) { @@ -472,6 +472,7 @@ func (mh *eventSourceMessageHolder) resetAll() { for k := range mh.msgs { delete(mh.msgs, k) } + for k := range mh.parameters { mh.parameters[k] = false }