Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(actors): timer ticker bug #2673

Merged
merged 4 commits into from
Jan 26, 2021
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 22 additions & 17 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,10 @@ func (a *actorsRuntime) CreateReminder(ctx context.Context, req *CreateReminderR
}

func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest) error {
var (
err error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add a UT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it means Unit Test?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, unit test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dueTime, period time.Duration
)
a.activeTimersLock.Lock()
defer a.activeTimersLock.Unlock()
actorKey := a.constructCompositeKey(req.ActorType, req.ActorID)
Expand All @@ -843,22 +847,20 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest
close(stopChan.(chan bool))
}

d, err := time.ParseDuration(req.Period)
if err != nil {
if period, err = time.ParseDuration(req.Period); err != nil {
return err
}
if len(req.DueTime) > 0 {
if dueTime, err = time.ParseDuration(req.DueTime); err != nil {
return err
}
}

t := a.configureTicker(d)
stop := make(chan bool, 1)
a.activeTimers.Store(timerKey, stop)

go func(ticker *time.Ticker, stop chan (bool), actorType, actorID, name, dueTime, period, callback string, data interface{}) {
if dueTime != "" {
d, err := time.ParseDuration(dueTime)
if err == nil {
time.Sleep(d)
}
}
go func(stop chan (bool), req *CreateTimerRequest) {
time.Sleep(dueTime)

// Check if timer is still active
select {
Expand All @@ -869,34 +871,37 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest
break
}

err := a.executeTimer(actorType, actorID, name, dueTime, period, callback, data)
err := a.executeTimer(req.ActorType, req.ActorID, req.Name, req.DueTime,
req.Period, req.Callback, req.Data)
if err != nil {
log.Debugf("error invoking timer on actor %s: %s", actorKey, err)
}

actorKey := a.constructCompositeKey(actorType, actorID)
ticker := a.configureTicker(period)
actorKey := a.constructCompositeKey(req.ActorType, req.ActorID)

for {
select {
case <-ticker.C:
_, exists := a.actorsTable.Load(actorKey)
if exists {
err := a.executeTimer(actorType, actorID, name, dueTime, period, callback, data)
err := a.executeTimer(req.ActorType, req.ActorID, req.Name, req.DueTime,
req.Period, req.Callback, req.Data)
if err != nil {
log.Debugf("error invoking timer on actor %s: %s", actorKey, err)
}
} else {
a.DeleteTimer(ctx, &DeleteTimerRequest{
Name: name,
ActorID: actorID,
ActorType: actorType,
Name: req.Name,
ActorID: req.ActorID,
ActorType: req.ActorType,
})
}
case <-stop:
return
}
}
}(t, stop, req.ActorType, req.ActorID, req.Name, req.DueTime, req.Period, req.Callback, req.Data)
}(stop, req)
return nil
}

Expand Down