Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue 814 - Add binary content mode for NATS and JetStream protocols #929

Merged
merged 1 commit into from
Oct 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ jobs:
ports:
- 4222:4222

jetstream:
image: bitnami/nats:latest
env:
NATS_EXTRA_ARGS: "--jetstream --port 4223"
ports:
- 4223:4223

amqp:
image: scholzj/qpid-dispatch
env:
Expand Down
98 changes: 96 additions & 2 deletions protocol/nats_jetstream/v2/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,24 @@ package nats_jetstream
import (
"bytes"
"context"
"fmt"
"strings"

"github.com/nats-io/nats.go"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/format"
"github.com/cloudevents/sdk-go/v2/binding/spec"
)

const (
// see https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/nats-protocol-binding.md
prefix = "ce-"
contentTypeHeader = "content-type"
)

var specs = spec.WithPrefix(prefix)

// Message implements binding.Message by wrapping an *nats.Msg.
// This message *can* be read several times safely
type Message struct {
Expand All @@ -24,8 +35,15 @@ type Message struct {

// NewMessage wraps an *nats.Msg in a binding.Message.
// The returned message *can* be read several times safely
// The default encoding returned is EncodingStructured unless the NATS message contains a specversion header.
func NewMessage(msg *nats.Msg) *Message {
return &Message{Msg: msg, encoding: binding.EncodingStructured}
encoding := binding.EncodingStructured
embano1 marked this conversation as resolved.
Show resolved Hide resolved
stephen-totty-hpe marked this conversation as resolved.
Show resolved Hide resolved
if msg.Header != nil {
if msg.Header.Get(specs.PrefixedSpecVersionName()) != "" {
encoding = binding.EncodingBinary
}
}
return &Message{Msg: msg, encoding: encoding}
}

var _ binding.Message = (*Message)(nil)
Expand All @@ -37,15 +55,91 @@ func (m *Message) ReadEncoding() binding.Encoding {

// ReadStructured transfers a structured-mode event to a StructuredWriter.
func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error {
if m.encoding != binding.EncodingStructured {
return binding.ErrNotStructured
}
return encoder.SetStructuredEvent(ctx, format.JSON, bytes.NewReader(m.Msg.Data))
}

// ReadBinary transfers a binary-mode event to an BinaryWriter.
func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error {
return binding.ErrNotBinary
if m.encoding != binding.EncodingBinary {
return binding.ErrNotBinary
}

version := m.GetVersion()
if version == nil {
return binding.ErrNotBinary
}

var err error
for k, v := range m.Msg.Header {
headerValue := v[0]
if strings.HasPrefix(k, prefix) {
attr := version.Attribute(k)
if attr != nil {
err = encoder.SetAttribute(attr, headerValue)
} else {
err = encoder.SetExtension(strings.TrimPrefix(k, prefix), headerValue)
}
} else if k == contentTypeHeader {
err = encoder.SetAttribute(version.AttributeFromKind(spec.DataContentType), headerValue)
}
if err != nil {
return err
}
}

if m.Msg.Data != nil {
err = encoder.SetData(bytes.NewBuffer(m.Msg.Data))
}

return err
}

// Finish *must* be called when message from a Receiver can be forgotten by the receiver.
func (m *Message) Finish(err error) error {
return nil
}

// GetAttribute implements binding.MessageMetadataReader
func (m *Message) GetAttribute(attributeKind spec.Kind) (spec.Attribute, interface{}) {
key := withPrefix(attributeKind.String())
if m.Msg.Header != nil {
headerValue := m.Msg.Header.Get(key)
if headerValue != "" {
version := m.GetVersion()
return version.Attribute(key), headerValue
}
}
return nil, nil
}

// GetExtension implements binding.MessageMetadataReader
func (m *Message) GetExtension(name string) interface{} {
key := withPrefix(name)
if m.Msg.Header != nil {
headerValue := m.Msg.Header.Get(key)
if headerValue != "" {
return headerValue
}
}
return nil
}

// GetVersion looks for specVersion header and returns a Version object
func (m *Message) GetVersion() spec.Version {
if m.Msg.Header == nil {
return nil
}
versionValue := m.Msg.Header.Get(specs.PrefixedSpecVersionName())
if versionValue == "" {
return nil
}
return specs.Version(versionValue)
}

// withPrefix prepends the prefix to the attribute name
func withPrefix(attributeName string) string {
return fmt.Sprintf("%s%s", prefix, attributeName)
}
54 changes: 42 additions & 12 deletions protocol/nats_jetstream/v2/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@ package nats_jetstream
import (
"context"
"encoding/json"
bindingtest "github.com/cloudevents/sdk-go/v2/binding/test"
"testing"

"github.com/cloudevents/sdk-go/v2/binding/spec"
bindingtest "github.com/cloudevents/sdk-go/v2/binding/test"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/test"
"github.com/nats-io/nats.go"
)

var (
outBinaryMessage = bindingtest.MockBinaryMessage{}
outBinaryMessage = bindingtest.MockBinaryMessage{
Metadata: map[spec.Attribute]interface{}{},
Extensions: map[string]interface{}{},
}
outStructMessage = bindingtest.MockStructuredMessage{}

testEvent = test.FullEvent()
Expand All @@ -31,18 +36,44 @@ var (
Subject: "hello",
Data: binaryData,
}
binaryConsumerMessage = &nats.Msg{
Subject: "hello",
Data: testEvent.Data(),
Header: nats.Header{
"ce-type": {testEvent.Type()},
"ce-source": {testEvent.Source()},
"ce-id": {testEvent.ID()},
"ce-time": {test.Timestamp.String()},
"ce-specversion": {"1.0"},
"ce-dataschema": {test.Schema.String()},
"ce-datacontenttype": {"text/json"},
"ce-subject": {"receiverTopic"},
"ce-exta": {"someext"},
},
}
)

func TestNewMessage(t *testing.T) {
tests := []struct {
name string
consumerMessage *nats.Msg
expectedEncoding binding.Encoding
name string
consumerMessage *nats.Msg
expectedEncoding binding.Encoding
expectedStructuredError error
expectedBinaryError error
}{
{
name: "Structured encoding",
consumerMessage: structuredConsumerMessage,
expectedEncoding: binding.EncodingStructured,
name: "Structured encoding",
consumerMessage: structuredConsumerMessage,
expectedEncoding: binding.EncodingStructured,
expectedStructuredError: nil,
expectedBinaryError: binding.ErrNotBinary,
},
{
name: "Binary encoding",
consumerMessage: binaryConsumerMessage,
expectedEncoding: binding.EncodingBinary,
expectedStructuredError: binding.ErrNotStructured,
expectedBinaryError: nil,
},
}
for _, tt := range tests {
Expand All @@ -52,17 +83,16 @@ func TestNewMessage(t *testing.T) {
t.Errorf("Error in NewMessage!")
}
err := got.ReadBinary(context.TODO(), &outBinaryMessage)
if err == nil {
t.Errorf("Response in ReadBinary should err")
if err != tt.expectedBinaryError {
t.Errorf("ReadBinary err:%s", err.Error())
}
err = got.ReadStructured(context.TODO(), &outStructMessage)
if err != nil {
if err != tt.expectedStructuredError {
t.Errorf("ReadStructured err:%s", err.Error())
}
if got.ReadEncoding() != tt.expectedEncoding {
t.Errorf("ExpectedEncoding %s, while got %s", tt.expectedEncoding, got.ReadEncoding())
}

})
}
}
14 changes: 14 additions & 0 deletions protocol/nats_jetstream/v2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ func NatsOptions(opts ...nats.Option) []nats.Option {
// ProtocolOption is the function signature required to be considered an nats.ProtocolOption.
type ProtocolOption func(*Protocol) error

func WithConsumerOptions(opts ...ConsumerOption) ProtocolOption {
return func(p *Protocol) error {
p.consumerOptions = opts
return nil
}
}

func WithSenderOptions(opts ...SenderOption) ProtocolOption {
return func(p *Protocol) error {
p.senderOptions = opts
return nil
}
}

type SenderOption func(*Sender) error

type ConsumerOption func(*Consumer) error
Expand Down
13 changes: 7 additions & 6 deletions protocol/nats_jetstream/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package nats_jetstream

import (
"context"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/protocol"

Expand All @@ -18,11 +19,11 @@ import (
type Protocol struct {
Conn *nats.Conn

Consumer *Consumer
//consumerOptions []ConsumerOption
Consumer *Consumer
consumerOptions []ConsumerOption

Sender *Sender
//senderOptions []SenderOption
Sender *Sender
senderOptions []SenderOption

connOwned bool // whether this protocol created the nats connection
}
Expand Down Expand Up @@ -55,11 +56,11 @@ func NewProtocolFromConn(conn *nats.Conn, stream, sendSubject, receiveSubject st
return nil, err
}

if p.Consumer, err = NewConsumerFromConn(conn, stream, receiveSubject, jsOpts, subOpts); err != nil {
if p.Consumer, err = NewConsumerFromConn(conn, stream, receiveSubject, jsOpts, subOpts, p.consumerOptions...); err != nil {
return nil, err
}

if p.Sender, err = NewSenderFromConn(conn, stream, sendSubject, jsOpts); err != nil {
if p.Sender, err = NewSenderFromConn(conn, stream, sendSubject, jsOpts, p.senderOptions...); err != nil {
return nil, err
}

Expand Down
12 changes: 10 additions & 2 deletions protocol/nats_jetstream/v2/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,18 @@ func (s *Sender) Send(ctx context.Context, in binding.Message, transformers ...b
}()

writer := new(bytes.Buffer)
if err = WriteMsg(ctx, in, writer, transformers...); err != nil {
header, err := WriteMsg(ctx, in, writer, transformers...)
if err != nil {
return err
}
_, err = s.Jsm.Publish(s.Subject, writer.Bytes())

natsMsg := &nats.Msg{
Subject: s.Subject,
Data: writer.Bytes(),
Header: header,
}

_, err = s.Jsm.PublishMsg(natsMsg)

return err
}
Expand Down
Loading
Loading