From 200904d2d1bd8af23bcd8d99d50db225ad753bdc Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Mon, 30 Oct 2023 20:33:11 +0000 Subject: [PATCH 01/10] First batch of fixes: 1. Deactivate all actors in the app if the placement disconnects 2. Actor object now records the idle time rather than the last activation time 3. Remove actors from the active actors table even if deactivation fails 4. Improved shutdown of actors object Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- pkg/actors/actor.go | 45 ++++-- pkg/actors/actor_test.go | 20 +-- pkg/actors/actors.go | 166 ++++++++++++++++------- pkg/actors/actors_mock.go | 10 ++ pkg/actors/actors_test.go | 5 +- pkg/actors/internal/placement_service.go | 9 ++ pkg/actors/internal_actor_test.go | 6 +- pkg/actors/placement/placement.go | 24 +++- 8 files changed, 206 insertions(+), 79 deletions(-) diff --git a/pkg/actors/actor.go b/pkg/actors/actor.go index f8277649ff9..0dfbfac55e4 100644 --- a/pkg/actors/actor.go +++ b/pkg/actors/actor.go @@ -22,6 +22,7 @@ import ( "k8s.io/utils/clock" diag "github.com/dapr/dapr/pkg/diagnostics" + "github.com/dapr/kit/ptr" ) // ErrActorDisposed is the error when runtime tries to hold the lock of the disposed actor. @@ -39,11 +40,12 @@ type actor struct { // pendingActorCalls is the number of the current pending actor calls by turn-based concurrency. pendingActorCalls atomic.Int32 - // When consistent hashing tables are updated, actor runtime drains actor to rebalance actors - // across actor hosts after drainOngoingCallTimeout or until all pending actor calls are completed. - // lastUsedTime is the time when the last actor call holds lock. This is used to calculate - // the duration of ongoing calls to time out. - lastUsedTime time.Time + // idleTimeout is the configured max idle time for actors of this kind. + idleTimeout time.Duration + + // idleAt is the time after which this actor is considered to be idle. + // When the actor is locked, idleAt is updated by adding the idleTimeout to the current time. + idleAt atomic.Pointer[time.Time] // disposeLock guards disposed and disposeCh. disposeLock sync.RWMutex @@ -56,17 +58,33 @@ type actor struct { clock clock.Clock } -func newActor(actorType, actorID string, maxReentrancyDepth *int, cl clock.Clock) *actor { +func newActor(actorType, actorID string, maxReentrancyDepth *int, idleTimeout time.Duration, cl clock.Clock) *actor { if cl == nil { cl = &clock.RealClock{} } - return &actor{ - actorType: actorType, - actorID: actorID, - actorLock: NewActorLock(int32(*maxReentrancyDepth)), - clock: cl, - lastUsedTime: cl.Now().UTC(), + + a := &actor{ + actorType: actorType, + actorID: actorID, + actorLock: NewActorLock(int32(*maxReentrancyDepth)), + clock: cl, + idleTimeout: idleTimeout, } + a.idleAt.Store(ptr.Of(cl.Now().Add(idleTimeout))) + + return a +} + +// Key returns the key for this unique actor. +// This is implemented to comply with the queueable interface. +func (a *actor) Key() string { + return a.actorType + daprSeparator + a.actorID +} + +// ScheduledTime returns the time the actor becomes idle at. +// This is implemented to comply with the queueable interface. +func (a *actor) ScheduledTime() time.Time { + return *a.idleAt.Load() } // isBusy returns true when pending actor calls are ongoing. 
@@ -115,7 +133,8 @@ func (a *actor) lock(reentrancyID *string) error { a.unlock() return ErrActorDisposed } - a.lastUsedTime = a.clock.Now().UTC() + + a.idleAt.Store(ptr.Of(a.clock.Now().Add(a.idleTimeout))) return nil } diff --git a/pkg/actors/actor_test.go b/pkg/actors/actor_test.go index bd683b01461..2d5798951e0 100644 --- a/pkg/actors/actor_test.go +++ b/pkg/actors/actor_test.go @@ -26,7 +26,7 @@ import ( var reentrancyStackDepth = 32 func TestIsBusy(t *testing.T) { - testActor := newActor("testType", "testID", &reentrancyStackDepth, nil) + testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil) testActor.lock(nil) assert.Equal(t, true, testActor.isBusy()) @@ -34,12 +34,12 @@ func TestIsBusy(t *testing.T) { } func TestTurnBasedConcurrencyLocks(t *testing.T) { - testActor := newActor("testType", "testID", &reentrancyStackDepth, nil) + testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil) // first lock testActor.lock(nil) assert.Equal(t, true, testActor.isBusy()) - firstLockTime := testActor.lastUsedTime + firstIdleAt := *testActor.idleAt.Load() waitCh := make(chan bool) @@ -57,7 +57,7 @@ func TestTurnBasedConcurrencyLocks(t *testing.T) { time.Sleep(10 * time.Millisecond) assert.Equal(t, int32(2), testActor.pendingActorCalls.Load()) assert.True(t, testActor.isBusy()) - assert.Equal(t, firstLockTime, testActor.lastUsedTime) + assert.Equal(t, firstIdleAt, *testActor.idleAt.Load()) // unlock the first lock testActor.unlock() @@ -70,12 +70,12 @@ func TestTurnBasedConcurrencyLocks(t *testing.T) { assert.Equal(t, int32(0), testActor.pendingActorCalls.Load()) assert.False(t, testActor.isBusy()) - assert.True(t, testActor.lastUsedTime.Sub(firstLockTime) >= 10*time.Millisecond) + assert.True(t, testActor.idleAt.Load().Sub(firstIdleAt) >= 10*time.Millisecond) } func TestDisposedActor(t *testing.T) { t.Run("not disposed", func(t *testing.T) { - testActor := newActor("testType", "testID", &reentrancyStackDepth, nil) + testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil) testActor.lock(nil) testActor.unlock() @@ -86,7 +86,7 @@ func TestDisposedActor(t *testing.T) { }) t.Run("disposed", func(t *testing.T) { - testActor := newActor("testType", "testID", &reentrancyStackDepth, nil) + testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil) testActor.lock(nil) ch := testActor.channel() @@ -102,7 +102,7 @@ func TestDisposedActor(t *testing.T) { func TestPendingActorCalls(t *testing.T) { t.Run("no pending actor call with new actor object", func(t *testing.T) { - testActor := newActor("testType", "testID", &reentrancyStackDepth, nil) + testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil) channelClosed := false select { @@ -117,7 +117,7 @@ func TestPendingActorCalls(t *testing.T) { }) t.Run("close channel before timeout", func(t *testing.T) { - testActor := newActor("testType", "testID", &reentrancyStackDepth, nil) + testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, nil) testActor.lock(nil) channelClosed := atomic.Bool{} @@ -139,7 +139,7 @@ func TestPendingActorCalls(t *testing.T) { t.Run("multiple listeners", func(t *testing.T) { clock := clocktesting.NewFakeClock(time.Now()) - testActor := newActor("testType", "testID", &reentrancyStackDepth, clock) + testActor := newActor("testType", "testID", &reentrancyStackDepth, time.Second, clock) testActor.lock(nil) nListeners := 10 diff --git a/pkg/actors/actors.go 
b/pkg/actors/actors.go index 3a1c8776234..fc3142d83cd 100644 --- a/pkg/actors/actors.go +++ b/pkg/actors/actors.go @@ -155,17 +155,11 @@ func NewActors(opts ActorsOpts) ActorRuntime { } func newActorsWithClock(opts ActorsOpts, clock clock.WithTicker) ActorRuntime { - remindersProvider := reminders.NewRemindersProvider(clock, internal.RemindersProviderOpts{ - StoreName: opts.StateStoreName, - Config: opts.Config.Config, - }) - a := &actorsRuntime{ appChannel: opts.AppChannel, grpcConnectionFn: opts.GRPCConnectionFn, actorsConfig: opts.Config, timers: timers.NewTimersProvider(clock), - actorsReminders: remindersProvider, tracingSpec: opts.TracingSpec, resiliency: opts.Resiliency, storeName: opts.StateStoreName, @@ -182,12 +176,19 @@ func newActorsWithClock(opts ActorsOpts, clock clock.WithTicker) ActorRuntime { closeCh: make(chan struct{}), } - a.timers.SetExecuteTimerFn(a.executeTimer) + // Init reminders + a.actorsReminders = reminders.NewRemindersProvider(a.clock, internal.RemindersProviderOpts{ + StoreName: a.storeName, + Config: a.actorsConfig.Config, + }) a.actorsReminders.SetExecuteReminderFn(a.executeReminder) a.actorsReminders.SetResiliencyProvider(a.resiliency) a.actorsReminders.SetStateStoreProviderFn(a.stateStore) a.actorsReminders.SetLookupActorFn(a.isActorLocallyHosted) + // Init timers + a.timers.SetExecuteTimerFn(a.executeTimer) + return a } @@ -244,15 +245,15 @@ func (a *actorsRuntime) Init(ctx context.Context) error { RuntimeHostname: a.actorsConfig.GetRuntimeHostname(), PodName: a.actorsConfig.Config.PodName, ActorTypes: a.actorsConfig.Config.HostedActorTypes.ListActorTypes(), - AppHealthFn: func(ctx context.Context) <-chan bool { - return a.getAppHealthCheckChan(ctx) - }, - Resiliency: a.resiliency, + Resiliency: a.resiliency, + AppHealthFn: a.getAppHealthCheckChan, AfterTableUpdateFn: func() { a.drainRebalancedActors() a.actorsReminders.OnPlacementTablesUpdated(ctx) }, }) + + a.placement.SetHaltActorFns(a.haltActor, a.haltAllActors) } a.wg.Add(1) @@ -264,7 +265,7 @@ func (a *actorsRuntime) Init(ctx context.Context) error { a.wg.Add(1) go func() { defer a.wg.Done() - a.deactivationTicker(a.actorsConfig, a.deactivateActor) + a.deactivationTicker(a.actorsConfig, a.haltActor) }() log.Infof("Actor runtime started. Actor idle timeout: %v. Actor scan interval: %v", @@ -299,32 +300,97 @@ func constructCompositeKey(keys ...string) string { return strings.Join(keys, daprSeparator) } -func (a *actorsRuntime) deactivateActor(actorType, actorID string) error { - req := invokev1.NewInvokeMethodRequest("actors/"+actorType+"/"+actorID). - WithActor(actorType, actorID). 
+// Halts an actor, removing it from the actors table and then deactivating it
+func (a *actorsRuntime) haltActor(actorType, actorID string) error {
+	key := constructCompositeKey(actorType, actorID)
+	log.Debugf("Halting actor '%s'", key)
+
+	// Remove the actor from the table
+	// This will forbid more state changes
+	actAny, ok := a.actorsTable.LoadAndDelete(key)
+
+	// If nothing was loaded, the actor was probably already deactivated
+	if !ok || actAny == nil {
+		return nil
+	}
+
+	act := actAny.(*actor)
+	for {
+		// wait until actor is not busy, then deactivate
+		if !act.isBusy() {
+			break
+		}
+
+		a.clock.Sleep(time.Millisecond * 100)
+	}
+
+	return a.deactivateActor(act)
+}
+
+// Halts all actors
+func (a *actorsRuntime) haltAllActors() error {
+	// Visit all currently active actors and deactivate them
+	errCh := make(chan error)
+	count := atomic.Int32{}
+	a.actorsTable.Range(func(key any, value any) bool {
+		count.Add(1)
+		go func(key any) {
+			actorKey := key.(string)
+			err := a.haltActor(a.getActorTypeAndIDFromKey(actorKey))
+			if err != nil {
+				err = fmt.Errorf("failed to deactivate actor '%s': %v", actorKey, err)
+			}
+			errCh <- err
+		}(key)
+		return true
+	})
+
+	// Collect all errors, which also waits for all goroutines to return
+	errs := []error{}
+	for i := int32(0); i < count.Load(); i++ {
+		err := <-errCh
+		if err != nil {
+			errs = append(errs, err)
+		}
+	}
+
+	return errors.Join(errs...)
+}
+
+func (a *actorsRuntime) deactivateActor(act *actor) error {
+	ctx := context.Background()
+
+	// Delete the actor from the actor table regardless of the outcome of deactivating the actor in the app
+	actorKey := act.Key()
+	a.actorsTable.Delete(actorKey)
+
+	req := invokev1.NewInvokeMethodRequest("actors/"+act.actorType+"/"+act.actorID).
+		WithActor(act.actorType, act.actorID).
 		WithHTTPExtension(http.MethodDelete, "").
 		WithContentType(invokev1.JSONContentType)
 	defer req.Close()
 
-	// TODO Propagate context.
- ctx := context.TODO() - - resp, err := a.getAppChannel(actorType).InvokeMethod(ctx, req, "") + resp, err := a.getAppChannel(act.actorType).InvokeMethod(ctx, req, "") if err != nil { - diag.DefaultMonitoring.ActorDeactivationFailed(actorType, "invoke") + diag.DefaultMonitoring.ActorDeactivationFailed(act.actorType, "invoke") return err } defer resp.Close() if resp.Status().Code != http.StatusOK { - diag.DefaultMonitoring.ActorDeactivationFailed(actorType, "status_code_"+strconv.FormatInt(int64(resp.Status().Code), 10)) + diag.DefaultMonitoring.ActorDeactivationFailed(act.actorType, "status_code_"+strconv.FormatInt(int64(resp.Status().Code), 10)) body, _ := resp.RawDataFull() return fmt.Errorf("error from actor service: %s", string(body)) } - a.removeActorFromTable(actorType, actorID) - diag.DefaultMonitoring.ActorDeactivated(actorType) - log.Debugf("Deactivated actor type=%s, id=%s", actorType, actorID) + diag.DefaultMonitoring.ActorDeactivated(act.actorType) + log.Debugf("Deactivated actor '%s'", actorKey) + + // This uses a background context as it should be unrelated from the caller's context - once the actor is deactivated, it should be reported + err = a.placement.ReportActorDeactivation(context.Background(), act.actorType, act.actorID) + if err != nil { + return fmt.Errorf("failed to report actor deactivation for actor '%s': %w", actorKey, err) + } return nil } @@ -338,9 +404,7 @@ func (a *actorsRuntime) getActorTypeAndIDFromKey(key string) (string, string) { return typ, id } -type deactivateFn = func(actorType string, actorID string) error - -func (a *actorsRuntime) deactivationTicker(configuration Config, deactivateFn deactivateFn) { +func (a *actorsRuntime) deactivationTicker(configuration Config, haltFn internal.HaltActorFn) { ticker := a.clock.NewTicker(configuration.ActorDeactivationScanInterval) ch := ticker.C() defer ticker.Stop() @@ -348,20 +412,19 @@ func (a *actorsRuntime) deactivationTicker(configuration Config, deactivateFn de for { select { case t := <-ch: - a.actorsTable.Range(func(key, value interface{}) bool { + a.actorsTable.Range(func(key, value any) bool { actorInstance := value.(*actor) if actorInstance.isBusy() { return true } - durationPassed := t.Sub(actorInstance.lastUsedTime) - if durationPassed >= configuration.GetIdleTimeoutForType(actorInstance.actorType) { + if !t.Before(actorInstance.ScheduledTime()) { a.wg.Add(1) go func(actorKey string) { defer a.wg.Done() actorType, actorID := a.getActorTypeAndIDFromKey(actorKey) - err := deactivateFn(actorType, actorID) + err := haltFn(actorType, actorID) if err != nil { log.Errorf("failed to deactivate actor %s: %s", actorKey, err) } @@ -460,7 +523,13 @@ func (a *actorsRuntime) getOrCreateActor(act *internalv1pb.Actor) *actor { // call newActor, but this is trivial. 
val, ok := a.actorsTable.Load(key) if !ok { - val, _ = a.actorsTable.LoadOrStore(key, newActor(act.ActorType, act.ActorId, a.actorsConfig.GetReentrancyForType(act.ActorType).MaxStackDepth, a.clock)) + actorInstance := newActor( + act.ActorType, act.ActorId, + a.actorsConfig.GetReentrancyForType(act.ActorType).MaxStackDepth, + a.actorsConfig.GetIdleTimeoutForType(act.ActorType), + a.clock, + ) + val, _ = a.actorsTable.LoadOrStore(key, actorInstance) } return val.(*actor) @@ -750,7 +819,7 @@ func (a *actorsRuntime) drainRebalancedActors() { a.actorsTable.Range(func(key any, value any) bool { wg.Add(1) - go func(key any, value any, wg *sync.WaitGroup) { + go func(key any, value any) { defer wg.Done() // for each actor, deactivate if no longer hosted locally actorKey := key.(string) @@ -764,38 +833,28 @@ func (a *actorsRuntime) drainRebalancedActors() { // each item in reminders contain a struct with some metadata + the actual reminder struct a.actorsReminders.DrainRebalancedReminders(actorType, actorID) - actor := value.(*actor) + act := value.(*actor) if a.actorsConfig.GetDrainRebalancedActorsForType(actorType) { // wait until actor isn't busy or timeout hits - if actor.isBusy() { + if act.isBusy() { select { case <-a.clock.After(a.actorsConfig.Config.DrainOngoingCallTimeout): break - case <-actor.channel(): + case <-act.channel(): // if a call comes in from the actor for state changes, that's still allowed break } } } - // don't allow state changes - a.actorsTable.Delete(key) - diag.DefaultMonitoring.ActorRebalanced(actorType) - for { - // wait until actor is not busy, then deactivate - if !actor.isBusy() { - err := a.deactivateActor(actorType, actorID) - if err != nil { - log.Errorf("failed to deactivate actor %s: %s", actorKey, err) - } - break - } - a.clock.Sleep(time.Millisecond * 500) + err := a.haltActor(actorType, actorID) + if err != nil { + log.Errorf("Failed to deactivate actor '%s': %v", actorKey, err) } } - }(key, value, &wg) + }(key, value) return true }) @@ -1008,9 +1067,14 @@ func (a *actorsRuntime) Close() error { if a.closed.CompareAndSwap(false, true) { defer close(a.closeCh) + errs := []error{} if a.placement != nil { - return a.placement.Close() + err := a.placement.Close() + if err != nil { + errs = append(errs, fmt.Errorf("failed to close placement service: %w", err)) + } } + return errors.Join(errs...) 
 	}
 
 	return nil
diff --git a/pkg/actors/actors_mock.go b/pkg/actors/actors_mock.go
index 84b0b958c47..2680917df6e 100644
--- a/pkg/actors/actors_mock.go
+++ b/pkg/actors/actors_mock.go
@@ -89,6 +89,16 @@ func (*MockPlacement) WaitUntilReady(ctx context.Context) error {
 	return nil
 }
 
+// ReportActorDeactivation implements internal.PlacementService
+func (*MockPlacement) ReportActorDeactivation(ctx context.Context, actorType, actorID string) error {
+	return nil
+}
+
+// SetHaltActorFns implements internal.PlacementService
+func (*MockPlacement) SetHaltActorFns(haltFn internal.HaltActorFn, haltAllFn internal.HaltAllActorsFn) {
+	// Nop
+}
+
 // MockActors is an autogenerated mock type for the Actors type
 type MockActors struct {
 	mock.Mock
diff --git a/pkg/actors/actors_test.go b/pkg/actors/actors_test.go
index 26e0d93642d..7b0638baef1 100644
--- a/pkg/actors/actors_test.go
+++ b/pkg/actors/actors_test.go
@@ -300,7 +300,8 @@ func fakeStore() state.Store {
 
 func fakeCallAndActivateActor(actors *actorsRuntime, actorType, actorID string, clock kclock.WithTicker) {
 	actorKey := constructCompositeKey(actorType, actorID)
-	actors.actorsTable.LoadOrStore(actorKey, newActor(actorType, actorID, &reentrancyStackDepth, clock))
+	act := newActor(actorType, actorID, &reentrancyStackDepth, actors.actorsConfig.GetIdleTimeoutForType(actorType), clock)
+	actors.actorsTable.LoadOrStore(actorKey, act)
 }
 
 func deactivateActorWithDuration(testActorsRuntime *actorsRuntime, actorType, actorID string) <-chan struct{} {
@@ -810,7 +811,7 @@ func TestCallLocalActor(t *testing.T) {
 	defer testActorsRuntime.Close()
 
 	actorKey := constructCompositeKey(testActorType, testActorID)
-	act := newActor(testActorType, testActorID, &reentrancyStackDepth, testActorsRuntime.clock)
+	act := newActor(testActorType, testActorID, &reentrancyStackDepth, 2*time.Second, testActorsRuntime.clock)
 
 	// add test actor
 	testActorsRuntime.actorsTable.LoadOrStore(actorKey, act)
diff --git a/pkg/actors/internal/placement_service.go b/pkg/actors/internal/placement_service.go
index 8edb34aa3be..659df109de1 100644
--- a/pkg/actors/internal/placement_service.go
+++ b/pkg/actors/internal/placement_service.go
@@ -19,6 +19,12 @@ import (
 	"time"
 )
 
+// HaltActorFn is the signature of the function invoked when the placement service requires an actor to be deactivated.
+type HaltActorFn = func(actorType string, actorID string) error
+
+// HaltAllActorsFn is the signature of the function invoked when the placement service requires all actors to be deactivated.
+type HaltAllActorsFn = func() error
+
 // PlacementService allows for interacting with the actor placement service.
 type PlacementService interface {
 	io.Closer
@@ -27,6 +33,9 @@ type PlacementService interface {
 	WaitUntilReady(ctx context.Context) error
 	LookupActor(ctx context.Context, req LookupActorRequest) (LookupActorResponse, error)
 	AddHostedActorType(actorType string, idleTimeout time.Duration) error
+	ReportActorDeactivation(ctx context.Context, actorType, actorID string) error
+
+	SetHaltActorFns(haltFn HaltActorFn, haltAllFn HaltAllActorsFn)
 }
 
 // LookupActorRequest is the request for LookupActor.
diff --git a/pkg/actors/internal_actor_test.go b/pkg/actors/internal_actor_test.go
index 5d42b75fe74..59cfb34747f 100644
--- a/pkg/actors/internal_actor_test.go
+++ b/pkg/actors/internal_actor_test.go
@@ -241,7 +241,11 @@ func TestInternalActorDeactivation(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Deactivate the actor, ensuring no errors and that the correct actor ID was provided.
- err = testActorRuntime.deactivateActor(testActorType, testActorID) + actAny, ok := testActorRuntime.actorsTable.Load(constructCompositeKey(testActorType, testActorID)) + require.True(t, ok) + act, ok := actAny.(*actor) + require.True(t, ok) + err = testActorRuntime.deactivateActor(act) require.NoError(t, err) if assert.Len(t, ia.DeactivationCalls, 1) { assert.Equal(t, testActorID, ia.DeactivationCalls[0]) diff --git a/pkg/actors/placement/placement.go b/pkg/actors/placement/placement.go index 19fd0f78467..5353a40a2d3 100644 --- a/pkg/actors/placement/placement.go +++ b/pkg/actors/placement/placement.go @@ -91,6 +91,9 @@ type actorPlacement struct { // such as draining actors and resetting reminders. afterTableUpdateFn func() + // callback invoked to halt all active actors + haltAllActorsFn internal.HaltAllActorsFn + // shutdown is the flag when runtime is being shutdown. shutdown atomic.Bool // shutdownConnLoop is the wait group to wait until all connection loop are done @@ -246,9 +249,15 @@ func (p *actorPlacement) Start(ctx context.Context) error { if !p.appHealthy.Load() { // app is unresponsive, close the stream and disconnect from the placement service. // Then Placement will remove this host from the member list. - log.Debug("disconnecting from placement service by the unhealthy app.") + log.Debug("Disconnecting from placement service by the unhealthy app") p.client.disconnect() + if p.haltAllActorsFn != nil { + haltErr := p.haltAllActorsFn() + if haltErr != nil { + log.Errorf("Failed to deactivate all actors: %v", haltErr) + } + } continue } @@ -266,7 +275,7 @@ func (p *actorPlacement) Start(ctx context.Context) error { err := p.client.send(&host) if err != nil { diag.DefaultMonitoring.ActorStatusReportFailed("send", "status") - log.Debugf("failed to report status to placement service : %v", err) + log.Errorf("Failed to report status to placement service : %v", err) } // No delay if stream connection is not alive. @@ -480,6 +489,17 @@ func (p *actorPlacement) updatePlacements(in *v1pb.PlacementTables) { log.Infof("Placement tables updated, version: %s", in.GetVersion()) } +func (p *actorPlacement) ReportActorDeactivation(ctx context.Context, actorType, actorID string) error { + // Nop in this implementation + return nil +} + +func (p *actorPlacement) SetHaltActorFns(haltFn internal.HaltActorFn, haltAllFn internal.HaltAllActorsFn) { + // haltFn isn't used in this implementation + p.haltAllActorsFn = haltAllFn + return +} + // addDNSResolverPrefix add the `dns://` prefix to the given addresses func addDNSResolverPrefix(addr []string) []string { resolvers := make([]string, 0, len(addr)) From 98a7b4daa496d36effb4ad2073ecbc46dd586738 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Mon, 30 Oct 2023 20:37:07 +0000 Subject: [PATCH 02/10] Fixed: placement waits for the first table dissemination before reporting readiness Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- pkg/actors/placement/placement.go | 64 ++++++-- pkg/actors/placement/placement_test.go | 204 +++++++++++++++++-------- 2 files changed, 186 insertions(+), 82 deletions(-) diff --git a/pkg/actors/placement/placement.go b/pkg/actors/placement/placement.go index 5353a40a2d3..86043b1a068 100644 --- a/pkg/actors/placement/placement.go +++ b/pkg/actors/placement/placement.go @@ -75,11 +75,11 @@ type actorPlacement struct { placementTables *hashing.ConsistentHashTables // placementTableLock is the lock for placementTables. 
placementTableLock sync.RWMutex + // hasPlacementTablesCh is closed when the placement tables have been received. + hasPlacementTablesCh chan struct{} // unblockSignal is the channel to unblock table locking. unblockSignal chan struct{} - // tableIsBlocked is the status of table lock. - tableIsBlocked atomic.Bool // operationUpdateLock is the lock for three stage commit. operationUpdateLock sync.Mutex @@ -130,6 +130,7 @@ func NewActorPlacement(opts ActorPlacementOpts) internal.PlacementService { client: newPlacementClient(getGrpcOptsGetter(servers, opts.Security)), placementTables: &hashing.ConsistentHashTables{Entries: make(map[string]*hashing.Consistent)}, + unblockSignal: make(chan struct{}, 1), appHealthFn: opts.AppHealthFn, afterTableUpdateFn: opts.AfterTableUpdateFn, closeCh: make(chan struct{}), @@ -156,6 +157,7 @@ func (p *actorPlacement) Start(ctx context.Context) error { p.serverIndex.Store(0) p.shutdown.Store(false) p.appHealthy.Store(true) + p.hasPlacementTablesCh = make(chan struct{}) if !p.establishStreamConn(ctx) { return nil @@ -252,6 +254,10 @@ func (p *actorPlacement) Start(ctx context.Context) error { log.Debug("Disconnecting from placement service by the unhealthy app") p.client.disconnect() + p.placementTableLock.Lock() + p.placementTables = nil + p.hasPlacementTablesCh = make(chan struct{}) + p.placementTableLock.Unlock() if p.haltAllActorsFn != nil { haltErr := p.haltAllActorsFn() if haltErr != nil { @@ -303,11 +309,24 @@ func (p *actorPlacement) Close() error { // WaitUntilReady waits until placement table is until table lock is unlocked. func (p *actorPlacement) WaitUntilReady(ctx context.Context) error { - if !p.tableIsBlocked.Load() { + p.placementTableLock.RLock() + hasTablesCh := p.hasPlacementTablesCh + p.placementTableLock.RUnlock() + + select { + case p.unblockSignal <- struct{}{}: + <-p.unblockSignal + // continue + case <-ctx.Done(): + return ctx.Err() + } + + if hasTablesCh == nil { return nil } + select { - case <-p.unblockSignal: + case <-hasTablesCh: return nil case <-ctx.Done(): return ctx.Err() @@ -386,7 +405,7 @@ func (p *actorPlacement) establishStreamConn(ctx context.Context) (established b if err != nil { if !logFailureShown { - log.Debugf("error connecting to placement service (will retry to connect in background): %v", err) + log.Debugf("Error connecting to placement service (will retry to connect in background): %v", err) // Don't show the debug log more than once per each reconnection attempt logFailureShown = true } @@ -395,7 +414,7 @@ func (p *actorPlacement) establishStreamConn(ctx context.Context) (established b continue } - log.Debug("established connection to placement service at " + p.client.clientConn.Target()) + log.Debug("Established connection to placement service at " + p.client.clientConn.Target()) return true } @@ -410,12 +429,12 @@ func (p *actorPlacement) onPlacementError(err error) { if ok && s.Code() == codes.FailedPrecondition { p.serverIndex.Store((p.serverIndex.Load() + 1) % int32(len(p.serverAddr))) } else { - log.Debugf("disconnected from placement: %v", err) + log.Debugf("Disconnected from placement: %v", err) } } func (p *actorPlacement) onPlacementOrder(in *v1pb.PlacementOrder) { - log.Debugf("placement order received: %s", in.Operation) + log.Debugf("Placement order received: %s", in.Operation) diag.DefaultMonitoring.ActorPlacementTableOperationReceived(in.Operation) // lock all incoming calls when an updated table arrives @@ -445,13 +464,20 @@ func (p *actorPlacement) onPlacementOrder(in *v1pb.PlacementOrder) { } 
func (p *actorPlacement) blockPlacements() { - p.unblockSignal = make(chan struct{}) - p.tableIsBlocked.Store(true) + select { + case p.unblockSignal <- struct{}{}: + // Now blocked + default: + // Was already blocked + } } func (p *actorPlacement) unblockPlacements() { - if p.tableIsBlocked.CompareAndSwap(true, false) { - close(p.unblockSignal) + select { + case <-p.unblockSignal: + // Now unblocked + default: + // Was already unblocked } } @@ -465,9 +491,11 @@ func (p *actorPlacement) updatePlacements(in *v1pb.PlacementTables) { return } - tables := &hashing.ConsistentHashTables{Entries: make(map[string]*hashing.Consistent)} + tables := &hashing.ConsistentHashTables{ + Entries: make(map[string]*hashing.Consistent, len(in.Entries)), + } for k, v := range in.Entries { - loadMap := map[string]*hashing.Host{} + loadMap := make(map[string]*hashing.Host, len(v.LoadMap)) for lk, lv := range v.LoadMap { loadMap[lk] = hashing.NewHost(lv.Name, lv.Id, lv.Load, lv.Port) } @@ -477,6 +505,10 @@ func (p *actorPlacement) updatePlacements(in *v1pb.PlacementTables) { p.placementTables = tables p.placementTables.Version = in.Version updated = true + if p.hasPlacementTablesCh != nil { + close(p.hasPlacementTablesCh) + p.hasPlacementTablesCh = nil + } }() if !updated { @@ -484,7 +516,9 @@ func (p *actorPlacement) updatePlacements(in *v1pb.PlacementTables) { } // May call LookupActor inside, so should not do this with placementTableLock locked. - p.afterTableUpdateFn() + if p.afterTableUpdateFn != nil { + p.afterTableUpdateFn() + } log.Infof("Placement tables updated, version: %s", in.GetVersion()) } diff --git a/pkg/actors/placement/placement_test.go b/pkg/actors/placement/placement_test.go index a685c187759..efab1c918fe 100644 --- a/pkg/actors/placement/placement_test.go +++ b/pkg/actors/placement/placement_test.go @@ -69,15 +69,14 @@ func TestPlacementStream_RoundRobin(t *testing.T) { } testPlacement := NewActorPlacement(ActorPlacementOpts{ - ServerAddrs: address, - AppID: "testAppID", - RuntimeHostname: "127.0.0.1:1000", - PodName: "testPodName", - ActorTypes: []string{"actorOne", "actorTwo"}, - AppHealthFn: func(ctx context.Context) <-chan bool { return nil }, - AfterTableUpdateFn: func() {}, - Security: testSecurity(t), - Resiliency: resiliency.New(logger.NewLogger("test")), + ServerAddrs: address, + AppID: "testAppID", + RuntimeHostname: "127.0.0.1:1000", + PodName: "testPodName", + ActorTypes: []string{"actorOne", "actorTwo"}, + AppHealthFn: func(ctx context.Context) <-chan bool { return nil }, + Security: testSecurity(t), + Resiliency: resiliency.New(logger.NewLogger("test")), }).(*actorPlacement) t.Run("found leader placement in a round robin way", func(t *testing.T) { @@ -126,15 +125,14 @@ func TestAppHealthyStatus(t *testing.T) { appHealthCh := make(chan bool) testPlacement := NewActorPlacement(ActorPlacementOpts{ - ServerAddrs: []string{address}, - AppID: "testAppID", - RuntimeHostname: "127.0.0.1:1000", - PodName: "testPodName", - ActorTypes: []string{"actorOne", "actorTwo"}, - AppHealthFn: func(ctx context.Context) <-chan bool { return appHealthCh }, - AfterTableUpdateFn: func() {}, - Security: testSecurity(t), - Resiliency: resiliency.New(logger.NewLogger("test")), + ServerAddrs: []string{address}, + AppID: "testAppID", + RuntimeHostname: "127.0.0.1:1000", + PodName: "testPodName", + ActorTypes: []string{"actorOne", "actorTwo"}, + AppHealthFn: func(ctx context.Context) <-chan bool { return appHealthCh }, + Security: testSecurity(t), + Resiliency: resiliency.New(logger.NewLogger("test")), 
}).(*actorPlacement) // act @@ -175,7 +173,14 @@ func TestOnPlacementOrder(t *testing.T) { testPlacement.onPlacementOrder(&placementv1pb.PlacementOrder{ Operation: "lock", }) - assert.True(t, testPlacement.tableIsBlocked.Load()) + + select { + case testPlacement.unblockSignal <- struct{}{}: + <-testPlacement.unblockSignal + t.Fatal("Should be blocked") + default: + // All good + } }) t.Run("update operation", func(t *testing.T) { @@ -207,25 +212,39 @@ func TestOnPlacementOrder(t *testing.T) { testPlacement.onPlacementOrder(&placementv1pb.PlacementOrder{ Operation: "unlock", }) - assert.False(t, testPlacement.tableIsBlocked.Load()) + select { + case testPlacement.unblockSignal <- struct{}{}: + <-testPlacement.unblockSignal + // All good + default: + t.Fatal("Should not have been blocked") + } }) } func TestWaitUntilPlacementTableIsReady(t *testing.T) { testPlacement := NewActorPlacement(ActorPlacementOpts{ - ServerAddrs: []string{}, - AppID: "testAppID", - RuntimeHostname: "127.0.0.1:1000", - PodName: "testPodName", - ActorTypes: []string{"actorOne", "actorTwo"}, - AppHealthFn: func(ctx context.Context) <-chan bool { return nil }, - AfterTableUpdateFn: func() {}, - Security: testSecurity(t), - Resiliency: resiliency.New(logger.NewLogger("test")), + ServerAddrs: []string{}, + AppID: "testAppID", + RuntimeHostname: "127.0.0.1:1000", + PodName: "testPodName", + ActorTypes: []string{"actorOne", "actorTwo"}, + AppHealthFn: func(ctx context.Context) <-chan bool { return nil }, + Security: testSecurity(t), + Resiliency: resiliency.New(logger.NewLogger("test")), }).(*actorPlacement) + // Set the hasPlacementTablesCh channel to nil for the first tests, indicating that the placement tables already exist + testPlacement.hasPlacementTablesCh = nil + t.Run("already unlocked", func(t *testing.T) { - require.False(t, testPlacement.tableIsBlocked.Load()) + select { + case testPlacement.unblockSignal <- struct{}{}: + <-testPlacement.unblockSignal + // All good + default: + t.Fatal("Should not have been blocked") + } err := testPlacement.WaitUntilReady(context.Background()) assert.NoError(t, err) @@ -234,16 +253,19 @@ func TestWaitUntilPlacementTableIsReady(t *testing.T) { t.Run("wait until ready", func(t *testing.T) { testPlacement.onPlacementOrder(&placementv1pb.PlacementOrder{Operation: "lock"}) - testSuccessCh := make(chan struct{}) + testSuccessCh := make(chan error) go func() { - err := testPlacement.WaitUntilReady(context.Background()) - if assert.NoError(t, err) { - testSuccessCh <- struct{}{} - } + testSuccessCh <- testPlacement.WaitUntilReady(context.Background()) }() time.Sleep(50 * time.Millisecond) - require.True(t, testPlacement.tableIsBlocked.Load()) + select { + case testPlacement.unblockSignal <- struct{}{}: + <-testPlacement.unblockSignal + t.Fatal("Should be blocked") + default: + // All good + } // unlock testPlacement.onPlacementOrder(&placementv1pb.PlacementOrder{Operation: "unlock"}) @@ -251,28 +273,37 @@ func TestWaitUntilPlacementTableIsReady(t *testing.T) { // ensure that it is unlocked select { case <-time.After(500 * time.Millisecond): - t.Fatal("placement table not unlocked in 500ms") - case <-testSuccessCh: - // all good + t.Fatal("Placement table not unlocked in 500ms") + case err := <-testSuccessCh: + require.NoError(t, err) } - assert.False(t, testPlacement.tableIsBlocked.Load()) + select { + case testPlacement.unblockSignal <- struct{}{}: + <-testPlacement.unblockSignal + // All good + default: + t.Fatal("Should not have been blocked") + } }) t.Run("abort on context 
canceled", func(t *testing.T) { testPlacement.onPlacementOrder(&placementv1pb.PlacementOrder{Operation: "lock"}) - testSuccessCh := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) + testSuccessCh := make(chan error) go func() { - err := testPlacement.WaitUntilReady(ctx) - if assert.ErrorIs(t, err, context.Canceled) { - testSuccessCh <- struct{}{} - } + testSuccessCh <- testPlacement.WaitUntilReady(ctx) }() time.Sleep(50 * time.Millisecond) - require.True(t, testPlacement.tableIsBlocked.Load()) + select { + case testPlacement.unblockSignal <- struct{}{}: + <-testPlacement.unblockSignal + t.Fatal("Should be blocked") + default: + // All good + } // cancel context cancel() @@ -281,25 +312,62 @@ func TestWaitUntilPlacementTableIsReady(t *testing.T) { select { case <-time.After(500 * time.Millisecond): t.Fatal("did not return in 500ms") - case <-testSuccessCh: + case err := <-testSuccessCh: + require.Error(t, err) + require.ErrorIs(t, err, context.Canceled) + } + + select { + case testPlacement.unblockSignal <- struct{}{}: + <-testPlacement.unblockSignal + t.Fatal("Should be blocked") + default: + // All good + } + + // Unblock for the next test + <-testPlacement.unblockSignal + }) + + t.Run("blocks until tables have been received", func(t *testing.T) { + hasPlacementTablesCh := make(chan struct{}) + testPlacement.hasPlacementTablesCh = hasPlacementTablesCh + + testSuccessCh := make(chan error) + go func() { + testSuccessCh <- testPlacement.WaitUntilReady(context.Background()) + }() + + // No signal for now + select { + case <-time.After(500 * time.Millisecond): // all good + case <-testSuccessCh: + t.Fatal("Received an unexpected signal") } - assert.True(t, testPlacement.tableIsBlocked.Load()) + // Close the channel + close(hasPlacementTablesCh) + + select { + case <-time.After(500 * time.Millisecond): + t.Fatal("did not return in 500ms") + case err := <-testSuccessCh: + require.NoError(t, err) + } }) } func TestLookupActor(t *testing.T) { testPlacement := NewActorPlacement(ActorPlacementOpts{ - ServerAddrs: []string{}, - AppID: "testAppID", - RuntimeHostname: "127.0.0.1:1000", - PodName: "testPodName", - ActorTypes: []string{"actorOne", "actorTwo"}, - AppHealthFn: func(ctx context.Context) <-chan bool { return nil }, - AfterTableUpdateFn: func() {}, - Security: testSecurity(t), - Resiliency: resiliency.New(logger.NewLogger("test")), + ServerAddrs: []string{}, + AppID: "testAppID", + RuntimeHostname: "127.0.0.1:1000", + PodName: "testPodName", + ActorTypes: []string{"actorOne", "actorTwo"}, + AppHealthFn: func(ctx context.Context) <-chan bool { return nil }, + Security: testSecurity(t), + Resiliency: resiliency.New(logger.NewLogger("test")), }).(*actorPlacement) t.Run("Placement table is unset", func(t *testing.T) { @@ -346,18 +414,20 @@ func TestLookupActor(t *testing.T) { func TestConcurrentUnblockPlacements(t *testing.T) { testPlacement := NewActorPlacement(ActorPlacementOpts{ - ServerAddrs: []string{}, - AppID: "testAppID", - RuntimeHostname: "127.0.0.1:1000", - PodName: "testPodName", - ActorTypes: []string{"actorOne", "actorTwo"}, - AppHealthFn: func(ctx context.Context) <-chan bool { return nil }, - AfterTableUpdateFn: func() {}, - Security: testSecurity(t), - Resiliency: resiliency.New(logger.NewLogger("test")), + ServerAddrs: []string{}, + AppID: "testAppID", + RuntimeHostname: "127.0.0.1:1000", + PodName: "testPodName", + ActorTypes: []string{"actorOne", "actorTwo"}, + AppHealthFn: func(ctx context.Context) <-chan bool { return nil }, + Security: 
testSecurity(t), + Resiliency: resiliency.New(logger.NewLogger("test")), }).(*actorPlacement) - t.Run("concurrent_unlock", func(t *testing.T) { + // Set the hasPlacementTablesCh channel to nil for the first tests, indicating that the placement tables already exist + testPlacement.hasPlacementTablesCh = nil + + t.Run("concurrent unlock", func(t *testing.T) { for i := 0; i < 10000; i++ { testPlacement.blockPlacements() wg := sync.WaitGroup{} From 5590dcea1b1f8340124636e316c57421bdb9e186 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Mon, 30 Oct 2023 21:29:10 +0000 Subject: [PATCH 03/10] Some improvements to memory efficiency and tests Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- pkg/actors/placement/placement.go | 23 +++++++++++-------- pkg/http/api_test.go | 2 ++ pkg/placement/placement.go | 4 ++-- .../framework/process/placement/placement.go | 10 ++++++++ 4 files changed, 28 insertions(+), 11 deletions(-) diff --git a/pkg/actors/placement/placement.go b/pkg/actors/placement/placement.go index 86043b1a068..343348332a9 100644 --- a/pkg/actors/placement/placement.go +++ b/pkg/actors/placement/placement.go @@ -23,6 +23,7 @@ import ( "time" "github.com/cenkalti/backoff/v4" + "golang.org/x/exp/maps" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -157,7 +158,7 @@ func (p *actorPlacement) Start(ctx context.Context) error { p.serverIndex.Store(0) p.shutdown.Store(false) p.appHealthy.Store(true) - p.hasPlacementTablesCh = make(chan struct{}) + p.resetPlacementTables() if !p.establishStreamConn(ctx) { return nil @@ -255,8 +256,7 @@ func (p *actorPlacement) Start(ctx context.Context) error { p.client.disconnect() p.placementTableLock.Lock() - p.placementTables = nil - p.hasPlacementTablesCh = make(chan struct{}) + p.resetPlacementTables() p.placementTableLock.Unlock() if p.haltAllActorsFn != nil { haltErr := p.haltAllActorsFn() @@ -481,6 +481,14 @@ func (p *actorPlacement) unblockPlacements() { } } +// Resets the placement tables. +// Note that this method should be invoked by a caller that owns a lock. 
+func (p *actorPlacement) resetPlacementTables() { + p.hasPlacementTablesCh = make(chan struct{}) + maps.Clear(p.placementTables.Entries) + p.placementTables.Version = "" +} + func (p *actorPlacement) updatePlacements(in *v1pb.PlacementTables) { updated := false func() { @@ -491,19 +499,16 @@ func (p *actorPlacement) updatePlacements(in *v1pb.PlacementTables) { return } - tables := &hashing.ConsistentHashTables{ - Entries: make(map[string]*hashing.Consistent, len(in.Entries)), - } + maps.Clear(p.placementTables.Entries) + p.placementTables.Version = in.Version for k, v := range in.Entries { loadMap := make(map[string]*hashing.Host, len(v.LoadMap)) for lk, lv := range v.LoadMap { loadMap[lk] = hashing.NewHost(lv.Name, lv.Id, lv.Load, lv.Port) } - tables.Entries[k] = hashing.NewFromExisting(v.Hosts, v.SortedSet, loadMap) + p.placementTables.Entries[k] = hashing.NewFromExisting(v.Hosts, v.SortedSet, loadMap) } - p.placementTables = tables - p.placementTables.Version = in.Version updated = true if p.hasPlacementTablesCh != nil { close(p.hasPlacementTablesCh) diff --git a/pkg/http/api_test.go b/pkg/http/api_test.go index 04cad69d4e9..a8fdfa50a2e 100644 --- a/pkg/http/api_test.go +++ b/pkg/http/api_test.go @@ -2665,6 +2665,8 @@ func TestV1Beta1Workflow(t *testing.T) { Resiliency: resiliencyConfig, }, } + testAPI.universal.InitUniversalAPI() + testAPI.universal.SetActorsInitDone() fakeServer.StartServer(testAPI.constructWorkflowEndpoints(), nil) diff --git a/pkg/placement/placement.go b/pkg/placement/placement.go index 63c38916180..437b835f204 100644 --- a/pkg/placement/placement.go +++ b/pkg/placement/placement.go @@ -152,12 +152,12 @@ func (p *Service) Run(ctx context.Context, port string, sec security.Handler) er placementv1pb.RegisterPlacementServer(grpcServer, p) - log.Infof("starting placement service started on port %d", serverListener.Addr().(*net.TCPAddr).Port) + log.Infof("Placement service started on port %d", serverListener.Addr().(*net.TCPAddr).Port) errCh := make(chan error) go func() { errCh <- grpcServer.Serve(serverListener) - log.Info("placement service stopped") + log.Info("Placement service stopped") }() <-ctx.Done() diff --git a/tests/integration/framework/process/placement/placement.go b/tests/integration/framework/process/placement/placement.go index 34f50f6f606..c3a1149465a 100644 --- a/tests/integration/framework/process/placement/placement.go +++ b/tests/integration/framework/process/placement/placement.go @@ -18,6 +18,7 @@ import ( "fmt" "net/http" "strconv" + "sync/atomic" "testing" "time" @@ -34,6 +35,7 @@ import ( type Placement struct { exec process.Interface freeport *util.FreePort + running atomic.Bool id string port int @@ -92,11 +94,19 @@ func New(t *testing.T, fopts ...Option) *Placement { } func (p *Placement) Run(t *testing.T, ctx context.Context) { + if !p.running.CompareAndSwap(false, true) { + t.Fatal("Process is already running") + } + p.freeport.Free(t) p.exec.Run(t, ctx) } func (p *Placement) Cleanup(t *testing.T) { + if !p.running.CompareAndSwap(true, false) { + return + } + p.exec.Cleanup(t) } From 60e3e8dffaee307dea47512bdc1eb6ba9145e56b Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Mon, 30 Oct 2023 22:35:05 +0000 Subject: [PATCH 04/10] Halt all actors if connection to placement fails Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- pkg/actors/placement/placement.go | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git 
a/pkg/actors/placement/placement.go b/pkg/actors/placement/placement.go
index 343348332a9..869c9a062cc 100644
--- a/pkg/actors/placement/placement.go
+++ b/pkg/actors/placement/placement.go
@@ -380,6 +380,11 @@ func (p *actorPlacement) establishStreamConn(ctx context.Context) (established b
 	logFailureShown := false
 	for !p.shutdown.Load() {
+		// Do not retry to connect if context is canceled
+		if ctx.Err() != nil {
+			return false
+		}
+
 		// Stop reconnecting to placement until app is healthy.
 		if !p.appHealthy.Load() {
 			// We are not using an exponential backoff here because we haven't begun to establish connections yet
@@ -387,11 +392,6 @@
 			continue
 		}
 
-		// Do not retry to connect if context is canceled
-		if ctx.Err() != nil {
-			return false
-		}
-
 		serverAddr := p.serverAddr[p.serverIndex.Load()]
 
 		if !logFailureShown {
@@ -409,7 +409,17 @@
 				// Don't show the debug log more than once per each reconnection attempt
 				logFailureShown = true
 			}
+
+			// Try a different instance of the placement service
 			p.serverIndex.Store((p.serverIndex.Load() + 1) % int32(len(p.serverAddr)))
+
+			// Halt all active actors, then reset the placement tables
+			if p.haltAllActorsFn != nil {
+				p.haltAllActorsFn()
+			}
+			p.resetPlacementTables()
+
+			// Sleep with an exponential backoff
 			time.Sleep(bo.NextBackOff())
 			continue
 		}

From facb35f202b85c01199809f7fc86940a2f9cec3d Mon Sep 17 00:00:00 2001
From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Date: Mon, 30 Oct 2023 22:35:22 +0000
Subject: [PATCH 05/10] Added test for deactivation

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
---
 .../actors/healthz/deactivate-on-unhealthy.go | 158 ++++++++++++++++++
 1 file changed, 158 insertions(+)
 create mode 100644 tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go

diff --git a/tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go b/tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go
new file mode 100644
index 00000000000..214aad32757
--- /dev/null
+++ b/tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go
@@ -0,0 +1,158 @@
+/*
+Copyright 2023 The Dapr Authors
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package healthz + +import ( + "context" + "fmt" + "io" + "log" + "net/http" + "strconv" + "sync/atomic" + "testing" + "time" + + chi "github.com/go-chi/chi/v5" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" + + "github.com/dapr/dapr/tests/integration/framework" + "github.com/dapr/dapr/tests/integration/framework/process/daprd" + prochttp "github.com/dapr/dapr/tests/integration/framework/process/http" + "github.com/dapr/dapr/tests/integration/framework/process/placement" + "github.com/dapr/dapr/tests/integration/framework/util" + "github.com/dapr/dapr/tests/integration/suite" +) + +func init() { + suite.Register(new(deactivateOnUnhealthy)) +} + +// deactivateOnUnhealthy ensures that all active actors are deactivated if the app is reported as unhealthy. +type deactivateOnUnhealthy struct { + daprd *daprd.Daprd + place *placement.Placement + isHealthy atomic.Bool + invokedActorsCh chan string + deactivatedActorsCh chan string +} + +func (h *deactivateOnUnhealthy) Setup(t *testing.T) []framework.Option { + h.isHealthy.Store(true) + h.invokedActorsCh = make(chan string, 2) + h.deactivatedActorsCh = make(chan string, 2) + + r := chi.NewRouter() + r.Get("/dapr/config", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"entities": ["myactortype"]}`)) + }) + r.Get("/healthz", func(w http.ResponseWriter, r *http.Request) { + if h.isHealthy.Load() { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusInternalServerError) + } + }) + r.Get("/", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`OK`)) + }) + r.Delete("/actors/{actorType}/{actorId}", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + act := fmt.Sprintf("%s/%s", chi.URLParam(r, "actorType"), chi.URLParam(r, "actorId")) + log.Printf("Received deactivation for actor %s", act) + h.deactivatedActorsCh <- act + }) + r.Put("/actors/{actorType}/{actorId}/method/foo", func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`bar`)) + act := fmt.Sprintf("%s/%s", chi.URLParam(r, "actorType"), chi.URLParam(r, "actorId")) + log.Printf("Received invocation for actor %s", act) + h.invokedActorsCh <- act + }) + + srv := prochttp.New(t, prochttp.WithHandler(r)) + h.place = placement.New(t) + h.daprd = daprd.New(t, daprd.WithResourceFiles(` +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: mystore +spec: + type: state.in-memory + version: v1 + metadata: + - name: actorStateStore + value: true +`), + daprd.WithPlacementAddresses("localhost:"+strconv.Itoa(h.place.Port())), + daprd.WithAppProtocol("http"), + daprd.WithAppPort(srv.Port()), + ) + + return []framework.Option{ + framework.WithProcesses(h.place, srv, h.daprd), + } +} + +func (h *deactivateOnUnhealthy) Run(t *testing.T, ctx context.Context) { + h.place.WaitUntilRunning(t, ctx) + h.daprd.WaitUntilRunning(t, ctx) + + client := util.HTTPClient(t) + + // Sleep a bit at the beginning as it takes time for the actor host to be registered + time.Sleep(500 * time.Millisecond) + + // Activate 2 actors + for i := 0; i < 2; i++ { + assert.EventuallyWithT(t, func(t *assert.CollectT) { + daprdURL := fmt.Sprintf("http://localhost:%d/v1.0/actors/myactortype/myactor%d/method/foo", h.daprd.HTTPPort(), i) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, daprdURL, nil) + assert.NoError(t, err) + resp, err := client.Do(req) + assert.NoError(t, err) + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + assert.NoError(t, err) + 
assert.Equalf(t, http.StatusOK, resp.StatusCode, "Response body: %v", string(body)) + }, 10*time.Second, 100*time.Millisecond, "actor not ready") + } + + // Validate invocations + invoked := make([]string, 2) + for i := 0; i < 2; i++ { + invoked[i] = <-h.invokedActorsCh + } + slices.Sort(invoked) + assert.Equal(t, []string{"myactortype/myactor0", "myactortype/myactor1"}, invoked) + + // Terminate the placement process to simulate a failure + h.place.Cleanup(t) + + // Ensure actors get deactivated + deactivated := make([]string, 0, 2) + require.Eventually(t, func() bool { + select { + case act := <-h.deactivatedActorsCh: + deactivated = append(deactivated, act) + case <-time.After(50 * time.Millisecond): + return false + } + + return len(deactivated) == 2 + }, 5*time.Second, 50*time.Millisecond, "Did not receive 2 deactivation signals") + slices.Sort(deactivated) + assert.Equal(t, []string{"myactortype/myactor0", "myactortype/myactor1"}, deactivated) +} From 9d435206809cd807b0d55b3a6242d3c65457b385 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Mon, 30 Oct 2023 23:24:30 +0000 Subject: [PATCH 06/10] =?UTF-8?q?=F0=9F=92=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- tests/integration/suite/actors/grpc/ttl.go | 2 +- tests/integration/suite/actors/http/ttl.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/suite/actors/grpc/ttl.go b/tests/integration/suite/actors/grpc/ttl.go index 7f9da08ff92..9cc0298f621 100644 --- a/tests/integration/suite/actors/grpc/ttl.go +++ b/tests/integration/suite/actors/grpc/ttl.go @@ -65,7 +65,7 @@ spec: w.Write([]byte(`{"entities": ["myactortype"]}`)) }) handler.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("")) + w.WriteHeader(http.StatusOK) }) handler.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte(`OK`)) diff --git a/tests/integration/suite/actors/http/ttl.go b/tests/integration/suite/actors/http/ttl.go index e06a882ba9b..7a4595fee6d 100644 --- a/tests/integration/suite/actors/http/ttl.go +++ b/tests/integration/suite/actors/http/ttl.go @@ -62,7 +62,7 @@ spec: w.Write([]byte(`{"entities": ["myactortype"]}`)) }) handler.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("")) + w.WriteHeader(http.StatusOK) }) handler.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte(`OK`)) From fbf6b5100f726458f36a8ca5b23036bc3ad6ce0a Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Tue, 31 Oct 2023 00:07:40 +0000 Subject: [PATCH 07/10] =?UTF-8?q?=F0=9F=92=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- pkg/actors/placement/placement.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/actors/placement/placement.go b/pkg/actors/placement/placement.go index 869c9a062cc..d056d3889fa 100644 --- a/pkg/actors/placement/placement.go +++ b/pkg/actors/placement/placement.go @@ -392,6 +392,11 @@ func (p *actorPlacement) establishStreamConn(ctx context.Context) (established b continue } + // Check for context validity again, after sleeping + if ctx.Err() != nil { + return false + } + serverAddr := p.serverAddr[p.serverIndex.Load()] if !logFailureShown { From 
730b009e2beb4443e56775b6525aec6dd2788090 Mon Sep 17 00:00:00 2001 From: "Alessandro (Ale) Segala" <43508+ItalyPaleAle@users.noreply.github.com> Date: Mon, 30 Oct 2023 17:18:02 -0700 Subject: [PATCH 08/10] Update tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> --- .../suite/actors/healthz/deactivate-on-unhealthy.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go b/tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go index 214aad32757..52c6c80361a 100644 --- a/tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go +++ b/tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go @@ -112,9 +112,6 @@ func (h *deactivateOnUnhealthy) Run(t *testing.T, ctx context.Context) { client := util.HTTPClient(t) - // Sleep a bit at the beginning as it takes time for the actor host to be registered - time.Sleep(500 * time.Millisecond) - // Activate 2 actors for i := 0; i < 2; i++ { assert.EventuallyWithT(t, func(t *assert.CollectT) { From ffa00b866f50e73fafb944448fb8bcc95a0ecd5e Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Tue, 31 Oct 2023 16:02:48 +0000 Subject: [PATCH 09/10] Address review feedback Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- pkg/actors/placement/placement.go | 11 ++++- .../actors/healthz/deactivate-on-unhealthy.go | 41 +++++++------------ 2 files changed, 23 insertions(+), 29 deletions(-) diff --git a/pkg/actors/placement/placement.go b/pkg/actors/placement/placement.go index d056d3889fa..0c60aa68ea9 100644 --- a/pkg/actors/placement/placement.go +++ b/pkg/actors/placement/placement.go @@ -315,7 +315,10 @@ func (p *actorPlacement) WaitUntilReady(ctx context.Context) error { select { case p.unblockSignal <- struct{}{}: - <-p.unblockSignal + select { + case <-p.unblockSignal: + default: + } // continue case <-ctx.Done(): return ctx.Err() @@ -425,7 +428,11 @@ func (p *actorPlacement) establishStreamConn(ctx context.Context) (established b p.resetPlacementTables() // Sleep with an exponential backoff - time.Sleep(bo.NextBackOff()) + select { + case <-time.After(bo.NextBackOff()): + case <-ctx.Done(): + return false + } continue } diff --git a/tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go b/tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go index 52c6c80361a..86ccededbb0 100644 --- a/tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go +++ b/tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go @@ -20,14 +20,11 @@ import ( "log" "net/http" "strconv" - "sync/atomic" "testing" "time" chi "github.com/go-chi/chi/v5" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "golang.org/x/exp/slices" "github.com/dapr/dapr/tests/integration/framework" "github.com/dapr/dapr/tests/integration/framework/process/daprd" @@ -45,13 +42,11 @@ func init() { type deactivateOnUnhealthy struct { daprd *daprd.Daprd place *placement.Placement - isHealthy atomic.Bool invokedActorsCh chan string deactivatedActorsCh chan string } func (h *deactivateOnUnhealthy) Setup(t *testing.T) []framework.Option { - h.isHealthy.Store(true) h.invokedActorsCh = make(chan string, 2) h.deactivatedActorsCh = make(chan string, 2) @@ -60,14 +55,7 @@ func (h *deactivateOnUnhealthy) Setup(t *testing.T) []framework.Option { w.Write([]byte(`{"entities": ["myactortype"]}`)) }) 
r.Get("/healthz", func(w http.ResponseWriter, r *http.Request) { - if h.isHealthy.Load() { - w.WriteHeader(http.StatusOK) - } else { - w.WriteHeader(http.StatusInternalServerError) - } - }) - r.Get("/", func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte(`OK`)) + w.WriteHeader(http.StatusOK) }) r.Delete("/actors/{actorType}/{actorId}", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) @@ -130,26 +118,25 @@ func (h *deactivateOnUnhealthy) Run(t *testing.T, ctx context.Context) { // Validate invocations invoked := make([]string, 2) for i := 0; i < 2; i++ { - invoked[i] = <-h.invokedActorsCh + select { + case invoked[i] = <-h.invokedActorsCh: + case <-time.After(time.Second * 5): + assert.Fail(t, "failed to invoke actor in time") + } } - slices.Sort(invoked) - assert.Equal(t, []string{"myactortype/myactor0", "myactortype/myactor1"}, invoked) + assert.ElementsMatch(t, []string{"myactortype/myactor0", "myactortype/myactor1"}, invoked) // Terminate the placement process to simulate a failure h.place.Cleanup(t) // Ensure actors get deactivated - deactivated := make([]string, 0, 2) - require.Eventually(t, func() bool { + deactivated := make([]string, 2) + for i := range deactivated { select { - case act := <-h.deactivatedActorsCh: - deactivated = append(deactivated, act) - case <-time.After(50 * time.Millisecond): - return false + case deactivated[i] = <-h.deactivatedActorsCh: + case <-time.After(5 * time.Second): + assert.Fail(t, "Did not receive deactivation signal in time") } - - return len(deactivated) == 2 - }, 5*time.Second, 50*time.Millisecond, "Did not receive 2 deactivation signals") - slices.Sort(deactivated) - assert.Equal(t, []string{"myactortype/myactor0", "myactortype/myactor1"}, deactivated) + } + assert.ElementsMatch(t, []string{"myactortype/myactor0", "myactortype/myactor1"}, deactivated) } From 927ab4bc9d83b734663ac2178f86cd13547f3106 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Mon, 6 Nov 2023 17:06:33 +0000 Subject: [PATCH 10/10] Renamed test Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- ...on-unhealthy.go => deactivate-on-placement-fail.go} | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) rename tests/integration/suite/actors/healthz/{deactivate-on-unhealthy.go => deactivate-on-placement-fail.go} (92%) diff --git a/tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go b/tests/integration/suite/actors/healthz/deactivate-on-placement-fail.go similarity index 92% rename from tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go rename to tests/integration/suite/actors/healthz/deactivate-on-placement-fail.go index 86ccededbb0..71551412a25 100644 --- a/tests/integration/suite/actors/healthz/deactivate-on-unhealthy.go +++ b/tests/integration/suite/actors/healthz/deactivate-on-placement-fail.go @@ -35,18 +35,18 @@ import ( ) func init() { - suite.Register(new(deactivateOnUnhealthy)) + suite.Register(new(deactivateOnPlacementFail)) } -// deactivateOnUnhealthy ensures that all active actors are deactivated if the app is reported as unhealthy. -type deactivateOnUnhealthy struct { +// deactivateOnPlacementFail ensures that all active actors are deactivated if the connection with Placement fails. 
+type deactivateOnPlacementFail struct { daprd *daprd.Daprd place *placement.Placement invokedActorsCh chan string deactivatedActorsCh chan string } -func (h *deactivateOnUnhealthy) Setup(t *testing.T) []framework.Option { +func (h *deactivateOnPlacementFail) Setup(t *testing.T) []framework.Option { h.invokedActorsCh = make(chan string, 2) h.deactivatedActorsCh = make(chan string, 2) @@ -94,7 +94,7 @@ spec: } } -func (h *deactivateOnUnhealthy) Run(t *testing.T, ctx context.Context) { +func (h *deactivateOnPlacementFail) Run(t *testing.T, ctx context.Context) { h.place.WaitUntilRunning(t, ctx) h.daprd.WaitUntilRunning(t, ctx)