From a10d5415f2e211c0a8433b248526a1a90f0689e8 Mon Sep 17 00:00:00 2001 From: stephen-totty-hpe Date: Fri, 28 Jul 2023 11:19:16 -0400 Subject: [PATCH] issue 814 - Add binary content mode for NATS and JetStream protocols Signed-off-by: stephen-totty-hpe --- protocol/nats_jetstream/v2/message.go | 98 ++++++++++++- protocol/nats_jetstream/v2/message_test.go | 54 +++++-- protocol/nats_jetstream/v2/options.go | 14 ++ protocol/nats_jetstream/v2/protocol.go | 13 +- protocol/nats_jetstream/v2/sender.go | 12 +- protocol/nats_jetstream/v2/write_message.go | 63 +++++++- test/integration/go.mod | 3 + test/integration/go.sum | 1 + test/integration/nats_jetstream/nats_test.go | 147 +++++++++++++++++++ 9 files changed, 379 insertions(+), 26 deletions(-) create mode 100644 test/integration/nats_jetstream/nats_test.go diff --git a/protocol/nats_jetstream/v2/message.go b/protocol/nats_jetstream/v2/message.go index 65a59288f..5b98beb80 100644 --- a/protocol/nats_jetstream/v2/message.go +++ b/protocol/nats_jetstream/v2/message.go @@ -8,13 +8,24 @@ package nats_jetstream import ( "bytes" "context" + "fmt" + "strings" "github.com/nats-io/nats.go" "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/binding/spec" ) +const ( + // see https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/nats-protocol-binding.md + prefix = "ce-" + contentTypeHeader = "content-type" +) + +var specs = spec.WithPrefix(prefix) + // Message implements binding.Message by wrapping an *nats.Msg. // This message *can* be read several times safely type Message struct { @@ -24,8 +35,15 @@ type Message struct { // NewMessage wraps an *nats.Msg in a binding.Message. // The returned message *can* be read several times safely +// The default encoding returned is EncodingStructured unless the NATS message contains a specversion header. func NewMessage(msg *nats.Msg) *Message { - return &Message{Msg: msg, encoding: binding.EncodingStructured} + encoding := binding.EncodingStructured + if msg.Header != nil { + if msg.Header.Get(specs.PrefixedSpecVersionName()) != "" { + encoding = binding.EncodingBinary + } + } + return &Message{Msg: msg, encoding: encoding} } var _ binding.Message = (*Message)(nil) @@ -37,15 +55,91 @@ func (m *Message) ReadEncoding() binding.Encoding { // ReadStructured transfers a structured-mode event to a StructuredWriter. func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error { + if m.encoding != binding.EncodingStructured { + return binding.ErrNotStructured + } return encoder.SetStructuredEvent(ctx, format.JSON, bytes.NewReader(m.Msg.Data)) } // ReadBinary transfers a binary-mode event to an BinaryWriter. func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error { - return binding.ErrNotBinary + if m.encoding != binding.EncodingBinary { + return binding.ErrNotBinary + } + + version := m.GetVersion() + if version == nil { + return binding.ErrNotBinary + } + + var err error + for k, v := range m.Msg.Header { + headerValue := v[0] + if strings.HasPrefix(k, prefix) { + attr := version.Attribute(k) + if attr != nil { + err = encoder.SetAttribute(attr, headerValue) + } else { + err = encoder.SetExtension(strings.TrimPrefix(k, prefix), headerValue) + } + } else if k == contentTypeHeader { + err = encoder.SetAttribute(version.AttributeFromKind(spec.DataContentType), headerValue) + } + if err != nil { + return err + } + } + + if m.Msg.Data != nil { + err = encoder.SetData(bytes.NewBuffer(m.Msg.Data)) + } + + return err } // Finish *must* be called when message from a Receiver can be forgotten by the receiver. func (m *Message) Finish(err error) error { return nil } + +// GetAttribute implements binding.MessageMetadataReader +func (m *Message) GetAttribute(attributeKind spec.Kind) (spec.Attribute, interface{}) { + key := withPrefix(attributeKind.String()) + if m.Msg.Header != nil { + headerValue := m.Msg.Header.Get(key) + if headerValue != "" { + version := m.GetVersion() + return version.Attribute(key), headerValue + } + } + return nil, nil +} + +// GetExtension implements binding.MessageMetadataReader +func (m *Message) GetExtension(name string) interface{} { + key := withPrefix(name) + if m.Msg.Header != nil { + headerValue := m.Msg.Header.Get(key) + if headerValue != "" { + return headerValue + } + } + return nil +} + +// GetVersion looks for specVersion header and returns a Version object +func (m *Message) GetVersion() spec.Version { + if m.Msg.Header == nil { + return nil + } + versionValue := m.Msg.Header.Get(specs.PrefixedSpecVersionName()) + if versionValue == "" { + return nil + } + return specs.Version(versionValue) +} + +// withPrefix prepends the prefix to the attribute name +func withPrefix(attributeName string) string { + return fmt.Sprintf("%s%s", prefix, attributeName) +} diff --git a/protocol/nats_jetstream/v2/message_test.go b/protocol/nats_jetstream/v2/message_test.go index 9e8219cd6..1c69bb7e4 100644 --- a/protocol/nats_jetstream/v2/message_test.go +++ b/protocol/nats_jetstream/v2/message_test.go @@ -3,16 +3,21 @@ package nats_jetstream import ( "context" "encoding/json" - bindingtest "github.com/cloudevents/sdk-go/v2/binding/test" "testing" + "github.com/cloudevents/sdk-go/v2/binding/spec" + bindingtest "github.com/cloudevents/sdk-go/v2/binding/test" + "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/test" "github.com/nats-io/nats.go" ) var ( - outBinaryMessage = bindingtest.MockBinaryMessage{} + outBinaryMessage = bindingtest.MockBinaryMessage{ + Metadata: map[spec.Attribute]interface{}{}, + Extensions: map[string]interface{}{}, + } outStructMessage = bindingtest.MockStructuredMessage{} testEvent = test.FullEvent() @@ -31,18 +36,44 @@ var ( Subject: "hello", Data: binaryData, } + binaryConsumerMessage = &nats.Msg{ + Subject: "hello", + Data: testEvent.Data(), + Header: nats.Header{ + "ce-type": {testEvent.Type()}, + "ce-source": {testEvent.Source()}, + "ce-id": {testEvent.ID()}, + "ce-time": {test.Timestamp.String()}, + "ce-specversion": {"1.0"}, + "ce-dataschema": {test.Schema.String()}, + "ce-datacontenttype": {"text/json"}, + "ce-subject": {"receiverTopic"}, + "ce-exta": {"someext"}, + }, + } ) func TestNewMessage(t *testing.T) { tests := []struct { - name string - consumerMessage *nats.Msg - expectedEncoding binding.Encoding + name string + consumerMessage *nats.Msg + expectedEncoding binding.Encoding + expectedStructuredError error + expectedBinaryError error }{ { - name: "Structured encoding", - consumerMessage: structuredConsumerMessage, - expectedEncoding: binding.EncodingStructured, + name: "Structured encoding", + consumerMessage: structuredConsumerMessage, + expectedEncoding: binding.EncodingStructured, + expectedStructuredError: nil, + expectedBinaryError: binding.ErrNotBinary, + }, + { + name: "Binary encoding", + consumerMessage: binaryConsumerMessage, + expectedEncoding: binding.EncodingBinary, + expectedStructuredError: binding.ErrNotStructured, + expectedBinaryError: nil, }, } for _, tt := range tests { @@ -52,17 +83,16 @@ func TestNewMessage(t *testing.T) { t.Errorf("Error in NewMessage!") } err := got.ReadBinary(context.TODO(), &outBinaryMessage) - if err == nil { - t.Errorf("Response in ReadBinary should err") + if err != tt.expectedBinaryError { + t.Errorf("ReadBinary err:%s", err.Error()) } err = got.ReadStructured(context.TODO(), &outStructMessage) - if err != nil { + if err != tt.expectedStructuredError { t.Errorf("ReadStructured err:%s", err.Error()) } if got.ReadEncoding() != tt.expectedEncoding { t.Errorf("ExpectedEncoding %s, while got %s", tt.expectedEncoding, got.ReadEncoding()) } - }) } } diff --git a/protocol/nats_jetstream/v2/options.go b/protocol/nats_jetstream/v2/options.go index 92d2d866d..f86642334 100644 --- a/protocol/nats_jetstream/v2/options.go +++ b/protocol/nats_jetstream/v2/options.go @@ -22,6 +22,20 @@ func NatsOptions(opts ...nats.Option) []nats.Option { // ProtocolOption is the function signature required to be considered an nats.ProtocolOption. type ProtocolOption func(*Protocol) error +func WithConsumerOptions(opts ...ConsumerOption) ProtocolOption { + return func(p *Protocol) error { + p.consumerOptions = opts + return nil + } +} + +func WithSenderOptions(opts ...SenderOption) ProtocolOption { + return func(p *Protocol) error { + p.senderOptions = opts + return nil + } +} + type SenderOption func(*Sender) error type ConsumerOption func(*Consumer) error diff --git a/protocol/nats_jetstream/v2/protocol.go b/protocol/nats_jetstream/v2/protocol.go index 99367991c..fc8be507e 100644 --- a/protocol/nats_jetstream/v2/protocol.go +++ b/protocol/nats_jetstream/v2/protocol.go @@ -7,6 +7,7 @@ package nats_jetstream import ( "context" + "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/protocol" @@ -18,11 +19,11 @@ import ( type Protocol struct { Conn *nats.Conn - Consumer *Consumer - //consumerOptions []ConsumerOption + Consumer *Consumer + consumerOptions []ConsumerOption - Sender *Sender - //senderOptions []SenderOption + Sender *Sender + senderOptions []SenderOption connOwned bool // whether this protocol created the nats connection } @@ -55,11 +56,11 @@ func NewProtocolFromConn(conn *nats.Conn, stream, sendSubject, receiveSubject st return nil, err } - if p.Consumer, err = NewConsumerFromConn(conn, stream, receiveSubject, jsOpts, subOpts); err != nil { + if p.Consumer, err = NewConsumerFromConn(conn, stream, receiveSubject, jsOpts, subOpts, p.consumerOptions...); err != nil { return nil, err } - if p.Sender, err = NewSenderFromConn(conn, stream, sendSubject, jsOpts); err != nil { + if p.Sender, err = NewSenderFromConn(conn, stream, sendSubject, jsOpts, p.senderOptions...); err != nil { return nil, err } diff --git a/protocol/nats_jetstream/v2/sender.go b/protocol/nats_jetstream/v2/sender.go index a9d479c29..ed63a3e57 100644 --- a/protocol/nats_jetstream/v2/sender.go +++ b/protocol/nats_jetstream/v2/sender.go @@ -91,10 +91,18 @@ func (s *Sender) Send(ctx context.Context, in binding.Message, transformers ...b }() writer := new(bytes.Buffer) - if err = WriteMsg(ctx, in, writer, transformers...); err != nil { + header, err := WriteMsg(ctx, in, writer, transformers...) + if err != nil { return err } - _, err = s.Jsm.Publish(s.Subject, writer.Bytes()) + + natsMsg := &nats.Msg{ + Subject: s.Subject, + Data: writer.Bytes(), + Header: header, + } + + _, err = s.Jsm.PublishMsg(natsMsg) return err } diff --git a/protocol/nats_jetstream/v2/write_message.go b/protocol/nats_jetstream/v2/write_message.go index 15b7f3709..2fe3c598c 100644 --- a/protocol/nats_jetstream/v2/write_message.go +++ b/protocol/nats_jetstream/v2/write_message.go @@ -7,24 +7,33 @@ package nats_jetstream import ( "context" + "fmt" + "io" + "time" + "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/binding/format" - "io" + "github.com/cloudevents/sdk-go/v2/binding/spec" + "github.com/nats-io/nats.go" ) // WriteMsg fills the provided writer with the bindings.Message m. // Using context you can tweak the encoding processing (more details on binding.Write documentation). -func WriteMsg(ctx context.Context, m binding.Message, writer io.ReaderFrom, transformers ...binding.Transformer) error { +// The nats.Header returned is not deep-copied. The header values should be deep-copied to an event object. +func WriteMsg(ctx context.Context, m binding.Message, writer io.ReaderFrom, transformers ...binding.Transformer) (nats.Header, error) { structuredWriter := &natsMessageWriter{writer} + binaryWriter := &natsBinaryMessageWriter{ReaderFrom: writer} _, err := binding.Write( ctx, m, structuredWriter, - nil, + binaryWriter, transformers..., ) - return err + natsHeader := binaryWriter.header + + return natsHeader, err } type natsMessageWriter struct { @@ -41,3 +50,49 @@ func (w *natsMessageWriter) SetStructuredEvent(_ context.Context, _ format.Forma } var _ binding.StructuredWriter = (*natsMessageWriter)(nil) // Test it conforms to the interface + +type natsBinaryMessageWriter struct { + io.ReaderFrom + header nats.Header +} + +// SetAttribute implements MessageMetadataWriter.SetAttribute +func (w *natsBinaryMessageWriter) SetAttribute(attribute spec.Attribute, value interface{}) error { + prefixedName := withPrefix(attribute.Name()) + convertedValue := fmt.Sprint(value) + switch attribute.Kind().String() { + case spec.Time.String(): + timeValue := value.(time.Time) + convertedValue = timeValue.Format(time.RFC3339Nano) + } + w.header.Set(prefixedName, convertedValue) + return nil +} + +// SetExtension implements MessageMetadataWriter.SetExtension +func (w *natsBinaryMessageWriter) SetExtension(name string, value interface{}) error { + prefixedName := withPrefix(name) + convertedValue := fmt.Sprint(value) + w.header.Set(prefixedName, convertedValue) + return nil +} + +// Start implements BinaryWriter.Start +func (w *natsBinaryMessageWriter) Start(ctx context.Context) error { + w.header = nats.Header{} + return nil +} + +// SetData implements BinaryWriter.SetData +func (w *natsBinaryMessageWriter) SetData(data io.Reader) error { + if _, err := w.ReadFrom(data); err != nil { + return err + } + + return nil +} + +// End implements BinaryWriter.End +func (w *natsBinaryMessageWriter) End(ctx context.Context) error { + return nil +} diff --git a/test/integration/go.mod b/test/integration/go.mod index 6eea466b4..e29dfb2df 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -12,6 +12,8 @@ replace github.com/cloudevents/sdk-go/protocol/stan/v2 => ../../protocol/stan/v2 replace github.com/cloudevents/sdk-go/protocol/nats/v2 => ../../protocol/nats/v2 +replace github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2 => ../../protocol/nats_jetstream/v2 + replace github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 => ../../protocol/kafka_sarama/v2 replace github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 => ../../protocol/mqtt_paho/v2 @@ -23,6 +25,7 @@ require ( github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.5.0 github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-00010101000000-000000000000 github.com/cloudevents/sdk-go/protocol/nats/v2 v2.5.0 + github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2 v2.0.0-00010101000000-000000000000 github.com/cloudevents/sdk-go/protocol/stan/v2 v2.5.0 github.com/cloudevents/sdk-go/v2 v2.5.0 github.com/eclipse/paho.golang v0.11.0 diff --git a/test/integration/go.sum b/test/integration/go.sum index f3de066f2..ddb8ddd8a 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -663,6 +663,7 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= diff --git a/test/integration/nats_jetstream/nats_test.go b/test/integration/nats_jetstream/nats_test.go new file mode 100644 index 000000000..ca4d27c5d --- /dev/null +++ b/test/integration/nats_jetstream/nats_test.go @@ -0,0 +1,147 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "context" + "os" + "testing" + + "github.com/nats-io/nats.go" + + ce_nats "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/event" + bindings "github.com/cloudevents/sdk-go/v2/protocol" + "github.com/cloudevents/sdk-go/v2/protocol/test" + . "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + . "github.com/cloudevents/sdk-go/v2/binding/test" +) + +func TestSendReceiveStructuredAndBinary(t *testing.T) { + t.Skip("Need a running NATS server with the -js option. Turning on -js will break non jetstream tests.") + conn := testConn(t) + defer conn.Close() + + type args struct { + opts []ce_nats.ProtocolOption + bindingEncoding binding.Encoding + } + tests := []struct { + name string + args args + }{ + { + name: "regular subscriber - structured", + args: args{ + bindingEncoding: binding.EncodingStructured, + }, + }, + { + name: "queue subscriber - structured", + args: args{ + opts: []ce_nats.ProtocolOption{ + ce_nats.WithConsumerOptions( + ce_nats.WithQueueSubscriber(uuid.New().String()), + ), + }, + bindingEncoding: binding.EncodingStructured, + }, + }, + { + name: "regular subscriber - binary", + args: args{ + bindingEncoding: binding.EncodingBinary, + }, + }, { + name: "queue subscriber - binary", + args: args{ + opts: []ce_nats.ProtocolOption{ + ce_nats.WithConsumerOptions( + ce_nats.WithQueueSubscriber(uuid.New().String()), + ), + }, + bindingEncoding: binding.EncodingBinary, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanup, s, r := testProtocol(t, conn, tt.args.opts...) + defer cleanup() + EachEvent(t, Events(), func(t *testing.T, eventIn event.Event) { + eventIn = ConvertEventExtensionsToString(t, eventIn) + + var in binding.Message + switch tt.args.bindingEncoding { + case binding.EncodingStructured: + in = MustCreateMockStructuredMessage(t, eventIn) + case binding.EncodingBinary: + in = MustCreateMockBinaryMessage(eventIn) + } + + test.SendReceive(t, binding.WithPreferredEventEncoding(context.TODO(), tt.args.bindingEncoding), in, s, r, func(out binding.Message) { + eventOut := MustToEvent(t, context.Background(), out) + assert.Equal(t, tt.args.bindingEncoding, out.ReadEncoding()) + AssertEventEquals(t, eventIn, ConvertEventExtensionsToString(t, eventOut)) + }) + }) + }) + } +} + +func testConn(t testing.TB) *nats.Conn { + t.Helper() + // STAN connections actually connect to NATS, so the env var is named appropriately + s := os.Getenv("TEST_NATS_SERVER") + if s == "" { + s = "nats://localhost:4222" + } + + conn, err := nats.Connect(s) + if err != nil { + t.Skipf("Cannot create STAN client to NATS server [%s]: %v", s, err) + } + + return conn +} + +func testProtocol(t testing.TB, natsConn *nats.Conn, opts ...ce_nats.ProtocolOption) (func(), bindings.Sender, + bindings.Receiver) { + // STAN connections actually connect to NATS, so the env var is named appropriately + s := os.Getenv("TEST_NATS_SERVER") + if s == "" { + s = "nats://localhost:4222" + } + + stream := "test-ce-client-" + uuid.New().String() + subject := stream + ".test" + + // use NewProtocol rather than individual Consumer and Sender since this gives us more coverage + p, err := ce_nats.NewProtocol(s, stream, subject, subject, ce_nats.NatsOptions(), []nats.JSOpt{}, []nats.SubOpt{}, opts...) + require.NoError(t, err) + + go func() { + require.NoError(t, p.OpenInbound(context.TODO())) + }() + + return func() { + err = p.Close(context.TODO()) + require.NoError(t, err) + }, p.Sender, p.Consumer +} + +func BenchmarkSendReceive(b *testing.B) { + conn := testConn(b) + defer conn.Close() + c, s, r := testProtocol(b, conn) + defer c() // Cleanup + test.BenchmarkSendReceive(b, s, r) +}