From ca112d645a95ba8856401a0c60b5691609dd6063 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 23 Apr 2019 17:25:49 -0400 Subject: [PATCH 1/2] raftentry: add Drop method to cache to efficiently clear a range Release note: None --- pkg/storage/raftentry/cache.go | 25 +++++++--- pkg/storage/raftentry/cache_test.go | 73 ++++++++++++++++++++++------- 2 files changed, 76 insertions(+), 22 deletions(-) diff --git a/pkg/storage/raftentry/cache.go b/pkg/storage/raftentry/cache.go index 5627290c92ed..5307c4c867bc 100644 --- a/pkg/storage/raftentry/cache.go +++ b/pkg/storage/raftentry/cache.go @@ -96,7 +96,7 @@ type partition struct { next, prev *partition // accessed under Cache.mu } -var partitionSize = int32(unsafe.Sizeof(partition{})) +const partitionSize = int32(unsafe.Sizeof(partition{})) // rangeCache represents the interface that the partition uses. // It is never explicitly used but a new implementation to replace ringBuf must @@ -131,6 +131,16 @@ func (c *Cache) Metrics() Metrics { return c.metrics } +// Drop drops all cached entries associated with the specified range. +func (c *Cache) Drop(id roachpb.RangeID) { + c.mu.Lock() + defer c.mu.Unlock() + p := c.getPartLocked(id, false /* create */, false /* recordUse */) + if p != nil { + c.updateGauges(c.evictPartitionLocked(p)) + } +} + // Add inserts ents into the cache. If truncate is true, the method also removes // all entries with indices equal to or greater than the indices of the entries // provided. ents is expected to consist of entries with a contiguous sequence @@ -269,14 +279,17 @@ func (c *Cache) getPartLocked(id roachpb.RangeID, create, recordUse bool) *parti func (c *Cache) evictLocked(toAdd int32) { bytes := c.addBytes(toAdd) for bytes > c.maxBytes && len(c.parts) > 0 { - p := c.lru.remove(c.lru.back()) - pBytes, pEntries := p.evict() - c.addEntries(-1 * pEntries) - bytes = c.addBytes(-1 * pBytes) - delete(c.parts, p.id) + bytes, _ = c.evictPartitionLocked(c.lru.back()) } } +func (c *Cache) evictPartitionLocked(p *partition) (updatedBytes, updatedEntries int32) { + delete(c.parts, p.id) + c.lru.remove(p) + pBytes, pEntries := p.evict() + return c.addBytes(-1 * pBytes), c.addEntries(-1 * pEntries) +} + // recordUpdate adjusts the partition and cache bookkeeping to account for the // changes which actually occurred in an update relative to the guess made // before the update. diff --git a/pkg/storage/raftentry/cache_test.go b/pkg/storage/raftentry/cache_test.go index 446581ef7f5e..2ac67ad60da3 100644 --- a/pkg/storage/raftentry/cache_test.go +++ b/pkg/storage/raftentry/cache_test.go @@ -193,6 +193,28 @@ func TestAddAndTruncate(t *testing.T) { verifyMetrics(t, c, 4, 36+int64(partitionSize)) } +func TestDrop(t *testing.T) { + defer leaktest.AfterTest(t)() + const ( + r1 roachpb.RangeID = 1 + r2 roachpb.RangeID = 2 + + sizeOf9Entries = 81 + partitionSize = int64(sizeOf9Entries + partitionSize) + ) + c := NewCache(1 << 10) + ents1 := addEntries(c, r1, 1, 10) + verifyGet(t, c, r1, 1, 10, ents1, 10, false) + verifyMetrics(t, c, 9, partitionSize) + ents2 := addEntries(c, r2, 1, 10) + verifyGet(t, c, r2, 1, 10, ents2, 10, false) + verifyMetrics(t, c, 18, 2*partitionSize) + c.Drop(r1) + verifyMetrics(t, c, 9, partitionSize) + c.Drop(r2) + verifyMetrics(t, c, 0, 0) +} + func TestCacheLaterEntries(t *testing.T) { c := NewCache(1000) rangeID := roachpb.RangeID(1) @@ -337,7 +359,6 @@ func TestConcurrentEvictions(t *testing.T) { } c.Clear(r, data[len(data)-1].Index+1) } - verifyMetrics(t, c, 0, int64(len(c.parts))*int64(partitionSize)) } @@ -428,24 +449,44 @@ func TestEntryCacheEviction(t *testing.T) { func TestConcurrentUpdates(t *testing.T) { defer leaktest.AfterTest(t)() c := NewCache(10000) + const r1 roachpb.RangeID = 1 ents := []raftpb.Entry{newEntry(20, 35), newEntry(21, 35)} - var wg sync.WaitGroup - // NB: N is chosen based on the race detector's limit of 8128 goroutines. - const N = 8000 - wg.Add(N) - for i := 0; i < N; i++ { - go func(i int) { - if i%2 == 1 { - c.Add(1, ents, true) - } else { - c.Clear(1, 22) + // Test using both Clear and Drop to remove the added entries. + for _, clearMethod := range []struct { + name string + clear func() + }{ + {"drop", func() { c.Drop(r1) }}, + {"clear", func() { c.Clear(r1, ents[len(ents)-1].Index+1) }}, + } { + t.Run(clearMethod.name, func(t *testing.T) { + // NB: N is chosen based on the race detector's limit of 8128 goroutines. + const N = 8000 + var wg sync.WaitGroup + wg.Add(N) + for i := 0; i < N; i++ { + go func(i int) { + if i%2 == 1 { + c.Add(r1, ents, true) + } else { + clearMethod.clear() + } + wg.Done() + }(i) } - wg.Done() - }(i) + wg.Wait() + clearMethod.clear() + // Clear does not evict the partition struct itself so we expect the cache + // to have a partition's initial byte size when using Clear and nothing + // when using Drop. + switch clearMethod.name { + case "drop": + verifyMetrics(t, c, 0, 0) + case "clear": + verifyMetrics(t, c, 0, int64(initialSize.bytes())) + } + }) } - wg.Wait() - c.Clear(1, 22) - verifyMetrics(t, c, 0, int64(initialSize.bytes())) } func TestPartitionList(t *testing.T) { From 0071acdd2162210cb6f7918dc77fee09145e0b34 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 23 Apr 2019 17:31:35 -0400 Subject: [PATCH 2/2] storage: drop raftentry.Cache data in applySnapshot Add logic to clear cached raft log entries when applying a snapshot. Release note: None --- pkg/storage/replica_raftstorage.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 65bd758c584a..b16e2398ddaf 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -856,6 +856,9 @@ func (r *Replica) applySnapshot( if err := clearRangeData(ctx, s.Desc, r.store.Engine(), batch, true /* destroyData */); err != nil { return err } + // Clear the cached raft log entries to ensure that old or uncommitted + // entries don't impact the in-memory state. + r.store.raftEntryCache.Drop(r.RangeID) stats.clear = timeutil.Now() // Write the snapshot into the range.