@@ -18,6 +18,7 @@ import (
18
18
19
19
"github.com/coreos/etcd/raft"
20
20
"github.com/coreos/etcd/raft/raftpb"
21
+ "github.com/dgraph-io/badger/y"
21
22
"github.com/dgraph-io/dgo/protos/api"
22
23
"github.com/dgraph-io/dgraph/protos/intern"
23
24
"github.com/dgraph-io/dgraph/raftwal"
@@ -37,6 +38,11 @@ type sendmsg struct {
37
38
type Node struct {
38
39
x.SafeMutex
39
40
41
+ joinLock sync.Mutex
42
+
43
+ // Used to keep track of lin read requests.
44
+ requestCh chan linReadReq
45
+
40
46
// SafeMutex is for fields which can be changed after init.
41
47
_confState * raftpb.ConfState
42
48
_raft raft.Node
@@ -88,27 +94,30 @@ func NewNode(rc *intern.RaftContext, store *raftwal.DiskStorage) *Node {
88
94
MaxSizePerMsg : 256 << 10 ,
89
95
MaxInflightMsgs : 256 ,
90
96
Logger : & raft.DefaultLogger {Logger : x .Logger },
91
- // We use lease-based linearizable ReadIndex for performance, at the cost of
92
- // correctness. With it, communication goes follower->leader->follower, instead of
93
- // follower->leader->majority_of_followers->leader->follower. We lose correctness
94
- // because the Raft ticker might not arrive promptly, in which case the leader would
95
- // falsely believe that its lease is still good.
96
- CheckQuorum : true ,
97
- ReadOnlyOption : raft .ReadOnlyLeaseBased ,
97
+ // We don't need lease based reads. They cause issues because they require CheckQuorum
98
+ // to be true, and that causes a lot of issues for us during cluster bootstrapping and
99
+ // later. A seemingly healthy cluster would just cause leader to step down due to
100
+ // "inactive" quorum, and then disallow anyone from becoming leader. So, let's stick to
101
+ // default options. Let's achieve correctness, then we achieve performance. Plus, for
102
+ // the Dgraph servers, we'll be soon relying only on Timestamps for blocking reads and
103
+ // achieving linearizability, than checking quorums (Zero would still check quorums).
104
+ ReadOnlyOption : raft .ReadOnlySafe ,
98
105
},
99
106
// processConfChange etc are not throttled so some extra delta, so that we don't
100
107
// block tick when applyCh is full
101
- peers : make (map [uint64 ]string ),
102
- confChanges : make (map [uint64 ]chan error ),
103
- RaftContext : rc ,
104
- messages : make (chan sendmsg , 100 ),
105
108
Applied : x.WaterMark {Name : fmt .Sprintf ("Applied watermark" )},
109
+ RaftContext : rc ,
106
110
Rand : rand .New (& lockedSource {src : rand .NewSource (time .Now ().UnixNano ())}),
111
+ confChanges : make (map [uint64 ]chan error ),
112
+ messages : make (chan sendmsg , 100 ),
113
+ peers : make (map [uint64 ]string ),
114
+ requestCh : make (chan linReadReq ),
107
115
}
108
116
n .Applied .Init ()
109
117
// TODO: n_ = n is a hack. We should properly init node, and make it part of the server struct.
110
118
// This can happen once we get rid of groups.
111
119
n_ = n
120
+
112
121
return n
113
122
}
114
123
@@ -375,6 +384,34 @@ func (n *Node) DeletePeer(pid uint64) {
375
384
delete (n .peers , pid )
376
385
}
377
386
387
+ var errInternalRetry = errors .New ("Retry proposal again" )
388
+
389
+ func (n * Node ) proposeConfChange (ctx context.Context , pb raftpb.ConfChange ) error {
390
+ cctx , cancel := context .WithTimeout (ctx , 3 * time .Second )
391
+ defer cancel ()
392
+
393
+ ch := make (chan error , 1 )
394
+ id := n .storeConfChange (ch )
395
+ // TODO: Delete id from the map.
396
+ pb .ID = id
397
+ if err := n .Raft ().ProposeConfChange (cctx , pb ); err != nil {
398
+ if cctx .Err () != nil {
399
+ return errInternalRetry
400
+ }
401
+ x .Printf ("Error while proposing conf change: %v" , err )
402
+ return err
403
+ }
404
+ select {
405
+ case err := <- ch :
406
+ return err
407
+ case <- ctx .Done ():
408
+ return ctx .Err ()
409
+ case <- cctx .Done ():
410
+ return errInternalRetry
411
+ }
412
+ return nil
413
+ }
414
+
378
415
func (n * Node ) AddToCluster (ctx context.Context , pid uint64 ) error {
379
416
addr , ok := n .Peer (pid )
380
417
x .AssertTruef (ok , "Unable to find conn pool for peer: %d" , pid )
@@ -386,18 +423,17 @@ func (n *Node) AddToCluster(ctx context.Context, pid uint64) error {
386
423
rcBytes , err := rc .Marshal ()
387
424
x .Check (err )
388
425
389
- ch := make (chan error , 1 )
390
- id := n .storeConfChange (ch )
391
- err = n .Raft ().ProposeConfChange (ctx , raftpb.ConfChange {
392
- ID : id ,
426
+ cc := raftpb.ConfChange {
393
427
Type : raftpb .ConfChangeAddNode ,
394
428
NodeID : pid ,
395
429
Context : rcBytes ,
396
- })
397
- if err != nil {
398
- return err
399
430
}
400
- err = <- ch
431
+ err = errInternalRetry
432
+ for err == errInternalRetry {
433
+ x .Printf ("Trying to add %d to cluster. Addr: %v\n " , pid , addr )
434
+ x .Printf ("Current confstate at %d: %+v\n " , n .Id , n .ConfState ())
435
+ err = n .proposeConfChange (ctx , cc )
436
+ }
401
437
return err
402
438
}
403
439
@@ -408,20 +444,111 @@ func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
408
444
if _ , ok := n .Peer (id ); ! ok && id != n .RaftContext .Id {
409
445
return x .Errorf ("Node %d not part of group" , id )
410
446
}
411
- ch := make (chan error , 1 )
412
- pid := n .storeConfChange (ch )
413
- err := n .Raft ().ProposeConfChange (ctx , raftpb.ConfChange {
414
- ID : pid ,
447
+ cc := raftpb.ConfChange {
415
448
Type : raftpb .ConfChangeRemoveNode ,
416
449
NodeID : id ,
417
- })
418
- if err != nil {
419
- return err
420
450
}
421
- err = <- ch
451
+ err := errInternalRetry
452
+ for err == errInternalRetry {
453
+ err = n .proposeConfChange (ctx , cc )
454
+ }
422
455
return err
423
456
}
424
457
458
// linReadReq is one pending linearizable-read request, queued on
// Node.requestCh and serviced by RunReadIndexLoop.
type linReadReq struct {
	// A one-shot chan which we send a raft index upon. A value of 0 means the
	// read index could not be obtained.
	indexCh chan<- uint64
}

// errReadIndex is returned to readers when a linearizable read index could
// not be obtained (the ReadIndex call timed out, or no leader is known).
var errReadIndex = x.Errorf("cannot get linearized read (time expired or no configured leader)")
464
+
465
+ func (n * Node ) WaitLinearizableRead (ctx context.Context ) error {
466
+ indexCh := make (chan uint64 , 1 )
467
+ select {
468
+ case n .requestCh <- linReadReq {indexCh : indexCh }:
469
+ case <- ctx .Done ():
470
+ return ctx .Err ()
471
+ }
472
+
473
+ select {
474
+ case index := <- indexCh :
475
+ if index == 0 {
476
+ return errReadIndex
477
+ }
478
+ return n .Applied .WaitForMark (ctx , index )
479
+ case <- ctx .Done ():
480
+ return ctx .Err ()
481
+ }
482
+ }
483
+
484
+ func (n * Node ) RunReadIndexLoop (closer * y.Closer , readStateCh <- chan raft.ReadState ) {
485
+ defer closer .Done ()
486
+ readIndex := func () (uint64 , error ) {
487
+ // Read Request can get rejected then we would wait idefinitely on the channel
488
+ // so have a timeout.
489
+ ctx , cancel := context .WithTimeout (context .Background (), 3 * time .Second )
490
+ defer cancel ()
491
+
492
+ activeRctx := make ([]byte , 8 )
493
+ x .Check2 (n .Rand .Read (activeRctx [:]))
494
+ if err := n .Raft ().ReadIndex (ctx , activeRctx [:]); err != nil {
495
+ x .Errorf ("Error while trying to call ReadIndex: %v\n " , err )
496
+ return 0 , err
497
+ }
498
+
499
+ again:
500
+ select {
501
+ case <- closer .HasBeenClosed ():
502
+ return 0 , errors .New ("closer has been called" )
503
+ case rs := <- readStateCh :
504
+ if ! bytes .Equal (activeRctx [:], rs .RequestCtx ) {
505
+ goto again
506
+ }
507
+ return rs .Index , nil
508
+ case <- ctx .Done ():
509
+ x .Errorf ("[%d] Read index context timed out\n " )
510
+ return 0 , errInternalRetry
511
+ }
512
+ }
513
+
514
+ // We maintain one linearizable ReadIndex request at a time. Others wait queued behind
515
+ // requestCh.
516
+ requests := []linReadReq {}
517
+ for {
518
+ select {
519
+ case <- closer .HasBeenClosed ():
520
+ return
521
+ case rs := <- readStateCh :
522
+ // Do nothing, discard ReadState as we don't have any pending ReadIndex requests.
523
+ x .Errorf ("Received a read state unexpectedly: %+v\n " , rs )
524
+ case req := <- n .requestCh :
525
+ slurpLoop:
526
+ for {
527
+ requests = append (requests , req )
528
+ select {
529
+ case req = <- n .requestCh :
530
+ default :
531
+ break slurpLoop
532
+ }
533
+ }
534
+ for {
535
+ index , err := readIndex ()
536
+ if err == errInternalRetry {
537
+ continue
538
+ }
539
+ if err != nil {
540
+ index = 0
541
+ x .Errorf ("[%d] While trying to do lin read index: %v" , n .Id , err )
542
+ }
543
+ for _ , req := range requests {
544
+ req .indexCh <- index
545
+ }
546
+ }
547
+ requests = requests [:0 ]
548
+ }
549
+ }
550
+ }
551
+
425
552
// TODO: Get rid of this in the upcoming changes.
426
553
var n_ * Node
427
554
@@ -466,6 +593,10 @@ func (w *RaftServer) JoinCluster(ctx context.Context,
466
593
if node == nil || node .Raft () == nil {
467
594
return nil , errNoNode
468
595
}
596
+ // Only process one JoinCluster request at a time.
597
+ node .joinLock .Lock ()
598
+ defer node .joinLock .Unlock ()
599
+
469
600
// Check that the new node is from the same group as me.
470
601
if rc .Group != node .RaftContext .Group {
471
602
return nil , x .Errorf ("Raft group mismatch" )
@@ -474,25 +605,19 @@ func (w *RaftServer) JoinCluster(ctx context.Context,
474
605
if rc .Id == node .RaftContext .Id {
475
606
return nil , ErrDuplicateRaftId
476
607
}
608
+
477
609
// Check that the new node is not already part of the group.
478
- if addr , ok := node .peers [rc .Id ]; ok && rc .Addr != addr {
479
- Get ().Connect (addr )
610
+ if addr , ok := node .Peer (rc .Id ); ok && rc .Addr != addr {
480
611
// There exists a healthy connection to server with same id.
481
612
if _ , err := Get ().Get (addr ); err == nil {
482
613
return & api.Payload {}, ErrDuplicateRaftId
483
614
}
484
615
}
485
616
node .Connect (rc .Id , rc .Addr )
486
617
487
- c := make (chan error , 1 )
488
- go func () { c <- node .AddToCluster (ctx , rc .Id ) }()
489
-
490
- select {
491
- case <- ctx .Done ():
492
- return & api.Payload {}, ctx .Err ()
493
- case err := <- c :
494
- return & api.Payload {}, err
495
- }
618
+ err := node .AddToCluster (context .Background (), rc .Id )
619
+ x .Printf ("[%d] Done joining cluster with err: %v" , rc .Id , err )
620
+ return & api.Payload {}, err
496
621
}
497
622
498
623
var (
0 commit comments