diff --git a/pkg/actors/actors.go b/pkg/actors/actors.go index ff06a858ab9..4a6616cf5f6 100644 --- a/pkg/actors/actors.go +++ b/pkg/actors/actors.go @@ -911,11 +911,11 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel nextTimer = a.clock.NewTimer(reminder.NextTick().Sub(a.clock.Now())) defer func() { - if nextTimer.Stop() { + if nextTimer != nil && !nextTimer.Stop() { <-nextTimer.C() } - if ttlTimer != nil && ttlTimer.Stop() { - <-ttlTimerC + if ttlTimer != nil && !ttlTimer.Stop() { + <-ttlTimer.C() } }() @@ -927,6 +927,7 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel case <-ttlTimerC: // proceed with reminder deletion log.Infof("Reminder %s with parameters: dueTime: %s, period: %s has expired", reminderKey, reminder.DueTime, reminder.Period) + ttlTimer = nil break L case <-stopChannel: // reminder has been already deleted @@ -937,12 +938,14 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel _, exists := a.activeReminders.Load(reminderKey) if !exists { log.Error("Could not find active reminder with key: " + reminderKey) + nextTimer = nil return } - // if all repetitions are completed, proceed with reminder deletion + // If all repetitions are completed, delete the reminder and do not execute it if reminder.RepeatsLeft() == 0 { log.Info("Reminder " + reminderKey + " has been completed") + nextTimer = nil break L } @@ -952,6 +955,7 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel if errors.Is(err, ErrReminderCanceled) { // The handler is explicitly canceling the timer log.Debug("Reminder " + reminderKey + " was canceled by the actor") + nextTimer = nil break L } else { log.Errorf("Error while executing reminder %s: %v", reminderKey, err) @@ -972,16 +976,15 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel } } else { log.Error("Could not find active reminder with key: " + reminderKey) + nextTimer = nil return } if reminder.TickExecuted() { + nextTimer = nil break L } - if nextTimer.Stop() { - <-nextTimer.C() - } nextTimer.Reset(reminder.NextTick().Sub(a.clock.Now())) } @@ -1296,11 +1299,11 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest nextTimer = a.clock.NewTimer(reminder.NextTick().Sub(a.clock.Now())) defer func() { - if nextTimer.Stop() { + if nextTimer != nil && !nextTimer.Stop() { <-nextTimer.C() } - if ttlTimer != nil && ttlTimer.Stop() { - <-ttlTimerC + if ttlTimer != nil && !ttlTimer.Stop() { + <-ttlTimer.C() } }() @@ -1312,6 +1315,7 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest case <-ttlTimerC: // timer has expired; proceed with deletion log.Infof("Timer %s with parameters: dueTime: %s, period: %s, TTL: %s has expired", timerKey, req.DueTime, req.Period, req.TTL) + ttlTimer = nil break L case <-stop: // timer has been already deleted @@ -1327,17 +1331,16 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest } } else { log.Errorf("Could not find active timer %s", timerKey) + nextTimer = nil return } if reminder.TickExecuted() { log.Infof("Timer %s has been completed", timerKey) + nextTimer = nil break L } - if nextTimer.Stop() { - <-nextTimer.C() - } nextTimer.Reset(reminder.NextTick().Sub(a.clock.Now())) } diff --git a/pkg/actors/actors_test.go b/pkg/actors/actors_test.go index 072c09e7e23..36f5f16ef0d 100644 --- a/pkg/actors/actors_test.go +++ b/pkg/actors/actors_test.go @@ -18,6 +18,7 @@ import ( "encoding/json" "fmt" "net/http" + "runtime" "strconv" "strings" "sync" @@ -3313,3 +3314,80 @@ func TestPlacementSwitchIsNotTurnedOn(t *testing.T) { assert.Empty(t, testActorsRuntime.compStore.ListStateStores()) }) } + +func TestCreateTimerReminderGoroutineLeak(t *testing.T) { + testActorsRuntime := newTestActorsRuntime() + defer testActorsRuntime.Stop() + + actorType, actorID := getTestActorTypeAndID() + fakeCallAndActivateActor(testActorsRuntime, actorType, actorID, testActorsRuntime.clock) + + testFn := func(createFn func(i int, ttl bool) error) func(t *testing.T) { + return func(t *testing.T) { + // Get the baseline goroutines + initialCount := runtime.NumGoroutine() + + // Create 10 timers/reminders with unique names + for i := 0; i < 10; i++ { + require.NoError(t, createFn(i, false)) + } + + // Create 5 timers/reminders that override the first ones + for i := 0; i < 5; i++ { + require.NoError(t, createFn(i, false)) + } + + // Create 5 timers/reminders that have TTLs + for i := 10; i < 15; i++ { + require.NoError(t, createFn(i, true)) + } + + // Advance the clock to make the timers/reminders fire + time.Sleep(200 * time.Millisecond) + testActorsRuntime.clock.Sleep(5 * time.Second) + time.Sleep(200 * time.Millisecond) + testActorsRuntime.clock.Sleep(5 * time.Second) + + // Sleep to allow for cleanup + time.Sleep(200 * time.Millisecond) + + // Get the number of goroutines again, which should be +/- 2 the initial one (we give it some buffer) + currentCount := runtime.NumGoroutine() + if currentCount >= (initialCount+2) || currentCount <= (initialCount-2) { + t.Fatalf("Current number of goroutine %[1]d is outside of range [%[2]d-2, %[2]d+2]", currentCount, initialCount) + } + } + } + + t.Run("timers", testFn(func(i int, ttl bool) error { + req := &CreateTimerRequest{ + ActorType: actorType, + ActorID: actorID, + Name: fmt.Sprintf("timer%d", i), + Data: json.RawMessage(`"data"`), + DueTime: "2s", + } + if ttl { + req.DueTime = "1s" + req.Period = "1s" + req.TTL = "2s" + } + return testActorsRuntime.CreateTimer(context.Background(), req) + })) + + t.Run("reminders", testFn(func(i int, ttl bool) error { + req := &CreateReminderRequest{ + ActorType: actorType, + ActorID: actorID, + Name: fmt.Sprintf("reminder%d", i), + Data: json.RawMessage(`"data"`), + DueTime: "2s", + } + if ttl { + req.DueTime = "1s" + req.Period = "1s" + req.TTL = "2s" + } + return testActorsRuntime.CreateReminder(context.Background(), req) + })) +}