Commit 8779066

Fix various deadlocks in Dgraph (#2548)
Fixed a bunch of long-standing deadlock issues:

1. A deadlock caused by recursive locking in posting/list.go: an internal function acquired a read lock that its public callers could already hold, which caused `applyCh` to block when a mutation being applied to a posting list raced with a read from a query.

2. A deadlock caused by losing the Raft ConfState across a node restart. We were not picking up the previous ConfState, so it defaulted to nil in the next CreateSnapshot. A node would then see an empty group and never participate in elections. Now we pick up the stored state and ensure that every snapshot carries a valid ConfState.

3. Fix #2541 -- a Tick missed firing because of repeated calls to raft's `Storage.FirstIndex()`, each of which made Badger create an iterator, which was expensive. We now cache the first index to avoid looking it up repeatedly.

Also introduced the golang/glog library for better logging.
1 parent c76a290 commit 8779066
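For context on fix 1: Go's sync.RWMutex read lock must not be taken recursively. If a writer queues up between two read acquisitions by the same goroutine, the second RLock blocks behind the writer, and the writer blocks behind the first read lock. A minimal, illustrative repro of this deadlock class (a sketch, not Dgraph's actual code paths):

package main

import (
	"sync"
	"time"
)

func main() {
	var mu sync.RWMutex
	mu.RLock() // outer read lock, e.g. a query reading a posting list

	go func() { mu.Lock() }()          // a writer arrives, e.g. applyCh applying a mutation
	time.Sleep(100 * time.Millisecond) // let the writer queue up

	mu.RLock() // recursive read lock: blocks behind the queued writer, forever
}

The fix in posting/list.go below adopts the convention that public methods take the lock while internal helpers merely assert it.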

File tree

8 files changed: +90 -22 lines changed


conn/node.go

Lines changed: 12 additions & 10 deletions
@@ -23,6 +23,7 @@ import (
 	"github.com/dgraph-io/dgraph/protos/intern"
 	"github.com/dgraph-io/dgraph/raftwal"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/golang/glog"
 	"golang.org/x/net/context"
 )
 
@@ -124,9 +125,9 @@ func (n *Node) Raft() raft.Node {
 
 // SetConfState would store the latest ConfState generated by ApplyConfChange.
 func (n *Node) SetConfState(cs *raftpb.ConfState) {
+	glog.Infof("Setting conf state to %+v\n", cs)
 	n.Lock()
 	defer n.Unlock()
-	x.Printf("Setting conf state to %+v\n", cs)
 	n._confState = cs
 }
 
@@ -215,7 +216,7 @@ func (n *Node) PastLife() (idx uint64, restart bool, rerr error) {
 		return
 	}
 	if !raft.IsEmptySnap(sp) {
-		x.Printf("Found Snapshot, Metadata: %+v\n", sp.Metadata)
+		glog.Infof("Found Snapshot.Metadata: %+v\n", sp.Metadata)
 		restart = true
 		idx = sp.Metadata.Index
 	}
@@ -226,7 +227,7 @@ func (n *Node) PastLife() (idx uint64, restart bool, rerr error) {
 		return
 	}
 	if !raft.IsEmptyHardState(hd) {
-		x.Printf("Found hardstate: %+v\n", hd)
+		glog.Infof("Found hardstate: %+v\n", hd)
 		restart = true
 	}
 
@@ -235,7 +236,7 @@ func (n *Node) PastLife() (idx uint64, restart bool, rerr error) {
 	if rerr != nil {
 		return
 	}
-	x.Printf("Group %d found %d entries\n", n.RaftContext.Group, num)
+	glog.Infof("Group %d found %d entries\n", n.RaftContext.Group, num)
 	// We'll always have at least one entry.
 	if num > 1 {
 		restart = true
@@ -292,7 +293,7 @@ func (n *Node) BatchAndSendMessages() {
 			if exists := failedConn[to]; !exists {
 				// So that we print error only the first time we are not able to connect.
 				// Otherwise, the log is polluted with multiple errors.
-				x.Printf("No healthy connection found to node Id: %d addr: [%s], err: %v\n",
+				glog.Warningf("No healthy connection to node Id: %d addr: [%s], err: %v\n",
 					to, addr, err)
 				failedConn[to] = true
 			}
@@ -325,7 +326,8 @@ func (n *Node) doSendMessage(pool *Pool, data []byte) {
 	go func() {
 		_, err := c.RaftMessage(ctx, batch)
 		if err != nil {
-			x.Printf("Error while sending message to node with addr: %s, err: %v\n", pool.Addr, err)
+			glog.Warningf("Error while sending message to node with addr: %s, err: %v\n",
+				pool.Addr, err)
 		}
 		ch <- err
 	}()
@@ -356,7 +358,7 @@ func (n *Node) Connect(pid uint64, addr string) {
 	// a nil *pool.
 	if addr == n.MyAddr {
 		// TODO: Note this fact in more general peer health info somehow.
-		x.Printf("Peer %d claims same host as me\n", pid)
+		glog.Infof("Peer %d claims same host as me\n", pid)
 		n.SetPeer(pid, addr)
 		return
 	}
@@ -387,7 +389,7 @@ func (n *Node) proposeConfChange(ctx context.Context, pb raftpb.ConfChange) erro
 		if cctx.Err() != nil {
 			return errInternalRetry
 		}
-		x.Printf("Error while proposing conf change: %v", err)
+		glog.Warningf("Error while proposing conf change: %v", err)
 		return err
 	}
 	select {
@@ -419,8 +421,8 @@ func (n *Node) AddToCluster(ctx context.Context, pid uint64) error {
 	}
 	err = errInternalRetry
 	for err == errInternalRetry {
-		x.Printf("Trying to add %d to cluster. Addr: %v\n", pid, addr)
-		x.Printf("Current confstate at %d: %+v\n", n.Id, n.ConfState())
+		glog.Infof("Trying to add %d to cluster. Addr: %v\n", pid, addr)
+		glog.Infof("Current confstate at %d: %+v\n", n.Id, n.ConfState())
 		err = n.proposeConfChange(ctx, cc)
 	}
 	return err
contrib/integration/acctupsert/main.go

Lines changed: 8 additions & 3 deletions
@@ -12,6 +12,7 @@ import (
 	"encoding/json"
 	"flag"
 	"fmt"
+	"math/rand"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -33,6 +34,7 @@ var (
 	firsts = []string{"Paul", "Eric", "Jack", "John", "Martin"}
 	lasts  = []string{"Brown", "Smith", "Robinson", "Waters", "Taylor"}
 	ages   = []int{20, 25, 30, 35}
+	types  = []string{"CEO", "COO", "CTO", "CFO"}
 )
 
 type account struct {
@@ -139,6 +141,7 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
 	{
 		get(func: eq(first, %q)) @filter(eq(last, %q) AND eq(age, %d)) {
 			uid
+			expand(_all_) {uid}
 		}
 	}
 `, acc.first, acc.last, acc.age)
@@ -153,6 +156,8 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
 	x.Check(json.Unmarshal(resp.GetJson(), &decode))
 
 	x.AssertTrue(len(decode.Get) <= 1)
+	t := rand.Intn(len(types))
+
 	var uid string
 	if len(decode.Get) == 1 {
 		x.AssertTrue(decode.Get[0].Uid != nil)
@@ -162,8 +167,9 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
 		_:acct <first> %q .
 		_:acct <last>  %q .
 		_:acct <age> "%d"^^<xs:int> .
-	`,
-		acc.first, acc.last, acc.age,
+		_:acct <%s> "" .
+	`,
+		acc.first, acc.last, acc.age, types[t],
 	)
 	mu := &api.Mutation{SetNquads: []byte(nqs)}
 	assigned, err := txn.Mutate(ctx, mu)
@@ -172,7 +178,6 @@ func tryUpsert(c *dgo.Dgraph, acc account) error {
 		}
 		uid = assigned.GetUids()["acct"]
 		x.AssertTrue(uid != "")
-
 	}
 
 	nq := fmt.Sprintf(`

dgraph/cmd/root.go

Lines changed: 4 additions & 0 deletions
@@ -8,6 +8,7 @@
 package cmd
 
 import (
+	goflag "flag"
 	"fmt"
 	"os"
 
@@ -19,6 +20,7 @@ import (
 	"github.com/dgraph-io/dgraph/dgraph/cmd/zero"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/spf13/cobra"
+	flag "github.com/spf13/pflag"
 	"github.com/spf13/viper"
 )
 
@@ -40,6 +42,7 @@ cluster.
 // Execute adds all child commands to the root command and sets flags appropriately.
 // This is called by main.main(). It only needs to happen once to the rootCmd.
 func Execute() {
+	goflag.Parse()
 	if err := RootCmd.Execute(); err != nil {
 		fmt.Println(err)
 		os.Exit(1)
@@ -61,6 +64,7 @@ func init() {
 	RootCmd.PersistentFlags().Bool("expose_trace", false,
 		"Allow trace endpoint to be accessible from remote")
 	rootConf.BindPFlags(RootCmd.PersistentFlags())
+	flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
 
 	var subcommands = []*x.SubCommand{
 		&bulk.Bulk, &live.Live, &server.Server, &zero.Zero, &version.Version, &debug.Debug,
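glog only registers its flags on the standard library flag set, so they have to be bridged into pflag for cobra commands to accept them; that is what the goflag.Parse() and AddGoFlagSet lines above do. An equivalent, self-contained sketch (names assumed) that attaches the Go flag set to a root command's persistent flags instead:

package main

import (
	goflag "flag"

	"github.com/golang/glog"
	"github.com/spf13/cobra"
)

func main() {
	root := &cobra.Command{
		Use: "app",
		Run: func(cmd *cobra.Command, args []string) {
			glog.V(2).Infoln("visible when run with --v=2")
		},
	}
	// Expose glog's flags (-v, -logtostderr, -vmodule, ...) through cobra.
	root.PersistentFlags().AddGoFlagSet(goflag.CommandLine)
	// Mark the standard flag set as parsed so glog does not complain about
	// logging before flag.Parse.
	goflag.CommandLine.Parse(nil)
	if err := root.Execute(); err != nil {
		glog.Fatal(err)
	}
}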

edgraph/server.go

Lines changed: 8 additions & 0 deletions
@@ -35,6 +35,7 @@ import (
 	"github.com/dgraph-io/dgraph/types/facets"
 	"github.com/dgraph-io/dgraph/worker"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/golang/glog"
 	"github.com/pkg/errors"
 )
 
@@ -235,6 +236,10 @@ func (s *ServerState) getTimestamp() uint64 {
 }
 
 func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, error) {
+	if glog.V(2) {
+		glog.Infof("Received ALTER op: %+v", op)
+		defer glog.Infof("ALTER op: %+v done", op)
+	}
 	if op.Schema == "" && op.DropAttr == "" && !op.DropAll {
 		// Must have at least one field set. This helps users if they attempt
 		// to set a field but use the wrong name (could be decoded from JSON).
@@ -397,6 +402,9 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign
 // This method is used to execute the query and return the response to the
 // client as a protocol buffer message.
 func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Response, err error) {
+	if glog.V(3) {
+		glog.Infof("Got a query: %+v", req)
+	}
 	if err := x.HealthCheck(); err != nil {
 		if tr, ok := trace.FromContext(ctx); ok {
 			tr.LazyPrintf("Request rejected %v", err)
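A note on the if glog.V(n) form used above: glog.V returns a boolean-like Verbose value, so the if skips the logging work entirely when verbosity is off, and a single check can scope several statements, including Alter's deferred completion log:

// One statement: the formatting inside Infof runs only at -v=3 or higher.
glog.V(3).Infof("Got a query: %+v", req)

// Several statements, including a defer, under one check:
if glog.V(2) {
	glog.Infof("Received ALTER op: %+v", op)
	defer glog.Infof("ALTER op: %+v done", op)
}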

posting/list.go

Lines changed: 3 additions & 2 deletions
@@ -914,6 +914,8 @@ func (l *List) Value(readTs uint64) (rval types.Val, rerr error) {
 // If list consists of one or more languages, first available value is returned; if no language
 // from list match the values, processing is the same as for empty list.
 func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr error) {
+	l.RLock() // All public methods should acquire locks, while private ones should assert them.
+	defer l.RUnlock()
 	p, err := l.postingFor(readTs, langs)
 	if err != nil {
 		return rval, err
@@ -922,8 +924,7 @@ func (l *List) ValueFor(readTs uint64, langs []string) (rval types.Val, rerr err
 }
 
 func (l *List) postingFor(readTs uint64, langs []string) (p *intern.Posting, rerr error) {
-	l.RLock()
-	defer l.RUnlock()
+	l.AssertRLock() // Avoid recursive locking by asserting a lock here.
 	return l.postingForLangs(readTs, langs)
 }
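The convention codified above removes the recursive read lock: an internal helper that took RLock itself could be reached from public methods already holding the lock, which deadlocks once a writer queues in between (see the repro near the top). A minimal sketch of the pattern, assuming x.SafeMutex's AssertRLock panics when no read lock is held:

type list struct {
	x.SafeMutex
	vals map[string]uint64
}

// Public entry point: the only place the read lock is acquired.
func (l *list) value(key string) uint64 {
	l.RLock()
	defer l.RUnlock()
	return l.valueFor(key)
}

// Internal helper: asserts instead of locking, so it can never
// re-acquire a lock its caller already holds.
func (l *list) valueFor(key string) uint64 {
	l.AssertRLock()
	return l.vals[key]
}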

raftwal/storage.go

Lines changed: 29 additions & 4 deletions
@@ -17,6 +17,7 @@ import (
 	"github.com/coreos/etcd/raft"
 	pb "github.com/coreos/etcd/raft/raftpb"
 	"github.com/dgraph-io/badger"
+	"github.com/golang/glog"
 	"golang.org/x/net/trace"
 
 	"github.com/dgraph-io/dgraph/x"
@@ -65,13 +66,27 @@ func (u *txnUnifier) Cancel() {
 
 type localCache struct {
 	sync.RWMutex
-	snap pb.Snapshot
+	firstIndex uint64
+	snap       pb.Snapshot
+}
+
+func (c *localCache) setFirst(first uint64) {
+	c.Lock()
+	defer c.Unlock()
+	c.firstIndex = first
+}
+
+func (c *localCache) first() uint64 {
+	c.RLock()
+	defer c.RUnlock()
+	return c.firstIndex
 }
 
 func (c *localCache) setSnapshot(s pb.Snapshot) {
 	c.Lock()
 	defer c.Unlock()
 	c.snap = s
+	c.firstIndex = 0 // Reset firstIndex.
 }
 
 func (c *localCache) snapshot() pb.Snapshot {
@@ -240,7 +255,16 @@ func (w *DiskStorage) FirstIndex() (uint64, error) {
 	if !raft.IsEmptySnap(snap) {
 		return snap.Metadata.Index + 1, nil
 	}
+	if first := w.cache.first(); first > 0 {
+		return first, nil
+	}
 	index, err := w.seekEntry(nil, 0, false)
+	if err == nil {
+		glog.V(2).Infof("Setting first index: %d", index+1)
+		w.cache.setFirst(index + 1)
+	} else {
+		glog.Errorf("While seekEntry. Error: %v", err)
+	}
 	return index + 1, err
 }
 
@@ -549,11 +573,13 @@ func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error
 }
 
 func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) error {
+	glog.V(2).Infof("CreateSnapshot i=%d, cs=%+v", i, cs)
 	first, err := w.FirstIndex()
 	if err != nil {
 		return err
 	}
 	if i < first {
+		glog.Errorf("i=%d<first=%d, ErrSnapOutOfDate", i, first)
 		return raft.ErrSnapOutOfDate
 	}
 
@@ -568,9 +594,8 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) er
 	var snap pb.Snapshot
 	snap.Metadata.Index = i
 	snap.Metadata.Term = e.Term
-	if cs != nil {
-		snap.Metadata.ConfState = *cs
-	}
+	x.AssertTrue(cs != nil)
+	snap.Metadata.ConfState = *cs
 	snap.Data = data
 
 	u := w.newUnifier()
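The cache contract introduced above is small but easy to regress: first() must return 0 until FirstIndex has computed a value, and setSnapshot must reset it, since a new snapshot moves the first index to snapshot index + 1. A sketch of a test pinning this down (assumed to live in package raftwal, alongside the code):

func TestLocalCacheFirstIndex(t *testing.T) {
	c := new(localCache)
	if got := c.first(); got != 0 {
		t.Fatalf("fresh cache: got %d, want 0", got)
	}
	c.setFirst(42)
	if got := c.first(); got != 42 {
		t.Fatalf("after setFirst: got %d, want 42", got)
	}
	var snap pb.Snapshot
	snap.Metadata.Index = 100
	c.setSnapshot(snap) // must invalidate: the first index is now 101
	if got := c.first(); got != 0 {
		t.Fatalf("after setSnapshot: got %d, want 0 (reset)", got)
	}
}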

worker/draft.go

Lines changed: 25 additions & 3 deletions
@@ -19,6 +19,7 @@ import (
 
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/golang/glog"
 	"golang.org/x/net/context"
 	"golang.org/x/net/trace"
 
@@ -123,6 +124,7 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *intern.Proposal) er
 	if n.Raft() == nil {
 		return x.Errorf("Raft isn't initialized yet")
 	}
+
 	// TODO: Should be based on number of edges (amount of work)
 	pendingProposals <- struct{}{}
 	x.PendingProposals.Add(1)
@@ -406,12 +408,23 @@ func (n *node) applyCommitted(proposal *intern.Proposal, index uint64) error {
 		return n.commitOrAbort(proposal.Key, proposal.Delta)
 
 	} else if proposal.Snapshot != nil {
+		existing, err := n.Store.Snapshot()
+		if err != nil {
+			return err
+		}
 		snap := proposal.Snapshot
+		if existing.Metadata.Index >= snap.Index {
+			log := fmt.Sprintf("Skipping snapshot at %d, because found one at %d",
+				snap.Index, existing.Metadata.Index)
+			n.elog.Printf(log)
+			glog.Info(log)
+			return nil
+		}
 		n.elog.Printf("Creating snapshot: %+v", snap)
-		x.Printf("Creating snapshot at index: %d. ReadTs: %d.\n", snap.Index, snap.ReadTs)
+		glog.Infof("Creating snapshot at index: %d. ReadTs: %d.\n", snap.Index, snap.ReadTs)
 		data, err := snap.Marshal()
 		x.Check(err)
-	// We can now discard all invalid versions of keys below this ts.
+		// We can now discard all invalid versions of keys below this ts.
 		pstore.SetDiscardTs(snap.ReadTs)
 		return n.Store.CreateSnapshot(snap.Index, n.ConfState(), data)
 
@@ -571,6 +584,7 @@ func (n *node) Run() {
 	done := make(chan struct{})
 	go func() {
 		<-n.closer.HasBeenClosed()
+		glog.Infof("Stopping node.Run")
 		if peerId, has := groups().MyPeer(); has && n.AmLeader() {
 			n.Raft().TransferLeadership(n.ctx, Config.RaftId, peerId)
 			time.Sleep(time.Second) // Let transfer happen.
@@ -950,6 +964,13 @@ func (n *node) InitAndStartNode() {
 	sp, err := n.Store.Snapshot()
 	x.Checkf(err, "Unable to get existing snapshot")
 	if !raft.IsEmptySnap(sp) {
+		// It is important that we pick up the conf state here.
+		// Otherwise, we'll lose the store conf state, and it would get
+		// overwritten with an empty state when a new snapshot is taken.
+		// This causes a node to just hang on restart, because it finds a
+		// zero-member Raft group.
+		n.SetConfState(&sp.Metadata.ConfState)
+
 		members := groups().members(n.gid)
 		for _, id := range sp.Metadata.ConfState.Nodes {
 			m, ok := members[id]
@@ -959,8 +980,9 @@ func (n *node) InitAndStartNode() {
 			}
 		}
 		n.SetRaft(raft.RestartNode(n.Cfg))
+		glog.V(2).Infoln("Restart node complete")
 	} else {
-		x.Printf("New Node for group: %d\n", n.gid)
+		glog.Infof("New Node for group: %d\n", n.gid)
 		if _, hasPeer := groups().MyPeer(); hasPeer {
 			// Get snapshot before joining peers as it can take time to retrieve it and we dont
 			// want the quorum to be inactive when it happens.
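For fix 2, the pieces above work together: InitAndStartNode now restores the ConfState from the snapshot's metadata on restart, and raftwal's CreateSnapshot asserts a non-nil ConfState instead of silently writing an empty one. An illustrative sketch of the old failure mode (simplified, not Dgraph's actual types):

package main

import (
	"fmt"

	"github.com/coreos/etcd/raft/raftpb"
)

var confState *raftpb.ConfState // stayed nil after a restart before this commit

func metadataFor(index uint64) raftpb.SnapshotMetadata {
	meta := raftpb.SnapshotMetadata{Index: index}
	if confState != nil { // the old guard let nil slip through silently
		meta.ConfState = *confState
	}
	return meta
}

func main() {
	meta := metadataFor(100)
	// Prints an empty node list: a node restarting from such a snapshot sees
	// a zero-member group and never participates in elections.
	fmt.Println(meta.ConfState.Nodes)
}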

x/lock.go

Lines changed: 1 addition & 0 deletions
@@ -14,6 +14,7 @@ import (
 
 // SafeMutex can be used in place of sync.RWMutex
 type SafeMutex struct {
+	// m deadlock.RWMutex // Very useful for detecting locking issues.
 	m      sync.RWMutex
 	wait   *SafeWait
 	writer int32
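The commented-out field above is a hook for a deadlock-detecting mutex. One drop-in option (an assumption; the commit leaves only the comment) is github.com/sasha-s/go-deadlock, whose deadlock.RWMutex mirrors sync.RWMutex and reports lock-order inversions and long-held locks at runtime:

package x

import "github.com/sasha-s/go-deadlock"

// Debug variant of SafeMutex: swapping the field type is enough, because
// deadlock.RWMutex exposes the same method set as sync.RWMutex.
type SafeMutex struct {
	m deadlock.RWMutex
}

func (s *SafeMutex) Lock()    { s.m.Lock() }
func (s *SafeMutex) Unlock()  { s.m.Unlock() }
func (s *SafeMutex) RLock()   { s.m.RLock() }
func (s *SafeMutex) RUnlock() { s.m.RUnlock() }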
