/
payload.go
313 lines (263 loc) · 8.36 KB
/
payload.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
package sabuhp
import (
"fmt"
"io"
"net/url"
"strings"
"time"
"github.com/influx6/npkg/nthen"
"github.com/influx6/npkg/nunsafe"
"github.com/influx6/npkg/nxid"
"github.com/influx6/npkg"
)
// WriterToSplitter takes a desired writerTo object and transforms
// the stream into a series of messages message parts which will
// be assembled on the following receiving end.
type WriterToSplitter interface {
Split(w io.WriterTo) (chan<- Message, error)
}
// BytesSplitter takes a large block of bytes returning a chan of messages which are
// part messages which represent the whole of said bytes. This allows larger messages
// be sent across message channels with ease.
type BytesSplitter interface {
SplitBytes(data []byte) (chan<- Message, error)
}
type Message struct {
// Optional future which will indicate if message delivery should
// notify attached future on result.
Future *nthen.Future
// Path of the request producing this if from http.
Path string
// IP of the request producing this if from http.
IP string
// LocalIP of the request producing this if from http.
LocalIP string
// SuggestedStatusCode is an optional field settable by the
// creator to suggest possible status code of a message.
SuggestedStatusCode int
// ContentType is an required value set default to MessageContentType.
// Its an important use in the translators where its the deciding factor
// if a message is written as a whole or just the payload into the
// response object.
ContentType string
// FormName is optional attached form name which represents this data.
FormName string
// FileName is optional attached file name which represents this data.
FileName string
// Headers are related facts attached to a message.
Headers Header
// Headers are related facts attached to a message.
//
// Only available when set, so it's very optional
Cookies []Cookie
// Form contains the parsed form data, including both the URL
// field's query parameters and the PATCH, POST, or PUT form data.
//
// Only available when set, so it's very optional
Form url.Values
// Query contains the parsed form data, including both the URL
// field's query parameters and the PATCH, POST, or PUT form data.
//
// Only available when set, so it's very optional
Query url.Values
// Within indicates senders intent on how long they are
// willing to wait for message delivery. Usually this should end
// with error resolution of attached future if present.
Within time.Duration
// Id is the unique id attached to giving message
// for tracking it's delivery and trace its different touch
// points where it was handled.
Id nxid.ID
// EndPartId is the unique id attached to giving messages which
// indicate the expected end id which when seen as the Id
// should consider a part stream as completed.
//
// This will be created from the start and then tagged to the final
// message as both the EndPartId and PartId fields, which will identify
// that a series of broken messages have been completed.
EndPartId nxid.ID
// PartId is the unique id attached to giving messages when
// they are a shared part of a larger messages. There are cases
// when a message may become sent as broken parts for recollection
// at the other end.
PartId nxid.ID
// SubscribeGroup for subscribe/unsubscribe message types which
// allow to indicate which group a topic should fall into.
SubscribeGroup string
// SubscribeTo for subscribe/unsubscribe message types which
// allow to indicate which topic should a subscribe or unsubscribe
// be applied to.
SubscribeTo string
// Topic for giving message (serving as to address).
Topic string
// FromAddr is the logical address of the sender of message.
FromAddr string
// Bytes is the payload for giving message.
Bytes []byte
// Metadata are related facts attached to a message.
Metadata Params
// Params are related facts attached to a message based on some route or
// sender and provide some values to keyed expectation, unlike metadata
// it has specific input in the message.
Params Params
// Parts are possible fragments collected of a message which was split into
// multiple parts to send over the wire and have being collected through the use
// of the PartId.
//
// We do this because we do not let handlers handle a list of messages but one
// and to accomodate large messages split in parts or messages which are logical
// parts of themselves, this field is an option, generally.
// Codecs should never read this
Parts []Message
}
// ReplyWithTopic returns a new message with provided topic.
func (m Message) ReplyWithTopic(t string) Message {
return Message{
Topic: t,
ContentType: MessageContentType,
Id: nxid.New(),
Params: Params{},
Metadata: Params{},
}
}
// ReplyTo returns a new instance of a Message using the FromAddr as the
// topic.
func (m Message) ReplyTo() Message {
return Message{
ContentType: MessageContentType,
Id: nxid.New(),
Topic: m.FromAddr,
Params: Params{},
Metadata: Params{},
}
}
// ReplyToWith returns a new instance of a Message using the FromAddr as the
// topic.
func (m Message) ReplyToWith(params Params, meta Params, payload []byte) Message {
return Message{
ContentType: MessageContentType,
Params: params,
Metadata: meta,
Id: nxid.New(),
Topic: m.FromAddr,
}
}
const (
SUBSCRIBE = "+SUB"
UNSUBSCRIBE = "-USUB"
DONE = "+OK"
NOTDONE = "-NOK"
)
const MessageContentType = "application/x-event-message"
func NewMessage(topic string, fromAddr string, payload []byte) Message {
return Message{
Id: nxid.New(),
Topic: topic,
FromAddr: fromAddr,
Bytes: payload,
ContentType: MessageContentType,
}
}
func NOTOK(message string, fromAddr string) Message {
return Message{
Id: nxid.New(),
Topic: NOTDONE,
FromAddr: fromAddr,
Bytes: []byte(message),
ContentType: MessageContentType,
}
}
func BasicMsg(topic string, message string, fromAddr string) Message {
return Message{
Id: nxid.New(),
Topic: topic,
FromAddr: fromAddr,
Bytes: []byte(message),
ContentType: MessageContentType,
}
}
func OK(message string, fromAddr string) Message {
return Message{
Id: nxid.New(),
Topic: DONE,
FromAddr: fromAddr,
Bytes: []byte(message),
ContentType: MessageContentType,
}
}
func UnsubscribeMessage(topic string, grp string, fromAddr string) Message {
return Message{
Id: nxid.New(),
Topic: UNSUBSCRIBE,
FromAddr: fromAddr,
SubscribeGroup: grp,
SubscribeTo: topic,
ContentType: MessageContentType,
}
}
func SubscribeMessage(topic string, grp string, fromAddr string) Message {
return Message{
Id: nxid.New(),
Topic: SUBSCRIBE,
FromAddr: fromAddr,
SubscribeGroup: grp,
SubscribeTo: topic,
ContentType: MessageContentType,
}
}
func (m *Message) WithId(t nxid.ID) {
m.Id = t
}
func (m *Message) WithTopic(t string) {
m.Topic = t
}
func (m *Message) WithPayload(lp []byte) {
m.Bytes = lp
}
func (m *Message) WithMetadata(meta map[string]string) {
m.Metadata = meta
}
func (m *Message) WithParams(params map[string]string) {
m.Params = params
}
func (m Message) EncodeObject(encoder npkg.ObjectEncoder) {
encoder.String("topic", m.Topic)
encoder.String("from_addr", m.FromAddr)
encoder.String("Bytes", nunsafe.Bytes2String(m.Bytes))
encoder.StringMap("meta_data", m.Metadata)
}
func (m Message) String() string {
var content strings.Builder
content.WriteString("topic: ")
content.WriteString(m.Topic)
content.WriteString(",")
content.WriteString("from: ")
content.WriteString(m.FromAddr)
content.WriteString(",")
content.WriteString("payload: ")
content.WriteString(fmt.Sprintf("%q", m.Bytes))
content.WriteString(",")
content.WriteString("local_payload: ")
content.WriteString(",")
content.WriteString("Meta: ")
for key, val := range m.Metadata {
content.WriteString(key)
content.WriteString(": ")
content.WriteString(val)
content.WriteString(",")
}
content.WriteString(";")
return content.String()
}
// Copy returns a copy of this commands with underline data
// copied across. The copy
func (m Message) Copy() Message {
var meta = map[string]string{}
for key, val := range m.Metadata {
meta[key] = val
}
var clone = m
clone.Metadata = meta
clone.Bytes = append([]byte{}, m.Bytes...)
return clone
}