-
Notifications
You must be signed in to change notification settings - Fork 297
/
mgmt_svc.go
405 lines (356 loc) · 10.9 KB
/
mgmt_svc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
//
// (C) Copyright 2018-2023 Intel Corporation.
//
// SPDX-License-Identifier: BSD-2-Clause-Patent
//
package server
import (
"context"
"reflect"
"strings"
"time"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
"github.com/daos-stack/daos/src/control/build"
"github.com/daos-stack/daos/src/control/common"
mgmtpb "github.com/daos-stack/daos/src/control/common/proto/mgmt"
"github.com/daos-stack/daos/src/control/events"
"github.com/daos-stack/daos/src/control/lib/control"
"github.com/daos-stack/daos/src/control/lib/daos"
"github.com/daos-stack/daos/src/control/logging"
"github.com/daos-stack/daos/src/control/system"
"github.com/daos-stack/daos/src/control/system/raft"
)
// Default intervals for the management service's timer-driven loops.
const (
// groupUpdateInterval is the tick period of the group update timer in
// leaderTaskLoop.
groupUpdateInterval = 500 * time.Millisecond
// batchLoopInterval is the batch interval assigned to a mgmtSvc created
// by newMgmtSvc.
batchLoopInterval = 250 * time.Millisecond
)
// Types used to pass requests to, and results back from, the batch and
// serial request processing loops.
type (
// batchRequest is a single submitted request awaiting processing.
batchRequest struct {
// msg is the message to be processed.
msg proto.Message
// ctx is the submitter's context; sendResponse gives up on delivery
// once it is done.
ctx context.Context
// respCh is the channel on which the response is delivered.
respCh batchRespChan
}
// batchResponse is the result of processing a batchRequest: either a
// response message or an error.
batchResponse struct {
msg proto.Message
err error
}
// batchRespChan carries responses back to a request's submitter.
batchRespChan chan *batchResponse
// batchReqChan carries submitted requests to a processing loop.
batchReqChan chan *batchRequest
// batchProcessResp reports the completion of one message type's batch
// back to batchReqLoop.
batchProcessResp struct {
// msgType identifies the batch that was processed.
msgType reflect.Type
// retryableReqs, when non-empty, replaces the batch for msgType so
// that it is processed again on a later tick.
retryableReqs []*batchRequest
}
// batchProcessRespChan carries batch completion reports to batchReqLoop.
batchProcessRespChan chan *batchProcessResp
)
// sendResponse delivers the supplied message and error to the request's
// response channel. Delivery is abandoned, without error, if either the
// parent context or the request's own context is done first.
func (br *batchRequest) sendResponse(parent context.Context, msg proto.Message, err error) {
	reply := &batchResponse{msg: msg, err: err}

	select {
	case br.respCh <- reply:
		// Delivered to the submitter.
	case <-br.ctx.Done():
		// The submitter has gone away; nobody is waiting for the reply.
	case <-parent.Done():
		// The processing loop is shutting down.
	}
}
// mgmtSvc implements (the Go portion of) Management Service, satisfying
// mgmtpb.MgmtSvcServer.
type mgmtSvc struct {
mgmtpb.UnimplementedMgmtSvcServer
log logging.Logger
harness *EngineHarness
membership *system.Membership // if MS leader, system membership list
// sysdb is consulted for the system name and for the leader/replica
// checks performed on incoming requests.
sysdb *raft.Database
rpcClient control.UnaryInvoker
events *events.PubSub
systemProps daos.SystemPropertyMap
clientNetworkHint []*mgmtpb.ClientNetHint
// batchInterval is the tick period of batchReqLoop.
batchInterval time.Duration
// batchReqs feeds requests to batchReqLoop.
batchReqs batchReqChan
// serialReqs feeds requests to serialReqLoop.
serialReqs batchReqChan
// groupUpdateReqs signals leaderTaskLoop that a group update is needed;
// a true value requests that the update be attempted immediately.
groupUpdateReqs chan bool
lastMapVer uint32
}
// newMgmtSvc builds a mgmtSvc from the supplied harness, membership,
// database, RPC client and event bus. The logger is taken from the harness,
// the batch interval defaults to batchLoopInterval, and the request channels
// are created unbuffered.
func newMgmtSvc(h *EngineHarness, m *system.Membership, s *raft.Database, c control.UnaryInvoker, p *events.PubSub) *mgmtSvc {
	svc := new(mgmtSvc)

	// Collaborators supplied by the caller.
	svc.log = h.log
	svc.harness = h
	svc.membership = m
	svc.sysdb = s
	svc.rpcClient = c
	svc.events = p

	// Defaults.
	svc.systemProps = daos.SystemProperties()
	svc.clientNetworkHint = []*mgmtpb.ClientNetHint{new(mgmtpb.ClientNetHint)}
	svc.batchInterval = batchLoopInterval

	// Channels consumed by the processing loops.
	svc.batchReqs = make(batchReqChan)
	svc.serialReqs = make(batchReqChan)
	svc.groupUpdateReqs = make(chan bool)

	return svc
}
// checkSystemRequest rejects nil requests and, for requests that expose a
// GetSys() accessor, verifies that the named system matches this service's
// system name.
//
// The value returned by GetSys() may carry a trailing "-<version>" component.
// If the text after the last "-" parses as a build version it is stripped
// before comparison; otherwise the whole string is treated as the system name
// (so names that themselves contain "-" are handled).
func (svc *mgmtSvc) checkSystemRequest(req proto.Message) error {
	if common.InterfaceIsNil(req) {
		return errors.New("nil request")
	}

	sysReq, hasSys := req.(interface{ GetSys() string })
	if !hasSys {
		// Nothing to verify for requests that don't name a system.
		return nil
	}

	sysName := sysReq.GetSys()
	if sep := strings.LastIndex(sysName, "-"); sep >= 0 {
		if _, err := build.NewVersion(sysName[sep+1:]); err == nil {
			sysName = sysName[:sep]
		}
	}

	if sysName == svc.sysdb.SystemName() {
		return nil
	}
	return FaultWrongSystem(sysName, svc.sysdb.SystemName())
}
// checkLeaderRequest validates a request that must be handled by the
// current MS leader: the request is unwrapped via unwrapCheckerReq, checked
// with checkSystemRequest, and finally the database's leader check is
// applied. The first failure encountered is returned.
func (svc *mgmtSvc) checkLeaderRequest(req proto.Message) error {
	inner, err := svc.unwrapCheckerReq(req)
	if err == nil {
		err = svc.checkSystemRequest(inner)
	}
	if err != nil {
		return err
	}

	return svc.sysdb.CheckLeader()
}
// checkReplicaRequest validates a request that must be handled by a MS
// replica: the request is unwrapped via unwrapCheckerReq, checked with
// checkSystemRequest, and finally the database's replica check is applied.
// The first failure encountered is returned.
func (svc *mgmtSvc) checkReplicaRequest(req proto.Message) error {
	inner, err := svc.unwrapCheckerReq(req)
	if err == nil {
		err = svc.checkSystemRequest(inner)
	}
	if err != nil {
		return err
	}

	return svc.sysdb.CheckReplica()
}
// startLeaderLoops kicks off the leader-only processing loops
// that will be canceled on leadership loss.
//
// Currently this is just leaderTaskLoop, started in its own goroutine; the
// loop exits when ctx is done.
func (svc *mgmtSvc) startLeaderLoops(ctx context.Context) {
go svc.leaderTaskLoop(ctx)
}
// startAsyncLoops launches the batch and serial request processing loops,
// each in its own goroutine. Both loops exit when ctx is done.
func (svc *mgmtSvc) startAsyncLoops(ctx context.Context) {
	loops := []func(context.Context){
		svc.batchReqLoop,
		svc.serialReqLoop,
	}

	for _, loop := range loops {
		go loop(ctx)
	}
}
// submitBatchRequest submits a message for batch processing and waits for a response.
func (svc *mgmtSvc) submitBatchRequest(ctx context.Context, msg proto.Message) (proto.Message, error) {
respCh := make(batchRespChan, 1)
select {
case <-ctx.Done():
return nil, ctx.Err()
case svc.batchReqs <- &batchRequest{msg: msg, ctx: ctx, respCh: respCh}:
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case resp := <-respCh:
return resp.msg, resp.err
}
}
// submitSerialRequest submits a message for serial processing and waits for a response.
func (svc *mgmtSvc) submitSerialRequest(ctx context.Context, msg proto.Message) (proto.Message, error) {
respCh := make(batchRespChan, 1)
select {
case <-ctx.Done():
return nil, ctx.Err()
case svc.serialReqs <- &batchRequest{msg: msg, ctx: ctx, respCh: respCh}:
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case resp := <-respCh:
return resp.msg, resp.err
}
}
// processBatchPoolEvictions handles a batch of PoolEvictReq messages. Requests
// are grouped by pool ID and each group is collapsed into a single eviction
// call carrying the concatenated handles of its members, which keeps the
// number of calls to evictPoolConnections to one per pool.
//
// The merged request takes its Sys value from the first request seen for the
// pool. Every member of a group receives the same response (or the same
// error). Requests that are not PoolEvictReq messages are answered with an
// error. The bprChan parameter is unused, and the function always returns nil
// (no requests are handed back for retry).
func (svc *mgmtSvc) processBatchPoolEvictions(ctx context.Context, bprChan batchProcessRespChan, reqs []*batchRequest) []*batchRequest {
	// evictGroup accumulates the merged request for one pool along with
	// the original requests waiting on its outcome.
	type evictGroup struct {
		merged  *mgmtpb.PoolEvictReq
		waiters []*batchRequest
	}

	groups := make(map[string]*evictGroup)
	for _, br := range reqs {
		evictReq, isEvict := br.msg.(*mgmtpb.PoolEvictReq)
		if !isEvict {
			br.sendResponse(ctx, nil, errors.Errorf("unexpected message type %T", br.msg))
			continue
		}

		grp, found := groups[evictReq.Id]
		if !found {
			grp = &evictGroup{
				merged: &mgmtpb.PoolEvictReq{Id: evictReq.Id, Sys: evictReq.Sys},
			}
			groups[evictReq.Id] = grp
		}
		grp.merged.Handles = append(grp.merged.Handles, evictReq.Handles...)
		grp.waiters = append(grp.waiters, br)
	}

	for poolID, grp := range groups {
		resp, err := svc.evictPoolConnections(ctx, grp.merged)

		// On failure the waiters get the error and no message.
		var reply proto.Message
		if err != nil {
			svc.log.Errorf("failed to evict pool connections for pool %s: %s", poolID, err)
		} else {
			reply = resp
		}

		for _, waiter := range grp.waiters {
			waiter.sendResponse(ctx, reply, err)
		}
	}

	return nil
}
// processBatchJoins handles a batch of JoinReq messages one at a time,
// answering each request with the result of its join. If at least one join
// succeeds, an immediate group update is requested once the whole batch has
// been handled.
//
// Requests that are not JoinReq messages, or whose peer address cannot be
// resolved, are answered with an error. The bprChan parameter is unused, and
// the function always returns nil (no requests are handed back for retry).
func (svc *mgmtSvc) processBatchJoins(ctx context.Context, bprChan batchProcessRespChan, reqs []*batchRequest) []*batchRequest {
	anyJoined := false

	for _, br := range reqs {
		joinReq, isJoin := br.msg.(*mgmtpb.JoinReq)
		if !isJoin {
			br.sendResponse(ctx, nil, errors.Errorf("unexpected message type %T", br.msg))
			continue
		}

		// The peer's address comes from the submitter's context when
		// the request message doesn't specify one.
		peerAddr, addrErr := getPeerListenAddr(br.ctx, joinReq.Addr)
		if addrErr != nil {
			br.sendResponse(ctx, nil, errors.Wrapf(addrErr, "failed to parse %q into a peer control address", joinReq.Addr))
			continue
		}

		joinResp, joinErr := svc.join(ctx, joinReq, peerAddr)
		br.sendResponse(ctx, joinResp, joinErr)
		anyJoined = anyJoined || joinErr == nil
	}

	if !anyJoined {
		return nil
	}

	svc.log.Debug("requesting immediate group update after join(s)")
	svc.reqGroupUpdate(ctx, true)

	return nil
}
// processBatchedMsgRequests processes a batch of requests for a given message
// type and then reports completion on bprChan.
//
// The handler is chosen from the concrete type of the first request's
// message. Any requests returned by the handler are passed back as
// retryableReqs in the completion report. If no handler exists for the
// message type, every request in the batch is answered with an error so that
// submitters are not left waiting for a response that would never arrive.
//
// The completion report is skipped if ctx is done before it can be sent.
func (svc *mgmtSvc) processBatchedMsgRequests(ctx context.Context, bprChan batchProcessRespChan, msgType reflect.Type, reqs []*batchRequest) {
	bpr := &batchProcessResp{msgType: msgType}

	// An empty batch has nothing to dispatch, but completion must still
	// be reported.
	if len(reqs) > 0 {
		switch reqs[0].msg.(type) {
		case *mgmtpb.PoolEvictReq:
			bpr.retryableReqs = svc.processBatchPoolEvictions(ctx, bprChan, reqs)
		case *mgmtpb.JoinReq:
			bpr.retryableReqs = svc.processBatchJoins(ctx, bprChan, reqs)
		default:
			svc.log.Errorf("no batch handler for message type %s", msgType)
			// Fail the requests explicitly; otherwise each submitter
			// would block until its own context is cancelled.
			err := errors.Errorf("no batch handler for message type %s", msgType)
			for _, req := range reqs {
				req.sendResponse(ctx, nil, err)
			}
		}
	}

	select {
	case <-ctx.Done():
		return
	case bprChan <- bpr:
	}
}
// processSerialRequest processes a single serialized request and sends the
// result back to the submitter.
//
// PoolCreateReq messages are handled by poolCreate. Any other message type is
// logged and answered with an error so that the submitter is not left waiting
// for a response that would never arrive.
func (svc *mgmtSvc) processSerialRequest(ctx context.Context, req *batchRequest) {
	svc.log.Debugf("invoking serial handler for %T", req.msg)

	switch msg := req.msg.(type) {
	case *mgmtpb.PoolCreateReq:
		resp, err := svc.poolCreate(ctx, msg)
		req.sendResponse(ctx, resp, err)
	default:
		svc.log.Errorf("no serial handler for message type %T", req.msg)
		// Fail the request explicitly; otherwise the submitter would
		// block until its own context is cancelled.
		req.sendResponse(ctx, nil, errors.Errorf("no serial handler for message type %T", req.msg))
	}
}
// serialReqLoop receives requests from the serial request channel and
// processes them one at a time, in the order received, until the parent
// context is done.
func (svc *mgmtSvc) serialReqLoop(parent context.Context) {
	svc.log.Debug("starting serialReqLoop")

	stop := parent.Done()
	for {
		select {
		case next := <-svc.serialReqs:
			svc.processSerialRequest(parent, next)
		case <-stop:
			svc.log.Debug("stopped serialReqLoop")
			return
		}
	}
}
// batchReqLoop is the main loop for processing batched requests.
//
// Incoming requests are accumulated per message type. On every tick of the
// batch timer, each accumulated batch is handed to its own goroutine for
// processing and the loop waits for all of them to report back. A batch is
// cleared once processed, unless the processor returned retryable requests,
// in which case those are kept for the next tick.
//
// The loop exits when the parent context is done, including while it is
// waiting for in-flight batches to report back.
func (svc *mgmtSvc) batchReqLoop(parent context.Context) {
	batchedMsgReqs := make(map[reflect.Type][]*batchRequest)
	batchTimer := time.NewTicker(svc.batchInterval)
	defer batchTimer.Stop()

	svc.log.Debug("starting batchReqLoop")
	for {
		select {
		case <-parent.Done():
			svc.log.Debug("stopped batchReqLoop")
			return
		case req := <-svc.batchReqs:
			// Appending to a missing (nil) entry creates it.
			msgType := reflect.TypeOf(req.msg)
			batchedMsgReqs[msgType] = append(batchedMsgReqs[msgType], req)
		case <-batchTimer.C:
			batchedMsgNr := len(batchedMsgReqs)
			if batchedMsgNr == 0 {
				continue
			}

			// Sized to hold one report per batch so that processors
			// never block on reporting completion.
			bprChan := make(batchProcessRespChan, batchedMsgNr)
			for msgType, reqs := range batchedMsgReqs {
				svc.log.Debugf("processing %d %s requests", len(reqs), msgType)
				go svc.processBatchedMsgRequests(parent, bprChan, msgType, reqs)
			}

			for i := 0; i < batchedMsgNr; i++ {
				// A processor may skip its report when the parent
				// context is done, so a bare receive here could
				// block forever on shutdown.
				select {
				case <-parent.Done():
					svc.log.Debug("stopped batchReqLoop")
					return
				case bpr := <-bprChan:
					if len(bpr.retryableReqs) > 0 {
						batchedMsgReqs[bpr.msgType] = bpr.retryableReqs
					} else {
						delete(batchedMsgReqs, bpr.msgType)
					}
				}
			}
		}
	}
}
// leaderTaskLoop services group update requests until the parent context is
// done.
//
// A request received on groupUpdateReqs marks an update as pending. If the
// request asks for an immediate update, the update is attempted right away;
// otherwise it is left for the next tick of the group update timer. A failed
// update is logged and remains pending, so it is retried on a later tick.
func (svc *mgmtSvc) leaderTaskLoop(parent context.Context) {
	ticker := time.NewTicker(groupUpdateInterval)
	defer ticker.Stop()

	pending := false

	svc.log.Debug("starting leaderTaskLoop")
	for {
		select {
		case <-parent.Done():
			svc.log.Debug("stopped leaderTaskLoop")
			return
		case immediate := <-svc.groupUpdateReqs:
			pending = true
			if !immediate {
				// Defer to the next timer tick.
				continue
			}
			if err := svc.doGroupUpdate(parent, true); err != nil {
				svc.log.Errorf("immediate GroupUpdate failed: %s", err)
				continue
			}
			pending = false
		case <-ticker.C:
			if pending {
				if err := svc.doGroupUpdate(parent, false); err != nil {
					svc.log.Errorf("lazy GroupUpdate failed: %s", err)
				} else {
					pending = false
				}
			}
		}
	}
}