-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
usage.go
270 lines (235 loc) · 7.62 KB
/
usage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
package state
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
)
// serviceNamesUsageTable is the usage-entry ID under which the number of
// unique service names is tracked.
const serviceNamesUsageTable = "service-names"
// usageTableSchema builds the memdb schema for the "usage" table. Rows are
// looked up through a single required, unique "id" index over the lowercased
// ID field of each entry.
func usageTableSchema() *memdb.TableSchema {
	idIndex := &memdb.IndexSchema{
		Name:         "id",
		AllowMissing: false,
		Unique:       true,
		Indexer: &memdb.StringFieldIndex{
			Field:     "ID",
			Lowercase: true,
		},
	}

	return &memdb.TableSchema{
		Name:    "usage",
		Indexes: map[string]*memdb.IndexSchema{"id": idIndex},
	}
}
// init hands usageTableSchema to registerSchema at package initialization.
// NOTE(review): registerSchema is defined outside this file; presumably it
// adds the "usage" table to the state store's database schema — confirm.
func init() {
registerSchema(usageTableSchema)
}
// UsageEntry represents a count of some arbitrary identifier within the
// state store, along with the last seen index.
type UsageEntry struct {
// ID names what is being counted; it is the field covered by the usage
// table's "id" index (e.g. "nodes" or serviceNamesUsageTable).
ID string
// Index is the index that was supplied when the entry was last written.
Index uint64
// Count is the current tally. writeUsageDeltas clamps it so that a negative
// value is never stored.
Count int
}
// ServiceUsage contains all of the usage data related to services
type ServiceUsage struct {
// Services is populated from the usage entry stored under
// serviceNamesUsageTable.
Services int
// ServiceInstances is populated from the usage entry stored under
// servicesTableName.
ServiceInstances int
// EnterpriseServiceUsage is embedded; its fields are declared outside this
// file.
EnterpriseServiceUsage
}
// uniqueServiceState records what a transaction did to a unique service name,
// as determined by updateServiceNameUsage.
type uniqueServiceState int

const (
	// NoChange means the service name neither appeared nor disappeared.
	NoChange uniqueServiceState = iota
	// Deleted means no instances of the service name remain.
	Deleted
	// Created means every remaining instance of the service name was added by
	// the transaction.
	Created
)
// updateUsage walks the memdb changes of a transaction, derives the resulting
// deltas for the usage metrics tracked here (node count, service instance
// count and unique service name count), and persists them through
// writeUsageDeltas. It returns any error from computing service name usage or
// from writing the entries.
func updateUsage(tx WriteTxn, changes Changes) error {
	var (
		usageDeltas        = make(map[string]int)
		serviceNameChanges = make(map[structs.ServiceName]int)
	)

	for _, change := range changes.Changes {
		// Creations count +1, deletions -1, plain updates leave the count as is.
		delta := 0
		switch {
		case change.Created():
			delta = 1
		case change.Deleted():
			delta = -1
		}

		switch change.Table {
		case "nodes":
			usageDeltas[change.Table] += delta

		case "services":
			node := changeObject(change).(*structs.ServiceNode)
			usageDeltas[change.Table] += delta
			addEnterpriseServiceInstanceUsage(usageDeltas, change)

			// Track, per service name, how many instances this transaction
			// added or removed so the totals can later be compared with the
			// final memdb state.
			if !serviceNameChanged(change) {
				serviceNameChanges[node.CompoundServiceName()] += delta
				continue
			}

			// A rename moves one instance from the old name to the new one.
			serviceNameChanges[node.CompoundServiceName()]++
			previous := change.Before.(*structs.ServiceNode)
			serviceNameChanges[previous.CompoundServiceName()]--
		}
	}

	serviceStates, err := updateServiceNameUsage(tx, usageDeltas, serviceNameChanges)
	if err != nil {
		return err
	}
	addEnterpriseServiceUsage(usageDeltas, serviceStates)

	// A zero index is seen when restoring from a snapshot; fall back to the
	// highest index of the tables being tracked.
	index := changes.Index
	if index == 0 {
		index = maxIndexTxn(tx, "nodes", servicesTableName)
	}

	return writeUsageDeltas(tx, index, usageDeltas)
}
// updateServiceNameUsage classifies every service name touched by the
// transaction as Created, Deleted or NoChange and adjusts the
// serviceNamesUsageTable delta in usageDeltas to match. serviceNameChanges
// maps each service name to the net number of instances the transaction added
// (positive) or removed (negative). The returned map holds the resulting state
// per service name; an error is returned if a lookup fails.
func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) {
	states := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges))

	for sn, delta := range serviceNameChanges {
		iter, err := getWithTxn(tx, servicesTableName, "service", sn.Name, &sn.EnterpriseMeta)
		if err != nil {
			return nil, err
		}

		// Count the instances registered under this name as of the end of the
		// transaction. Comparing that total with the net change lets a single
		// transaction carry several changes for the same service name.
		remaining := 0
		for iter.Next() != nil {
			remaining++
		}

		state := NoChange
		if remaining == 0 {
			// Nothing is left, so the last instance was removed.
			state = Deleted
			usageDeltas[serviceNamesUsageTable]--
		} else if remaining == delta {
			// Every existing instance was added by this transaction, so the
			// service name is new.
			state = Created
			usageDeltas[serviceNamesUsageTable]++
		}
		states[sn] = state
	}

	return states, nil
}
// serviceNameChanged reports whether change is an update whose before and
// after service nodes carry different service names. Creations and deletions
// always yield false.
func serviceNameChanged(change memdb.Change) bool {
	if !change.Updated() {
		return false
	}

	oldNode := change.Before.(*structs.ServiceNode)
	newNode := change.After.(*structs.ServiceNode)
	return oldNode.ServiceName != newNode.ServiceName
}
// writeUsageDeltas applies each delta in usageDeltas to the usage entry with
// the matching ID, creating the entry when none exists yet. The resulting
// count is clamped at zero and idx is recorded on every entry written. An
// existing row that is not a *UsageEntry is left untouched. An error is
// returned if a lookup or an insert fails.
func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error {
	for id, delta := range usageDeltas {
		existing, err := tx.First("usage", "id", id)
		if err != nil {
			return fmt.Errorf("failed to retrieve existing usage entry: %s", err)
		}

		count := delta
		if existing != nil {
			entry, ok := existing.(*UsageEntry)
			if !ok {
				continue
			}
			count += entry.Count
		}

		// Never fail on a negative total, since that would block updates to
		// the state store; store zero instead so a bogus count is not reported.
		if count < 0 {
			count = 0
		}

		if err := tx.Insert("usage", &UsageEntry{ID: id, Count: count, Index: idx}); err != nil {
			return fmt.Errorf("failed to update usage entry: %s", err)
		}
	}

	return nil
}
// NodeCount reads the "nodes" usage entry inside a read transaction and
// returns the index stored on it, the stored node count, and any lookup
// error. When no entry exists both values are zero.
func (s *Store) NodeCount() (uint64, int, error) {
	tx := s.db.ReadTxn()
	defer tx.Abort()

	entry, err := firstUsageEntry(tx, "nodes")
	if err != nil {
		return 0, 0, fmt.Errorf("failed nodes lookup: %s", err)
	}

	return entry.Index, entry.Count, nil
}
// ServiceUsage reads the service instance and service name usage entries
// inside a read transaction, passes them through compileEnterpriseUsage, and
// returns the index stored on the service instance entry together with the
// compiled usage data. Any failure is reported as a services lookup error.
func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) {
	tx := s.db.ReadTxn()
	defer tx.Abort()

	instanceEntry, err := firstUsageEntry(tx, servicesTableName)
	if err != nil {
		return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
	}

	nameEntry, err := firstUsageEntry(tx, serviceNamesUsageTable)
	if err != nil {
		return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
	}

	compiled, err := compileEnterpriseUsage(tx, ServiceUsage{
		Services:         nameEntry.Count,
		ServiceInstances: instanceEntry.Count,
	})
	if err != nil {
		return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
	}

	return instanceEntry.Index, compiled, nil
}
// firstUsageEntry looks up the usage entry with the given id. When the table
// holds no such row a zero-count entry carrying that id is returned, so the
// result is non-nil whenever the lookup finds nothing and no error occurred.
// An error is returned if the lookup fails or the stored row is not a
// *UsageEntry.
func firstUsageEntry(tx ReadTxn, id string) (*UsageEntry, error) {
	raw, err := tx.First("usage", "id", id)
	if err != nil {
		return nil, err
	}

	switch entry := raw.(type) {
	case nil:
		// Nothing has been inserted for this id yet.
		return &UsageEntry{ID: id, Count: 0}, nil
	case *UsageEntry:
		return entry, nil
	default:
		return nil, fmt.Errorf("failed usage lookup: type %T is not *UsageEntry", raw)
	}
}