-
Notifications
You must be signed in to change notification settings - Fork 20
/
proposer.go
529 lines (469 loc) · 15.8 KB
/
proposer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
package paxos
import (
"fmt"
capn "github.com/glycerine/go-capnproto"
mdbs "github.com/msackman/gomdb/server"
"goshawkdb.io/common"
"goshawkdb.io/server"
msgs "goshawkdb.io/server/capnp"
"goshawkdb.io/server/configuration"
eng "goshawkdb.io/server/txnengine"
"log"
)
// ProposerMode describes the role this node plays for a given txn:
// voting on it, merely learning its outcome, or (after recovery from
// disk) only resending TLCs.
type ProposerMode uint8
const (
	// ProposerActiveVoter: we are active in the txn allocations and run
	// the txn locally, voting on it.
	ProposerActiveVoter ProposerMode = iota
	// ProposerActiveLearner: active in the txn allocations, but outcomes
	// arrived before the txn itself, so we learn without voting.
	ProposerActiveLearner ProposerMode = iota
	// ProposerPassiveLearner: not active; we only learn the outcome.
	ProposerPassiveLearner ProposerMode = iota
	// proposerTLCSender: restored from disk after consensus was reached;
	// our only job is to (re)send Txn Locally Complete messages.
	proposerTLCSender ProposerMode = iota
)
// Proposer drives a single transaction through the proposer side of
// the Paxos-based commit protocol. It is a state machine: the embedded
// structs below are its states (traversed in the order listed — see
// nextState), and currentState points at whichever one is active.
type Proposer struct {
	proposerManager *ProposerManager
	mode            ProposerMode
	txn             *eng.Txn // non-nil only once we run the txn locally
	txnId           *common.TxnId
	acceptors       common.RMIds
	topology        *configuration.Topology
	fInc            int // presumably F+1, taken from the txn's FInc() — TODO confirm
	currentState    proposerStateMachineComponent
	proposerAwaitBallots
	proposerReceiveOutcomes
	proposerAwaitLocallyComplete
	proposerReceiveGloballyComplete
	proposerAwaitFinished
}
// NewProposer creates a proposer for the given txn in the given mode.
// NB, "active" only means we are active in the txn allocations: if
// outcomes arrive before the txn itself we do not vote. So a node can
// be active, yet not a voter.
func NewProposer(pm *ProposerManager, txn *eng.TxnReader, mode ProposerMode, topology *configuration.Topology) *Proposer {
	tc := txn.Txn
	proposer := &Proposer{
		proposerManager: pm,
		mode:            mode,
		txnId:           txn.Id,
		acceptors:       GetAcceptorsFromTxn(tc),
		topology:        topology,
		fInc:            int(tc.FInc()),
	}
	// Only an active voter runs the transaction locally at this point.
	if mode == ProposerActiveVoter {
		proposer.txn = eng.TxnFromReader(pm.Exe, pm.VarDispatcher, proposer, pm.RMId, txn)
	}
	proposer.init()
	return proposer
}
// ProposerFromData rehydrates a proposer from its on-disk encoding.
// A proposer is only ever written to disk once it is locally complete
// and all acceptors agree (see maybeWriteToDisk), so the restored
// proposer's sole job is to send out TLCs; hence mode proposerTLCSender.
// Since outcomes from all acceptors were already received, the outcome
// accumulator's fInc is irrelevant and -1 is used.
func ProposerFromData(pm *ProposerManager, txnId *common.TxnId, data []byte, topology *configuration.Topology) (*Proposer, error) {
	seg, _, err := capn.ReadFromMemoryZeroCopy(data)
	if err != nil {
		return nil, err
	}
	state := msgs.ReadRootProposerState(seg)
	acceptorsCap := state.Acceptors()
	acceptors := make([]common.RMId, 0, acceptorsCap.Len())
	for idx := 0; idx < acceptorsCap.Len(); idx++ {
		acceptors = append(acceptors, common.RMId(acceptorsCap.At(idx)))
	}
	proposer := &Proposer{
		proposerManager: pm,
		mode:            proposerTLCSender,
		txnId:           txnId,
		acceptors:       acceptors,
		topology:        topology,
		fInc:            -1,
	}
	proposer.init()
	proposer.allAcceptorsAgreed = true
	return proposer, nil
}
// init wires every state-machine component back to this proposer.
func (p *Proposer) init() {
	components := []proposerStateMachineComponent{
		&p.proposerAwaitBallots,
		&p.proposerReceiveOutcomes,
		&p.proposerAwaitLocallyComplete,
		&p.proposerReceiveGloballyComplete,
		&p.proposerAwaitFinished,
	}
	for _, component := range components {
		component.init(p)
	}
}
// Start moves the proposer out of its initial state, selecting the
// first state-machine component according to mode, replaying any
// pending topology change, and then starting that component. Calling
// Start on an already-started proposer is a no-op.
func (p *Proposer) Start() {
	if p.currentState != nil {
		return
	}
	switch p.mode {
	case ProposerActiveVoter:
		p.currentState = &p.proposerAwaitBallots
	case ProposerActiveLearner, ProposerPassiveLearner:
		p.currentState = &p.proposerReceiveOutcomes
	case proposerTLCSender:
		p.currentState = &p.proposerReceiveGloballyComplete
	}
	if pending := p.topology; pending != nil {
		// Clear first so TopologyChange doesn't early-exit on equality.
		p.topology = nil
		p.TopologyChange(pending)
	}
	p.currentState.start()
}
// Status emits a human-readable summary of this proposer into sc,
// forking sub-consumers for the outcome accumulator and the txn.
func (p *Proposer) Status(sc *server.StatusConsumer) {
	emit := func(format string, args ...interface{}) {
		sc.Emit(fmt.Sprintf(format, args...))
	}
	emit("Proposer for %v", p.txnId)
	emit("- Mode: %v", p.mode)
	emit("- Current state: %v", p.currentState)
	sc.Emit("- Outcome Accumulator")
	p.outcomeAccumulator.Status(sc.Fork())
	emit("- Locally Complete? %v", p.locallyCompleted)
	if p.txn != nil {
		sc.Emit("- Txn")
		p.txn.Status(sc.Fork())
	}
	sc.Join()
}
// TopologyChange reacts to a change in cluster membership. RMs that
// have been removed are dropped from our acceptor set, and the current
// state is given a chance to make whatever progress the loss of those
// RMs now permits. A no-op if topology is the one we already hold.
func (p *Proposer) TopologyChange(topology *configuration.Topology) {
	if topology == p.topology {
		return
	}
	p.topology = topology
	rmsRemoved := topology.RMsRemoved()
	server.Log("proposer", p.txnId, "in", p.currentState, "sees loss of", rmsRemoved)
	// If we ourselves have been removed, there is nothing useful to do.
	if _, found := rmsRemoved[p.proposerManager.RMId]; found {
		return
	}
	// create new acceptors slice because the initial slice can be
	// shared with proposals.
	acceptors := make([]common.RMId, 0, len(p.acceptors))
	for _, rmId := range p.acceptors {
		if _, found := rmsRemoved[rmId]; !found {
			acceptors = append(acceptors, rmId)
		}
	}
	p.acceptors = acceptors
	// currentState is compared by pointer identity with the embedded
	// state components.
	switch p.currentState {
	case &p.proposerAwaitBallots, &p.proposerReceiveOutcomes, &p.proposerAwaitLocallyComplete:
		// Dropping acceptors may mean the remaining outcomes now agree.
		if p.outcomeAccumulator.TopologyChange(topology) {
			p.allAcceptorsAgree()
		}
	case &p.proposerReceiveGloballyComplete:
		// Treat each removed RM as having acknowledged with a TGC.
		for rmId := range rmsRemoved {
			p.TxnGloballyCompleteReceived(rmId)
		}
	case &p.proposerAwaitFinished:
		// do nothing
	}
}
// proposerStateMachineComponent is implemented by each state of the
// Proposer state machine. The witness method only marks the type as a
// state; it carries no behaviour.
type proposerStateMachineComponent interface {
	init(*Proposer)
	start()
	proposerStateMachineComponentWitness()
}
// nextState advances the state machine through the fixed order
// awaitBallots -> receiveOutcomes -> awaitLocallyComplete ->
// receiveGloballyComplete -> awaitFinished -> done (nil), calling
// start() on each newly-entered state. States are matched by pointer
// identity with the embedded components.
func (p *Proposer) nextState() {
	switch p.currentState {
	case &p.proposerAwaitBallots:
		p.currentState = &p.proposerReceiveOutcomes
	case &p.proposerReceiveOutcomes:
		p.currentState = &p.proposerAwaitLocallyComplete
	case &p.proposerAwaitLocallyComplete:
		p.currentState = &p.proposerReceiveGloballyComplete
	case &p.proposerReceiveGloballyComplete:
		p.currentState = &p.proposerAwaitFinished
	case &p.proposerAwaitFinished:
		// Terminal: no further state to start.
		p.currentState = nil
		return
	}
	p.currentState.start()
}
// await ballots
// proposerAwaitBallots is the first state for an active voter: the txn
// runs locally and we wait for our vars to produce their ballots. For
// retry txns it also subscribes to connection events so a dead
// submitter can be detected.
type proposerAwaitBallots struct {
	*Proposer
	submitter          common.RMId // RM that submitted the txn
	submitterBootCount uint32      // submitter's boot count at submission time
}
// init records the owning Proposer.
func (pab *proposerAwaitBallots) init(p *Proposer) {
	pab.Proposer = p
}
// start begins the local vote: the txn is started as a voter and the
// submitter's identity recorded for later connection checks.
func (pab *proposerAwaitBallots) start() {
	pab.txn.Start(true)
	txnCap := pab.txn.TxnReader.Txn
	pab.submitter = common.RMId(txnCap.Submitter())
	pab.submitterBootCount = txnCap.SubmitterBootCount()
	if pab.txn.Retry {
		// We need to observe whether or not the submitter dies. If it
		// does die, we should tidy up (abort) asap otherwise we have a
		// leak which may never trigger.
		pab.proposerManager.AddServerConnectionSubscriber(pab)
	}
}
// proposerStateMachineComponentWitness marks pab as a state-machine state.
func (pab *proposerAwaitBallots) proposerStateMachineComponentWitness() {}
// String implements fmt.Stringer for logging and status output.
func (pab *proposerAwaitBallots) String() string { return "proposerAwaitBallots" }
// TxnBallotsComplete is invoked once our local vars have determined
// their ballots. In the normal case (still awaiting ballots) we launch
// Paxos proposals to the acceptors and advance. For a retry txn, the
// callback can legitimately fire again after we have moved on to
// receiving outcomes, in which case the new ballots are added to the
// existing proposals. Any other state is an error unless it's a retry.
// In all cases, proposals are skipped once consensus is already known.
func (pab *proposerAwaitBallots) TxnBallotsComplete(ballots ...*eng.Ballot) {
	if pab.currentState == pab {
		server.Log(pab.txnId, "TxnBallotsComplete callback. Acceptors:", pab.acceptors)
		if !pab.allAcceptorsAgreed {
			pab.proposerManager.NewPaxosProposals(pab.txn.TxnReader, pab.fInc, ballots, pab.acceptors, pab.proposerManager.RMId, true)
		}
		pab.nextState()
	} else if pab.txn.Retry && pab.currentState == &pab.proposerReceiveOutcomes {
		server.Log(pab.txnId, "TxnBallotsComplete (retry) callback with existing proposals")
		if !pab.allAcceptorsAgreed {
			pab.proposerManager.AddToPaxosProposals(pab.txnId, ballots, pab.proposerManager.RMId)
		}
	} else if !pab.txn.Retry {
		log.Printf("Error: %v TxnBallotsComplete callback invoked in wrong state (%v)\n",
			pab.txnId, pab.currentState)
	}
}
// Abort aborts the txn by constructing abort ballots for every var
// allocated to us, but only if we are still awaiting ballots and
// consensus has not already been reached.
func (pab *proposerAwaitBallots) Abort() {
	if pab.currentState == pab && !pab.allAcceptorsAgreed {
		server.Log(pab.txnId, "Proposer Aborting")
		txn := pab.txn.TxnReader
		alloc := AllocForRMId(txn.Txn, pab.proposerManager.RMId)
		ballots := MakeAbortBallots(txn, alloc)
		pab.TxnBallotsComplete(ballots...)
	}
}
// ConnectedRMs receives the full connection map; if the submitter is
// absent or has rebooted since submitting, abort the retry.
func (pab *proposerAwaitBallots) ConnectedRMs(conns map[common.RMId]Connection) {
	conn, found := conns[pab.submitter]
	if !found || conn.BootCount() != pab.submitterBootCount {
		pab.maybeAbortRetry()
	}
}
// ConnectionLost aborts the retry if it is the submitter's connection
// that has gone away.
func (pab *proposerAwaitBallots) ConnectionLost(rmId common.RMId, conns map[common.RMId]Connection) {
	if pab.submitter == rmId {
		pab.maybeAbortRetry()
	}
}
// ConnectionEstablished aborts the retry if the submitter has come back
// with a different boot count (i.e. it restarted and forgot the txn).
// done must be invoked to acknowledge the notification.
func (pab *proposerAwaitBallots) ConnectionEstablished(rmId common.RMId, conn Connection, conns map[common.RMId]Connection, done func()) {
	if pab.submitter == rmId && pab.submitterBootCount != conn.BootCount() {
		pab.maybeAbortRetry()
	}
	done()
}
// maybeAbortRetry marshals the abort onto the proposer manager's
// executor rather than running it inline; Abort itself re-checks state,
// so a late or duplicate enqueue is harmless.
func (pab *proposerAwaitBallots) maybeAbortRetry() {
	pab.proposerManager.Exe.Enqueue(pab.Abort)
}
// receive outcomes
// proposerReceiveOutcomes accumulates ballot outcomes from the
// acceptors until enough of them agree for an overall outcome.
type proposerReceiveOutcomes struct {
	*Proposer
	outcomeAccumulator *OutcomeAccumulator
	outcome            *msgs.Outcome // the agreed outcome, once determined
}
// init records the owning Proposer and creates the accumulator that
// will gather the acceptors' outcomes.
func (pro *proposerReceiveOutcomes) init(p *Proposer) {
	pro.Proposer = p
	pro.outcomeAccumulator = NewOutcomeAccumulator(p.fInc, p.acceptors)
}
// start is entered once ballots are dealt with (or straight away for
// learners). A retry txn no longer needs to watch the submitter's
// connection, so unsubscribe. If a decisive outcome was already
// accumulated while we were still voting, advance immediately.
func (pro *proposerReceiveOutcomes) start() {
	if pro.txn != nil && pro.txn.Retry {
		pro.proposerManager.RemoveServerConnectionSubscriber(&pro.proposerAwaitBallots)
	}
	if pro.outcome != nil {
		// we've received enough outcomes already!
		pro.nextState()
	}
}
// proposerStateMachineComponentWitness marks pro as a state-machine state.
func (pro *proposerReceiveOutcomes) proposerStateMachineComponentWitness() {}
// String implements fmt.Stringer for logging and status output.
func (pro *proposerReceiveOutcomes) String() string { return "proposerReceiveOutcomes" }
// BallotOutcomeReceived folds the outcome sent by one acceptor into the
// accumulator. Once enough matching outcomes have arrived the overall
// outcome is known and we advance. Note that the outcome parameter is
// deliberately shadowed below by the accumulator's combined result.
func (pro *proposerReceiveOutcomes) BallotOutcomeReceived(sender common.RMId, outcome *msgs.Outcome) {
	server.Log(pro.txnId, "Ballot outcome received from", sender)
	if pro.mode == proposerTLCSender {
		// Consensus already reached and we've been to disk. So this
		// *must* be a duplicate: safe to ignore.
		// Even in the case where it's a retry, we actually don't care
		// that we could be receiving this *after* sending a TLC because
		// all we need to know is that it aborted, not the details.
		return
	}
	outcome, allAgreed := pro.outcomeAccumulator.BallotOutcomeReceived(sender, outcome)
	if allAgreed {
		pro.allAcceptorsAgree()
	}
	if outcome == nil && pro.mode == ProposerPassiveLearner {
		if knownAcceptors := pro.outcomeAccumulator.IsAllAborts(); knownAcceptors != nil {
			// As a passiveLearner, we started this proposer through
			// receiving a commit outcome. However, that has changed, due
			// to failures and every outcome we have is for the same
			// abort. Therefore we're abandoning this learner, and
			// sending TLCs immediately to everyone we've received the
			// abort outcome from.
			server.Log(pro.txnId, "abandoning learner with all aborts", knownAcceptors)
			pro.proposerManager.FinishProposers(pro.txnId)
			pro.proposerManager.TxnFinished(pro.txnId)
			tlcMsg := MakeTxnLocallyCompleteMsg(pro.txnId)
			// We are destroying out state here. Thus even if this msg
			// goes missing, if the acceptor sends us further 2Bs then
			// we'll send back further TLCs from proposer manager. So the
			// use of OSS here is correct.
			NewOneShotSender(tlcMsg, pro.proposerManager, knownAcceptors...)
			return
		}
	}
	if pro.outcome == nil && outcome != nil {
		pro.outcome = outcome
		// It's possible that we're an activeVoter, and whilst our vars
		// are figuring out their votes, we receive enough ballot
		// outcomes from acceptors to determine the overall outcome. We
		// should only advance to the next state if we're currently
		// waiting for ballot outcomes.
		if pro.currentState == pro {
			pro.nextState()
		} else if pro.currentState == &pro.proposerAwaitBallots && pro.txn.Retry {
			// Advance currentState to proposerReceiveOutcomes, the
			// start() of which will immediately call nextState() again.
			pro.nextState()
		}
	}
}
// await locally complete
// proposerAwaitLocallyComplete waits for both the local txn to finish
// its work and for the acceptors to agree; only when both hold is the
// proposer written to disk.
type proposerAwaitLocallyComplete struct {
	*Proposer
	allAcceptorsAgreed bool // consensus reached amongst acceptors
	callbackInvoked    bool // TxnLocallyComplete already seen
}
// init records the owning Proposer.
func (palc *proposerAwaitLocallyComplete) init(p *Proposer) {
	palc.Proposer = p
}
// start runs once the overall outcome is known. A learner that sees a
// commit must now actually run the txn locally (non-voting). If after
// that there is still no local txn (the outcome was an abort) we are
// trivially locally complete; otherwise the outcome is handed to the
// txn and we await its TxnLocallyComplete callback.
func (palc *proposerAwaitLocallyComplete) start() {
	server.Log(palc.txnId, "Outcome for txn determined")
	if palc.txn == nil && palc.outcome.Which() == msgs.OUTCOME_COMMIT {
		// We are a learner (either active or passive), and the result
		// has turned out to be a commit.
		txn := eng.TxnReaderFromData(palc.outcome.Txn())
		pm := palc.proposerManager
		palc.txn = eng.TxnFromReader(pm.Exe, pm.VarDispatcher, palc.Proposer, pm.RMId, txn)
		palc.txn.Start(false)
	}
	if palc.txn == nil {
		palc.TxnLocallyComplete(palc.txn)
	} else {
		palc.txn.BallotOutcomeReceived(palc.outcome)
	}
}
// proposerStateMachineComponentWitness marks palc as a state-machine state.
func (palc *proposerAwaitLocallyComplete) proposerStateMachineComponentWitness() {}
// String implements fmt.Stringer for logging and status output.
func (palc *proposerAwaitLocallyComplete) String() string { return "proposerAwaitLocallyComplete" }
// TxnLocallyComplete is the callback indicating all local work for the
// txn is done. Guarded so that duplicates and invocations in the wrong
// state are ignored; may trigger the write to disk if the acceptors
// have also already agreed.
func (palc *proposerAwaitLocallyComplete) TxnLocallyComplete(*eng.Txn) {
	if palc.currentState == palc && !palc.callbackInvoked {
		server.Log(palc.txnId, "Txn locally completed")
		palc.callbackInvoked = true
		palc.maybeWriteToDisk()
	}
}
// allAcceptorsAgree records that consensus has been reached amongst the
// acceptors: the Paxos proposals for this txn are finished with, and if
// we are also locally complete we can now go to disk. Idempotent.
func (palc *proposerAwaitLocallyComplete) allAcceptorsAgree() {
	if !palc.allAcceptorsAgreed {
		palc.allAcceptorsAgreed = true
		palc.proposerManager.FinishProposers(palc.txnId)
		palc.maybeWriteToDisk()
	}
}
// maybeWriteToDisk persists this proposer once BOTH the local txn has
// completed AND all acceptors agree (and we are still in this state).
// Only the acceptor list is recorded: a proposer found on disk at
// startup is by definition locally complete and only needs to send
// TLCs (see ProposerFromData). The write's completion is observed from
// a fresh goroutine which re-enqueues writeDone onto the executor.
func (palc *proposerAwaitLocallyComplete) maybeWriteToDisk() {
	if !(palc.currentState == palc && palc.callbackInvoked && palc.allAcceptorsAgreed) {
		return
	}
	stateSeg := capn.NewBuffer(nil)
	state := msgs.NewRootProposerState(stateSeg)
	acceptorsCap := stateSeg.NewUInt32List(len(palc.acceptors))
	state.SetAcceptors(acceptorsCap)
	for idx, rmId := range palc.acceptors {
		acceptorsCap.Set(idx, uint32(rmId))
	}
	data := server.SegToBytes(stateSeg)
	future := palc.proposerManager.DB.ReadWriteTransaction(false, func(rwtxn *mdbs.RWTxn) interface{} {
		rwtxn.Put(palc.proposerManager.DB.Proposers, palc.txnId[:], data, 0)
		return true
	})
	go func() {
		// A failed disk write is unrecoverable; ran is nil when the
		// transaction did not run, in which case there is nothing to do.
		if ran, err := future.ResultError(); err != nil {
			panic(fmt.Sprintf("Error: %v when writing proposer to disk: %v\n", palc.txnId, err))
		} else if ran != nil {
			palc.proposerManager.Exe.Enqueue(palc.writeDone)
		}
	}()
}
// writeDone runs back on the executor once the disk write has been
// carried out; advance only if we are still the active state.
func (palc *proposerAwaitLocallyComplete) writeDone() {
	if palc.currentState != palc {
		return
	}
	palc.nextState()
}
// receive globally complete
// proposerReceiveGloballyComplete repeatedly sends TLCs to the
// acceptors and collects their TGC acknowledgements.
type proposerReceiveGloballyComplete struct {
	*Proposer
	tlcSender        *RepeatingSender // resends TLCs until all TGCs arrive
	locallyCompleted bool             // TLC sending has begun
}
// init records the owning Proposer.
func (prgc *proposerReceiveGloballyComplete) init(p *Proposer) {
	prgc.Proposer = p
}
// start is entered once our state is safely on disk (or immediately,
// for a proposer restored from disk). Switch mode to proposerTLCSender
// and start a repeating sender which delivers TLCs to the acceptors.
// The guard ensures the sender is only ever created once.
func (prgc *proposerReceiveGloballyComplete) start() {
	if !prgc.locallyCompleted {
		prgc.locallyCompleted = true
		prgc.mode = proposerTLCSender
		tlcMsg := MakeTxnLocallyCompleteMsg(prgc.txnId)
		prgc.tlcSender = NewRepeatingSender(tlcMsg, prgc.acceptors...)
		server.Log(prgc.txnId, "Adding TLC Sender to", prgc.acceptors)
		prgc.proposerManager.AddServerConnectionSubscriber(prgc.tlcSender)
	}
}
// proposerStateMachineComponentWitness marks prgc as a state-machine state.
func (prgc *proposerReceiveGloballyComplete) proposerStateMachineComponentWitness() {}
// String implements fmt.Stringer for logging and status output.
func (prgc *proposerReceiveGloballyComplete) String() string { return "proposerReceiveGloballyComplete" }
// TxnGloballyCompleteReceived records a TGC from one acceptor; once
// every acceptor has acknowledged, we advance. A TGC arriving in a
// later state is merely a duplicate from a bounced acceptor, but any
// TGC before we have issued TLCs indicates a protocol error.
func (prgc *proposerReceiveGloballyComplete) TxnGloballyCompleteReceived(sender common.RMId) {
	if prgc.currentState == prgc {
		if prgc.outcomeAccumulator.TxnGloballyCompleteReceived(sender) {
			prgc.nextState()
		}
	}
	// If currentState != proposerReceiveGloballyComplete then this TGC
	// could just be a duplicate from some acceptor that's got bounced.
	// But we should not receive any TGC until we've issued TLCs.
	if !prgc.locallyCompleted {
		log.Printf("Error: %v globally complete received from %v without us issuing locally complete. (%v)\n",
			prgc.txnId, sender, prgc.currentState)
	}
}
// await finished
// proposerAwaitFinished waits for the local txn engine to fully release
// the txn before the proposer deletes itself from disk and retires.
type proposerAwaitFinished struct {
	*Proposer
}
// init records the owning Proposer.
func (paf *proposerAwaitFinished) init(p *Proposer) {
	paf.Proposer = p
}
// start: when there is no local txn (e.g. an aborted learner) we are
// finished immediately; otherwise tell the txn that completion has
// been received and await its TxnFinished callback.
func (paf *proposerAwaitFinished) start() {
	if paf.txn == nil {
		paf.TxnFinished(paf.txn)
	} else {
		paf.txn.CompletionReceived()
	}
}
// proposerStateMachineComponentWitness marks paf as a state-machine state.
func (paf *proposerAwaitFinished) proposerStateMachineComponentWitness() {}
// String implements fmt.Stringer for logging and status output.
func (paf *proposerAwaitFinished) String() string { return "proposerAwaitFinished" }
// TxnFinished is the terminal callback: the txn is done with locally.
// Advance to the terminal (nil) state, delete our record from disk,
// and — once the deletion has been carried out — retire the TLC sender
// and deregister from the proposer manager, back on the executor.
func (paf *proposerAwaitFinished) TxnFinished(*eng.Txn) {
	server.Log(paf.txnId, "Txn Finished Callback")
	if paf.currentState == paf {
		paf.nextState()
		future := paf.proposerManager.DB.ReadWriteTransaction(false, func(rwtxn *mdbs.RWTxn) interface{} {
			rwtxn.Del(paf.proposerManager.DB.Proposers, paf.txnId[:], nil)
			return true
		})
		go func() {
			// A failed disk deletion is unrecoverable; ran is nil when
			// the transaction did not run.
			if ran, err := future.ResultError(); err != nil {
				panic(fmt.Sprintf("Error: %v when deleting proposer from disk: %v\n", paf.txnId, err))
			} else if ran != nil {
				paf.proposerManager.Exe.Enqueue(func() {
					paf.proposerManager.RemoveServerConnectionSubscriber(paf.tlcSender)
					paf.tlcSender = nil
					paf.proposerManager.TxnFinished(paf.txnId)
				})
			}
		}()
	} else {
		log.Printf("Error: %v TxnFinished callback invoked with proposer in wrong state: %v",
			paf.txnId, paf.currentState)
	}
}