app/vlselect: follow-up for 451d2ab
- Consistently return the first `limit` log entries if the total size of the found log entries doesn't exceed 1MB.
  See app/vlselect/logsql/sort_writer.go. Previously a random subset of the matching log entries could be returned on each request.
- Document the change at docs/VictoriaLogs/CHANGELOG.md
- Document the `limit` query arg at docs/VictoriaLogs/querying/README.md
- Make the change less intrusive.

Updates #5674
Updates #5778
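
In practice this makes the `limit` arg deterministic: a request such as `curl http://localhost:9428/select/logsql/query -d 'query=error' -d 'limit=10'` (the example added to the querying README below) now returns the same first 10 log entries on every run, provided the full matching set fits the 1MB sort buffer.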
valyala committed Feb 18, 2024
1 parent 451d2ab commit 0514091
Showing 11 changed files with 168 additions and 115 deletions.
37 changes: 16 additions & 21 deletions app/vlselect/logsql/logsql.go
@@ -2,7 +2,6 @@ package logsql
 
 import (
 	"net/http"
-	"sync"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -19,50 +18,46 @@ var (
 )
 
 // ProcessQueryRequest handles /select/logsql/query request
-func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan struct{}) {
+func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan struct{}, cancel func()) {
 	// Extract tenantID
 	tenantID, err := logstorage.GetTenantIDFromRequest(r)
 	if err != nil {
 		httpserver.Errorf(w, r, "%s", err)
 		return
 	}
-	limit, err := httputils.GetInt(r, "limit")
-	if err != nil {
-		httpserver.Errorf(w, r, "%s", err)
-		return
-	}
 
 	qStr := r.FormValue("query")
 	q, err := logstorage.ParseQuery(qStr)
 	if err != nil {
 		httpserver.Errorf(w, r, "cannot parse query [%s]: %s", qStr, err)
 		return
 	}
+	limit, err := httputils.GetInt(r, "limit")
+	if err != nil {
+		httpserver.Errorf(w, r, "cannot parse limit from the request: %s", err)
+		return
+	}
 	w.Header().Set("Content-Type", "application/stream+json; charset=utf-8")
 
 	sw := getSortWriter()
-	sw.Init(w, maxSortBufferSize.IntN())
+	sw.Init(w, maxSortBufferSize.IntN(), limit)
 	tenantIDs := []logstorage.TenantID{tenantID}
 
-	var mx sync.Mutex
-	vlstorage.RunQuery(tenantIDs, q, stopCh, func(columns []logstorage.BlockColumn) bool {
+	vlstorage.RunQuery(tenantIDs, q, stopCh, func(columns []logstorage.BlockColumn) {
 		if len(columns) == 0 {
-			return true
+			return
 		}
 		rowsCount := len(columns[0].Values)
-		mx.Lock()
-		if rowsCount > limit {
-			rowsCount = limit
-		}
-		limit = limit - rowsCount
-		mx.Unlock()
 
 		bb := blockResultPool.Get()
 		for rowIdx := 0; rowIdx < rowsCount; rowIdx++ {
 			WriteJSONRow(bb, columns, rowIdx)
 		}
-		sw.MustWrite(bb.B)
-		blockResultPool.Put(bb)
 
-		return limit == 0
+		if !sw.TryWrite(bb.B) {
+			cancel()
+		}
+
+		blockResultPool.Put(bb)
 	})
 	sw.FinalFlush()
 	putSortWriter(sw)
97 changes: 82 additions & 15 deletions app/vlselect/logsql/sort_writer.go
@@ -36,8 +36,12 @@ var sortWriterPool sync.Pool
 // If the buf isn't empty at FinalFlush() call, then the buffered data
 // is sorted by _time field.
 type sortWriter struct {
 	mu sync.Mutex
 	w  io.Writer
 
+	maxLines     int
+	linesWritten int
+
 	maxBufLen  int
 	buf        []byte
 	bufFlushed bool
@@ -47,58 +51,121 @@ type sortWriter struct {
 
 func (sw *sortWriter) reset() {
 	sw.w = nil
+
+	sw.maxLines = 0
+	sw.linesWritten = 0
+
 	sw.maxBufLen = 0
 	sw.buf = sw.buf[:0]
 	sw.bufFlushed = false
 	sw.hasErr = false
 }
 
-func (sw *sortWriter) Init(w io.Writer, maxBufLen int) {
+// Init initializes sw.
+//
+// If maxLines is set to a positive value, then sw accepts up to maxLines lines
+// and then rejects all the other lines by returning false from TryWrite.
+func (sw *sortWriter) Init(w io.Writer, maxBufLen, maxLines int) {
 	sw.reset()
 
 	sw.w = w
 	sw.maxBufLen = maxBufLen
+	sw.maxLines = maxLines
 }
 
-func (sw *sortWriter) MustWrite(p []byte) {
+// TryWrite writes p to sw.
+//
+// True is returned on successful write, false otherwise.
+//
+// Unsuccessful write may occur on underlying write error or when maxLines lines
+// are already written to sw.
+func (sw *sortWriter) TryWrite(p []byte) bool {
 	sw.mu.Lock()
 	defer sw.mu.Unlock()
 
 	if sw.hasErr {
-		return
+		return false
 	}
 
 	if sw.bufFlushed {
-		if _, err := sw.w.Write(p); err != nil {
+		if !sw.writeToUnderlyingWriterLocked(p) {
 			sw.hasErr = true
+			return false
 		}
-		return
+		return true
 	}
 
 	if len(sw.buf)+len(p) < sw.maxBufLen {
 		sw.buf = append(sw.buf, p...)
-		return
+		return true
 	}
 
 	sw.bufFlushed = true
-	if len(sw.buf) > 0 {
-		if _, err := sw.w.Write(sw.buf); err != nil {
-			sw.hasErr = true
-			return
-		}
-		sw.buf = sw.buf[:0]
+	if !sw.writeToUnderlyingWriterLocked(sw.buf) {
+		sw.hasErr = true
+		return false
 	}
-	if _, err := sw.w.Write(p); err != nil {
-		sw.hasErr = true
+	sw.buf = sw.buf[:0]
+
+	if !sw.writeToUnderlyingWriterLocked(p) {
+		sw.hasErr = true
+		return false
 	}
+	return true
+}
+
+func (sw *sortWriter) writeToUnderlyingWriterLocked(p []byte) bool {
+	if len(p) == 0 {
+		return true
+	}
+	if sw.maxLines > 0 {
+		if sw.linesWritten >= sw.maxLines {
+			return false
+		}
+		var linesLeft int
+		p, linesLeft = trimLines(p, sw.maxLines-sw.linesWritten)
+		sw.linesWritten += linesLeft
+	}
+	if _, err := sw.w.Write(p); err != nil {
+		return false
+	}
+	return true
+}
+
+func trimLines(p []byte, maxLines int) ([]byte, int) {
+	if maxLines <= 0 {
+		return nil, 0
+	}
+	n := bytes.Count(p, newline)
+	if n < maxLines {
+		return p, n
+	}
+	for n >= maxLines {
+		idx := bytes.LastIndexByte(p, '\n')
+		p = p[:idx]
+		n--
+	}
+	return p[:len(p)+1], maxLines
+}
+
+var newline = []byte("\n")
 
 func (sw *sortWriter) FinalFlush() {
 	if sw.hasErr || sw.bufFlushed {
 		return
 	}
+
 	rs := getRowsSorter()
 	rs.parseRows(sw.buf)
 	rs.sort()
-	WriteJSONRows(sw.w, rs.rows)
+
+	rows := rs.rows
+	if sw.maxLines > 0 && len(rows) > sw.maxLines {
+		rows = rows[:sw.maxLines]
+	}
+	WriteJSONRows(sw.w, rows)
+
 	putRowsSorter(rs)
 }
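
The `trimLines` helper added above keeps only the first maxLines newline-terminated lines of a buffer. Here is a minimal standalone sketch of its contract — the helper body is copied from the hunk above, while `main` and the sample input are illustrative:

```go
package main

import (
	"bytes"
	"fmt"
)

var newline = []byte("\n")

// trimLines returns the first maxLines newline-terminated lines of p,
// together with the number of lines kept.
func trimLines(p []byte, maxLines int) ([]byte, int) {
	if maxLines <= 0 {
		return nil, 0
	}
	n := bytes.Count(p, newline)
	if n < maxLines {
		return p, n
	}
	// Drop trailing lines until only maxLines remain.
	for n >= maxLines {
		idx := bytes.LastIndexByte(p, '\n')
		p = p[:idx]
		n--
	}
	return p[:len(p)+1], maxLines
}

func main() {
	p, kept := trimLines([]byte("a\nb\nc\n"), 2)
	fmt.Printf("%q %d\n", p, kept) // "a\nb\n" 2
}
```

The final `p[:len(p)+1]` re-extends the slice by one byte to restore the trailing newline trimmed off by the loop; this is safe because that byte is still present in the slice's backing array.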

23 changes: 15 additions & 8 deletions app/vlselect/logsql/sort_writer_test.go
@@ -7,15 +7,16 @@ import (
 )
 
 func TestSortWriter(t *testing.T) {
-	f := func(maxBufLen int, data string, expectedResult string) {
+	f := func(maxBufLen, maxLines int, data string, expectedResult string) {
 		t.Helper()
 
 		var bb bytes.Buffer
 		sw := getSortWriter()
-		sw.Init(&bb, maxBufLen)
-
+		sw.Init(&bb, maxBufLen, maxLines)
 		for _, s := range strings.Split(data, "\n") {
-			sw.MustWrite([]byte(s + "\n"))
+			if !sw.TryWrite([]byte(s + "\n")) {
+				break
+			}
 		}
 		sw.FinalFlush()
 		putSortWriter(sw)
@@ -26,14 +27,20 @@ func TestSortWriter(t *testing.T) {
 		}
 	}
 
-	f(100, "", "")
-	f(100, "{}", "{}\n")
+	f(100, 0, "", "")
+	f(100, 0, "{}", "{}\n")
 
 	data := `{"_time":"def","_msg":"xxx"}
 {"_time":"abc","_msg":"foo"}`
 	resultExpected := `{"_time":"abc","_msg":"foo"}
 {"_time":"def","_msg":"xxx"}
 `
-	f(100, data, resultExpected)
-	f(10, data, data+"\n")
+	f(100, 0, data, resultExpected)
+	f(10, 0, data, data+"\n")
+
+	// Test with the maxLines
+	f(100, 1, data, `{"_time":"abc","_msg":"foo"}`+"\n")
+	f(10, 1, data, `{"_time":"def","_msg":"xxx"}`+"\n")
+	f(10, 2, data, data+"\n")
+	f(100, 2, data, resultExpected)
 }
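
The last four test cases capture the two regimes of sortWriter: when everything fits into maxBufLen (maxBufLen=100), FinalFlush sorts the buffered rows by `_time` before applying maxLines, so the `abc` entry wins; when the buffer overflows (maxBufLen=10), rows stream through unsorted and maxLines keeps the first lines in arrival order, so the `def` entry wins.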
10 changes: 8 additions & 2 deletions app/vlselect/main.go
@@ -1,6 +1,7 @@
 package vlselect
 
 import (
+	"context"
 	"embed"
 	"flag"
 	"fmt"
@@ -101,7 +102,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 
 	// Limit the number of concurrent queries, which can consume big amounts of CPU.
 	startTime := time.Now()
-	stopCh := r.Context().Done()
+	ctx := r.Context()
+	stopCh := ctx.Done()
 	select {
 	case concurrencyLimitCh <- struct{}{}:
 		defer func() { <-concurrencyLimitCh }()
@@ -139,11 +141,15 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 		}
 	}
 
+	ctxWithCancel, cancel := context.WithCancel(ctx)
+	defer cancel()
+	stopCh = ctxWithCancel.Done()
+
 	switch {
 	case path == "/logsql/query":
 		logsqlQueryRequests.Inc()
 		httpserver.EnableCORS(w, r)
-		logsql.ProcessQueryRequest(w, r, stopCh)
+		logsql.ProcessQueryRequest(w, r, stopCh, cancel)
 		return true
 	default:
 		return false
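Deriving a cancellable context here lets the HTTP handler stop `vlstorage.RunQuery` as soon as the sort writer rejects further output. A self-contained sketch of the pattern follows; `runQuery` is a stand-in for the real storage search loop, not actual VictoriaLogs code:

```go
package main

import (
	"context"
	"fmt"
)

// runQuery mimics vlstorage.RunQuery: it feeds blocks to processBlock
// until stopCh is closed. The block contents are made up.
func runQuery(stopCh <-chan struct{}, processBlock func(block int)) {
	for i := 0; ; i++ {
		select {
		case <-stopCh:
			return
		default:
			processBlock(i)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stopCh := ctx.Done()

	limit := 3
	written := 0
	runQuery(stopCh, func(block int) {
		if written >= limit {
			cancel() // mirrors the !sw.TryWrite(bb.B) branch in logsql.go
			return
		}
		written++
		fmt.Println("wrote block", block)
	})
}
```

Since the producer checks stopCh on every iteration, the query stops one iteration after the consumer hits its limit, instead of scanning the remaining data.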
2 changes: 1 addition & 1 deletion app/vlstorage/main.go
@@ -100,7 +100,7 @@ func MustAddRows(lr *logstorage.LogRows) {
 }
 
 // RunQuery runs the given q and calls processBlock for the returned data blocks
-func RunQuery(tenantIDs []logstorage.TenantID, q *logstorage.Query, stopCh <-chan struct{}, processBlock func(columns []logstorage.BlockColumn) bool) {
+func RunQuery(tenantIDs []logstorage.TenantID, q *logstorage.Query, stopCh <-chan struct{}, processBlock func(columns []logstorage.BlockColumn)) {
 	strg.RunQuery(tenantIDs, q, stopCh, processBlock)
 }
1 change: 0 additions & 1 deletion docs/CHANGELOG.md
@@ -63,7 +63,6 @@ Released at 2024-02-14
 * FEATURE: [dashboards/all](https://grafana.com/orgs/victoriametrics): add new panel `CPU spent on GC`. It should help identifying cases when too much CPU is spent on garbage collection, and advice users on how this can be addressed.
 * FEATURE: [vmalert](https://docs.victoriametrics.com/#vmalert): support [filtering](https://prometheus.io/docs/prometheus/2.49/querying/api/#rules) for `/api/v1/rules` API. See [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5749) by @victoramsantos.
 * FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): support client-side TLS configuration for creating and deleting snapshots via `-snapshot.tls*` cmd-line flags. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5724). Thanks to @khushijain21 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5738).
-* FEATURE: enable `limit` query param for `/select/logsql/query` VictoriaLogs API endpoint. By default this limit sets to 1000 lines, but it can be changed via query param. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5674).
 
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce CPU usage when `-promscrape.dropOriginalLabels` command-line flag is set. This issue has been introduced in [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0) when addressing [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5389).
 * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): properly release memory during config reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4690).
2 changes: 2 additions & 0 deletions docs/VictoriaLogs/CHANGELOG.md
@@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
 
 ## tip
 
+* FEATURE: support the ability to limit the number of returned log entries from [HTTP querying API](https://docs.victoriametrics.com/victorialogs/querying/#http-api) by passing `limit` query arg. Previously all the matching log entries were returned until closing the response stream. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5674). Thanks to @dmitryk-dk for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5778).
+
 ## [v0.4.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.4.2-victorialogs)
 
 Released at 2023-11-15
9 changes: 9 additions & 0 deletions docs/VictoriaLogs/querying/README.md
@@ -44,6 +44,15 @@ See [LogsQL docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html) for
 The `query` arg must be properly encoded with [percent encoding](https://en.wikipedia.org/wiki/URL_encoding) when passing it to `curl`
 or similar tools.
 
+By default the `/select/logsql/query` endpoint returns all the log entries matching the given `query`. The response size can be limited in the following ways:
+
+- By closing the response stream at any time. In this case VictoriaLogs stops query execution and frees all the resources occupied by the request.
+- By specifying the maximum number of log entries, which can be returned in the response, via the `limit` query arg. For example, the following request returns
+  up to 10 matching log entries:
+  ```sh
+  curl http://localhost:9428/select/logsql/query -d 'query=error' -d 'limit=10'
+  ```
+
 The `/select/logsql/query` endpoint returns [a stream of JSON lines](https://jsonlines.org/),
 where each line contains JSON-encoded log entry in the form `{field1="value1",...,fieldN="valueN"}`.
 Example response:
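
For illustration only (the field values here are made up, not taken from the docs), the JSON-lines stream has the following shape:

```
{"_msg":"cannot open file","_stream":"{host=\"host-1\"}","_time":"2024-01-01T12:00:00Z"}
{"_msg":"connection refused","_stream":"{host=\"host-2\"}","_time":"2024-01-01T12:00:02Z"}
```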
3 changes: 1 addition & 2 deletions lib/logstorage/filters_test.go
@@ -9226,7 +9226,7 @@ func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f fi
 		resultColumnNames: []string{resultColumnName},
 	}
 	workersCount := 3
-	s.search(workersCount, so, nil, func(workerID uint, br *blockResult) bool {
+	s.search(workersCount, so, nil, func(workerID uint, br *blockResult) {
 		// Verify tenantID
 		if !br.streamID.tenantID.equal(&tenantID) {
 			t.Fatalf("unexpected tenantID in blockResult; got %s; want %s", &br.streamID.tenantID, &tenantID)
@@ -9248,7 +9248,6 @@ func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f fi
 		if !reflect.DeepEqual(br.timestamps, expectedTimestamps) {
 			t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", br.timestamps, expectedTimestamps)
 		}
-		return false
 	})
 }
