26 changes: 13 additions & 13 deletions pkg/dataobj/internal/dataset/reader.go
@@ -75,7 +75,7 @@ func (r *Reader) Read(ctx context.Context, s []Row) (int, error) {
}
}

- r.region.Record(StatReadCalls.Observe(1))
+ r.region.Record(xcap.StatDatasetReadCalls.Observe(1))
ctx = xcap.ContextWithRegion(ctx, r.region)

// Our Read implementation works by:
@@ -149,8 +149,8 @@ func (r *Reader) Read(ctx context.Context, s []Row) (int, error) {
primaryColumnBytes += s[i].Size()
}

- r.region.Record(StatPrimaryRowsRead.Observe(int64(rowsRead)))
- r.region.Record(StatPrimaryRowBytes.Observe(primaryColumnBytes))
+ r.region.Record(xcap.StatDatasetPrimaryRowsRead.Observe(int64(rowsRead)))
+ r.region.Record(xcap.StatDatasetPrimaryRowBytes.Observe(primaryColumnBytes))
} else {
rowsRead, passCount, err = r.readAndFilterPrimaryColumns(ctx, readSize, s[:readSize])
if err != nil {
@@ -182,8 +182,8 @@ func (r *Reader) Read(ctx context.Context, s []Row) (int, error) {
totalBytesFilled += s[i].Size() - s[i].SizeOfColumns(r.primaryColumnIndexes)
}

- r.region.Record(StatSecondaryRowsRead.Observe(int64(count)))
- r.region.Record(StatSecondaryRowBytes.Observe(totalBytesFilled))
+ r.region.Record(xcap.StatDatasetSecondaryRowsRead.Observe(int64(count)))
+ r.region.Record(xcap.StatDatasetSecondaryRowBytes.Observe(totalBytesFilled))
}

// We only advance r.row after we successfully read and filled rows. This
@@ -268,8 +268,8 @@ func (r *Reader) readAndFilterPrimaryColumns(ctx context.Context, readSize int,
readSize = passCount
}

- r.region.Record(StatPrimaryRowsRead.Observe(int64(rowsRead)))
- r.region.Record(StatPrimaryRowBytes.Observe(primaryColumnBytes))
+ r.region.Record(xcap.StatDatasetPrimaryRowsRead.Observe(int64(rowsRead)))
+ r.region.Record(xcap.StatDatasetPrimaryRowBytes.Observe(primaryColumnBytes))

return rowsRead, passCount, nil
}
@@ -555,11 +555,11 @@ func (r *Reader) initDownloader(ctx context.Context) error {

if primary {
r.primaryColumnIndexes = append(r.primaryColumnIndexes, i)
- r.region.Record(StatPrimaryColumns.Observe(1))
- r.region.Record(StatPrimaryColumnPages.Observe(int64(column.ColumnDesc().PagesCount)))
+ r.region.Record(xcap.StatDatasetPrimaryColumns.Observe(1))
+ r.region.Record(xcap.StatDatasetPrimaryColumnPages.Observe(int64(column.ColumnDesc().PagesCount)))
} else {
- r.region.Record(StatSecondaryColumns.Observe(1))
- r.region.Record(StatSecondaryColumnPages.Observe(int64(column.ColumnDesc().PagesCount)))
+ r.region.Record(xcap.StatDatasetSecondaryColumns.Observe(1))
+ r.region.Record(xcap.StatDatasetSecondaryColumnPages.Observe(int64(column.ColumnDesc().PagesCount)))
}
}

@@ -593,8 +593,8 @@ func (r *Reader) initDownloader(ctx context.Context) error {
rowsCount = max(rowsCount, uint64(column.ColumnDesc().RowsCount))
}

- r.region.Record(StatMaxRows.Observe(int64(rowsCount)))
- r.region.Record(StatRowsAfterPruning.Observe(int64(ranges.TotalRowCount())))
+ r.region.Record(xcap.StatDatasetMaxRows.Observe(int64(rowsCount)))
+ r.region.Record(xcap.StatDatasetRowsAfterPruning.Observe(int64(ranges.TotalRowCount())))

return nil
}
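Note: this rename moves the package-local dataset statistics (StatReadCalls, StatPrimaryRowsRead, and so on) behind shared xcap.StatDataset* identifiers, so the declarations presumably now live centrally in pkg/xcap. A minimal sketch of what those declarations might look like, following the xcap.NewStatisticInt64 pattern visible in the removed metastore code further down; the statistic names and aggregation types here are assumptions, not shown in this diff:

var (
	// Hypothetical centralized declarations in pkg/xcap; names are guesses.
	StatDatasetReadCalls       = NewStatisticInt64("dataset.read_calls", AggregationTypeSum)
	StatDatasetPrimaryRowsRead = NewStatisticInt64("dataset.primary_rows_read", AggregationTypeSum)
	StatDatasetPrimaryRowBytes = NewStatisticInt64("dataset.primary_row_bytes", AggregationTypeSum)
)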
18 changes: 9 additions & 9 deletions pkg/dataobj/internal/dataset/reader_downloader.go
@@ -232,13 +232,13 @@ func (dl *readerDownloader) downloadBatch(ctx context.Context, requestor *reader
if region := xcap.RegionFromContext(ctx); region != nil {
for _, page := range batch {
if page.column.primary {
- region.Record(StatPrimaryPagesDownloaded.Observe(1))
- region.Record(StatPrimaryColumnBytes.Observe(int64(page.inner.PageDesc().CompressedSize)))
- region.Record(StatPrimaryColumnUncompressedBytes.Observe(int64(page.inner.PageDesc().UncompressedSize)))
+ region.Record(xcap.StatDatasetPrimaryPagesDownloaded.Observe(1))
+ region.Record(xcap.StatDatasetPrimaryColumnBytes.Observe(int64(page.inner.PageDesc().CompressedSize)))
+ region.Record(xcap.StatDatasetPrimaryColumnUncompressedBytes.Observe(int64(page.inner.PageDesc().UncompressedSize)))
} else {
- region.Record(StatSecondaryPagesDownloaded.Observe(1))
- region.Record(StatSecondaryColumnBytes.Observe(int64(page.inner.PageDesc().CompressedSize)))
- region.Record(StatSecondaryColumnUncompressedBytes.Observe(int64(page.inner.PageDesc().UncompressedSize)))
+ region.Record(xcap.StatDatasetSecondaryPagesDownloaded.Observe(1))
+ region.Record(xcap.StatDatasetSecondaryColumnBytes.Observe(int64(page.inner.PageDesc().CompressedSize)))
+ region.Record(xcap.StatDatasetSecondaryColumnUncompressedBytes.Observe(int64(page.inner.PageDesc().UncompressedSize)))
}
}
}
@@ -589,13 +589,13 @@ func (page *readerPage) PageDesc() *PageDesc {

func (page *readerPage) ReadPage(ctx context.Context) (PageData, error) {
region := xcap.RegionFromContext(ctx)
- region.Record(StatPagesScanned.Observe(1))
+ region.Record(xcap.StatDatasetPagesScanned.Observe(1))
if page.data != nil {
- region.Record(StatPagesFoundInCache.Observe(1))
+ region.Record(xcap.StatDatasetPagesFoundInCache.Observe(1))
return page.data, nil
}

- region.Record(StatPageDownloadRequests.Observe(1))
+ region.Record(xcap.StatDatasetPageDownloadRequests.Observe(1))
if err := page.column.dl.downloadBatch(ctx, page); err != nil {
return nil, err
}
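Note: ReadPage above calls region.Record without a nil check, while downloadBatch guards with "if region := xcap.RegionFromContext(ctx); region != nil". That only works if Record is safe to call on a nil region. A sketch of a nil-safe implementation, assuming xcap uses a pointer receiver; the field names are invented for illustration:

// Record appends an observation to the region; a nil region drops it.
func (r *Region) Record(obs Observation) {
	if r == nil {
		return
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.observations = append(r.observations, obs)
}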
20 changes: 10 additions & 10 deletions pkg/dataobj/internal/dataset/reader_test.go
@@ -917,14 +917,14 @@ func Test_Reader_Stats(t *testing.T) {
obsMap[obs.Statistic.Name()] = obs.Value.(int64)
}

- require.Equal(t, int64(2), obsMap[StatReadCalls.Name()])
- require.Equal(t, int64(2), obsMap[StatPrimaryColumns.Name()])
- require.Equal(t, int64(2), obsMap[StatSecondaryColumns.Name()])
- require.Equal(t, int64(5), obsMap[StatPrimaryColumnPages.Name()])
- require.Equal(t, int64(8), obsMap[StatSecondaryColumnPages.Name()])
-
- require.Equal(t, int64(len(basicReaderTestData)), obsMap[StatMaxRows.Name()])
- require.Equal(t, int64(3), obsMap[StatRowsAfterPruning.Name()])
- require.Equal(t, int64(3), obsMap[StatPrimaryRowsRead.Name()])
- require.Equal(t, int64(1), obsMap[StatSecondaryRowsRead.Name()])
+ require.Equal(t, int64(2), obsMap[xcap.StatDatasetReadCalls.Name()])
+ require.Equal(t, int64(2), obsMap[xcap.StatDatasetPrimaryColumns.Name()])
+ require.Equal(t, int64(2), obsMap[xcap.StatDatasetSecondaryColumns.Name()])
+ require.Equal(t, int64(5), obsMap[xcap.StatDatasetPrimaryColumnPages.Name()])
+ require.Equal(t, int64(8), obsMap[xcap.StatDatasetSecondaryColumnPages.Name()])
+
+ require.Equal(t, int64(len(basicReaderTestData)), obsMap[xcap.StatDatasetMaxRows.Name()])
+ require.Equal(t, int64(3), obsMap[xcap.StatDatasetRowsAfterPruning.Name()])
+ require.Equal(t, int64(3), obsMap[xcap.StatDatasetPrimaryRowsRead.Name()])
+ require.Equal(t, int64(1), obsMap[xcap.StatDatasetSecondaryRowsRead.Name()])
}
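Note: the assertions above read aggregated values out of obsMap, keyed by statistic name; only the loop body that flattens each observation is visible at the top of this hunk. The collection step presumably looks something like this (the accessor for a region's observations is a guess):

obsMap := make(map[string]int64)
for _, obs := range region.Observations() { // hypothetical accessor
	obsMap[obs.Statistic.Name()] = obs.Value.(int64)
}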
9 changes: 2 additions & 7 deletions pkg/dataobj/metastore/object.go
@@ -39,11 +39,6 @@ const (
metastoreWindowSize = 12 * time.Hour
)

- var (
- statIndexObjects = xcap.NewStatisticInt64("metastore.index.objects", xcap.AggregationTypeSum)
- statResolvedSections = xcap.NewStatisticInt64("metastore.resolved.sections", xcap.AggregationTypeSum)
- )
-
type ObjectMetastore struct {
bucket objstore.Bucket
parallelism int
@@ -193,7 +188,7 @@ func (m *ObjectMetastore) Sections(ctx context.Context, start, end time.Time, ma
}

m.metrics.indexObjectsTotal.Observe(float64(len(indexPaths)))
- region.Record(statIndexObjects.Observe(int64(len(indexPaths))))
+ region.Record(xcap.StatMetastoreIndexObjects.Observe(int64(len(indexPaths))))

// Return early if no index files are found
if len(indexPaths) == 0 {
@@ -250,7 +245,7 @@ func (m *ObjectMetastore) Sections(ctx context.Context, start, end time.Time, ma
duration := sectionsTimer.ObserveDuration()
m.metrics.resolvedSectionsTotal.Observe(float64(len(streamSectionPointers)))
m.metrics.resolvedSectionsRatio.Observe(float64(len(streamSectionPointers)) / float64(initialSectionPointersCount))
- region.Record(statResolvedSections.Observe(int64(len(streamSectionPointers))))
+ region.Record(xcap.StatMetastoreResolvedSections.Observe(int64(len(streamSectionPointers))))

level.Debug(utillog.WithContext(ctx, m.logger)).Log(
"msg", "resolved sections",
4 changes: 2 additions & 2 deletions pkg/dataobj/sections/internal/columnar/decoder.go
@@ -79,7 +79,7 @@ func (dec *Decoder) Pages(ctx context.Context, columns []*datasetmd.ColumnDesc)
region := xcap.RegionFromContext(ctx)
startTime := time.Now()
defer func() {
- region.Record(dataset.StatPageDownloadTime.Observe(time.Since(startTime).Nanoseconds()))
+ region.Record(xcap.StatDatasetPageDownloadTime.Observe(time.Since(startTime).Seconds()))
}()

ranges := make([]rangeio.Range, 0, len(columns))
@@ -135,7 +135,7 @@ func (dec *Decoder) ReadPages(ctx context.Context, pages []*datasetmd.PageDesc)
region := xcap.RegionFromContext(ctx)
startTime := time.Now()
defer func() {
- region.Record(dataset.StatPageDownloadTime.Observe(time.Since(startTime).Nanoseconds()))
+ region.Record(xcap.StatDatasetPageDownloadTime.Observe(time.Since(startTime).Seconds()))
}()

ranges := make([]rangeio.Range, 0, len(pages))
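Note: both hunks above change the recorded unit as well as the name: time.Duration.Nanoseconds() returns int64 while time.Duration.Seconds() returns float64, so the centralized xcap statistic must be a float64 statistic rather than the old int64 one. A sketch of the timing pattern under that assumption (NewStatisticFloat64 is a guessed constructor name):

var statPageDownloadTime = xcap.NewStatisticFloat64("dataset.page_download_time", xcap.AggregationTypeSum)

func timePageDownload(ctx context.Context, download func() error) error {
	region := xcap.RegionFromContext(ctx)
	start := time.Now()
	defer func() {
		// Seconds() yields float64, matching the assumed statistic type.
		region.Record(statPageDownloadTime.Observe(time.Since(start).Seconds()))
	}()
	return download()
}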
21 changes: 9 additions & 12 deletions pkg/engine/engine.go
@@ -29,7 +29,6 @@ import (
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"
- "github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/storage/bucket"
"github.com/grafana/loki/v3/pkg/util/httpreq"
util_log "github.com/grafana/loki/v3/pkg/util/log"
@@ -219,6 +218,11 @@ func (e *Engine) Execute(ctx context.Context, params logql.Params) (logqlmodel.R
"duration_full", durFull,
)

+ // Close the pipeline to calculate the stats.
+ pipeline.Close()
+
+ region.SetStatus(codes.Ok, "")
+
// explicitly call End() before exporting even though we have a defer above.
// It is safe to call End() multiple times.
region.End()
@@ -227,16 +231,9 @@ func (e *Engine) Execute(ctx context.Context, params logql.Params) (logqlmodel.R
level.Error(logger).Log("msg", "failed to export capture as trace", "err", err)
}

- // Close the pipeline to calculate the stats.
- pipeline.Close()
-
- queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)

Review comment (Contributor):
Is this queue time accurate? Could we pass it into the ToStatsSummary already and resolve one of the TODOs?

Reply (Author):
I doubt it; in the code I see it's being set by the old scheduler. I removed it intentionally to track it properly in a follow-up.
- statsCtx := stats.FromContext(ctx)
- statsCtx.AddQuerierExecTime(durFull)
- stats := statsCtx.Result(durFull, queueTime, builder.Len())
+ // TODO: capture and report queue time
md := metadata.FromContext(ctx)

- region.SetStatus(codes.Ok, "")
+ stats := capture.ToStatsSummary(durFull, 0, builder.Len())
result := builder.Build(stats, md)

logql.RecordRangeAndInstantQueryMetrics(ctx, logger, params, strconv.Itoa(http.StatusOK), stats, result.Data)
@@ -245,14 +242,12 @@ func (e *Engine) Execute(ctx context.Context, params logql.Params) (logqlmodel.R

// buildContext initializes a request-scoped context prior to execution.
func (e *Engine) buildContext(ctx context.Context) context.Context {
- statsContext, ctx := stats.NewContext(ctx)
metadataContext, ctx := metadata.NewContext(ctx)

// Inject the range config into the context for any calls to
// [rangeio.ReadRanges] to make use of.
ctx = rangeio.WithConfig(ctx, &e.rangeConfig)

- statsContext.SetQueryUsedV2Engine()
metadataContext.AddWarning("Query was executed using the new experimental query engine and dataobj storage.")
return ctx
}
@@ -467,5 +462,7 @@ func exportCapture(ctx context.Context, capture *xcap.Capture, plan *physical.Pl
return parentID, ok
})

+ xcap.ExportLog(capture, logger)
+
return xcap.ExportTrace(ctx, capture, logger)
}
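Note: with the stats context gone, result statistics are now derived from the capture. Judging from the statsCtx.Result(durFull, queueTime, builder.Len()) call it replaces, ToStatsSummary appears to take the same three arguments; the parameter meanings below are inferred, not documented in this diff:

stats := capture.ToStatsSummary(
	durFull,       // total query execution time
	0,             // queue time, hard-coded to 0 until the TODO above is resolved
	builder.Len(), // number of entries in the result
)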
2 changes: 1 addition & 1 deletion pkg/engine/internal/executor/compat.go
@@ -81,7 +81,7 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin
return cmp.Compare(a.name, b.name)
})

- region.Record(statCompatCollisionFound.Observe(true))
+ region.Record(xcap.StatCompatCollisionFound.Observe(true))

// Next, update the schema with the new columns that have the _extracted suffix.
newSchema := batch.Schema()
6 changes: 3 additions & 3 deletions pkg/engine/internal/executor/pipeline.go
@@ -337,17 +337,17 @@ func (p *observedPipeline) Read(ctx context.Context) (arrow.RecordBatch, error)
start := time.Now()

if p.region != nil {
- p.region.Record(statReadCalls.Observe(1))
+ p.region.Record(xcap.StatPipelineReadCalls.Observe(1))
}

rec, err := p.inner.Read(ctx)

if p.region != nil {
if rec != nil {
- p.region.Record(statRowsOut.Observe(rec.NumRows()))
+ p.region.Record(xcap.StatPipelineRowsOut.Observe(rec.NumRows()))
}

- p.region.Record(statReadDuration.Observe(time.Since(start).Nanoseconds()))
+ p.region.Record(xcap.StatPipelineReadDuration.Observe(time.Since(start).Seconds()))
}

return rec, err
9 changes: 2 additions & 7 deletions pkg/engine/internal/worker/thread.go
@@ -16,7 +16,6 @@ import (
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/scheduler/wire"
"github.com/grafana/loki/v3/pkg/engine/internal/workflow"
- "github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/storage/bucket"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/xcap"
@@ -158,7 +157,6 @@ func (t *thread) runJob(ctx context.Context, job *threadJob) {
},
}

- statsCtx, ctx := stats.NewContext(ctx)
ctx = user.InjectOrgID(ctx, job.Task.TenantID)

ctx, capture := xcap.NewCapture(ctx, nil)
@@ -199,8 +197,7 @@ func (t *thread) runJob(ctx context.Context, job *threadJob) {
level.Warn(logger).Log("msg", "failed to inform scheduler of task status", "err", err)
}

- var totalRows int
- totalRows, err = t.drainPipeline(ctx, pipeline, job, logger)
+ _, err = t.drainPipeline(ctx, pipeline, job, logger)
if err != nil {
level.Warn(logger).Log("msg", "task failed", "err", err)
_ = job.Scheduler.SendMessageAsync(ctx, wire.TaskStatusMessage{
@@ -229,16 +226,14 @@ func (t *thread) runJob(ctx context.Context, job *threadJob) {
// to finalize the capture before it's included in the TaskStatusMessage.
capture.End()

- // TODO(rfratto): We should find a way to expose queue time here.
- result := statsCtx.Result(time.Since(startTime), 0, totalRows)
level.Info(logger).Log("msg", "task completed", "duration", time.Since(startTime))

// Wait for the scheduler to confirm the task has completed before
// requesting a new one. This allows the scheduler to update its bookkeeping
// for how many threads have capacity for requesting tasks.
err = job.Scheduler.SendMessage(ctx, wire.TaskStatusMessage{
ID: job.Task.ULID,
- Status: workflow.TaskStatus{State: workflow.TaskStateCompleted, Statistics: &result, Capture: capture},
+ Status: workflow.TaskStatus{State: workflow.TaskStateCompleted, Capture: capture},
})
if err != nil {
level.Warn(logger).Log("msg", "failed to inform scheduler of task status", "err", err)
18 changes: 5 additions & 13 deletions pkg/storage/bucket/xcap_bucket.go
@@ -9,14 +9,6 @@ import (
"github.com/grafana/loki/v3/pkg/xcap"
)

- // Statistics for tracking bucket operation counts.
- var (
- statBucketGet = xcap.NewStatisticInt64("bucket.get", xcap.AggregationTypeSum)
- statBucketGetRange = xcap.NewStatisticInt64("bucket.getrange", xcap.AggregationTypeSum)
- statBucketIter = xcap.NewStatisticInt64("bucket.iter", xcap.AggregationTypeSum)
- statBucketAttributes = xcap.NewStatisticInt64("bucket.attributes", xcap.AggregationTypeSum)
- )
-
// XCapBucket wraps an objstore.Bucket and records request counts to the xcap
// Region found in the context. If no Region is present in the context, the
// wrapper simply delegates to the underlying bucket without recording.
@@ -51,13 +43,13 @@ func (b *XCapBucket) Close() error {

// Iter calls f for each entry in the given directory (not recursive.).
func (b *XCapBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
- recordOp(ctx, statBucketIter)
+ recordOp(ctx, xcap.StatBucketIter)
return b.bkt.Iter(ctx, dir, f, options...)
}

// IterWithAttributes calls f for each entry in the given directory similar to Iter.
func (b *XCapBucket) IterWithAttributes(ctx context.Context, dir string, f func(objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
- recordOp(ctx, statBucketIter)
+ recordOp(ctx, xcap.StatBucketIter)
return b.bkt.IterWithAttributes(ctx, dir, f, options...)
}

@@ -68,13 +60,13 @@ func (b *XCapBucket) SupportedIterOptions() []objstore.IterOptionType {
}

// Get returns a reader for the given object name.
func (b *XCapBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
- recordOp(ctx, statBucketGet)
+ recordOp(ctx, xcap.StatBucketGet)
return b.bkt.Get(ctx, name)
}

// GetRange returns a new range reader for the given object name and range.
func (b *XCapBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
- recordOp(ctx, statBucketGetRange)
+ recordOp(ctx, xcap.StatBucketGetRange)
return b.bkt.GetRange(ctx, name, off, length)
}

@@ -100,7 +92,7 @@ func (b *XCapBucket) IsAccessDeniedErr(err error) bool {
}

// Attributes returns information about the specified object.
func (b *XCapBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
- recordOp(ctx, statBucketAttributes)
+ recordOp(ctx, xcap.StatBucketAttributes)
return b.bkt.Attributes(ctx, name)
}
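Note: recordOp is the helper behind every wrapped method here. Given the type comment above ("If no Region is present in the context, the wrapper simply delegates to the underlying bucket without recording"), it presumably looks like the following; the exact signature and statistic type are assumptions:

// recordOp records a single operation against the context's Region,
// or does nothing when the context carries no Region.
func recordOp(ctx context.Context, stat *xcap.StatisticInt64) {
	if region := xcap.RegionFromContext(ctx); region != nil {
		region.Record(stat.Observe(1))
	}
}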