/
msgschema.go
297 lines (251 loc) · 6.7 KB
/
msgschema.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
package nats
import (
"fmt"
"strings"
"time"
"github.com/goccy/go-json"
"github.com/google/uuid"
"github.com/nats-io/nats.go/jetstream"
)
// Well-known tokens used when building and parsing hops message subjects.
const (
	// AllEventId is the ">" token. Passed as the eventFilter argument of
	// EventLogFilterSubject it selects all events rather than only source events.
	AllEventId = ">"
	// HopsMessageId is the message id token used for a sequence's hops message.
	HopsMessageId = "hops"
	// DoneMessageId is the trailing subject token that marks a pipeline 'done' message.
	DoneMessageId = "done"
	// SourceEventId is the message id token used for a sequence's source event.
	SourceEventId = "event"
)
// HopsResultMeta is metadata included in the top level of a result message
// (under the "hops" key of ResultMsg).
type HopsResultMeta struct {
	Error      string    `json:"error,omitempty"`
	FinishedAt time.Time `json:"finished_at"`
	StartedAt  time.Time `json:"started_at"`
}

// MsgMeta describes a received message: the fields parsed out of its subject
// tokens plus the sequence/timestamp metadata reported for the message.
// The underlying message is kept unexported and exposed through Msg().
type MsgMeta struct {
	AccountId        string
	AppName          string
	Channel          string
	ConsumerSequence uint64
	Done             bool // Message is a pipeline 'done' message
	HandlerName      string
	InterestTopic    string
	MessageId        string
	NumPending       uint64
	SequenceId       string
	StreamSequence   uint64
	Timestamp        time.Time
	msg              jetstream.Msg
}

// ResultMsg is the schema for handler call result messages
type ResultMsg struct {
	Body       string            `json:"body"`
	Completed  bool              `json:"completed"`
	Done       bool              `json:"done"`
	Errored    bool              `json:"errored"`
	Headers    map[string]string `json:"headers,omitempty"`
	Hops       HopsResultMeta    `json:"hops"`
	JSON       interface{}       `json:"json,omitempty"`
	StatusCode int               `json:"status_code,omitempty"`
	URL        string            `json:"url,omitempty"`
}

// SourceMeta is the metadata that CreateSourceEvent stores under the "hops"
// key of a source event.
type SourceMeta struct {
	Source string `json:"source"`
	Event  string `json:"event"`
	Action string `json:"action"`
	Unique string `json:"unique,omitempty"`
}
// CreateSourceEvent attaches hops source metadata to rawEvent and serialises
// the result to JSON.
//
// The metadata is written to the "hops" key of rawEvent itself, so the
// caller's map is modified and any existing "hops" entry is overwritten.
// A nil rawEvent is treated as an empty event instead of panicking on the
// map write.
//
// It returns the JSON-encoded event, a hash string derived from those bytes
// (a SHA-1 based UUID), and any error from marshalling.
func CreateSourceEvent(rawEvent map[string]any, source string, event string, action string, unique string) ([]byte, string, error) {
	// Writing to a nil map panics, so fall back to a fresh map that only
	// carries the hops metadata.
	if rawEvent == nil {
		rawEvent = make(map[string]any, 1)
	}

	rawEvent["hops"] = SourceMeta{
		Source: source,
		Event:  event,
		Action: action,
		// unique is used when we want identical input to be regarded as a different message.
		// Any random string will do the job of changing the hash result.
		Unique: unique,
	}

	sourceBytes, err := json.Marshal(rawEvent)
	if err != nil {
		return nil, "", fmt.Errorf("marshalling source event: %w", err)
	}

	// We don't really care about the UUID namespace, so we just use an existing one
	sourceUUID := uuid.NewSHA1(uuid.NameSpaceDNS, sourceBytes)

	return sourceBytes, sourceUUID.String(), nil
}
// Parse wraps msg in a MsgMeta, filling it first from the message subject
// tokens and then from the message metadata. If either step fails the error
// is returned and no MsgMeta is produced.
func Parse(msg jetstream.Msg) (*MsgMeta, error) {
	parsed := &MsgMeta{msg: msg}

	if err := parsed.initTokens(); err != nil {
		return nil, err
	}
	if err := parsed.initMetadata(); err != nil {
		return nil, err
	}

	return parsed, nil
}
// Msg returns the underlying JetStream message this MsgMeta was built from.
func (m *MsgMeta) Msg() jetstream.Msg {
	return m.msg
}
// ResponseSubject returns the notify-channel subject for this message:
// `account_id.interest_topic.<notify>.sequence_id.message_id`.
func (m *MsgMeta) ResponseSubject() string {
	return strings.Join([]string{
		m.AccountId,
		m.InterestTopic,
		ChannelNotify,
		m.SequenceId,
		m.MessageId,
	}, ".")
}
// SequenceFilter returns the notify-channel filter subject for this message's
// sequence: `account_id.interest_topic.<notify>.sequence_id.>`.
func (m *MsgMeta) SequenceFilter() string {
	return strings.Join([]string{
		m.AccountId,
		m.InterestTopic,
		ChannelNotify,
		m.SequenceId,
		">",
	}, ".")
}
// initMetadata copies the stream/consumer sequence numbers, timestamp and
// pending count from the message's metadata onto m. It returns the error
// from the metadata lookup, in which case m is left unchanged.
func (m *MsgMeta) initMetadata() error {
	md, err := m.msg.Metadata()
	if err != nil {
		return err
	}

	m.StreamSequence, m.ConsumerSequence = md.Sequence.Stream, md.Sequence.Consumer
	m.Timestamp, m.NumPending = md.Timestamp, md.NumPending

	return nil
}
// initTokens parses tokens from a message subject into the MsgMeta struct
//
// Example hops subjects are:
// `account_id.interest_topic.notify.sequence_id.event`
// `account_id.interest_topic.notify.sequence_id.hops`
// `account_id.interest_topic.notify.sequence_id.message_id`
// `account_id.interest_topic.request.sequence_id.message_id.app.handler`
//
// The first five tokens are always required. When the subject has exactly six
// tokens, Done records whether the sixth is the 'done' token. Request subjects
// additionally need the app and handler tokens. An error is returned for
// subjects that are too short or whose channel token is not recognised.
func (m *MsgMeta) initTokens() error {
	parts := strings.Split(m.msg.Subject(), ".")
	count := len(parts)

	if count < 5 {
		return fmt.Errorf("Invalid message subject (too few tokens): %s", m.msg.Subject())
	}

	m.AccountId, m.InterestTopic, m.Channel = parts[0], parts[1], parts[2]
	m.SequenceId, m.MessageId = parts[3], parts[4]

	if count == 6 {
		m.Done = parts[5] == DoneMessageId
	}

	// Notify subjects carry nothing beyond the tokens handled above.
	if m.Channel == ChannelNotify {
		return nil
	}

	if m.Channel != ChannelRequest {
		return fmt.Errorf("Invalid message subject (unknown channel %s): %s", m.Channel, m.msg.Subject())
	}

	// Request subjects must also name the app and handler.
	if count < 7 {
		return fmt.Errorf("Invalid request message subject (too few tokens): %s", m.msg.Subject())
	}

	m.AppName, m.HandlerName = parts[5], parts[6]

	return nil
}
// NewResultMsg builds the result message for a handler call.
//
// A string result is placed in Body; any other result (including nil) is
// placed in JSON. Completed/Errored reflect whether err is nil, and the error
// text (if any) is recorded in the hops metadata together with startedAt and
// the current time as the finish time. Done is always set.
func NewResultMsg(startedAt time.Time, result interface{}, err error) ResultMsg {
	var (
		body    string
		payload interface{}
	)
	switch v := result.(type) {
	case string:
		body = v
	default:
		payload = result
	}

	failed := err != nil

	var errText string
	if failed {
		errText = err.Error()
	}

	return ResultMsg{
		Body:      body,
		Completed: !failed,
		Done:      true,
		Errored:   failed,
		Hops: HopsResultMeta{
			StartedAt:  startedAt,
			FinishedAt: time.Now(),
			Error:      errText,
		},
		JSON: payload,
	}
}
// EventLogFilterSubject returns the subject used to get events for display to the
// user in the UI. The result has the form
// `accountId.interestTopic.*.*.eventFilter`, i.e. the third and fourth
// subject tokens are left as "*" wildcards.
//
// accountId: The account id to filter on
// interestTopic: The interest topic to filter on
// eventFilter: either AllEventId or SourceEventId
func EventLogFilterSubject(accountId string, interestTopic string, eventFilter string) string {
	return strings.Join([]string{accountId, interestTopic, "*", "*", eventFilter}, ".")
}
// NotifyFilterSubject returns the filter subject to get notify messages for the
// account and interest topic: `accountId.interestTopic.<notify>.>`.
func NotifyFilterSubject(accountId string, interestTopic string) string {
	return strings.Join([]string{accountId, interestTopic, ChannelNotify, ">"}, ".")
}
// ReplayFilterSubject returns the filter subject covering the notify messages
// of a single sequence: `accountId.interestTopic.<notify>.sequenceId.>`.
func ReplayFilterSubject(accountId string, interestTopic string, sequenceId string) string {
	return strings.Join([]string{accountId, interestTopic, ChannelNotify, sequenceId, ">"}, ".")
}
// RequestFilterSubject returns the filter subject to get request messages for the
// account and interest topic: `accountId.interestTopic.<request>.>`.
func RequestFilterSubject(accountId string, interestTopic string) string {
	return strings.Join([]string{accountId, interestTopic, ChannelRequest, ">"}, ".")
}
// SequenceHopsKeyTokens returns the three key tokens identifying the hops
// message of a sequence: the notify channel, the sequence id and the hops
// message id, in that order.
func SequenceHopsKeyTokens(sequenceId string) []string {
	keyTokens := make([]string, 0, 3)
	keyTokens = append(keyTokens, ChannelNotify, sequenceId, HopsMessageId)
	return keyTokens
}
// SourceEventSubject returns the subject of the source event for a sequence:
// `accountId.interestTopic.<notify>.sequenceId.event`.
func SourceEventSubject(accountId string, interestTopic string, sequenceId string) string {
	return strings.Join([]string{accountId, interestTopic, ChannelNotify, sequenceId, "event"}, ".")
}
// WorkerRequestFilterSubject returns the filter subject for the worker consumer:
// `accountId.interestTopic.<request>.*.*.appName.handler`. The two "*" tokens
// sit in the sequence id and message id positions of a request subject.
func WorkerRequestFilterSubject(accountId string, interestTopic string, appName string, handler string) string {
	return strings.Join([]string{
		accountId,
		interestTopic,
		ChannelRequest,
		"*",
		"*",
		appName,
		handler,
	}, ".")
}