Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: drop raftentry.Cache data in applySnapshot #37055

Merged
merged 2 commits into from
Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions pkg/storage/raftentry/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type partition struct {
next, prev *partition // accessed under Cache.mu
}

var partitionSize = int32(unsafe.Sizeof(partition{}))
const partitionSize = int32(unsafe.Sizeof(partition{}))

// rangeCache represents the interface that the partition uses.
// It is never explicitly used but a new implementation to replace ringBuf must
Expand Down Expand Up @@ -131,6 +131,16 @@ func (c *Cache) Metrics() Metrics {
return c.metrics
}

// Drop drops all cached entries associated with the specified range.
func (c *Cache) Drop(id roachpb.RangeID) {
c.mu.Lock()
defer c.mu.Unlock()
p := c.getPartLocked(id, false /* create */, false /* recordUse */)
if p != nil {
c.updateGauges(c.evictPartitionLocked(p))
}
}

// Add inserts ents into the cache. If truncate is true, the method also removes
// all entries with indices equal to or greater than the indices of the entries
// provided. ents is expected to consist of entries with a contiguous sequence
Expand Down Expand Up @@ -269,14 +279,17 @@ func (c *Cache) getPartLocked(id roachpb.RangeID, create, recordUse bool) *parti
func (c *Cache) evictLocked(toAdd int32) {
bytes := c.addBytes(toAdd)
for bytes > c.maxBytes && len(c.parts) > 0 {
p := c.lru.remove(c.lru.back())
pBytes, pEntries := p.evict()
c.addEntries(-1 * pEntries)
bytes = c.addBytes(-1 * pBytes)
delete(c.parts, p.id)
bytes, _ = c.evictPartitionLocked(c.lru.back())
}
}

func (c *Cache) evictPartitionLocked(p *partition) (updatedBytes, updatedEntries int32) {
delete(c.parts, p.id)
c.lru.remove(p)
pBytes, pEntries := p.evict()
return c.addBytes(-1 * pBytes), c.addEntries(-1 * pEntries)
}

// recordUpdate adjusts the partition and cache bookkeeping to account for the
// changes which actually occurred in an update relative to the guess made
// before the update.
Expand Down
73 changes: 57 additions & 16 deletions pkg/storage/raftentry/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,28 @@ func TestAddAndTruncate(t *testing.T) {
verifyMetrics(t, c, 4, 36+int64(partitionSize))
}

func TestDrop(t *testing.T) {
defer leaktest.AfterTest(t)()
const (
r1 roachpb.RangeID = 1
r2 roachpb.RangeID = 2

sizeOf9Entries = 81
partitionSize = int64(sizeOf9Entries + partitionSize)
)
c := NewCache(1 << 10)
ents1 := addEntries(c, r1, 1, 10)
verifyGet(t, c, r1, 1, 10, ents1, 10, false)
verifyMetrics(t, c, 9, partitionSize)
ents2 := addEntries(c, r2, 1, 10)
verifyGet(t, c, r2, 1, 10, ents2, 10, false)
verifyMetrics(t, c, 18, 2*partitionSize)
c.Drop(r1)
verifyMetrics(t, c, 9, partitionSize)
c.Drop(r2)
verifyMetrics(t, c, 0, 0)
}

func TestCacheLaterEntries(t *testing.T) {
c := NewCache(1000)
rangeID := roachpb.RangeID(1)
Expand Down Expand Up @@ -337,7 +359,6 @@ func TestConcurrentEvictions(t *testing.T) {
}
c.Clear(r, data[len(data)-1].Index+1)
}

verifyMetrics(t, c, 0, int64(len(c.parts))*int64(partitionSize))
}

Expand Down Expand Up @@ -428,24 +449,44 @@ func TestEntryCacheEviction(t *testing.T) {
func TestConcurrentUpdates(t *testing.T) {
defer leaktest.AfterTest(t)()
c := NewCache(10000)
const r1 roachpb.RangeID = 1
ents := []raftpb.Entry{newEntry(20, 35), newEntry(21, 35)}
var wg sync.WaitGroup
// NB: N is chosen based on the race detector's limit of 8128 goroutines.
const N = 8000
wg.Add(N)
for i := 0; i < N; i++ {
go func(i int) {
if i%2 == 1 {
c.Add(1, ents, true)
} else {
c.Clear(1, 22)
// Test using both Clear and Drop to remove the added entries.
for _, clearMethod := range []struct {
name string
clear func()
}{
{"drop", func() { c.Drop(r1) }},
{"clear", func() { c.Clear(r1, ents[len(ents)-1].Index+1) }},
} {
t.Run(clearMethod.name, func(t *testing.T) {
// NB: N is chosen based on the race detector's limit of 8128 goroutines.
const N = 8000
var wg sync.WaitGroup
wg.Add(N)
for i := 0; i < N; i++ {
go func(i int) {
if i%2 == 1 {
c.Add(r1, ents, true)
} else {
clearMethod.clear()
}
wg.Done()
}(i)
}
wg.Done()
}(i)
wg.Wait()
clearMethod.clear()
// Clear does not evict the partition struct itself so we expect the cache
// to have a partition's initial byte size when using Clear and nothing
// when using Drop.
switch clearMethod.name {
case "drop":
verifyMetrics(t, c, 0, 0)
case "clear":
verifyMetrics(t, c, 0, int64(initialSize.bytes()))
}
})
}
wg.Wait()
c.Clear(1, 22)
verifyMetrics(t, c, 0, int64(initialSize.bytes()))
}

func TestPartitionList(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,9 @@ func (r *Replica) applySnapshot(
if err := clearRangeData(ctx, s.Desc, r.store.Engine(), batch, true /* destroyData */); err != nil {
return err
}
// Clear the cached raft log entries to ensure that old or uncommitted
// entries don't impact the in-memory state.
r.store.raftEntryCache.Drop(r.RangeID)
stats.clear = timeutil.Now()

// Write the snapshot into the range.
Expand Down