
Fix various deadlocks in Dgraph (#2548)

Fixed a bunch of long-standing deadlock issues:

1. Deadlock caused by recursive locking in posting/list.go: an internal function re-acquired a lock that its public caller could already hold, which caused `applyCh` to block whenever a mutation being applied to a posting list overlapped with a read from a query (see the sketch below this list).
2. Deadlock caused by losing the Raft ConfState across a node restart. We were not picking up the previous ConfState, so it defaulted to nil in the next CreateSnapshot. Now we pick up the state and ensure that the snapshot has a valid ConfState. Previously, a restarted node would see an empty group and never participate in elections.
3. Fix #2541 -- A Raft Tick failed to fire because of repeated calls to `raft.Storage.FirstIndex()`. Each call made Badger create an iterator, which was expensive. Now we cache the first index to avoid looking it up repeatedly.
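
A minimal sketch of the locking convention behind fix 1 (names here are illustrative only; the real code uses `x.SafeMutex` and `AssertRLock` in posting/list.go): exported methods acquire the lock, while unexported helpers only assume it is held, so a helper never re-acquires a lock its caller already owns.

```go
package main

import (
	"fmt"
	"sync"
)

// list stands in for posting.List; the type and field names are hypothetical,
// kept small to show only the locking convention.
type list struct {
	sync.RWMutex
	values map[uint64]string
}

// Value is a public entry point, so it takes the read lock itself.
func (l *list) Value(ts uint64) string {
	l.RLock()
	defer l.RUnlock()
	return l.valueFor(ts) // helper runs under the lock we already hold
}

// valueFor assumes the caller holds the lock. Calling l.RLock() again here is
// the recursive-locking pattern that deadlocks once a writer queues up between
// the two read locks -- the situation that blocked applyCh against a query.
func (l *list) valueFor(ts uint64) string {
	return l.values[ts]
}

func main() {
	l := &list{values: map[uint64]string{1: "value@ts1"}}
	fmt.Println(l.Value(1))
}
```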

Also introduced the golang/glog library for better logging.
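
For reference, a minimal sketch of how the glog calls in this diff are typically wired up: glog registers its flags (`-v`, `-logtostderr`, ...) on the standard flag package, so they need to be parsed once at startup (this commit does that in dgraph/cmd/root.go), and verbose output is guarded with `glog.V(level)` as in the edgraph/server.go hunks. The request struct below is a made-up placeholder.

```go
package main

import (
	goflag "flag"

	"github.com/golang/glog"
)

func main() {
	// glog's flags live on the standard flag set; parse them once at startup.
	goflag.Parse()
	defer glog.Flush() // glog buffers log lines; flush them on exit.

	glog.Infof("server starting")

	// Verbosity-guarded logging: the message (and its formatting cost) is
	// skipped unless the process was started with -v=2 or higher.
	if glog.V(2) {
		glog.Infof("received request: %+v", struct{ Query string }{"{ q(func: uid(1)) { uid } }"})
	}
}
```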
manishrjain committed Aug 24, 2018
1 parent c76a290 commit 87790666d254f05e27f54c4647a0f8c869bbee59
Showing with 90 additions and 22 deletions.
  1. +12 −10 conn/node.go
  2. +8 −3 contrib/integration/acctupsert/main.go
  3. +4 −0 dgraph/cmd/root.go
  4. +8 −0 edgraph/server.go
  5. +3 −2 posting/list.go
  6. +29 −4 raftwal/storage.go
  7. +25 −3 worker/draft.go
  8. +1 −0 x/lock.go
@@ -23,6 +23,7 @@ import (
"github.com/dgraph-io/dgraph/protos/intern"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
"golang.org/x/net/context"
)

@@ -124,9 +125,9 @@ func (n *Node) Raft() raft.Node {

// SetConfState would store the latest ConfState generated by ApplyConfChange.
func (n *Node) SetConfState(cs *raftpb.ConfState) {
glog.Infof("Setting conf state to %+v\n", cs)
n.Lock()
defer n.Unlock()
x.Printf("Setting conf state to %+v\n", cs)
n._confState = cs
}

@@ -215,7 +216,7 @@ func (n *Node) PastLife() (idx uint64, restart bool, rerr error) {
return
}
if !raft.IsEmptySnap(sp) {
x.Printf("Found Snapshot, Metadata: %+v\n", sp.Metadata)
glog.Infof("Found Snapshot.Metadata: %+v\n", sp.Metadata)
restart = true
idx = sp.Metadata.Index
}
@@ -226,7 +227,7 @@ func (n *Node) PastLife() (idx uint64, restart bool, rerr error) {
return
}
if !raft.IsEmptyHardState(hd) {
x.Printf("Found hardstate: %+v\n", hd)
glog.Infof("Found hardstate: %+v\n", hd)
restart = true
}

@@ -235,7 +236,7 @@ func (n *Node) PastLife() (idx uint64, restart bool, rerr error) {
if rerr != nil {
return
}
x.Printf("Group %d found %d entries\n", n.RaftContext.Group, num)
glog.Infof("Group %d found %d entries\n", n.RaftContext.Group, num)
// We'll always have at least one entry.
if num > 1 {
restart = true
@@ -292,7 +293,7 @@ func (n *Node) BatchAndSendMessages() {
if exists := failedConn[to]; !exists {
// So that we print error only the first time we are not able to connect.
// Otherwise, the log is polluted with multiple errors.
x.Printf("No healthy connection found to node Id: %d addr: [%s], err: %v\n",
glog.Warningf("No healthy connection to node Id: %d addr: [%s], err: %v\n",
to, addr, err)
failedConn[to] = true
}
@@ -325,7 +326,8 @@ func (n *Node) doSendMessage(pool *Pool, data []byte) {
go func() {
_, err := c.RaftMessage(ctx, batch)
if err != nil {
x.Printf("Error while sending message to node with addr: %s, err: %v\n", pool.Addr, err)
glog.Warningf("Error while sending message to node with addr: %s, err: %v\n",
pool.Addr, err)
}
ch <- err
}()
@@ -356,7 +358,7 @@ func (n *Node) Connect(pid uint64, addr string) {
// a nil *pool.
if addr == n.MyAddr {
// TODO: Note this fact in more general peer health info somehow.
x.Printf("Peer %d claims same host as me\n", pid)
glog.Infof("Peer %d claims same host as me\n", pid)
n.SetPeer(pid, addr)
return
}
@@ -387,7 +389,7 @@ func (n *Node) proposeConfChange(ctx context.Context, pb raftpb.ConfChange) erro
if cctx.Err() != nil {
return errInternalRetry
}
x.Printf("Error while proposing conf change: %v", err)
glog.Warningf("Error while proposing conf change: %v", err)
return err
}
select {
@@ -419,8 +421,8 @@ func (n *Node) AddToCluster(ctx context.Context, pid uint64) error {
}
err = errInternalRetry
for err == errInternalRetry {
x.Printf("Trying to add %d to cluster. Addr: %v\n", pid, addr)
x.Printf("Current confstate at %d: %+v\n", n.Id, n.ConfState())
glog.Infof("Trying to add %d to cluster. Addr: %v\n", pid, addr)
glog.Infof("Current confstate at %d: %+v\n", n.Id, n.ConfState())
err = n.proposeConfChange(ctx, cc)
}
return err
@@ -12,6 +12,7 @@ import (
"encoding/json"
"flag"
"fmt"
"math/rand"
"strings"
"sync"
"sync/atomic"
@@ -33,6 +34,7 @@ var (
firsts = []string{"Paul", "Eric", "Jack", "John", "Martin"}
lasts = []string{"Brown", "Smith", "Robinson", "Waters", "Taylor"}
ages = []int{20, 25, 30, 35}
types = []string{"CEO", "COO", "CTO", "CFO"}
)

type account struct {
@@ -139,6 +141,7 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
{
get(func: eq(first, %q)) @filter(eq(last, %q) AND eq(age, %d)) {
uid
expand(_all_) {uid}
}
}
`, acc.first, acc.last, acc.age)
@@ -153,6 +156,8 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
x.Check(json.Unmarshal(resp.GetJson(), &decode))

x.AssertTrue(len(decode.Get) <= 1)
t := rand.Intn(len(types))

var uid string
if len(decode.Get) == 1 {
x.AssertTrue(decode.Get[0].Uid != nil)
@@ -162,8 +167,9 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
_:acct <first> %q .
_:acct <last> %q .
_:acct <age> "%d"^^<xs:int> .
`,
acc.first, acc.last, acc.age,
_:acct <%s> "" .
`,
acc.first, acc.last, acc.age, types[t],
)
mu := &api.Mutation{SetNquads: []byte(nqs)}
assigned, err := txn.Mutate(ctx, mu)
@@ -172,7 +178,6 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
}
uid = assigned.GetUids()["acct"]
x.AssertTrue(uid != "")

}

nq := fmt.Sprintf(`
@@ -8,6 +8,7 @@
package cmd

import (
goflag "flag"
"fmt"
"os"

@@ -19,6 +20,7 @@ import (
"github.com/dgraph-io/dgraph/dgraph/cmd/zero"
"github.com/dgraph-io/dgraph/x"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
"github.com/spf13/viper"
)

@@ -40,6 +42,7 @@ cluster.
// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
goflag.Parse()
if err := RootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
@@ -61,6 +64,7 @@ func init() {
RootCmd.PersistentFlags().Bool("expose_trace", false,
"Allow trace endpoint to be accessible from remote")
rootConf.BindPFlags(RootCmd.PersistentFlags())
flag.CommandLine.AddGoFlagSet(goflag.CommandLine)

var subcommands = []*x.SubCommand{
&bulk.Bulk, &live.Live, &server.Server, &zero.Zero, &version.Version, &debug.Debug,
@@ -35,6 +35,7 @@ import (
"github.com/dgraph-io/dgraph/types/facets"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
"github.com/pkg/errors"
)

@@ -235,6 +236,10 @@ func (s *ServerState) getTimestamp() uint64 {
}

func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, error) {
if glog.V(2) {
glog.Infof("Received ALTER op: %+v", op)
defer glog.Infof("ALTER op: %+v done", op)
}
if op.Schema == "" && op.DropAttr == "" && !op.DropAll {
// Must have at least one field set. This helps users if they attempt
// to set a field but use the wrong name (could be decoded from JSON).
@@ -397,6 +402,9 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign
// This method is used to execute the query and return the response to the
// client as a protocol buffer message.
func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Response, err error) {
if glog.V(3) {
glog.Infof("Got a query: %+v", req)
}
if err := x.HealthCheck(); err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Request rejected %v", err)
@@ -914,6 +914,8 @@ func (l *List) Value(readTs uint64) (rval types.Val, rerr error) {
// If list consists of one or more languages, first available value is returned; if no language
// from list match the values, processing is the same as for empty list.
func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr error) {
l.RLock() // All public methods should acquire locks, while private ones should assert them.
defer l.RUnlock()
p, err := l.postingFor(readTs, langs)
if err != nil {
return rval, err
@@ -922,8 +924,7 @@ func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr err
}

func (l *List) postingFor(readTs uint64, langs []string) (p *intern.Posting, rerr error) {
l.RLock()
defer l.RUnlock()
l.AssertRLock() // Avoid recursive locking by asserting a lock here.
return l.postingForLangs(readTs, langs)
}

@@ -17,6 +17,7 @@ import (
"github.com/coreos/etcd/raft"
pb "github.com/coreos/etcd/raft/raftpb"
"github.com/dgraph-io/badger"
"github.com/golang/glog"
"golang.org/x/net/trace"

"github.com/dgraph-io/dgraph/x"
@@ -65,13 +66,27 @@ func (u *txnUnifier) Cancel() {

type localCache struct {
sync.RWMutex
snap pb.Snapshot
firstIndex uint64
snap pb.Snapshot
}

func (c *localCache) setFirst(first uint64) {
c.Lock()
defer c.Unlock()
c.firstIndex = first
}

func (c *localCache) first() uint64 {
c.RLock()
defer c.RUnlock()
return c.firstIndex
}

func (c *localCache) setSnapshot(s pb.Snapshot) {
c.Lock()
defer c.Unlock()
c.snap = s
c.firstIndex = 0 // Reset firstIndex.
}

func (c *localCache) snapshot() pb.Snapshot {
@@ -240,7 +255,16 @@ func (w *DiskStorage) FirstIndex() (uint64, error) {
if !raft.IsEmptySnap(snap) {
return snap.Metadata.Index + 1, nil
}
if first := w.cache.first(); first > 0 {
return first, nil
}
index, err := w.seekEntry(nil, 0, false)
if err == nil {
glog.V(2).Infof("Setting first index: %d", index+1)
w.cache.setFirst(index + 1)
} else {
glog.Errorf("While seekEntry. Error: %v", err)
}
return index + 1, err
}

@@ -549,11 +573,13 @@ func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error
}

func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) error {
glog.V(2).Infof("CreateSnapshot i=%d, cs=%+v", i, cs)
first, err := w.FirstIndex()
if err != nil {
return err
}
if i < first {
glog.Errorf("i=%d<first=%d, ErrSnapOutOfDate", i, first)
return raft.ErrSnapOutOfDate
}

@@ -568,9 +594,8 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) er
var snap pb.Snapshot
snap.Metadata.Index = i
snap.Metadata.Term = e.Term
if cs != nil {
snap.Metadata.ConfState = *cs
}
x.AssertTrue(cs != nil)
snap.Metadata.ConfState = *cs
snap.Data = data

u := w.newUnifier()
@@ -19,6 +19,7 @@ import (

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/golang/glog"
"golang.org/x/net/context"
"golang.org/x/net/trace"

@@ -123,6 +124,7 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *intern.Proposal) er
if n.Raft() == nil {
return x.Errorf("Raft isn't initialized yet")
}

// TODO: Should be based on number of edges (amount of work)
pendingProposals <- struct{}{}
x.PendingProposals.Add(1)
@@ -406,12 +408,23 @@ func (n *node) applyCommitted(proposal *intern.Proposal, index uint64) error {
return n.commitOrAbort(proposal.Key, proposal.Delta)

} else if proposal.Snapshot != nil {
existing, err := n.Store.Snapshot()
if err != nil {
return err
}
snap := proposal.Snapshot
if existing.Metadata.Index >= snap.Index {
log := fmt.Sprintf("Skipping snapshot at %d, because found one at %d",
snap.Index, existing.Metadata.Index)
n.elog.Printf(log)
glog.Info(log)
return nil
}
n.elog.Printf("Creating snapshot: %+v", snap)
x.Printf("Creating snapshot at index: %d. ReadTs: %d.\n", snap.Index, snap.ReadTs)
glog.Infof("Creating snapshot at index: %d. ReadTs: %d.\n", snap.Index, snap.ReadTs)
data, err := snap.Marshal()
x.Check(err)
// We can now discard all invalid versions of keys below this ts.
pstore.SetDiscardTs(snap.ReadTs)
return n.Store.CreateSnapshot(snap.Index, n.ConfState(), data)

@@ -571,6 +584,7 @@ func (n *node) Run() {
done := make(chan struct{})
go func() {
<-n.closer.HasBeenClosed()
glog.Infof("Stopping node.Run")
if peerId, has := groups().MyPeer(); has && n.AmLeader() {
n.Raft().TransferLeadership(n.ctx, Config.RaftId, peerId)
time.Sleep(time.Second) // Let transfer happen.
@@ -950,6 +964,13 @@ func (n *node) InitAndStartNode() {
sp, err := n.Store.Snapshot()
x.Checkf(err, "Unable to get existing snapshot")
if !raft.IsEmptySnap(sp) {
// It is important that we pick up the conf state here.
// Otherwise, we'll lose the store conf state, and it would get
// overwritten with an empty state when a new snapshot is taken.
// This causes a node to just hang on restart, because it finds a
// zero-member Raft group.
n.SetConfState(&sp.Metadata.ConfState)

members := groups().members(n.gid)
for _, id := range sp.Metadata.ConfState.Nodes {
m, ok := members[id]
@@ -959,8 +980,9 @@ func (n *node) InitAndStartNode() {
}
}
n.SetRaft(raft.RestartNode(n.Cfg))
glog.V(2).Infoln("Restart node complete")
} else {
x.Printf("New Node for group: %d\n", n.gid)
glog.Infof("New Node for group: %d\n", n.gid)
if _, hasPeer := groups().MyPeer(); hasPeer {
// Get snapshot before joining peers as it can take time to retrieve it and we dont
// want the quorum to be inactive when it happens.
@@ -14,6 +14,7 @@ import (

// SafeMutex can be used in place of sync.RWMutex
type SafeMutex struct {
// m deadlock.RWMutex // Very useful for detecting locking issues.
m sync.RWMutex
wait *SafeWait
writer int32
