Skip to content

Commit

Permalink
Add a NATS error handler option when connecting to NATS
Browse files Browse the repository at this point in the history
When a NATS error occurs, it was not logged using the logrus formatter.
This commits fixes it.
  • Loading branch information
albinou committed Jan 19, 2023
1 parent f5a98ba commit 45f785a
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 0 deletions.
37 changes: 37 additions & 0 deletions internal/impl/nats/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package nats

import (
"github.com/benthosdev/benthos/v4/internal/log"
"github.com/benthosdev/benthos/v4/public/service"
"github.com/nats-io/nats.go"
)

func errorHandlerOption(logger *service.Logger) nats.Option {
return nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
if nc != nil {
logger = logger.With("connection-status", nc.Status())
}
if sub != nil {
logger = logger.With("subject", sub.Subject)
if c, ok := sub.ConsumerInfo(); ok != nil {
logger = logger.With("consumer", c.Name)
}
}
logger.Errorf("nats operation failed: %v\n", err)
})
}

func errorHandlerOptionFromModularLogger(logger log.Modular) nats.Option {
return nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
if nc != nil {
logger = logger.With("connection-status", nc.Status())
}
if sub != nil {
logger = logger.With("subject", sub.Subject)
if c, ok := sub.ConsumerInfo(); ok != nil {
logger = logger.With("consumer", c.Name)
}
}
logger.Errorf("nats operation failed: %v\n", err)
})
}
1 change: 1 addition & 0 deletions internal/impl/nats/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (n *natsReader) Connect(ctx context.Context) error {
}

opts = append(opts, authConfToOptions(n.authConf, n.fs)...)
opts = append(opts, errorHandlerOption(n.log))

if natsConn, err = nats.Connect(n.urls, opts...); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions internal/impl/nats/input_jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func (j *jetStreamReader) Connect(ctx context.Context) error {
opts = append(opts, nats.Secure(j.tlsConf))
}
opts = append(opts, authConfToOptions(j.authConf, j.fs)...)
opts = append(opts, errorHandlerOption(j.log))
if natsConn, err = nats.Connect(j.urls, opts...); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions internal/impl/nats/input_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (n *natsStreamReader) Connect(ctx context.Context) error {
}

opts = append(opts, authConfToOptions(n.conf.Auth, n.fs)...)
opts = append(opts, errorHandlerOptionFromModularLogger(n.log))

natsConn, err := nats.Connect(n.urls, opts...)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions internal/impl/nats/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (n *natsWriter) Connect(ctx context.Context) error {
}

opts = append(opts, authConfToOptions(n.authConf, n.fs)...)
opts = append(opts, errorHandlerOption(n.log))

if n.natsConn, err = nats.Connect(n.urls, opts...); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions internal/impl/nats/output_jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (j *jetStreamOutput) Connect(ctx context.Context) error {
opts = append(opts, nats.Secure(j.tlsConf))
}
opts = append(opts, authConfToOptions(j.authConf, j.fs)...)
opts = append(opts, errorHandlerOption(j.log))
if natsConn, err = nats.Connect(j.urls, opts...); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions internal/impl/nats/output_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (n *natsStreamWriter) Connect(ctx context.Context) error {
}

opts = append(opts, authConfToOptions(n.conf.Auth, n.fs)...)
opts = append(opts, errorHandlerOptionFromModularLogger(n.log))

natsConn, err := nats.Connect(n.urls, opts...)
if err != nil {
Expand Down

0 comments on commit 45f785a

Please sign in to comment.