Skip to content

Commit

Permalink
Shutting down input components before the grace period (dapr#4624)
Browse files Browse the repository at this point in the history
* Updated pinned components-contrib

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Stop pubsub subscriptions before shutdown grace period

With dapr/components-contrib#1756, it's now possible to stop receiving messages from pubsub components via context cancelation. This makes it so we won't receive more work while the sidecar is shutting down, but we can still publish messages in pubsub.

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
  • Loading branch information
ItalyPaleAle committed Jun 8, 2022
1 parent ed85ebe commit 4b631c0
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 33 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/PuerkitoBio/purell v1.1.1
github.com/agrea/ptr v0.0.0-20180711073057-77a518d99b7b
github.com/cenkalti/backoff/v4 v4.1.1
github.com/dapr/components-contrib v1.7.1-0.20220601224951-6c7ff60c9ea0
github.com/dapr/components-contrib v1.7.1-0.20220602220636-704f4dd7307d
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233
github.com/fasthttp/router v1.3.8
github.com/fsnotify/fsnotify v1.5.4
Expand Down Expand Up @@ -280,6 +280,7 @@ require (
github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/prometheus/statsd_exporter v0.22.3 // indirect
github.com/rabbitmq/amqp091-go v1.3.4 // indirect
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rs/zerolog v1.25.0 // indirect
Expand All @@ -298,7 +299,6 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/stathat/consistent v1.0.0 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/streadway/amqp v1.0.0 // indirect
github.com/stretchr/objx v0.3.0 // indirect
github.com/supplyon/gremcos v0.1.0 // indirect
github.com/tidwall/gjson v1.8.1 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,8 @@ github.com/dancannon/gorethink v4.0.0+incompatible/go.mod h1:BLvkat9KmZc1efyYwhz
github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U=
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
github.com/dapr/components-contrib v1.7.1-0.20220601224951-6c7ff60c9ea0 h1:jyhkdueI8zrjHAfWaZz/fwVaWPK/E9I60oEDCDWSoj0=
github.com/dapr/components-contrib v1.7.1-0.20220601224951-6c7ff60c9ea0/go.mod h1:bWlAjHY57DoQkiE2bykuvJnsXIIPXxO3+/wEuFuR5zA=
github.com/dapr/components-contrib v1.7.1-0.20220602220636-704f4dd7307d h1:tLSyRfQoUDwyX/UdTF8Am2NGInmC1x/49lpR3ivO8JA=
github.com/dapr/components-contrib v1.7.1-0.20220602220636-704f4dd7307d/go.mod h1:PH0Uf+OklSXofQafXRS2ReDUcxLBxAgDDwYVFosJLi4=
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233 h1:M0dWIG8kUxEFU57IqTWeqptNqlBsfosFgsA5Ov7rJ8g=
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233/go.mod h1:y8r0VqUNKyd6xBXp7gQjwA59wlCLGfKzL5J8iJsN09w=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -1332,6 +1332,8 @@ github.com/prometheus/statsd_exporter v0.21.0/go.mod h1:rbT83sZq2V+p73lHhPZfMc3M
github.com/prometheus/statsd_exporter v0.22.3 h1:4gxpAtnt/py8g0kUHad9f3DbU7flDeL9IQZfbbSt7h8=
github.com/prometheus/statsd_exporter v0.22.3/go.mod h1:N4Z1+iSqc9rnxlT1N8Qn3l65Vzb5t4Uq0jpg8nxyhio=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rabbitmq/amqp091-go v1.3.4 h1:tXuIslN1nhDqs2t6Jrz3BAoqvt4qIZzxvdbdcxWtHYU=
github.com/rabbitmq/amqp091-go v1.3.4/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
Expand Down Expand Up @@ -1444,8 +1446,6 @@ github.com/stoewer/go-strcase v1.2.0 h1:Z2iHWqGXH00XYgqDmNgQbIBxf3wrNq0F3feEy0ai
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
46 changes: 27 additions & 19 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func (a *DaprRuntime) sendToDeadLetterIfConfigured(name string, msg *pubsub.NewM
return true, err
}

func (a *DaprRuntime) beginPubSub(name string, ps pubsub.PubSub) error {
func (a *DaprRuntime) beginPubSub(subscribeCtx context.Context, name string, ps pubsub.PubSub) error {
var publishFunc func(ctx context.Context, msg *pubsubSubscribedMessage) error
switch a.runtimeConfig.ApplicationProtocol {
case HTTPProtocol:
Expand All @@ -596,7 +596,7 @@ func (a *DaprRuntime) beginPubSub(name string, ps pubsub.PubSub) error {

routeMetadata := route.metadata
routeRules := route.rules
if err := ps.Subscribe(pubsub.SubscribeRequest{
if err := ps.Subscribe(subscribeCtx, pubsub.SubscribeRequest{
Topic: topic,
Metadata: route.metadata,
}, func(ctx context.Context, msg *pubsub.NewMessage) error {
Expand Down Expand Up @@ -2088,50 +2088,55 @@ func (a *DaprRuntime) stopActor() {
}
}

// shutdownComponents allows for a graceful shutdown of all runtime internal operations or components.
func (a *DaprRuntime) shutdownComponents() error {
log.Info("Shutting down all components")
// shutdownOutputComponents allows for a graceful shutdown of all runtime internal operations of components that are not source of more work.
// These are all components except input bindings and pubsub.
func (a *DaprRuntime) shutdownOutputComponents() error {
log.Info("Shutting down all remaining components")
var merr error

// Close components if they implement `io.Closer`
for name, binding := range a.inputBindings {
for name, binding := range a.outputBindings {
if closer, ok := binding.(io.Closer); ok {
if err := closer.Close(); err != nil {
err = fmt.Errorf("error closing input binding %s: %w", name, err)
err = fmt.Errorf("error closing output binding %s: %w", name, err)
merr = multierror.Append(merr, err)
log.Warn(err)
}
}
}
for name, binding := range a.outputBindings {
if closer, ok := binding.(io.Closer); ok {
for name, secretstore := range a.secretStores {
if closer, ok := secretstore.(io.Closer); ok {
if err := closer.Close(); err != nil {
err = fmt.Errorf("error closing output binding %s: %w", name, err)
err = fmt.Errorf("error closing secret store %s: %w", name, err)
merr = multierror.Append(merr, err)
log.Warn(err)
}
}
}
for name, secretstore := range a.secretStores {
if closer, ok := secretstore.(io.Closer); ok {
for name, stateStore := range a.stateStores {
if closer, ok := stateStore.(io.Closer); ok {
if err := closer.Close(); err != nil {
err = fmt.Errorf("error closing secret store %s: %w", name, err)
err = fmt.Errorf("error closing state store %s: %w", name, err)
merr = multierror.Append(merr, err)
log.Warn(err)
}
}
}
// Close pubsub publisher
// The subscriber part is closed when a.ctx is canceled
for name, pubSub := range a.pubSubs {
if err := pubSub.Close(); err != nil {
err = fmt.Errorf("error closing pub sub %s: %w", name, err)
merr = multierror.Append(merr, err)
log.Warn(err)
}
}
for name, stateStore := range a.stateStores {
if closer, ok := stateStore.(io.Closer); ok {
// Close bindings if they implement `io.Closer`
// TODO: Separate the input part of bindings and close output here, then close the input via cancelation of a.ctx
for name, binding := range a.inputBindings {
if closer, ok := binding.(io.Closer); ok {
if err := closer.Close(); err != nil {
err = fmt.Errorf("error closing state store %s: %w", name, err)
err = fmt.Errorf("error closing input binding %s: %w", name, err)
merr = multierror.Append(merr, err)
log.Warn(err)
}
Expand Down Expand Up @@ -2166,9 +2171,11 @@ func (a *DaprRuntime) Shutdown(duration time.Duration) {
// Ensure the Unix socket file is removed if a panic occurs.
defer a.cleanSocket()

log.Infof("dapr shutting down.")

log.Infof("Stopping PubSub subscribers")
a.cancel()
a.stopActor()
log.Infof("dapr shutting down.")
log.Info("Stopping Dapr APIs")
for _, closer := range a.apiClosers {
if err := closer.Close(); err != nil {
Expand All @@ -2177,7 +2184,7 @@ func (a *DaprRuntime) Shutdown(duration time.Duration) {
}
log.Infof("Waiting %s to finish outstanding operations", duration)
<-time.After(duration)
a.shutdownComponents()
a.shutdownOutputComponents()
a.shutdownC <- nil
}

Expand Down Expand Up @@ -2458,8 +2465,9 @@ func componentDependency(compCategory ComponentCategory, name string) string {
}

func (a *DaprRuntime) startSubscribing() {
// PubSub subscribers are stopped via cancelation of the main runtime's context
for name, pubsub := range a.pubSubs {
if err := a.beginPubSub(name, pubsub); err != nil {
if err := a.beginPubSub(a.ctx, name, pubsub); err != nil {
log.Errorf("error occurred while beginning pubsub %s: %s", name, err)
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2884,7 +2884,7 @@ func TestPubsubWithResiliency(t *testing.T) {
},
}}

err := r.beginPubSub("failPubsub", &failingPubsub)
err := r.beginPubSub(context.Background(), "failPubsub", &failingPubsub)

assert.NoError(t, err)
assert.Equal(t, 2, failingAppChannel.Failure.CallCount["failingSubTopic"])
Expand All @@ -2906,7 +2906,7 @@ func TestPubsubWithResiliency(t *testing.T) {
}}

start := time.Now()
err := r.beginPubSub("failPubsub", &failingPubsub)
err := r.beginPubSub(context.Background(), "failPubsub", &failingPubsub)
end := time.Now()

// This is eaten, technically.
Expand Down Expand Up @@ -2944,7 +2944,7 @@ func (m *mockSubscribePubSub) Publish(req *pubsub.PublishRequest) error {
}

// Subscribe is a mock subscribe method.
func (m *mockSubscribePubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
func (m *mockSubscribePubSub) Subscribe(_ context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
m.handlers[req.Topic] = handler
return nil
}
Expand Down Expand Up @@ -3590,7 +3590,7 @@ func (m *mockPublishPubSub) Publish(req *pubsub.PublishRequest) error {
}

// Subscribe is a mock subscribe method.
func (m *mockPublishPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
func (m *mockPublishPubSub) Subscribe(_ context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
return nil
}

Expand Down Expand Up @@ -4052,7 +4052,7 @@ func TestStopWithErrors(t *testing.T) {
require.NoError(t, rt.initSecretStore(mockSecretsComponent))
rt.nameResolver = &mockNameResolver{closeErr: testErr}

err := rt.shutdownComponents()
err := rt.shutdownOutputComponents()
assert.Error(t, err)
var merr *multierror.Error
merr, ok := err.(*multierror.Error)
Expand All @@ -4062,7 +4062,7 @@ func TestStopWithErrors(t *testing.T) {

func stopRuntime(t *testing.T, rt *DaprRuntime) {
rt.stopActor()
assert.NoError(t, rt.shutdownComponents())
assert.NoError(t, rt.shutdownOutputComponents())
}

func TestFindMatchingRoute(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/testing/pubsub_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (m *MockPubSub) Publish(req *pubsub.PublishRequest) error {
}

// Subscribe is a mock subscribe method.
func (m *MockPubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
func (m *MockPubSub) Subscribe(_ context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
args := m.Called(req, handler)
return args.Error(0)
}
Expand All @@ -51,7 +51,7 @@ func (f *FailingPubsub) Publish(req *pubsub.PublishRequest) error {
return f.Failure.PerformFailure(req.Topic)
}

func (f *FailingPubsub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
func (f *FailingPubsub) Subscribe(_ context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
err := f.Failure.PerformFailure(req.Topic)
if err != nil {
return err
Expand Down

0 comments on commit 4b631c0

Please sign in to comment.