Skip to content

Commit

Permalink
lib/logstorage: work-in-progress
Browse files Browse the repository at this point in the history
  • Loading branch information
valyala committed May 14, 2024
1 parent cb35e62 commit da3af09
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 369 deletions.
47 changes: 47 additions & 0 deletions app/vlselect/logsql/buffered_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package logsql

import (
"bufio"
"io"
"sync"
)

func getBufferedWriter(w io.Writer) *bufferedWriter {
v := bufferedWriterPool.Get()
if v == nil {
return &bufferedWriter{
bw: bufio.NewWriter(w),
}
}
bw := v.(*bufferedWriter)
bw.bw.Reset(w)
return bw
}

func putBufferedWriter(bw *bufferedWriter) {
bw.reset()
bufferedWriterPool.Put(bw)
}

var bufferedWriterPool sync.Pool

type bufferedWriter struct {
mu sync.Mutex
bw *bufio.Writer
}

func (bw *bufferedWriter) reset() {
// nothing to do
}

func (bw *bufferedWriter) WriteIgnoreErrors(p []byte) {
bw.mu.Lock()
_, _ = bw.bw.Write(p)
bw.mu.Unlock()
}

func (bw *bufferedWriter) FlushIgnoreErrors() {
bw.mu.Lock()
_ = bw.bw.Flush()
bw.mu.Unlock()
}
28 changes: 9 additions & 19 deletions app/vlselect/logsql/logsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,11 @@ import (

"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)

var (
maxSortBufferSize = flagutil.NewBytes("select.maxSortBufferSize", 1024*1024, "Query results from /select/logsql/query are automatically sorted by _time "+
"if their summary size doesn't exceed this value; otherwise, query results are streamed in the response without sorting; "+
"too big value for this flag may result in high memory usage since the sorting is performed in memory")
)

// ProcessQueryRequest handles /select/logsql/query request.
func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
// Extract tenantID
Expand All @@ -40,12 +33,13 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
}
w.Header().Set("Content-Type", "application/stream+json; charset=utf-8")

sw := getSortWriter()
sw.Init(w, maxSortBufferSize.IntN(), limit)
if limit > 0 {
q.AddPipeLimit(uint64(limit))
}

tenantIDs := []logstorage.TenantID{tenantID}

ctxWithCancel, cancel := context.WithCancel(ctx)
defer cancel()
bw := getBufferedWriter(w)

writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
if len(columns) == 0 {
Expand All @@ -56,18 +50,14 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
for i := range timestamps {
WriteJSONRow(bb, columns, i)
}

if !sw.TryWrite(bb.B) {
cancel()
}

bw.WriteIgnoreErrors(bb.B)
blockResultPool.Put(bb)
}

err = vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, writeBlock)
err = vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock)

sw.FinalFlush()
putSortWriter(sw)
bw.FlushIgnoreErrors()
putBufferedWriter(bw)

if err != nil {
httpserver.Errorf(w, r, "cannot execute query [%s]: %s", qStr, err)
Expand Down
290 changes: 0 additions & 290 deletions app/vlselect/logsql/sort_writer.go

This file was deleted.

Loading

0 comments on commit da3af09

Please sign in to comment.