Skip to content

Commit

Permalink
Patch WAL dedupe when search is disabled (grafana#968)
Browse files Browse the repository at this point in the history
* Patch WAL dedupe when search is disabled

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Lint

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Chanelog

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Add comment around guard code

Signed-off-by: Annanay <annanayagarwal@gmail.com>
  • Loading branch information
annanay25 committed Sep 21, 2021
1 parent af0f76f commit e5f7ded
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -5,6 +5,7 @@
* [BUGFIX] Update port spec for GCS docker-compose example [#869](https://github.com/grafana/tempo/pull/869) (@zalegrala)
* [BUGFIX] Fix "magic number" errors and other block mishandling when an ingester forcefully shuts down [#937](https://github.com/grafana/tempo/issues/937) (@mdisibio)
* [BUGFIX] Fix compactor memory leak [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio)
* [BUGFIX] Fix an issue with WAL replay of zero-length search data when search is disabled. [#968](https://github.com/grafana/tempo/pull/968) (@annanay25)
* [ENHANCEMENT] Added "query blocks" cli option. [#876](https://github.com/grafana/tempo/pull/876) (@joe-elliott)
* [ENHANCEMENT] Added traceid to `trace too large message`. [#888](https://github.com/grafana/tempo/pull/888) (@mritunjaysharma394)
* [ENHANCEMENT] Add support to tempo workloads to `overrides` from single configmap in microservice mode. [#896](https://github.com/grafana/tempo/pull/896) (@kavirajk)
Expand Down
2 changes: 1 addition & 1 deletion modules/distributor/search_data_test.go
Expand Up @@ -14,7 +14,7 @@ import (
)

func TestExtractSearchData(t *testing.T) {
traceIDA := []byte{0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}
traceIDA := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}

testCases := []struct {
name string
Expand Down
78 changes: 78 additions & 0 deletions tempodb/search/backend_search_block_test.go
Expand Up @@ -81,6 +81,84 @@ func TestBackendSearchBlockSearch(t *testing.T) {
require.Equal(t, traceCount, int(sr.TracesInspected()))
}

func TestBackendSearchBlockDedupesWAL(t *testing.T) {
traceCount := 1_000

testCases := []struct {
name string
searchDataGenerator func(traceID []byte, i int) [][]byte
searchTags map[string]string
expectedLenResults int
expectedLenInspected int
}{
{
name: "distinct traces",
searchDataGenerator: genSearchData,
searchTags: map[string]string{"key10": "value_A_10", "key20": "value_B_20"},
expectedLenResults: 1,
expectedLenInspected: 1,
},
{
name: "empty traces",
searchDataGenerator: func(traceID []byte, i int) [][]byte {
return [][]byte{}
},
searchTags: map[string]string{"key10": "value_A_10"},
expectedLenResults: 0,
expectedLenInspected: 0,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
f, err := os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644)
require.NoError(t, err)

b1, err := NewStreamingSearchBlockForFile(f)
require.NoError(t, err)

id := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}
for i := 0; i < traceCount; i++ {
require.NoError(t, b1.Append(context.Background(), id, tc.searchDataGenerator(id, i)))
}

l, err := local.NewBackend(&local.Config{
Path: t.TempDir(),
})
require.NoError(t, err)

blockID := uuid.New()
tenantID := "fake"
err = NewBackendSearchBlock(b1, l, blockID, tenantID, backend.EncNone, 0)
require.NoError(t, err)

b2 := OpenBackendSearchBlock(l, blockID, tenantID)

p := NewSearchPipeline(&tempopb.SearchRequest{
Tags: tc.searchTags,
})

sr := NewResults()

sr.StartWorker()
go func() {
defer sr.FinishWorker()
err := b2.Search(context.TODO(), p, sr)
require.NoError(t, err)
}()
sr.AllWorkersStarted()

var results []*tempopb.TraceSearchMetadata
for r := range sr.Results() {
results = append(results, r)
}
require.Equal(t, tc.expectedLenResults, len(results))
require.Equal(t, tc.expectedLenInspected, int(sr.TracesInspected()))
})
}

}

func BenchmarkBackendSearchBlockSearch(b *testing.B) {
pageSizesMB := []float32{0.5, 1, 2}

Expand Down
4 changes: 4 additions & 0 deletions tempodb/search/data_combiner.go
Expand Up @@ -25,6 +25,10 @@ func (*DataCombiner) Combine(_ string, searchData ...[]byte) ([]byte, bool) {
data := tempofb.SearchEntryMutable{}
kv := &tempofb.KeyValues{} // buffer
for _, sb := range searchData {
// we append zero-length entries to the WAL even when search is disabled. skipping to prevent unmarshalling and panik :)
if len(sb) == 0 {
continue
}
sd := tempofb.SearchEntryFromBytes(sb)
for i, ii := 0, sd.TagsLength(); i < ii; i++ {
sd.Tags(kv, i)
Expand Down

0 comments on commit e5f7ded

Please sign in to comment.