Skip to content

Commit

Permalink
Move the guts of Store.RemoveReplica to the processRaft thread.
Browse files Browse the repository at this point in the history
  • Loading branch information
bdarnell committed Sep 3, 2015
1 parent 7cf8032 commit 0d559e9
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 36 deletions.
20 changes: 20 additions & 0 deletions storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,3 +924,23 @@ func TestRaftAfterRemoveRange(t *testing.T) {
// Execute another replica change to ensure that MultiRaft has processed the heartbeat just sent.
mtc.replicateRange(proto.RangeID(1), 0, 1)
}

// TestRaftRemoveRace repeatedly removes and re-adds a replica in an
// attempt to trigger the race described in
// https://github.com/cockroachdb/cockroach/issues/1911. Ten rounds
// are not enough to reproduce the problem reliably, but no other test
// we have covers it any better, and running more rounds would add an
// unacceptable amount of test runtime.
func TestRaftRemoveRace(t *testing.T) {
	defer leaktest.AfterTest(t)
	mtc := startMultiTestContext(t, 3)
	defer mtc.Stop()

	// Number of remove/add cycles; kept small to bound test runtime.
	const rounds = 10

	rangeID := proto.RangeID(1)
	mtc.replicateRange(rangeID, 0, 1, 2)

	// Remove and re-add the same replica on every round.
	for round := 0; round < rounds; round++ {
		mtc.unreplicateRange(rangeID, 0, 2)
		mtc.replicateRange(rangeID, 0, 2)
	}
}
94 changes: 58 additions & 36 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,25 +247,26 @@ func (rs *storeRangeSet) EstimatedCount() int {
// A Store maintains a map of ranges by start key. A Store corresponds
// to one physical device.
type Store struct {
Ident proto.StoreIdent
ctx StoreContext
db *client.DB
engine engine.Engine // The underlying key-value store
_allocator allocator // Makes allocation decisions
rangeIDAlloc *idAllocator // Range ID allocator
gcQueue *gcQueue // Garbage collection queue
_splitQueue *splitQueue // Range splitting queue
verifyQueue *verifyQueue // Checksum verification queue
replicateQueue replicateQueue // Replication queue
_rangeGCQueue *rangeGCQueue // Range GC queue
scanner *replicaScanner // Range scanner
feed StoreEventFeed // Event Feed
multiraft *multiraft.MultiRaft
started int32
stopper *stop.Stopper
startedAt int64
nodeDesc *proto.NodeDescriptor
initComplete sync.WaitGroup // Signaled by async init tasks
Ident proto.StoreIdent
ctx StoreContext
db *client.DB
engine engine.Engine // The underlying key-value store
_allocator allocator // Makes allocation decisions
rangeIDAlloc *idAllocator // Range ID allocator
gcQueue *gcQueue // Garbage collection queue
_splitQueue *splitQueue // Range splitting queue
verifyQueue *verifyQueue // Checksum verification queue
replicateQueue replicateQueue // Replication queue
_rangeGCQueue *rangeGCQueue // Range GC queue
scanner *replicaScanner // Range scanner
feed StoreEventFeed // Event Feed
removeReplicaChan chan removeReplicaOp
multiraft *multiraft.MultiRaft
started int32
stopper *stop.Stopper
startedAt int64
nodeDesc *proto.NodeDescriptor
initComplete sync.WaitGroup // Signaled by async init tasks

mu sync.RWMutex // Protects variables below...
replicas map[proto.RangeID]*Replica // Map of replicas by Range ID
Expand Down Expand Up @@ -358,14 +359,15 @@ func NewStore(ctx StoreContext, eng engine.Engine, nodeDesc *proto.NodeDescripto
}

s := &Store{
ctx: ctx,
db: ctx.DB, // TODO(tschottdorf) remove redundancy.
engine: eng,
_allocator: makeAllocator(ctx.StorePool),
replicas: map[proto.RangeID]*Replica{},
replicasByKey: btree.New(64 /* degree */),
uninitReplicas: map[proto.RangeID]*Replica{},
nodeDesc: nodeDesc,
ctx: ctx,
db: ctx.DB, // TODO(tschottdorf) remove redundancy.
engine: eng,
_allocator: makeAllocator(ctx.StorePool),
replicas: map[proto.RangeID]*Replica{},
replicasByKey: btree.New(64 /* degree */),
uninitReplicas: map[proto.RangeID]*Replica{},
nodeDesc: nodeDesc,
removeReplicaChan: make(chan removeReplicaOp),
}

// Add range scanner and configure with queues.
Expand Down Expand Up @@ -1060,7 +1062,7 @@ func (s *Store) SplitRange(origRng, newRng *Replica) error {

// MergeRange expands the subsuming range to absorb the subsumed range.
// This merge operation will fail if the two ranges are not collocated
// on the same store.
// on the same store. Must be called from the processRaft goroutine.
func (s *Store) MergeRange(subsumingRng *Replica, updatedEndKey proto.Key, subsumedRangeID proto.RangeID) error {
subsumingDesc := subsumingRng.Desc()

Expand All @@ -1080,8 +1082,9 @@ func (s *Store) MergeRange(subsumingRng *Replica, updatedEndKey proto.Key, subsu
subsumedDesc.Replicas, subsumingDesc.Replicas)
}

// Remove and destroy the subsumed range.
if err = s.RemoveReplica(subsumedRng); err != nil {
// Remove and destroy the subsumed range. Note that we are on the
// processRaft goroutine so we can call removeReplicaImpl directly.
if err = s.removeReplicaImpl(subsumedRng); err != nil {
return util.Errorf("cannot remove range %s", err)
}

Expand Down Expand Up @@ -1145,24 +1148,40 @@ func (s *Store) addReplicaToRangeMap(rng *Replica) error {
return nil
}

// removeReplicaOp is a request sent over Store.removeReplicaChan asking
// the processRaft goroutine to remove a replica on the sender's behalf.
type removeReplicaOp struct {
// rep is the replica to be removed.
rep *Replica
// ch receives the result of the removal (the error returned by
// removeReplicaImpl, nil on success).
ch chan<- error
}

// RemoveReplica removes the replica from the store's replica map and from
// the sorted replicasByKey btree.
func (s *Store) RemoveReplica(rng *Replica) error {
rangeID := rng.Desc().RangeID
// RemoveReplica removes the replica from the store's replica map and
// from the sorted replicasByKey btree. The work is handed to the
// processRaft goroutine via removeReplicaChan and this call blocks
// until it has completed, so it must not be invoked from the
// processRaft goroutine itself (call removeReplicaImpl there). If the
// store's stopper signals shutdown before processRaft accepts the
// request, an error is returned instead of blocking forever.
func (s *Store) RemoveReplica(rep *Replica) error {
	// Buffered so that processRaft never blocks when reporting the result.
	ch := make(chan error, 1)
	select {
	case s.removeReplicaChan <- removeReplicaOp{rep, ch}:
		// Once the request has been accepted, processRaft runs the
		// removal synchronously and always sends a reply.
		return <-ch
	case <-s.stopper.ShouldStop():
		// processRaft exits on ShouldStop, so nobody would ever receive
		// the request; bail out rather than deadlock.
		return util.Errorf("store is stopping; replica not removed")
	}
}

// removeReplicaImpl removes rep from this store. It first removes the
// range's group from multiraft and then, while holding s.mu, drops the
// replica from s.replicas, from the replicasByKey btree, and from the
// scanner. It returns the error from RemoveGroup, or an error if the
// replica was not present in the btree (in which case the s.replicas
// entry has already been deleted and the scanner is not updated).
// It runs on the processRaft goroutine.
func (s *Store) removeReplicaImpl(rep *Replica) error {
rangeID := rep.Desc().RangeID

// RemoveGroup needs to access the storage, which in turn needs the
// lock. Some care is needed to avoid deadlocks.
// lock. Some care is needed to avoid deadlocks. We remove the group
// from multiraft outside the scope of s.mu; this is effectively
// synchronized by the fact that this method runs on the processRaft
// goroutine.
if err := s.multiraft.RemoveGroup(rangeID); err != nil {
return err
}
// Only now take the store lock; it is held until the function returns.
s.mu.Lock()
defer s.mu.Unlock()

delete(s.replicas, rangeID)
if s.replicasByKey.Delete(rng) == nil {
return util.Errorf("couldn't find range in rangesByKey btree")
if s.replicasByKey.Delete(rep) == nil {
return util.Errorf("couldn't find range in replicasByKey btree")
}
s.scanner.RemoveReplica(rng)
s.scanner.RemoveReplica(rep)
return nil
}

Expand Down Expand Up @@ -1567,6 +1586,9 @@ func (s *Store) processRaft() {
callback(err)
}

case op := <-s.removeReplicaChan:
op.ch <- s.removeReplicaImpl(op.rep)

case <-s.stopper.ShouldStop():
return
}
Expand Down

0 comments on commit 0d559e9

Please sign in to comment.