Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: do lazy map allocations in replicaFlowControl #113150

Merged
merged 1 commit into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions pkg/kv/kvserver/flow_control_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,15 @@ func (rf *replicaFlowControl) getPausedFollowers() map[roachpb.ReplicaID]struct{

func (rf *replicaFlowControl) getBehindFollowers() map[roachpb.ReplicaID]struct{} {
rf.assertLocked()
behindFollowers := make(map[roachpb.ReplicaID]struct{})
// Lazily allocate the map, since expected to be empty.
var behindFollowers map[roachpb.ReplicaID]struct{}
pav-kv marked this conversation as resolved.
Show resolved Hide resolved
rf.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, progress rafttracker.Progress) {
if progress.State == rafttracker.StateReplicate {
return
}

if behindFollowers == nil {
behindFollowers = make(map[roachpb.ReplicaID]struct{})
}
replID := roachpb.ReplicaID(id)
behindFollowers[replID] = struct{}{}

Expand All @@ -90,12 +93,16 @@ func (rf *replicaFlowControl) getBehindFollowers() map[roachpb.ReplicaID]struct{

func (rf *replicaFlowControl) getInactiveFollowers() map[roachpb.ReplicaID]struct{} {
rf.assertLocked()
inactiveFollowers := make(map[roachpb.ReplicaID]struct{})
// Lazily allocate the map, since expected to be empty.
var inactiveFollowers map[roachpb.ReplicaID]struct{}
for _, desc := range rf.getDescriptor().Replicas().Descriptors() {
if desc.ReplicaID == rf.getReplicaID() {
continue
}
if !rf.mu.lastUpdateTimes.isFollowerActiveSince(desc.ReplicaID, timeutil.Now(), rf.store.cfg.RangeLeaseDuration) {
if inactiveFollowers == nil {
inactiveFollowers = make(map[roachpb.ReplicaID]struct{})
}
inactiveFollowers[desc.ReplicaID] = struct{}{}
}
}
Expand All @@ -104,12 +111,16 @@ func (rf *replicaFlowControl) getInactiveFollowers() map[roachpb.ReplicaID]struc

func (rf *replicaFlowControl) getDisconnectedFollowers() map[roachpb.ReplicaID]struct{} {
rf.assertLocked()
disconnectedFollowers := make(map[roachpb.ReplicaID]struct{})
// Lazily allocate the map, since expected to be empty.
var disconnectedFollowers map[roachpb.ReplicaID]struct{}
for _, desc := range rf.getDescriptor().Replicas().Descriptors() {
if desc.ReplicaID == rf.getReplicaID() {
continue
}
if !rf.store.raftTransportForFlowControl.isConnectedTo(desc.StoreID) {
if disconnectedFollowers == nil {
disconnectedFollowers = make(map[roachpb.ReplicaID]struct{})
}
disconnectedFollowers[desc.ReplicaID] = struct{}{}
}
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/flow_control_replica_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ func (f *replicaFlowControlIntegrationImpl) handle() (kvflowcontrol.Handle, bool
// reconnect previously disconnected streams if we're able.
func (f *replicaFlowControlIntegrationImpl) refreshStreams(ctx context.Context, reason string) {
f.disconnectStreams(ctx, f.notActivelyReplicatingTo(), reason)
// TODO(sumeer): we call notActivelyReplicatingTo() again in tryReconnect(),
// which is wasteful, since refreshStreams is called on every raft tick.
// Simply pass the return value from the call above to the following method.
f.tryReconnect(ctx)
}

Expand All @@ -296,6 +299,11 @@ func (f *replicaFlowControlIntegrationImpl) refreshStreams(ctx context.Context,
// than its last position (I4), replicas on dead nodes (I2), replicas we're not
// connected to via the raft transport (I1), and paused followers (I3).
func (f *replicaFlowControlIntegrationImpl) notActivelyReplicatingTo() []roachpb.ReplicaDescriptor {
// These methods return maps, which are mostly lazily allocated, since they
// are expected to be empty. If we need to avoid even the lazy allocation,
// we could use the fact that the contents of these maps are used while
// holding replicaFlowControl.mu, so the allocations could be done once, and
// kept as members of replicaFlowControl.
pausedFollowers := f.replicaForFlowControl.getPausedFollowers()
behindFollowers := f.replicaForFlowControl.getBehindFollowers()
inactiveFollowers := f.replicaForFlowControl.getInactiveFollowers()
Expand Down