Skip to content

Commit

Permalink
Prune in-memory blocks from missing tenants (#314)
Browse files Browse the repository at this point in the history
* Prune in-memory blocks

Signed-off-by: Daniel González Lopes <danielgonzalezlopes@gmail.com>

* Add changelog

Signed-off-by: Daniel González Lopes <danielgonzalezlopes@gmail.com>

* Feedback from review

Signed-off-by: Daniel González Lopes <danielgonzalezlopes@gmail.com>

* Add tests

Signed-off-by: Daniel González Lopes <danielgonzalezlopes@gmail.com>

* Improve tests

Signed-off-by: Daniel González Lopes <danielgonzalezlopes@gmail.com>

* Remove missing tenant blocks

Signed-off-by: Daniel González Lopes <danielgonzalezlopes@gmail.com>

* Remove old code

Signed-off-by: Daniel González Lopes <danielgonzalezlopes@gmail.com>

* Make CI happy

Signed-off-by: Daniel González Lopes <danielgonzalezlopes@gmail.com>

* Move code to a method

Signed-off-by: Daniel González Lopes <danielgonzalezlopes@gmail.com>

* From map[string]bool to map[string]struct

Signed-off-by: Daniel González Lopes <danielgonzalezlopes@gmail.com>

* Add test for cleanMissingTenants

Signed-off-by: Daniel González Lopes <danielgonzalezlopes@gmail.com>

* Add enconding

Signed-off-by: Daniel González Lopes <danielgonzalezlopes@gmail.com>
  • Loading branch information
dgzlopes committed Nov 10, 2020
1 parent 814282d commit 2f0917d
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -9,3 +9,4 @@
* [BUGFIX] S3 multi-part upload errors [#306](https://github.com/grafana/tempo/pull/325)
* [BUGFIX] Increase Prometheus `notfound` metric on tempo-vulture. [#301](https://github.com/grafana/tempo/pull/301)
* [BUGFIX] Return 404 if searching for a tenant id that does not exist in the backend. [#321](https://github.com/grafana/tempo/pull/321)
* [BUGFIX] Prune in-memory blocks from missing tenants. [#314](https://github.com/grafana/tempo/pull/314)
27 changes: 27 additions & 0 deletions tempodb/tempodb.go
Expand Up @@ -377,6 +377,8 @@ func (rw *readerWriter) pollBlocklist() {
level.Error(rw.logger).Log("msg", "error retrieving tenants while polling blocklist", "err", err)
}

rw.cleanMissingTenants(tenants)

for _, tenantID := range tenants {
blockIDs, err := rw.r.Blocks(ctx, tenantID)
if err != nil {
Expand Down Expand Up @@ -544,3 +546,28 @@ func (rw *readerWriter) compactedBlocklist(tenantID string) []*encoding.Compacte

return copiedBlocklist
}

func (rw *readerWriter) cleanMissingTenants(tenants []string) {
tenantSet := make(map[string]struct{})
for _, tenantID := range tenants {
tenantSet[tenantID] = struct{}{}
}

for tenantID := range rw.blockLists {
if _, present := tenantSet[tenantID]; !present {
rw.blockListsMtx.Lock()
delete(rw.blockLists, tenantID)
rw.blockListsMtx.Unlock()
level.Info(rw.logger).Log("msg", "deleted in-memory blocklists", "tenantID", tenantID)
}
}

for tenantID := range rw.compactedBlockLists {
if _, present := tenantSet[tenantID]; !present {
rw.blockListsMtx.Lock()
delete(rw.compactedBlockLists, tenantID)
rw.blockListsMtx.Unlock()
level.Info(rw.logger).Log("msg", "deleted in-memory compacted blocklists", "tenantID", tenantID)
}
}
}
108 changes: 108 additions & 0 deletions tempodb/tempodb_test.go
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/wal"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -175,6 +176,113 @@ func TestRetention(t *testing.T) {
checkBlocklists(t, blockID, 0, 0, rw)
}

func TestBlockCleanup(t *testing.T) {
tempDir, err := ioutil.TempDir("/tmp", "")
defer os.RemoveAll(tempDir)
assert.NoError(t, err, "unexpected error creating temp dir")

r, w, c, err := New(&Config{
Backend: "local",
Local: &local.Config{
Path: path.Join(tempDir, "traces"),
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
IndexDownsample: 17,
BloomFP: .01,
},
BlocklistPoll: 0,
}, log.NewNopLogger())
assert.NoError(t, err)

c.EnableCompaction(&CompactorConfig{
ChunkSizeBytes: 10,
MaxCompactionRange: time.Hour,
BlockRetention: 0,
CompactedBlockRetention: 0,
}, &mockSharder{})

blockID := uuid.New()

wal := w.WAL()
assert.NoError(t, err)

head, err := wal.NewBlock(blockID, testTenantID)
assert.NoError(t, err)

complete, err := head.Complete(wal, &mockSharder{})
assert.NoError(t, err)

err = w.WriteBlock(context.Background(), complete)
assert.NoError(t, err)

rw := r.(*readerWriter)

// poll
rw.pollBlocklist()

assert.Len(t, rw.blockLists[testTenantID], 1)

os.RemoveAll(tempDir + "/traces/" + testTenantID)

// poll
rw.pollBlocklist()

_, ok := rw.blockLists[testTenantID]
assert.False(t, ok)
}

func TestCleanMissingTenants(t *testing.T) {
tests := []struct {
name string
tenants []string
blocklist map[string][]*encoding.BlockMeta
expected map[string][]*encoding.BlockMeta
}{
{
name: "one missing tenant",
tenants: []string{"foo"},
blocklist: map[string][]*encoding.BlockMeta{"foo": {{}}, "bar": {{}}},
expected: map[string][]*encoding.BlockMeta{"foo": {{}}},
},
{
name: "no missing tenants",
tenants: []string{"foo", "bar"},
blocklist: map[string][]*encoding.BlockMeta{"foo": {{}}, "bar": {{}}},
expected: map[string][]*encoding.BlockMeta{"foo": {{}}, "bar": {{}}},
},
{
name: "all missing tenants",
tenants: []string{},
blocklist: map[string][]*encoding.BlockMeta{"foo": {{}}, "bar": {{}}},
expected: map[string][]*encoding.BlockMeta{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r, _, _, err := New(&Config{
Backend: "local",
Local: &local.Config{
Path: path.Join("/tmp", "traces"),
},
WAL: &wal.Config{
Filepath: path.Join("/tmp", "wal"),
IndexDownsample: 17,
BloomFP: .01,
},
BlocklistPoll: 0,
}, log.NewNopLogger())
assert.NoError(t, err)

rw := r.(*readerWriter)

rw.blockLists = tt.blocklist
rw.cleanMissingTenants(tt.tenants)
assert.Equal(t, rw.blockLists, tt.expected)
})
}
}

func checkBlocklists(t *testing.T, expectedID uuid.UUID, expectedB int, expectedCB int, rw *readerWriter) {
rw.pollBlocklist()

Expand Down

0 comments on commit 2f0917d

Please sign in to comment.