diff --git a/go.mod b/go.mod index 1595731..8ccfa4a 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/golang-jwt/jwt/v4 v4.5.0 github.com/google/uuid v1.6.0 github.com/hashicorp/go-multierror v1.1.1 - github.com/nats-io/nats.go v1.34.1 + github.com/nats-io/nats.go v1.36.0 github.com/pkg/errors v0.9.1 github.com/rs/zerolog v1.32.0 golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df diff --git a/go.sum b/go.sum index 8f2cf9f..a05e13d 100644 --- a/go.sum +++ b/go.sum @@ -24,8 +24,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4= -github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU= +github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/vendor/github.com/nats-io/nats.go/README.md b/vendor/github.com/nats-io/nats.go/README.md index 60e81d8..fd64d93 100644 --- a/vendor/github.com/nats-io/nats.go/README.md +++ b/vendor/github.com/nats-io/nats.go/README.md @@ -31,7 +31,7 @@ When using or transitioning to Go modules support: ```bash # Go client latest or explicit version go get github.com/nats-io/nats.go/@latest -go get github.com/nats-io/nats.go/@v1.34.1 +go get github.com/nats-io/nats.go/@v1.36.0 # For latest NATS Server, add /v2 at the end go get github.com/nats-io/nats-server/v2 @@ -474,6 +474,19 @@ resp := &response{} err := c.RequestWithContext(ctx, "foo", req, resp) ``` +## Backwards compatibility + +In the development of nats.go, we are committed to maintaining backward compatibility and ensuring a stable and reliable experience for all users. In general, we follow the standard go compatibility guidelines. +However, it's important to clarify our stance on certain types of changes: + +- **Expanding structures:** +Adding new fields to structs is not considered a breaking change. + +- **Adding methods to exported interfaces:** +Extending public interfaces with new methods is also not viewed as a breaking change within the context of this project. It is important to note that no unexported methods will be added to interfaces allowing users to implement them. + +Additionally, this library always supports at least 2 latest minor Go versions. For example, if the latest Go version is 1.22, the library will support Go 1.21 and 1.22. + ## License Unless otherwise noted, the NATS source files are distributed diff --git a/vendor/github.com/nats-io/nats.go/go_test.mod b/vendor/github.com/nats-io/nats.go/go_test.mod index 5dfd112..f5b731d 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.mod +++ b/vendor/github.com/nats-io/nats.go/go_test.mod @@ -4,20 +4,20 @@ go 1.19 require ( github.com/golang/protobuf v1.4.2 - github.com/klauspost/compress v1.17.6 + github.com/klauspost/compress v1.17.8 github.com/nats-io/jwt v1.2.2 - github.com/nats-io/nats-server/v2 v2.10.11 + github.com/nats-io/nats-server/v2 v2.10.16 github.com/nats-io/nkeys v0.4.7 github.com/nats-io/nuid v1.0.1 go.uber.org/goleak v1.3.0 - golang.org/x/text v0.14.0 + golang.org/x/text v0.15.0 google.golang.org/protobuf v1.23.0 ) require ( github.com/minio/highwayhash v1.0.2 // indirect - github.com/nats-io/jwt/v2 v2.5.3 // indirect - golang.org/x/crypto v0.19.0 // indirect - golang.org/x/sys v0.17.0 // indirect + github.com/nats-io/jwt/v2 v2.5.7 // indirect + golang.org/x/crypto v0.23.0 // indirect + golang.org/x/sys v0.20.0 // indirect golang.org/x/time v0.5.0 // indirect ) diff --git a/vendor/github.com/nats-io/nats.go/go_test.sum b/vendor/github.com/nats-io/nats.go/go_test.sum index d28f0f6..f89d755 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.sum +++ b/vendor/github.com/nats-io/nats.go/go_test.sum @@ -10,16 +10,16 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI= -github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= -github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo= -github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4= -github.com/nats-io/nats-server/v2 v2.10.11 h1:yKUiLVincZISpo3A4YljJQ+HfLltGAgoNNJl99KL8I0= -github.com/nats-io/nats-server/v2 v2.10.11/go.mod h1:dXtOqVWzbMTEj+tUyC/itXjJhW37xh0tUBrTAlqAfx8= +github.com/nats-io/jwt/v2 v2.5.7 h1:j5lH1fUXCnJnY8SsQeB/a/z9Azgu2bYIDvtPVNdxe2c= +github.com/nats-io/jwt/v2 v2.5.7/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/nats-server/v2 v2.10.16 h1:2jXaiydp5oB/nAx/Ytf9fdCi9QN6ItIc9eehX8kwVV0= +github.com/nats-io/nats-server/v2 v2.10.16/go.mod h1:Pksi38H2+6xLe1vQx0/EA4bzetM0NqyIHcIbmgXSkIU= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= @@ -31,17 +31,17 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= -golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= diff --git a/vendor/github.com/nats-io/nats.go/jserrors.go b/vendor/github.com/nats-io/nats.go/jserrors.go index 2a16040..f028594 100644 --- a/vendor/github.com/nats-io/nats.go/jserrors.go +++ b/vendor/github.com/nats-io/nats.go/jserrors.go @@ -22,6 +22,10 @@ var ( // API errors // ErrJetStreamNotEnabled is an error returned when JetStream is not enabled for an account. + // + // Note: This error will not be returned in clustered mode, even if each + // server in the cluster does not have JetStream enabled. In clustered mode, + // requests will time out instead. ErrJetStreamNotEnabled JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabled, Description: "jetstream not enabled", Code: 503}} // ErrJetStreamNotEnabledForAccount is an error returned when JetStream is not enabled for an account. @@ -120,6 +124,9 @@ var ( // ErrInvalidConsumerName is returned when the provided consumer name is invalid (contains '.' or ' '). ErrInvalidConsumerName JetStreamError = &jsError{message: "invalid consumer name"} + // ErrInvalidFilterSubject is returned when the provided filter subject is invalid. + ErrInvalidFilterSubject JetStreamError = &jsError{message: "invalid filter subject"} + // ErrNoMatchingStream is returned when stream lookup by subject is unsuccessful. ErrNoMatchingStream JetStreamError = &jsError{message: "no stream matches subject"} diff --git a/vendor/github.com/nats-io/nats.go/jsm.go b/vendor/github.com/nats-io/nats.go/jsm.go index 94fa86c..9eb5d4b 100644 --- a/vendor/github.com/nats-io/nats.go/jsm.go +++ b/vendor/github.com/nats-io/nats.go/jsm.go @@ -106,51 +106,143 @@ type JetStreamManager interface { // There are sensible defaults for most. If no subjects are // given the name will be used as the only subject. type StreamConfig struct { - Name string `json:"name"` - Description string `json:"description,omitempty"` - Subjects []string `json:"subjects,omitempty"` - Retention RetentionPolicy `json:"retention"` - MaxConsumers int `json:"max_consumers"` - MaxMsgs int64 `json:"max_msgs"` - MaxBytes int64 `json:"max_bytes"` - Discard DiscardPolicy `json:"discard"` - DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"` - MaxAge time.Duration `json:"max_age"` - MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"` - MaxMsgSize int32 `json:"max_msg_size,omitempty"` - Storage StorageType `json:"storage"` - Replicas int `json:"num_replicas"` - NoAck bool `json:"no_ack,omitempty"` - Template string `json:"template_owner,omitempty"` - Duplicates time.Duration `json:"duplicate_window,omitempty"` - Placement *Placement `json:"placement,omitempty"` - Mirror *StreamSource `json:"mirror,omitempty"` - Sources []*StreamSource `json:"sources,omitempty"` - Sealed bool `json:"sealed,omitempty"` - DenyDelete bool `json:"deny_delete,omitempty"` - DenyPurge bool `json:"deny_purge,omitempty"` - AllowRollup bool `json:"allow_rollup_hdrs,omitempty"` - Compression StoreCompression `json:"compression"` - FirstSeq uint64 `json:"first_seq,omitempty"` - - // Allow applying a subject transform to incoming messages before doing anything else. + // Name is the name of the stream. It is required and must be unique + // across the JetStream account. + // + // Name Names cannot contain whitespace, ., *, >, path separators + // (forward or backwards slash), and non-printable characters. + Name string `json:"name"` + + // Description is an optional description of the stream. + Description string `json:"description,omitempty"` + + // Subjects is a list of subjects that the stream is listening on. + // Wildcards are supported. Subjects cannot be set if the stream is + // created as a mirror. + Subjects []string `json:"subjects,omitempty"` + + // Retention defines the message retention policy for the stream. + // Defaults to LimitsPolicy. + Retention RetentionPolicy `json:"retention"` + + // MaxConsumers specifies the maximum number of consumers allowed for + // the stream. + MaxConsumers int `json:"max_consumers"` + + // MaxMsgs is the maximum number of messages the stream will store. + // After reaching the limit, stream adheres to the discard policy. + // If not set, server default is -1 (unlimited). + MaxMsgs int64 `json:"max_msgs"` + + // MaxBytes is the maximum total size of messages the stream will store. + // After reaching the limit, stream adheres to the discard policy. + // If not set, server default is -1 (unlimited). + MaxBytes int64 `json:"max_bytes"` + + // Discard defines the policy for handling messages when the stream + // reaches its limits in terms of number of messages or total bytes. + Discard DiscardPolicy `json:"discard"` + + // DiscardNewPerSubject is a flag to enable discarding new messages per + // subject when limits are reached. Requires DiscardPolicy to be + // DiscardNew and the MaxMsgsPerSubject to be set. + DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"` + + // MaxAge is the maximum age of messages that the stream will retain. + MaxAge time.Duration `json:"max_age"` + + // MaxMsgsPerSubject is the maximum number of messages per subject that + // the stream will retain. + MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"` + + // MaxMsgSize is the maximum size of any single message in the stream. + MaxMsgSize int32 `json:"max_msg_size,omitempty"` + + // Storage specifies the type of storage backend used for the stream + // (file or memory). + Storage StorageType `json:"storage"` + + // Replicas is the number of stream replicas in clustered JetStream. + // Defaults to 1, maximum is 5. + Replicas int `json:"num_replicas"` + + // NoAck is a flag to disable acknowledging messages received by this + // stream. + // + // If set to true, publish methods from the JetStream client will not + // work as expected, since they rely on acknowledgements. Core NATS + // publish methods should be used instead. Note that this will make + // message delivery less reliable. + NoAck bool `json:"no_ack,omitempty"` + + // Duplicates is the window within which to track duplicate messages. + // If not set, server default is 2 minutes. + Duplicates time.Duration `json:"duplicate_window,omitempty"` + + // Placement is used to declare where the stream should be placed via + // tags and/or an explicit cluster name. + Placement *Placement `json:"placement,omitempty"` + + // Mirror defines the configuration for mirroring another stream. + Mirror *StreamSource `json:"mirror,omitempty"` + + // Sources is a list of other streams this stream sources messages from. + Sources []*StreamSource `json:"sources,omitempty"` + + // Sealed streams do not allow messages to be published or deleted via limits or API, + // sealed streams can not be unsealed via configuration update. Can only + // be set on already created streams via the Update API. + Sealed bool `json:"sealed,omitempty"` + + // DenyDelete restricts the ability to delete messages from a stream via + // the API. Defaults to false. + DenyDelete bool `json:"deny_delete,omitempty"` + + // DenyPurge restricts the ability to purge messages from a stream via + // the API. Defaults to false. + DenyPurge bool `json:"deny_purge,omitempty"` + + // AllowRollup allows the use of the Nats-Rollup header to replace all + // contents of a stream, or subject in a stream, with a single new + // message. + AllowRollup bool `json:"allow_rollup_hdrs,omitempty"` + + // Compression specifies the message storage compression algorithm. + // Defaults to NoCompression. + Compression StoreCompression `json:"compression"` + + // FirstSeq is the initial sequence number of the first message in the + // stream. + FirstSeq uint64 `json:"first_seq,omitempty"` + + // SubjectTransform allows applying a transformation to matching + // messages' subjects. SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"` - // Allow republish of the message after being sequenced and stored. + // RePublish allows immediate republishing a message to the configured + // subject after it's stored. RePublish *RePublish `json:"republish,omitempty"` - // Allow higher performance, direct access to get individual messages. E.g. KeyValue + // AllowDirect enables direct access to individual messages using direct + // get API. Defaults to false. AllowDirect bool `json:"allow_direct"` - // Allow higher performance and unified direct access for mirrors as well. + + // MirrorDirect enables direct access to individual messages from the + // origin stream using direct get API. Defaults to false. MirrorDirect bool `json:"mirror_direct"` - // Limits for consumers on this stream. + // ConsumerLimits defines limits of certain values that consumers can + // set, defaults for those who don't set these settings ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"` - // Metadata is additional metadata for the Stream. - // Keys starting with `_nats` are reserved. - // NOTE: Metadata requires nats-server v2.10.0+ + // Metadata is a set of application-defined key-value pairs for + // associating metadata on the stream. This feature requires nats-server + // v2.10.0 or later. Metadata map[string]string `json:"metadata,omitempty"` + + // Template identifies the template that manages the Stream. DEPRECATED: + // This feature is no longer supported. + Template string `json:"template_owner,omitempty"` } // SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received. @@ -288,9 +380,13 @@ type accountInfoResponse struct { AccountInfo } -// AccountInfo retrieves info about the JetStream usage from the current account. -// If JetStream is not enabled, this will return ErrJetStreamNotEnabled -// Other errors can happen but are generally considered retryable +// AccountInfo fetches account information from the server, containing details +// about the account associated with this JetStream connection. If account is +// not enabled for JetStream, ErrJetStreamNotEnabledForAccount is returned. +// +// If the server does not have JetStream enabled, ErrJetStreamNotEnabled is +// returned (for a single server setup). For clustered topologies, AccountInfo +// will time out. func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) { o, cancel, err := getJSContextOpts(js.opts, opts...) if err != nil { @@ -410,6 +506,10 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o // if filter subject is empty or ">", use the endpoint without filter subject ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName) } else { + // safeguard against passing invalid filter subject in request subject + if cfg.FilterSubject[0] == '.' || cfg.FilterSubject[len(cfg.FilterSubject)-1] == '.' { + return nil, fmt.Errorf("%w: %q", ErrInvalidFilterSubject, cfg.FilterSubject) + } // if filter subject is not empty, use the endpoint with filter subject ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject) } diff --git a/vendor/github.com/nats-io/nats.go/kv.go b/vendor/github.com/nats-io/nats.go/kv.go index 0864f30..d9f40fd 100644 --- a/vendor/github.com/nats-io/nats.go/kv.go +++ b/vendor/github.com/nats-io/nats.go/kv.go @@ -249,22 +249,22 @@ func purge() DeleteOpt { // KeyValueConfig is for configuring a KeyValue store. type KeyValueConfig struct { - Bucket string - Description string - MaxValueSize int32 - History uint8 - TTL time.Duration - MaxBytes int64 - Storage StorageType - Replicas int - Placement *Placement - RePublish *RePublish - Mirror *StreamSource - Sources []*StreamSource + Bucket string `json:"bucket"` + Description string `json:"description,omitempty"` + MaxValueSize int32 `json:"max_value_size,omitempty"` + History uint8 `json:"history,omitempty"` + TTL time.Duration `json:"ttl,omitempty"` + MaxBytes int64 `json:"max_bytes,omitempty"` + Storage StorageType `json:"storage,omitempty"` + Replicas int `json:"num_replicas,omitempty"` + Placement *Placement `json:"placement,omitempty"` + RePublish *RePublish `json:"republish,omitempty"` + Mirror *StreamSource `json:"mirror,omitempty"` + Sources []*StreamSource `json:"sources,omitempty"` // Enable underlying stream compression. // NOTE: Compression is supported for nats-server 2.10.0+ - Compression bool + Compression bool `json:"compression,omitempty"` } // Used to watch all keys. @@ -344,8 +344,9 @@ const ( // Regex for valid keys and buckets. var ( - validBucketRe = regexp.MustCompile(`\A[a-zA-Z0-9_-]+\z`) - validKeyRe = regexp.MustCompile(`\A[-/_=\.a-zA-Z0-9]+\z`) + validBucketRe = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`) + validKeyRe = regexp.MustCompile(`^[-/_=\.a-zA-Z0-9]+$`) + validSearchKeyRe = regexp.MustCompile(`^[-/_=\.a-zA-Z0-9*]*[>]?$`) ) // KeyValue will lookup and bind to an existing KeyValue store. @@ -353,7 +354,7 @@ func (js *js) KeyValue(bucket string) (KeyValue, error) { if !js.nc.serverMinVersion(2, 6, 2) { return nil, errors.New("nats: key-value requires at least server version 2.6.2") } - if !validBucketRe.MatchString(bucket) { + if !bucketValid(bucket) { return nil, ErrInvalidBucketName } stream := fmt.Sprintf(kvBucketNameTmpl, bucket) @@ -381,7 +382,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { if cfg == nil { return nil, ErrKeyValueConfigRequired } - if !validBucketRe.MatchString(cfg.Bucket) { + if !bucketValid(cfg.Bucket) { return nil, ErrInvalidBucketName } if _, err := js.AccountInfo(); err != nil { @@ -507,7 +508,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { // DeleteKeyValue will delete this KeyValue store (JetStream stream). func (js *js) DeleteKeyValue(bucket string) error { - if !validBucketRe.MatchString(bucket) { + if !bucketValid(bucket) { return ErrInvalidBucketName } stream := fmt.Sprintf(kvBucketNameTmpl, bucket) @@ -547,6 +548,13 @@ func (e *kve) Created() time.Time { return e.created } func (e *kve) Delta() uint64 { return e.delta } func (e *kve) Operation() KeyValueOp { return e.op } +func bucketValid(bucket string) bool { + if len(bucket) == 0 { + return false + } + return validBucketRe.MatchString(bucket) +} + func keyValid(key string) bool { if len(key) == 0 || key[0] == '.' || key[len(key)-1] == '.' { return false @@ -554,6 +562,13 @@ func keyValid(key string) bool { return validKeyRe.MatchString(key) } +func searchKeyValid(key string) bool { + if len(key) == 0 || key[0] == '.' || key[len(key)-1] == '.' { + return false + } + return validSearchKeyRe.MatchString(key) +} + // Get returns the latest value for the key. func (kv *kvs) Get(key string) (KeyValueEntry, error) { e, err := kv.get(key, kvLatestRevision) @@ -951,6 +966,9 @@ func (kv *kvs) WatchAll(opts ...WatchOpt) (KeyWatcher, error) { // Watch will fire the callback when a key that matches the keys pattern is updated. // keys needs to be a valid NATS subject. func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { + if !searchKeyValid(keys) { + return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "keys cannot be empty and must be a valid NATS subject") + } var o watchOpts for _, opt := range opts { if opt != nil { diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index 780fd23..d019cee 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -47,7 +47,7 @@ import ( // Default Constants const ( - Version = "1.34.1" + Version = "1.36.0" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 @@ -2161,6 +2161,47 @@ func (nc *Conn) waitForExits() { nc.wg.Wait() } +// ForceReconnect forces a reconnect attempt to the server. +// This is a non-blocking call and will start the reconnect +// process without waiting for it to complete. +// +// If the connection is already in the process of reconnecting, +// this call will force an immediate reconnect attempt (bypassing +// the current reconnect delay). +func (nc *Conn) ForceReconnect() error { + nc.mu.Lock() + defer nc.mu.Unlock() + + if nc.isClosed() { + return ErrConnectionClosed + } + if nc.isReconnecting() { + // if we're already reconnecting, force a reconnect attempt + // even if we're in the middle of a backoff + if nc.rqch != nil { + close(nc.rqch) + } + return nil + } + + // Clear any queued pongs + nc.clearPendingFlushCalls() + + // Clear any queued and blocking requests. + nc.clearPendingRequestCalls() + + // Stop ping timer if set. + nc.stopPingTimer() + + // Go ahead and make sure we have flushed the outbound + nc.bw.flush() + nc.conn.Close() + + nc.changeConnStatus(RECONNECTING) + go nc.doReconnect(nil, true) + return nil +} + // ConnectedUrl reports the connected server's URL func (nc *Conn) ConnectedUrl() string { if nc == nil { @@ -2420,7 +2461,7 @@ func (nc *Conn) connect() (bool, error) { nc.setup() nc.changeConnStatus(RECONNECTING) nc.bw.switchToPending() - go nc.doReconnect(ErrNoServers) + go nc.doReconnect(ErrNoServers, false) err = nil } else { nc.current = nil @@ -2720,7 +2761,7 @@ func (nc *Conn) stopPingTimer() { // Try to reconnect using the option parameters. // This function assumes we are allowed to reconnect. -func (nc *Conn) doReconnect(err error) { +func (nc *Conn) doReconnect(err error, forceReconnect bool) { // We want to make sure we have the other watchers shutdown properly // here before we proceed past this point. nc.waitForExits() @@ -2776,7 +2817,8 @@ func (nc *Conn) doReconnect(err error) { break } - doSleep := i+1 >= len(nc.srvPool) + doSleep := i+1 >= len(nc.srvPool) && !forceReconnect + forceReconnect = false nc.mu.Unlock() if !doSleep { @@ -2803,6 +2845,12 @@ func (nc *Conn) doReconnect(err error) { select { case <-rqch: rt.Stop() + + // we need to reset the rqch channel to avoid + // closing a closed channel in the next iteration + nc.mu.Lock() + nc.rqch = make(chan struct{}) + nc.mu.Unlock() case <-rt.C: } } @@ -2872,18 +2920,19 @@ func (nc *Conn) doReconnect(err error) { // Done with the pending buffer nc.bw.doneWithPending() - // This is where we are truly connected. - nc.status = CONNECTED + // Queue up the correct callback. If we are in initial connect state + // (using retry on failed connect), we will call the ConnectedCB, + // otherwise the ReconnectedCB. + if nc.Opts.ReconnectedCB != nil && !nc.initc { + nc.ach.push(func() { nc.Opts.ReconnectedCB(nc) }) + } else if nc.Opts.ConnectedCB != nil && nc.initc { + nc.ach.push(func() { nc.Opts.ConnectedCB(nc) }) + } // If we are here with a retry on failed connect, indicate that the // initial connect is now complete. nc.initc = false - // Queue up the reconnect callback. - if nc.Opts.ReconnectedCB != nil { - nc.ach.push(func() { nc.Opts.ReconnectedCB(nc) }) - } - // Release lock here, we will return below. nc.mu.Unlock() @@ -2926,7 +2975,7 @@ func (nc *Conn) processOpErr(err error) { // Clear any queued pongs, e.g. pending flush calls. nc.clearPendingFlushCalls() - go nc.doReconnect(err) + go nc.doReconnect(err, false) nc.mu.Unlock() return } @@ -3753,7 +3802,7 @@ func readMIMEHeader(tp *textproto.Reader) (textproto.MIMEHeader, error) { } // Process key fetching original case. - i := bytes.IndexByte([]byte(kv), ':') + i := strings.IndexByte(kv, ':') if i < 0 { return nil, ErrBadHeaderMsg } @@ -3766,8 +3815,7 @@ func readMIMEHeader(tp *textproto.Reader) (textproto.MIMEHeader, error) { for i < len(kv) && (kv[i] == ' ' || kv[i] == '\t') { i++ } - value := string(kv[i:]) - m[key] = append(m[key], value) + m[key] = append(m[key], kv[i:]) if err != nil { return m, err } @@ -5435,7 +5483,7 @@ func (nc *Conn) drainConnection() { // Drain will put a connection into a drain state. All subscriptions will // immediately be put into a drain state. Upon completion, the publishers // will be drained and can not publish any additional messages. Upon draining -// of the publishers, the connection will be closed. Use the ClosedCB() +// of the publishers, the connection will be closed. Use the ClosedCB // option to know when the connection has moved from draining to closed. // // See note in Subscription.Drain for JetStream subscriptions. diff --git a/vendor/modules.txt b/vendor/modules.txt index 201fe3d..8b02364 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -26,7 +26,7 @@ github.com/mattn/go-colorable # github.com/mattn/go-isatty v0.0.19 ## explicit; go 1.15 github.com/mattn/go-isatty -# github.com/nats-io/nats.go v1.34.1 +# github.com/nats-io/nats.go v1.36.0 ## explicit; go 1.20 github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin