Skip to content

Commit

Permalink
[raft] check meta range when determine whether a node is a zombie (#6601
Browse files Browse the repository at this point in the history
)
  • Loading branch information
luluz66 committed May 20, 2024
1 parent 5fde9ec commit f84669c
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 20 deletions.
72 changes: 52 additions & 20 deletions enterprise/server/raft/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,44 +950,76 @@ func (s *Store) acquireNodeLiveness(ctx context.Context) {
}
}

// isRangelessNode reports whether the given shard has no matching replica in
// the range descriptor returned by lookupRange. It returns true when no range
// descriptor is found for the shard, or when none of the descriptor's
// replicas carries this shard ID.
func (s *Store) isRangelessNode(shardID uint64) bool {
	rd := s.lookupRange(shardID)
	if rd == nil {
		// No range descriptor is known for this shard.
		return true
	}
	for _, replica := range rd.GetReplicas() {
		if replica.GetShardId() == shardID {
			return false
		}
	}
	return true
}
// membershipStatus is the outcome of checking whether a replica is listed in
// its shard's membership.
type membershipStatus int

// If the replica is behind: don’t kill
// If the replica is one of the replicas specified in the range: don’t kill
// Otherwise: kill
func (s *Store) isZombieNode(ctx context.Context, shardInfo dragonboat.ShardInfo) bool {
const (
// membershipStatusUnknown is the zero value. checkMembershipStatus returns
// it when the membership lookup fails or the replica's config change is
// behind, i.e. when no conclusion can be drawn.
membershipStatusUnknown membershipStatus = iota
// membershipStatusMember means the replica ID was found in the shard's
// membership (Nodes, NonVotings or Witnesses).
membershipStatusMember
// membershipStatusNotMember means the replica ID was not found in any of
// the shard's membership lists.
membershipStatusNotMember
)

// checkMembershipStatus checks the membership status of the replica described
// by shardInfo against the membership returned by s.getMembership for its
// shard.
//
// Returns membershipStatusUnknown when getting the membership info fails, or
// when the replica's config change is behind (ConfigChangeIndex is non-zero
// and not greater than the membership's ConfigChangeID);
// Returns membershipStatusMember when the replica's config change is current
// and its replica ID is one of the voters (Nodes), non-voters (NonVotings), or
// witnesses (Witnesses);
// Returns membershipStatusNotMember otherwise.
//
// NOTE(review): this listing appears to come from a rendered diff, so the
// first Errorf call and each bare `return false` look like the pre-change
// lines, each immediately followed by its replacement — confirm against the
// actual file.
func (s *Store) checkMembershipStatus(ctx context.Context, shardInfo dragonboat.ShardInfo) membershipStatus {
// Get the config change index for this shard.
membership, err := s.getMembership(ctx, shardInfo.ShardID)
if err != nil {
s.log.Errorf("Error gettting membership for shard %d: %s", shardInfo.ShardID, err)
return false
s.log.Errorf("checkMembershipStatus failed to get membership for shard %d: %s", shardInfo.ShardID, err)
return membershipStatusUnknown
}

// The replica's config change is behind the membership's, so the membership
// lists below cannot be used to decide either way.
if shardInfo.ConfigChangeIndex > 0 && shardInfo.ConfigChangeIndex <= membership.ConfigChangeID {
return false
return membershipStatusUnknown
}

// Look for this replica ID in each membership list in turn.
for replicaID := range membership.Nodes {
if replicaID == shardInfo.ReplicaID {
return false
return membershipStatusMember
}
}
for replicaID := range membership.NonVotings {
if replicaID == shardInfo.ReplicaID {
return false
return membershipStatusMember
}
}
for replicaID := range membership.Witnesses {
if replicaID == shardInfo.ReplicaID {
return membershipStatusMember
}
}
// The replica ID is in none of the membership lists.
return membershipStatusNotMember
}

// isZombieNode checks whether a node is a zombie node.
func (s *Store) isZombieNode(ctx context.Context, shardInfo dragonboat.ShardInfo) bool {
membershipStatus := s.checkMembershipStatus(ctx, shardInfo)
if membershipStatus == membershipStatusNotMember {
return true
}

rd := s.lookupRange(shardInfo.ShardID)
if rd == nil {
return true
}

// The replica info in the local range descriptor could be out of date. For
// example, another node in the raft cluster may have proposed removing this
// node while this node was dead. After this node restarts, its range
// descriptor is behind, but it cannot get updates from the other nodes because
// it has been removed from the cluster.
updatedRD, err := s.Sender().LookupRangeDescriptor(ctx, rd.GetStart(), false /*skip Cache */)
if err != nil {
log.Errorf("failed to look up range descriptor: %s", err)
return false
}
if updatedRD.GetGeneration() >= rd.GetGeneration() {
rd = updatedRD
}
for _, r := range rd.GetReplicas() {
if r.GetShardId() == shardInfo.ShardID && r.GetReplicaId() == shardInfo.ReplicaID {
return false
}
}
Expand Down Expand Up @@ -1017,7 +1049,7 @@ func (s *Store) cleanupZombieNodes(ctx context.Context) {
sInfo := nInfo.ShardInfoList[idx]
idx += 1

if s.isZombieNode(ctx, sInfo) || s.isRangelessNode(sInfo.ShardID) {
if s.isZombieNode(ctx, sInfo) {
s.log.Debugf("Removing zombie node: %+v...", sInfo)
if err := s.nodeHost.StopReplica(sInfo.ShardID, sInfo.ReplicaID); err != nil {
s.log.Errorf("Error stopping zombie replica: %s", err)
Expand Down
76 changes: 76 additions & 0 deletions enterprise/server/raft/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,82 @@ func TestCleanupZombieShards(t *testing.T) {
t.Fatalf("Zombie killer never cleaned up zombie range 2")
}

// TestCleanupZombieReplicas starts range 2 on two stores, rewrites the range
// descriptor in the meta range so that it no longer lists the replica on nh1,
// and then checks that:
//   - s1 eventually ends up with only its shard 1 replica (range 2 and its
//     replica are gone from s1), and
//   - s2 still has both of its replicas, including range 2.
func TestCleanupZombieReplicas(t *testing.T) {
	// Scan frequently so the cleanup happens within the polling window below.
	flag.Set("cache.raft.zombie_node_scan_interval", "100ms")

	sf := newStoreFactory(t)
	s1, nh1 := sf.NewStore(t)
	s2, nh2 := sf.NewStore(t)
	ctx := context.Background()

	err := bringup.SendStartShardRequests(ctx, s1.NodeHost, s1.APIClient, map[string]string{
		nh1.ID(): s1.GRPCAddress,
		nh2.ID(): s2.GRPCAddress,
	})
	require.NoError(t, err)

	stores := []*TestingStore{s1, s2}
	waitForRangeLease(t, stores, 2)

	s := getStoreWithRangeLease(t, stores, 2)
	rd := s.GetRange(2)

	require.Equal(t, 2, len(rd.GetReplicas()))

	// Remove replica of range 2 on nh1 in meta range
	replicas := make([]*rfpb.ReplicaDescriptor, 0, len(rd.GetReplicas())-1)
	for _, repl := range rd.GetReplicas() {
		if repl.GetNhid() == nh1.ID() {
			continue
		}
		replicas = append(replicas, repl)
	}
	rd.Replicas = replicas
	require.Equal(t, 1, len(replicas))
	// Bump the generation so the rewritten descriptor counts as newer.
	rd.Generation = rd.GetGeneration() + 1
	protoBytes, err := proto.Marshal(rd)
	require.NoError(t, err)

	// Write the range descriptor to the meta range
	writeReq, err := rbuilder.NewBatchBuilder().Add(&rfpb.DirectWriteRequest{
		Kv: &rfpb.KV{
			Key:   keys.RangeMetaKey(rd.GetEnd()),
			Value: protoBytes,
		},
	}).ToProto()
	require.NoError(t, err)
	writeRsp, err := s1.Sender.SyncPropose(ctx, constants.MetaRangePrefix, writeReq)
	require.NoError(t, err)
	err = rbuilder.NewBatchResponseFromProto(writeRsp).AnyError()
	require.NoError(t, err)

	// Poll s1 for up to ~30s until only one replica is left on it.
	cleanedUp := false
	for i := 0; i < 30; i++ {
		list, err := s1.ListReplicas(ctx, &rfpb.ListReplicasRequest{})
		require.NoError(t, err)
		if len(list.GetReplicas()) == 1 {
			log.Debugf("listReplicas: %+v", list)
			repl := list.GetReplicas()[0]
			// nh1 only has shard 1
			require.Equal(t, uint64(1), repl.GetShardId())

			require.Nil(t, s1.GetRange(2))
			_, err := s1.GetReplica(2)
			require.True(t, status.IsOutOfRangeError(err))
			cleanedUp = true
			break
		}
		time.Sleep(time.Second)
	}
	if !cleanedUp {
		t.Fatalf("Zombie killer never cleaned up zombie range 2")
	}

	// Verify that the range and replica are not removed from s2. This runs on
	// the success path; previously it was only reachable after a timeout,
	// right before the test failed anyway.
	list, err := s2.ListReplicas(ctx, &rfpb.ListReplicasRequest{})
	require.NoError(t, err)
	require.Equal(t, 2, len(list.GetReplicas()))
	require.NotNil(t, s2.GetRange(2))
	_, err = s2.GetReplica(2)
	require.NoError(t, err)
}

func TestAutomaticSplitting(t *testing.T) {
flag.Set("cache.raft.entries_between_usage_checks", "1")
flag.Set("cache.raft.max_range_size_bytes", "10000")
Expand Down

0 comments on commit f84669c

Please sign in to comment.