Fix deadlock in 10-node cluster convergence (dgraph-io#2467)
This PR fixes dgraph-io#2286.

- CheckQuorum was causing us multiple issues. When bootstrapping a 5-node Zero cluster,
  it would cause the leader to step down while the cluster size was still 2, which then
  blocked all the remaining joins indefinitely. It would also cause leader step-down in a
  seemingly healthy cluster that was busy processing proposals. CheckQuorum was mandated
  by raft.ReadOnlyLeaseBased, which is a less safe option for linearizable reads. Switch
  ReadOnlyOption back to raft.ReadOnlySafe. Moreover, we don't need quorum-based
  linearizable reads in the Alpha servers, because of the switch to proposing and then
  applying transaction updates.
- raft.ReadIndex is not working for some reason, so its usage is commented out in Zero
  (and removed from Alpha permanently). This needs to be revisited once etcd-io/etcd#9893
  is resolved.
- The linearizable read logic was duplicated in both Zero and Alpha. Refactor it into one
  place in conn/node.go (see the usage sketch after this list).
- Retry conf change proposals if they time out. This mechanism is similar to the one
  introduced for normal proposals in a previous commit, 06ea4c.
- Use a lock to allow only one JoinCluster call at a time. Block JoinCluster until
  node.AddToCluster succeeds (or return the error).
- Upgrade the raft library from 3.2.6 to 3.2.23.
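
A minimal usage sketch (not part of this diff) of the refactored lin-read helper. The
package, function name, and read callback are assumptions for illustration; only
Node.WaitLinearizableRead comes from conn/node.go below.

package readexample

import (
	"context"

	"github.com/dgraph-io/dgraph/conn"
)

// serveRead waits until this replica has applied entries up to the leader's read index,
// then serves the read from local state. The read callback is illustrative only.
func serveRead(ctx context.Context, n *conn.Node, read func() ([]byte, error)) ([]byte, error) {
	if err := n.WaitLinearizableRead(ctx); err != nil {
		return nil, err
	}
	return read()
}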

Commit log:

* Trying to understand why JoinCluster doesn't work properly.
* Fucking works. Fucking works.
* It all works now.
* More Dgraph servers. Found a new issue where requesting read quorum doesn't respond.
* Refactor wait lin read code and move it to conn/node.go
* Remove lin read wait for server, because the txn timestamp should be sufficient for waiting. Also, for the time being, comment out lin read wait from Zero as well. (A sketch of the timestamp-based wait follows this list.)
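
A sketch of the timestamp-based wait mentioned in the last item, under the assumption
that the server tracks applied transaction updates with an x.WaterMark. The package,
helper name, and maxAssigned parameter are illustrative, not code from this commit;
x.WaterMark.WaitForMark is the same primitive used by Applied in conn/node.go below.

package waitexample

import (
	"context"

	"github.com/dgraph-io/dgraph/x"
)

// waitForTs blocks until every update with a commit timestamp <= readTs has been applied
// locally, making a read at readTs safe without a Raft ReadIndex round trip.
func waitForTs(ctx context.Context, maxAssigned *x.WaterMark, readTs uint64) error {
	return maxAssigned.WaitForMark(ctx, readTs)
}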
manishrjain authored and dna2github committed Jul 19, 2019
1 parent cfc323b commit 765497e
Showing 17 changed files with 450 additions and 666 deletions.
201 changes: 163 additions & 38 deletions conn/node.go
@@ -18,6 +18,7 @@ import (

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/dgraph-io/badger/y"
"github.com/dgraph-io/dgo/protos/api"
"github.com/dgraph-io/dgraph/protos/intern"
"github.com/dgraph-io/dgraph/raftwal"
@@ -37,6 +38,11 @@ type sendmsg struct {
type Node struct {
x.SafeMutex

joinLock sync.Mutex

// Used to keep track of lin read requests.
requestCh chan linReadReq

// SafeMutex is for fields which can be changed after init.
_confState *raftpb.ConfState
_raft raft.Node
@@ -88,27 +94,30 @@ func NewNode(rc *intern.RaftContext, store *raftwal.DiskStorage) *Node {
MaxSizePerMsg: 256 << 10,
MaxInflightMsgs: 256,
Logger: &raft.DefaultLogger{Logger: x.Logger},
// We use lease-based linearizable ReadIndex for performance, at the cost of
// correctness. With it, communication goes follower->leader->follower, instead of
// follower->leader->majority_of_followers->leader->follower. We lose correctness
// because the Raft ticker might not arrive promptly, in which case the leader would
// falsely believe that its lease is still good.
CheckQuorum: true,
ReadOnlyOption: raft.ReadOnlyLeaseBased,
// We don't need lease based reads. They cause issues because they require CheckQuorum
// to be true, and that causes a lot of issues for us during cluster bootstrapping and
// later. A seemingly healthy cluster would just cause leader to step down due to
// "inactive" quorum, and then disallow anyone from becoming leader. So, let's stick to
// default options. Let's achieve correctness, then we achieve performance. Plus, for
// the Dgraph servers, we'll be soon relying only on Timestamps for blocking reads and
// achieving linearizability, than checking quorums (Zero would still check quorums).
ReadOnlyOption: raft.ReadOnlySafe,
},
// processConfChange etc. are not throttled, so allow some extra delta so that
// we don't block the Raft tick when applyCh is full.
peers: make(map[uint64]string),
confChanges: make(map[uint64]chan error),
RaftContext: rc,
messages: make(chan sendmsg, 100),
Applied: x.WaterMark{Name: fmt.Sprintf("Applied watermark")},
RaftContext: rc,
Rand: rand.New(&lockedSource{src: rand.NewSource(time.Now().UnixNano())}),
confChanges: make(map[uint64]chan error),
messages: make(chan sendmsg, 100),
peers: make(map[uint64]string),
requestCh: make(chan linReadReq),
}
n.Applied.Init()
// TODO: n_ = n is a hack. We should properly init node, and make it part of the server struct.
// This can happen once we get rid of groups.
n_ = n

return n
}

@@ -375,6 +384,34 @@ func (n *Node) DeletePeer(pid uint64) {
delete(n.peers, pid)
}

var errInternalRetry = errors.New("Retry proposal again")

func (n *Node) proposeConfChange(ctx context.Context, pb raftpb.ConfChange) error {
cctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

ch := make(chan error, 1)
id := n.storeConfChange(ch)
// TODO: Delete id from the map.
pb.ID = id
if err := n.Raft().ProposeConfChange(cctx, pb); err != nil {
if cctx.Err() != nil {
return errInternalRetry
}
x.Printf("Error while proposing conf change: %v", err)
return err
}
select {
case err := <-ch:
return err
case <-ctx.Done():
return ctx.Err()
case <-cctx.Done():
return errInternalRetry
}
return nil
}

func (n *Node) AddToCluster(ctx context.Context, pid uint64) error {
addr, ok := n.Peer(pid)
x.AssertTruef(ok, "Unable to find conn pool for peer: %d", pid)
@@ -386,18 +423,17 @@ func (n *Node) AddToCluster(ctx context.Context, pid uint64) error {
rcBytes, err := rc.Marshal()
x.Check(err)

ch := make(chan error, 1)
id := n.storeConfChange(ch)
err = n.Raft().ProposeConfChange(ctx, raftpb.ConfChange{
ID: id,
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: pid,
Context: rcBytes,
})
if err != nil {
return err
}
err = <-ch
err = errInternalRetry
for err == errInternalRetry {
x.Printf("Trying to add %d to cluster. Addr: %v\n", pid, addr)
x.Printf("Current confstate at %d: %+v\n", n.Id, n.ConfState())
err = n.proposeConfChange(ctx, cc)
}
return err
}

@@ -408,20 +444,111 @@ func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
if _, ok := n.Peer(id); !ok && id != n.RaftContext.Id {
return x.Errorf("Node %d not part of group", id)
}
ch := make(chan error, 1)
pid := n.storeConfChange(ch)
err := n.Raft().ProposeConfChange(ctx, raftpb.ConfChange{
ID: pid,
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: id,
})
if err != nil {
return err
}
err = <-ch
err := errInternalRetry
for err == errInternalRetry {
err = n.proposeConfChange(ctx, cc)
}
return err
}

type linReadReq struct {
// A one-shot channel on which we send a Raft index.
indexCh chan<- uint64
}

var errReadIndex = x.Errorf("cannot get linearized read (time expired or no configured leader)")

func (n *Node) WaitLinearizableRead(ctx context.Context) error {
indexCh := make(chan uint64, 1)
select {
case n.requestCh <- linReadReq{indexCh: indexCh}:
case <-ctx.Done():
return ctx.Err()
}

select {
case index := <-indexCh:
if index == 0 {
return errReadIndex
}
return n.Applied.WaitForMark(ctx, index)
case <-ctx.Done():
return ctx.Err()
}
}

func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadState) {
defer closer.Done()
readIndex := func() (uint64, error) {
// A read request can get rejected, in which case we would wait indefinitely on the
// channel, so use a timeout.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

activeRctx := make([]byte, 8)
x.Check2(n.Rand.Read(activeRctx[:]))
if err := n.Raft().ReadIndex(ctx, activeRctx[:]); err != nil {
x.Errorf("Error while trying to call ReadIndex: %v\n", err)
return 0, err
}

again:
select {
case <-closer.HasBeenClosed():
return 0, errors.New("closer has been called")
case rs := <-readStateCh:
if !bytes.Equal(activeRctx[:], rs.RequestCtx) {
goto again
}
return rs.Index, nil
case <-ctx.Done():
x.Errorf("[%d] Read index context timed out\n")
return 0, errInternalRetry
}
}

// We maintain one linearizable ReadIndex request at a time. Others wait queued behind
// requestCh.
requests := []linReadReq{}
for {
select {
case <-closer.HasBeenClosed():
return
case rs := <-readStateCh:
// Do nothing, discard ReadState as we don't have any pending ReadIndex requests.
x.Errorf("Received a read state unexpectedly: %+v\n", rs)
case req := <-n.requestCh:
slurpLoop:
for {
requests = append(requests, req)
select {
case req = <-n.requestCh:
default:
break slurpLoop
}
}
for {
index, err := readIndex()
if err == errInternalRetry {
continue
}
if err != nil {
index = 0
x.Errorf("[%d] While trying to do lin read index: %v", n.Id, err)
}
for _, req := range requests {
req.indexCh <- index
}
// Done with this batch; break out so the queued requests can be cleared and the
// main loop can pick up the next batch.
break
}
requests = requests[:0]
}
}
}

// TODO: Get rid of this in the upcoming changes.
var n_ *Node

@@ -466,6 +593,10 @@ func (w *RaftServer) JoinCluster(ctx context.Context,
if node == nil || node.Raft() == nil {
return nil, errNoNode
}
// Only process one JoinCluster request at a time.
node.joinLock.Lock()
defer node.joinLock.Unlock()

// Check that the new node is from the same group as me.
if rc.Group != node.RaftContext.Group {
return nil, x.Errorf("Raft group mismatch")
@@ -474,25 +605,19 @@
if rc.Id == node.RaftContext.Id {
return nil, ErrDuplicateRaftId
}

// Check that the new node is not already part of the group.
if addr, ok := node.peers[rc.Id]; ok && rc.Addr != addr {
Get().Connect(addr)
if addr, ok := node.Peer(rc.Id); ok && rc.Addr != addr {
// There exists a healthy connection to a server with the same id.
if _, err := Get().Get(addr); err == nil {
return &api.Payload{}, ErrDuplicateRaftId
}
}
node.Connect(rc.Id, rc.Addr)

c := make(chan error, 1)
go func() { c <- node.AddToCluster(ctx, rc.Id) }()

select {
case <-ctx.Done():
return &api.Payload{}, ctx.Err()
case err := <-c:
return &api.Payload{}, err
}
err := node.AddToCluster(context.Background(), rc.Id)
x.Printf("[%d] Done joining cluster with err: %v", rc.Id, err)
return &api.Payload{}, err
}

var (
4 changes: 2 additions & 2 deletions conn/pool.go
@@ -105,7 +105,7 @@ func (p *Pools) Connect(addr string) *Pool {
p.Unlock()
return existingPool
}
x.Printf("== CONNECT ==> Setting %v\n", addr)
x.Printf("== CONNECTED ==> Setting %v\n", addr)
p.all[addr] = pool
p.Unlock()
return pool
@@ -117,7 +117,7 @@ func NewPool(addr string) (*Pool, error) {
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(x.GrpcMaxSize),
grpc.MaxCallSendMsgSize(x.GrpcMaxSize)),
grpc.WithBackoffMaxDelay(10*time.Second),
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithInsecure())
if err != nil {
return nil, err
