Commit 3b6d817

Integrate Opencensus and fix Jepsen bank test (#2764)
Got it. Fixed it. Squashed it. Done with it. Tamed it. Time to bask in the glory of victory!

Fixed the Jepsen bank test violation during a network partition. The violation happened because:

1. The current Zero leader receives a commit request and assigns a commit timestamp.
2. It gets partitioned out, so the proposal gets blocked.
3. Another Zero becomes the leader and renews the txn timestamp lease, starting at a much higher number (previous lease + lease bandwidth of 10K).
4. The new leader services new txn requests, which advances all Alphas to a much higher MaxApplied timestamp.
5. The previous leader, now a follower, retries the old commit proposal and succeeds.

This causes two issues: (a) a later txn read doesn't see this commit and advances past it, and (b) an Alpha rejects a write on a posting list key that already has a newer commit at a higher ts. Both scenarios caused bank txn test violations. OpenCensus and dgraph debug dissection were instrumental in determining the cause; the tell-tale sign was a /Commit timeout on one of the commits immediately preceding the violating commit.

Fixes:

0. Use OpenCensus as a torch to light every path a txn takes, to determine the cause.
1. Do not allow a context deadline to cancel a txn commit proposal.
2. Do not allow a Zero follower to forward proposals to the Zero leader.

Tested this PR to fix issue #2391. Tested with partition-ring and clock-skew nemeses in a 10-node cluster. More testing is needed around predicate moves.

Changelog:

* Trial: do not purge anything (needs to be refactored and reintroduced before a release).
* More debugging, more tracing for Jepsen.
* OpenCensus in Zero.
* One fix for the Jepsen test, which ensures that a context deadline cannot just cancel a txn proposal. Needs some refactoring of how Zero gets membership updates, now that Zero must not forward proposals to the leader.
* Update the Raft lib, so we have access to the feature that disallows Raft proposal forwarding.
* Update the raftpb proto package as well.
* Dirty changes to ensure that Zero followers don't forward proposals to the Zero leader.
* Various OpenCensus integration changes.
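For context, the invariant the bank test asserts is conservation of money: concurrent transfers move balances between accounts, and any read of all balances at a single snapshot timestamp must observe the same total. A minimal sketch of that check in Go (names hypothetical; the actual Jepsen harness is written in Clojure):

```go
package main

import "fmt"

// checkInvariant is a minimal sketch of the property the bank test asserts.
// Transfers only move money between accounts, so any read of all balances at
// a single snapshot timestamp must observe the same total.
func checkInvariant(balances []int, expectedTotal int) error {
	total := 0
	for _, b := range balances {
		total += b
	}
	if total != expectedTotal {
		return fmt.Errorf("invariant violated: total %d != expected %d", total, expectedTotal)
	}
	return nil
}

func main() {
	// Five accounts seeded with 100 each; a snapshot read after any number
	// of transfers must still sum to 500.
	fmt.Println(checkInvariant([]int{100, 120, 80, 100, 100}, 500))
}
```

The violation described above breaks exactly this property: a read at a newer timestamp misses the partitioned-out commit, so the observed total drifts.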
1 parent b720fee commit 3b6d817

File tree

22 files changed: +654 additions, −251 deletions

conn/pool.go

Lines changed: 2 additions & 0 deletions
```diff
@@ -28,6 +28,7 @@ import (
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/golang/glog"
+	"go.opencensus.io/plugin/ocgrpc"
 
 	"google.golang.org/grpc"
 )
@@ -124,6 +125,7 @@ func (p *Pools) Connect(addr string) *Pool {
 // NewPool creates a new "pool" with one gRPC connection, refcount 0.
 func NewPool(addr string) (*Pool, error) {
 	conn, err := grpc.Dial(addr,
+		grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
 		grpc.WithDefaultCallOptions(
 			grpc.MaxCallRecvMsgSize(x.GrpcMaxSize),
 			grpc.MaxCallSendMsgSize(x.GrpcMaxSize)),
```
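The `ocgrpc.ClientHandler` added above makes every outgoing gRPC call from this pool carry OpenCensus trace spans and record per-RPC stats. Worth noting: the stats only surface once the matching client views are registered, which this commit leaves commented out elsewhere. A minimal sketch of a fully instrumented client dial, assuming plaintext intra-cluster connections and an illustrative Zero address:

```go
package main

import (
	"log"

	"go.opencensus.io/plugin/ocgrpc"
	"go.opencensus.io/stats/view"
	"google.golang.org/grpc"
)

// dialWithStats mirrors the NewPool change: attach the OpenCensus client
// handler, and additionally register the client views so the recorded
// stats (latency, bytes, completed RPCs) are actually exported.
func dialWithStats(addr string) (*grpc.ClientConn, error) {
	if err := view.Register(ocgrpc.DefaultClientViews...); err != nil {
		log.Fatalf("Failed to register client views: %v", err)
	}
	return grpc.Dial(addr,
		grpc.WithInsecure(), // assumption: plaintext intra-cluster traffic
		grpc.WithStatsHandler(&ocgrpc.ClientHandler{}))
}

func main() {
	conn, err := dialWithStats("localhost:5080") // illustrative Zero gRPC address
	if err != nil {
		log.Fatalf("Dial failed: %v", err)
	}
	defer conn.Close()
}
```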

dgraph/cmd/alpha/run.go

Lines changed: 30 additions & 30 deletions
```diff
@@ -44,8 +44,8 @@ import (
 	"github.com/spf13/cobra"
 	"go.opencensus.io/exporter/jaeger"
 	"go.opencensus.io/plugin/ocgrpc"
-	"go.opencensus.io/stats/view"
 	otrace "go.opencensus.io/trace"
+	"go.opencensus.io/zpages"
 	"golang.org/x/net/context"
 	"golang.org/x/net/trace"
 	"google.golang.org/grpc"
@@ -92,7 +92,8 @@ they form a Raft group and provide synchronous replication.
 		"[mmap, disk] Specifies how Badger Value log is stored."+
 			" mmap consumes more RAM, but provides better performance.")
 
-	flag.String("jaeger.agent", "", "Send opencensus traces to Jaeger.")
+	// OpenCensus flags.
+	flag.Float64("trace", 1.0, "The ratio of queries to trace.")
 	flag.String("jaeger.collector", "", "Send opencensus traces to Jaeger.")
 
 	flag.StringP("wal", "w", "w", "Directory to store raft write-ahead logs.")
@@ -103,7 +104,6 @@ they form a Raft group and provide synchronous replication.
 	flag.String("export", "export", "Folder in which to store exports.")
 	flag.Int("pending_proposals", 256,
 		"Number of pending mutation proposals. Useful for rate limiting.")
-	flag.Float64("trace", 0.0, "The ratio of queries to trace.")
 	flag.String("my", "",
 		"IP_ADDRESS:PORT of this Dgraph Alpha, so other Dgraph Alphas can talk to this.")
 	flag.StringP("zero", "z", fmt.Sprintf("localhost:%d", x.PortZeroGrpc),
@@ -242,43 +242,35 @@ func setupListener(addr string, port int, reload func()) (net.Listener, error) {
 
 func serveGRPC(l net.Listener, tlsCfg *tls.Config, wg *sync.WaitGroup) {
 	defer wg.Done()
-	if err := view.Register(ocgrpc.DefaultServerViews...); err != nil {
-		glog.Fatalf("Unable to register opencensus: %v", err)
-	}
-
-	handler := &ocgrpc.ServerHandler{
-		IsPublicEndpoint: true,
-		StartOptions: otrace.StartOptions{
-			Sampler: otrace.AlwaysSample(),
-		},
-	}
-	opt := []grpc.ServerOption{
-		grpc.MaxRecvMsgSize(x.GrpcMaxSize),
-		grpc.MaxSendMsgSize(x.GrpcMaxSize),
-		grpc.MaxConcurrentStreams(1000),
-		grpc.StatsHandler(handler),
-	}
-	if tlsCfg != nil {
-		opt = append(opt, grpc.Creds(credentials.NewTLS(tlsCfg)))
-	}
 
-	if agent := Alpha.Conf.GetString("jaeger.agent"); len(agent) > 0 {
+	if collector := Alpha.Conf.GetString("jaeger.collector"); len(collector) > 0 {
 		// Port details: https://www.jaegertracing.io/docs/getting-started/
-		// Default endpoints are:
-		// agentEndpointURI := "localhost:6831"
-		// collectorEndpointURI := "http://localhost:14268"
-		collector := Alpha.Conf.GetString("jaeger.collector")
+		// Default collectorEndpointURI := "http://localhost:14268"
 		je, err := jaeger.NewExporter(jaeger.Options{
-			AgentEndpoint: agent,
-			Endpoint:      collector,
-			ServiceName:   "dgraph.alpha",
+			Endpoint:    collector,
+			ServiceName: "dgraph.alpha",
 		})
 		if err != nil {
 			log.Fatalf("Failed to create the Jaeger exporter: %v", err)
 		}
 		// And now finally register it as a Trace Exporter
 		otrace.RegisterExporter(je)
 	}
+	// Exclusively for stats, metrics, etc. Not for tracing.
+	// var views = append(ocgrpc.DefaultServerViews, ocgrpc.DefaultClientViews...)
+	// if err := view.Register(views...); err != nil {
+	// 	glog.Fatalf("Unable to register OpenCensus stats: %v", err)
+	// }
+
+	opt := []grpc.ServerOption{
+		grpc.MaxRecvMsgSize(x.GrpcMaxSize),
+		grpc.MaxSendMsgSize(x.GrpcMaxSize),
+		grpc.MaxConcurrentStreams(1000),
+		grpc.StatsHandler(&ocgrpc.ServerHandler{}),
+	}
+	if tlsCfg != nil {
+		opt = append(opt, grpc.Creds(credentials.NewTLS(tlsCfg)))
+	}
 
 	s := grpc.NewServer(opt...)
 	api.RegisterDgraphServer(s, &edgraph.Server{})
@@ -350,12 +342,18 @@ func setupServer() {
 	http.HandleFunc("/alter", alterHandler)
 	http.HandleFunc("/health", healthCheck)
 	http.HandleFunc("/share", shareHandler)
+
+	// TODO: Figure out what this is for?
 	http.HandleFunc("/debug/store", storeStatsHandler)
+
 	http.HandleFunc("/admin/shutdown", shutDownHandler)
 	http.HandleFunc("/admin/backup", backupHandler)
 	http.HandleFunc("/admin/export", exportHandler)
 	http.HandleFunc("/admin/config/lru_mb", memoryLimitHandler)
 
+	// Add OpenCensus z-pages.
+	zpages.Handle(http.DefaultServeMux, "/z")
+
 	http.HandleFunc("/", homeHandler)
 	http.HandleFunc("/ui/keywords", keywordHandler)
 
@@ -430,6 +428,8 @@ func run() {
 			return true, true
 		}
 	}
+	otrace.ApplyConfig(otrace.Config{
+		DefaultSampler: otrace.ProbabilitySampler(worker.Config.Tracing)})
 
 	// Posting will initialize index which requires schema. Hence, initialize
 	// schema before calling posting.Init().
```
dgraph/cmd/debug/run.go

Lines changed: 48 additions & 12 deletions
```diff
@@ -70,6 +70,21 @@ func init() {
 	flag.StringVarP(&opt.pdir, "postings", "p", "", "Directory where posting lists are stored.")
 }
 
+func toInt(o *pb.Posting) int {
+	from := types.Val{
+		Tid:   types.TypeID(o.ValType),
+		Value: o.Value,
+	}
+	out, err := types.Convert(from, types.StringID)
+	x.Check(err)
+	val := out.Value.(string)
+	a, err := strconv.Atoi(val)
+	if err != nil {
+		return 0
+	}
+	return a
+}
+
 func readAmount(txn *badger.Txn, uid uint64) int {
 	iopt := badger.DefaultIteratorOptions
 	iopt.AllVersions = true
@@ -86,16 +101,7 @@ func readAmount(txn *badger.Txn, uid uint64) int {
 	var times int
 	var amount int
 	err = pl.Iterate(math.MaxUint64, 0, func(o *pb.Posting) error {
-		from := types.Val{
-			Tid:   types.TypeID(o.ValType),
-			Value: o.Value,
-		}
-		out, err := types.Convert(from, types.StringID)
-		x.Check(err)
-		val := out.Value.(string)
-		a, err := strconv.Atoi(val)
-		x.Check(err)
-		amount = a
+		amount = toInt(o)
 		times++
 		return nil
 	})
@@ -155,7 +161,7 @@ func seekTotal(db *badger.DB, readTs uint64) int {
 	var total int
 	for uid, key := range keys {
 		a := readAmount(txn, uid)
-		fmt.Printf("uid: %-5d key: %d amount: %d\n", uid, key, a)
+		fmt.Printf("uid: %-5d %x key: %d amount: %d\n", uid, uid, key, a)
 		total += a
 	}
 	fmt.Printf("Total @ %d = %d\n", readTs, total)
@@ -192,6 +198,12 @@ func showAllPostingsAt(db *badger.DB, readTs uint64) {
 	itr := txn.NewIterator(badger.DefaultIteratorOptions)
 	defer itr.Close()
 
+	type account struct {
+		Key int
+		Amt int
+	}
+	keys := make(map[uint64]*account)
+
 	var buf bytes.Buffer
 	fmt.Fprintf(&buf, "SHOWING all postings at %d\n", readTs)
 	for itr.Rewind(); itr.Valid(); itr.Next() {
@@ -204,15 +216,39 @@ func showAllPostingsAt(db *badger.DB, readTs uint64) {
 		if !pk.IsData() || pk.Attr == "_predicate_" {
 			continue
 		}
-		fmt.Fprintf(&buf, "\nkey: %+v hex: %x\n", pk, item.Key())
+
+		var acc *account
+		if pk.Attr == "key_0" || pk.Attr == "amount_0" {
+			var has bool
+			acc, has = keys[pk.Uid]
+			if !has {
+				acc = &account{}
+				keys[pk.Uid] = acc
+			}
+		}
+		fmt.Fprintf(&buf, "  key: %+v hex: %x\n", pk, item.Key())
 		val, err := item.ValueCopy(nil)
 		x.Check(err)
 		var plist pb.PostingList
 		x.Check(plist.Unmarshal(val))
 
+		x.AssertTrue(len(plist.Postings) <= 1)
+		var num int
 		for _, p := range plist.Postings {
+			num = toInt(p)
 			appendPosting(&buf, p)
 		}
+		if num > 0 && acc != nil {
+			switch pk.Attr {
+			case "key_0":
+				acc.Key = num
+			case "amount_0":
+				acc.Amt = num
+			}
+		}
+	}
+	for uid, acc := range keys {
+		fmt.Fprintf(&buf, "Uid: %d %x Key: %d Amount: %d\n", uid, uid, acc.Key, acc.Amt)
 	}
 	fmt.Println(buf.String())
 }
```

dgraph/cmd/zero/assign.go

Lines changed: 1 addition & 0 deletions
```diff
@@ -41,6 +41,7 @@ func (s *Server) updateLeases() {
 	s.nextLeaseId = s.state.MaxLeaseId + 1
 	s.nextTxnTs = s.state.MaxTxnTs + 1
 	startTs = s.nextTxnTs
+	glog.Infof("Updated Lease id: %d. Txn Ts: %d", s.nextLeaseId, s.nextTxnTs)
 	s.Unlock()
 	s.orc.updateStartTxnTs(startTs)
 }
```

dgraph/cmd/zero/oracle.go

Lines changed: 36 additions & 0 deletions
```diff
@@ -26,6 +26,7 @@ import (
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/golang/glog"
+	otrace "go.opencensus.io/trace"
 	"golang.org/x/net/context"
 )
 
@@ -67,6 +68,7 @@ func (o *Oracle) updateStartTxnTs(ts uint64) {
 	o.keyCommit = make(map[string]uint64)
 }
 
+// TODO: This should be done during proposal application for Txn status.
 func (o *Oracle) hasConflict(src *api.TxnContext) bool {
 	// This transaction was started before I became leader.
 	if src.StartTs < o.startTxnTs {
@@ -84,6 +86,10 @@ func (o *Oracle) purgeBelow(minTs uint64) {
 	o.Lock()
 	defer o.Unlock()
 
+	// TODO: HACK. Remove this later.
+	glog.Infof("Not purging below: %d", minTs)
+	return
+
 	// Dropping would be cheaper if abort/commits map is sharded
 	for ts := range o.commits {
 		if ts < minTs {
@@ -197,6 +203,7 @@ func (o *Oracle) sendDeltasToSubscribers() {
 		// Don't goto slurp_loop, because it would break from select immediately.
 	}
 
+	glog.V(2).Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta)
 	o.Lock()
 	for id, ch := range o.subscribers {
 		select {
@@ -273,6 +280,21 @@ func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error {
 		CommitTs: src.CommitTs,
 		Aborted:  src.Aborted,
 	}
+
+	// NOTE: It is important that we continue retrying proposeTxn until we succeed. This should
+	// happen, irrespective of what the user context timeout might be. We check for it before
+	// reaching this stage, but now that we're here, we have to ensure that the commit proposal goes
+	// through. Otherwise, we should block here forever. If we don't do this, we'll see txn
+	// violations in Jepsen, because we'll send out a MaxAssigned higher than a commit, which would
+	// cause newer txns to see older data.
+
+	// We could consider adding a wrapper around the user proposal, so we can access any key-values.
+	// Something like this:
+	// https://github.com/golang/go/commit/5d39260079b5170e6b4263adb4022cc4b54153c4
+	ctx = context.Background() // Use a new context with no timeout.
+
+	// If this node stops being the leader, we want this proposal to not be forwarded to the leader,
+	// and get aborted.
 	if err := s.Node.proposeAndWait(ctx, &zp); err != nil {
 		return err
 	}
@@ -292,6 +314,8 @@ func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error {
 }
 
 func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
+	span := otrace.FromContext(ctx)
+	span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(src.StartTs))}, "")
 	if src.Aborted {
 		return s.proposeTxn(ctx, src)
 	}
@@ -301,6 +325,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
 	conflict := s.orc.hasConflict(src)
 	s.orc.RUnlock()
 	if conflict {
+		span.Annotate(nil, "Oracle found conflict")
 		src.Aborted = true
 		return s.proposeTxn(ctx, src)
 	}
@@ -328,6 +353,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
 	for pred := range preds {
 		tablet := s.ServingTablet(pred)
 		if tablet == nil || tablet.GetReadOnly() {
+			span.Annotate(nil, "Tablet is readonly. Aborting.")
 			src.Aborted = true
 			return s.proposeTxn(ctx, src)
 		}
@@ -341,8 +367,15 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
 	src.CommitTs = assigned.StartId
 	// Mark the transaction as done, irrespective of whether the proposal succeeded or not.
 	defer s.orc.doneUntil.Done(src.CommitTs)
+	span.Annotatef([]otrace.Attribute{otrace.Int64Attribute("commitTs", int64(src.CommitTs))},
+		"Node Id: %d. Proposing TxnContext: %+v", s.Node.Id, src)
 
 	if err := s.orc.commit(src); err != nil {
+		span.Annotatef(nil, "Found a conflict. Aborting.")
+		src.Aborted = true
+	}
+	if err := ctx.Err(); err != nil {
+		span.Annotatef(nil, "Aborting txn due to context timing out.")
 		src.Aborted = true
 	}
 	// Propose txn should be used to set watermark as done.
@@ -353,6 +386,9 @@ func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.T
 	if ctx.Err() != nil {
 		return nil, ctx.Err()
 	}
+	ctx, span := otrace.StartSpan(ctx, "Zero.CommitOrAbort")
+	defer span.End()
+
 	if !s.Node.AmLeader() {
 		return nil, x.Errorf("Only leader can decide to commit or abort")
 	}
```
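One subtlety in the `proposeTxn` change above: swapping in `context.Background()` sheds the caller's deadline, but it also drops every value stored in the old context, including the OpenCensus span, so `proposeAndWait` falls off the trace that `Zero.CommitOrAbort` started. If staying on the trace matters, the span can be re-attached explicitly. A sketch of that variant (an alternative, not what the diff does):

```go
package main

import (
	"context"
	"fmt"
	"time"

	otrace "go.opencensus.io/trace"
)

// detachDeadline sheds any deadline or cancelation on ctx, but re-attaches
// its OpenCensus span, so the commit proposal still shows up on the same
// trace in Jaeger or the z-pages.
func detachDeadline(ctx context.Context) context.Context {
	span := otrace.FromContext(ctx) // may be nil if ctx carries no span
	return otrace.NewContext(context.Background(), span)
}

func main() {
	ctx, span := otrace.StartSpan(context.Background(), "demo")
	defer span.End()
	ctx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	detached := detachDeadline(ctx)
	_, ok := detached.Deadline() // false: deadline gone, span kept
	fmt.Println(ok)
}
```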

dgraph/cmd/zero/raft.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er
7070
}
7171

7272
propose := func(timeout time.Duration) error {
73+
if !n.AmLeader() {
74+
return x.Errorf("Not Zero leader. Aborting proposal: %+v", proposal)
75+
}
7376
cctx, cancel := context.WithTimeout(ctx, timeout)
7477
defer cancel()
7578

@@ -84,6 +87,7 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er
8487
defer n.Proposals.Delete(key)
8588
proposal.Key = key
8689

90+
// TODO: Remove this and use OpenCensus spans.
8791
if tr, ok := trace.FromContext(ctx); ok {
8892
tr.LazyPrintf("Proposing with key: %X", key)
8993
}
@@ -119,6 +123,9 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er
119123
for err == errInternalRetry {
120124
err = propose(timeout)
121125
timeout *= 2 // Exponential backoff
126+
if timeout > time.Minute {
127+
timeout = 32 * time.Second
128+
}
122129
}
123130
return err
124131
}
@@ -523,9 +530,9 @@ func (n *node) Run() {
523530
n.Applied.Done(entry.Index)
524531
}
525532

526-
// TODO: Should we move this to the top?
527533
if rd.SoftState != nil {
528534
if rd.RaftState == raft.StateLeader && !leader {
535+
glog.Infoln("I've become the leader, updating leases.")
529536
n.server.updateLeases()
530537
}
531538
leader = rd.RaftState == raft.StateLeader
