Skip to content
Permalink
Browse files
Fix move tablet Jepsen test failure. (#4496)
* Track move ts of a pred move, so we can reject any reads which happen before the move.

* Try with a timestamp jitter to update MaxAssigned.

* Added an expected checksum, so Alpha group would only delete predicate after it has latest membership info from Zero.

* Regenerate protos.

Co-authored-by: Daniel Mai <daniel@dgraph.io>
  • Loading branch information
manishrjain and danielmai committed Jan 9, 2020
1 parent 371b747 commit ec445503f91185671fe5456eb7d8ff1ddcddb95c
Show file tree
Hide file tree
Showing 8 changed files with 390 additions and 282 deletions.
@@ -143,6 +143,7 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro
Predicate: predicate,
Space: tab.Space,
Force: true,
MoveTs: in.TxnTs,
}
msg = fmt.Sprintf("Move at Alpha done. Now proposing: %+v", p)
span.Annotate(nil, msg)
@@ -155,7 +156,13 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro
glog.Info(msg)
span.Annotate(nil, msg)

// Now that the move has happened, we can delete the predicate from the source group.
// Now that the move has happened, we can delete the predicate from the source group. But before
// doing that, we should ensure the source group understands that the predicate is now being
// served by the destination group. For that, we pass in the expected checksum for the source
// group. Only once the source group's membership checksum matches does the source group delete
// the predicate. This ensures that it does not service any transaction after deletion of data.
checksums := s.groupChecksums()
in.ExpectedChecksum = checksums[in.SourceGid]
in.DestGid = 0 // Indicates deletion of predicate in the source group.
if _, err := wc.MovePredicate(ctx, in); err != nil {
msg = fmt.Sprintf("While deleting predicate [%v] in group %d. Error: %v",
@@ -25,7 +25,7 @@ syntax = "proto3";
package pb;

import "api.proto";
import "github.com/dgraph-io/badger/v2/pb/pb.proto";
import "github.com/dgraph-io/badger/pb/pb.proto";

/* import "gogoproto/gogo.proto"; */

@@ -187,7 +187,8 @@ message Tablet {
bool force = 3; // Used while moving predicate.
int64 space = 7;
bool remove = 8;
bool read_only = 9; // If true, do not ask zero to serve any tablets.
bool read_only = 9; // If true, do not ask zero to serve any tablets.
uint64 move_ts = 10;
}

message DirectedEdge {
@@ -307,9 +308,11 @@ message UidBlock {
// because when the PB is brought to memory, Go would always use 8-bytes per integer. Instead,
// storing it as a byte slice is a lot cheaper in memory.
bytes deltas = 2;
// num_uids is the number of UIDs in the block. We are including this because we want to
// swtich encoding to groupvarint encoding. Current avaialble open source version implements
// encoding and decoding for uint32. We want to wrap it around our logic to use it here.
// num_uids is the number of UIDs in the block. We are including this because we want to
// switch encoding to groupvarint encoding. Currently available open source version implements
// encoding and decoding for uint32. To use that, we create different blocks for different 32-bit
// MSB base uids. That is, if the 32 MSBs are different, we will create a new block irrespective
// of whether the block is filled with the block_size or not.
// Default Blocksize is 256 so uint32 would be sufficient.
uint32 num_uids = 3;
}
@@ -431,10 +434,11 @@ message MapEntry {
}

message MovePredicatePayload {
string predicate = 1;
uint32 source_gid = 2;
uint32 dest_gid = 3;
uint64 txn_ts = 4;
string predicate = 1;
uint32 source_gid = 2;
uint32 dest_gid = 3;
uint64 txn_ts = 4;
uint64 expected_checksum = 5;
}

message TxnStatus {

Large diffs are not rendered by default.

@@ -189,7 +189,7 @@ func (g *groupi) informZeroAboutTablets() {
func (g *groupi) proposeInitialSchema() {
initialSchema := schema.InitialSchema()
for _, s := range initialSchema {
if gid, err := g.BelongsToReadOnly(s.Predicate); err != nil {
if gid, err := g.BelongsToReadOnly(s.Predicate, 0); err != nil {
glog.Errorf("Error getting tablet for predicate %s. Will force schema proposal.",
s.Predicate)
g.upsertSchema(s)
@@ -386,11 +386,18 @@ func (g *groupi) BelongsTo(key string) (uint32, error) {

// BelongsToReadOnly acts like BelongsTo except it does not ask zero to serve
// the tablet for key if no group is currently serving it.
func (g *groupi) BelongsToReadOnly(key string) (uint32, error) {
// The ts passed should be the start ts of the query, so this method can compare that against a
// tablet move timestamp. If the tablet was moved to this group after the start ts of the query, we
// should reject that query.
func (g *groupi) BelongsToReadOnly(key string, ts uint64) (uint32, error) {
g.RLock()
tablet := g.tablets[key]
g.RUnlock()
if tablet != nil {
if ts > 0 && ts < tablet.MoveTs {
return 0, errors.Errorf("StartTs: %d is from before MoveTs: %d for pred: %q",
ts, tablet.MoveTs, key)
}
return tablet.GetGroupId(), nil
}

@@ -415,6 +422,10 @@ func (g *groupi) BelongsToReadOnly(key string) (uint32, error) {
g.Lock()
defer g.Unlock()
g.tablets[key] = out
if out != nil && ts > 0 && ts < out.MoveTs {
return 0, errors.Errorf("StartTs: %d is from before MoveTs: %d for pred: %q",
ts, out.MoveTs, key)
}
return out.GetGroupId(), nil
}

@@ -427,16 +438,6 @@ func (g *groupi) ServesTablet(key string) (bool, error) {
return false, nil
}

// ServesTabletReadOnly acts like ServesTablet except it does not ask zero to
// serve the tablet for key if no group is currently serving it.
func (g *groupi) ServesTabletReadOnly(key string) (bool, error) {
gid, err := g.BelongsToReadOnly(key)
if err != nil {
return false, err
}
return gid == groups().groupId(), nil
}

// Do not modify the returned Tablet
func (g *groupi) Tablet(key string) (*pb.Tablet, error) {
emptyTablet := pb.Tablet{}
@@ -20,6 +20,8 @@ import (
"fmt"
"io"
"strconv"
"sync/atomic"
"time"

"github.com/golang/glog"
"github.com/pkg/errors"
@@ -190,6 +192,23 @@ func (w *grpcWorker) MovePredicate(ctx context.Context,
if len(in.Predicate) == 0 {
return &emptyPayload, errEmptyPredicate
}

// This loop ensures that we have seen the latest membership update following the move where
// this predicate now belongs to another group. Without this check, this group could end up
// serving transactions asking for this predicate, even after this tablet has been deleted. This
// issue is known to have caused Jepsen failures.
for in.ExpectedChecksum > 0 {
cur := atomic.LoadUint64(&groups().membershipChecksum)
if in.ExpectedChecksum == cur {
break
}
if ctx.Err() != nil {
return &emptyPayload, ctx.Err()
}
glog.Infof("Waiting for checksums to match. Expected: %d. Current: %d\n",
in.ExpectedChecksum, cur)
time.Sleep(time.Second)
}
if in.DestGid == 0 {
glog.Infof("Was instructed to delete tablet: %v", in.Predicate)
p := &pb.Proposal{CleanPredicate: in.Predicate}
@@ -59,12 +59,15 @@ func getSchema(ctx context.Context, s *pb.SchemaRequest) (*pb.SchemaResult, erro
"lang", "noconflict"}
}

myGid := groups().groupId()
for _, attr := range predicates {
// This can happen after a predicate is moved. We don't delete predicate from schema state
// immediately. So let's ignore this predicate.
if servesTablet, err := groups().ServesTabletReadOnly(attr); err != nil {
gid, err := groups().BelongsToReadOnly(attr, 0)
if err != nil {
return nil, err
} else if !servesTablet {
}
if myGid != gid {
continue
}

@@ -118,7 +121,7 @@ func populateSchema(attr string, fields []string) *pb.SchemaNode {
// empty then it adds all known groups
func addToSchemaMap(schemaMap map[uint32]*pb.SchemaRequest, schema *pb.SchemaRequest) error {
for _, attr := range schema.Predicates {
gid, err := groups().BelongsToReadOnly(attr)
gid, err := groups().BelongsToReadOnly(attr, 0)
if err != nil {
return err
}
@@ -54,7 +54,7 @@ type sortresult struct {

// SortOverNetwork sends sort query over the network.
func SortOverNetwork(ctx context.Context, q *pb.SortMessage) (*pb.SortResult, error) {
gid, err := groups().BelongsToReadOnly(q.Order[0].Attr)
gid, err := groups().BelongsToReadOnly(q.Order[0].Attr, q.ReadTs)
if err != nil {
return &emptySortResult, err
} else if gid == 0 {
@@ -88,7 +88,7 @@ func (w *grpcWorker) Sort(ctx context.Context, s *pb.SortMessage) (*pb.SortResul
ctx, span := otrace.StartSpan(ctx, "worker.Sort")
defer span.End()

gid, err := groups().BelongsToReadOnly(s.Order[0].Attr)
gid, err := groups().BelongsToReadOnly(s.Order[0].Attr, s.ReadTs)
if err != nil {
return &emptySortResult, err
}
@@ -130,7 +130,7 @@ func processWithBackupRequest(
// query.
func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error) {
attr := q.Attr
gid, err := groups().BelongsToReadOnly(attr)
gid, err := groups().BelongsToReadOnly(attr, q.ReadTs)
switch {
case err != nil:
return &pb.Result{}, err
@@ -809,7 +809,7 @@ func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, erro
// we get partitioned away from group zero as long as it's not removed.
// BelongsToReadOnly is called instead of BelongsTo to prevent this alpha
// from requesting to serve this tablet.
knownGid, err := groups().BelongsToReadOnly(q.Attr)
knownGid, err := groups().BelongsToReadOnly(q.Attr, q.ReadTs)
switch {
case err != nil:
return &pb.Result{}, err
@@ -1761,7 +1761,7 @@ func (w *grpcWorker) ServeTask(ctx context.Context, q *pb.Query) (*pb.Result, er
return &pb.Result{}, ctx.Err()
}

gid, err := groups().BelongsToReadOnly(q.Attr)
gid, err := groups().BelongsToReadOnly(q.Attr, q.ReadTs)
switch {
case err != nil:
return &pb.Result{}, err

0 comments on commit ec44550

Please sign in to comment.