Skip to content

Commit

Permalink
Fixed goroutine leak in reminders and timers (dapr#6523)
Browse files Browse the repository at this point in the history
* Fixed goroutine leak in reminders and timers

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Added unit tests + some more tweaks

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Fixed last goroutine leaks

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Comments

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

---------

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: Artur Souza <asouza.pro@gmail.com>
Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
  • Loading branch information
3 people committed Jun 20, 2023
1 parent e00f51d commit 3895d3e
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 14 deletions.
31 changes: 17 additions & 14 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,11 +906,11 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel

nextTimer = a.clock.NewTimer(reminder.NextTick().Sub(a.clock.Now()))
defer func() {
if nextTimer.Stop() {
if nextTimer != nil && !nextTimer.Stop() {
<-nextTimer.C()
}
if ttlTimer != nil && ttlTimer.Stop() {
<-ttlTimerC
if ttlTimer != nil && !ttlTimer.Stop() {
<-ttlTimer.C()
}
}()

Expand All @@ -922,6 +922,7 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel
case <-ttlTimerC:
// proceed with reminder deletion
log.Infof("Reminder %s with parameters: dueTime: %s, period: %s has expired", reminderKey, reminder.DueTime, reminder.Period)
ttlTimer = nil
break L
case <-stopChannel:
// reminder has been already deleted
Expand All @@ -932,20 +933,23 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel
_, exists := a.activeReminders.Load(reminderKey)
if !exists {
log.Error("Could not find active reminder with key: " + reminderKey)
nextTimer = nil
return
}

// if all repetitions are completed, proceed with reminder deletion
// If all repetitions are completed, delete the reminder and do not execute it
if reminder.RepeatsLeft() == 0 {
log.Info("Reminder " + reminderKey + " has been completed")
nextTimer = nil
break L
}

err = a.executeReminder(reminder, false)
if err != nil {
if errors.Is(err, ErrReminderCanceled) {
// The handler is explicitly canceling the timer
log.Info("Reminder " + reminderKey + " was canceled by the actor")
log.Debug("Reminder " + reminderKey + " was canceled by the actor")
nextTimer = nil
break L
} else {
log.Errorf("Error while executing reminder %s: %v", reminderKey, err)
Expand All @@ -966,16 +970,15 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel
}
} else {
log.Error("Could not find active reminder with key: " + reminderKey)
nextTimer = nil
return
}

if reminder.TickExecuted() {
nextTimer = nil
break L
}

if nextTimer.Stop() {
<-nextTimer.C()
}
nextTimer.Reset(reminder.NextTick().Sub(a.clock.Now()))
}

Expand Down Expand Up @@ -1269,11 +1272,11 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest

nextTimer = a.clock.NewTimer(reminder.NextTick().Sub(a.clock.Now()))
defer func() {
if nextTimer.Stop() {
if nextTimer != nil && !nextTimer.Stop() {
<-nextTimer.C()
}
if ttlTimer != nil && ttlTimer.Stop() {
<-ttlTimerC
if ttlTimer != nil && !ttlTimer.Stop() {
<-ttlTimer.C()
}
}()

Expand All @@ -1285,6 +1288,7 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest
case <-ttlTimerC:
// timer has expired; proceed with deletion
log.Infof("Timer %s with parameters: dueTime: %s, period: %s, TTL: %s has expired", timerKey, req.DueTime, req.Period, req.TTL)
ttlTimer = nil
break L
case <-stop:
// timer has been already deleted
Expand All @@ -1299,17 +1303,16 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest
}
} else {
log.Errorf("Could not find active timer %s", timerKey)
nextTimer = nil
return
}

if reminder.TickExecuted() {
log.Infof("Timer %s has been completed", timerKey)
nextTimer = nil
break L
}

if nextTimer.Stop() {
<-nextTimer.C()
}
nextTimer.Reset(reminder.NextTick().Sub(a.clock.Now()))
}

Expand Down
78 changes: 78 additions & 0 deletions pkg/actors/actors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"errors"
"fmt"
"runtime"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -3078,3 +3079,80 @@ func TestPlacementSwitchIsNotTurnedOn(t *testing.T) {
assert.Nil(t, testActorsRuntime.store)
})
}

func TestCreateTimerReminderGoroutineLeak(t *testing.T) {
testActorsRuntime := newTestActorsRuntime()
defer testActorsRuntime.Stop()

actorType, actorID := getTestActorTypeAndID()
fakeCallAndActivateActor(testActorsRuntime, actorType, actorID, testActorsRuntime.clock)

testFn := func(createFn func(i int, ttl bool) error) func(t *testing.T) {
return func(t *testing.T) {
// Get the baseline goroutines
initialCount := runtime.NumGoroutine()

// Create 10 timers/reminders with unique names
for i := 0; i < 10; i++ {
require.NoError(t, createFn(i, false))
}

// Create 5 timers/reminders that override the first ones
for i := 0; i < 5; i++ {
require.NoError(t, createFn(i, false))
}

// Create 5 timers/reminders that have TTLs
for i := 10; i < 15; i++ {
require.NoError(t, createFn(i, true))
}

// Advance the clock to make the timers/reminders fire
time.Sleep(200 * time.Millisecond)
testActorsRuntime.clock.Sleep(5 * time.Second)
time.Sleep(200 * time.Millisecond)
testActorsRuntime.clock.Sleep(5 * time.Second)

// Sleep to allow for cleanup
time.Sleep(200 * time.Millisecond)

// Get the number of goroutines again, which should be +/- 2 the initial one (we give it some buffer)
currentCount := runtime.NumGoroutine()
if currentCount >= (initialCount+2) || currentCount <= (initialCount-2) {
t.Fatalf("Current number of goroutine %[1]d is outside of range [%[2]d-2, %[2]d+2]", currentCount, initialCount)
}
}
}

t.Run("timers", testFn(func(i int, ttl bool) error {
req := &CreateTimerRequest{
ActorType: actorType,
ActorID: actorID,
Name: fmt.Sprintf("timer%d", i),
Data: json.RawMessage(`"data"`),
DueTime: "2s",
}
if ttl {
req.DueTime = "1s"
req.Period = "1s"
req.TTL = "2s"
}
return testActorsRuntime.CreateTimer(context.Background(), req)
}))

t.Run("reminders", testFn(func(i int, ttl bool) error {
req := &CreateReminderRequest{
ActorType: actorType,
ActorID: actorID,
Name: fmt.Sprintf("reminder%d", i),
Data: json.RawMessage(`"data"`),
DueTime: "2s",
}
if ttl {
req.DueTime = "1s"
req.Period = "1s"
req.TTL = "2s"
}
return testActorsRuntime.CreateReminder(context.Background(), req)
}))
}

0 comments on commit 3895d3e

Please sign in to comment.