forked from hashicorp/consul
-
Notifications
You must be signed in to change notification settings - Fork 1
/
coordinate.go
172 lines (149 loc) · 4.67 KB
/
coordinate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package state
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb"
)
// coordinatesTableSchema returns a new table schema used for storing
// network coordinates.
func coordinatesTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "coordinates",
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
// AllowMissing is required since we allow
// Segment to be an empty string.
AllowMissing: true,
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "Segment",
Lowercase: true,
},
},
},
},
"node": &memdb.IndexSchema{
Name: "node",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
},
},
}
}
func init() {
registerSchema(coordinatesTableSchema)
}
// Coordinates is used to pull all the coordinates from the snapshot.
func (s *Snapshot) Coordinates() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("coordinates", "id")
if err != nil {
return nil, err
}
return iter, nil
}
// Coordinates is used when restoring from a snapshot. For general inserts, use
// CoordinateBatchUpdate. We do less vetting of the updates here because they
// already got checked on the way in during a batch update.
func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
for _, update := range updates {
// Skip any bad data that may have gotten into the database from
// a bad client in the past.
if !update.Coord.IsValid() {
continue
}
if err := s.tx.Insert("coordinates", update); err != nil {
return fmt.Errorf("failed restoring coordinate: %s", err)
}
}
if err := indexUpdateMaxTxn(s.tx, idx, "coordinates"); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
// Coordinate returns a map of coordinates for the given node, indexed by
// network segment.
func (s *Store) Coordinate(node string, ws memdb.WatchSet) (uint64, lib.CoordinateSet, error) {
tx := s.db.Txn(false)
defer tx.Abort()
tableIdx := maxIndexTxn(tx, "coordinates")
iter, err := tx.Get("coordinates", "node", node)
if err != nil {
return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err)
}
ws.Add(iter.WatchCh())
results := make(lib.CoordinateSet)
for raw := iter.Next(); raw != nil; raw = iter.Next() {
coord := raw.(*structs.Coordinate)
results[coord.Segment] = coord.Coord
}
return tableIdx, results, nil
}
// Coordinates queries for all nodes with coordinates.
func (s *Store) Coordinates(ws memdb.WatchSet) (uint64, structs.Coordinates, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "coordinates")
// Pull all the coordinates.
iter, err := tx.Get("coordinates", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err)
}
ws.Add(iter.WatchCh())
var results structs.Coordinates
for coord := iter.Next(); coord != nil; coord = iter.Next() {
results = append(results, coord.(*structs.Coordinate))
}
return idx, results, nil
}
// CoordinateBatchUpdate processes a batch of coordinate updates and applies
// them in a single transaction.
func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) error {
tx := s.db.Txn(true)
defer tx.Abort()
// Upsert the coordinates.
for _, update := range updates {
// Skip any bad data that may have gotten into the database from
// a bad client in the past.
if !update.Coord.IsValid() {
continue
}
// Since the cleanup of coordinates is tied to deletion of
// nodes, we silently drop any updates for nodes that we don't
// know about. This might be possible during normal operation
// if we happen to get a coordinate update for a node that
// hasn't been able to add itself to the catalog yet. Since we
// don't carefully sequence this, and since it will fix itself
// on the next coordinate update from that node, we don't return
// an error or log anything.
node, err := tx.First("nodes", "id", update.Node)
if err != nil {
return fmt.Errorf("failed node lookup: %s", err)
}
if node == nil {
continue
}
if err := tx.Insert("coordinates", update); err != nil {
return fmt.Errorf("failed inserting coordinate: %s", err)
}
}
// Update the index.
if err := tx.Insert("index", &IndexEntry{"coordinates", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
tx.Commit()
return nil
}