-
Notifications
You must be signed in to change notification settings - Fork 51
/
shard_spec_extensions.go
357 lines (317 loc) · 10.2 KB
/
shard_spec_extensions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
package protocol
import (
"fmt"
"math"
"path"
pb "go.gazette.dev/core/broker/protocol"
)
// ShardID uniquely identifies a shard processed by a Gazette consumer.
type ShardID string
// MaxHotStandbys is a global bound on the degree of hot standbys desired
// by any ShardSpec.
var MaxHotStandbys uint32 = math.MaxUint32
// RoutedShardClient composes a ShardClient and DispatchRouter.
type RoutedShardClient interface {
ShardClient
pb.DispatchRouter
}
// NewRoutedShardClient composes a ShardClient and DispatchRouter.
func NewRoutedShardClient(sc ShardClient, dr pb.DispatchRouter) RoutedShardClient {
return struct {
ShardClient
pb.DispatchRouter
}{sc, dr}
}
// Validate returns an error if the Shard is not well-formed.
func (id ShardID) Validate() error {
if err := pb.ValidateToken(id.String(), pb.TokenSymbols, minShardNameLen, maxShardNameLen); err != nil {
return err
}
return nil
}
// String returns the Shard as a string.
func (id ShardID) String() string { return string(id) }
// Validate returns an error if the ShardSpec is not well-formed.
func (m *ShardSpec) Validate() error {
if err := m.Id.Validate(); err != nil {
return pb.ExtendContext(err, "Id")
} else if m.RecoveryLogPrefix != "" && m.RecoveryLog().Validate() != nil {
return pb.ExtendContext(m.RecoveryLog().Validate(), "RecoveryLogPrefix")
} else if m.RecoveryLogPrefix == "" && m.HintPrefix != "" {
return pb.NewValidationError("invalid non-empty HintPrefix with empty RecoveryLogPrefix (%v)", m.HintPrefix)
} else if m.RecoveryLogPrefix != "" && !isAbsoluteCleanNonDirPath(m.HintPrefix) {
return pb.NewValidationError("HintPrefix is not an absolute, clean, non-directory path (%v)", m.HintPrefix)
} else if m.HintBackups < 0 {
return pb.NewValidationError("invalid HintBackups (%d; expected >= 0)", m.HintBackups)
} else if m.MinTxnDuration < 0 {
return pb.NewValidationError("invalid MinTxnDuration (%d; expected >= 0)", m.MinTxnDuration)
} else if m.MaxTxnDuration <= 0 {
return pb.NewValidationError("invalid MaxTxnDuration (%d; expected > 0)", m.MaxTxnDuration)
} else if err = m.LabelSet.Validate(); err != nil {
return pb.ExtendContext(err, "LabelSet")
} else if err = pb.ValidateSingleValueLabels(m.LabelSet); err != nil {
return pb.ExtendContext(err, "LabelSet")
} else if len(m.LabelSet.ValuesOf("id")) != 0 {
return pb.NewValidationError(`Labels cannot include label "id"`)
}
for i := range m.Sources {
if err := m.Sources[i].Validate(); err != nil {
return pb.ExtendContext(err, "Sources[%d]", i)
} else if i != 0 && m.Sources[i].Journal <= m.Sources[i-1].Journal {
return pb.NewValidationError("Sources.Journal not in unique, sorted order (index %d; %+v <= %+v)",
i, m.Sources[i].Journal, m.Sources[i-1].Journal)
}
}
// HotStandbys, Disable, and DisableWaitForAck require no extra validation.
return nil
}
// Validate returns an error if the ShardSpec_Source is not well-formed.
func (m *ShardSpec_Source) Validate() error {
if err := m.Journal.Validate(); err != nil {
return pb.ExtendContext(err, "Journal")
} else if m.MinOffset < 0 {
return pb.NewValidationError("invalid MinOffset (%d; expected > 0)", m.MinOffset)
}
return nil
}
// MarshalString returns the marshaled encoding of the ShardSpec as a string.
func (m *ShardSpec) MarshalString() string {
var d, err = m.Marshal()
if err != nil {
panic(err.Error()) // Cannot happen, as we use no custom marshalling.
}
return string(d)
}
// DesiredReplication is the desired number of shard replicas. allocator.ItemValue implementation.
func (m *ShardSpec) DesiredReplication() int {
if m.Disable {
return 0
}
if MaxHotStandbys < m.HotStandbys {
return 1 + int(MaxHotStandbys)
}
return 1 + int(m.HotStandbys)
}
// RecoveryLog returns the Journal to which the Shard's recovery log is recorded.
// IF the Shard has no recovery log, "" is returned..
func (m *ShardSpec) RecoveryLog() pb.Journal {
if m.RecoveryLogPrefix == "" {
return ""
}
return pb.Journal(m.RecoveryLogPrefix + "/" + m.Id.String())
}
// HintPrimaryKey returns the Etcd key to which recorded, primary hints are written.
func (m *ShardSpec) HintPrimaryKey() string { return m.HintPrefix + "/" + m.Id.String() + ".primary" }
// HintBackupKeys returns Etcd keys to which verified, disaster-recovery hints are written.
func (m *ShardSpec) HintBackupKeys() []string {
var keys = make([]string, m.HintBackups)
for i := int32(0); i != m.HintBackups; i++ {
keys[i] = fmt.Sprintf("%s/%s.backup.%d", m.HintPrefix, m.Id.String(), i)
}
return keys
}
// UnionShardSpecs returns a ShardSpec combining all non-zero-valued fields
// across |a| and |b|. Where both |a| and |b| provide a non-zero value for
// a field, the value of |a| is retained.
func UnionShardSpecs(a, b ShardSpec) ShardSpec {
if len(a.Sources) == 0 {
a.Sources = append(a.Sources, b.Sources...)
}
if a.RecoveryLogPrefix == "" {
a.RecoveryLogPrefix = b.RecoveryLogPrefix
}
if a.HintPrefix == "" {
a.HintPrefix = b.HintPrefix
}
if a.HintBackups == 0 {
a.HintBackups = b.HintBackups
}
if a.MaxTxnDuration == 0 {
a.MaxTxnDuration = b.MaxTxnDuration
}
if a.MinTxnDuration == 0 {
a.MinTxnDuration = b.MinTxnDuration
}
if !a.Disable {
a.Disable = b.Disable
}
if a.HotStandbys == 0 {
a.HotStandbys = b.HotStandbys
}
a.LabelSet = pb.UnionLabelSets(a.LabelSet, b.LabelSet, pb.LabelSet{})
if !a.DisableWaitForAck {
a.DisableWaitForAck = b.DisableWaitForAck
}
if a.RingBufferSize == 0 {
a.RingBufferSize = b.RingBufferSize
}
if a.ReadChannelSize == 0 {
a.ReadChannelSize = b.ReadChannelSize
}
return a
}
// IntersectShardSpecs returns a ShardSpec having a non-zero-valued field
// for each field value which is shared between |a| and |b|.
func IntersectShardSpecs(a, b ShardSpec) ShardSpec {
if !sourcesEq(a.Sources, b.Sources) {
a.Sources = nil
}
if a.RecoveryLogPrefix != b.RecoveryLogPrefix {
a.RecoveryLogPrefix = ""
}
if a.HintPrefix != b.HintPrefix {
a.HintPrefix = ""
}
if a.HintBackups != b.HintBackups {
a.HintBackups = 0
}
if a.MaxTxnDuration != b.MaxTxnDuration {
a.MaxTxnDuration = 0
}
if a.MinTxnDuration != b.MinTxnDuration {
a.MinTxnDuration = 0
}
if a.Disable != b.Disable {
a.Disable = false
}
if a.HotStandbys != b.HotStandbys {
a.HotStandbys = 0
}
a.LabelSet = pb.IntersectLabelSets(a.LabelSet, b.LabelSet, pb.LabelSet{})
if a.DisableWaitForAck != b.DisableWaitForAck {
a.DisableWaitForAck = false
}
if a.RingBufferSize != b.RingBufferSize {
a.RingBufferSize = 0
}
if a.ReadChannelSize != b.ReadChannelSize {
a.ReadChannelSize = 0
}
return a
}
// SubtractShardSpecs returns a ShardSpec derived from |a| but having a
// zero-valued field for each field which is matched by |b|.
func SubtractShardSpecs(a, b ShardSpec) ShardSpec {
if sourcesEq(a.Sources, b.Sources) {
a.Sources = nil
}
if a.RecoveryLogPrefix == b.RecoveryLogPrefix {
a.RecoveryLogPrefix = ""
}
if a.HintPrefix == b.HintPrefix {
a.HintPrefix = ""
}
if a.HintBackups == b.HintBackups {
a.HintBackups = 0
}
if a.MaxTxnDuration == b.MaxTxnDuration {
a.MaxTxnDuration = 0
}
if a.MinTxnDuration == b.MinTxnDuration {
a.MinTxnDuration = 0
}
if a.Disable == b.Disable {
a.Disable = false
}
if a.HotStandbys == b.HotStandbys {
a.HotStandbys = 0
}
a.LabelSet = pb.SubtractLabelSet(a.LabelSet, b.LabelSet, pb.LabelSet{})
if a.DisableWaitForAck == b.DisableWaitForAck {
a.DisableWaitForAck = false
}
if a.RingBufferSize == b.RingBufferSize {
a.RingBufferSize = 0
}
if a.ReadChannelSize == b.ReadChannelSize {
a.ReadChannelSize = 0
}
return a
}
// ExtractShardSpecMetaLabels returns meta-labels of the ShardSpec, using |out| as a buffer.
func ExtractShardSpecMetaLabels(spec *ShardSpec, out pb.LabelSet) pb.LabelSet {
out.Labels = append(out.Labels[:0], pb.Label{Name: "id", Value: spec.Id.String()})
return out
}
// Validate returns an error if the ConsumerSpec is not well-formed.
func (m *ConsumerSpec) Validate() error {
if err := m.ProcessSpec.Validate(); err != nil {
return err
}
// ShardLimit requires no extra validation.
return nil
}
// MarshalString returns the marshaled encoding of the ConsumerSpec as a string.
func (m *ConsumerSpec) MarshalString() string {
var d, err = m.Marshal()
if err != nil {
panic(err.Error()) // Cannot happen, as we use no custom marshalling.
}
return string(d)
}
// ZeroLimit zeros the ConsumerSpec ShardLimit.
func (m *ConsumerSpec) ZeroLimit() { m.ShardLimit = 0 }
// ItemLimit is the maximum number of shards this consumer may process. allocator.MemberValue implementation.
func (m *ConsumerSpec) ItemLimit() int { return int(m.ShardLimit) }
// Reduce folds another ReplicaStatus into this one.
func (m *ReplicaStatus) Reduce(other *ReplicaStatus) {
if other.Code > m.Code {
m.Code = other.Code
}
for _, e := range other.Errors {
m.Errors = append(m.Errors, e)
}
}
// Validate returns an error if the ReplicaStatus is not well-formed.
func (m *ReplicaStatus) Validate() error {
if err := m.Code.Validate(); err != nil {
return pb.ExtendContext(err, "Code")
}
if len(m.Errors) == 0 {
if m.Code == ReplicaStatus_FAILED {
return pb.NewValidationError("expected non-empty Errors with Code FAILED")
}
} else if m.Code != ReplicaStatus_FAILED {
return pb.NewValidationError("expected Code FAILED with non-empty Errors")
}
return nil
}
// Validate returns an error if the ReplicaStatus_Code is not well-formed.
func (x ReplicaStatus_Code) Validate() error {
if _, ok := ReplicaStatus_Code_name[int32(x)]; !ok {
return pb.NewValidationError("invalid code (%s)", x)
}
return nil
}
// MarshalString returns the marshaled encoding of the ReplicaStatus as a string.
func (m *ReplicaStatus) MarshalString() string {
var d, err = m.Marshal()
if err != nil {
panic(err.Error()) // Cannot happen, as we use no custom marshalling.
}
return string(d)
}
// Validate returns an error if the Status is not well-formed.
func (x Status) Validate() error {
if _, ok := Status_name[int32(x)]; !ok {
return pb.NewValidationError("invalid status (%s)", x)
}
return nil
}
func sourcesEq(a, b []ShardSpec_Source) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
func isAbsoluteCleanNonDirPath(p string) bool {
return path.IsAbs(p) && path.Clean(p) == p && path.Base(p) != ""
}
const (
minShardNameLen, maxShardNameLen = 4, 512
)