Integrate Opencensus and fix Jepsen bank test (#2764)
Got it. Fixed it. Squashed it. Done with it. Tamed it. Time to bask in the glory of victory!

Fixed Jepsen bank test violation during a network partition. The violation was happening because:
1. Current Zero leader receives a commit request and assigns a commit timestamp.
2. Gets partitioned out, so the proposal gets blocked.
3. Another Zero becomes the leader and renews the txn timestamp lease, starting at a much higher number (previous lease + lease bandwidth of 10K).
4. The new leader services new txn requests, which advances all Alphas to a much higher MaxApplied timestamp. 
5. The previous leader, now a follower, retries the old commit proposal and succeeds. This causes two issues:
  a) A later txn read doesn't see this commit and advances past it.
  b) Alpha rejects a write on a posting list key that already has a newer commit at a higher ts.
Both scenarios caused bank txn test violations.
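For context, the bank test's invariant is that the sum of all account balances stays constant across transfers; a commit that only becomes visible after later reads have already advanced past its timestamp makes two snapshots disagree on that total. A minimal, hedged sketch of such a check (the function and data are illustrative, not the actual Jepsen harness):

```go
package main

import "fmt"

// checkTotal sketches the bank-test invariant: the sum of all account
// balances read at a single snapshot must equal the initial total.
func checkTotal(balances map[uint64]int, expected int) error {
	var total int
	for _, amt := range balances {
		total += amt
	}
	if total != expected {
		return fmt.Errorf("invariant violated: total %d != expected %d", total, expected)
	}
	return nil
}

func main() {
	// Hypothetical balances read at one snapshot timestamp.
	snapshot := map[uint64]int{1: 40, 2: 60, 3: 0}
	if err := checkTotal(snapshot, 100); err != nil {
		fmt.Println(err)
	}
}
```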

OpenCensus and Dgraph debug dissect were instrumental in determining the cause of this violation. The tell-tale sign was a /Commit timeout on one of the commits immediately preceding the violating commit.
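The diff below adds exactly this kind of instrumentation in Zero. A minimal, hedged sketch of the pattern (the span name matches the one in the diff; the surrounding function and values are illustrative):

```go
package main

import (
	"context"

	otrace "go.opencensus.io/trace"
)

// commitWithTrace starts a span per commit request and annotates it with the
// txn timestamps, so a stuck or timed-out /Commit stands out in the trace view.
func commitWithTrace(ctx context.Context, startTs uint64) {
	ctx, span := otrace.StartSpan(ctx, "Zero.CommitOrAbort")
	defer span.End()

	span.Annotate([]otrace.Attribute{
		otrace.Int64Attribute("startTs", int64(startTs)),
	}, "commit requested")

	// ... propose the commit here, annotating conflicts and aborts as they occur.
	_ = ctx
}

func main() {
	commitWithTrace(context.Background(), 42)
}
```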

Fixes:

0. Use OpenCensus as a torch to light every path that a txn took, to determine the cause.
1. Do not allow a context deadline when proposing a txn commit.
2. Do not allow a Zero follower to forward proposals to the Zero leader.
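Fix 2 relies on a knob in the etcd Raft library (see the Raft lib update in the changelog below). A hedged sketch of how a node can be configured so a follower's proposals are dropped locally instead of being forwarded to the current leader; IDs, storage, and the import path are illustrative and depend on the vendored etcd version:

```go
package main

import (
	"go.etcd.io/etcd/raft"
)

func main() {
	// With DisableProposalForwarding set, a follower drops client proposals
	// locally rather than forwarding them to the current leader, so a stale
	// commit proposal from an ex-leader cannot sneak through after a partition.
	cfg := &raft.Config{
		ID:                        1,
		ElectionTick:              10,
		HeartbeatTick:             1,
		Storage:                   raft.NewMemoryStorage(),
		MaxSizePerMsg:             1 << 20,
		MaxInflightMsgs:           256,
		DisableProposalForwarding: true,
	}
	node := raft.StartNode(cfg, []raft.Peer{{ID: 1}})
	defer node.Stop()
}
```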

Tested this PR to confirm it fixes issue #2391. Tested with the partition-ring and clock-skew nemeses in a 10-node cluster. More testing is needed around predicate moves.

Changelog:

* Trial: Do not purge anything (needs to be refactored and reintroduced before a release).
* More debugging, more tracing for Jepsen.
* Opencensus in Zero.
* One fix for the Jepsen test, which ensures that a context deadline cannot just cancel a txn commit proposal. Needs some refactoring of how Zero gets membership updates, now that Zero followers must not forward proposals to the leader.
* Update the Raft lib, so we have access to the feature that disallows Raft proposal forwarding.
* Update the raftpb proto package as well.
* Dirty changes to ensure that Zero followers don't forward proposals to the Zero leader.
* Various Opencensus integration changes.
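
Taken together, the OpenCensus changelog items amount to the wiring shown in the diff below: register a Jaeger exporter, pick a sampler from the --trace ratio, and expose z-pages. A condensed, hedged sketch of that wiring (endpoint, ratio, and port are placeholders):

```go
package main

import (
	"log"
	"net/http"

	"go.opencensus.io/exporter/jaeger"
	otrace "go.opencensus.io/trace"
	"go.opencensus.io/zpages"
)

func main() {
	// Export traces to a Jaeger collector (default collector endpoint shown).
	je, err := jaeger.NewExporter(jaeger.Options{
		Endpoint:    "http://localhost:14268",
		ServiceName: "dgraph.alpha",
	})
	if err != nil {
		log.Fatalf("Failed to create the Jaeger exporter: %v", err)
	}
	otrace.RegisterExporter(je)

	// Sample a configurable fraction of requests (the --trace flag in the diff).
	otrace.ApplyConfig(otrace.Config{
		DefaultSampler: otrace.ProbabilitySampler(1.0),
	})

	// Expose OpenCensus z-pages for in-process inspection of traces and RPC stats.
	zpages.Handle(http.DefaultServeMux, "/z")
	log.Fatal(http.ListenAndServe("localhost:8080", nil))
}
```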
manishrjain committed Nov 20, 2018
1 parent b720fee commit 3b6d817f4d4cc575578119036fc3a2c9c08efec6
Showing 22 changed files with 654 additions and 251 deletions.
@@ -28,6 +28,7 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
"go.opencensus.io/plugin/ocgrpc"

"google.golang.org/grpc"
)
@@ -124,6 +125,7 @@ func (p *Pools) Connect(addr string) *Pool {
// NewPool creates a new "pool" with one gRPC connection, refcount 0.
func NewPool(addr string) (*Pool, error) {
conn, err := grpc.Dial(addr,
grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(x.GrpcMaxSize),
grpc.MaxCallSendMsgSize(x.GrpcMaxSize)),
@@ -44,8 +44,8 @@ import (
"github.com/spf13/cobra"
"go.opencensus.io/exporter/jaeger"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/stats/view"
otrace "go.opencensus.io/trace"
"go.opencensus.io/zpages"
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc"
@@ -92,7 +92,8 @@ they form a Raft group and provide synchronous replication.
"[mmap, disk] Specifies how Badger Value log is stored."+
" mmap consumes more RAM, but provides better performance.")

flag.String("jaeger.agent", "", "Send opencensus traces to Jaeger.")
// OpenCensus flags.
flag.Float64("trace", 1.0, "The ratio of queries to trace.")
flag.String("jaeger.collector", "", "Send opencensus traces to Jaeger.")

flag.StringP("wal", "w", "w", "Directory to store raft write-ahead logs.")
@@ -103,7 +104,6 @@ they form a Raft group and provide synchronous replication.
flag.String("export", "export", "Folder in which to store exports.")
flag.Int("pending_proposals", 256,
"Number of pending mutation proposals. Useful for rate limiting.")
flag.Float64("trace", 0.0, "The ratio of queries to trace.")
flag.String("my", "",
"IP_ADDRESS:PORT of this Dgraph Alpha, so other Dgraph Alphas can talk to this.")
flag.StringP("zero", "z", fmt.Sprintf("localhost:%d", x.PortZeroGrpc),
@@ -242,43 +242,35 @@ func setupListener(addr string, port int, reload func()) (net.Listener, error) {

func serveGRPC(l net.Listener, tlsCfg *tls.Config, wg *sync.WaitGroup) {
defer wg.Done()
if err := view.Register(ocgrpc.DefaultServerViews...); err != nil {
glog.Fatalf("Unable to register opencensus: %v", err)
}

handler := &ocgrpc.ServerHandler{
IsPublicEndpoint: true,
StartOptions: otrace.StartOptions{
Sampler: otrace.AlwaysSample(),
},
}
opt := []grpc.ServerOption{
grpc.MaxRecvMsgSize(x.GrpcMaxSize),
grpc.MaxSendMsgSize(x.GrpcMaxSize),
grpc.MaxConcurrentStreams(1000),
grpc.StatsHandler(handler),
}
if tlsCfg != nil {
opt = append(opt, grpc.Creds(credentials.NewTLS(tlsCfg)))
}

if agent := Alpha.Conf.GetString("jaeger.agent"); len(agent) > 0 {
if collector := Alpha.Conf.GetString("jaeger.collector"); len(collector) > 0 {
// Port details: https://www.jaegertracing.io/docs/getting-started/
// Default endpoints are:
// agentEndpointURI := "localhost:6831"
// collectorEndpointURI := "http://localhost:14268"
collector := Alpha.Conf.GetString("jaeger.collector")
// Default collectorEndpointURI := "http://localhost:14268"
je, err := jaeger.NewExporter(jaeger.Options{
AgentEndpoint: agent,
Endpoint: collector,
ServiceName: "dgraph.alpha",
Endpoint: collector,
ServiceName: "dgraph.alpha",
})
if err != nil {
log.Fatalf("Failed to create the Jaeger exporter: %v", err)
}
// And now finally register it as a Trace Exporter
otrace.RegisterExporter(je)
}
// Exclusively for stats, metrics, etc. Not for tracing.
// var views = append(ocgrpc.DefaultServerViews, ocgrpc.DefaultClientViews...)
// if err := view.Register(views...); err != nil {
// glog.Fatalf("Unable to register OpenCensus stats: %v", err)
// }

opt := []grpc.ServerOption{
grpc.MaxRecvMsgSize(x.GrpcMaxSize),
grpc.MaxSendMsgSize(x.GrpcMaxSize),
grpc.MaxConcurrentStreams(1000),
grpc.StatsHandler(&ocgrpc.ServerHandler{}),
}
if tlsCfg != nil {
opt = append(opt, grpc.Creds(credentials.NewTLS(tlsCfg)))
}

s := grpc.NewServer(opt...)
api.RegisterDgraphServer(s, &edgraph.Server{})
@@ -350,12 +342,18 @@ func setupServer() {
http.HandleFunc("/alter", alterHandler)
http.HandleFunc("/health", healthCheck)
http.HandleFunc("/share", shareHandler)

// TODO: Figure out what this is for?
http.HandleFunc("/debug/store", storeStatsHandler)

http.HandleFunc("/admin/shutdown", shutDownHandler)
http.HandleFunc("/admin/backup", backupHandler)
http.HandleFunc("/admin/export", exportHandler)
http.HandleFunc("/admin/config/lru_mb", memoryLimitHandler)

// Add OpenCensus z-pages.
zpages.Handle(http.DefaultServeMux, "/z")

http.HandleFunc("/", homeHandler)
http.HandleFunc("/ui/keywords", keywordHandler)

@@ -430,6 +428,8 @@ func run() {
return true, true
}
}
otrace.ApplyConfig(otrace.Config{
DefaultSampler: otrace.ProbabilitySampler(worker.Config.Tracing)})

// Posting will initialize index which requires schema. Hence, initialize
// schema before calling posting.Init().
@@ -70,6 +70,21 @@ func init() {
flag.StringVarP(&opt.pdir, "postings", "p", "", "Directory where posting lists are stored.")
}

func toInt(o *pb.Posting) int {
from := types.Val{
Tid: types.TypeID(o.ValType),
Value: o.Value,
}
out, err := types.Convert(from, types.StringID)
x.Check(err)
val := out.Value.(string)
a, err := strconv.Atoi(val)
if err != nil {
return 0
}
return a
}

func readAmount(txn *badger.Txn, uid uint64) int {
iopt := badger.DefaultIteratorOptions
iopt.AllVersions = true
@@ -86,16 +101,7 @@ func readAmount(txn *badger.Txn, uid uint64) int {
var times int
var amount int
err = pl.Iterate(math.MaxUint64, 0, func(o *pb.Posting) error {
from := types.Val{
Tid: types.TypeID(o.ValType),
Value: o.Value,
}
out, err := types.Convert(from, types.StringID)
x.Check(err)
val := out.Value.(string)
a, err := strconv.Atoi(val)
x.Check(err)
amount = a
amount = toInt(o)
times++
return nil
})
@@ -155,7 +161,7 @@ func seekTotal(db *badger.DB, readTs uint64) int {
var total int
for uid, key := range keys {
a := readAmount(txn, uid)
fmt.Printf("uid: %-5d key: %d amount: %d\n", uid, key, a)
fmt.Printf("uid: %-5d %x key: %d amount: %d\n", uid, uid, key, a)
total += a
}
fmt.Printf("Total @ %d = %d\n", readTs, total)
@@ -192,6 +198,12 @@ func showAllPostingsAt(db *badger.DB, readTs uint64) {
itr := txn.NewIterator(badger.DefaultIteratorOptions)
defer itr.Close()

type account struct {
Key int
Amt int
}
keys := make(map[uint64]*account)

var buf bytes.Buffer
fmt.Fprintf(&buf, "SHOWING all postings at %d\n", readTs)
for itr.Rewind(); itr.Valid(); itr.Next() {
@@ -204,15 +216,39 @@ func showAllPostingsAt(db *badger.DB, readTs uint64) {
if !pk.IsData() || pk.Attr == "_predicate_" {
continue
}
fmt.Fprintf(&buf, "\nkey: %+v hex: %x\n", pk, item.Key())

var acc *account
if pk.Attr == "key_0" || pk.Attr == "amount_0" {
var has bool
acc, has = keys[pk.Uid]
if !has {
acc = &account{}
keys[pk.Uid] = acc
}
}
fmt.Fprintf(&buf, " key: %+v hex: %x\n", pk, item.Key())
val, err := item.ValueCopy(nil)
x.Check(err)
var plist pb.PostingList
x.Check(plist.Unmarshal(val))

x.AssertTrue(len(plist.Postings) <= 1)
var num int
for _, p := range plist.Postings {
num = toInt(p)
appendPosting(&buf, p)
}
if num > 0 && acc != nil {
switch pk.Attr {
case "key_0":
acc.Key = num
case "amount_0":
acc.Amt = num
}
}
}
for uid, acc := range keys {
fmt.Fprintf(&buf, "Uid: %d %x Key: %d Amount: %d\n", uid, uid, acc.Key, acc.Amt)
}
fmt.Println(buf.String())
}
@@ -41,6 +41,7 @@ func (s *Server) updateLeases() {
s.nextLeaseId = s.state.MaxLeaseId + 1
s.nextTxnTs = s.state.MaxTxnTs + 1
startTs = s.nextTxnTs
glog.Infof("Updated Lease id: %d. Txn Ts: %d", s.nextLeaseId, s.nextTxnTs)
s.Unlock()
s.orc.updateStartTxnTs(startTs)
}
@@ -26,6 +26,7 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
otrace "go.opencensus.io/trace"
"golang.org/x/net/context"
)

@@ -67,6 +68,7 @@ func (o *Oracle) updateStartTxnTs(ts uint64) {
o.keyCommit = make(map[string]uint64)
}

// TODO: This should be done during proposal application for Txn status.
func (o *Oracle) hasConflict(src *api.TxnContext) bool {
// This transaction was started before I became leader.
if src.StartTs < o.startTxnTs {
@@ -84,6 +86,10 @@ func (o *Oracle) purgeBelow(minTs uint64) {
o.Lock()
defer o.Unlock()

// TODO: HACK. Remove this later.
glog.Infof("Not purging below: %d", minTs)
return

// Dropping would be cheaper if abort/commits map is sharded
for ts := range o.commits {
if ts < minTs {
@@ -197,6 +203,7 @@ func (o *Oracle) sendDeltasToSubscribers() {
// Don't goto slurp_loop, because it would break from select immediately.
}

glog.V(2).Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta)
o.Lock()
for id, ch := range o.subscribers {
select {
@@ -273,6 +280,21 @@ func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error {
CommitTs: src.CommitTs,
Aborted: src.Aborted,
}

// NOTE: It is important that we continue retrying proposeTxn until we succeed. This should
// happen, irrespective of what the user context timeout might be. We check for it before
// reaching this stage, but now that we're here, we have to ensure that the commit proposal goes
// through. Otherwise, we should block here forever. If we don't do this, we'll see txn
// violations in Jepsen, because we'll send out a MaxAssigned higher than a commit, which would
// cause newer txns to see older data.

// We could consider adding a wrapper around the user proposal, so we can access any key-values.
// Something like this:
// https://github.com/golang/go/commit/5d39260079b5170e6b4263adb4022cc4b54153c4
ctx = context.Background() // Use a new context with no timeout.

// If this node stops being the leader, we want this proposal to not be forwarded to the leader,
// and get aborted.
if err := s.Node.proposeAndWait(ctx, &zp); err != nil {
return err
}
@@ -292,6 +314,8 @@ func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error {
}

func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
span := otrace.FromContext(ctx)
span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(src.StartTs))}, "")
if src.Aborted {
return s.proposeTxn(ctx, src)
}
@@ -301,6 +325,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
conflict := s.orc.hasConflict(src)
s.orc.RUnlock()
if conflict {
span.Annotate(nil, "Oracle found conflict")
src.Aborted = true
return s.proposeTxn(ctx, src)
}
@@ -328,6 +353,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
for pred := range preds {
tablet := s.ServingTablet(pred)
if tablet == nil || tablet.GetReadOnly() {
span.Annotate(nil, "Tablet is readonly. Aborting.")
src.Aborted = true
return s.proposeTxn(ctx, src)
}
@@ -341,8 +367,15 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
src.CommitTs = assigned.StartId
// Mark the transaction as done, irrespective of whether the proposal succeeded or not.
defer s.orc.doneUntil.Done(src.CommitTs)
span.Annotatef([]otrace.Attribute{otrace.Int64Attribute("commitTs", int64(src.CommitTs))},
"Node Id: %d. Proposing TxnContext: %+v", s.Node.Id, src)

if err := s.orc.commit(src); err != nil {
span.Annotatef(nil, "Found a conflict. Aborting.")
src.Aborted = true
}
if err := ctx.Err(); err != nil {
span.Annotatef(nil, "Aborting txn due to context timing out.")
src.Aborted = true
}
// Propose txn should be used to set watermark as done.
@@ -353,6 +386,9 @@ func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.T
if ctx.Err() != nil {
return nil, ctx.Err()
}
ctx, span := otrace.StartSpan(ctx, "Zero.CommitOrAbort")
defer span.End()

if !s.Node.AmLeader() {
return nil, x.Errorf("Only leader can decide to commit or abort")
}
@@ -70,6 +70,9 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er
}

propose := func(timeout time.Duration) error {
if !n.AmLeader() {
return x.Errorf("Not Zero leader. Aborting proposal: %+v", proposal)
}
cctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

@@ -84,6 +87,7 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er
defer n.Proposals.Delete(key)
proposal.Key = key

// TODO: Remove this and use OpenCensus spans.
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Proposing with key: %X", key)
}
@@ -119,6 +123,9 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er
for err == errInternalRetry {
err = propose(timeout)
timeout *= 2 // Exponential backoff
if timeout > time.Minute {
timeout = 32 * time.Second
}
}
return err
}
@@ -523,9 +530,9 @@ func (n *node) Run() {
n.Applied.Done(entry.Index)
}

// TODO: Should we move this to the top?
if rd.SoftState != nil {
if rd.RaftState == raft.StateLeader && !leader {
glog.Infoln("I've become the leader, updating leases.")
n.server.updateLeases()
}
leader = rd.RaftState == raft.StateLeader
