Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[raft] Add methods in replica.Replica to get IDs and range descriptor. #6520

Merged
merged 1 commit into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion enterprise/server/raft/rangelease/rangelease.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (l *Lease) sendCasRequest(ctx context.Context, expectedValue, newVal []byte
if err != nil {
return nil, err
}
rsp, err := client.SyncProposeLocal(ctx, l.nodeHost, l.replica.ShardID, casRequest)
rsp, err := client.SyncProposeLocal(ctx, l.nodeHost, l.replica.ShardID(), casRequest)
if err != nil {
return nil, err
}
Expand Down
37 changes: 26 additions & 11 deletions enterprise/server/raft/replica/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ type Replica struct {

rootDir string
fileDir string
ShardID uint64
ReplicaID uint64
shardID uint64
replicaID uint64
// The ID of the node where this replica resided.
NHID string

Expand Down Expand Up @@ -163,7 +163,7 @@ func (sm *Replica) batchContainsKey(wb pebble.Batch, key []byte) ([]byte, bool)
}

func (sm *Replica) replicaPrefix() []byte {
prefixString := fmt.Sprintf("c%04dn%04d-", sm.ShardID, sm.ReplicaID)
prefixString := fmt.Sprintf("c%04dn%04d-", sm.shardID, sm.replicaID)
return append(constants.LocalPrefix, []byte(prefixString)...)
}

Expand All @@ -187,8 +187,8 @@ func (sm *Replica) Usage() (*rfpb.ReplicaUsage, error) {

ru := &rfpb.ReplicaUsage{
Replica: &rfpb.ReplicaDescriptor{
ShardId: sm.ShardID,
ReplicaId: sm.ReplicaID,
ShardId: sm.shardID,
ReplicaId: sm.replicaID,
Nhid: proto.String(sm.NHID),
},
RangeId: rd.GetRangeId(),
Expand Down Expand Up @@ -218,7 +218,7 @@ func (sm *Replica) String() string {
rd := sm.rangeDescriptor
sm.rangeMu.RUnlock()

return fmt.Sprintf("Replica c%dn%d %s", sm.ShardID, sm.ReplicaID, rdString(rd))
return fmt.Sprintf("Replica c%dn%d %s", sm.shardID, sm.replicaID, rdString(rd))
}

func rdString(rd *rfpb.RangeDescriptor) string {
Expand Down Expand Up @@ -954,7 +954,7 @@ func (sm *Replica) findSplitPoint() (*rfpb.FindSplitPointResponse, error) {
sm.printRange(db, iterOpts, "unsplittable range")
return nil, status.NotFoundErrorf("Could not find split point. (Total size: %d, start split size: %d", totalSize, leftSize)
}
sm.log.Debugf("Cluster %d found split @ %q start rows: %d, size: %d, end rows: %d, size: %d", sm.ShardID, splitKey, splitRows, splitSize, totalRows-splitRows, totalSize-splitSize)
sm.log.Debugf("Cluster %d found split @ %q start rows: %d, size: %d, end rows: %d, size: %d", sm.shardID, splitKey, splitRows, splitSize, totalRows-splitRows, totalSize-splitSize)
return &rfpb.FindSplitPointResponse{
SplitKey: splitKey,
}, nil
Expand Down Expand Up @@ -1173,7 +1173,7 @@ func statusProto(err error) *statuspb.Status {
func (sm *Replica) handlePostCommit(hook *rfpb.PostCommitHook) {
if snap := hook.GetSnapshotCluster(); snap != nil {
go func() {
if err := sm.store.SnapshotCluster(context.TODO(), sm.ShardID); err != nil {
if err := sm.store.SnapshotCluster(context.TODO(), sm.shardID); err != nil {
sm.log.Errorf("Error processing post-commit hook: %s", err)
}
}()
Expand Down Expand Up @@ -2010,7 +2010,7 @@ func (sm *Replica) SaveSnapshot(preparedSnap interface{}, w io.Writer, quit <-ch
// RecoverFromSnapshot is not required to synchronize its recovered in-core
// state with that on disk.
func (sm *Replica) RecoverFromSnapshot(r io.Reader, quit <-chan struct{}) error {
log.Debugf("RecoverFromSnapshot for ShardID=%d, ReplicaID=%d", sm.ShardID, sm.ReplicaID)
log.Debugf("RecoverFromSnapshot for ShardID=%d, ReplicaID=%d", sm.shardID, sm.replicaID)
db, err := sm.leaser.DB()
if err != nil {
return err
Expand All @@ -2030,6 +2030,21 @@ func (sm *Replica) RecoverFromSnapshot(r io.Reader, quit <-chan struct{}) error
return sm.loadReplicaState(db)
}

func (sm *Replica) RangeDescriptor() *rfpb.RangeDescriptor {
sm.rangeMu.RLock()
rd := sm.rangeDescriptor
sm.rangeMu.RUnlock()
return rd.CloneVT()
}

func (sm *Replica) ReplicaID() uint64 {
return sm.replicaID
}

func (sm *Replica) ShardID() uint64 {
return sm.shardID
}

func (sm *Replica) TestingDB() (pebble.IPebbleDB, error) {
return sm.leaser.DB()
}
Expand Down Expand Up @@ -2072,8 +2087,8 @@ func (sm *Replica) Close() error {
// New creates a new Replica, an on-disk state machine.
func New(leaser pebble.Leaser, shardID, replicaID uint64, store IStore, broadcast chan<- events.Event) *Replica {
return &Replica{
ShardID: shardID,
ReplicaID: replicaID,
shardID: shardID,
replicaID: replicaID,
NHID: store.NHID(),
store: store,
leaser: leaser,
Expand Down
10 changes: 5 additions & 5 deletions enterprise/server/raft/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,17 +333,17 @@ func (s *Store) Statusz(ctx context.Context) string {
return true
})
sort.Slice(replicas, func(i, j int) bool {
return replicas[i].ShardID < replicas[j].ShardID
return replicas[i].ShardID() < replicas[j].ShardID()
})
for _, r := range replicas {
replicaName := fmt.Sprintf(" Shard: %5d Replica: %5d", r.ShardID, r.ReplicaID)
replicaName := fmt.Sprintf(" Shard: %5d Replica: %5d", r.ShardID(), r.ReplicaID())
ru, err := r.Usage()
if err != nil {
buf += fmt.Sprintf("%s error: %s\n", replicaName, err)
continue
}
isLeader := 0
if rd := s.lookupRange(r.ShardID); rd != nil {
if rd := s.lookupRange(r.ShardID()); rd != nil {
if s.leaseKeeper.HaveLease(rd.GetRangeId()) {
isLeader = 1
}
Expand Down Expand Up @@ -646,7 +646,7 @@ func (s *Store) validatedRange(header *rfpb.Header) (*replica.Replica, *rfpb.Ran

func (s *Store) HaveLease(rangeID uint64) bool {
if r, err := s.GetReplica(rangeID); err == nil {
return s.leaseKeeper.HaveLease(r.ShardID)
return s.leaseKeeper.HaveLease(r.ShardID())
}
s.log.Warningf("HaveLease check for unheld range: %d", rangeID)
return false
Expand Down Expand Up @@ -853,7 +853,7 @@ func (s *Store) SyncPropose(ctx context.Context, req *rfpb.SyncProposeRequest) (
if err != nil {
return nil, err
}
shardID = r.ShardID
shardID = r.ShardID()
}

batchResponse, err := client.SyncProposeLocal(ctx, s.nodeHost, shardID, req.GetBatch())
Expand Down
Loading