Merge pull request #2163 from tamird/fewer-desc-reads
DRY use of `(*Replica).Desc()`
tamird committed Aug 19, 2015
2 parents 4ec5bf2 + 01e6cf0 commit 2928f5e
Showing 8 changed files with 104 additions and 63 deletions.
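
The change applies one pattern throughout: rather than calling `(*Replica).Desc()` again for every field access within a function, the descriptor is loaded once into a local `desc` (or `rangeID`) variable and reused. Below is a minimal sketch of the before/after shape, using simplified stand-in types rather than the real `proto.RangeDescriptor` and `storage.Replica`:

```go
package main

import "fmt"

// Simplified stand-ins for the real proto.RangeDescriptor and storage.Replica types.
type RangeDescriptor struct {
	RangeID  int64
	StartKey string
	EndKey   string
}

type Replica struct {
	desc *RangeDescriptor
}

// Desc returns the replica's range descriptor (loaded under a lock/atomically
// in the real code, so each call has a cost).
func (r *Replica) Desc() *RangeDescriptor { return r.desc }

// Before: every field access pays for another Desc() call.
func describeBefore(r *Replica) string {
	return fmt.Sprintf("range=%d [%s-%s)", r.Desc().RangeID, r.Desc().StartKey, r.Desc().EndKey)
}

// After: read the descriptor once and reuse the local variable.
func describeAfter(r *Replica) string {
	desc := r.Desc()
	return fmt.Sprintf("range=%d [%s-%s)", desc.RangeID, desc.StartKey, desc.EndKey)
}

func main() {
	r := &Replica{desc: &RangeDescriptor{RangeID: 1, StartKey: "a", EndKey: "b"}}
	fmt.Println(describeBefore(r))
	fmt.Println(describeAfter(r))
}
```

Reading the descriptor once per function also gives that function a single consistent view of the range, which is likely part of the motivation beyond avoiding redundant loads.
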
13 changes: 8 additions & 5 deletions storage/gc_queue.go
@@ -111,7 +111,8 @@ func (gcq *gcQueue) shouldQueue(now proto.Timestamp, repl *Replica) (shouldQ boo
// intentAgeThreshold.
func (gcq *gcQueue) process(now proto.Timestamp, repl *Replica) error {
snap := repl.rm.Engine().NewSnapshot()
iter := newRangeDataIterator(repl.Desc(), snap)
desc := repl.Desc()
iter := newRangeDataIterator(desc, snap)
defer iter.Close()
defer snap.Close()

@@ -131,7 +132,7 @@ func (gcq *gcQueue) process(now proto.Timestamp, repl *Replica) error {
gcArgs := &proto.GCRequest{
RequestHeader: proto.RequestHeader{
Timestamp: now,
RangeID: repl.Desc().RangeID,
RangeID: desc.RangeID,
},
}
var mu sync.Mutex
@@ -316,15 +317,17 @@ func (gcq *gcQueue) lookupGCPolicy(repl *Replica) (config.GCPolicy, error) {
return config.GCPolicy{}, util.Errorf("gossiped info is not a prefix configuration map: %+v", info)
}

desc := repl.Desc()

// Verify that the replica's range doesn't cross over the zone config
// prefix. This could be the case if the zone config is new and the range
// hasn't been split yet along the new boundary.
var gc *config.GCPolicy
if err = configMap.VisitPrefixesHierarchically(repl.Desc().StartKey, func(start, end proto.Key, cfg config.ConfigUnion) (bool, error) {
if err = configMap.VisitPrefixesHierarchically(desc.StartKey, func(start, end proto.Key, cfg config.ConfigUnion) (bool, error) {
zone := cfg.GetValue().(*config.ZoneConfig)
if zone.GC != nil {
repl.RLock()
isCovered := !end.Less(repl.Desc().EndKey)
isCovered := !end.Less(desc.EndKey)
repl.RUnlock()
if !isCovered {
return false, util.Errorf("replica is only partially covered by zone %s (%q-%q); must wait for range split", cfg, start, end)
@@ -342,7 +345,7 @@ func (gcq *gcQueue) lookupGCPolicy(repl *Replica) (config.GCPolicy, error) {

// We should always match _at least_ the default GC.
if gc == nil {
return config.GCPolicy{}, util.Errorf("no zone for range with start key %q", repl.Desc().StartKey)
return config.GCPolicy{}, util.Errorf("no zone for range with start key %q", desc.StartKey)
}
return *gc, nil
}
7 changes: 5 additions & 2 deletions storage/queue.go
@@ -186,7 +186,10 @@ func (bq *baseQueue) addInternal(repl *Replica, should bool, priority float64) e
if atomic.LoadInt32(&bq.disabled) == 1 {
return errQueueDisabled
}
item, ok := bq.replicas[repl.Desc().RangeID]

rangeID := repl.Desc().RangeID

item, ok := bq.replicas[rangeID]
if !should {
if ok {
bq.remove(item.index)
@@ -203,7 +206,7 @@ func (bq *baseQueue) addInternal(repl *Replica, should bool, priority float64) e
}
item = &replicaItem{value: repl, priority: priority}
heap.Push(&bq.priorityQ, item)
bq.replicas[repl.Desc().RangeID] = item
bq.replicas[rangeID] = item

// If adding this replica has pushed the queue past its maximum size,
// remove the lowest priority element.
14 changes: 8 additions & 6 deletions storage/range_gc_queue.go
@@ -74,6 +74,8 @@ func (q *rangeGCQueue) shouldQueue(now proto.Timestamp, rng *Replica) (bool, flo
// process performs a consistent lookup on the range descriptor to see if we are
// still a member of the range.
func (q *rangeGCQueue) process(now proto.Timestamp, rng *Replica) error {
desc := rng.Desc()

// Calls to RangeLookup typically use inconsistent reads, but we
// want to do a consistent read here. This is important when we are
// considering one of the metadata ranges: we must not do an
@@ -83,7 +85,7 @@ func (q *rangeGCQueue) process(now proto.Timestamp, rng *Replica) error {
b.InternalAddCall(proto.Call{
Args: &proto.RangeLookupRequest{
RequestHeader: proto.RequestHeader{
Key: keys.RangeMetaKey(rng.Desc().StartKey),
Key: keys.RangeMetaKey(desc.StartKey),
},
MaxRanges: 1,
},
@@ -96,11 +98,11 @@ func (q *rangeGCQueue) process(now proto.Timestamp, rng *Replica) error {
if len(reply.Ranges) != 1 {
return util.Errorf("expected 1 range descriptor, got %d", len(reply.Ranges))
}
desc := reply.Ranges[0]

replyDesc := reply.Ranges[0]
currentMember := false
if me := rng.GetReplica(); me != nil {
for _, rep := range desc.Replicas {
for _, rep := range replyDesc.Replicas {
if rep.StoreID == me.StoreID {
currentMember = true
break
@@ -111,7 +113,7 @@ func (q *rangeGCQueue) process(now proto.Timestamp, rng *Replica) error {
if !currentMember {
// We are no longer a member of this range; clean up our local data.
if log.V(1) {
log.Infof("destroying local data from range %d", rng.Desc().RangeID)
log.Infof("destroying local data from range %d", desc.RangeID)
}
if err := rng.rm.RemoveReplica(rng); err != nil {
return err
@@ -122,13 +124,13 @@ func (q *rangeGCQueue) process(now proto.Timestamp, rng *Replica) error {
if err := rng.Destroy(); err != nil {
return err
}
} else if desc.RangeID != rng.Desc().RangeID {
} else if replyDesc.RangeID != desc.RangeID {
// If we get a different range ID back, then the range has been merged
// away. But currentMember is true, so we are still a member of the
// subsuming range. Shut down raft processing for the former range
// and delete any remaining metadata, but do not delete the data.
if log.V(1) {
log.Infof("removing merged range %d", rng.Desc().RangeID)
log.Infof("removing merged range %d", desc.RangeID)
}
if err := rng.rm.RemoveReplica(rng); err != nil {
return err
24 changes: 15 additions & 9 deletions storage/replica.go
@@ -255,7 +255,8 @@ func NewReplica(desc *proto.RangeDescriptor, rm rangeManager) (*Replica, error)

// String returns a string representation of the range.
func (r *Replica) String() string {
return fmt.Sprintf("range=%d [%s-%s)", r.Desc().RangeID, r.Desc().StartKey, r.Desc().EndKey)
desc := r.Desc()
return fmt.Sprintf("range=%d [%s-%s)", desc.RangeID, desc.StartKey, desc.EndKey)
}

// Destroy cleans up all data associated with this range.
@@ -312,11 +313,13 @@ func (r *Replica) getLease() *proto.Lease {
func (r *Replica) newNotLeaderError(l *proto.Lease, originNode proto.RaftNodeID) error {
err := &proto.NotLeaderError{}
if l != nil && l.RaftNodeID != 0 {
err.RangeID = r.Desc().RangeID
desc := r.Desc()

err.RangeID = desc.RangeID
_, originStoreID := proto.DecodeRaftNodeID(originNode)
_, err.Replica = r.Desc().FindReplica(originStoreID)
_, err.Replica = desc.FindReplica(originStoreID)
_, storeID := proto.DecodeRaftNodeID(proto.RaftNodeID(l.RaftNodeID))
_, err.Leader = r.Desc().FindReplica(storeID)
_, err.Leader = desc.FindReplica(storeID)
}
return err
}
@@ -330,15 +333,16 @@ func (r *Replica) requestLeaderLease(timestamp proto.Timestamp) error {
duration := int64(DefaultLeaderLeaseDuration)
// Prepare a Raft command to get a leader lease for this replica.
expiration := timestamp.Add(duration, 0)
desc := r.Desc()
args := &proto.LeaderLeaseRequest{
RequestHeader: proto.RequestHeader{
Key: r.Desc().StartKey,
Key: desc.StartKey,
Timestamp: timestamp,
CmdID: proto.ClientCmdID{
WallTime: r.rm.Clock().Now().WallTime,
Random: rand.Int63(),
},
RangeID: r.Desc().RangeID,
RangeID: desc.RangeID,
},
Lease: proto.Lease{
Start: timestamp,
@@ -985,6 +989,8 @@ func (r *Replica) maybeGossipFirstRange() error {

ctx := r.context()

desc := *r.Desc()

// Gossip the cluster ID from all replicas of the first range.
if log.V(1) {
log.Infoc(ctx, "gossiping cluster id %s from store %d, range %d", r.rm.ClusterID(),
@@ -998,15 +1004,15 @@ func (r *Replica) maybeGossipFirstRange() error {
return err
}
if log.V(1) {
log.Infoc(ctx, "gossiping sentinel from store %d, range %d", r.rm.StoreID(), r.Desc().RangeID)
log.Infoc(ctx, "gossiping sentinel from store %d, range %d", r.rm.StoreID(), desc.RangeID)
}
if err := r.rm.Gossip().AddInfo(gossip.KeySentinel, r.rm.ClusterID(), clusterIDGossipTTL); err != nil {
log.Errorc(ctx, "failed to gossip cluster ID: %s", err)
}
if log.V(1) {
log.Infoc(ctx, "gossiping first range from store %d, range %d", r.rm.StoreID(), r.Desc().RangeID)
log.Infoc(ctx, "gossiping first range from store %d, range %d", r.rm.StoreID(), desc.RangeID)
}
if err := r.rm.Gossip().AddInfo(gossip.KeyFirstRangeDescriptor, *r.Desc(), configGossipTTL); err != nil {
if err := r.rm.Gossip().AddInfo(gossip.KeyFirstRangeDescriptor, desc, configGossipTTL); err != nil {
log.Errorc(ctx, "failed to gossip first range metadata: %s", err)
}
return nil
29 changes: 17 additions & 12 deletions storage/replica_command.go
@@ -336,7 +336,7 @@ func (r *Replica) EndTransaction(batch engine.Engine, ms *engine.MVCCStats, args
// Resolve any explicit intents. All that are local to this range get
// resolved synchronously in the same batch. The remainder are collected
// and handed off to asynchronous processing.
desc := *(r.Desc())
desc := *r.Desc()
if wideDesc := args.GetInternalCommitTrigger().GetMergeTrigger().GetUpdatedDesc(); wideDesc.RangeID != 0 {
// If this is a merge, then use the post-merge descriptor to determine
// which intents are local (note that for a split, we want to use the
@@ -922,8 +922,9 @@ func (r *Replica) TruncateLog(batch engine.Engine, ms *engine.MVCCStats, args pr
if err != nil {
return reply, err
}
start := keys.RaftLogKey(r.Desc().RangeID, 0)
end := keys.RaftLogKey(r.Desc().RangeID, args.Index)
rangeID := r.Desc().RangeID
start := keys.RaftLogKey(rangeID, 0)
end := keys.RaftLogKey(rangeID, args.Index)
if err = batch.Iterate(engine.MVCCEncodeKey(start), engine.MVCCEncodeKey(end), func(kv proto.RawKeyValue) (bool, error) {
return false, batch.Clear(kv.Key)
}); err != nil {
@@ -933,7 +934,7 @@ func (r *Replica) TruncateLog(batch engine.Engine, ms *engine.MVCCStats, args pr
Index: args.Index - 1,
Term: term,
}
return reply, engine.MVCCPutProto(batch, ms, keys.RaftTruncatedStateKey(r.Desc().RangeID), proto.ZeroTimestamp, nil, &ts)
return reply, engine.MVCCPutProto(batch, ms, keys.RaftTruncatedStateKey(rangeID), proto.ZeroTimestamp, nil, &ts)
}

// LeaderLease sets the leader lease for this range. The command fails
@@ -998,8 +999,10 @@ func (r *Replica) LeaderLease(batch engine.Engine, ms *engine.MVCCStats, args pr

args.Lease.Start = effectiveStart

rangeID := r.Desc().RangeID

// Store the lease to disk & in-memory.
if err := engine.MVCCPutProto(batch, ms, keys.RaftLeaderLeaseKey(r.Desc().RangeID), proto.ZeroTimestamp, nil, &args.Lease); err != nil {
if err := engine.MVCCPutProto(batch, ms, keys.RaftLeaderLeaseKey(rangeID), proto.ZeroTimestamp, nil, &args.Lease); err != nil {
return reply, err
}
atomic.StorePointer(&r.lease, unsafe.Pointer(&args.Lease))
@@ -1011,7 +1014,7 @@ func (r *Replica) LeaderLease(batch engine.Engine, ms *engine.MVCCStats, args pr
// node.
if r.getLease().RaftNodeID == r.rm.RaftNodeID() && prevLease.RaftNodeID != r.getLease().RaftNodeID {
r.tsCache.SetLowWater(prevLease.Expiration.Add(int64(r.rm.Clock().MaxOffset()), 0))
log.Infof("range %d: new leader lease %s", r.Desc().RangeID, args.Lease)
log.Infof("range %d: new leader lease %s", rangeID, args.Lease)
}

// Gossip configs in the event this range contains config info.
@@ -1129,8 +1132,9 @@ func (r *Replica) AdminSplit(args proto.AdminSplitRequest) (proto.AdminSplitResp
// recomputes stats for both the existing, updated range and the new
// range.
func (r *Replica) splitTrigger(batch engine.Engine, split *proto.SplitTrigger) error {
if !bytes.Equal(r.Desc().StartKey, split.UpdatedDesc.StartKey) ||
!bytes.Equal(r.Desc().EndKey, split.NewDesc.EndKey) {
desc := r.Desc()
if !bytes.Equal(desc.StartKey, split.UpdatedDesc.StartKey) ||
!bytes.Equal(desc.EndKey, split.NewDesc.EndKey) {
return util.Errorf("range does not match splits: (%s-%s) + (%s-%s) != %s",
split.UpdatedDesc.StartKey, split.UpdatedDesc.EndKey,
split.NewDesc.StartKey, split.NewDesc.EndKey, r)
@@ -1299,14 +1303,15 @@ func (r *Replica) AdminMerge(args proto.AdminMergeRequest) (proto.AdminMergeResp
// mergeTrigger is called on a successful commit of an AdminMerge
// transaction. It recomputes stats for the receiving range.
func (r *Replica) mergeTrigger(batch engine.Engine, merge *proto.MergeTrigger) error {
if !bytes.Equal(r.Desc().StartKey, merge.UpdatedDesc.StartKey) {
desc := r.Desc()
if !bytes.Equal(desc.StartKey, merge.UpdatedDesc.StartKey) {
return util.Errorf("range and updated range start keys do not match: %s != %s",
r.Desc().StartKey, merge.UpdatedDesc.StartKey)
desc.StartKey, merge.UpdatedDesc.StartKey)
}

if !r.Desc().EndKey.Less(merge.UpdatedDesc.EndKey) {
if !desc.EndKey.Less(merge.UpdatedDesc.EndKey) {
return util.Errorf("range end key is not less than the post merge end key: %s >= %s",
r.Desc().EndKey, merge.UpdatedDesc.EndKey)
desc.EndKey, merge.UpdatedDesc.EndKey)
}

if merge.SubsumedRangeID <= 0 {
31 changes: 20 additions & 11 deletions storage/replica_raftstorage.go
@@ -37,7 +37,8 @@ var _ multiraft.WriteableGroupStorage = &Replica{}
// InitialState implements the raft.Storage interface.
func (r *Replica) InitialState() (raftpb.HardState, raftpb.ConfState, error) {
var hs raftpb.HardState
found, err := engine.MVCCGetProto(r.rm.Engine(), keys.RaftHardStateKey(r.Desc().RangeID),
desc := r.Desc()
found, err := engine.MVCCGetProto(r.rm.Engine(), keys.RaftHardStateKey(desc.RangeID),
proto.ZeroTimestamp, true, nil, &hs)
if err != nil {
return raftpb.HardState{}, raftpb.ConfState{}, err
@@ -60,7 +61,7 @@ func (r *Replica) InitialState() (raftpb.HardState, raftpb.ConfState, error) {
var cs raftpb.ConfState
// For uninitialized ranges, membership is unknown at this point.
if found || r.isInitialized() {
for _, rep := range r.Desc().Replicas {
for _, rep := range desc.Replicas {
cs.Nodes = append(cs.Nodes, uint64(proto.MakeRaftNodeID(rep.NodeID, rep.StoreID)))
}
}
@@ -89,9 +90,11 @@ func (r *Replica) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error) {
return maxBytes > 0 && size > maxBytes, nil
}

rangeID := r.Desc().RangeID

_, err := engine.MVCCIterate(r.rm.Engine(),
keys.RaftLogKey(r.Desc().RangeID, lo),
keys.RaftLogKey(r.Desc().RangeID, hi),
keys.RaftLogKey(rangeID, lo),
keys.RaftLogKey(rangeID, hi),
proto.ZeroTimestamp,
true /* consistent */, nil /* txn */, false /* !reverse */, scanFunc)

@@ -241,11 +244,13 @@ func (r *Replica) Snapshot() (raftpb.Snapshot, error) {
if err != nil {
return raftpb.Snapshot{}, err
}

curDesc := r.Desc()
var desc proto.RangeDescriptor
// We ignore intents on the range descriptor (consistent=false) because we
// know they cannot be committed yet; operations that modify range
// descriptors resolve their own intents when they commit.
ok, err := engine.MVCCGetProto(snap, keys.RangeDescriptorKey(r.Desc().StartKey),
ok, err := engine.MVCCGetProto(snap, keys.RangeDescriptorKey(curDesc.StartKey),
r.rm.Clock().Now(), false /* !consistent */, nil, &desc)
if err != nil {
return raftpb.Snapshot{}, util.Errorf("failed to get desc: %s", err)
@@ -259,7 +264,7 @@

// Iterate over all the data in the range, including local-only data like
// the response cache.
for iter := newRangeDataIterator(r.Desc(), snap); iter.Valid(); iter.Next() {
for iter := newRangeDataIterator(curDesc, snap); iter.Valid(); iter.Next() {
snapData.KV = append(snapData.KV,
&proto.RaftSnapshotData_KeyValue{Key: iter.Key(), Value: iter.Value()})
}
@@ -298,8 +303,10 @@ func (r *Replica) Append(entries []raftpb.Entry) error {
batch := r.rm.Engine().NewBatch()
defer batch.Close()

rangeID := r.Desc().RangeID

for _, ent := range entries {
err := engine.MVCCPutProto(batch, nil, keys.RaftLogKey(r.Desc().RangeID, ent.Index),
err := engine.MVCCPutProto(batch, nil, keys.RaftLogKey(rangeID, ent.Index),
proto.ZeroTimestamp, nil, &ent)
if err != nil {
return err
@@ -310,14 +317,14 @@
// Delete any previously appended log entries which never committed.
for i := lastIndex + 1; i <= prevLastIndex; i++ {
err := engine.MVCCDelete(batch, nil,
keys.RaftLogKey(r.Desc().RangeID, i), proto.ZeroTimestamp, nil)
keys.RaftLogKey(rangeID, i), proto.ZeroTimestamp, nil)
if err != nil {
return err
}
}

// Commit the batch and update the last index.
if err := setLastIndex(batch, r.Desc().RangeID, lastIndex); err != nil {
if err := setLastIndex(batch, rangeID, lastIndex); err != nil {
return err
}
if err := batch.Commit(); err != nil {
@@ -360,9 +367,11 @@ func (r *Replica) ApplySnapshot(snap raftpb.Snapshot) error {
return err
}

rangeID := r.Desc().RangeID

// First, save the HardState. The HardState must not be changed
// because it may record a previous vote cast by this node.
hardStateKey := keys.RaftHardStateKey(r.Desc().RangeID)
hardStateKey := keys.RaftHardStateKey(rangeID)
hardState, _, err := engine.MVCCGet(r.rm.Engine(), hardStateKey, proto.ZeroTimestamp, true /* consistent */, nil)
if err != nil {
return err
@@ -423,7 +432,7 @@
// performance implications are not likely to be drastic. If our feelings
// about this ever change, we can add a LastIndex field to
// raftpb.SnapshotMetadata.
if err := setLastIndex(batch, r.Desc().RangeID, snap.Metadata.Index); err != nil {
if err := setLastIndex(batch, rangeID, snap.Metadata.Index); err != nil {
return err
}
