Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed goroutine leak in reminders and timers #6523

Merged
merged 13 commits into from
Jun 20, 2023
Merged
29 changes: 16 additions & 13 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,11 +911,11 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel

nextTimer = a.clock.NewTimer(reminder.NextTick().Sub(a.clock.Now()))
defer func() {
if nextTimer.Stop() {
if nextTimer != nil && !nextTimer.Stop() {
<-nextTimer.C()
}
if ttlTimer != nil && ttlTimer.Stop() {
<-ttlTimerC
if ttlTimer != nil && !ttlTimer.Stop() {
<-ttlTimer.C()
}
}()

Expand All @@ -927,6 +927,7 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel
case <-ttlTimerC:
// proceed with reminder deletion
log.Infof("Reminder %s with parameters: dueTime: %s, period: %s has expired", reminderKey, reminder.DueTime, reminder.Period)
ttlTimer = nil
break L
case <-stopChannel:
// reminder has been already deleted
Expand All @@ -937,12 +938,14 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel
_, exists := a.activeReminders.Load(reminderKey)
if !exists {
log.Error("Could not find active reminder with key: " + reminderKey)
nextTimer = nil
return
}

// if all repetitions are completed, proceed with reminder deletion
// If all repetitions are completed, delete the reminder and do not execute it
if reminder.RepeatsLeft() == 0 {
log.Info("Reminder " + reminderKey + " has been completed")
nextTimer = nil
break L
}

Expand All @@ -952,6 +955,7 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel
if errors.Is(err, ErrReminderCanceled) {
// The handler is explicitly canceling the timer
log.Debug("Reminder " + reminderKey + " was canceled by the actor")
nextTimer = nil
break L
} else {
log.Errorf("Error while executing reminder %s: %v", reminderKey, err)
Expand All @@ -972,16 +976,15 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel
}
} else {
log.Error("Could not find active reminder with key: " + reminderKey)
nextTimer = nil
return
}

if reminder.TickExecuted() {
nextTimer = nil
break L
}

if nextTimer.Stop() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewers: this was deleted because when we hit this stage, the reminder has already fired. Trying to drain it again would cause a deadlock.

<-nextTimer.C()
}
nextTimer.Reset(reminder.NextTick().Sub(a.clock.Now()))
}

Expand Down Expand Up @@ -1296,11 +1299,11 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest

nextTimer = a.clock.NewTimer(reminder.NextTick().Sub(a.clock.Now()))
defer func() {
if nextTimer.Stop() {
if nextTimer != nil && !nextTimer.Stop() {
<-nextTimer.C()
}
if ttlTimer != nil && ttlTimer.Stop() {
<-ttlTimerC
if ttlTimer != nil && !ttlTimer.Stop() {
<-ttlTimer.C()
}
}()

Expand All @@ -1312,6 +1315,7 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest
case <-ttlTimerC:
// timer has expired; proceed with deletion
log.Infof("Timer %s with parameters: dueTime: %s, period: %s, TTL: %s has expired", timerKey, req.DueTime, req.Period, req.TTL)
ttlTimer = nil
break L
case <-stop:
// timer has been already deleted
Expand All @@ -1327,17 +1331,16 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest
}
} else {
log.Errorf("Could not find active timer %s", timerKey)
nextTimer = nil
return
}

if reminder.TickExecuted() {
log.Infof("Timer %s has been completed", timerKey)
nextTimer = nil
break L
}

if nextTimer.Stop() {
<-nextTimer.C()
}
nextTimer.Reset(reminder.NextTick().Sub(a.clock.Now()))
}

Expand Down
78 changes: 78 additions & 0 deletions pkg/actors/actors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"runtime"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -3313,3 +3314,80 @@ func TestPlacementSwitchIsNotTurnedOn(t *testing.T) {
assert.Empty(t, testActorsRuntime.compStore.ListStateStores())
})
}

func TestCreateTimerReminderGoroutineLeak(t *testing.T) {
testActorsRuntime := newTestActorsRuntime()
defer testActorsRuntime.Stop()

actorType, actorID := getTestActorTypeAndID()
fakeCallAndActivateActor(testActorsRuntime, actorType, actorID, testActorsRuntime.clock)

testFn := func(createFn func(i int, ttl bool) error) func(t *testing.T) {
return func(t *testing.T) {
// Get the baseline goroutines
initialCount := runtime.NumGoroutine()

// Create 10 timers/reminders with unique names
for i := 0; i < 10; i++ {
require.NoError(t, createFn(i, false))
}

// Create 5 timers/reminders that override the first ones
for i := 0; i < 5; i++ {
require.NoError(t, createFn(i, false))
}

// Create 5 timers/reminders that have TTLs
for i := 10; i < 15; i++ {
require.NoError(t, createFn(i, true))
}

// Advance the clock to make the timers/reminders fire
time.Sleep(200 * time.Millisecond)
testActorsRuntime.clock.Sleep(5 * time.Second)
time.Sleep(200 * time.Millisecond)
testActorsRuntime.clock.Sleep(5 * time.Second)

// Sleep to allow for cleanup
time.Sleep(200 * time.Millisecond)

// Get the number of goroutines again, which should be +/- 2 the initial one (we give it some buffer)
currentCount := runtime.NumGoroutine()
if currentCount >= (initialCount+2) || currentCount <= (initialCount-2) {
t.Fatalf("Current number of goroutine %[1]d is outside of range [%[2]d-2, %[2]d+2]", currentCount, initialCount)
}
}
}

t.Run("timers", testFn(func(i int, ttl bool) error {
req := &CreateTimerRequest{
ActorType: actorType,
ActorID: actorID,
Name: fmt.Sprintf("timer%d", i),
Data: json.RawMessage(`"data"`),
DueTime: "2s",
}
if ttl {
req.DueTime = "1s"
req.Period = "1s"
req.TTL = "2s"
}
return testActorsRuntime.CreateTimer(context.Background(), req)
}))

t.Run("reminders", testFn(func(i int, ttl bool) error {
req := &CreateReminderRequest{
ActorType: actorType,
ActorID: actorID,
Name: fmt.Sprintf("reminder%d", i),
Data: json.RawMessage(`"data"`),
DueTime: "2s",
}
if ttl {
req.DueTime = "1s"
req.Period = "1s"
req.TTL = "2s"
}
return testActorsRuntime.CreateReminder(context.Background(), req)
}))
}