/
message.go
41 lines (32 loc) · 1.03 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package nats
import (
"bytes"
"context"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/format"
"github.com/nats-io/nats.go"
)
// Message implements binding.Message by wrapping an *nats.Msg.
// This message *can* be read several times safely.
type Message struct {
// Msg is the wrapped NATS message. ReadStructured reads its Data field as
// the event payload.
Msg *nats.Msg
// encoding is the value reported by ReadEncoding. NewMessage sets it to
// binding.EncodingStructured.
encoding binding.Encoding
}
// NewMessage returns a *Message that wraps msg and is marked as carrying a
// structured-mode event (binding.EncodingStructured).
// The returned message *can* be read several times safely.
func NewMessage(msg *nats.Msg) *Message {
	wrapped := new(Message)
	wrapped.Msg = msg
	wrapped.encoding = binding.EncodingStructured
	return wrapped
}
// Compile-time assertion that *Message satisfies binding.Message.
var _ binding.Message = (*Message)(nil)
// ReadEncoding returns the encoding stored on the message. For messages
// created by NewMessage this is binding.EncodingStructured.
func (m *Message) ReadEncoding() binding.Encoding {
	enc := m.encoding
	return enc
}
// ReadStructured hands the wrapped message's Data to encoder as a
// JSON-formatted structured event and returns whatever error encoder reports.
// A fresh reader over the payload is created on every call.
func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error {
	payload := bytes.NewReader(m.Msg.Data)
	return encoder.SetStructuredEvent(ctx, format.JSON, payload)
}
// ReadBinary always returns binding.ErrNotBinary; ctx and encoder are not
// used.
func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error {
	notBinary := binding.ErrNotBinary
	return notBinary
}
// Finish is a no-op: it ignores err and always returns nil.
func (m *Message) Finish(err error) error {
	var result error
	return result
}