From 0514091948cf8e00e42f44318c0e5e5b63b6388f Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Sun, 18 Feb 2024 23:01:34 +0200
Subject: [PATCH] app/vlselect: follow-up for 451d2abf50171dbdea844b40f14e0205a173d37a

- Consistently return the first `limit` log entries if the total size of
  found log entries doesn't exceed 1Mb. See app/vlselect/logsql/sort_writer.go.
  Previously random log entries could be returned with each request.
- Document the change at docs/VictoriaLogs/CHANGELOG.md
- Document the `limit` query arg at docs/VictoriaLogs/querying/README.md
- Make the change less intrusive.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5674
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5778
---
 app/vlselect/logsql/logsql.go           | 37 ++++------
 app/vlselect/logsql/sort_writer.go      | 97 +++++++++++++++++++++----
 app/vlselect/logsql/sort_writer_test.go | 23 ++++--
 app/vlselect/main.go                    | 10 ++-
 app/vlstorage/main.go                   |  2 +-
 docs/CHANGELOG.md                       |  1 -
 docs/VictoriaLogs/CHANGELOG.md          |  2 +
 docs/VictoriaLogs/querying/README.md    |  9 +++
 lib/logstorage/filters_test.go          |  3 +-
 lib/logstorage/storage_search.go        | 40 +++-------
 lib/logstorage/storage_search_test.go   | 59 ++++++---------
 11 files changed, 168 insertions(+), 115 deletions(-)

diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go
index 343593f44c18..acdc927a1659 100644
--- a/app/vlselect/logsql/logsql.go
+++ b/app/vlselect/logsql/logsql.go
@@ -2,7 +2,6 @@ package logsql
 
 import (
 	"net/http"
-	"sync"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -19,13 +18,18 @@ var (
 )
 
 // ProcessQueryRequest handles /select/logsql/query request
-func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan struct{}) {
+func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan struct{}, cancel func()) {
 	// Extract tenantID
 	tenantID, err := logstorage.GetTenantIDFromRequest(r)
 	if err != nil {
 		httpserver.Errorf(w, r, "%s", err)
 		return
 	}
+	limit, err := httputils.GetInt(r, "limit")
+	if err != nil {
+		httpserver.Errorf(w, r, "%s", err)
+		return
+	}
 
 	qStr := r.FormValue("query")
 	q, err := logstorage.ParseQuery(qStr)
@@ -33,36 +37,27 @@ func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan s
 		httpserver.Errorf(w, r, "cannot parse query [%s]: %s", qStr, err)
 		return
 	}
-	limit, err := httputils.GetInt(r, "limit")
-	if err != nil {
-		httpserver.Errorf(w, r, "cannot parse limit from the request: %s", err)
-		return
-	}
 	w.Header().Set("Content-Type", "application/stream+json; charset=utf-8")
+
 	sw := getSortWriter()
-	sw.Init(w, maxSortBufferSize.IntN())
+	sw.Init(w, maxSortBufferSize.IntN(), limit)
 	tenantIDs := []logstorage.TenantID{tenantID}
-
-	var mx sync.Mutex
-	vlstorage.RunQuery(tenantIDs, q, stopCh, func(columns []logstorage.BlockColumn) bool {
+	vlstorage.RunQuery(tenantIDs, q, stopCh, func(columns []logstorage.BlockColumn) {
 		if len(columns) == 0 {
-			return true
+			return
 		}
 		rowsCount := len(columns[0].Values)
-		mx.Lock()
-		if rowsCount > limit {
-			rowsCount = limit
-		}
-		limit = limit - rowsCount
-		mx.Unlock()
+
 		bb := blockResultPool.Get()
 		for rowIdx := 0; rowIdx < rowsCount; rowIdx++ {
 			WriteJSONRow(bb, columns, rowIdx)
 		}
-		sw.MustWrite(bb.B)
-		blockResultPool.Put(bb)
-		return limit == 0
+
+		if !sw.TryWrite(bb.B) {
+			cancel()
+		}
+
+		blockResultPool.Put(bb)
 	})
 	sw.FinalFlush()
 	putSortWriter(sw)
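Note for reviewers (not part of the diff): the handler no longer tracks the remaining `limit` under a mutex; instead `sw.TryWrite` reports when the limit is reached and the handler aborts the query via the request-scoped `cancel`. A minimal, self-contained sketch of this cancel-on-limit pattern, where `runQuery` and `emit` are hypothetical stand-ins for `vlstorage.RunQuery` and the block callback:

```go
package main

import (
	"context"
	"fmt"
)

// runQuery stands in for vlstorage.RunQuery: it keeps feeding blocks
// to emit until the context is canceled.
func runQuery(ctx context.Context, emit func(block string)) {
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return
		default:
			emit(fmt.Sprintf("block-%d", i))
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	written, maxLines := 0, 3
	runQuery(ctx, func(block string) {
		if written >= maxLines {
			// Mirrors `if !sw.TryWrite(bb.B) { cancel() }` above:
			// the writer refuses further lines, so stop the query.
			cancel()
			return
		}
		fmt.Println(block)
		written++
	})
}
```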
diff --git a/app/vlselect/logsql/sort_writer.go b/app/vlselect/logsql/sort_writer.go
index 919fc8310b57..291d401b26b0 100644
--- a/app/vlselect/logsql/sort_writer.go
+++ b/app/vlselect/logsql/sort_writer.go
@@ -36,8 +36,12 @@ var sortWriterPool sync.Pool
 // If the buf isn't empty at FinalFlush() call, then the buffered data
 // is sorted by _time field.
 type sortWriter struct {
-	mu sync.Mutex
-	w  io.Writer
+	mu sync.Mutex
+	w  io.Writer
+
+	maxLines     int
+	linesWritten int
+
 	maxBufLen  int
 	buf        []byte
 	bufFlushed bool
@@ -47,58 +51,119 @@ type sortWriter struct {
 
 func (sw *sortWriter) reset() {
 	sw.w = nil
+
+	sw.maxLines = 0
+	sw.linesWritten = 0
+
 	sw.maxBufLen = 0
 	sw.buf = sw.buf[:0]
 	sw.bufFlushed = false
 
 	sw.hasErr = false
 }
 
-func (sw *sortWriter) Init(w io.Writer, maxBufLen int) {
+// Init initializes sw.
+//
+// If maxLines is set to a positive value, then sw accepts up to maxLines lines
+// and then rejects all further lines by returning false from TryWrite.
+func (sw *sortWriter) Init(w io.Writer, maxBufLen, maxLines int) {
 	sw.reset()
 	sw.w = w
 	sw.maxBufLen = maxBufLen
+	sw.maxLines = maxLines
 }
 
-func (sw *sortWriter) MustWrite(p []byte) {
+// TryWrite writes p to sw.
+//
+// True is returned on successful write, false otherwise.
+//
+// An unsuccessful write may occur on underlying write error or when maxLines lines
+// have already been written to sw.
+func (sw *sortWriter) TryWrite(p []byte) bool {
 	sw.mu.Lock()
 	defer sw.mu.Unlock()
 
 	if sw.hasErr {
-		return
+		return false
 	}
 
 	if sw.bufFlushed {
-		if _, err := sw.w.Write(p); err != nil {
+		if !sw.writeToUnderlyingWriterLocked(p) {
 			sw.hasErr = true
+			return false
 		}
-		return
+		return true
 	}
+
 	if len(sw.buf)+len(p) < sw.maxBufLen {
 		sw.buf = append(sw.buf, p...)
-		return
+		return true
 	}
+
 	sw.bufFlushed = true
-	if len(sw.buf) > 0 {
-		if _, err := sw.w.Write(sw.buf); err != nil {
-			sw.hasErr = true
-			return
+	if !sw.writeToUnderlyingWriterLocked(sw.buf) {
+		sw.hasErr = true
+		return false
+	}
+	sw.buf = sw.buf[:0]
+
+	if !sw.writeToUnderlyingWriterLocked(p) {
+		sw.hasErr = true
+		return false
+	}
+	return true
+}
+
+func (sw *sortWriter) writeToUnderlyingWriterLocked(p []byte) bool {
+	if len(p) == 0 {
+		return true
+	}
+	if sw.maxLines > 0 {
+		if sw.linesWritten >= sw.maxLines {
+			return false
 		}
-		sw.buf = sw.buf[:0]
+		var linesLeft int
+		p, linesLeft = trimLines(p, sw.maxLines-sw.linesWritten)
+		sw.linesWritten += linesLeft
 	}
 	if _, err := sw.w.Write(p); err != nil {
-		sw.hasErr = true
+		return false
 	}
+	return true
 }
 
+func trimLines(p []byte, maxLines int) ([]byte, int) {
+	if maxLines <= 0 {
+		return nil, 0
+	}
+	n := bytes.Count(p, newline)
+	if n < maxLines {
+		return p, n
+	}
+	for n >= maxLines {
+		idx := bytes.LastIndexByte(p, '\n')
+		p = p[:idx]
+		n--
+	}
+	return p[:len(p)+1], maxLines
+}
+
+var newline = []byte("\n")
+
 func (sw *sortWriter) FinalFlush() {
 	if sw.hasErr || sw.bufFlushed {
 		return
 	}
+
 	rs := getRowsSorter()
 	rs.parseRows(sw.buf)
 	rs.sort()
-	WriteJSONRows(sw.w, rs.rows)
+
+	rows := rs.rows
+	if sw.maxLines > 0 && len(rows) > sw.maxLines {
+		rows = rows[:sw.maxLines]
+	}
+	WriteJSONRows(sw.w, rows)
+
 	putRowsSorter(rs)
 }
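Note for reviewers (not part of the diff): `trimLines` keeps only the first maxLines newline-terminated lines of `p` and reports how many lines were kept. A standalone copy with a tiny driver for checking the edge cases by hand; the function body is taken from the patch, while `main` is purely illustrative:

```go
package main

import (
	"bytes"
	"fmt"
)

var newline = []byte("\n")

// trimLines returns the prefix of p containing at most maxLines
// newline-terminated lines, plus the number of lines kept.
func trimLines(p []byte, maxLines int) ([]byte, int) {
	if maxLines <= 0 {
		return nil, 0
	}
	n := bytes.Count(p, newline)
	if n < maxLines {
		return p, n
	}
	for n >= maxLines {
		idx := bytes.LastIndexByte(p, '\n')
		p = p[:idx]
		n--
	}
	// Re-extend p by one byte so the last kept line stays newline-terminated.
	return p[:len(p)+1], maxLines
}

func main() {
	out, n := trimLines([]byte("a\nb\nc\n"), 2)
	fmt.Printf("%q %d\n", out, n) // "a\nb\n" 2
}
```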
diff --git a/app/vlselect/logsql/sort_writer_test.go b/app/vlselect/logsql/sort_writer_test.go
index 9313bed85dde..3c33257267d3 100644
--- a/app/vlselect/logsql/sort_writer_test.go
+++ b/app/vlselect/logsql/sort_writer_test.go
@@ -7,15 +7,16 @@ import (
 )
 
 func TestSortWriter(t *testing.T) {
-	f := func(maxBufLen int, data string, expectedResult string) {
+	f := func(maxBufLen, maxLines int, data string, expectedResult string) {
 		t.Helper()
 
 		var bb bytes.Buffer
 		sw := getSortWriter()
-		sw.Init(&bb, maxBufLen)
-
+		sw.Init(&bb, maxBufLen, maxLines)
 		for _, s := range strings.Split(data, "\n") {
-			sw.MustWrite([]byte(s + "\n"))
+			if !sw.TryWrite([]byte(s + "\n")) {
+				break
+			}
 		}
 		sw.FinalFlush()
 		putSortWriter(sw)
@@ -26,14 +27,20 @@ func TestSortWriter(t *testing.T) {
 		}
 	}
 
-	f(100, "", "")
-	f(100, "{}", "{}\n")
+	f(100, 0, "", "")
+	f(100, 0, "{}", "{}\n")
 
 	data := `{"_time":"def","_msg":"xxx"}
 {"_time":"abc","_msg":"foo"}`
 	resultExpected := `{"_time":"abc","_msg":"foo"}
 {"_time":"def","_msg":"xxx"}
 `
-	f(100, data, resultExpected)
-	f(10, data, data+"\n")
+	f(100, 0, data, resultExpected)
+	f(10, 0, data, data+"\n")
+
+	// Test with the maxLines
+	f(100, 1, data, `{"_time":"abc","_msg":"foo"}`+"\n")
+	f(10, 1, data, `{"_time":"def","_msg":"xxx"}`+"\n")
+	f(10, 2, data, data+"\n")
+	f(100, 2, data, resultExpected)
 }
diff --git a/app/vlselect/main.go b/app/vlselect/main.go
index df8e94925445..14e80dd4ce73 100644
--- a/app/vlselect/main.go
+++ b/app/vlselect/main.go
@@ -1,6 +1,7 @@
 package vlselect
 
 import (
+	"context"
 	"embed"
 	"flag"
 	"fmt"
@@ -101,7 +102,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 
 	// Limit the number of concurrent queries, which can consume big amounts of CPU.
 	startTime := time.Now()
-	stopCh := r.Context().Done()
+	ctx := r.Context()
+	stopCh := ctx.Done()
 	select {
 	case concurrencyLimitCh <- struct{}{}:
 		defer func() { <-concurrencyLimitCh }()
@@ -139,11 +141,15 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 		}
 	}
 
+	ctxWithCancel, cancel := context.WithCancel(ctx)
+	defer cancel()
+	stopCh = ctxWithCancel.Done()
+
 	switch {
 	case path == "/logsql/query":
 		logsqlQueryRequests.Inc()
 		httpserver.EnableCORS(w, r)
-		logsql.ProcessQueryRequest(w, r, stopCh)
+		logsql.ProcessQueryRequest(w, r, stopCh, cancel)
 		return true
 	default:
 		return false
diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go
index ed5eba88d951..b1b55675c991 100644
--- a/app/vlstorage/main.go
+++ b/app/vlstorage/main.go
@@ -100,7 +100,7 @@ func MustAddRows(lr *logstorage.LogRows) {
 }
 
 // RunQuery runs the given q and calls processBlock for the returned data blocks
-func RunQuery(tenantIDs []logstorage.TenantID, q *logstorage.Query, stopCh <-chan struct{}, processBlock func(columns []logstorage.BlockColumn) bool) {
+func RunQuery(tenantIDs []logstorage.TenantID, q *logstorage.Query, stopCh <-chan struct{}, processBlock func(columns []logstorage.BlockColumn)) {
 	strg.RunQuery(tenantIDs, q, stopCh, processBlock)
 }
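Note for reviewers (not part of the diff): app/vlselect/main.go now derives a cancellable context from the request context, so a client disconnect and the limit-reached path stop the search through the same `stopCh`. A hedged sketch of that wiring, with `handler` and `process` as placeholder names:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
)

// process stands in for logsql.ProcessQueryRequest: it receives both the
// stop channel and the cancel func, so it can abort the work itself.
func process(w http.ResponseWriter, stopCh <-chan struct{}, cancel func()) {
	for i := 0; i < 100; i++ {
		select {
		case <-stopCh:
			return
		default:
			fmt.Fprintf(w, "line %d\n", i)
			if i == 9 {
				cancel() // e.g. the line limit was reached
			}
		}
	}
}

func handler(w http.ResponseWriter, r *http.Request) {
	// Canceled either when the client goes away (r.Context())
	// or when process calls cancel() explicitly.
	ctx, cancel := context.WithCancel(r.Context())
	defer cancel()
	process(w, ctx.Done(), cancel)
}

func main() {
	http.HandleFunc("/query", handler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```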
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index d8e747b4ea07..6a4bc7ec2a95 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -63,7 +63,6 @@ Released at 2024-02-14
 * FEATURE: [dashboards/all](https://grafana.com/orgs/victoriametrics): add new panel `CPU spent on GC`. It should help identifying cases when too much CPU is spent on garbage collection, and advice users on how this can be addressed.
 * FEATURE: [vmalert](https://docs.victoriametrics.com/#vmalert): support [filtering](https://prometheus.io/docs/prometheus/2.49/querying/api/#rules) for `/api/v1/rules` API. See [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5749) by @victoramsantos.
 * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): support client-side TLS configuration for creating and deleting snapshots via `-snapshot.tls*` cmd-line flags. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5724). Thanks to @khushijain21 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5738).
-* FEATURE: enable `limit` query param for `/select/logsql/query` VictoriaLogs API endpoint. By default this limit sets to 1000 lines, but it can be changed via query param. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5674).
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce CPU usage when `-promscrape.dropOriginalLabels` command-line flag is set. This issue has been introduced in [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0) when addressing [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5389).
 * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): properly release memory during config reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4690).
diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index a90d1ca92200..dc6492bc1787 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
 
 ## tip
 
+* FEATURE: support limiting the number of log entries returned from the [HTTP querying API](https://docs.victoriametrics.com/victorialogs/querying/#http-api) by passing the `limit` query arg. Previously all the matching log entries were returned until the response stream was closed. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5674). Thanks to @dmitryk-dk for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5778).
+
 ## [v0.4.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.4.2-victorialogs)
 
 Released at 2023-11-15
diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md
index 8d7e0f40f3d1..a1aaff819bdd 100644
--- a/docs/VictoriaLogs/querying/README.md
+++ b/docs/VictoriaLogs/querying/README.md
@@ -44,6 +44,15 @@ See [LogsQL docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html) for
 The `query` arg must be properly encoded with [percent encoding](https://en.wikipedia.org/wiki/URL_encoding)
 when passing it to `curl` or similar tools.
 
+By default the `/select/logsql/query` endpoint returns all the log entries matching the given `query`. The response size can be limited in the following ways:
+
+- By closing the response stream at any time. In this case VictoriaLogs stops query execution and frees all the resources occupied by the request.
+- By specifying the maximum number of log entries, which can be returned in the response, via the `limit` query arg. For example, the following request returns
+  up to 10 matching log entries:
+  ```sh
+  curl http://localhost:9428/select/logsql/query -d 'query=error' -d 'limit=10'
+  ```
+
 The `/select/logsql/query` endpoint returns [a stream of JSON lines](https://jsonlines.org/),
 where each line contains a JSON-encoded log entry in the form `{field1="value1",...,fieldN="valueN"}`.
 Example response:
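Note for reviewers (not part of the diff): the README hunk above documents two ways to bound the response. A small Go client sketch exercising both at once, passing `limit` and closing the response body after reading enough lines; the endpoint and query values are just examples:

```go
package main

import (
	"bufio"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"strings"
)

func main() {
	// Ask the server for at most 10 entries via the `limit` query arg.
	form := url.Values{"query": {"error"}, "limit": {"10"}}
	resp, err := http.Post("http://localhost:9428/select/logsql/query",
		"application/x-www-form-urlencoded", strings.NewReader(form.Encode()))
	if err != nil {
		log.Fatal(err)
	}
	// Closing the body early also makes VictoriaLogs stop query execution.
	defer resp.Body.Close()

	sc := bufio.NewScanner(resp.Body)
	for i := 0; i < 5 && sc.Scan(); i++ {
		fmt.Println(sc.Text()) // each line is a JSON-encoded log entry
	}
}
```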
diff --git a/lib/logstorage/filters_test.go b/lib/logstorage/filters_test.go
index a5d017ee47e9..cf7d6e7827d0 100644
--- a/lib/logstorage/filters_test.go
+++ b/lib/logstorage/filters_test.go
@@ -9226,7 +9226,7 @@ func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f fi
 		resultColumnNames: []string{resultColumnName},
 	}
 	workersCount := 3
-	s.search(workersCount, so, nil, func(workerID uint, br *blockResult) bool {
+	s.search(workersCount, so, nil, func(workerID uint, br *blockResult) {
 		// Verify tenantID
 		if !br.streamID.tenantID.equal(&tenantID) {
 			t.Fatalf("unexpected tenantID in blockResult; got %s; want %s", &br.streamID.tenantID, &tenantID)
@@ -9248,7 +9248,6 @@ func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f fi
 		if !reflect.DeepEqual(br.timestamps, expectedTimestamps) {
 			t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", br.timestamps, expectedTimestamps)
 		}
-		return false
 	})
 }
diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go
index 136e2b82123e..c728d9998a30 100644
--- a/lib/logstorage/storage_search.go
+++ b/lib/logstorage/storage_search.go
@@ -1,7 +1,6 @@
 package logstorage
 
 import (
-	"context"
 	"math"
 	"sort"
 	"sync"
@@ -44,7 +43,7 @@ type searchOptions struct {
 }
 
 // RunQuery runs the given q and calls processBlock for results
-func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{}, processBlock func(columns []BlockColumn) bool) {
+func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{}, processBlock func(columns []BlockColumn)) {
 	resultColumnNames := q.getResultColumnNames()
 	so := &genericSearchOptions{
 		tenantIDs:         tenantIDs,
@@ -52,7 +51,7 @@ func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{
 		resultColumnNames: resultColumnNames,
 	}
 	workersCount := cgroup.AvailableCPUs()
-	s.search(workersCount, so, stopCh, func(workerID uint, br *blockResult) bool {
+	s.search(workersCount, so, stopCh, func(workerID uint, br *blockResult) {
 		brs := getBlockRows()
 		cs := brs.cs
 
@@ -62,11 +61,10 @@ func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{
 				Values: br.getColumnValues(i),
 			})
 		}
-		limitReached := processBlock(cs)
+		processBlock(cs)
 
 		brs.cs = cs
 		putBlockRows(brs)
-		return limitReached
 	})
 }
 
@@ -120,7 +118,7 @@ const blockSearchWorksPerBatch = 64
 // searchResultFunc must process sr.
 //
 // The callback is called at the worker with the given workerID.
-type searchResultFunc func(workerID uint, br *blockResult) bool
+type searchResultFunc func(workerID uint, br *blockResult)
 
 // search searches for the matching rows according to so.
 //
@@ -130,34 +128,19 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
 	var wg sync.WaitGroup
 	workCh := make(chan []*blockSearchWork, workersCount)
 	wg.Add(workersCount)
-
-	ctx, cancelFn := context.WithCancel(context.Background())
-
 	for i := 0; i < workersCount; i++ {
 		go func(workerID uint) {
-			defer wg.Done()
 			bs := getBlockSearch()
-			defer putBlockSearch(bs)
-			for {
-				select {
-				case bsws, ok := <-workCh:
-					if !ok {
-						return
+			for bsws := range workCh {
+				for _, bsw := range bsws {
+					bs.search(bsw)
+					if bs.br.RowsCount() > 0 {
+						processBlockResult(workerID, &bs.br)
 					}
-					for _, bsw := range bsws {
-						bs.search(bsw)
-						if bs.br.RowsCount() > 0 {
-							limitReached := processBlockResult(workerID, &bs.br)
-							if limitReached {
-								cancelFn()
-								return
-							}
-						}
-					}
-				case <-ctx.Done():
-					return
 				}
 			}
+			putBlockSearch(bs)
+			wg.Done()
 		}(uint(i))
 	}
 
@@ -195,7 +178,6 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
 	// Wait until workers finish their work
 	close(workCh)
 	wg.Wait()
-	cancelFn()
 
 	// Decrement references to parts
 	for _, pw := range pws {
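Note for reviewers (not part of the diff): with cancellation moved up to the HTTP layer, the search workers shrink to a plain `for ... range workCh` loop that drains the channel until the producer closes it. A minimal sketch of this worker-pool shape, with block batches simplified to strings:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const workersCount = 4
	workCh := make(chan []string, workersCount)

	var wg sync.WaitGroup
	wg.Add(workersCount)
	for i := 0; i < workersCount; i++ {
		go func(workerID int) {
			// No select on ctx.Done() needed: the producer simply stops
			// sending and closes workCh when the query is canceled.
			for batch := range workCh {
				for _, block := range batch {
					fmt.Printf("worker %d: %s\n", workerID, block)
				}
			}
			wg.Done()
		}(i)
	}

	workCh <- []string{"block-1", "block-2"}
	workCh <- []string{"block-3"}
	close(workCh)
	wg.Wait()
}
```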
diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go
index 9a0440d82daa..98b8f902a145 100644
--- a/lib/logstorage/storage_search_test.go
+++ b/lib/logstorage/storage_search_test.go
@@ -84,7 +84,7 @@ func TestStorageRunQuery(t *testing.T) {
 			AccountID: 0,
 			ProjectID: 0,
 		}
-		processBlock := func(columns []BlockColumn) bool {
+		processBlock := func(columns []BlockColumn) {
 			panic(fmt.Errorf("unexpected match"))
 		}
 		tenantIDs := []TenantID{tenantID}
@@ -96,7 +96,7 @@ func TestStorageRunQuery(t *testing.T) {
 			AccountID: 1,
 			ProjectID: 11,
 		}
-		processBlock := func(columns []BlockColumn) bool {
+		processBlock := func(columns []BlockColumn) {
 			panic(fmt.Errorf("unexpected match"))
 		}
 		tenantIDs := []TenantID{tenantID}
@@ -111,7 +111,7 @@ func TestStorageRunQuery(t *testing.T) {
 		}
 		expectedTenantID := tenantID.String()
 		rowsCount := uint32(0)
-		processBlock := func(columns []BlockColumn) bool {
+		processBlock := func(columns []BlockColumn) {
 			hasTenantIDColumn := false
 			var columnNames []string
 			for _, c := range columns {
@@ -132,7 +132,6 @@ func TestStorageRunQuery(t *testing.T) {
 				panic(fmt.Errorf("missing tenant.id column among columns: %q", columnNames))
 			}
 			atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
-			return false
 		}
 		tenantIDs := []TenantID{tenantID}
 		s.RunQuery(tenantIDs, q, nil, processBlock)
@@ -146,9 +145,8 @@ func TestStorageRunQuery(t *testing.T) {
 	t.Run("matching-multiple-tenant-ids", func(t *testing.T) {
 		q := mustParseQuery(`"log message"`)
 		rowsCount := uint32(0)
-		processBlock := func(columns []BlockColumn) bool {
+		processBlock := func(columns []BlockColumn) {
 			atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
-			return false
 		}
 		s.RunQuery(allTenantIDs, q, nil, processBlock)
 
@@ -160,9 +158,8 @@ func TestStorageRunQuery(t *testing.T) {
 	t.Run("matching-in-filter", func(t *testing.T) {
 		q := mustParseQuery(`source-file:in(foobar,/foo/bar/baz)`)
 		rowsCount := uint32(0)
-		processBlock := func(columns []BlockColumn) bool {
+		processBlock := func(columns []BlockColumn) {
 			atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
-			return false
 		}
 		s.RunQuery(allTenantIDs, q, nil, processBlock)
 
@@ -173,7 +170,7 @@ func TestStorageRunQuery(t *testing.T) {
 	})
 	t.Run("stream-filter-mismatch", func(t *testing.T) {
 		q := mustParseQuery(`_stream:{job="foobar",instance=~"host-.+:2345"} log`)
-		processBlock := func(columns []BlockColumn) bool {
+		processBlock := func(columns []BlockColumn) {
 			panic(fmt.Errorf("unexpected match"))
 		}
 		s.RunQuery(allTenantIDs, q, nil, processBlock)
@@ -187,7 +184,7 @@ func TestStorageRunQuery(t *testing.T) {
 		}
 		expectedStreamID := fmt.Sprintf("stream_id=%d", i)
 		rowsCount := uint32(0)
-		processBlock := func(columns []BlockColumn) bool {
+		processBlock := func(columns []BlockColumn) {
 			hasStreamIDColumn := false
 			var columnNames []string
 			for _, c := range columns {
@@ -208,7 +205,6 @@ func TestStorageRunQuery(t *testing.T) {
 				panic(fmt.Errorf("missing stream-id column among columns: %q", columnNames))
 			}
 			atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
-			return false
 		}
 		tenantIDs := []TenantID{tenantID}
 		s.RunQuery(tenantIDs, q, nil, processBlock)
@@ -226,9 +222,8 @@ func TestStorageRunQuery(t *testing.T) {
 			ProjectID: 11,
 		}
 		rowsCount := uint32(0)
-		processBlock := func(columns []BlockColumn) bool {
+		processBlock := func(columns []BlockColumn) {
 			atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
-			return false
 		}
 		tenantIDs := []TenantID{tenantID}
 		s.RunQuery(tenantIDs, q, nil, processBlock)
@@ -247,9 +242,8 @@ func TestStorageRunQuery(t *testing.T) {
 			ProjectID: 11,
 		}
 		rowsCount := uint32(0)
-		processBlock := func(columns []BlockColumn) bool {
+		processBlock := func(columns []BlockColumn) {
 			atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
-			return false
 		}
 		tenantIDs := []TenantID{tenantID}
 		s.RunQuery(tenantIDs, q, nil, processBlock)
@@ -268,9 +262,8 @@ func TestStorageRunQuery(t *testing.T) {
 			ProjectID: 11,
 		}
 		rowsCount := uint32(0)
-		processBlock := func(columns []BlockColumn) bool {
+		processBlock := func(columns []BlockColumn) {
 			atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
-			return false
 		}
 		tenantIDs := []TenantID{tenantID}
 		s.RunQuery(tenantIDs, q, nil, processBlock)
@@ -288,7 +281,7 @@ func TestStorageRunQuery(t *testing.T) {
 			AccountID: 1,
 			ProjectID: 11,
 		}
-		processBlock := func(columns []BlockColumn) bool {
+		processBlock := func(columns []BlockColumn) {
 			panic(fmt.Errorf("unexpected match"))
 		}
 		tenantIDs := []TenantID{tenantID}
@@ -302,7 +295,7 @@ func TestStorageRunQuery(t *testing.T) {
 			AccountID: 1,
 			ProjectID: 11,
 		}
-		processBlock := func(columns []BlockColumn) bool {
+		processBlock := func(columns []BlockColumn) {
 			panic(fmt.Errorf("unexpected match"))
 		}
 		tenantIDs := []TenantID{tenantID}
@@ -412,7 +405,7 @@ func TestStorageSearch(t *testing.T) {
 			filter:            f,
 			resultColumnNames: []string{"_msg"},
 		}
-		processBlock := func(workerID uint, br *blockResult) bool {
+		processBlock := func(workerID uint, br *blockResult) {
 			panic(fmt.Errorf("unexpected match"))
 		}
 		s.search(workersCount, so, nil, processBlock)
@@ -430,7 +423,7 @@ func TestStorageSearch(t *testing.T) {
 			filter:            f,
 			resultColumnNames: []string{"_msg"},
 		}
-		processBlock := func(workerID uint, br *blockResult) bool {
+		processBlock := func(workerID uint, br *blockResult) {
 			panic(fmt.Errorf("unexpected match"))
 		}
 		s.search(workersCount, so, nil, processBlock)
@@ -448,7 +441,7 @@ func TestStorageSearch(t *testing.T) {
 			filter:            f,
 			resultColumnNames: []string{"_msg"},
 		}
-		processBlock := func(workerID uint, br *blockResult) bool {
+		processBlock := func(workerID uint, br *blockResult) {
 			panic(fmt.Errorf("unexpected match"))
 		}
 		s.search(workersCount, so, nil, processBlock)
@@ -468,12 +461,11 @@ func TestStorageSearch(t *testing.T) {
 			resultColumnNames: []string{"_msg"},
 		}
 		rowsCount := uint32(0)
-		processBlock := func(workerID uint, br *blockResult) bool {
+		processBlock := func(workerID uint, br *blockResult) {
 			if !br.streamID.tenantID.equal(&tenantID) {
 				panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
 			}
 			atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
-			return false
 		}
 		s.search(workersCount, so, nil, processBlock)
 
@@ -493,9 +485,8 @@ func TestStorageSearch(t *testing.T) {
 			resultColumnNames: []string{"_msg"},
 		}
 		rowsCount := uint32(0)
-		processBlock := func(workerID uint, br *blockResult) bool {
+		processBlock := func(workerID uint, br *blockResult) {
 			atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
-			return false
 		}
 		s.search(workersCount, so, nil, processBlock)
 
@@ -514,7 +505,7 @@ func TestStorageSearch(t *testing.T) {
 			filter:            f,
 			resultColumnNames: []string{"_msg"},
 		}
-		processBlock := func(workerID uint, br *blockResult) bool {
+		processBlock := func(workerID uint, br *blockResult) {
 			panic(fmt.Errorf("unexpected match"))
 		}
 		s.search(workersCount, so, nil, processBlock)
@@ -535,12 +526,11 @@ func TestStorageSearch(t *testing.T) {
 			resultColumnNames: []string{"_msg"},
 		}
 		rowsCount := uint32(0)
-		processBlock := func(workerID uint, br *blockResult) bool {
+		processBlock := func(workerID uint, br *blockResult) {
 			if !br.streamID.tenantID.equal(&tenantID) {
 				panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
 			}
 			atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
-			return false
 		}
 		s.search(workersCount, so, nil, processBlock)
 
@@ -565,12 +555,11 @@ func TestStorageSearch(t *testing.T) {
 			resultColumnNames: []string{"_msg"},
 		}
 		rowsCount := uint32(0)
-		processBlock := func(workerID uint, br *blockResult) bool {
+		processBlock := func(workerID uint, br *blockResult) {
 			if !br.streamID.tenantID.equal(&tenantID) {
 				panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
 			}
 			atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
-			return false
 		}
 		s.search(workersCount, so, nil, processBlock)
 
@@ -603,12 +592,11 @@ func TestStorageSearch(t *testing.T) {
 			resultColumnNames: []string{"_msg"},
 		}
 		rowsCount := uint32(0)
-		processBlock := func(workerID uint, br *blockResult) bool {
+		processBlock := func(workerID uint, br *blockResult) {
 			if !br.streamID.tenantID.equal(&tenantID) {
 				panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
 			}
 			atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
-			return false
 		}
 		s.search(workersCount, so, nil, processBlock)
 
@@ -632,9 +620,8 @@ func TestStorageSearch(t *testing.T) {
 			resultColumnNames: []string{"_msg"},
 		}
 		rowsCount := uint32(0)
-		processBlock := func(workerID uint, br *blockResult) bool {
+		processBlock := func(workerID uint, br *blockResult) {
 			atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
-			return false
 		}
 		s.search(workersCount, so, nil, processBlock)
 
@@ -657,7 +644,7 @@ func TestStorageSearch(t *testing.T) {
 			filter:            f,
 			resultColumnNames: []string{"_msg"},
 		}
-		processBlock := func(workerID uint, br *blockResult) bool {
+		processBlock := func(workerID uint, br *blockResult) {
 			panic(fmt.Errorf("unexpected match"))
 		}
 		s.search(workersCount, so, nil, processBlock)