Skip to content

Commit

Permalink
fix: adding retry logic for Stream creation in the case that another …
Browse files Browse the repository at this point in the history
…client is trying to create the Stream at the same time (#1854)

* fix: adding retry logic for Stream creation in the case that another client is trying to create the Stream at the same time

Signed-off-by: Julie Vogelman <julie_vogelman@intuit.com>
  • Loading branch information
juliev0 committed Apr 20, 2022
1 parent c70b340 commit 4f8b17e
Showing 1 changed file with 14 additions and 6 deletions.
20 changes: 14 additions & 6 deletions eventbus/jetstream/base/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ func (stream *Jetstream) CreateStream(conn *JetstreamConnection) error {
return nil
}
if err != nil && err != nats.ErrStreamNotFound {
stream.Logger.Errorw("Error calling StreamInfo for Stream '%s': %v", common.JetStreamStreamName, err)
stream.Logger.Warnf(`Error calling StreamInfo for Stream '%s' (this can happen if another Jetstream client "
is trying to create the Stream at the same time): %v`, common.JetStreamStreamName, err)
}

// unmarshal settings
Expand All @@ -141,11 +142,18 @@ func (stream *Jetstream) CreateStream(conn *JetstreamConnection) error {
}
stream.Logger.Infof("Will use this stream config:\n '%v'", streamConfig)

options := make([]nats.JSOpt, 0)

_, err = conn.JSContext.AddStream(&streamConfig, options...)
if err != nil {
return errors.Errorf("Failed to add Jetstream stream '%s': %v for connection %+v", common.JetStreamStreamName, err, conn)
connectErr := common.Connect(nil, func() error { // exponential backoff if it fails the first time
_, err = conn.JSContext.AddStream(&streamConfig)
if err != nil {
errStr := fmt.Sprintf(`Failed to add Jetstream stream '%s'for connection %+v: err=%v`,
common.JetStreamStreamName, conn, err)
return errors.New(errStr)
} else {
return nil
}
})
if connectErr != nil {
return connectErr
}

stream.Logger.Infof("Created Jetstream stream '%s' for connection %+v", common.JetStreamStreamName, conn)
Expand Down

0 comments on commit 4f8b17e

Please sign in to comment.