forked from asonawalla/gazette
/
route_extensions.go
109 lines (97 loc) · 2.92 KB
/
route_extensions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package protocol
import (
"github.com/LiveRamp/gazette/v2/pkg/allocator"
"github.com/LiveRamp/gazette/v2/pkg/keyspace"
)
// Init resets the Route and re-populates its Members from the provided
// allocator Assignments, in the order given. Primary is set to the index of
// the Assignment having Slot zero (the last such Assignment, should there be
// several), or to -1 if there is none. Previously attached Endpoints are
// dropped, while the backing array of Members is re-used. Init panics if a
// KeyValue does not decode to an allocator.Assignment.
func (m *Route) Init(assignments keyspace.KeyValues) {
	// Truncate rather than discard Members, so its capacity can be re-used.
	var members = m.Members[:0]
	*m = Route{Members: members, Primary: -1}

	for _, kv := range assignments {
		var asn = kv.Decoded.(allocator.Assignment)
		var id = ProcessSpec_ID{
			Zone:   asn.MemberZone,
			Suffix: asn.MemberSuffix,
		}

		if asn.Slot == 0 {
			// The member about to be appended is the primary.
			m.Primary = int32(len(m.Members))
		}
		m.Members = append(m.Members, id)
	}
}
// Copy returns a deep copy of the Route. The Members and Endpoints of the
// returned Route are freshly allocated and do not alias those of the
// receiver. Where the receiver's slice is empty, the copied slice is nil.
func (m Route) Copy() Route {
	var out = Route{Primary: m.Primary}

	// Appending onto the nil slices of |out| always allocates new backing arrays.
	out.Members = append(out.Members, m.Members...)
	out.Endpoints = append(out.Endpoints, m.Endpoints...)

	return out
}
// AttachEndpoints looks up each Route member within the KeySpace, and records
// the Endpoint reported by the member's value at the corresponding index of
// Endpoints. Members which are not found in the KeySpace are left with an
// empty Endpoint. If the Route has no Members, Endpoints is left unmodified.
// It panics if a found member's value does not provide GetEndpoint.
// KeySpace must already be read-locked.
func (m *Route) AttachEndpoints(ks *keyspace.KeySpace) {
	if n := len(m.Members); n != 0 {
		m.Endpoints = make([]Endpoint, n)
	}

	for i := range m.Members {
		var id = m.Members[i]

		var member, found = allocator.LookupMember(ks, id.Zone, id.Suffix)
		if !found {
			continue // Assignment with missing Member. Ignore.
		}
		m.Endpoints[i] = member.MemberValue.(interface {
			GetEndpoint() Endpoint
		}).GetEndpoint()
	}
}
// Validate returns an error if the Route is not well-formed. It checks, in order:
//   - Each member validates, and Members are in strictly ascending order.
//   - Primary is -1, or is a valid index into Members.
//   - Endpoints is either empty, or has the same length as Members.
//   - Each non-empty Endpoint validates.
func (m Route) Validate() error {
	var n = len(m.Members)

	for i := 0; i != n; i++ {
		var cur = m.Members[i]

		if err := cur.Validate(); err != nil {
			return ExtendContext(err, "Members[%d]", i)
		}
		if i == 0 {
			continue // No predecessor to compare against.
		}
		if !m.Members[i-1].Less(cur) {
			return NewValidationError("Members not in unique, sorted order (index %d; %+v <= %+v)",
				i, m.Members[i-1], m.Members[i])
		}
	}

	if p := m.Primary; p < -1 || p >= int32(n) {
		return NewValidationError("invalid Primary (%+v; expected -1 <= Primary < %d)",
			p, n)
	}

	switch l := len(m.Endpoints); l {
	case 0, n:
		// Endpoints are either not attached, or are one-to-one with Members.
	default:
		return NewValidationError("len(Endpoints) != 0, and != len(Members) (%d vs %d)",
			l, n)
	}

	for i, ep := range m.Endpoints {
		if ep == "" {
			continue // An empty Endpoint is permitted.
		}
		if err := ep.Validate(); err != nil {
			return ExtendContext(err, "Endpoints[%d]", i)
		}
	}
	return nil
}
// Equivalent returns true if the Routes have the same Primary and identical
// Members, in the same order. Endpoints are not compared. A nil |other| is
// never equivalent.
func (m Route) Equivalent(other *Route) bool {
	if other == nil ||
		m.Primary != other.Primary ||
		len(m.Members) != len(other.Members) {
		return false
	}
	// Lengths match, so a pairwise comparison covers every member.
	for i, id := range other.Members {
		if id != m.Members[i] {
			return false
		}
	}
	return true
}
// MarshalString returns the marshaled encoding of the Route as a string.
// It panics with the error's message if marshaling fails.
func (m Route) MarshalString() string {
	var b, err = m.Marshal()
	if err == nil {
		return string(b)
	}
	// Cannot happen, as we use no custom marshalling.
	panic(err.Error())
}