Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nats Connection Pooling #2554

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 8 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ require (
github.com/gocql/gocql v1.6.0
github.com/gofrs/uuid v4.4.0+incompatible
github.com/golang-jwt/jwt/v4 v4.5.0
github.com/google/uuid v1.6.0
github.com/gorilla/handlers v1.5.2
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1
Expand Down Expand Up @@ -139,11 +140,11 @@ require (
go.opentelemetry.io/otel/sdk v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
go.uber.org/multierr v1.11.0
golang.org/x/crypto v0.21.0
golang.org/x/crypto v0.22.0
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/net v0.23.0
golang.org/x/net v0.24.0
golang.org/x/oauth2 v0.17.0
golang.org/x/sync v0.6.0
golang.org/x/sync v0.7.0
golang.org/x/text v0.14.0
google.golang.org/api v0.162.0
google.golang.org/protobuf v1.33.0
Expand Down Expand Up @@ -247,7 +248,6 @@ require (
github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/gorilla/css v1.0.0 // indirect
Expand Down Expand Up @@ -326,11 +326,11 @@ require (
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.16.1 // indirect
golang.org/x/tools v0.20.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe // indirect
Expand Down
28 changes: 14 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1159,8 +1159,8 @@ golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -1207,8 +1207,8 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -1246,8 +1246,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand All @@ -1266,8 +1266,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -1327,8 +1327,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand All @@ -1337,8 +1337,8 @@ golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q=
golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -1405,8 +1405,8 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA=
golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
golang.org/x/tools v0.20.0 h1:hz/CVckiOxybQvFw6h7b/q80NTr9IUQb4s1IIzW7KNY=
golang.org/x/tools v0.20.0/go.mod h1:WvitBU7JJf6A4jOdg4S1tviW9bhUxkgeCui/0JHctQg=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
13 changes: 10 additions & 3 deletions internal/impl/nats/cache_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"time"

"github.com/google/uuid"

"github.com/nats-io/nats.go"

"github.com/Jeffail/shutdown"
Expand Down Expand Up @@ -45,12 +47,17 @@ type kvCache struct {
connMut sync.RWMutex
natsConn *nats.Conn
kv nats.KeyValue

// The pool caller id. This is a unique identifier we will provide when calling methods on the pool. This is used by
// the pool to do reference counting and ensure that connections are only closed when they are no longer in use.
pcid string
}

func newKVCache(conf *service.ParsedConfig, mgr *service.Resources) (*kvCache, error) {
p := &kvCache{
log: mgr.Logger(),
shutSig: shutdown.NewSignaller(),
pcid: uuid.New().String(),
}

var err error
Expand All @@ -71,7 +78,7 @@ func (p *kvCache) disconnect() {
defer p.connMut.Unlock()

if p.natsConn != nil {
p.natsConn.Close()
_ = pool.Release(p.pcid, p.connDetails)
p.natsConn = nil
}
p.kv = nil
Expand All @@ -86,13 +93,13 @@ func (p *kvCache) connect(ctx context.Context) error {
}

var err error
if p.natsConn, err = p.connDetails.get(ctx); err != nil {
if p.natsConn, err = pool.Get(ctx, p.pcid, p.connDetails); err != nil {
return err
}

defer func() {
if err != nil {
p.natsConn.Close()
_ = pool.Release(p.pcid, p.connDetails)
p.natsConn = nil
}
}()
Expand Down
59 changes: 33 additions & 26 deletions internal/impl/nats/connection.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package nats

import (
"context"
"crypto/tls"
"strings"

Expand All @@ -19,58 +18,66 @@ func connectionHeadFields() []*service.ConfigField {
Description("A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs.").
Example([]string{"nats://127.0.0.1:4222"}).
Example([]string{"nats://username:password@127.0.0.1:4222"}),
service.NewStringField("name").
Description("An optional name to assign to the connection. If not set, will default to the label").
Default(""),
}
}

func connectionTailFields() []*service.ConfigField {
return []*service.ConfigField{
service.NewTLSToggledField("tls"),
authFieldSpec(),
service.NewStringField("pool_key").
Description("The connection pool key to use. Components using the same poolKey will share their connection").
Default("default").
Advanced(),
}
}

type connectionDetails struct {
label string
logger *service.Logger
tlsConf *tls.Config
authConf authConfig
fs *service.FS
poolKey string
urls string
opts []nats.Option
authConf authConfig
}

func connectionDetailsFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (c connectionDetails, err error) {
c.label = mgr.Label()
c.fs = mgr.FS()
c.logger = mgr.Logger()

func connectionDetailsFromParsed(conf *service.ParsedConfig, mgr *service.Resources, extraOpts ...nats.Option) (c connectionDetails, err error) {
var urlList []string
if urlList, err = conf.FieldStringList("urls"); err != nil {
return
}
c.urls = strings.Join(urlList, ",")

if c.poolKey, err = conf.FieldString("pool_key"); err != nil {
return
}

var name string
if name, err = conf.FieldString("name"); err != nil {
return
}
if name == "" {
name = mgr.Label()
}
c.opts = append(c.opts, nats.Name(name))

var tlsEnabled bool
if c.tlsConf, tlsEnabled, err = conf.FieldTLSToggled("tls"); err != nil {
var tlsConf *tls.Config
if tlsConf, tlsEnabled, err = conf.FieldTLSToggled("tls"); err != nil {
return
}
if !tlsEnabled {
c.tlsConf = nil
if tlsEnabled && tlsConf != nil {
c.opts = append(c.opts, nats.Secure(tlsConf))
}

if c.authConf, err = AuthFromParsedConfig(conf.Namespace("auth")); err != nil {
return
}
return
}
c.opts = append(c.opts, authConfToOptions(c.authConf, mgr.FS())...)

func (c *connectionDetails) get(_ context.Context, extraOpts ...nats.Option) (*nats.Conn, error) {
var opts []nats.Option
if c.tlsConf != nil {
opts = append(opts, nats.Secure(c.tlsConf))
}
opts = append(opts, nats.Name(c.label))
opts = append(opts, errorHandlerOption(c.logger))
opts = append(opts, authConfToOptions(c.authConf, c.fs)...)
opts = append(opts, extraOpts...)
return nats.Connect(c.urls, opts...)
c.opts = append(c.opts, errorHandlerOption(mgr.Logger()))
c.opts = append(c.opts, extraOpts...)

return
}
11 changes: 9 additions & 2 deletions internal/impl/nats/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"time"

"github.com/google/uuid"

"github.com/nats-io/nats.go"

"github.com/benthosdev/benthos/v4/internal/component/input/span"
Expand Down Expand Up @@ -90,12 +92,17 @@ type natsReader struct {
natsChan chan *nats.Msg
interruptChan chan struct{}
interruptOnce sync.Once

// The pool caller id. This is a unique identifier we will provide when calling methods on the pool. This is used by
// the pool to do reference counting and ensure that connections are only closed when they are no longer in use.
pcid string
}

func newNATSReader(conf *service.ParsedConfig, mgr *service.Resources) (*natsReader, error) {
n := natsReader{
log: mgr.Logger(),
interruptChan: make(chan struct{}),
pcid: uuid.New().String(),
}

var err error
Expand Down Expand Up @@ -141,7 +148,7 @@ func (n *natsReader) Connect(ctx context.Context) error {
var natsSub *nats.Subscription
var err error

if natsConn, err = n.connDetails.get(ctx); err != nil {
if natsConn, err = pool.Get(ctx, n.pcid, n.connDetails); err != nil {
return err
}

Expand Down Expand Up @@ -172,7 +179,7 @@ func (n *natsReader) disconnect() {
n.natsSub = nil
}
if n.natsConn != nil {
n.natsConn.Close()
_ = pool.Release(n.pcid, n.connDetails)
n.natsConn = nil
}
n.natsChan = nil
Expand Down
13 changes: 10 additions & 3 deletions internal/impl/nats/input_jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ type jetStreamReader struct {
natsSub *nats.Subscription

shutSig *shutdown.Signaller

// The pool caller id. This is a unique identifier we will provide when calling methods on the pool. This is used by
// the pool to do reference counting and ensure that connections are only closed when they are no longer in use.
pcid string
}

func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*jetStreamReader, error) {
Expand Down Expand Up @@ -222,12 +226,12 @@ func (j *jetStreamReader) Connect(ctx context.Context) (err error) {
_ = natsSub.Drain()
}
if natsConn != nil {
natsConn.Close()
_ = pool.Release(j.pcid, j.connDetails)
}
}
}()

if natsConn, err = j.connDetails.get(ctx); err != nil {
if natsConn, err = pool.Get(ctx, j.pcid, j.connDetails); err != nil {
return err
}

Expand Down Expand Up @@ -303,7 +307,10 @@ func (j *jetStreamReader) disconnect() {
j.natsSub = nil
}
if j.natsConn != nil {
j.natsConn.Close()
if err := pool.Release(j.pcid, j.connDetails); err != nil {
j.log.Errorf("Failed to release NATS connection: %v", err)
}

j.natsConn = nil
}
}
Expand Down