diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 78638bdc05e..2d136ad6a29 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -569,7 +569,7 @@ func (a *DaprRuntime) sendToDeadLetterIfConfigured(name string, msg *pubsub.NewM return true, err } -func (a *DaprRuntime) beginPubSub(name string, ps pubsub.PubSub) error { +func (a *DaprRuntime) beginPubSub(subscribeCtx context.Context, name string, ps pubsub.PubSub) error { var publishFunc func(ctx context.Context, msg *pubsubSubscribedMessage) error switch a.runtimeConfig.ApplicationProtocol { case HTTPProtocol: @@ -596,7 +596,7 @@ func (a *DaprRuntime) beginPubSub(name string, ps pubsub.PubSub) error { routeMetadata := route.metadata routeRules := route.rules - if err := ps.Subscribe(pubsub.SubscribeRequest{ + if err := ps.Subscribe(subscribeCtx, pubsub.SubscribeRequest{ Topic: topic, Metadata: route.metadata, }, func(ctx context.Context, msg *pubsub.NewMessage) error { @@ -2088,39 +2088,42 @@ func (a *DaprRuntime) stopActor() { } } -// shutdownComponents allows for a graceful shutdown of all runtime internal operations or components. -func (a *DaprRuntime) shutdownComponents() error { - log.Info("Shutting down all components") +// shutdownOutputComponents allows for a graceful shutdown of all runtime internal operations of components that are not source of more work. +// These are all components except input bindings and pubsub. +func (a *DaprRuntime) shutdownOutputComponents() error { + log.Info("Shutting down all remaining components") var merr error // Close components if they implement `io.Closer` - for name, binding := range a.inputBindings { + for name, binding := range a.outputBindings { if closer, ok := binding.(io.Closer); ok { if err := closer.Close(); err != nil { - err = fmt.Errorf("error closing input binding %s: %w", name, err) + err = fmt.Errorf("error closing output binding %s: %w", name, err) merr = multierror.Append(merr, err) log.Warn(err) } } } - for name, binding := range a.outputBindings { - if closer, ok := binding.(io.Closer); ok { + for name, secretstore := range a.secretStores { + if closer, ok := secretstore.(io.Closer); ok { if err := closer.Close(); err != nil { - err = fmt.Errorf("error closing output binding %s: %w", name, err) + err = fmt.Errorf("error closing secret store %s: %w", name, err) merr = multierror.Append(merr, err) log.Warn(err) } } } - for name, secretstore := range a.secretStores { - if closer, ok := secretstore.(io.Closer); ok { + for name, stateStore := range a.stateStores { + if closer, ok := stateStore.(io.Closer); ok { if err := closer.Close(); err != nil { - err = fmt.Errorf("error closing secret store %s: %w", name, err) + err = fmt.Errorf("error closing state store %s: %w", name, err) merr = multierror.Append(merr, err) log.Warn(err) } } } + // Close pubsub publisher + // The subscriber part is closed when a.ctx is canceled for name, pubSub := range a.pubSubs { if err := pubSub.Close(); err != nil { err = fmt.Errorf("error closing pub sub %s: %w", name, err) @@ -2128,10 +2131,12 @@ func (a *DaprRuntime) shutdownComponents() error { log.Warn(err) } } - for name, stateStore := range a.stateStores { - if closer, ok := stateStore.(io.Closer); ok { + // Close bindings if they implement `io.Closer` + // TODO: Separate the input part of bindings and close output here, then close the input via cancelation of a.ctx + for name, binding := range a.inputBindings { + if closer, ok := binding.(io.Closer); ok { if err := closer.Close(); err != nil { - err = fmt.Errorf("error closing state store %s: %w", name, err) + err = fmt.Errorf("error closing input binding %s: %w", name, err) merr = multierror.Append(merr, err) log.Warn(err) } @@ -2166,9 +2171,11 @@ func (a *DaprRuntime) Shutdown(duration time.Duration) { // Ensure the Unix socket file is removed if a panic occurs. defer a.cleanSocket() + log.Infof("dapr shutting down.") + + log.Infof("Stopping PubSub subscribers") a.cancel() a.stopActor() - log.Infof("dapr shutting down.") log.Info("Stopping Dapr APIs") for _, closer := range a.apiClosers { if err := closer.Close(); err != nil { @@ -2177,7 +2184,7 @@ func (a *DaprRuntime) Shutdown(duration time.Duration) { } log.Infof("Waiting %s to finish outstanding operations", duration) <-time.After(duration) - a.shutdownComponents() + a.shutdownOutputComponents() a.shutdownC <- nil } @@ -2458,8 +2465,9 @@ func componentDependency(compCategory ComponentCategory, name string) string { } func (a *DaprRuntime) startSubscribing() { + // PubSub subscribers are stopped via cancelation of the main runtime's context for name, pubsub := range a.pubSubs { - if err := a.beginPubSub(name, pubsub); err != nil { + if err := a.beginPubSub(a.ctx, name, pubsub); err != nil { log.Errorf("error occurred while beginning pubsub %s: %s", name, err) } } diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index b40c281a99e..54e1dd1b21a 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -2884,7 +2884,7 @@ func TestPubsubWithResiliency(t *testing.T) { }, }} - err := r.beginPubSub("failPubsub", &failingPubsub) + err := r.beginPubSub(context.Background(), "failPubsub", &failingPubsub) assert.NoError(t, err) assert.Equal(t, 2, failingAppChannel.Failure.CallCount["failingSubTopic"]) @@ -2906,7 +2906,7 @@ func TestPubsubWithResiliency(t *testing.T) { }} start := time.Now() - err := r.beginPubSub("failPubsub", &failingPubsub) + err := r.beginPubSub(context.Background(), "failPubsub", &failingPubsub) end := time.Now() // This is eaten, technically. @@ -2944,7 +2944,7 @@ func (m *mockSubscribePubSub) Publish(req *pubsub.PublishRequest) error { } // Subscribe is a mock subscribe method. -func (m *mockSubscribePubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error { +func (m *mockSubscribePubSub) Subscribe(_ context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { m.handlers[req.Topic] = handler return nil } @@ -3590,7 +3590,7 @@ func (m *mockPublishPubSub) Publish(req *pubsub.PublishRequest) error { } // Subscribe is a mock subscribe method. -func (m *mockPublishPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error { +func (m *mockPublishPubSub) Subscribe(_ context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { return nil } @@ -4052,7 +4052,7 @@ func TestStopWithErrors(t *testing.T) { require.NoError(t, rt.initSecretStore(mockSecretsComponent)) rt.nameResolver = &mockNameResolver{closeErr: testErr} - err := rt.shutdownComponents() + err := rt.shutdownOutputComponents() assert.Error(t, err) var merr *multierror.Error merr, ok := err.(*multierror.Error) @@ -4062,7 +4062,7 @@ func TestStopWithErrors(t *testing.T) { func stopRuntime(t *testing.T, rt *DaprRuntime) { rt.stopActor() - assert.NoError(t, rt.shutdownComponents()) + assert.NoError(t, rt.shutdownOutputComponents()) } func TestFindMatchingRoute(t *testing.T) { diff --git a/pkg/testing/pubsub_mock.go b/pkg/testing/pubsub_mock.go index a6623485afc..ce068628796 100644 --- a/pkg/testing/pubsub_mock.go +++ b/pkg/testing/pubsub_mock.go @@ -26,7 +26,7 @@ func (m *MockPubSub) Publish(req *pubsub.PublishRequest) error { } // Subscribe is a mock subscribe method. -func (m *MockPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error { +func (m *MockPubSub) Subscribe(_ context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { args := m.Called(req, handler) return args.Error(0) } @@ -51,7 +51,7 @@ func (f *FailingPubsub) Publish(req *pubsub.PublishRequest) error { return f.Failure.PerformFailure(req.Topic) } -func (f *FailingPubsub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error { +func (f *FailingPubsub) Subscribe(_ context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { err := f.Failure.PerformFailure(req.Topic) if err != nil { return err