forked from asonawalla/gazette
/
routines.go
177 lines (149 loc) · 5.07 KB
/
routines.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package message
import (
"bufio"
"fmt"
"hash/fnv"
"io"
"math/rand"
"sync"
"github.com/LiveRamp/gazette/v2/pkg/client"
"github.com/LiveRamp/gazette/v2/pkg/labels"
pb "github.com/LiveRamp/gazette/v2/pkg/protocol"
)
// Publish resolves the target journal and Framing of |msg| using |mapping|,
// then starts an asynchronous Append of the framed, marshaled message to that
// journal via |broker|. If |msg| has a `Validate() error` method, it is called
// first and any validation error is returned before anything is appended.
//
// On success, the in-flight *client.AsyncAppend is returned. Errors from
// validation, from |mapping|, or from releasing the append are returned with a
// nil *client.AsyncAppend.
func Publish(broker client.AsyncJournalClient, mapping MappingFunc, msg Message) (*client.AsyncAppend, error) {
	if validator, ok := msg.(interface{ Validate() error }); ok {
		if vErr := validator.Validate(); vErr != nil {
			return nil, vErr
		}
	}

	journal, framing, err := mapping(msg)
	if err != nil {
		return nil, err
	}

	var pending = broker.StartAppend(journal)
	// Marshal straight into the append's writer. The Marshal result is handed
	// to Require; NOTE(review): presumably a non-nil error there causes Release
	// to fail — confirm against client.AsyncAppend.
	pending.Require(framing.Marshal(msg, pending.Writer()))

	if err = pending.Release(); err != nil {
		return nil, err
	}
	return pending, nil
}
// FramingByContentType maps a journal content-type label value to the Framing
// used for its messages:
//
//   - labels.ContentType_ProtoFixed yields FixedFraming.
//   - labels.ContentType_JSONLines yields JSONFraming.
//
// Any other value yields a nil Framing and a descriptive error.
func FramingByContentType(contentType string) (Framing, error) {
	if contentType == labels.ContentType_ProtoFixed {
		return FixedFraming, nil
	}
	if contentType == labels.ContentType_JSONLines {
		return JSONFraming, nil
	}
	return nil, fmt.Errorf(`unrecognized %s (%s)`, labels.ContentType, contentType)
}
// UnpackLine returns bytes through and including the first newline "\n".
//
// If the complete line is already held in the Reader's buffer, no alloc or
// copy is needed. In that case the returned slice references |r|'s internal
// buffer and is only valid until the next read from |r|. A line longer than
// the buffer is assembled into a freshly allocated slice.
//
// If io.EOF is reached after at least one byte of a line was read, it is
// reported as io.ErrUnexpectedEOF, because EOF should only occur on
// whole-message boundaries. An io.EOF with no bytes read is returned as-is.
// On any error, the returned slice holds all bytes read before the error.
func UnpackLine(r *bufio.Reader) ([]byte, error) {
	// Fast path: a line is fully contained in the buffer.
	var line, err = r.ReadSlice('\n')

	if err == bufio.ErrBufferFull {
		// Slow path: the line spills across multiple buffer fills. Copy first,
		// as |line| references an internal buffer which the next read reuses.
		line = append([]byte(nil), line...)

		// ReadBytes returns the data read before any error it encounters.
		// Append |rest| unconditionally so consumed bytes are never dropped.
		var rest []byte
		rest, err = r.ReadBytes('\n')
		line = append(line, rest...)
	}
	if err == io.EOF && len(line) != 0 {
		err = io.ErrUnexpectedEOF
	}
	return line, err
}
// RandomMapping returns a MappingFunc which sends each Message to a journal
// picked at random (math/rand's Intn) from the current PartitionsFunc result.
// The Framing comes from the picked journal's content-type label. The
// MappingFunc returns ErrEmptyListResponse when there are no journals.
func RandomMapping(partitions PartitionsFunc) MappingFunc {
	return func(msg Message) (journal pb.Journal, framing Framing, err error) {
		var journals = partitions().Journals
		if len(journals) == 0 {
			return journal, framing, ErrEmptyListResponse
		}

		var pick = rand.Intn(len(journals))
		journal = journals[pick].Spec.Name
		framing, err = FramingByContentType(journals[pick].Spec.LabelSet.ValueOf(labels.ContentType))
		return journal, framing, err
	}
}
// ModuloMapping returns a MappingFunc which maps a Message into a stable
// Journal of the PartitionsFunc. The journal is selected by taking the 32-bit
// FNV-1a hash of the MappingKeyFunc key modulo the number of journals.
//
// The returned MappingFunc returns ErrEmptyListResponse if the PartitionsFunc
// yields no journals. Otherwise it returns the selected journal, along with
// the Framing (or error) from FramingByContentType for that journal's
// content-type label.
func ModuloMapping(key MappingKeyFunc, partitions PartitionsFunc) MappingFunc {
	return func(msg Message) (journal pb.Journal, framing Framing, err error) {
		var parts = partitions()
		if len(parts.Journals) == 0 {
			err = ErrEmptyListResponse
			return
		}
		var h = fnv.New32a()
		_, _ = h.Write(key(msg, make([]byte, 0, 32))) // hash.Hash Write never returns an error.

		// Reduce in uint32 space before converting to int. Where int is
		// 32 bits, int(h.Sum32()) wraps negative for sums >= 2^31, and a
		// negative modulo result would panic as an index. On 64-bit platforms
		// the selected index is unchanged.
		var ind = int(h.Sum32() % uint32(len(parts.Journals)))

		journal = parts.Journals[ind].Spec.Name
		var ct = parts.Journals[ind].Spec.LabelSet.ValueOf(labels.ContentType)
		framing, err = FramingByContentType(ct)
		return
	}
}
// RendezvousMapping returns a MappingFunc which maps a Message into a stable
// Journal of the PartitionsFunc using Highest Random Weight (aka "rendezvous")
// hashing.
//
// The MappingKeyFunc key and each journal name are hashed with 32-bit FNV-1a.
// A journal's weight is the XOR of the key hash with its name hash. The
// journal of greatest weight is chosen; on ties, the earliest journal wins,
// and index 0 is chosen if every weight is zero. HRW costs more than modulo
// arithmetic, but is still efficient and minimizes reassignments when
// journals are added or removed.
func RendezvousMapping(key MappingKeyFunc, partitions PartitionsFunc) MappingFunc {
	// Journal-name hashes are memoized against the most recently seen
	// ListResponse. While the PartitionsFunc keeps returning the same
	// pointer, the cached hashes are reused. |mu| guards both cache fields.
	var (
		mu         sync.Mutex
		cachedLR   *pb.ListResponse
		cachedSums []uint32
	)
	var listingWithHashes = func() (*pb.ListResponse, []uint32) {
		var lr = partitions()

		mu.Lock()
		if lr != cachedLR {
			// A new listing: recompute the hash of each journal name.
			cachedLR, cachedSums = lr, make([]uint32, len(lr.Journals))
			for i := range lr.Journals {
				var nameHash = fnv.New32a()
				_, _ = nameHash.Write([]byte(lr.Journals[i].Spec.Name))
				cachedSums[i] = nameHash.Sum32()
			}
		}
		var sums = cachedSums
		mu.Unlock()

		return lr, sums
	}

	return func(msg Message) (journal pb.Journal, framing Framing, err error) {
		var lr, sums = listingWithHashes()
		if len(lr.Journals) == 0 {
			return journal, framing, ErrEmptyListResponse
		}

		var keyHash = fnv.New32a()
		_, _ = keyHash.Write(key(msg, make([]byte, 0, 32)))
		var keySum = keyHash.Sum32()

		// Strict comparison keeps the first journal that reaches the maximum.
		var best, bestWeight = 0, uint32(0)
		for i := range lr.Journals {
			if weight := keySum ^ sums[i]; weight > bestWeight {
				best, bestWeight = i, weight
			}
		}

		journal = lr.Journals[best].Spec.Name
		framing, err = FramingByContentType(lr.Journals[best].Spec.LabelSet.ValueOf(labels.ContentType))
		return journal, framing, err
	}
}