diff --git a/CHANGELOG.md b/CHANGELOG.md index 4be333c2924..a6ce6dc1114 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -168,6 +168,7 @@ Internal types are updated to use `scope` instead of `instrumentation_library`. * [BUGFIX] Fix traceql parsing of most binary operations to not require spacing [#1939](https://github.com/grafana/tempo/pull/1941) (@mdisibio) * [BUGFIX] Don't persist tenants without blocks in the ingester[#1947](https://github.com/grafana/tempo/pull/1947) (@joe-elliott) * [BUGFIX] TraceQL: span scope not working with ranges [#1948](https://github.com/grafana/tempo/issues/1948) (@mdisibio) +* [BUGFIX] TraceQL: skip live traces search [#1997](https://github.com/grafana/tempo/pull/1997) (@mapno) ## v1.5.0 / 2022-08-17 diff --git a/integration/e2e/e2e_test.go b/integration/e2e/e2e_test.go index c240701bb4e..2da032ea00c 100644 --- a/integration/e2e/e2e_test.go +++ b/integration/e2e/e2e_test.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "math/rand" + "net/http" "os" "path/filepath" "reflect" @@ -106,6 +107,9 @@ func TestAllInOne(t *testing.T) { // flush trace to backend callFlush(t, tempo) + // traceql search and verify trace is searchable once cut + util.SearchTraceQLAndAssertTrace(t, apiClient, info) + // sleep time.Sleep(10 * time.Second) @@ -115,7 +119,7 @@ func TestAllInOne(t *testing.T) { // test metrics require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total")) require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempodb_blocklist_length"}, e2e.WaitMissingMetrics)) - require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(4), "tempo_query_frontend_queries_total")) + require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(5), "tempo_query_frontend_queries_total")) // query trace - should fetch from backend queryAndAssertTrace(t, apiClient, info) @@ -451,7 +455,7 @@ func callFlush(t *testing.T, ingester *e2e.HTTPService) { fmt.Printf("Calling /flush on %s\n", ingester.Name()) res, err := e2e.DoGet("http://" + ingester.Endpoint(3200) + "/flush") require.NoError(t, err) - require.Equal(t, 204, res.StatusCode) + require.Equal(t, http.StatusNoContent, res.StatusCode) } func callIngesterRing(t *testing.T, svc *e2e.HTTPService) { @@ -459,7 +463,7 @@ func callIngesterRing(t *testing.T, svc *e2e.HTTPService) { fmt.Printf("Calling %s on %s\n", endpoint, svc.Name()) res, err := e2e.DoGet("http://" + svc.Endpoint(3200) + endpoint) require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) + require.Equal(t, http.StatusOK, res.StatusCode) } func callCompactorRing(t *testing.T, svc *e2e.HTTPService) { @@ -467,7 +471,7 @@ func callCompactorRing(t *testing.T, svc *e2e.HTTPService) { fmt.Printf("Calling %s on %s\n", endpoint, svc.Name()) res, err := e2e.DoGet("http://" + svc.Endpoint(3200) + endpoint) require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) + require.Equal(t, http.StatusOK, res.StatusCode) } func callStatus(t *testing.T, svc *e2e.HTTPService) { @@ -475,16 +479,13 @@ func callStatus(t *testing.T, svc *e2e.HTTPService) { fmt.Printf("Calling %s on %s\n", endpoint, svc.Name()) res, err := e2e.DoGet("http://" + svc.Endpoint(3200) + endpoint) require.NoError(t, err) - // body, err := ioutil.ReadAll(res.Body) - // require.NoError(t, err) - // t.Logf("body: %+v", string(body)) - require.Equal(t, 200, res.StatusCode) + require.Equal(t, http.StatusOK, res.StatusCode) } func assertEcho(t *testing.T, url string) { res, err := e2e.DoGet(url) require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) + require.Equal(t, http.StatusOK, res.StatusCode) defer res.Body.Close() } diff --git a/integration/util.go b/integration/util.go index 8fba74ea2b6..bc42bbe6bc0 100644 --- a/integration/util.go +++ b/integration/util.go @@ -3,6 +3,7 @@ package integration // Collection of utilities to share between our various load tests import ( + "fmt" "os" "path/filepath" "strconv" @@ -251,6 +252,31 @@ func SearchAndAssertTrace(t *testing.T, client *tempoUtil.Client, info *tempoUti require.True(t, hasHex(info.HexID(), resp)) } +func SearchTraceQLAndAssertTrace(t *testing.T, client *tempoUtil.Client, info *tempoUtil.TraceInfo) { + expected, err := info.ConstructTraceFromEpoch() + require.NoError(t, err) + + attr := tempoUtil.RandomAttrFromTrace(expected) + query := fmt.Sprintf(`{ .%s = "%s"}`, attr.GetKey(), attr.GetValue().GetStringValue()) + + resp, err := client.SearchTraceQL(query) + require.NoError(t, err) + + hasHex := func(hexId string, resp *tempopb.SearchResponse) bool { + for _, s := range resp.Traces { + equal, err := tempoUtil.EqualHexStringTraceIDs(s.TraceID, hexId) + require.NoError(t, err) + if equal { + return true + } + } + + return false + } + + require.True(t, hasHex(info.HexID(), resp)) +} + // by passing a time range and using a query_ingesters_until/backend_after of 0 we can force the queriers // to look in the backend blocks func SearchAndAssertTraceBackend(t *testing.T, client *tempoUtil.Client, info *tempoUtil.TraceInfo, start int64, end int64) { diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index 4e70d0be3a6..6234f7985f1 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -40,7 +40,10 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem sr := search.NewResults() defer sr.Close() - i.searchLiveTraces(ctx, p, sr) + // skip live traces in TraceQL queries + if !api.IsTraceQLQuery(req) { + i.searchLiveTraces(ctx, p, sr) + } // Lock blocks mutex until all search tasks have been created. This avoids // deadlocking with other activity (ingest, flushing), caused by releasing @@ -139,7 +142,7 @@ func (i *instance) searchLiveTraces(ctx context.Context, p search.Pipeline, sr * // searchWAL starts a search task for every WAL block. Must be called under lock. func (i *instance) searchWAL(ctx context.Context, req *tempopb.SearchRequest, p search.Pipeline, sr *search.Results) { - searchFunc := func(e *searchStreamingBlockEntry) { + searchFBEntry := func(e *searchStreamingBlockEntry) { // flat-buffers search span, ctx := opentracing.StartSpanFromContext(ctx, "instance.searchWAL") defer span.Finish() @@ -202,9 +205,11 @@ func (i *instance) searchWAL(ctx context.Context, req *tempopb.SearchRequest, p go searchWalBlock(i.headBlock) } - if i.searchHeadBlock != nil { + // skip flat-buffers search in TraceQL queries + // sanity check, vParquet blocks shouldn't have flat-buffers search blocks + if i.searchHeadBlock != nil && !api.IsTraceQLQuery(req) { sr.StartWorker() - go searchFunc(i.searchHeadBlock) + go searchFBEntry(i.searchHeadBlock) } // completing blocks @@ -213,42 +218,49 @@ func (i *instance) searchWAL(ctx context.Context, req *tempopb.SearchRequest, p go searchWalBlock(b) } - for _, e := range i.searchAppendBlocks { - sr.StartWorker() - go searchFunc(e) + // skip flat-buffers search in TraceQL queries + // sanity check, vParquet blocks shouldn't have flat-buffers search blocks + if !api.IsTraceQLQuery(req) { + for _, e := range i.searchAppendBlocks { + sr.StartWorker() + go searchFBEntry(e) + } } } // searchLocalBlocks starts a search task for every local block. Must be called under lock. func (i *instance) searchLocalBlocks(ctx context.Context, req *tempopb.SearchRequest, p search.Pipeline, sr *search.Results) { - // first check the searchCompleteBlocks map. if there is an entry for a block here we want to search it first - for _, e := range i.searchCompleteBlocks { - sr.StartWorker() - go func(e *searchLocalBlockEntry) { - span, ctx := opentracing.StartSpanFromContext(ctx, "instance.fb.searchLocalBlocks") - defer span.Finish() + if !api.IsTraceQLQuery(req) { // Skip flat-buffers search in TraceQL queries - defer sr.FinishWorker() + // first check the searchCompleteBlocks map. if there is an entry for a block here we want to search it first + for _, e := range i.searchCompleteBlocks { + sr.StartWorker() + go func(e *searchLocalBlockEntry) { + span, ctx := opentracing.StartSpanFromContext(ctx, "instance.fb.searchLocalBlocks") + defer span.Finish() - e.mtx.RLock() - defer e.mtx.RUnlock() + defer sr.FinishWorker() - span.LogFields(ot_log.Event("local block entry mtx acquired")) - span.SetTag("blockID", e.b.BlockID().String()) + e.mtx.RLock() + defer e.mtx.RUnlock() - // flat-buffers search - err := e.b.Search(ctx, p, sr) - if err != nil { - level.Error(log.Logger).Log("msg", "error searching local block", "blockID", e.b.BlockID().String(), "err", err) - } - }(e) + span.LogFields(ot_log.Event("local block entry mtx acquired")) + span.SetTag("blockID", e.b.BlockID().String()) + + // flat-buffers search + err := e.b.Search(ctx, p, sr) + if err != nil { + level.Error(log.Logger).Log("msg", "error searching local block", "blockID", e.b.BlockID().String(), "err", err) + } + }(e) + } } // next check all complete blocks to see if they were not searched, if they weren't then attempt to search them for _, e := range i.completeBlocks { - _, ok := i.searchCompleteBlocks[e] - if ok { + if _, ok := i.searchCompleteBlocks[e]; ok && !api.IsTraceQLQuery(req) { // no need to search this block, we already did above + // only applies to non-traceql queries continue } diff --git a/modules/ingester/instance_search_test.go b/modules/ingester/instance_search_test.go index 24ca789430d..29b4c1f579f 100644 --- a/modules/ingester/instance_search_test.go +++ b/modules/ingester/instance_search_test.go @@ -110,12 +110,16 @@ func TestInstanceSearchTraceQL(t *testing.T) { req := &tempopb.SearchRequest{Query: query, Limit: 20} + // Test live traces + sr, err := i.Search(context.Background(), req) + assert.NoError(t, err) + assert.Len(t, sr.Traces, 0) + // Test after appending to WAL - err := i.CutCompleteTraces(0, true) - require.NoError(t, err) + require.NoError(t, i.CutCompleteTraces(0, true)) assert.Equal(t, int(i.traceCount.Load()), len(i.traces)) - sr, err := i.Search(context.Background(), req) + sr, err = i.Search(context.Background(), req) assert.NoError(t, err) assert.Len(t, sr.Traces, len(ids)) checkEqual(t, ids, sr) diff --git a/pkg/util/client.go b/pkg/util/client.go index b4216103eb3..cdd126c25cb 100644 --- a/pkg/util/client.go +++ b/pkg/util/client.go @@ -161,3 +161,13 @@ func (c *Client) QueryTrace(id string) (*tempopb.Trace, error) { return m, nil } + +func (c *Client) SearchTraceQL(query string) (*tempopb.SearchResponse, error) { + m := &tempopb.SearchResponse{} + _, err := c.getFor(c.BaseURL+"/api/search?q="+url.QueryEscape(query), m) + if err != nil { + return nil, err + } + + return m, nil +}