From 00a4563e9936132927c97aed2171e480b5ab2cf1 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Thu, 15 Jun 2023 15:58:17 +0000 Subject: [PATCH 1/4] Fixed goroutine leak in reminders and timers Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- pkg/actors/actors.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/pkg/actors/actors.go b/pkg/actors/actors.go index 8f446495013..5729af86d88 100644 --- a/pkg/actors/actors.go +++ b/pkg/actors/actors.go @@ -911,11 +911,11 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel nextTimer = a.clock.NewTimer(reminder.NextTick().Sub(a.clock.Now())) defer func() { - if nextTimer.Stop() { + if !nextTimer.Stop() { <-nextTimer.C() } - if ttlTimer != nil && ttlTimer.Stop() { - <-ttlTimerC + if ttlTimer != nil && !ttlTimer.Stop() { + <-ttlTimer.C() } }() @@ -979,9 +979,6 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel break L } - if nextTimer.Stop() { - <-nextTimer.C() - } nextTimer.Reset(reminder.NextTick().Sub(a.clock.Now())) } @@ -1296,11 +1293,11 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest nextTimer = a.clock.NewTimer(reminder.NextTick().Sub(a.clock.Now())) defer func() { - if nextTimer.Stop() { + if !nextTimer.Stop() { <-nextTimer.C() } - if ttlTimer != nil && ttlTimer.Stop() { - <-ttlTimerC + if ttlTimer != nil && !ttlTimer.Stop() { + <-ttlTimer.C() } }() @@ -1335,9 +1332,6 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest break L } - if nextTimer.Stop() { - <-nextTimer.C() - } nextTimer.Reset(reminder.NextTick().Sub(a.clock.Now())) } From f5b2a717d7a9ecb819bfb77d2190fd39545e2956 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Fri, 16 Jun 2023 21:46:06 +0000 Subject: [PATCH 2/4] Added unit tests + some more tweaks Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- pkg/actors/actors.go | 8 +++- pkg/actors/actors_test.go | 78 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/pkg/actors/actors.go b/pkg/actors/actors.go index 5729af86d88..86da96ebf49 100644 --- a/pkg/actors/actors.go +++ b/pkg/actors/actors.go @@ -911,7 +911,7 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel nextTimer = a.clock.NewTimer(reminder.NextTick().Sub(a.clock.Now())) defer func() { - if !nextTimer.Stop() { + if nextTimer != nil && !nextTimer.Stop() { <-nextTimer.C() } if ttlTimer != nil && !ttlTimer.Stop() { @@ -927,6 +927,7 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel case <-ttlTimerC: // proceed with reminder deletion log.Infof("Reminder %s with parameters: dueTime: %s, period: %s has expired", reminderKey, reminder.DueTime, reminder.Period) + ttlTimer = nil break L case <-stopChannel: // reminder has been already deleted @@ -976,6 +977,7 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel } if reminder.TickExecuted() { + nextTimer = nil break L } @@ -1293,7 +1295,7 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest nextTimer = a.clock.NewTimer(reminder.NextTick().Sub(a.clock.Now())) defer func() { - if !nextTimer.Stop() { + if nextTimer != nil && !nextTimer.Stop() { <-nextTimer.C() } if ttlTimer != nil && !ttlTimer.Stop() { @@ -1309,6 +1311,7 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest case <-ttlTimerC: // timer has expired; proceed with deletion log.Infof("Timer %s with parameters: dueTime: %s, period: %s, TTL: %s has expired", timerKey, req.DueTime, req.Period, req.TTL) + ttlTimer = nil break L case <-stop: // timer has been already deleted @@ -1329,6 +1332,7 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest if reminder.TickExecuted() { log.Infof("Timer %s has been completed", timerKey) + nextTimer = nil break L } diff --git a/pkg/actors/actors_test.go b/pkg/actors/actors_test.go index 056bb87c179..f0f69c79af2 100644 --- a/pkg/actors/actors_test.go +++ b/pkg/actors/actors_test.go @@ -18,6 +18,7 @@ import ( "encoding/json" "fmt" "net/http" + "runtime" "strconv" "strings" "sync" @@ -3313,3 +3314,80 @@ func TestPlacementSwitchIsNotTurnedOn(t *testing.T) { assert.Empty(t, testActorsRuntime.compStore.ListStateStores()) }) } + +func TestCreateTimerReminderGoroutineLeak(t *testing.T) { + testActorsRuntime := newTestActorsRuntime() + defer testActorsRuntime.Stop() + + actorType, actorID := getTestActorTypeAndID() + fakeCallAndActivateActor(testActorsRuntime, actorType, actorID, testActorsRuntime.clock) + + testFn := func(createFn func(i int, ttl bool) error) func(t *testing.T) { + return func(t *testing.T) { + // Get the baseline goroutines + initialCount := runtime.NumGoroutine() + + // Create 10 timers/reminders with unique names + for i := 0; i < 10; i++ { + require.NoError(t, createFn(i, false)) + } + + // Create 5 timers/reminders that override the first ones + for i := 0; i < 5; i++ { + require.NoError(t, createFn(i, false)) + } + + // Create 5 timers/reminders that have TTL + for i := 10; i < 15; i++ { + require.NoError(t, createFn(i, true)) + } + + // Advance the clock to make the timers/reminders fire + time.Sleep(200 * time.Millisecond) + testActorsRuntime.clock.Sleep(5 * time.Second) + time.Sleep(200 * time.Millisecond) + testActorsRuntime.clock.Sleep(5 * time.Second) + + // Sleep to allow for cleanup + time.Sleep(200 * time.Millisecond) + + // Get the number of goroutines again, which should be +/- 2 the initial one (we give it some buffer) + currentCount := runtime.NumGoroutine() + if currentCount >= (initialCount+2) || currentCount <= (initialCount-2) { + t.Fatalf("Current number of goroutine %[1]d is outside of range [%[2]d-2, %[2]d+2]", currentCount, initialCount) + } + } + } + + t.Run("timers", testFn(func(i int, ttl bool) error { + req := &CreateTimerRequest{ + ActorType: actorType, + ActorID: actorID, + Name: fmt.Sprintf("timer%d", i), + Data: json.RawMessage(`"data"`), + DueTime: "2s", + } + if ttl { + req.DueTime = "1s" + req.Period = "1s" + req.TTL = "2s" + } + return testActorsRuntime.CreateTimer(context.Background(), req) + })) + + t.Run("reminders", testFn(func(i int, ttl bool) error { + req := &CreateReminderRequest{ + ActorType: actorType, + ActorID: actorID, + Name: fmt.Sprintf("reminder%d", i), + Data: json.RawMessage(`"data"`), + DueTime: "2s", + } + if ttl { + req.DueTime = "1s" + req.Period = "1s" + req.TTL = "2s" + } + return testActorsRuntime.CreateReminder(context.Background(), req) + })) +} From f10110d5a5aad752819375eb12d783d976f10abc Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Mon, 19 Jun 2023 05:11:51 +0000 Subject: [PATCH 3/4] Fixed last goroutine leaks Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- pkg/actors/actors.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/actors/actors.go b/pkg/actors/actors.go index 86da96ebf49..94da5d932b0 100644 --- a/pkg/actors/actors.go +++ b/pkg/actors/actors.go @@ -938,12 +938,14 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel _, exists := a.activeReminders.Load(reminderKey) if !exists { log.Error("Could not find active reminder with key: " + reminderKey) + nextTimer = nil return } // if all repetitions are completed, proceed with reminder deletion if reminder.RepeatsLeft() == 0 { log.Info("Reminder " + reminderKey + " has been completed") + nextTimer = nil break L } @@ -953,6 +955,7 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel if errors.Is(err, ErrReminderCanceled) { // The handler is explicitly canceling the timer log.Debug("Reminder " + reminderKey + " was canceled by the actor") + nextTimer = nil break L } else { log.Errorf("Error while executing reminder %s: %v", reminderKey, err) @@ -973,6 +976,7 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel } } else { log.Error("Could not find active reminder with key: " + reminderKey) + nextTimer = nil return } @@ -1327,6 +1331,7 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest } } else { log.Errorf("Could not find active timer %s", timerKey) + nextTimer = nil return } From 8bb0288d3461d676f8996ce4bce08e88a14306ef Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Mon, 19 Jun 2023 06:15:42 +0000 Subject: [PATCH 4/4] Comments Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- pkg/actors/actors.go | 2 +- pkg/actors/actors_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/actors/actors.go b/pkg/actors/actors.go index 94da5d932b0..3732c2014ed 100644 --- a/pkg/actors/actors.go +++ b/pkg/actors/actors.go @@ -942,7 +942,7 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel return } - // if all repetitions are completed, proceed with reminder deletion + // If all repetitions are completed, delete the reminder and do not execute it if reminder.RepeatsLeft() == 0 { log.Info("Reminder " + reminderKey + " has been completed") nextTimer = nil diff --git a/pkg/actors/actors_test.go b/pkg/actors/actors_test.go index f0f69c79af2..11739df1916 100644 --- a/pkg/actors/actors_test.go +++ b/pkg/actors/actors_test.go @@ -3337,7 +3337,7 @@ func TestCreateTimerReminderGoroutineLeak(t *testing.T) { require.NoError(t, createFn(i, false)) } - // Create 5 timers/reminders that have TTL + // Create 5 timers/reminders that have TTLs for i := 10; i < 15; i++ { require.NoError(t, createFn(i, true)) }