Skip to content

Commit

Permalink
fix: changing lastResetTime to a time.Time from int64 seconds so time…
Browse files Browse the repository at this point in the history
… comparison can occur at a finer granularity (#1695)

* changing lastResetTime to a time.Time from int64 seconds so time comparison can occur at a finer granularity

Signed-off-by: Julie Vogelman <julie_vogelman@intuit.com>
  • Loading branch information
juliev0 committed Mar 2, 2022
1 parent b9bbfd1 commit 35f643a
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions eventbus/driver/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connecti
log.Debugf("finished evicting %v cached IDs, time cost: %v ms", num, (time.Now().UnixNano()-now)/1000/1000)
case <-resetConditionsCh:
log.Info("reset conditions")
msgHolder.setLastResetTime(time.Now().Unix())
msgHolder.setLastResetTime(time.Now())
}
}
}()
Expand Down Expand Up @@ -260,7 +260,7 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc
}

// Acknowledge any old messages that occurred before the last reset (standard reset after trigger or conditional reset)
if m.Timestamp/1e9 <= msgHolder.getLastResetTime() {
if m.Timestamp <= msgHolder.getLastResetTime().UnixNano() {
if depName != "" {
msgHolder.reset(depName)
}
Expand Down Expand Up @@ -326,7 +326,7 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc
log.Infow("trigger conditions not met", zap.Any("meetDependencies", meetDeps), zap.Any("meetEvents", meetMsgIds))
return
}
msgHolder.setLastResetTime(now)
msgHolder.setLastResetTime(time.Now())
// Trigger actions
messages := make(map[string]cloudevents.Event)
for k, v := range msgHolder.msgs {
Expand All @@ -353,7 +353,7 @@ type eventSourceMessage struct {
type eventSourceMessageHolder struct {
// time that resets conditions, usually the time all conditions meet,
// or the time getting an external signal to reset.
lastResetTime int64
lastResetTime time.Time
// if we reach this time, we reset everything (occurs 60 seconds after lastResetTime)
resetTimeout int64
expr *govaluate.EvaluableExpression
Expand Down Expand Up @@ -394,7 +394,7 @@ func newEventSourceMessageHolder(logger *zap.SugaredLogger, dependencyExpr strin
}

return &eventSourceMessageHolder{
lastResetTime: lastResetTime.Unix(),
lastResetTime: lastResetTime,
expr: expression,
depNames: deps,
sourceDepMap: srcDepMap,
Expand All @@ -406,19 +406,19 @@ func newEventSourceMessageHolder(logger *zap.SugaredLogger, dependencyExpr strin
}, nil
}

func (mh *eventSourceMessageHolder) getLastResetTime() int64 {
func (mh *eventSourceMessageHolder) getLastResetTime() time.Time {
mh.lock.RLock()
defer mh.lock.RUnlock()
return mh.lastResetTime
}

func (mh *eventSourceMessageHolder) setLastResetTime(t int64) {
func (mh *eventSourceMessageHolder) setLastResetTime(t time.Time) {
{
mh.lock.Lock() // since this can be called asyncronously as part of a ConditionReset, we neeed to lock this code
defer mh.lock.Unlock()
mh.lastResetTime = t
}
mh.setResetTimeout(t + 60) // failsafe condition: determine if we for some reason we haven't acknowledged all dependencies within 60 seconds of the lastResetTime
mh.setResetTimeout(t.Add(time.Second * 60).Unix()) // failsafe condition: determine if we for some reason we haven't acknowledged all dependencies within 60 seconds of the lastResetTime
}

func (mh *eventSourceMessageHolder) setResetTimeout(t int64) {
Expand Down Expand Up @@ -472,6 +472,7 @@ func (mh *eventSourceMessageHolder) resetAll() {
for k := range mh.msgs {
delete(mh.msgs, k)
}

for k := range mh.parameters {
mh.parameters[k] = false
}
Expand Down

0 comments on commit 35f643a

Please sign in to comment.