Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prune in-memory blocks from missing tenants #314

Merged
merged 12 commits into from
Nov 10, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
* [ENHANCEMENT] Add command line flags for s3 credentials. [#308](https://github.com/grafana/tempo/pull/308)
* [BUGFIX] Increase Prometheus `notfound` metric on tempo-vulture. [#301](https://github.com/grafana/tempo/pull/301)
* [BUGFIX] Return 404 if searching for a tenant id that does not exist in the backend. [#321](https://github.com/grafana/tempo/pull/321)
* [BUGFIX] Prune in-memory blocks from missing tenants. [#314](https://github.com/grafana/tempo/pull/314)
27 changes: 27 additions & 0 deletions tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ func (rw *readerWriter) pollBlocklist() {
level.Error(rw.logger).Log("msg", "error retrieving tenants while polling blocklist", "err", err)
}

rw.cleanMissingTenants(tenants)

for _, tenantID := range tenants {
blockIDs, err := rw.r.Blocks(ctx, tenantID)
if err != nil {
Expand Down Expand Up @@ -544,3 +546,28 @@ func (rw *readerWriter) compactedBlocklist(tenantID string) []*encoding.Compacte

return copiedBlocklist
}

dgzlopes marked this conversation as resolved.
Show resolved Hide resolved
func (rw *readerWriter) cleanMissingTenants(tenants []string) {
dgzlopes marked this conversation as resolved.
Show resolved Hide resolved
tenantSet := make(map[string]struct{})
for _, tenantID := range tenants {
tenantSet[tenantID] = struct{}{}
}

for tenantID := range rw.blockLists {
if _, present := tenantSet[tenantID]; !present {
rw.blockListsMtx.Lock()
delete(rw.blockLists, tenantID)
rw.blockListsMtx.Unlock()
level.Info(rw.logger).Log("msg", "deleted in-memory blocklists", "tenantID", tenantID)
}
}

for tenantID := range rw.compactedBlockLists {
if _, present := tenantSet[tenantID]; !present {
rw.blockListsMtx.Lock()
delete(rw.compactedBlockLists, tenantID)
rw.blockListsMtx.Unlock()
level.Info(rw.logger).Log("msg", "deleted in-memory compacted blocklists", "tenantID", tenantID)
}
}
}
108 changes: 108 additions & 0 deletions tempodb/tempodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/wal"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -175,6 +176,113 @@ func TestRetention(t *testing.T) {
checkBlocklists(t, blockID, 0, 0, rw)
}

func TestBlockCleanup(t *testing.T) {
tempDir, err := ioutil.TempDir("/tmp", "")
defer os.RemoveAll(tempDir)
assert.NoError(t, err, "unexpected error creating temp dir")

r, w, c, err := New(&Config{
Backend: "local",
Local: &local.Config{
Path: path.Join(tempDir, "traces"),
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
IndexDownsample: 17,
BloomFP: .01,
},
BlocklistPoll: 0,
}, log.NewNopLogger())
assert.NoError(t, err)

c.EnableCompaction(&CompactorConfig{
ChunkSizeBytes: 10,
MaxCompactionRange: time.Hour,
BlockRetention: 0,
CompactedBlockRetention: 0,
}, &mockSharder{})

blockID := uuid.New()

wal := w.WAL()
assert.NoError(t, err)

head, err := wal.NewBlock(blockID, testTenantID)
assert.NoError(t, err)

complete, err := head.Complete(wal, &mockSharder{})
assert.NoError(t, err)

err = w.WriteBlock(context.Background(), complete)
assert.NoError(t, err)

rw := r.(*readerWriter)

dgzlopes marked this conversation as resolved.
Show resolved Hide resolved
// poll
rw.pollBlocklist()

assert.Len(t, rw.blockLists[testTenantID], 1)

os.RemoveAll(tempDir + "/traces/" + testTenantID)

// poll
rw.pollBlocklist()

_, ok := rw.blockLists[testTenantID]
assert.False(t, ok)
}

func TestCleanMissingTenants(t *testing.T) {
tests := []struct {
name string
tenants []string
blocklist map[string][]*encoding.BlockMeta
expected map[string][]*encoding.BlockMeta
}{
{
name: "one missing tenant",
tenants: []string{"foo"},
blocklist: map[string][]*encoding.BlockMeta{"foo": {{}}, "bar": {{}}},
expected: map[string][]*encoding.BlockMeta{"foo": {{}}},
},
{
name: "no missing tenants",
tenants: []string{"foo", "bar"},
blocklist: map[string][]*encoding.BlockMeta{"foo": {{}}, "bar": {{}}},
expected: map[string][]*encoding.BlockMeta{"foo": {{}}, "bar": {{}}},
},
{
name: "all missing tenants",
tenants: []string{},
blocklist: map[string][]*encoding.BlockMeta{"foo": {{}}, "bar": {{}}},
expected: map[string][]*encoding.BlockMeta{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r, _, _, err := New(&Config{
Backend: "local",
Local: &local.Config{
Path: path.Join("/tmp", "traces"),
},
WAL: &wal.Config{
Filepath: path.Join("/tmp", "wal"),
IndexDownsample: 17,
BloomFP: .01,
},
BlocklistPoll: 0,
}, log.NewNopLogger())
assert.NoError(t, err)

rw := r.(*readerWriter)

rw.blockLists = tt.blocklist
rw.cleanMissingTenants(tt.tenants)
assert.Equal(t, rw.blockLists, tt.expected)
})
}
}

func checkBlocklists(t *testing.T, expectedID uuid.UUID, expectedB int, expectedCB int, rw *readerWriter) {
rw.pollBlocklist()

Expand Down