forked from ThreeDotsLabs/watermill-jetstream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
marshaler.go
156 lines (118 loc) · 3.98 KB
/
marshaler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package jetstream
import (
"bytes"
"encoding/gob"
"encoding/json"
"fmt"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
)
type Marshaler interface {
// Marshal transforms a watermill message into binary format.
Marshal(topic string, msg *message.Message) (*nats.Msg, error)
}
type Unmarshaler interface {
// Unmarshal extracts a watermill message from a nats message.
Unmarshal(*nats.Msg) (*message.Message, error)
}
type MarshalerUnmarshaler interface {
Marshaler
Unmarshaler
}
func defaultNatsMsg(topic string, uuid string, data []byte, hdr nats.Header) *nats.Msg {
return &nats.Msg{
Subject: subject(topic, uuid),
Data: data,
Header: hdr,
}
}
func subject(topic string, uuid string) string {
return fmt.Sprintf("%s.%s", topic, uuid)
}
// GobMarshaler is marshaller which is using Gob to marshal Watermill messages.
type GobMarshaler struct{}
// Marshal transforms a watermill message into gob format.
func (GobMarshaler) Marshal(topic string, msg *message.Message) (*nats.Msg, error) {
buf := new(bytes.Buffer)
encoder := gob.NewEncoder(buf)
if err := encoder.Encode(msg); err != nil {
return nil, errors.Wrap(err, "cannot encode message")
}
return defaultNatsMsg(topic, msg.UUID, buf.Bytes(), nil), nil
}
// Unmarshal extracts a watermill message from a nats message.
func (GobMarshaler) Unmarshal(natsMsg *nats.Msg) (*message.Message, error) {
buf := new(bytes.Buffer)
_, err := buf.Write(natsMsg.Data)
if err != nil {
return nil, errors.Wrap(err, "cannot write nats message data to buffer")
}
decoder := gob.NewDecoder(buf)
var decodedMsg message.Message
if err := decoder.Decode(&decodedMsg); err != nil {
return nil, errors.Wrap(err, "cannot decode message")
}
// creating clean message, to avoid invalid internal state with ack
msg := message.NewMessage(decodedMsg.UUID, decodedMsg.Payload)
msg.Metadata = decodedMsg.Metadata
return msg, nil
}
// JSONMarshaler uses encoding/json to marshal Watermill messages.
type JSONMarshaler struct{}
// Marshal transforms a watermill message into JSON format.
func (JSONMarshaler) Marshal(topic string, msg *message.Message) (*nats.Msg, error) {
bytes, err := json.Marshal(msg)
if err != nil {
return nil, errors.Wrap(err, "cannot encode message")
}
return defaultNatsMsg(topic, msg.UUID, bytes, nil), nil
}
// Unmarshal extracts a watermill message from a nats message.
func (JSONMarshaler) Unmarshal(natsMsg *nats.Msg) (*message.Message, error) {
var decodedMsg message.Message
err := json.Unmarshal(natsMsg.Data, &decodedMsg)
if err != nil {
return nil, errors.Wrap(err, "cannot decode message")
}
// creating clean message, to avoid invalid internal state with ack
msg := message.NewMessage(decodedMsg.UUID, decodedMsg.Payload)
msg.Metadata = decodedMsg.Metadata
return msg, nil
}
// NATSMarshaler uses NATS header to marshal directly between watermill and NATS formats.
// The watermill UUID is stored at _watermill_message_uuid
type NATSMarshaler struct{}
// can't use nats.MsgIdHeader because that opts into deduplication
const watermillUUID = "_watermill_message_uuid"
// Marshal transforms a watermill message into JSON format.
func (*NATSMarshaler) Marshal(topic string, msg *message.Message) (*nats.Msg, error) {
header := make(nats.Header)
header.Set(watermillUUID, msg.UUID)
for k, v := range msg.Metadata {
header.Set(k, v)
}
data := msg.Payload
id := msg.UUID
return defaultNatsMsg(topic, id, data, header), nil
}
// Unmarshal extracts a watermill message from a nats message.
func (*NATSMarshaler) Unmarshal(natsMsg *nats.Msg) (*message.Message, error) {
data := natsMsg.Data
hdr := natsMsg.Header
id := hdr.Get(watermillUUID)
md := make(message.Metadata)
for k, v := range hdr {
if k == watermillUUID {
continue
}
if len(v) == 1 {
md.Set(k, v[0])
} else {
return nil, errors.Errorf("multiple values received in NATS header for %q: (%+v)", k, v)
}
}
msg := message.NewMessage(id, data)
msg.Metadata = md
return msg, nil
}