Skip to content

Commit

Permalink
Stop pubsub subscriptions before shutdown grace period
Browse files Browse the repository at this point in the history
With dapr/components-contrib#1756, it's now possible to stop receiving messages from pubsub components via context cancelation. This makes it so we won't receive more work while the sidecar is shutting down, but we can still publish messages in pubsub.

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
  • Loading branch information
ItalyPaleAle committed Jun 3, 2022
1 parent 11eac1e commit 4eda0c0
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 27 deletions.
46 changes: 27 additions & 19 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func (a *DaprRuntime) sendToDeadLetterIfConfigured(name string, msg *pubsub.NewM
return true, err
}

func (a *DaprRuntime) beginPubSub(name string, ps pubsub.PubSub) error {
func (a *DaprRuntime) beginPubSub(subscribeCtx context.Context, name string, ps pubsub.PubSub) error {
var publishFunc func(ctx context.Context, msg *pubsubSubscribedMessage) error
switch a.runtimeConfig.ApplicationProtocol {
case HTTPProtocol:
Expand All @@ -596,7 +596,7 @@ func (a *DaprRuntime) beginPubSub(name string, ps pubsub.PubSub) error {

routeMetadata := route.metadata
routeRules := route.rules
if err := ps.Subscribe(pubsub.SubscribeRequest{
if err := ps.Subscribe(subscribeCtx, pubsub.SubscribeRequest{
Topic: topic,
Metadata: route.metadata,
}, func(ctx context.Context, msg *pubsub.NewMessage) error {
Expand Down Expand Up @@ -2088,50 +2088,55 @@ func (a *DaprRuntime) stopActor() {
}
}

// shutdownComponents allows for a graceful shutdown of all runtime internal operations or components.
func (a *DaprRuntime) shutdownComponents() error {
log.Info("Shutting down all components")
// shutdownOutputComponents allows for a graceful shutdown of all runtime internal operations of components that are not source of more work.
// These are all components except input bindings and pubsub.
func (a *DaprRuntime) shutdownOutputComponents() error {
log.Info("Shutting down all remaining components")
var merr error

// Close components if they implement `io.Closer`
for name, binding := range a.inputBindings {
for name, binding := range a.outputBindings {
if closer, ok := binding.(io.Closer); ok {
if err := closer.Close(); err != nil {
err = fmt.Errorf("error closing input binding %s: %w", name, err)
err = fmt.Errorf("error closing output binding %s: %w", name, err)
merr = multierror.Append(merr, err)
log.Warn(err)
}
}
}
for name, binding := range a.outputBindings {
if closer, ok := binding.(io.Closer); ok {
for name, secretstore := range a.secretStores {
if closer, ok := secretstore.(io.Closer); ok {
if err := closer.Close(); err != nil {
err = fmt.Errorf("error closing output binding %s: %w", name, err)
err = fmt.Errorf("error closing secret store %s: %w", name, err)
merr = multierror.Append(merr, err)
log.Warn(err)
}
}
}
for name, secretstore := range a.secretStores {
if closer, ok := secretstore.(io.Closer); ok {
for name, stateStore := range a.stateStores {
if closer, ok := stateStore.(io.Closer); ok {
if err := closer.Close(); err != nil {
err = fmt.Errorf("error closing secret store %s: %w", name, err)
err = fmt.Errorf("error closing state store %s: %w", name, err)
merr = multierror.Append(merr, err)
log.Warn(err)
}
}
}
// Close pubsub publisher
// The subscriber part is closed when a.ctx is canceled
for name, pubSub := range a.pubSubs {
if err := pubSub.Close(); err != nil {
err = fmt.Errorf("error closing pub sub %s: %w", name, err)
merr = multierror.Append(merr, err)
log.Warn(err)
}
}
for name, stateStore := range a.stateStores {
if closer, ok := stateStore.(io.Closer); ok {
// Close bindings if they implement `io.Closer`
// TODO: Separate the input part of bindings and close output here, then close the input via cancelation of a.ctx
for name, binding := range a.inputBindings {
if closer, ok := binding.(io.Closer); ok {
if err := closer.Close(); err != nil {
err = fmt.Errorf("error closing state store %s: %w", name, err)
err = fmt.Errorf("error closing input binding %s: %w", name, err)
merr = multierror.Append(merr, err)
log.Warn(err)
}
Expand Down Expand Up @@ -2166,9 +2171,11 @@ func (a *DaprRuntime) Shutdown(duration time.Duration) {
// Ensure the Unix socket file is removed if a panic occurs.
defer a.cleanSocket()

log.Infof("dapr shutting down.")

log.Infof("Stopping PubSub subscribers")
a.cancel()
a.stopActor()
log.Infof("dapr shutting down.")
log.Info("Stopping Dapr APIs")
for _, closer := range a.apiClosers {
if err := closer.Close(); err != nil {
Expand All @@ -2177,7 +2184,7 @@ func (a *DaprRuntime) Shutdown(duration time.Duration) {
}
log.Infof("Waiting %s to finish outstanding operations", duration)
<-time.After(duration)
a.shutdownComponents()
a.shutdownOutputComponents()
a.shutdownC <- nil
}

Expand Down Expand Up @@ -2458,8 +2465,9 @@ func componentDependency(compCategory ComponentCategory, name string) string {
}

func (a *DaprRuntime) startSubscribing() {
// PubSub subscribers are stopped via cancelation of the main runtime's context
for name, pubsub := range a.pubSubs {
if err := a.beginPubSub(name, pubsub); err != nil {
if err := a.beginPubSub(a.ctx, name, pubsub); err != nil {
log.Errorf("error occurred while beginning pubsub %s: %s", name, err)
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2884,7 +2884,7 @@ func TestPubsubWithResiliency(t *testing.T) {
},
}}

err := r.beginPubSub("failPubsub", &failingPubsub)
err := r.beginPubSub(context.Background(), "failPubsub", &failingPubsub)

assert.NoError(t, err)
assert.Equal(t, 2, failingAppChannel.Failure.CallCount["failingSubTopic"])
Expand All @@ -2906,7 +2906,7 @@ func TestPubsubWithResiliency(t *testing.T) {
}}

start := time.Now()
err := r.beginPubSub("failPubsub", &failingPubsub)
err := r.beginPubSub(context.Background(), "failPubsub", &failingPubsub)
end := time.Now()

// This is eaten, technically.
Expand Down Expand Up @@ -2944,7 +2944,7 @@ func (m *mockSubscribePubSub) Publish(req *pubsub.PublishRequest) error {
}

// Subscribe is a mock subscribe method.
func (m *mockSubscribePubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
func (m *mockSubscribePubSub) Subscribe(_ context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
m.handlers[req.Topic] = handler
return nil
}
Expand Down Expand Up @@ -3590,7 +3590,7 @@ func (m *mockPublishPubSub) Publish(req *pubsub.PublishRequest) error {
}

// Subscribe is a mock subscribe method.
func (m *mockPublishPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
func (m *mockPublishPubSub) Subscribe(_ context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
return nil
}

Expand Down Expand Up @@ -4052,7 +4052,7 @@ func TestStopWithErrors(t *testing.T) {
require.NoError(t, rt.initSecretStore(mockSecretsComponent))
rt.nameResolver = &mockNameResolver{closeErr: testErr}

err := rt.shutdownComponents()
err := rt.shutdownOutputComponents()
assert.Error(t, err)
var merr *multierror.Error
merr, ok := err.(*multierror.Error)
Expand All @@ -4062,7 +4062,7 @@ func TestStopWithErrors(t *testing.T) {

func stopRuntime(t *testing.T, rt *DaprRuntime) {
rt.stopActor()
assert.NoError(t, rt.shutdownComponents())
assert.NoError(t, rt.shutdownOutputComponents())
}

func TestFindMatchingRoute(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/testing/pubsub_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (m *MockPubSub) Publish(req *pubsub.PublishRequest) error {
}

// Subscribe is a mock subscribe method.
func (m *MockPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
func (m *MockPubSub) Subscribe(_ context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
args := m.Called(req, handler)
return args.Error(0)
}
Expand All @@ -51,7 +51,7 @@ func (f *FailingPubsub) Publish(req *pubsub.PublishRequest) error {
return f.Failure.PerformFailure(req.Topic)
}

func (f *FailingPubsub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
func (f *FailingPubsub) Subscribe(_ context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
err := f.Failure.PerformFailure(req.Topic)
if err != nil {
return err
Expand Down

0 comments on commit 4eda0c0

Please sign in to comment.