Skip flatbuffers and live traces search in TraceQL queries (#1997)
* Skip flatbuffers and live traces search in TraceQL queries

* Add tests

* Changelog
mapno committed Jan 16, 2023
1 parent 3047761 commit e6b58d5
Showing 6 changed files with 92 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -168,6 +168,7 @@ Internal types are updated to use `scope` instead of `instrumentation_library`.
* [BUGFIX] Fix traceql parsing of most binary operations to not require spacing [#1939](https://github.com/grafana/tempo/pull/1941) (@mdisibio)
* [BUGFIX] Don't persist tenants without blocks in the ingester [#1947](https://github.com/grafana/tempo/pull/1947) (@joe-elliott)
* [BUGFIX] TraceQL: span scope not working with ranges [#1948](https://github.com/grafana/tempo/issues/1948) (@mdisibio)
* [BUGFIX] TraceQL: skip live traces search [#1997](https://github.com/grafana/tempo/pull/1997) (@mapno)

## v1.5.0 / 2022-08-17

19 changes: 10 additions & 9 deletions integration/e2e/e2e_test.go
@@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"math/rand"
"net/http"
"os"
"path/filepath"
"reflect"
@@ -106,6 +107,9 @@ func TestAllInOne(t *testing.T) {
// flush trace to backend
callFlush(t, tempo)

// traceql search and verify trace is searchable once cut
util.SearchTraceQLAndAssertTrace(t, apiClient, info)

// sleep
time.Sleep(10 * time.Second)

@@ -115,7 +119,7 @@
// test metrics
require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total"))
require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempodb_blocklist_length"}, e2e.WaitMissingMetrics))
require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(4), "tempo_query_frontend_queries_total"))
require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(5), "tempo_query_frontend_queries_total"))

// query trace - should fetch from backend
queryAndAssertTrace(t, apiClient, info)
@@ -451,40 +455,37 @@ func callFlush(t *testing.T, ingester *e2e.HTTPService) {
fmt.Printf("Calling /flush on %s\n", ingester.Name())
res, err := e2e.DoGet("http://" + ingester.Endpoint(3200) + "/flush")
require.NoError(t, err)
require.Equal(t, 204, res.StatusCode)
require.Equal(t, http.StatusNoContent, res.StatusCode)
}

func callIngesterRing(t *testing.T, svc *e2e.HTTPService) {
endpoint := "/ingester/ring"
fmt.Printf("Calling %s on %s\n", endpoint, svc.Name())
res, err := e2e.DoGet("http://" + svc.Endpoint(3200) + endpoint)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
require.Equal(t, http.StatusOK, res.StatusCode)
}

func callCompactorRing(t *testing.T, svc *e2e.HTTPService) {
endpoint := "/compactor/ring"
fmt.Printf("Calling %s on %s\n", endpoint, svc.Name())
res, err := e2e.DoGet("http://" + svc.Endpoint(3200) + endpoint)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
require.Equal(t, http.StatusOK, res.StatusCode)
}

func callStatus(t *testing.T, svc *e2e.HTTPService) {
endpoint := "/status/endpoints"
fmt.Printf("Calling %s on %s\n", endpoint, svc.Name())
res, err := e2e.DoGet("http://" + svc.Endpoint(3200) + endpoint)
require.NoError(t, err)
// body, err := ioutil.ReadAll(res.Body)
// require.NoError(t, err)
// t.Logf("body: %+v", string(body))
require.Equal(t, 200, res.StatusCode)
require.Equal(t, http.StatusOK, res.StatusCode)
}

func assertEcho(t *testing.T, url string) {
res, err := e2e.DoGet(url)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
require.Equal(t, http.StatusOK, res.StatusCode)
defer res.Body.Close()
}

26 changes: 26 additions & 0 deletions integration/util.go
@@ -3,6 +3,7 @@ package integration
// Collection of utilities to share between our various load tests

import (
"fmt"
"os"
"path/filepath"
"strconv"
@@ -251,6 +252,31 @@ func SearchAndAssertTrace(t *testing.T, client *tempoUtil.Client, info *tempoUtil.TraceInfo) {
require.True(t, hasHex(info.HexID(), resp))
}

func SearchTraceQLAndAssertTrace(t *testing.T, client *tempoUtil.Client, info *tempoUtil.TraceInfo) {
expected, err := info.ConstructTraceFromEpoch()
require.NoError(t, err)

attr := tempoUtil.RandomAttrFromTrace(expected)
query := fmt.Sprintf(`{ .%s = "%s"}`, attr.GetKey(), attr.GetValue().GetStringValue())

resp, err := client.SearchTraceQL(query)
require.NoError(t, err)

hasHex := func(hexId string, resp *tempopb.SearchResponse) bool {
for _, s := range resp.Traces {
equal, err := tempoUtil.EqualHexStringTraceIDs(s.TraceID, hexId)
require.NoError(t, err)
if equal {
return true
}
}

return false
}

require.True(t, hasHex(info.HexID(), resp))
}
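
For reference, the TraceQL expression this helper builds is a single attribute matcher. With a hypothetical attribute pulled from the trace (the key and value below are placeholders, not ones the test actually generates), the string handed to SearchTraceQL would look like:

query := `{ .animal = "bear"}` // hypothetical key/value, as produced by the fmt.Sprintf above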

// by passing a time range and using a query_ingesters_until/backend_after of 0 we can force the queriers
// to look in the backend blocks
func SearchAndAssertTraceBackend(t *testing.T, client *tempoUtil.Client, info *tempoUtil.TraceInfo, start int64, end int64) {
64 changes: 38 additions & 26 deletions modules/ingester/instance_search.go
@@ -40,7 +40,10 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tempopb.SearchResponse, error) {
sr := search.NewResults()
defer sr.Close()

i.searchLiveTraces(ctx, p, sr)
// skip live traces in TraceQL queries
if !api.IsTraceQLQuery(req) {
i.searchLiveTraces(ctx, p, sr)
}

// Lock blocks mutex until all search tasks have been created. This avoids
// deadlocking with other activity (ingest, flushing), caused by releasing
@@ -139,7 +142,7 @@ func (i *instance) searchLiveTraces(ctx context.Context, p search.Pipeline, sr *search.Results) {

// searchWAL starts a search task for every WAL block. Must be called under lock.
func (i *instance) searchWAL(ctx context.Context, req *tempopb.SearchRequest, p search.Pipeline, sr *search.Results) {
searchFunc := func(e *searchStreamingBlockEntry) {
searchFBEntry := func(e *searchStreamingBlockEntry) {
// flat-buffers search
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.searchWAL")
defer span.Finish()
@@ -202,9 +205,11 @@
go searchWalBlock(i.headBlock)
}

if i.searchHeadBlock != nil {
// skip flat-buffers search in TraceQL queries
// sanity check, vParquet blocks shouldn't have flat-buffers search blocks
if i.searchHeadBlock != nil && !api.IsTraceQLQuery(req) {
sr.StartWorker()
go searchFunc(i.searchHeadBlock)
go searchFBEntry(i.searchHeadBlock)
}

// completing blocks
@@ -213,42 +218,49 @@
go searchWalBlock(b)
}

for _, e := range i.searchAppendBlocks {
sr.StartWorker()
go searchFunc(e)
// skip flat-buffers search in TraceQL queries
// sanity check, vParquet blocks shouldn't have flat-buffers search blocks
if !api.IsTraceQLQuery(req) {
for _, e := range i.searchAppendBlocks {
sr.StartWorker()
go searchFBEntry(e)
}
}
}

// searchLocalBlocks starts a search task for every local block. Must be called under lock.
func (i *instance) searchLocalBlocks(ctx context.Context, req *tempopb.SearchRequest, p search.Pipeline, sr *search.Results) {
// first check the searchCompleteBlocks map. if there is an entry for a block here we want to search it first
for _, e := range i.searchCompleteBlocks {
sr.StartWorker()
go func(e *searchLocalBlockEntry) {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.fb.searchLocalBlocks")
defer span.Finish()
if !api.IsTraceQLQuery(req) { // Skip flat-buffers search in TraceQL queries

defer sr.FinishWorker()
// first check the searchCompleteBlocks map. if there is an entry for a block here we want to search it first
for _, e := range i.searchCompleteBlocks {
sr.StartWorker()
go func(e *searchLocalBlockEntry) {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.fb.searchLocalBlocks")
defer span.Finish()

e.mtx.RLock()
defer e.mtx.RUnlock()
defer sr.FinishWorker()

span.LogFields(ot_log.Event("local block entry mtx acquired"))
span.SetTag("blockID", e.b.BlockID().String())
e.mtx.RLock()
defer e.mtx.RUnlock()

// flat-buffers search
err := e.b.Search(ctx, p, sr)
if err != nil {
level.Error(log.Logger).Log("msg", "error searching local block", "blockID", e.b.BlockID().String(), "err", err)
}
}(e)
span.LogFields(ot_log.Event("local block entry mtx acquired"))
span.SetTag("blockID", e.b.BlockID().String())

// flat-buffers search
err := e.b.Search(ctx, p, sr)
if err != nil {
level.Error(log.Logger).Log("msg", "error searching local block", "blockID", e.b.BlockID().String(), "err", err)
}
}(e)
}
}

// next check all complete blocks to see if they were not searched, if they weren't then attempt to search them
for _, e := range i.completeBlocks {
_, ok := i.searchCompleteBlocks[e]
if ok {
if _, ok := i.searchCompleteBlocks[e]; ok && !api.IsTraceQLQuery(req) {
// no need to search this block, we already did above
// only applies to non-traceql queries
continue
}
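
The gate used throughout this file, api.IsTraceQLQuery, is not part of this diff. A minimal sketch of what such a check could look like, assuming a request counts as TraceQL whenever it carries a non-empty Query string (an assumption for illustration, not necessarily the actual implementation):

package api

import "github.com/grafana/tempo/pkg/tempopb"

// IsTraceQLQuery reports whether a search request should be handled as a TraceQL query.
// Sketch only: treat any request with a non-empty Query field as TraceQL.
func IsTraceQLQuery(r *tempopb.SearchRequest) bool {
	return r != nil && len(r.Query) > 0
}

Under that assumption, the legacy tag-based paths (live traces and flat-buffers blocks) only run when Query is empty.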

10 changes: 7 additions & 3 deletions modules/ingester/instance_search_test.go
@@ -110,12 +110,16 @@ func TestInstanceSearchTraceQL(t *testing.T) {

req := &tempopb.SearchRequest{Query: query, Limit: 20}

// Test live traces
sr, err := i.Search(context.Background(), req)
assert.NoError(t, err)
assert.Len(t, sr.Traces, 0)

// Test after appending to WAL
err := i.CutCompleteTraces(0, true)
require.NoError(t, err)
require.NoError(t, i.CutCompleteTraces(0, true))
assert.Equal(t, int(i.traceCount.Load()), len(i.traces))

sr, err := i.Search(context.Background(), req)
sr, err = i.Search(context.Background(), req)
assert.NoError(t, err)
assert.Len(t, sr.Traces, len(ids))
checkEqual(t, ids, sr)
10 changes: 10 additions & 0 deletions pkg/util/client.go
@@ -161,3 +161,13 @@ func (c *Client) QueryTrace(id string) (*tempopb.Trace, error) {

return m, nil
}

func (c *Client) SearchTraceQL(query string) (*tempopb.SearchResponse, error) {
m := &tempopb.SearchResponse{}
_, err := c.getFor(c.BaseURL+"/api/search?q="+url.QueryEscape(query), m)
if err != nil {
return nil, err
}

return m, nil
}
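
A sketch of how this new client helper might be exercised from a test or tool. The base URL, org ID, and query are placeholders, and NewClient is assumed to be the package's existing constructor:

package main

import (
	"fmt"
	"log"

	tempoUtil "github.com/grafana/tempo/pkg/util"
)

func main() {
	// Placeholder endpoint and empty org ID; point this at a real Tempo instance.
	client := tempoUtil.NewClient("http://localhost:3200", "")

	resp, err := client.SearchTraceQL(`{ .service.name = "my-service" }`)
	if err != nil {
		log.Fatalf("TraceQL search failed: %v", err)
	}

	for _, tm := range resp.Traces {
		fmt.Println("matched trace:", tm.TraceID)
	}
}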
