diff --git a/pkg/dataobj/internal/dataset/reader.go b/pkg/dataobj/internal/dataset/reader.go index ea2ae3dfd567a..b9da611971bd9 100644 --- a/pkg/dataobj/internal/dataset/reader.go +++ b/pkg/dataobj/internal/dataset/reader.go @@ -75,7 +75,7 @@ func (r *Reader) Read(ctx context.Context, s []Row) (int, error) { } } - r.region.Record(StatReadCalls.Observe(1)) + r.region.Record(xcap.StatDatasetReadCalls.Observe(1)) ctx = xcap.ContextWithRegion(ctx, r.region) // Our Read implementation works by: @@ -149,8 +149,8 @@ func (r *Reader) Read(ctx context.Context, s []Row) (int, error) { primaryColumnBytes += s[i].Size() } - r.region.Record(StatPrimaryRowsRead.Observe(int64(rowsRead))) - r.region.Record(StatPrimaryRowBytes.Observe(primaryColumnBytes)) + r.region.Record(xcap.StatDatasetPrimaryRowsRead.Observe(int64(rowsRead))) + r.region.Record(xcap.StatDatasetPrimaryRowBytes.Observe(primaryColumnBytes)) } else { rowsRead, passCount, err = r.readAndFilterPrimaryColumns(ctx, readSize, s[:readSize]) if err != nil { @@ -182,8 +182,8 @@ func (r *Reader) Read(ctx context.Context, s []Row) (int, error) { totalBytesFilled += s[i].Size() - s[i].SizeOfColumns(r.primaryColumnIndexes) } - r.region.Record(StatSecondaryRowsRead.Observe(int64(count))) - r.region.Record(StatSecondaryRowBytes.Observe(totalBytesFilled)) + r.region.Record(xcap.StatDatasetSecondaryRowsRead.Observe(int64(count))) + r.region.Record(xcap.StatDatasetSecondaryRowBytes.Observe(totalBytesFilled)) } // We only advance r.row after we successfully read and filled rows. This @@ -268,8 +268,8 @@ func (r *Reader) readAndFilterPrimaryColumns(ctx context.Context, readSize int, readSize = passCount } - r.region.Record(StatPrimaryRowsRead.Observe(int64(rowsRead))) - r.region.Record(StatPrimaryRowBytes.Observe(primaryColumnBytes)) + r.region.Record(xcap.StatDatasetPrimaryRowsRead.Observe(int64(rowsRead))) + r.region.Record(xcap.StatDatasetPrimaryRowBytes.Observe(primaryColumnBytes)) return rowsRead, passCount, nil } @@ -555,11 +555,11 @@ func (r *Reader) initDownloader(ctx context.Context) error { if primary { r.primaryColumnIndexes = append(r.primaryColumnIndexes, i) - r.region.Record(StatPrimaryColumns.Observe(1)) - r.region.Record(StatPrimaryColumnPages.Observe(int64(column.ColumnDesc().PagesCount))) + r.region.Record(xcap.StatDatasetPrimaryColumns.Observe(1)) + r.region.Record(xcap.StatDatasetPrimaryColumnPages.Observe(int64(column.ColumnDesc().PagesCount))) } else { - r.region.Record(StatSecondaryColumns.Observe(1)) - r.region.Record(StatSecondaryColumnPages.Observe(int64(column.ColumnDesc().PagesCount))) + r.region.Record(xcap.StatDatasetSecondaryColumns.Observe(1)) + r.region.Record(xcap.StatDatasetSecondaryColumnPages.Observe(int64(column.ColumnDesc().PagesCount))) } } @@ -593,8 +593,8 @@ func (r *Reader) initDownloader(ctx context.Context) error { rowsCount = max(rowsCount, uint64(column.ColumnDesc().RowsCount)) } - r.region.Record(StatMaxRows.Observe(int64(rowsCount))) - r.region.Record(StatRowsAfterPruning.Observe(int64(ranges.TotalRowCount()))) + r.region.Record(xcap.StatDatasetMaxRows.Observe(int64(rowsCount))) + r.region.Record(xcap.StatDatasetRowsAfterPruning.Observe(int64(ranges.TotalRowCount()))) return nil } diff --git a/pkg/dataobj/internal/dataset/reader_downloader.go b/pkg/dataobj/internal/dataset/reader_downloader.go index e2d870932a43c..b0f5d1cbc0254 100644 --- a/pkg/dataobj/internal/dataset/reader_downloader.go +++ b/pkg/dataobj/internal/dataset/reader_downloader.go @@ -232,13 +232,13 @@ func (dl *readerDownloader) downloadBatch(ctx 
context.Context, requestor *reader if region := xcap.RegionFromContext(ctx); region != nil { for _, page := range batch { if page.column.primary { - region.Record(StatPrimaryPagesDownloaded.Observe(1)) - region.Record(StatPrimaryColumnBytes.Observe(int64(page.inner.PageDesc().CompressedSize))) - region.Record(StatPrimaryColumnUncompressedBytes.Observe(int64(page.inner.PageDesc().UncompressedSize))) + region.Record(xcap.StatDatasetPrimaryPagesDownloaded.Observe(1)) + region.Record(xcap.StatDatasetPrimaryColumnBytes.Observe(int64(page.inner.PageDesc().CompressedSize))) + region.Record(xcap.StatDatasetPrimaryColumnUncompressedBytes.Observe(int64(page.inner.PageDesc().UncompressedSize))) } else { - region.Record(StatSecondaryPagesDownloaded.Observe(1)) - region.Record(StatSecondaryColumnBytes.Observe(int64(page.inner.PageDesc().CompressedSize))) - region.Record(StatSecondaryColumnUncompressedBytes.Observe(int64(page.inner.PageDesc().UncompressedSize))) + region.Record(xcap.StatDatasetSecondaryPagesDownloaded.Observe(1)) + region.Record(xcap.StatDatasetSecondaryColumnBytes.Observe(int64(page.inner.PageDesc().CompressedSize))) + region.Record(xcap.StatDatasetSecondaryColumnUncompressedBytes.Observe(int64(page.inner.PageDesc().UncompressedSize))) } } } @@ -589,13 +589,13 @@ func (page *readerPage) PageDesc() *PageDesc { func (page *readerPage) ReadPage(ctx context.Context) (PageData, error) { region := xcap.RegionFromContext(ctx) - region.Record(StatPagesScanned.Observe(1)) + region.Record(xcap.StatDatasetPagesScanned.Observe(1)) if page.data != nil { - region.Record(StatPagesFoundInCache.Observe(1)) + region.Record(xcap.StatDatasetPagesFoundInCache.Observe(1)) return page.data, nil } - region.Record(StatPageDownloadRequests.Observe(1)) + region.Record(xcap.StatDatasetPageDownloadRequests.Observe(1)) if err := page.column.dl.downloadBatch(ctx, page); err != nil { return nil, err } diff --git a/pkg/dataobj/internal/dataset/reader_test.go b/pkg/dataobj/internal/dataset/reader_test.go index a0fc57a95d6d8..a304d7253f6fd 100644 --- a/pkg/dataobj/internal/dataset/reader_test.go +++ b/pkg/dataobj/internal/dataset/reader_test.go @@ -917,14 +917,14 @@ func Test_Reader_Stats(t *testing.T) { obsMap[obs.Statistic.Name()] = obs.Value.(int64) } - require.Equal(t, int64(2), obsMap[StatReadCalls.Name()]) - require.Equal(t, int64(2), obsMap[StatPrimaryColumns.Name()]) - require.Equal(t, int64(2), obsMap[StatSecondaryColumns.Name()]) - require.Equal(t, int64(5), obsMap[StatPrimaryColumnPages.Name()]) - require.Equal(t, int64(8), obsMap[StatSecondaryColumnPages.Name()]) - - require.Equal(t, int64(len(basicReaderTestData)), obsMap[StatMaxRows.Name()]) - require.Equal(t, int64(3), obsMap[StatRowsAfterPruning.Name()]) - require.Equal(t, int64(3), obsMap[StatPrimaryRowsRead.Name()]) - require.Equal(t, int64(1), obsMap[StatSecondaryRowsRead.Name()]) + require.Equal(t, int64(2), obsMap[xcap.StatDatasetReadCalls.Name()]) + require.Equal(t, int64(2), obsMap[xcap.StatDatasetPrimaryColumns.Name()]) + require.Equal(t, int64(2), obsMap[xcap.StatDatasetSecondaryColumns.Name()]) + require.Equal(t, int64(5), obsMap[xcap.StatDatasetPrimaryColumnPages.Name()]) + require.Equal(t, int64(8), obsMap[xcap.StatDatasetSecondaryColumnPages.Name()]) + + require.Equal(t, int64(len(basicReaderTestData)), obsMap[xcap.StatDatasetMaxRows.Name()]) + require.Equal(t, int64(3), obsMap[xcap.StatDatasetRowsAfterPruning.Name()]) + require.Equal(t, int64(3), obsMap[xcap.StatDatasetPrimaryRowsRead.Name()]) + require.Equal(t, int64(1), 
obsMap[xcap.StatDatasetSecondaryRowsRead.Name()]) } diff --git a/pkg/dataobj/metastore/object.go b/pkg/dataobj/metastore/object.go index cb0901657c1a5..4d8662f7a9e0a 100644 --- a/pkg/dataobj/metastore/object.go +++ b/pkg/dataobj/metastore/object.go @@ -39,11 +39,6 @@ const ( metastoreWindowSize = 12 * time.Hour ) -var ( - statIndexObjects = xcap.NewStatisticInt64("metastore.index.objects", xcap.AggregationTypeSum) - statResolvedSections = xcap.NewStatisticInt64("metastore.resolved.sections", xcap.AggregationTypeSum) -) - type ObjectMetastore struct { bucket objstore.Bucket parallelism int @@ -193,7 +188,7 @@ func (m *ObjectMetastore) Sections(ctx context.Context, start, end time.Time, ma } m.metrics.indexObjectsTotal.Observe(float64(len(indexPaths))) - region.Record(statIndexObjects.Observe(int64(len(indexPaths)))) + region.Record(xcap.StatMetastoreIndexObjects.Observe(int64(len(indexPaths)))) // Return early if no index files are found if len(indexPaths) == 0 { @@ -250,7 +245,7 @@ func (m *ObjectMetastore) Sections(ctx context.Context, start, end time.Time, ma duration := sectionsTimer.ObserveDuration() m.metrics.resolvedSectionsTotal.Observe(float64(len(streamSectionPointers))) m.metrics.resolvedSectionsRatio.Observe(float64(len(streamSectionPointers)) / float64(initialSectionPointersCount)) - region.Record(statResolvedSections.Observe(int64(len(streamSectionPointers)))) + region.Record(xcap.StatMetastoreResolvedSections.Observe(int64(len(streamSectionPointers)))) level.Debug(utillog.WithContext(ctx, m.logger)).Log( "msg", "resolved sections", diff --git a/pkg/dataobj/sections/internal/columnar/decoder.go b/pkg/dataobj/sections/internal/columnar/decoder.go index 8488ac8e7c06f..8ff85c014901f 100644 --- a/pkg/dataobj/sections/internal/columnar/decoder.go +++ b/pkg/dataobj/sections/internal/columnar/decoder.go @@ -79,7 +79,7 @@ func (dec *Decoder) Pages(ctx context.Context, columns []*datasetmd.ColumnDesc) region := xcap.RegionFromContext(ctx) startTime := time.Now() defer func() { - region.Record(dataset.StatPageDownloadTime.Observe(time.Since(startTime).Nanoseconds())) + region.Record(xcap.StatDatasetPageDownloadTime.Observe(time.Since(startTime).Seconds())) }() ranges := make([]rangeio.Range, 0, len(columns)) @@ -135,7 +135,7 @@ func (dec *Decoder) ReadPages(ctx context.Context, pages []*datasetmd.PageDesc) region := xcap.RegionFromContext(ctx) startTime := time.Now() defer func() { - region.Record(dataset.StatPageDownloadTime.Observe(time.Since(startTime).Nanoseconds())) + region.Record(xcap.StatDatasetPageDownloadTime.Observe(time.Since(startTime).Seconds())) }() ranges := make([]rangeio.Range, 0, len(pages)) diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index f1744f30208a9..f72e0d26ef1f7 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -29,7 +29,6 @@ import ( "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/logqlmodel" "github.com/grafana/loki/v3/pkg/logqlmodel/metadata" - "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/grafana/loki/v3/pkg/storage/bucket" "github.com/grafana/loki/v3/pkg/util/httpreq" util_log "github.com/grafana/loki/v3/pkg/util/log" @@ -219,6 +218,11 @@ func (e *Engine) Execute(ctx context.Context, params logql.Params) (logqlmodel.R "duration_full", durFull, ) + // Close the pipeline to calculate the stats. + pipeline.Close() + + region.SetStatus(codes.Ok, "") + // explicitly call End() before exporting even though we have a defer above. // It is safe to call End() multiple times. 
region.End() @@ -227,16 +231,9 @@ func (e *Engine) Execute(ctx context.Context, params logql.Params) (logqlmodel.R level.Error(logger).Log("msg", "failed to export capture as trace", "err", err) } - // Close the pipeline to calculate the stats. - pipeline.Close() - - queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) - statsCtx := stats.FromContext(ctx) - statsCtx.AddQuerierExecTime(durFull) - stats := statsCtx.Result(durFull, queueTime, builder.Len()) + // TODO: capture and report queue time md := metadata.FromContext(ctx) - - region.SetStatus(codes.Ok, "") + stats := capture.ToStatsSummary(durFull, 0, builder.Len()) result := builder.Build(stats, md) logql.RecordRangeAndInstantQueryMetrics(ctx, logger, params, strconv.Itoa(http.StatusOK), stats, result.Data) @@ -245,14 +242,12 @@ func (e *Engine) Execute(ctx context.Context, params logql.Params) (logqlmodel.R // buildContext initializes a request-scoped context prior to execution. func (e *Engine) buildContext(ctx context.Context) context.Context { - statsContext, ctx := stats.NewContext(ctx) metadataContext, ctx := metadata.NewContext(ctx) // Inject the range config into the context for any calls to // [rangeio.ReadRanges] to make use of. ctx = rangeio.WithConfig(ctx, &e.rangeConfig) - statsContext.SetQueryUsedV2Engine() metadataContext.AddWarning("Query was executed using the new experimental query engine and dataobj storage.") return ctx } @@ -467,5 +462,7 @@ func exportCapture(ctx context.Context, capture *xcap.Capture, plan *physical.Pl return parentID, ok }) + xcap.ExportLog(capture, logger) + return xcap.ExportTrace(ctx, capture, logger) } diff --git a/pkg/engine/internal/executor/compat.go b/pkg/engine/internal/executor/compat.go index 4b81e4225c4f8..d4470b263ee42 100644 --- a/pkg/engine/internal/executor/compat.go +++ b/pkg/engine/internal/executor/compat.go @@ -81,7 +81,7 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin return cmp.Compare(a.name, b.name) }) - region.Record(statCompatCollisionFound.Observe(true)) + region.Record(xcap.StatCompatCollisionFound.Observe(true)) // Next, update the schema with the new columns that have the _extracted suffix. 
newSchema := batch.Schema() diff --git a/pkg/engine/internal/executor/pipeline.go b/pkg/engine/internal/executor/pipeline.go index 14f4c7770e948..23fea975ffa17 100644 --- a/pkg/engine/internal/executor/pipeline.go +++ b/pkg/engine/internal/executor/pipeline.go @@ -337,17 +337,17 @@ func (p *observedPipeline) Read(ctx context.Context) (arrow.RecordBatch, error) start := time.Now() if p.region != nil { - p.region.Record(statReadCalls.Observe(1)) + p.region.Record(xcap.StatPipelineReadCalls.Observe(1)) } rec, err := p.inner.Read(ctx) if p.region != nil { if rec != nil { - p.region.Record(statRowsOut.Observe(rec.NumRows())) + p.region.Record(xcap.StatPipelineRowsOut.Observe(rec.NumRows())) } - p.region.Record(statReadDuration.Observe(time.Since(start).Nanoseconds())) + p.region.Record(xcap.StatPipelineReadDuration.Observe(time.Since(start).Seconds())) } return rec, err diff --git a/pkg/engine/internal/worker/thread.go b/pkg/engine/internal/worker/thread.go index bc230525e5be9..e1325123e0ec4 100644 --- a/pkg/engine/internal/worker/thread.go +++ b/pkg/engine/internal/worker/thread.go @@ -16,7 +16,6 @@ import ( "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" "github.com/grafana/loki/v3/pkg/engine/internal/scheduler/wire" "github.com/grafana/loki/v3/pkg/engine/internal/workflow" - "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/grafana/loki/v3/pkg/storage/bucket" utillog "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/xcap" @@ -158,7 +157,6 @@ func (t *thread) runJob(ctx context.Context, job *threadJob) { }, } - statsCtx, ctx := stats.NewContext(ctx) ctx = user.InjectOrgID(ctx, job.Task.TenantID) ctx, capture := xcap.NewCapture(ctx, nil) @@ -199,8 +197,7 @@ func (t *thread) runJob(ctx context.Context, job *threadJob) { level.Warn(logger).Log("msg", "failed to inform scheduler of task status", "err", err) } - var totalRows int - totalRows, err = t.drainPipeline(ctx, pipeline, job, logger) + _, err = t.drainPipeline(ctx, pipeline, job, logger) if err != nil { level.Warn(logger).Log("msg", "task failed", "err", err) _ = job.Scheduler.SendMessageAsync(ctx, wire.TaskStatusMessage{ @@ -229,8 +226,6 @@ func (t *thread) runJob(ctx context.Context, job *threadJob) { // to finalize the capture before it's included in the TaskStatusMessage. capture.End() - // TODO(rfratto): We should find a way to expose queue time here. - result := statsCtx.Result(time.Since(startTime), 0, totalRows) level.Info(logger).Log("msg", "task completed", "duration", time.Since(startTime)) // Wait for the scheduler to confirm the task has completed before @@ -238,7 +233,7 @@ func (t *thread) runJob(ctx context.Context, job *threadJob) { // for how many threads have capacity for requesting tasks. err = job.Scheduler.SendMessage(ctx, wire.TaskStatusMessage{ ID: job.Task.ULID, - Status: workflow.TaskStatus{State: workflow.TaskStateCompleted, Statistics: &result, Capture: capture}, + Status: workflow.TaskStatus{State: workflow.TaskStateCompleted, Capture: capture}, }) if err != nil { level.Warn(logger).Log("msg", "failed to inform scheduler of task status", "err", err) diff --git a/pkg/storage/bucket/xcap_bucket.go b/pkg/storage/bucket/xcap_bucket.go index 26f3eca8d4447..687d6cd5bcc80 100644 --- a/pkg/storage/bucket/xcap_bucket.go +++ b/pkg/storage/bucket/xcap_bucket.go @@ -9,14 +9,6 @@ import ( "github.com/grafana/loki/v3/pkg/xcap" ) -// Statistics for tracking bucket operation counts. 
-var ( - statBucketGet = xcap.NewStatisticInt64("bucket.get", xcap.AggregationTypeSum) - statBucketGetRange = xcap.NewStatisticInt64("bucket.getrange", xcap.AggregationTypeSum) - statBucketIter = xcap.NewStatisticInt64("bucket.iter", xcap.AggregationTypeSum) - statBucketAttributes = xcap.NewStatisticInt64("bucket.attributes", xcap.AggregationTypeSum) -) - // XCapBucket wraps an objstore.Bucket and records request counts to the xcap // Region found in the context. If no Region is present in the context, the // wrapper simply delegates to the underlying bucket without recording. @@ -51,13 +43,13 @@ func (b *XCapBucket) Close() error { // Iter calls f for each entry in the given directory (not recursive.). func (b *XCapBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { - recordOp(ctx, statBucketIter) + recordOp(ctx, xcap.StatBucketIter) return b.bkt.Iter(ctx, dir, f, options...) } // IterWithAttributes calls f for each entry in the given directory similar to Iter. func (b *XCapBucket) IterWithAttributes(ctx context.Context, dir string, f func(objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { - recordOp(ctx, statBucketIter) + recordOp(ctx, xcap.StatBucketIter) return b.bkt.IterWithAttributes(ctx, dir, f, options...) } @@ -68,13 +60,13 @@ func (b *XCapBucket) SupportedIterOptions() []objstore.IterOptionType { // Get returns a reader for the given object name. func (b *XCapBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { - recordOp(ctx, statBucketGet) + recordOp(ctx, xcap.StatBucketGet) return b.bkt.Get(ctx, name) } // GetRange returns a new range reader for the given object name and range. func (b *XCapBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { - recordOp(ctx, statBucketGetRange) + recordOp(ctx, xcap.StatBucketGetRange) return b.bkt.GetRange(ctx, name, off, length) } @@ -100,7 +92,7 @@ func (b *XCapBucket) IsAccessDeniedErr(err error) bool { // Attributes returns information about the specified object. func (b *XCapBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { - recordOp(ctx, statBucketAttributes) + recordOp(ctx, xcap.StatBucketAttributes) return b.bkt.Attributes(ctx, name) } diff --git a/pkg/util/rangeio/rangeio.go b/pkg/util/rangeio/rangeio.go index 613f05c0d23c5..bc577a78bd380 100644 --- a/pkg/util/rangeio/rangeio.go +++ b/pkg/util/rangeio/rangeio.go @@ -24,16 +24,6 @@ import ( "github.com/grafana/loki/v3/pkg/xcap" ) -// xcap statistics for RangeIO operations. -var ( - statInputRangesCount = xcap.NewStatisticInt64("input.ranges", xcap.AggregationTypeSum) - statInputRangesSize = xcap.NewStatisticInt64("input.ranges.size.bytes", xcap.AggregationTypeSum) - statOptimizedRangesCount = xcap.NewStatisticInt64("optimized.ranges", xcap.AggregationTypeSum) - statOptimizedRangesSize = xcap.NewStatisticInt64("optimized.ranges.size.bytes", xcap.AggregationTypeSum) - // pick the min value when aggregating to track the slowest read. - statOptimizedThroughput = xcap.NewStatisticFloat64("optimized.ranges.min.throughput", xcap.AggregationTypeMin) -) - // Range represents a range of data to be read. 
type Range struct { // Data to read into; exactly len(Data) bytes will be read, or an error will @@ -439,16 +429,16 @@ func recordRangeStats(ranges, optimizedRanges []Range, region *xcap.Region) { origSize := rangesSize(ranges) optimizedSize := rangesSize(optimizedRanges) - region.Record(statInputRangesCount.Observe(int64(len(ranges)))) - region.Record(statInputRangesSize.Observe(int64(origSize))) - region.Record(statOptimizedRangesCount.Observe(int64(len(optimizedRanges)))) - region.Record(statOptimizedRangesSize.Observe(int64(optimizedSize))) + region.Record(xcap.StatRangeIOInputCount.Observe(int64(len(ranges)))) + region.Record(xcap.StatRangeIOInputSize.Observe(int64(origSize))) + region.Record(xcap.StatRangeIOOptimizedCount.Observe(int64(len(optimizedRanges)))) + region.Record(xcap.StatRangeIOOptimizedSize.Observe(int64(optimizedSize))) } func recordThroughputStat(region *xcap.Region, startTime time.Time, optimizedRanges []Range) { size := rangesSize(optimizedRanges) bytesPerSec := float64(size) / time.Since(startTime).Seconds() - region.Record(statOptimizedThroughput.Observe(bytesPerSec)) + region.Record(xcap.StatRangeIOThroughput.Observe(bytesPerSec)) } type bytesStringer uint64 diff --git a/pkg/xcap/aggregation.go b/pkg/xcap/aggregation.go index fab8b30cbafca..31826762e2d2f 100644 --- a/pkg/xcap/aggregation.go +++ b/pkg/xcap/aggregation.go @@ -10,10 +10,21 @@ type AggregatedObservation struct { // Record aggregates a new observation into this aggregated observation. // It updates the value according to the statistic's aggregation type. func (a *AggregatedObservation) Record(obs Observation) { - stat := obs.statistic() - val := obs.value() + a.aggregate(obs.statistic().Aggregation(), obs.value()) + a.Count++ +} + +// Merge aggregates another AggregatedObservation into this one. 
+func (a *AggregatedObservation) Merge(other *AggregatedObservation) { + if other == nil { + return + } + a.aggregate(a.Statistic.Aggregation(), other.Value) + a.Count += other.Count +} - switch stat.Aggregation() { +func (a *AggregatedObservation) aggregate(aggType AggregationType, val any) { + switch aggType { case AggregationTypeSum: switch v := val.(type) { case int64: @@ -21,7 +32,6 @@ func (a *AggregatedObservation) Record(obs Observation) { case float64: a.Value = a.Value.(float64) + v } - case AggregationTypeMin: switch v := val.(type) { case int64: @@ -33,7 +43,6 @@ func (a *AggregatedObservation) Record(obs Observation) { a.Value = v } } - case AggregationTypeMax: switch v := val.(type) { case int64: @@ -50,16 +59,36 @@ func (a *AggregatedObservation) Record(obs Observation) { a.Value = v } } - case AggregationTypeLast: - // Last value overwrites a.Value = val - case AggregationTypeFirst: + // Keep the first value, don't update if a.Value == nil { a.Value = val } } +} - a.Count++ +func (a *AggregatedObservation) Int64() (int64, bool) { + if a.Statistic.DataType() != DataTypeInt64 { + return 0, false + } + + return a.Value.(int64), true +} + +func (a *AggregatedObservation) Float64() (float64, bool) { + if a.Statistic.DataType() != DataTypeFloat64 { + return 0, false + } + + return a.Value.(float64), true +} + +func (a *AggregatedObservation) Bool() bool { + if a.Statistic.DataType() != DataTypeBool { + return false + } + + return a.Value.(bool) } diff --git a/pkg/xcap/exporter.go b/pkg/xcap/exporter.go index 523c3ed568dbd..bb68eab72bb0b 100644 --- a/pkg/xcap/exporter.go +++ b/pkg/xcap/exporter.go @@ -115,3 +115,77 @@ func observationToAttribute(key StatisticKey, obs *AggregatedObservation) attrib // Fallback: convert to string return attrKey.String(fmt.Sprintf("%v", obs.Value)) } + +// ExportLog exports a Capture as a structured log line with aggregated statistics. +func ExportLog(capture *Capture, logger log.Logger) { + if capture == nil || logger == nil { + return + } + + summary := summarizeObservations(capture) + level.Info(logger).Log(summary.toLogValues()...) +} + +// summarizeObservations collects and summarizes observations from the capture. +func summarizeObservations(capture *Capture) *observations { + if capture == nil { + return nil + } + + collect := newObservationCollector(capture) + result := newObservations() + + // collect observations from all DataObjScan regions. observations from + // child regions are rolled-up to include dataset reader and bucket stats. + // streamView is excluded as it is handled separately below. + result.merge( + collect.fromRegions("DataObjScan", true, "streamsView.init"). + filter( + // object store calls + StatBucketGet.Key(), StatBucketGetRange.Key(), StatBucketAttributes.Key(), + // dataset reader stats + StatDatasetMaxRows.Key(), StatDatasetRowsAfterPruning.Key(), StatDatasetReadCalls.Key(), + StatDatasetPrimaryPagesDownloaded.Key(), StatDatasetSecondaryPagesDownloaded.Key(), + StatDatasetPrimaryColumnBytes.Key(), StatDatasetSecondaryColumnBytes.Key(), + StatDatasetPrimaryRowsRead.Key(), StatDatasetSecondaryRowsRead.Key(), + StatDatasetPrimaryRowBytes.Key(), StatDatasetSecondaryRowBytes.Key(), + StatDatasetPagesScanned.Key(), StatDatasetPagesFoundInCache.Key(), + StatDatasetPageDownloadRequests.Key(), StatDatasetPageDownloadTime.Key(), + ). + prefix("logs_dataset_"). + normalizeKeys(), + ) + + // metastore index and resolved section stats + result.merge( + collect.fromRegions("ObjectMetastore.Sections", true). 
+            filter(StatMetastoreIndexObjects.Key(), StatMetastoreResolvedSections.Key()).
+            normalizeKeys(),
+    )
+
+    // metastore bucket and dataset reader stats
+    result.merge(
+        collect.fromRegions("ObjectMetastore.Sections", true).
+            filter(
+                StatBucketGet.Key(), StatBucketGetRange.Key(), StatBucketAttributes.Key(),
+                StatDatasetPrimaryPagesDownloaded.Key(), StatDatasetSecondaryPagesDownloaded.Key(),
+                StatDatasetPrimaryColumnBytes.Key(), StatDatasetSecondaryColumnBytes.Key(),
+            ).
+            prefix("metastore_").
+            normalizeKeys(),
+    )
+
+    // streamsView bucket and dataset reader stats
+    result.merge(
+        collect.fromRegions("streamsView.init", true).
+            filter(
+                StatBucketGet.Key(), StatBucketGetRange.Key(), StatBucketAttributes.Key(),
+                StatDatasetPrimaryPagesDownloaded.Key(), StatDatasetSecondaryPagesDownloaded.Key(),
+                StatDatasetPrimaryColumnBytes.Key(), StatDatasetSecondaryColumnBytes.Key(),
+            ).
+            prefix("streams_").
+            normalizeKeys(),
+    )
+
+    return result
+}
diff --git a/pkg/xcap/stats_definitions.go b/pkg/xcap/stats_definitions.go
new file mode 100644
index 0000000000000..46fa31774a11a
--- /dev/null
+++ b/pkg/xcap/stats_definitions.go
@@ -0,0 +1,69 @@
+package xcap
+
+// Common pipeline statistics tracked across executor nodes.
+var (
+    StatPipelineRowsOut      = NewStatisticInt64("rows.out", AggregationTypeSum)
+    StatPipelineReadCalls    = NewStatisticInt64("read.calls", AggregationTypeSum)
+    StatPipelineReadDuration = NewStatisticFloat64("read.duration", AggregationTypeSum)
+)
+
+// ColumnCompat statistics.
+var (
+    StatCompatCollisionFound = NewStatisticFlag("collision.found")
+)
+
+var (
+    // Dataset column statistics.
+    StatDatasetPrimaryColumns       = NewStatisticInt64("primary.columns", AggregationTypeSum)
+    StatDatasetSecondaryColumns     = NewStatisticInt64("secondary.columns", AggregationTypeSum)
+    StatDatasetPrimaryColumnPages   = NewStatisticInt64("primary.column.pages", AggregationTypeSum)
+    StatDatasetSecondaryColumnPages = NewStatisticInt64("secondary.column.pages", AggregationTypeSum)
+
+    // Dataset row statistics.
+    StatDatasetMaxRows           = NewStatisticInt64("rows.max", AggregationTypeSum)
+    StatDatasetRowsAfterPruning  = NewStatisticInt64("rows.after.pruning", AggregationTypeSum)
+    StatDatasetPrimaryRowsRead   = NewStatisticInt64("primary.rows.read", AggregationTypeSum)
+    StatDatasetSecondaryRowsRead = NewStatisticInt64("secondary.rows.read", AggregationTypeSum)
+    StatDatasetPrimaryRowBytes   = NewStatisticInt64("primary.row.read.bytes", AggregationTypeSum)
+    StatDatasetSecondaryRowBytes = NewStatisticInt64("secondary.row.read.bytes", AggregationTypeSum)
+
+    // Dataset page scan statistics.
+    StatDatasetPagesScanned         = NewStatisticInt64("pages.scanned", AggregationTypeSum)
+    StatDatasetPagesFoundInCache    = NewStatisticInt64("pages.cache.hit", AggregationTypeSum)
+    StatDatasetPageDownloadRequests = NewStatisticInt64("pages.download.requests", AggregationTypeSum)
+    StatDatasetPageDownloadTime     = NewStatisticFloat64("pages.download.duration", AggregationTypeSum)
+
+    // Dataset page download byte statistics.
+    StatDatasetPrimaryPagesDownloaded           = NewStatisticInt64("primary.pages.downloaded", AggregationTypeSum)
+    StatDatasetSecondaryPagesDownloaded         = NewStatisticInt64("secondary.pages.downloaded", AggregationTypeSum)
+    StatDatasetPrimaryColumnBytes               = NewStatisticInt64("primary.pages.compressed.bytes", AggregationTypeSum)
+    StatDatasetSecondaryColumnBytes             = NewStatisticInt64("secondary.pages.compressed.bytes", AggregationTypeSum)
+    StatDatasetPrimaryColumnUncompressedBytes   = NewStatisticInt64("primary.column.uncompressed.bytes", AggregationTypeSum)
+    StatDatasetSecondaryColumnUncompressedBytes = NewStatisticInt64("secondary.column.uncompressed.bytes", AggregationTypeSum)
+
+    // Dataset read operation statistics.
+    StatDatasetReadCalls = NewStatisticInt64("dataset.read.calls", AggregationTypeSum)
+)
+
+// Range IO statistics.
+var (
+    StatRangeIOInputCount     = NewStatisticInt64("input.ranges", AggregationTypeSum)
+    StatRangeIOInputSize      = NewStatisticInt64("input.ranges.size.bytes", AggregationTypeSum)
+    StatRangeIOOptimizedCount = NewStatisticInt64("optimized.ranges", AggregationTypeSum)
+    StatRangeIOOptimizedSize  = NewStatisticInt64("optimized.ranges.size.bytes", AggregationTypeSum)
+    StatRangeIOThroughput     = NewStatisticFloat64("optimized.ranges.min.throughput", AggregationTypeMin)
+)
+
+// Bucket operation statistics.
+var (
+    StatBucketGet        = NewStatisticInt64("bucket.get", AggregationTypeSum)
+    StatBucketGetRange   = NewStatisticInt64("bucket.getrange", AggregationTypeSum)
+    StatBucketIter       = NewStatisticInt64("bucket.iter", AggregationTypeSum)
+    StatBucketAttributes = NewStatisticInt64("bucket.attributes", AggregationTypeSum)
+)
+
+// Metastore statistics.
+var (
+    StatMetastoreIndexObjects     = NewStatisticInt64("metastore.index.objects", AggregationTypeSum)
+    StatMetastoreResolvedSections = NewStatisticInt64("metastore.resolved.sections", AggregationTypeSum)
+)
diff --git a/pkg/xcap/summary.go b/pkg/xcap/summary.go
new file mode 100644
index 0000000000000..a7e36fb539ca9
--- /dev/null
+++ b/pkg/xcap/summary.go
@@ -0,0 +1,312 @@
+package xcap
+
+import (
+    "sort"
+    "strings"
+    "time"
+
+    "github.com/dustin/go-humanize"
+
+    "github.com/grafana/loki/v3/pkg/logqlmodel/stats"
+)
+
+// observations holds aggregated observations that can be transformed and merged.
+//
+// All transformation methods (filter, prefix, normalizeKeys) return new instances,
+// leaving the original unchanged.
+type observations struct {
+    data map[StatisticKey]*AggregatedObservation
+}
+
+// newObservations creates an empty observations.
+func newObservations() *observations {
+    return &observations{data: make(map[StatisticKey]*AggregatedObservation)}
+}
+
+// filter returns a new observations containing only entries with matching stat keys.
+func (o *observations) filter(keys ...StatisticKey) *observations {
+    if len(keys) == 0 || o == nil {
+        return o
+    }
+
+    keySet := make(map[StatisticKey]struct{}, len(keys))
+    for _, k := range keys {
+        keySet[k] = struct{}{}
+    }
+
+    result := newObservations()
+    for k, obs := range o.data {
+        if _, ok := keySet[k]; ok {
+            result.data[k] = obs
+        }
+    }
+    return result
+}
+
+// prefix returns a new observations with all stat names prefixed.
+func (o *observations) prefix(p string) *observations {
+    if p == "" || o == nil {
+        return o
+    }
+
+    result := newObservations()
+    for k, obs := range o.data {
+        newKey := StatisticKey{
+            Name:        p + k.Name,
+            DataType:    k.DataType,
+            Aggregation: k.Aggregation,
+        }
+        result.data[newKey] = obs
+    }
+    return result
+}
+
+// normalizeKeys returns a new observations with dots replaced by underscores in stat names.
+func (o *observations) normalizeKeys() *observations {
+    if o == nil {
+        return o
+    }
+
+    result := newObservations()
+    for k, obs := range o.data {
+        newKey := StatisticKey{
+            Name:        strings.ReplaceAll(k.Name, ".", "_"),
+            DataType:    k.DataType,
+            Aggregation: k.Aggregation,
+        }
+        result.data[newKey] = obs
+    }
+    return result
+}
+
+// merge merges another observations into this one.
+func (o *observations) merge(other *observations) {
+    if other == nil {
+        return
+    }
+    for k, obs := range other.data {
+        if existing, ok := o.data[k]; ok {
+            existing.Merge(obs)
+        } else {
+            o.data[k] = &AggregatedObservation{
+                Statistic: obs.Statistic,
+                Value:     obs.Value,
+                Count:     obs.Count,
+            }
+        }
+    }
+}
+
+// toLogValues converts observations to a slice suitable for go-kit/log.
+// Keys are sorted for deterministic output.
+func (o *observations) toLogValues() []any {
+    if o == nil {
+        return nil
+    }
+
+    // Collect key-value pairs for sorting by name.
+    type kv struct {
+        name  string
+        value any
+    }
+    pairs := make([]kv, 0, len(o.data))
+    for k, obs := range o.data {
+        pairs = append(pairs, kv{name: k.Name, value: obs.Value})
+    }
+    sort.Slice(pairs, func(i, j int) bool {
+        return strings.Compare(pairs[i].name, pairs[j].name) < 0
+    })
+
+    result := make([]any, 0, len(pairs)*2)
+    for _, p := range pairs {
+        value := p.value
+
+        // Format bytes values (keys ending with "_bytes")
+        if strings.HasSuffix(p.name, "_bytes") {
+            switch val := value.(type) {
+            case uint64:
+                value = humanize.Bytes(val)
+            case int64:
+                value = humanize.Bytes(uint64(val))
+            }
+        }
+
+        // Format duration values (keys ending with "duration").
+        // Duration statistics are observed in seconds, so scale to a time.Duration
+        // (nanoseconds) before formatting.
+        if strings.HasSuffix(p.name, "duration") {
+            switch val := value.(type) {
+            case float64:
+                value = time.Duration(val * float64(time.Second)).String()
+            case int64:
+                value = time.Duration(float64(val) * float64(time.Second)).String()
+            case uint64:
+                value = time.Duration(float64(val) * float64(time.Second)).String()
+            }
+        }
+
+        result = append(result, p.name, value)
+    }
+    return result
+}
+
+// observationCollector provides methods to collect observations from a Capture.
+type observationCollector struct {
+    capture       *Capture
+    childrenMap   map[identifier][]*Region
+    nameToRegions map[string][]*Region
+}
+
+// newObservationCollector creates a new collector for gathering observations from the given capture.
+func newObservationCollector(capture *Capture) *observationCollector {
+    if capture == nil {
+        return nil
+    }
+
+    // Build lookup maps:
+    // - parent -> children
+    // - name -> matching regions
+    childrenMap := make(map[identifier][]*Region)
+    nameToRegions := make(map[string][]*Region)
+    for _, r := range capture.regions {
+        childrenMap[r.parentID] = append(childrenMap[r.parentID], r)
+        nameToRegions[r.name] = append(nameToRegions[r.name], r)
+    }
+
+    return &observationCollector{
+        capture:       capture,
+        childrenMap:   childrenMap,
+        nameToRegions: nameToRegions,
+    }
+}
+
+// fromRegions collects observations from regions with the given name.
+// If rollUp is true, each region's stats include all its descendant stats,
+// aggregated according to each stat's aggregation type.
+func (c *observationCollector) fromRegions(name string, rollUp bool, excluded ...string) *observations {
+    result := newObservations()
+
+    if c == nil {
+        return result
+    }
+
+    regions := c.nameToRegions[name]
+    if len(regions) == 0 {
+        return result
+    }
+
+    excludedSet := make(map[string]struct{}, len(excluded))
+    for _, name := range excluded {
+        excludedSet[name] = struct{}{}
+    }
+
+    for _, region := range regions {
+        var obs *observations
+        if rollUp {
+            obs = c.rollUpObservations(region, excludedSet)
+        } else {
+            obs = c.getRegionObservations(region)
+        }
+
+        result.merge(obs)
+    }
+
+    return result
+}
+
+// getRegionObservations returns a copy of a region's observations.
+func (c *observationCollector) getRegionObservations(region *Region) *observations {
+    result := newObservations()
+    for k, obs := range region.observations {
+        result.data[k] = &AggregatedObservation{
+            Statistic: obs.Statistic,
+            Value:     obs.Value,
+            Count:     obs.Count,
+        }
+    }
+    return result
+}
+
+// rollUpObservations computes observations for a region including all its descendants.
+// Stats are aggregated according to their aggregation type.
+func (c *observationCollector) rollUpObservations(region *Region, excludedSet map[string]struct{}) *observations {
+    result := c.getRegionObservations(region)
+
+    // Recursively aggregate from children.
+    for _, child := range c.childrenMap[region.id] {
+        // Skip children with excluded names.
+        if _, excluded := excludedSet[child.name]; excluded {
+            continue
+        }
+        result.merge(c.rollUpObservations(child, excludedSet))
+    }
+
+    return result
+}
+
+// Region name for data object scan operations.
+const regionNameDataObjScan = "DataObjScan"
+
+// ToStatsSummary computes a stats.Result from observations in the capture.
+func (c *Capture) ToStatsSummary(execTime, queueTime time.Duration, totalEntriesReturned int) stats.Result {
+    result := stats.Result{
+        Summary: stats.Summary{
+            ExecTime:             execTime.Seconds(),
+            QueueTime:            queueTime.Seconds(),
+            TotalEntriesReturned: int64(totalEntriesReturned),
+        },
+        Querier: stats.Querier{
+            Store: stats.Store{
+                QueryUsedV2Engine: true,
+            },
+        },
+    }
+
+    if c == nil {
+        return result
+    }
+
+    // Collect observations from DataObjScan regions, as the summary stats mainly relate to log lines.
+    // In practice, the new engine also processes additional bytes while scanning metastore objects and stream sections.
+    collector := newObservationCollector(c)
+    observations := collector.fromRegions(regionNameDataObjScan, true).filter(
+        StatPipelineRowsOut.Key(),
+        StatDatasetPrimaryRowsRead.Key(),
+        StatDatasetPrimaryColumnUncompressedBytes.Key(),
+        StatDatasetSecondaryColumnUncompressedBytes.Key(),
+    )
+
+    // TotalBytesProcessed: sum of uncompressed bytes from primary and secondary columns.
+    result.Summary.TotalBytesProcessed = readInt64(observations, StatDatasetPrimaryColumnUncompressedBytes.Key()) +
+        readInt64(observations, StatDatasetSecondaryColumnUncompressedBytes.Key())
+
+    // TotalLinesProcessed: primary rows read.
+    result.Summary.TotalLinesProcessed = readInt64(observations, StatDatasetPrimaryRowsRead.Key())
+
+    // TotalPostFilterLines: rows output after filtering.
+    // TODO: this will report the wrong value if the plan has a filter stage;
+    // pick the min of rows.out between the filter and scan nodes instead.
+ result.Summary.TotalPostFilterLines = readInt64(observations, StatPipelineRowsOut.Key()) + + // TODO: track and report TotalStructuredMetadataBytesProcessed + + if execTime > 0 { + execSeconds := execTime.Seconds() + result.Summary.BytesProcessedPerSecond = int64(float64(result.Summary.TotalBytesProcessed) / execSeconds) + result.Summary.LinesProcessedPerSecond = int64(float64(result.Summary.TotalLinesProcessed) / execSeconds) + } + + return result +} + +// readInt64 reads an int64 observation for the given stat key. +func readInt64(o *observations, key StatisticKey) int64 { + if o == nil { + return 0 + } + + if agg, ok := o.data[key]; ok { + if v, ok := agg.Int64(); ok { + return v + } + } + return 0 +} diff --git a/pkg/xcap/summary_test.go b/pkg/xcap/summary_test.go new file mode 100644 index 0000000000000..2f2ba4b807356 --- /dev/null +++ b/pkg/xcap/summary_test.go @@ -0,0 +1,232 @@ +package xcap + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestObservations(t *testing.T) { + ctx, capture := NewCapture(context.Background(), nil) + + statA := NewStatisticInt64("stat.one", AggregationTypeSum) + statB := NewStatisticInt64("stat.two", AggregationTypeSum) + statC := NewStatisticInt64("stat.three", AggregationTypeSum) + + _, region := StartRegion(ctx, "Test") + region.Record(statA.Observe(10)) + region.Record(statB.Observe(20)) + region.Record(statC.Observe(30)) + region.End() + + collector := newObservationCollector(capture) + obs := collector.fromRegions("Test", false) + + t.Run("filter", func(t *testing.T) { + filtered := obs.filter(statA.Key(), statB.Key()) + require.Len(t, filtered.data, 2) + require.Equal(t, int64(10), filtered.data[statA.Key()].Value) + require.Equal(t, int64(20), filtered.data[statB.Key()].Value) + }) + + t.Run("prefix", func(t *testing.T) { + prefixed := obs.filter(statA.Key()).prefix("metastore_") + expectedKey := StatisticKey{Name: "metastore_stat.one", DataType: DataTypeInt64, Aggregation: AggregationTypeSum} + require.Len(t, prefixed.data, 1) + require.Equal(t, int64(10), prefixed.data[expectedKey].Value) + }) + + t.Run("normalizeKeys", func(t *testing.T) { + normalized := obs.filter(statC.Key()).normalizeKeys() + expectedKey := StatisticKey{Name: "stat_three", DataType: DataTypeInt64, Aggregation: AggregationTypeSum} + require.Len(t, normalized.data, 1) + require.Equal(t, int64(30), normalized.data[expectedKey].Value) + }) + + t.Run("merge", func(t *testing.T) { + target := newObservations() + target.merge(obs.filter(statA.Key(), statB.Key())) + target.merge(obs.filter(statB.Key(), statC.Key())) + + require.Len(t, target.data, 3) + require.Equal(t, int64(10), target.data[statA.Key()].Value) + require.Equal(t, int64(40), target.data[statB.Key()].Value) + require.Equal(t, int64(30), target.data[statC.Key()].Value) + }) + + t.Run("toLogValues", func(t *testing.T) { + logValues := obs.toLogValues() + // Sorted: stat.one, stat.two, stat.three + require.Equal(t, []any{"stat.one", int64(10), "stat.three", int64(30), "stat.two", int64(20)}, logValues) + }) + + t.Run("chaining", func(t *testing.T) { + result := obs.filter(statC.Key()).prefix("logs_").normalizeKeys().toLogValues() + require.Equal(t, []any{"logs_stat_three", int64(30)}, result) + }) +} + +func TestRollups(t *testing.T) { + t.Run("includes child observations when rollup=true", func(t *testing.T) { + ctx, capture := NewCapture(context.Background(), nil) + stat := NewStatisticInt64("count", AggregationTypeSum) + + ctx, parent := StartRegion(ctx, "Parent") + 
parent.Record(stat.Observe(10)) + + ctx, child := StartRegion(ctx, "Child") + child.Record(stat.Observe(20)) + + _, grandchild := StartRegion(ctx, "Grandchild") + grandchild.Record(stat.Observe(30)) + + grandchild.End() + child.End() + parent.End() + + collector := newObservationCollector(capture) + + withRollup := collector.fromRegions("Parent", true) + require.Equal(t, int64(60), withRollup.data[stat.Key()].Value) // 10 + 20 + 30 + + withoutRollup := collector.fromRegions("Parent", false) + require.Equal(t, int64(10), withoutRollup.data[stat.Key()].Value) // parent only + }) + + t.Run("excludes matching descendants", func(t *testing.T) { + ctx, capture := NewCapture(context.Background(), nil) + stat := NewStatisticInt64("count", AggregationTypeSum) + + ctx, parent := StartRegion(ctx, "Parent") + parent.Record(stat.Observe(1)) + + _, included := StartRegion(ctx, "included") + included.Record(stat.Observe(10)) + included.End() + + ctx2, excluded := StartRegion(ctx, "excluded") + excluded.Record(stat.Observe(100)) + + _, excludedChild := StartRegion(ctx2, "excludedChild") + excludedChild.Record(stat.Observe(1000)) + excludedChild.End() + + excluded.End() + parent.End() + + collector := newObservationCollector(capture) + stats := collector.fromRegions("Parent", true, "excluded") + require.Equal(t, int64(11), stats.data[stat.Key()].Value) // 1 + 10, excludes 100 + 1000 + }) +} + +func TestToStatsSummary(t *testing.T) { + t.Run("nil capture returns empty summary with provided values", func(t *testing.T) { + execTime := 2 * time.Second + queueTime := 100 * time.Millisecond + entriesReturned := 42 + + var capture *Capture + result := capture.ToStatsSummary(execTime, queueTime, entriesReturned) + + require.Equal(t, execTime.Seconds(), result.Summary.ExecTime) + require.Equal(t, queueTime.Seconds(), result.Summary.QueueTime) + require.Equal(t, int64(entriesReturned), result.Summary.TotalEntriesReturned) + require.Equal(t, int64(0), result.Summary.TotalBytesProcessed) + require.Equal(t, int64(0), result.Summary.TotalLinesProcessed) + require.Equal(t, int64(0), result.Summary.TotalPostFilterLines) + }) + + t.Run("computes bytes and lines from DataObjScan regions", func(t *testing.T) { + ctx, capture := NewCapture(context.Background(), nil) + + // Create DataObjScan regions with observations using registry stats + _, region1 := StartRegion(ctx, "DataObjScan") + region1.Record(StatDatasetPrimaryColumnUncompressedBytes.Observe(1000)) + region1.Record(StatDatasetSecondaryColumnUncompressedBytes.Observe(500)) + region1.Record(StatDatasetPrimaryRowsRead.Observe(100)) + region1.Record(StatPipelineRowsOut.Observe(80)) + region1.End() + + _, region2 := StartRegion(ctx, "DataObjScan") + region2.Record(StatDatasetPrimaryColumnUncompressedBytes.Observe(2000)) + region2.Record(StatDatasetSecondaryColumnUncompressedBytes.Observe(1000)) + region2.Record(StatDatasetPrimaryRowsRead.Observe(200)) + region2.Record(StatPipelineRowsOut.Observe(150)) + region2.End() + + // Other region - should be ignored + _, otherRegion := StartRegion(ctx, "OtherRegion") + otherRegion.Record(StatDatasetPrimaryColumnUncompressedBytes.Observe(5000)) + otherRegion.End() + + capture.End() + + execTime := 2 * time.Second + queueTime := 100 * time.Millisecond + entriesReturned := 42 + + result := capture.ToStatsSummary(execTime, queueTime, entriesReturned) + + require.Equal(t, execTime.Seconds(), result.Summary.ExecTime) + require.Equal(t, queueTime.Seconds(), result.Summary.QueueTime) + require.Equal(t, int64(entriesReturned), 
result.Summary.TotalEntriesReturned) + + // TotalBytesProcessed = primary + secondary = (1000+2000) + (500+1000) = 4500 + require.Equal(t, int64(4500), result.Summary.TotalBytesProcessed) + + // TotalLinesProcessed = primary_rows_read = 100 + 200 = 300 + require.Equal(t, int64(300), result.Summary.TotalLinesProcessed) + + // TotalPostFilterLines = rows_out = 80 + 150 = 230 + require.Equal(t, int64(230), result.Summary.TotalPostFilterLines) + + // BytesProcessedPerSecond = 4500 / 2 = 2250 + require.Equal(t, int64(2250), result.Summary.BytesProcessedPerSecond) + + // LinesProcessedPerSecond = 300 / 2 = 150 + require.Equal(t, int64(150), result.Summary.LinesProcessedPerSecond) + }) + + t.Run("missing statistics result in zero values", func(t *testing.T) { + ctx, capture := NewCapture(context.Background(), nil) + + // Only record some statistics + _, region := StartRegion(ctx, "DataObjScan") + region.Record(StatDatasetPrimaryColumnUncompressedBytes.Observe(1000)) + region.End() + capture.End() + + execTime := 1 * time.Second + result := capture.ToStatsSummary(execTime, 0, 0) + + // Only primary bytes recorded + require.Equal(t, int64(1000), result.Summary.TotalBytesProcessed) + // No rows recorded + require.Equal(t, int64(0), result.Summary.TotalLinesProcessed) + require.Equal(t, int64(0), result.Summary.TotalPostFilterLines) + }) + + t.Run("rolls up child region observations into DataObjScan", func(t *testing.T) { + ctx, capture := NewCapture(context.Background(), nil) + + // Parent DataObjScan region + ctx, parent := StartRegion(ctx, "DataObjScan") + parent.Record(StatDatasetPrimaryColumnUncompressedBytes.Observe(500)) + + // Child region (should be rolled up into parent) + _, child := StartRegion(ctx, "child_operation") + child.Record(StatDatasetPrimaryColumnUncompressedBytes.Observe(300)) + child.End() + + parent.End() + capture.End() + + result := capture.ToStatsSummary(time.Second, 0, 0) + + // Child observations rolled up into DataObjScan: 500 + 300 = 800 + require.Equal(t, int64(800), result.Summary.TotalBytesProcessed) + }) +}
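
For reviewers new to the xcap package, the sketch below (not part of the patch) shows how the pieces introduced above are intended to fit together: statistics from the shared registry are recorded into a region, the capture is ended, and then the legacy stats summary and the aggregated log line are derived from it. The xcap identifiers and call patterns are taken from the code in this diff; the exact exported signatures (e.g. xcap.StartRegion) and the logger wiring are assumptions made for illustration only.

// Illustrative sketch of the new xcap summary/export flow (assumed wiring).
package main

import (
    "context"
    "fmt"
    "os"
    "time"

    "github.com/go-kit/log"

    "github.com/grafana/loki/v3/pkg/xcap"
)

func main() {
    // Open a capture for the lifetime of a query.
    ctx, capture := xcap.NewCapture(context.Background(), nil)

    // Record a few statistics from the shared registry into a region named
    // "DataObjScan", the region name that ToStatsSummary rolls up.
    _, region := xcap.StartRegion(ctx, "DataObjScan")
    region.Record(xcap.StatDatasetPrimaryRowsRead.Observe(100))
    region.Record(xcap.StatDatasetPrimaryColumnUncompressedBytes.Observe(4096))
    region.Record(xcap.StatPipelineRowsOut.Observe(80))
    region.End()
    capture.End()

    // Derive the legacy stats.Result and emit the aggregated log line.
    summary := capture.ToStatsSummary(2*time.Second, 0, 80)
    fmt.Println(summary.Summary.TotalBytesProcessed, summary.Summary.TotalLinesProcessed)

    logger := log.NewLogfmtLogger(os.Stderr)
    xcap.ExportLog(capture, logger)
}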