diff --git a/docs/querying/functions.md b/docs/querying/functions.md index c8fda2865..7a2ca66d2 100644 --- a/docs/querying/functions.md +++ b/docs/querying/functions.md @@ -421,6 +421,11 @@ by the number of seconds under the specified time range window, and should be used primarily for human readability. Use `rate` in recording rules so that increases are tracked consistently on a per-second basis. +## `info()` + +For each time series in `v`, `info(v instant-vector, [label-selector string])` finds all info metrics whose +identifying labels are contained in that time series' label set, and adds the union of those info metrics' data labels to the returned time series. +For example, `info(rate(http_requests_total[2m]), {k8s_cluster_name=~".+"})` returns the per-second rate of `http_requests_total` with the matching `k8s_cluster_name` data label added. + ## `irate()` `irate(v range-vector)` calculates the per-second instant rate of increase of diff --git a/prompb/types.proto b/prompb/types.proto index 08182d61a..888ed8a56 100644 --- a/prompb/types.proto +++ b/prompb/types.proto @@ -39,12 +39,12 @@ message MetricMetadata { } message Sample { - double value = 1; + double value = 1; // timestamp is in ms format, see model/timestamp/timestamp.go for // conversion from time.Time to Prometheus timestamp. - int64 timestamp = 2; + int64 timestamp = 2; // The set of identifying labels for info metrics, as array indices. - repeated int32 identifyingLabels = 3; + repeated int32 identifyingLabels = 3; } message Exemplar { @@ -125,10 +125,10 @@ message BucketSpan { message TimeSeries { // For a timeseries to be valid, and for the samples and exemplars // to be ingested by the remote system properly, the labels field is required. - repeated Label labels = 1 [(gogoproto.nullable) = false]; - repeated Sample samples = 2 [(gogoproto.nullable) = false]; - repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false]; - repeated Histogram histograms = 4 [(gogoproto.nullable) = false]; + repeated Label labels = 1 [(gogoproto.nullable) = false]; + repeated Sample samples = 2 [(gogoproto.nullable) = false]; + repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false]; + repeated Histogram histograms = 4 [(gogoproto.nullable) = false]; } message Label { diff --git a/promql/engine.go b/promql/engine.go index 9c92df321..a45a2b221 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -228,10 +228,10 @@ func (q *query) Cancel() { // Close implements the Query interface. func (q *query) Close() { - // TODO: Handle info metric samples for _, s := range q.matrix { putFPointSlice(s.Floats) putHPointSlice(s.Histograms) + putInfoPointSlice(s.InfoSamples) } } @@ -714,6 +714,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval lookbackDelta: s.LookbackDelta, samplesStats: query.sampleStats, noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, + querier: querier, } query.sampleStats.InitStepTracking(start, start, 1) @@ -744,10 +745,13 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval for i, s := range mat { // Point might have a different timestamp, force it to the evaluation // timestamp as that is when we ran the evaluation.
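+			// Each series here holds exactly one point, which is either a float, a histogram, or an info metric sample.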
- // TODO: Handle info metric samples - if len(s.Histograms) > 0 { + switch { + case len(s.Histograms) > 0: vector[i] = Sample{Metric: s.Metric, H: s.Histograms[0].H, T: start} - } else { + case len(s.InfoSamples) > 0: + level.Debug(ng.logger).Log("msg", "adding info metric sample to vector", "index", i) + vector[i] = Sample{Metric: s.Metric, IdentifyingLabels: s.InfoSamples[0].IdentifyingLabels, T: start, F: 1} + default: vector[i] = Sample{Metric: s.Metric, F: s.Floats[0].F, T: start} } } @@ -773,6 +777,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval lookbackDelta: s.LookbackDelta, samplesStats: query.sampleStats, noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, + querier: querier, } query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval) val, warnings, err := evaluator.Eval(s.Expr) @@ -988,6 +993,7 @@ func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (annotations return nil, nil } series, ws, err := expandSeriesSet(ctx, e.UnexpandedSeriesSet) + fmt.Printf("checkAndExpandSeriesSet: found %d series\n", len(series)) if e.SkipHistogramBuckets { for i := range series { series[i] = newHistogramStatsSeries(series[i]) @@ -1035,6 +1041,8 @@ type evaluator struct { lookbackDelta time.Duration samplesStats *stats.QuerySamples noStepSubqueryIntervalFn func(rangeMillis int64) int64 + + querier storage.Querier } // errorf causes a panic with the input formatted into an error. @@ -1104,6 +1112,8 @@ type EvalNodeHelper struct { rightSigs map[string]Sample matchedSigs map[string]map[uint64]struct{} resultMetric map[string]labels.Labels + + Querier storage.Querier } func (enh *EvalNodeHelper) resetBuilder(lbls labels.Labels) { @@ -1153,7 +1163,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) biggestLen = len(matrixes[i]) } } - enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)} + enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen), Querier: ev.querier} type seriesAndTimestamp struct { Series ts int64 @@ -1182,6 +1192,9 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) } } + // For each timestamp, iterate over the expressions, and for each series in each expression's matrix, + // append a corresponding sample to the expression's input vector. + // Then, call funcCall. 
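+	// For example, with two input expressions and steps t0, t1, ..., at t0 we build vectors[0] and vectors[1] from the head point (float, histogram, or info sample) of each series, call funcCall once, and then advance each consumed point slice by one point.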
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { if err := contextDone(ev.ctx, "expression evaluation"); err != nil { ev.error(err) @@ -1197,7 +1210,6 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) } for si, series := range matrixes[i] { - // TODO: Handle info metric samples switch { case len(series.Floats) > 0 && series.Floats[0].T == ts: vectors[i] = append(vectors[i], Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts}) @@ -1207,6 +1219,12 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) case len(series.Histograms) > 0 && series.Histograms[0].T == ts: vectors[i] = append(vectors[i], Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts}) matrixes[i][si].Histograms = series.Histograms[1:] + case len(series.InfoSamples) > 0 && series.InfoSamples[0].T == ts: + level.Debug(ev.logger).Log("msg", "appending info metric sample to vector/matrix", "i", i) + vectors[i] = append(vectors[i], Sample{Metric: series.Metric, IdentifyingLabels: series.InfoSamples[0].IdentifyingLabels, T: ts, F: 1}) + // Move input vectors forward so we don't have to re-scan the same + // past points at the next step. + matrixes[i][si].InfoSamples = series.InfoSamples[1:] default: continue } @@ -1249,11 +1267,14 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) } mat := make(Matrix, len(result)) for i, s := range result { - // TODO: Handle info metric samples - if s.H == nil { - mat[i] = Series{Metric: s.Metric, Floats: []FPoint{{T: ts, F: s.F}}} - } else { + switch { + case s.H != nil: mat[i] = Series{Metric: s.Metric, Histograms: []HPoint{{T: ts, H: s.H}}} + case s.IdentifyingLabels != nil: + level.Debug(ev.logger).Log("msg", "appending info metric series to matrix", "i", i) + mat[i] = Series{Metric: s.Metric, InfoSamples: []InfoPoint{{T: ts, IdentifyingLabels: s.IdentifyingLabels}}} + default: + mat[i] = Series{Metric: s.Metric, Floats: []FPoint{{T: ts, F: s.F}}} } } ev.currentSamples = originalNumSamples + mat.TotalSamples() @@ -1273,8 +1294,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) } else { ss = seriesAndTimestamp{Series{Metric: sample.Metric}, ts} } - // TODO: Handle info metric samples - addToSeries(&ss.Series, enh.Ts, sample.F, sample.H, numSteps) + addToSeries(&ss.Series, enh.Ts, sample.F, sample.H, sample.IdentifyingLabels, numSteps) seriess[h] = ss } } @@ -1282,9 +1302,9 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) // Reuse the original point slices. for _, m := range origMatrixes { for _, s := range m { - // TODO: Handle info metric samples putFPointSlice(s.Floats) putHPointSlice(s.Histograms) + putInfoPointSlice(s.InfoSamples) } } // Assemble the output matrix. By the time we get here we know we don't have too many samples. @@ -1596,6 +1616,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio // Reuse objects across steps to save memory allocations. 
var floats []FPoint var histograms []HPoint + var infoSamples []InfoPoint var prevSS *Series inMatrix := make(Matrix, 1) inArgs[matrixArgIndex] = inMatrix @@ -1607,14 +1628,16 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio if err := contextDone(ev.ctx, "expression evaluation"); err != nil { ev.error(err) } - ev.currentSamples -= len(floats) + totalHPointSize(histograms) - // TODO: Handle info metric samples + ev.currentSamples -= len(floats) + totalHPointSize(histograms) + len(infoSamples) if floats != nil { floats = floats[:0] } if histograms != nil { histograms = histograms[:0] } + if infoSamples != nil { + infoSamples = infoSamples[:0] + } chkIter = s.Iterator(chkIter) it.Reset(chkIter) metric := selVS.Series[i].Labels() @@ -1645,14 +1668,14 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio if ts == ev.startTimestamp || selVS.Timestamp == nil { maxt := ts - offset mint := maxt - selRange - floats, histograms = ev.matrixIterSlice(it, mint, maxt, floats, histograms) + floats, histograms, infoSamples = ev.matrixIterSlice(it, mint, maxt, floats, histograms, infoSamples) } - if len(floats)+len(histograms) == 0 { + if len(floats)+len(histograms)+len(infoSamples) == 0 { continue } - // TODO: Handle info metric samples inMatrix[0].Floats = floats inMatrix[0].Histograms = histograms + inMatrix[0].InfoSamples = infoSamples enh.Ts = ts // Make the function call. outVec, annos := call(inArgs, e.Args, enh) @@ -1703,10 +1726,10 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio } ev.samplesStats.UpdatePeak(ev.currentSamples) - // TODO: Handle info metric samples - ev.currentSamples -= len(floats) + totalHPointSize(histograms) + ev.currentSamples -= len(floats) + totalHPointSize(histograms) + len(infoSamples) putFPointSlice(floats) putMatrixSelectorHPointSlice(histograms) + putInfoPointSlice(infoSamples) // The absent_over_time function returns 0 or 1 series. So far, the matrix // contains multiple series. The following code will create a new series @@ -1715,8 +1738,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio steps := int(1 + (ev.endTimestamp-ev.startTimestamp)/ev.interval) // Iterate once to look for a complete series. 
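+		// If one exists, the input is never absent, so absent_over_time returns no series at all.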
for _, s := range mat { - // TODO: Handle info metric samples - if len(s.Floats)+len(s.Histograms) == steps { + if len(s.Floats)+len(s.Histograms)+len(s.InfoSamples) == steps { return Matrix{}, warnings } } @@ -1724,13 +1746,15 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio found := map[int64]struct{}{} for i, s := range mat { - // TODO: Handle info metric samples for _, p := range s.Floats { found[p.T] = struct{}{} } for _, p := range s.Histograms { found[p.T] = struct{}{} } + for _, p := range s.InfoSamples { + found[p.T] = struct{}{} + } if i > 0 && len(found) == steps { return Matrix{}, warnings } @@ -1838,10 +1862,12 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) var chkIter chunkenc.Iterator for i, s := range e.Series { + fmt.Printf("eval: Processing series %s\n", s.Labels()) if err := contextDone(ev.ctx, "expression evaluation"); err != nil { ev.error(err) } chkIter = s.Iterator(chkIter) + fmt.Printf("Obtained sample iterator %T from %T\n", chkIter, s) it.Reset(chkIter) ss := Series{ Metric: e.Series[i].Labels(), @@ -1849,20 +1875,11 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { step++ - _, f, h, ok := ev.vectorSelectorSingle(it, e, ts) + fmt.Printf("eval: ts: %d, step: %d\n", ts, step) + _, f, h, ils, ok := ev.vectorSelectorSingle(it, e, ts) if ok { - // TODO: Handle info metric samples - if h == nil { - ev.currentSamples++ - ev.samplesStats.IncrementSamplesAtStep(step, 1) - if ev.currentSamples > ev.maxSamples { - ev.error(ErrTooManySamples(env)) - } - if ss.Floats == nil { - ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps) - } - ss.Floats = append(ss.Floats, FPoint{F: f, T: ts}) - } else { + switch { + case h != nil: point := HPoint{H: h, T: ts} histSize := point.size() ev.currentSamples += histSize @@ -1874,12 +1891,32 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps) } ss.Histograms = append(ss.Histograms, point) + case ils != nil: + ev.currentSamples++ + ev.samplesStats.IncrementSamplesAtStep(step, 1) + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + if ss.InfoSamples == nil { + ss.InfoSamples = reuseOrGetInfoPointSlices(prevSS, numSteps) + } + level.Debug(ev.logger).Log("msg", "eval: appending info metric point to ss.InfoSamples") + ss.InfoSamples = append(ss.InfoSamples, InfoPoint{T: ts, IdentifyingLabels: ils}) + default: + ev.currentSamples++ + ev.samplesStats.IncrementSamplesAtStep(step, 1) + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + if ss.Floats == nil { + ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps) + } + ss.Floats = append(ss.Floats, FPoint{F: f, T: ts}) } } } - // TODO: Handle info metric samples - if len(ss.Floats)+len(ss.Histograms) > 0 { + if len(ss.Floats)+len(ss.Histograms)+len(ss.InfoSamples) > 0 { mat = append(mat, ss) prevSS = &mat[len(mat)-1] } @@ -1972,24 +2009,30 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio panic(fmt.Errorf("unexpected result in StepInvariantExpr evaluation: %T", expr)) } for i := range mat { - // TODO: Handle info metric samples - if len(mat[i].Floats)+len(mat[i].Histograms) != 1 { + if len(mat[i].Floats)+len(mat[i].Histograms)+len(mat[i].InfoSamples) != 1 { 
panic(fmt.Errorf("unexpected number of samples")) } for ts := ev.startTimestamp + ev.interval; ts <= ev.endTimestamp; ts += ev.interval { - if len(mat[i].Floats) > 0 { + switch { + case len(mat[i].Floats) > 0: mat[i].Floats = append(mat[i].Floats, FPoint{ T: ts, F: mat[i].Floats[0].F, }) ev.currentSamples++ - } else { + case len(mat[i].Histograms) > 0: point := HPoint{ T: ts, H: mat[i].Histograms[0].H, } mat[i].Histograms = append(mat[i].Histograms, point) ev.currentSamples += point.size() + case len(mat[i].InfoSamples) > 0: + mat[i].InfoSamples = append(mat[i].InfoSamples, InfoPoint{ + T: ts, + IdentifyingLabels: mat[i].InfoSamples[0].IdentifyingLabels, + }) + ev.currentSamples++ } if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) @@ -2027,6 +2070,18 @@ func reuseOrGetFPointSlices(prevSS *Series, numSteps int) (r []FPoint) { return getFPointSlice(numSteps) } +// reuseOrGetInfoPointSlices reuses the space from previous slice to create new slice if the former has lots of room. +// The previous slices capacity is adjusted so when it is re-used from the pool it doesn't overflow into the new one. +func reuseOrGetInfoPointSlices(prevSS *Series, numSteps int) (r []InfoPoint) { + if prevSS != nil && cap(prevSS.InfoSamples)-2*len(prevSS.InfoSamples) > 0 { + r = prevSS.InfoSamples[len(prevSS.InfoSamples):] + prevSS.InfoSamples = prevSS.InfoSamples[0:len(prevSS.InfoSamples):len(prevSS.InfoSamples)] + return + } + + return getInfoPointSlice(numSteps) +} + func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, annotations.Annotations) { ws, err := checkAndExpandSeriesSet(ev.ctx, vs) if err != nil { @@ -2050,21 +2105,34 @@ func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.Vec vec := make(Vector, 0, len(vs.Series)) for i, s := range vs.Series { it := seriesIterators[i] - t, _, _, ok := ev.vectorSelectorSingle(it, vs, enh.Ts) - if !ok { - continue - } - - // Note that we ignore the sample values because call only cares about the timestamp. - vec = append(vec, Sample{ - Metric: s.Labels(), - T: t, - }) + t, f, h, ils, ok := ev.vectorSelectorSingle(it, vs, enh.Ts) + if ok { + if ils != nil { + f = 1 + } + vec = append(vec, Sample{ + Metric: s.Labels(), + T: t, + F: f, + H: h, + IdentifyingLabels: ils, + }) + histSize := 0 + if h != nil { + histSize = h.Size() / 16 // 16 bytes per sample. + } + numSamples := 1 + histSize + if ils != nil { + // Arve: Is this correct? + numSamples++ + } + ev.currentSamples += numSamples - ev.currentSamples++ - ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, 1) - if ev.currentSamples > ev.maxSamples { - ev.error(ErrTooManySamples(env)) + // Arve: is it correct to use numSamples here? + ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, int64(numSamples)) + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } } } ev.samplesStats.UpdatePeak(ev.currentSamples) @@ -2075,15 +2143,16 @@ func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.Vec // vectorSelectorSingle evaluates an instant vector for the iterator of one time series. 
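+// It returns the sample's timestamp, float value, float histogram, identifying label indices (non-nil only for info metric samples), and whether a valid sample was found within the lookback window.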
func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, node *parser.VectorSelector, ts int64) ( - int64, float64, *histogram.FloatHistogram, bool, + int64, float64, *histogram.FloatHistogram, []int, bool, ) { refTime := ts - durationMilliseconds(node.Offset) var t int64 var v float64 var h *histogram.FloatHistogram + var ils []int valueType := it.Seek(refTime) - // TODO: Handle info metric samples + fmt.Printf("vectorSelectorSingle: processing sample at refTime %d, valueType: %s\n", refTime, valueType.String()) switch valueType { case chunkenc.ValNone: if it.Err() != nil { @@ -2093,25 +2162,33 @@ func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, no t, v = it.At() case chunkenc.ValFloatHistogram: t, h = it.AtFloatHistogram() + case chunkenc.ValInfoSample: + t, ils = it.AtInfoSample() + fmt.Printf("vectorSelectorSingle: Info metric encoding, t: %d, ils: %#v\n", t, ils) default: panic(fmt.Errorf("unknown value type %v", valueType)) } if valueType == chunkenc.ValNone || t > refTime { var ok bool t, v, h, ils, ok = it.PeekPrev() if !ok || t < refTime-durationMilliseconds(ev.lookbackDelta) { + fmt.Printf("vectorSelectorSingle: It is none\n") return 0, 0, nil, nil, false } + + fmt.Printf("vectorSelectorSingle: It is none, with value %f and ils %#v\n", v, ils) } + // TODO: Handle info metric staleness if value.IsStaleNaN(v) || (h != nil && value.IsStaleNaN(h.Sum)) { - return 0, 0, nil, false + return 0, 0, nil, nil, false } - return t, v, h, true + return t, v, h, ils, true } var ( - fPointPool zeropool.Pool[[]FPoint] - hPointPool zeropool.Pool[[]HPoint] + fPointPool zeropool.Pool[[]FPoint] + hPointPool zeropool.Pool[[]HPoint] + infoPointPool zeropool.Pool[[]InfoPoint] // matrixSelectorHPool holds reusable histogram slices used by the matrix // selector. The key difference between this pool and the hPointPool is that @@ -2161,6 +2238,26 @@ func putHPointSlice(p []HPoint) { } } +// getInfoPointSlice will return an InfoPoint slice of size min(sz, maxPointsSliceSize). +// This function is called with an estimated size which often can be over-estimated.
+func getInfoPointSlice(sz int) []InfoPoint { + if p := infoPointPool.Get(); p != nil { + return p + } + + if sz > maxPointsSliceSize { + sz = maxPointsSliceSize + } + + return make([]InfoPoint, 0, sz) +} + +func putInfoPointSlice(p []InfoPoint) { + if p != nil { + infoPointPool.Put(p[:0]) + } +} + func getMatrixSelectorHPoints() []HPoint { if p := matrixSelectorHPool.Get(); p != nil { return p @@ -2204,9 +2301,8 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annota Metric: series[i].Labels(), } - // TODO: Handle info metric samples - ss.Floats, ss.Histograms = ev.matrixIterSlice(it, mint, maxt, nil, nil) - totalSize := int64(len(ss.Floats)) + int64(totalHPointSize(ss.Histograms)) + ss.Floats, ss.Histograms, ss.InfoSamples = ev.matrixIterSlice(it, mint, maxt, nil, nil, nil) + totalSize := int64(len(ss.Floats)) + int64(totalHPointSize(ss.Histograms)) + int64(len(ss.InfoSamples)) ev.samplesStats.IncrementSamplesAtTimestamp(ev.startTimestamp, totalSize) if totalSize > 0 { @@ -2214,6 +2310,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annota } else { putFPointSlice(ss.Floats) putHPointSlice(ss.Histograms) + putInfoPointSlice(ss.InfoSamples) } } return matrix, ws @@ -2229,11 +2326,10 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annota // are populated from the iterator. func (ev *evaluator) matrixIterSlice( it *storage.BufferedSeriesIterator, mint, maxt int64, - floats []FPoint, histograms []HPoint, -) ([]FPoint, []HPoint) { - mintFloats, mintHistograms := mint, mint + floats []FPoint, histograms []HPoint, infoSamples []InfoPoint, +) ([]FPoint, []HPoint, []InfoPoint) { + mintFloats, mintHistograms, mintInfoSamples := mint, mint, mint - // TODO: Handle info metric samples // First floats... if len(floats) > 0 && floats[len(floats)-1].T >= mint { // There is an overlap between previous and current ranges, retain common @@ -2283,6 +2379,28 @@ func (ev *evaluator) matrixIterSlice( } } + // ...then the same for info metric samples. TODO: Use generics? + if len(infoSamples) > 0 && infoSamples[len(infoSamples)-1].T >= mint { + // There is an overlap between previous and current ranges, retain common + // points. In most such cases: + // (a) the overlap is significantly larger than the eval step; and/or + // (b) the number of samples is relatively small. + // so a linear search will be as fast as a binary search. + var drop int + for drop = 0; infoSamples[drop].T < mint; drop++ { + } + ev.currentSamples -= drop + copy(infoSamples, infoSamples[drop:]) + infoSamples = infoSamples[:len(infoSamples)-drop] + // Only append points with timestamps after the last timestamp we have. + mintInfoSamples = infoSamples[len(infoSamples)-1].T + 1 + } else { + ev.currentSamples -= len(infoSamples) + if infoSamples != nil { + infoSamples = infoSamples[:0] + } + } + soughtValueType := it.Seek(maxt) if soughtValueType == chunkenc.ValNone { if it.Err() != nil { @@ -2293,7 +2411,6 @@ func (ev *evaluator) matrixIterSlice( buf := it.Buffer() loop: for { - // TODO: Handle info metric samples switch buf.Next() { case chunkenc.ValNone: break loop @@ -2336,6 +2453,21 @@ loop: } floats = append(floats, FPoint{T: t, F: f}) } + case chunkenc.ValInfoSample: + t, ils := buf.AtInfoSample() + // TODO: Handle staleness? + // Timestamps in the buffer are guaranteed to be smaller than maxt. 
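+			// Points with t < mintInfoSamples were already retained from the previous evaluation window above, so they must not be appended (or counted) twice.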
+ if t >= mintInfoSamples { + ev.currentSamples++ + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + if infoSamples == nil { + infoSamples = getInfoPointSlice(16) + } + level.Debug(ev.logger).Log("msg", "matrixIterSlice: appending info metric point") + infoSamples = append(infoSamples, InfoPoint{T: t, IdentifyingLabels: ils}) + } } } // The sought sample might also be in the range. @@ -2362,7 +2494,6 @@ loop: if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } - case chunkenc.ValFloat: t, f := it.At() if t == maxt && !value.IsStaleNaN(f) { @@ -2375,9 +2506,23 @@ loop: } floats = append(floats, FPoint{T: t, F: f}) } + case chunkenc.ValInfoSample: + t, ils := it.AtInfoSample() + // TODO: Check for staleness? + if t == maxt { + ev.currentSamples++ + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + if infoSamples == nil { + infoSamples = getInfoPointSlice(16) + } + level.Debug(ev.logger).Log("msg", "matrixIterSlice: appending info metric point") + infoSamples = append(infoSamples, InfoPoint{T: t, IdentifyingLabels: ils}) + } } ev.samplesStats.UpdatePeak(ev.currentSamples) - return floats, histograms + return floats, histograms, infoSamples } func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *parser.VectorMatching, lhsh, rhsh []EvalSeriesHelper, enh *EvalNodeHelper) Vector { @@ -2525,12 +2670,13 @@ func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching * // Account for potentially swapped sidedness. fl, fr := ls.F, rs.F hl, hr := ls.H, rs.H + ilsL, ilsR := ls.IdentifyingLabels, rs.IdentifyingLabels if matching.Card == parser.CardOneToMany { fl, fr = fr, fl hl, hr = hr, hl + ilsL, ilsR = ilsR, ilsL } - // TODO: Handle info metric samples - floatValue, histogramValue, keep := vectorElemBinop(op, fl, fr, hl, hr) + floatValue, histogramValue, ils, keep := vectorElemBinop(op, fl, fr, hl, hr, ilsL, ilsR) switch { case returnBool: if keep { @@ -2566,11 +2712,14 @@ func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching * insertedSigs[insertSig] = struct{}{} } - // TODO: Handle info metric samples + if ils != nil { + floatValue = 1 + } enh.Out = append(enh.Out, Sample{ - Metric: metric, - F: floatValue, - H: histogramValue, + Metric: metric, + F: floatValue, + H: histogramValue, + IdentifyingLabels: ils, }) } return enh.Out @@ -2648,7 +2797,7 @@ func (ev *evaluator) VectorscalarBinop(op parser.ItemType, lhs Vector, rhs Scala lh, rh = rh, lh } // TODO: Handle info metric samples - float, histogram, keep := vectorElemBinop(op, lf, rf, lh, rh) + float, histogram, _, keep := vectorElemBinop(op, lf, rf, lh, rh, nil, nil) // Catch cases where the scalar is the LHS in a scalar-vector comparison operation. // We want to always keep the vector element value as the output value, even if it's on the RHS. if op.IsComparisonOperator() && swap { @@ -2709,50 +2858,50 @@ func scalarBinop(op parser.ItemType, lhs, rhs float64) float64 { } // vectorElemBinop evaluates a binary operation between two Vector elements. -// TODO: Handle info metric samples. 
-func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram.FloatHistogram) (float64, *histogram.FloatHistogram, bool) { +func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram.FloatHistogram, ilsL, ilsR []int) (float64, *histogram.FloatHistogram, []int, bool) { + // TODO: Implement for ilsL/ilsR switch op { case parser.ADD: if hlhs != nil && hrhs != nil { - return 0, hlhs.Copy().Add(hrhs).Compact(0), true + return 0, hlhs.Copy().Add(hrhs).Compact(0), nil, true } - return lhs + rhs, nil, true + return lhs + rhs, nil, nil, true case parser.SUB: if hlhs != nil && hrhs != nil { - return 0, hlhs.Copy().Sub(hrhs).Compact(0), true + return 0, hlhs.Copy().Sub(hrhs).Compact(0), nil, true } - return lhs - rhs, nil, true + return lhs - rhs, nil, nil, true case parser.MUL: if hlhs != nil && hrhs == nil { - return 0, hlhs.Copy().Mul(rhs), true + return 0, hlhs.Copy().Mul(rhs), nil, true } if hlhs == nil && hrhs != nil { - return 0, hrhs.Copy().Mul(lhs), true + return 0, hrhs.Copy().Mul(lhs), nil, true } - return lhs * rhs, nil, true + return lhs * rhs, nil, nil, true case parser.DIV: if hlhs != nil && hrhs == nil { - return 0, hlhs.Copy().Div(rhs), true + return 0, hlhs.Copy().Div(rhs), nil, true } - return lhs / rhs, nil, true + return lhs / rhs, nil, nil, true case parser.POW: - return math.Pow(lhs, rhs), nil, true + return math.Pow(lhs, rhs), nil, nil, true case parser.MOD: - return math.Mod(lhs, rhs), nil, true + return math.Mod(lhs, rhs), nil, nil, true case parser.EQLC: - return lhs, nil, lhs == rhs + return lhs, nil, nil, lhs == rhs case parser.NEQ: - return lhs, nil, lhs != rhs + return lhs, nil, nil, lhs != rhs case parser.GTR: - return lhs, nil, lhs > rhs + return lhs, nil, nil, lhs > rhs case parser.LSS: - return lhs, nil, lhs < rhs + return lhs, nil, nil, lhs < rhs case parser.GTE: - return lhs, nil, lhs >= rhs + return lhs, nil, nil, lhs >= rhs case parser.LTE: - return lhs, nil, lhs <= rhs + return lhs, nil, nil, lhs <= rhs case parser.ATAN2: - return math.Atan2(lhs, rhs), nil, true + return math.Atan2(lhs, rhs), nil, nil, true } panic(fmt.Errorf("operator %q not allowed for operations between Vectors", op)) } @@ -2946,7 +3095,8 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix } ss := &outputMatrix[ri] - addToSeries(ss, enh.Ts, aggr.floatValue, aggr.histogramValue, numSteps) + // TODO: Handle info metrics + addToSeries(ss, enh.Ts, aggr.floatValue, aggr.histogramValue, nil, numSteps) } return annos @@ -3033,7 +3183,8 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Ma if !ok { ss = Series{Metric: lbls} } - addToSeries(&ss, enh.Ts, f, nil, numSteps) + // TODO: Handle info metrics? 
+ addToSeries(&ss, enh.Ts, f, nil, nil, numSteps) seriess[hash] = ss } } @@ -3109,18 +3260,24 @@ func (ev *evaluator) aggregationCountValues(e *parser.AggregateExpr, grouping [] return enh.Out, nil } -func addToSeries(ss *Series, ts int64, f float64, h *histogram.FloatHistogram, numSteps int) { - if h == nil { +func addToSeries(ss *Series, ts int64, f float64, h *histogram.FloatHistogram, identifyingLabels []int, numSteps int) { + switch { + case h != nil: + if ss.Histograms == nil { + ss.Histograms = getHPointSlice(numSteps) + } + ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: h}) + case len(identifyingLabels) > 0: + if ss.InfoSamples == nil { + ss.InfoSamples = getInfoPointSlice(numSteps) + } + ss.InfoSamples = append(ss.InfoSamples, InfoPoint{T: ts, IdentifyingLabels: identifyingLabels}) + default: if ss.Floats == nil { ss.Floats = getFPointSlice(numSteps) } ss.Floats = append(ss.Floats, FPoint{T: ts, F: f}) - return - } - if ss.Histograms == nil { - ss.Histograms = getHPointSlice(numSteps) } - ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: h}) } func (ev *evaluator) nextValues(ts int64, series *Series) (f float64, h *histogram.FloatHistogram, b bool) { diff --git a/promql/engine_test.go b/promql/engine_test.go index 708666731..56da64770 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -235,6 +235,10 @@ func (q *errQuerier) Select(context.Context, bool, *storage.SelectHints, ...*lab return errSeriesSet{err: q.err} } +func (q *errQuerier) InfoMetricDataLabels(context.Context, labels.Labels, int64, ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) { + return nil, nil, q.err +} + func (*errQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, nil } diff --git a/promql/functions.go b/promql/functions.go index bd30b9e00..a8cce8575 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -14,6 +14,7 @@ package promql import ( + "context" "fmt" "math" "slices" @@ -1492,6 +1493,58 @@ func funcYear(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) }), nil } +// === info(vector parser.ValueTypeVector, [ls label-selector]) (Vector, Annotations) === +func funcInfo(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + vector := vals[0].(Vector) + var dataLabelMatchers []*labels.Matcher + if len(args) > 1 { + // TODO: Introduce a dedicated LabelSelector type + labelSelector := args[1].(*parser.VectorSelector) + dataLabelMatchers = labelSelector.LabelMatchers + } + + if len(vector) == 0 { + return enh.Out, nil + } + + ts := vector[0].T + + for _, s := range vector { + lb := labels.NewBuilder(s.Metric) + + // Find info metrics for which all of their identifying labels are contained among the sample's labels. + // Pick the union of the data labels belonging to the various info metrics. + dataLabels, annots, err := enh.Querier.InfoMetricDataLabels(context.TODO(), s.Metric, ts, dataLabelMatchers...) 
+ if err != nil { + annots.Add(fmt.Errorf("querying for info metric data labels: %w", err)) + return nil, annots + } + + fmt.Printf("Found %d data labels\n", dataLabels.Len()) + + dataLabels.Range(func(l labels.Label) { + fmt.Printf("Data label %s = %s\n", l.Name, l.Value) + if lb.Get(l.Name) == "" { + lb.Set(l.Name, l.Value) + } + }) + + // If info metric data label matchers are specified, we should only include series where + // info metric data labels are found + if len(dataLabelMatchers) > 0 && dataLabels.IsEmpty() { + continue + } + + enh.Out = append(enh.Out, Sample{ + Metric: lb.Labels(), + F: s.F, + H: s.H, + }) + } + + return enh.Out, nil +} + // FunctionCalls is a list of all functions supported by PromQL, including their types. var FunctionCalls = map[string]FunctionCall{ "abs": funcAbs, @@ -1532,6 +1585,7 @@ var FunctionCalls = map[string]FunctionCall{ "hour": funcHour, "idelta": funcIdelta, "increase": funcIncrease, + "info": funcInfo, "irate": funcIrate, "label_replace": funcLabelReplace, "label_join": funcLabelJoin, diff --git a/promql/parser/functions.go b/promql/parser/functions.go index 99b41321f..434d3cdc1 100644 --- a/promql/parser/functions.go +++ b/promql/parser/functions.go @@ -223,6 +223,13 @@ var Functions = map[string]*Function{ ArgTypes: []ValueType{ValueTypeMatrix}, ReturnType: ValueTypeVector, }, + "info": { + Name: "info", + ArgTypes: []ValueType{ValueTypeVector, ValueTypeVector}, + ReturnType: ValueTypeVector, + Experimental: true, + Variadic: 1, + }, "irate": { Name: "irate", ArgTypes: []ValueType{ValueTypeMatrix}, diff --git a/promql/value.go b/promql/value.go index 83d2bd4a4..c1df4a556 100644 --- a/promql/value.go +++ b/promql/value.go @@ -203,7 +203,7 @@ func (p InfoPoint) String() string { } b.WriteString(strconv.Itoa(il)) } - return fmt.Sprintf("%s @[%v]", b.String(), p.T) + return fmt.Sprintf("%s @[%d]", b.String(), p.T) } // MarshalJSON implements json.Marshaler. 
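To make the label-union rule in `funcInfo` above concrete: a data label is only copied onto a sample when the sample does not already define that label name. Below is a minimal, standalone sketch of just that rule, assuming only the `model/labels` package; the metric and label names are invented for illustration and are not part of this change.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	// The sample's own labels, as selected by info()'s input vector.
	metric := labels.FromStrings(
		"__name__", "http_requests_total",
		"job", "api",
		"instance", "host:8080",
	)
	// Data labels found on a matching info metric; "job" collides on purpose.
	dataLabels := labels.FromStrings("job", "other", "k8s_cluster_name", "eu-1")

	lb := labels.NewBuilder(metric)
	dataLabels.Range(func(l labels.Label) {
		// Mirror funcInfo: never overwrite a label the sample already has.
		if lb.Get(l.Name) == "" {
			lb.Set(l.Name, l.Value)
		}
	})
	// "job" stays "api", while "k8s_cluster_name" is added.
	fmt.Println(lb.Labels())
}
```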
@@ -237,7 +237,6 @@ func (s Sample) String() string { p := FPoint{T: s.T, F: s.F} str = p.String() } - return fmt.Sprintf("%s => %s", s.Metric, str) } @@ -592,7 +591,7 @@ func (ssi *storageSeriesIterator) Next() chunkenc.ValueType { ssi.currILs = p.IdentifyingLabels ssi.currF = 0 ssi.currH = nil - return chunkenc.ValFloat + return chunkenc.ValInfoSample default: panic("storageSeriesIterater.Next failed to pick value type") } diff --git a/storage/fanout_test.go b/storage/fanout_test.go index 913e2fe24..7f89ae39d 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -236,6 +236,10 @@ func (errQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels return storage.ErrSeriesSet(errSelect) } +func (errQuerier) InfoMetricDataLabels(ctx context.Context, lbls labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) { + return nil, nil, errors.New("info metric data labels error") +} + func (errQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, errors.New("label values error") } diff --git a/storage/interface.go b/storage/interface.go index 656904179..846085c3b 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -130,6 +130,10 @@ func (q *MockQuerier) LabelNames(context.Context, ...*labels.Matcher) ([]string, return nil, nil, nil } +func (q *MockQuerier) InfoMetricDataLabels(context.Context, labels.Labels, int64, ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) { + return labels.Labels{}, nil, nil +} + func (q *MockQuerier) Close() error { return nil } @@ -168,6 +172,12 @@ type LabelQuerier interface { // to label names of metrics matching the matchers. LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) + // InfoMetricDataLabels queries for the data labels of info metrics whose identifying labels are contained in lbls, + // at time t. + // If matchers are specified, these will filter chosen info metrics based on their data labels, and also constrain + // which data labels are returned. + InfoMetricDataLabels(ctx context.Context, lbls labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) + // Close releases the resources of the Querier. Close() error } diff --git a/storage/memoized_iterator.go b/storage/memoized_iterator.go index e5fd28343..d42333357 100644 --- a/storage/memoized_iterator.go +++ b/storage/memoized_iterator.go @@ -14,6 +14,7 @@ package storage import ( + "fmt" "math" "github.com/prometheus/prometheus/model/histogram" @@ -40,6 +41,7 @@ type MemoizedSeriesIterator struct { // NewMemoizedEmptyIterator is like NewMemoizedIterator but it's initialised with an empty iterator.
func NewMemoizedEmptyIterator(delta int64) *MemoizedSeriesIterator { + fmt.Printf("NewMemoizedEmptyIterator: Creating nopIterator\n") return NewMemoizedIterator(chunkenc.NewNopIterator(), delta) } @@ -82,7 +84,6 @@ func (b *MemoizedSeriesIterator) Seek(t int64) chunkenc.ValueType { b.prevTime = math.MinInt64 b.valueType = b.it.Seek(t0) - // TODO: Handle info metric samples switch b.valueType { case chunkenc.ValNone: return chunkenc.ValNone diff --git a/storage/merge.go b/storage/merge.go index 320550eb5..70b03322d 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -149,6 +149,32 @@ func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints }} } +func (q *mergeGenericQuerier) InfoMetricDataLabels(ctx context.Context, lbls labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) { + seen := make(map[string]struct{}) + var annots annotations.Annotations + var lb labels.ScratchBuilder + for _, querier := range q.queriers { + dataLbls, anns, err := querier.InfoMetricDataLabels(ctx, lbls, t, matchers...) + if len(anns) > 0 { + annots.Merge(anns) + } + if err != nil { + return labels.Labels{}, annots, fmt.Errorf("InfoMetricDataLabels() from merge generic querier: %w", err) + } + dataLbls.Range(func(l labels.Label) { + if _, ok := seen[l.Name]; ok { + return + } + + seen[l.Name] = struct{}{} + lb.Add(l.Name, l.Value) + }) + } + + lb.Sort() + return lb.Labels(), annots, nil +} + type labelGenericQueriers []genericQuerier func (l labelGenericQueriers) Len() int { return len(l) } diff --git a/storage/merge_test.go b/storage/merge_test.go index e6273e923..909072e23 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -1376,6 +1376,10 @@ func (m *mockGenericQuerier) Select(_ context.Context, b bool, _ *SelectHints, _ return &mockGenericSeriesSet{resp: m.resp, warnings: m.warnings, err: m.err} } +func (m *mockGenericQuerier) InfoMetricDataLabels(ctx context.Context, lbls labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) { + panic("not implemented") +} + func (m *mockGenericQuerier) LabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { m.mtx.Lock() m.labelNamesRequested = append(m.labelNamesRequested, labelNameRequest{ diff --git a/storage/noop.go b/storage/noop.go index be5741ddd..d9d612d15 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -31,6 +31,10 @@ func (noopQuerier) Select(context.Context, bool, *SelectHints, ...*labels.Matche return NoopSeriesSet() } +func (noopQuerier) InfoMetricDataLabels(ctx context.Context, lbls labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) { + return labels.Labels{}, nil, nil +} + func (noopQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, nil } @@ -54,6 +58,10 @@ func (noopChunkQuerier) Select(context.Context, bool, *SelectHints, ...*labels.M return NoopChunkedSeriesSet() } +func (noopChunkQuerier) InfoMetricDataLabels(ctx context.Context, lbls labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) { + return labels.Labels{}, nil, nil +} + func (noopChunkQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, nil } diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper.go
b/storage/remote/otlptranslator/prometheusremotewrite/helper.go index c4132aef8..bad964650 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/helper.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper.go @@ -522,6 +522,7 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta if settings.DisableTargetInfo || timestamp == 0 { return } + fmt.Printf("Adding resource target info\n") attributes := resource.Attributes() identifyingAttrs := []string{ @@ -538,6 +539,7 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta } if nonIdentifyingAttrsCount == 0 { // If we only have job + instance, then target_info isn't useful, so don't add it. + fmt.Printf("Only have job + instance\n") return } @@ -545,18 +547,21 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta if len(settings.Namespace) > 0 { name = settings.Namespace + "_" + name } - labels := createAttributes(resource, attributes, settings.ExternalLabels, model.MetricNameLabel, name) + + labels := createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name) // Ensure consistent label ordering sort.Sort(ByLabelName(labels)) identifyingLabels := make([]int32, 0, 2) for i, l := range labels { - if l.Name == model.InstanceLabel || l.Name == model.JobLabel { + if l.Name == model.JobLabel || l.Name == model.InstanceLabel { + fmt.Printf("Have identifying label %s=%s\n", l.Name, l.Value) identifyingLabels = append(identifyingLabels, int32(i)) } } - if len(identifyingLabels) != 2 { - // target_info has to be identified by the job/instance tuple, one of them isn't enough on its own. - identifyingLabels = nil + if len(identifyingLabels) == 0 { + // We need at least one identifying label to generate target_info. + fmt.Printf("Got no identifying labels\n") + return } sample := &prompb.Sample{ Value: float64(1), diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go index 65dac99c5..b3e817735 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go @@ -177,5 +177,21 @@ func (c *PrometheusConverter) addSample(sample *prompb.Sample, lbls []prompb.Lab ts, _ := c.getOrCreateTimeSeries(lbls) ts.Samples = append(ts.Samples, *sample) + + if len(sample.IdentifyingLabels) > 0 { + ils := make([]prompb.Label, 0, len(sample.IdentifyingLabels)) + for _, ix := range sample.IdentifyingLabels { + ils = append(ils, ts.Labels[ix]) + } + var name string + for _, l := range ts.Labels { + if l.Name == "__name__" { + name = l.Value + break + } + } + fmt.Printf("Adding sample for OTel info metric %s, identifying labels: %#v\n", name, ils) + } + return ts } diff --git a/storage/remote/read.go b/storage/remote/read.go index 723030091..0f22b5da8 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -172,6 +172,12 @@ func (q *querier) Select(ctx context.Context, sortSeries bool, hints *storage.Se return newSeriesSetFilter(FromQueryResult(sortSeries, res), added) } +// InfoMetricDataLabels implements storage.LabelQuerier and is a noop. +func (q *querier) InfoMetricDataLabels(ctx context.Context, lbls labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) { + // TODO: Implement. 
+ return labels.Labels{}, nil, errors.New("not implemented") +} + // addExternalLabels adds matchers for each external label. External labels // that already have a corresponding user-supplied matcher are skipped, as we // assume that the user explicitly wants to select a different value for them. diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 09b2f3731..8f8df9221 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -18,6 +18,8 @@ import ( "errors" "fmt" "net/http" + "strconv" + "strings" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -129,9 +131,15 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err } else { // This is an info metric sample ils := make([]int, 0, len(s.IdentifyingLabels)) - for _, idx := range s.IdentifyingLabels { + var ilb strings.Builder + for i, idx := range s.IdentifyingLabels { ils = append(ils, int(idx)) + if i > 0 { + ilb.WriteRune(',') + } + ilb.WriteString(strconv.Itoa(int(idx))) } + level.Debug(h.logger).Log("msg", "appending info metric sample", "identifying_labels", ilb.String()) ref, err = app.AppendInfoSample(ref, labels, s.Timestamp, ils) } if err != nil { diff --git a/storage/series.go b/storage/series.go index a3638394c..9566ac12e 100644 --- a/storage/series.go +++ b/storage/series.go @@ -57,8 +57,8 @@ func NewListSeries(lset labels.Labels, s []chunks.Sample) *SeriesEntry { } } -// NewListChunkSeriesFromSamples returns chunk series entry that allows to iterate over provided samples. -// NOTE: It uses inefficient chunks encoding implementation, not caring about chunk size. +// NewListChunkSeriesFromSamples returns a chunk series entry that allows to iterate over provided samples. +// NOTE: It uses an inefficient chunks encoding implementation, not caring about chunk size. // Use only for testing. func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]chunks.Sample) *ChunkSeriesEntry { chksFromSamples := make([]chunks.Meta, 0, len(samples)) diff --git a/tsdb/block.go b/tsdb/block.go index 20d66763c..a3063b9a2 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -38,6 +38,7 @@ import ( "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/prometheus/prometheus/util/annotations" ) // IndexWriter serializes the index for a block of series data. @@ -121,6 +122,12 @@ type IndexReader interface { // The names returned are sorted. LabelNamesFor(ctx context.Context, postings index.Postings) ([]string, error) + // InfoMetricDataLabels queries for the data labels of info metrics whose identifying labels are contained in lbls, + // at time t. + // If matchers are specified, these will filter chosen info metrics based on their data labels, and also constrain + // which data labels are returned. + InfoMetricDataLabels(ctx context.Context, lbls labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) + // Close releases the underlying resources of the reader. Close() error } @@ -538,6 +545,10 @@ func (r blockIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Ma return labelNamesWithMatchers(ctx, r.ir, matchers...) } +func (r blockIndexReader) InfoMetricDataLabels(ctx context.Context, lbls labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) { + return r.ir.InfoMetricDataLabels(ctx, lbls, t, matchers...)
+} + func (r blockIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) { p, err := r.ir.Postings(ctx, name, values...) if err != nil { diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go index 232ec2b91..a2b89e92c 100644 --- a/tsdb/blockwriter.go +++ b/tsdb/blockwriter.go @@ -42,7 +42,7 @@ type BlockWriter struct { // ErrNoSeriesAppended is returned if the series count is zero while flushing blocks. var ErrNoSeriesAppended = errors.New("no series appended, aborting") -// NewBlockWriter create a new block writer. +// NewBlockWriter creates a new block writer. // // The returned writer accumulates all the series in the Head block until `Flush` is called. // diff --git a/tsdb/chunkenc/bstream.go b/tsdb/chunkenc/bstream.go index 606e229b6..8cc59f3ea 100644 --- a/tsdb/chunkenc/bstream.go +++ b/tsdb/chunkenc/bstream.go @@ -69,12 +69,6 @@ const ( one bit = true ) -// Reset resets b around stream. -func (b *bstream) Reset(stream []byte) { - b.stream = stream - b.count = 0 -} - func (b *bstream) writeBit(bit bit) { if b.count == 0 { b.stream = append(b.stream, 0) diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index f065b3cb1..4d931dfd8 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -353,7 +353,6 @@ func (p *pool) Get(e Encoding, b []byte) (Chunk, error) { func (p *pool) Put(c Chunk) error { var sp *sync.Pool - var ok bool switch c.Encoding() { case EncXOR: _, ok := c.(*XORChunk) diff --git a/tsdb/chunkenc/chunk_test.go b/tsdb/chunkenc/chunk_test.go index b72492a08..b83d99238 100644 --- a/tsdb/chunkenc/chunk_test.go +++ b/tsdb/chunkenc/chunk_test.go @@ -68,6 +68,7 @@ func testChunk(t *testing.T, c Chunk) { app.Append(ts, v) exp = append(exp, pair{t: ts, v: v}) } + require.Equal(t, 300, c.NumSamples()) // 1. Expand iterator in simple case. it1 := c.Iterator(nil) diff --git a/tsdb/chunkenc/infometric.go b/tsdb/chunkenc/infometric.go index 6c96682e5..62965300d 100644 --- a/tsdb/chunkenc/infometric.go +++ b/tsdb/chunkenc/infometric.go @@ -26,10 +26,9 @@ type InfoSampleChunk struct { b bstream } -// NewInfoSampleChunk returns a new chunk with info metric encoding of the given -// size. +// NewInfoSampleChunk returns a new chunk with info metric encoding. func NewInfoSampleChunk() *InfoSampleChunk { - b := make([]byte, 3, 128) + b := make([]byte, 2, 128) return &InfoSampleChunk{b: bstream{stream: b, count: 0}} } @@ -85,28 +84,24 @@ func (c *InfoSampleChunk) Appender() (Appender, error) { }, nil } -// Iterator implements the Chunk interface. -func (c *InfoSampleChunk) Iterator(it Iterator) Iterator { - return c.iterator(it) -} - func (c *InfoSampleChunk) iterator(it Iterator) *infoSampleIterator { if iter, ok := it.(*infoSampleIterator); ok { iter.Reset(c.b.bytes()) return iter } - iter := &infoSampleIterator{ + return &infoSampleIterator{ // The first 2 bytes contain chunk headers. // We skip that for actual samples. br: newBReader(c.b.bytes()[2:]), numTotal: binary.BigEndian.Uint16(c.b.bytes()), t: math.MinInt64, } - // The first 3 bytes contain chunk headers. - // We skip that for actual samples. - _, _ = iter.br.readBits(24) - return iter +} + +// Iterator implements the Chunk interface.
+func (c *InfoSampleChunk) Iterator(it Iterator) Iterator { + return c.iterator(it) } type infoSampleAppender struct { @@ -151,10 +146,8 @@ func (a *infoSampleAppender) AppendInfoSample(t int64, identifyingLabels []int) } else { tDelta := t - a.t tDod := tDelta - a.tDelta - - putVarbitInt(a.b, tDod) - a.tDelta = tDelta + putVarbitInt(a.b, tDod) if slices.Equal(a.ils, identifyingLabels) { // The labels do not change @@ -165,7 +158,7 @@ func (a *infoSampleAppender) AppendInfoSample(t int64, identifyingLabels []int) lDelta := len(identifyingLabels) - len(a.ils) lDod := int64(lDelta - a.lDelta) a.lDelta = lDelta putVarbitInt(a.b, lDod) for _, ix := range identifyingLabels { putVarbitInt(a.b, int64(ix)) @@ -175,6 +168,7 @@ a.t = t a.ils = identifyingLabels + binary.BigEndian.PutUint16(a.b.bytes(), num+1) } type infoSampleIterator struct { diff --git a/tsdb/chunkenc/infometric_test.go b/tsdb/chunkenc/infometric_test.go new file mode 100644 index 000000000..3b24011d2 --- /dev/null +++ b/tsdb/chunkenc/infometric_test.go @@ -0,0 +1,88 @@ +package chunkenc + +import ( + "math/rand" + "testing" + + "github.com/stretchr/testify/require" +) + +type infoPair struct { + t int64 + ils []int +} + +func TestInfoSampleChunk(t *testing.T) { + c := NewInfoSampleChunk() + app, err := c.Appender() + require.NoError(t, err) + + var exp []infoPair + var ( + ts = int64(1234123324) + ) + for i := 0; i < 300; i++ { + ts += int64(rand.Intn(10000) + 1) + var ils []int + if i%2 == 0 { + ils = []int{0, 1} + } else { + ils = []int{2, 3} + } + + // Start with a new appender every 10th sample. This emulates starting + // appending to a partially filled chunk. + if i%10 == 0 { + app, err = c.Appender() + require.NoError(t, err) + } + + app.AppendInfoSample(ts, ils) + exp = append(exp, infoPair{t: ts, ils: ils}) + require.Equal(t, i+1, c.NumSamples()) + } + require.Equal(t, 300, c.NumSamples()) + require.Len(t, exp, 300) + + // 1. Expand iterator in simple case. + it1 := c.Iterator(nil) + var res1 []infoPair + for it1.Next() == ValInfoSample { + ts, ils := it1.AtInfoSample() + res1 = append(res1, infoPair{t: ts, ils: ils}) + } + require.NoError(t, it1.Err()) + require.Len(t, res1, len(exp)) + require.Equal(t, exp[1], res1[1]) + require.Equal(t, exp, res1) + + // 2. Expand second iterator while reusing first one. + it2 := c.Iterator(it1) + var res2 []infoPair + for it2.Next() == ValInfoSample { + ts, ils := it2.AtInfoSample() + res2 = append(res2, infoPair{t: ts, ils: ils}) + } + require.NoError(t, it2.Err()) + require.Equal(t, exp, res2) + + // 3. Test iterator Seek. + mid := len(exp) / 2 + + it3 := c.Iterator(nil) + var res3 []infoPair + require.Equal(t, ValInfoSample, it3.Seek(exp[mid].t)) + // Below ones should not matter. + require.Equal(t, ValInfoSample, it3.Seek(exp[mid].t)) + require.Equal(t, ValInfoSample, it3.Seek(exp[mid].t)) + ts, ils := it3.AtInfoSample() + res3 = append(res3, infoPair{t: ts, ils: ils}) + + for it3.Next() == ValInfoSample { + ts, ils := it3.AtInfoSample() + res3 = append(res3, infoPair{t: ts, ils: ils}) + } + require.NoError(t, it3.Err()) + require.Equal(t, exp[mid:], res3) + require.Equal(t, ValNone, it3.Seek(exp[len(exp)-1].t+1)) +} diff --git a/tsdb/chunkenc/varbit.go b/tsdb/chunkenc/varbit.go index 574edec48..b43574dcb 100644 --- a/tsdb/chunkenc/varbit.go +++ b/tsdb/chunkenc/varbit.go @@ -61,7 +61,7 @@ func putVarbitInt(b *bstream, val int64) { } } -// readVarbitInt reads an int64 encoced with putVarbitInt.
+// readVarbitInt reads an int64 encoded with putVarbitInt. func readVarbitInt(b *bstreamReader) (int64, error) { var d byte for i := 0; i < 8; i++ { @@ -166,7 +166,7 @@ func putVarbitUint(b *bstream, val uint64) { } } -// readVarbitUint reads a uint64 encoced with putVarbitUint. +// readVarbitUint reads a uint64 encoded with putVarbitUint. func readVarbitUint(b *bstreamReader) (uint64, error) { var d byte for i := 0; i < 8; i++ { diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index 0bc89099b..1dce507ec 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -60,7 +60,7 @@ type XORChunk struct { b bstream } -// NewXORChunk returns a new chunk with XOR encoding of the given size. +// NewXORChunk returns a new chunk with XOR encoding. func NewXORChunk() *XORChunk { b := make([]byte, 2, 128) return &XORChunk{b: bstream{stream: b, count: 0}} } diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index f4df1d3e4..0b7b664f6 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -722,7 +722,7 @@ func nextSequenceFile(dir string) (string, int, error) { } // It is not necessary that we find the files in number order, // for example with '1000000' and '200000', '1000000' would come first. - // Though this is a very very race case, we check anyway for the max id. + // Though this is a very very rare case, we check anyway for the max id. if j > i { i = j } diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index e46df3f89..9a72a708e 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -188,8 +188,8 @@ func (f *chunkPos) bytesToWriteForChunk(chkLen uint64) uint64 { return bytes } -// ChunkDiskMapper is for writing the Head block chunks to the disk -// and access chunks via mmapped file. +// ChunkDiskMapper is for writing the Head block chunks to disk +// and access chunks via mmapped files. type ChunkDiskMapper struct { /// Writer. dir *os.File @@ -231,7 +231,7 @@ type ChunkDiskMapper struct { closed bool } -// mmappedChunkFile provides mmapp access to an entire head chunks file that holds many chunks. +// mmappedChunkFile provides mmap access to an entire head chunks file that holds many chunks. type mmappedChunkFile struct { byteSlice ByteSlice maxt int64 // Max timestamp among all of this file's chunks. @@ -240,7 +240,7 @@ type mmappedChunkFile struct { // NewChunkDiskMapper returns a new ChunkDiskMapper against the given directory // using the default head chunk file duration. // NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper -// to set the maxt of all the file. +// to set the maxt of all files. func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Pool, writeBufferSize, writeQueueSize int) (*ChunkDiskMapper, error) { // Validate write buffer size. if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize { @@ -452,8 +452,8 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro return files, nil } -// WriteChunk writes the chunk to the disk. -// The returned chunk ref is the reference from where the chunk encoding starts for the chunk. +// WriteChunk writes the chunk to disk. +// The returned chunk ref is the reference for where the chunk encoding starts for the chunk.
 func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, isOOO bool, callback func(err error)) (chkRef ChunkDiskMapperRef) {
 	// cdm.evtlPosMtx must be held to serialize the calls to cdm.evtlPos.getNextChunkRef() and the writing of the chunk (either with or without queue).
 	cdm.evtlPosMtx.Lock()
@@ -812,7 +812,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
 // IterateAllChunks iterates all mmappedChunkFiles (in order of head chunk file name/number) and all the chunks within it
 // and runs the provided function with information about each chunk. It returns on the first error encountered.
 // NOTE: This method needs to be called at least once after creating ChunkDiskMapper
-// to set the maxt of all files.
+// to set the maxt of all files.
 func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding, isOOO bool) error) (err error) {
 	cdm.writePathMtx.Lock()
 	defer cdm.writePathMtx.Unlock()
diff --git a/tsdb/compact.go b/tsdb/compact.go
index e2a7ccf4d..0507af352 100644
--- a/tsdb/compact.go
+++ b/tsdb/compact.go
@@ -405,7 +405,7 @@ func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
 			t0 = tr * ((m.MinTime - tr + 1) / tr)
 		}
 		// Skip blocks that don't fall into the range. This can happen via mis-alignment or
-		// by being a multiple of the intended range.
+		// by being a multiple of the intended range.
 		if m.MaxTime > t0+tr {
 			i++
 			continue
@@ -428,7 +428,7 @@ func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
 	return splitDirs
 }
 
-// CompactBlockMetas merges many block metas into one, combining its source blocks together
+// CompactBlockMetas merges many block metas into one, combining its source blocks together
 // and adjusting compaction level. Min/Max time of result block meta covers all input blocks.
 func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
 	res := &BlockMeta{
@@ -495,7 +495,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) ([
 	return ulids, nil
 }
 
-// shardedBlock describes a single *output* block during compaction. This struct is passed between
+// shardedBlock describes a single *output* block during compaction. This struct is passed between
 // compaction methods to wrap output block details, index and chunk writer together.
 // Shard index is determined by the position of this structure in the slice of output blocks.
 type shardedBlock struct {
@@ -1151,7 +1151,7 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
 		chksIter := s.Iterator(chksIter)
 		var chks []chunks.Meta
 		for chksIter.Next() {
-			// We are not iterating in a streaming way over chunks as
+			// We are not iterating in a streaming way over chunks as
 			// it's more efficient to do bulk write for index and
 			// chunk file purposes.
 			chks = append(chks, chksIter.At())
@@ -1160,7 +1160,7 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
 			return fmt.Errorf("chunk iter: %w", err)
 		}
 
-		// Skip series with all deleted chunks.
+		// Skip series with all deleted chunks.
 		if len(chks) == 0 {
 			continue
 		}
diff --git a/tsdb/docs/format/index.md b/tsdb/docs/format/index.md
index e0ef21bd5..53b77d9ab 100644
--- a/tsdb/docs/format/index.md
+++ b/tsdb/docs/format/index.md
@@ -40,7 +40,7 @@ Most of the sections described below start with a `len` field. 
It always specifies the number of bytes just before the trailing CRC32 checksum.
 
 ### Symbol Table
 
-The symbol table holds a sorted list of deduplicated strings that occur in label pairs of the stored series. They can be referenced from subsequent sections and significantly reduce the total index size.
+The symbol table holds a sorted list of deduplicated strings that occur in label pairs of the stored series. They can be referenced from subsequent sections and significantly reduce the total index size.
 
 The section contains a sequence of the string entries, each prefixed with the string's length in raw bytes. All strings are utf-8 encoded.
 Strings are referenced by sequential indexing. The strings are sorted in lexicographically ascending order.
diff --git a/tsdb/docs/usage.md b/tsdb/docs/usage.md
index 7bc1ae6c5..4b4278004 100644
--- a/tsdb/docs/usage.md
+++ b/tsdb/docs/usage.md
@@ -18,7 +18,7 @@ A `DB` has the following main components:
 * [`Head`](https://pkg.go.dev/github.com/prometheus/prometheus/tsdb#DB.Head)
 * [Blocks (persistent blocks)](https://pkg.go.dev/github.com/prometheus/prometheus/tsdb#DB.Blocks)
 
-The `Head` is responsible for a lot. Here are its main components:
+The `Head` is responsible for a lot. Here are its main components:
 
 * [WAL](https://pkg.go.dev/github.com/prometheus/prometheus/tsdb/wal#WAL) (Write Ahead Log).
 * [`stripeSeries`](https://github.com/prometheus/prometheus/blob/411021ada9ab41095923b8d2df9365b632fd40c3/tsdb/head.go#L1292):
diff --git a/tsdb/exemplar.go b/tsdb/exemplar.go
index 7545ab9a6..2dadb124e 100644
--- a/tsdb/exemplar.go
+++ b/tsdb/exemplar.go
@@ -111,7 +111,7 @@ func NewExemplarMetrics(reg prometheus.Registerer) *ExemplarMetrics {
 	return &m
 }
 
-// NewCircularExemplarStorage creates a circular in memory exemplar storage.
+// NewCircularExemplarStorage creates a circular in-memory exemplar storage.
 // If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in
 // 1GB of extra memory, accounting for the fact that this is heap allocated space.
 // If len <= 0, then the exemplar storage is essentially a noop storage but can later be
diff --git a/tsdb/head_append.go b/tsdb/head_append.go
index fea57b933..97a85a9c3 100644
--- a/tsdb/head_append.go
+++ b/tsdb/head_append.go
@@ -19,6 +19,8 @@ import (
 	"fmt"
 	"math"
 	"slices"
+	"strconv"
+	"strings"
 
 	"github.com/go-kit/log/level"
 
@@ -477,6 +479,17 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (*memSeries, error) {
 	return s, nil
 }
 
+func (a *headAppender) getOrCreateInfoMetric(lset labels.Labels, t int64, identifyingLabels []int) (*memSeries, error) {
+	s, err := a.getOrCreate(lset)
+	if err != nil {
+		return s, err
+	}
+
+	a.head.postings.AddInfoMetric(storage.SeriesRef(s.ref), lset, t, identifyingLabels)
+
+	return s, nil
+}
+
 // appendable checks whether the given sample is valid for appending to the series. (if we return false and no error)
 // The sample belongs to the out of order chunk if we return true and no error.
 // An error signifies the sample cannot be handled.
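
For orientation, here is a minimal sketch (not part of the patch) of how identifying labels end up as the integer indices that `AppendInfoSample` and `AddInfoMetric` pass around: each identifying label is referenced by its position within the series' sorted label set. The helper name `resolveIdentifyingIndices` is hypothetical.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// resolveIdentifyingIndices maps identifying label names to their positions
// within the sorted label set, mirroring the identifyingLabels []int encoding.
func resolveIdentifyingIndices(lset labels.Labels, identifying ...string) []int {
	isIdentifying := map[string]struct{}{}
	for _, n := range identifying {
		isIdentifying[n] = struct{}{}
	}

	var ixs []int
	i := 0
	lset.Range(func(l labels.Label) {
		if _, ok := isIdentifying[l.Name]; ok {
			ixs = append(ixs, i)
		}
		i++
	})
	return ixs
}

func main() {
	lset := labels.FromStrings(
		"__name__", "target_info",
		"instance", "localhost:9090",
		"job", "prometheus",
		"k8s_cluster_name", "mycluster",
	)
	// labels.FromStrings sorts by name, so instance is at index 1 and job at 2.
	fmt.Println(resolveIdentifyingIndices(lset, "instance", "job")) // [1 2]
}
```
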
@@ -754,12 +767,21 @@ func (a *headAppender) AppendInfoSample(ref storage.SeriesRef, lset labels.Label s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) if s == nil { var err error - s, err = a.getOrCreate(lset) + s, err = a.getOrCreateInfoMetric(lset, t, identifyingLabels) if err != nil { return 0, err } } + var ilb strings.Builder + for i, idx := range identifyingLabels { + if i > 0 { + ilb.WriteRune(',') + } + ilb.WriteString(strconv.Itoa(idx)) + } + level.Debug(a.head.logger).Log("msg", "trying to append info metric sample", "series_ref", ref, "t", t, "identifying_labels", ilb.String()) + s.Lock() // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. @@ -788,6 +810,7 @@ func (a *headAppender) AppendInfoSample(ref storage.SeriesRef, lset labels.Label a.maxt = t } + level.Debug(a.head.logger).Log("msg", "appending info sample", "series_ref", s.ref, "t", t, "identifying_labels", ilb.String()) a.infoSamples = append(a.infoSamples, record.RefInfoSample{ Ref: s.ref, T: t, @@ -890,6 +913,7 @@ func (a *headAppender) log() error { } } if len(a.infoSamples) > 0 { + level.Debug(a.head.logger).Log("msg", "writing info samples to WAL", "count", len(a.infoSamples)) rec = enc.InfoSamples(a.infoSamples, buf) buf = rec[:0] if err := a.head.wal.Log(rec); err != nil { @@ -1261,6 +1285,7 @@ func (a *headAppender) Commit() (err error) { floatsAppended-- */ } + level.Debug(a.head.logger).Log("msg", "committed info sample to WAL and added to head", "series_ref", series.ref, "chunk_created", chunkCreated) } if chunkCreated { @@ -1468,6 +1493,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, // appendInfoSample adds an info metric sample. // s should be the corresponding info metric/time series. +// Returns whether sample is in order and whether a chunk was created. func (s *memSeries) appendInfoSample(t int64, identifyingLabels []int, appendID uint64, o chunkOpts) (bool, bool) { // Every time we receive an OTLP write, a sample is written to target_info for the resource's job/instance combo // plus the resource's other attributes. The sample has the timestamp of the most recent timestamp among the @@ -1524,6 +1550,7 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts } // Check the chunk size, unless we just created it and if the chunk is too large, cut a new one. + // TODO: Reconsider chunk size heuristics for info metric samples. if !chunkCreated && len(c.chunk.Bytes()) > maxBytesPerXORChunk { c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true diff --git a/tsdb/head_read.go b/tsdb/head_read.go index b8aa574f0..be77e327a 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -19,6 +19,7 @@ import ( "fmt" "math" "slices" + "strings" "sync" "github.com/go-kit/log/level" @@ -28,6 +29,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" + "github.com/prometheus/prometheus/util/annotations" ) func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { @@ -103,12 +105,29 @@ func (h *headIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Ma return labelNamesWithMatchers(ctx, h, matchers...) 
 }
 
+func (h *headIndexReader) InfoMetricDataLabels(ctx context.Context, lbls labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) {
+	dls := h.head.postings.InfoMetricDataLabels(ctx, lbls, t, matchers...)
+	return dls, nil, nil
+}
+
 // Postings returns the postings list iterator for the label pairs.
 func (h *headIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
 	switch len(values) {
 	case 0:
 		return index.EmptyPostings(), nil
 	case 1:
+		if name == "__name__" && values[0] == "target_info" {
+			var vb strings.Builder
+			for i, v := range values {
+				if i > 0 {
+					vb.WriteRune(',')
+				}
+				vb.WriteString(v)
+			}
+			level.Debug(h.head.logger).Log("msg", "Looking up target_info", "values", vb.String())
+		} else {
+			level.Debug(h.head.logger).Log("msg", "Looking up a metric other than target_info", "name", name, "value", values[0])
+		}
 		return h.head.postings.Get(name, values[0]), nil
 	default:
 		res := make([]index.Postings, 0, len(values))
@@ -369,9 +388,11 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
 	s := h.head.series.getByID(sid)
 	// This means that the series has been garbage collected.
 	if s == nil {
+		fmt.Printf("headChunkReader.chunk: unable to get series by ID %d\n", sid)
 		return nil, 0, storage.ErrNotFound
 	}
 
+	fmt.Printf("headChunkReader.chunk: got series %#v\n", s)
 	s.Lock()
 	c, headChunk, isOpen, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool)
 	if err != nil {
@@ -390,6 +411,7 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
 	// This means that the chunk is outside the specified range.
 	if !c.OverlapsClosedInterval(h.mint, h.maxt) {
 		s.Unlock()
+		fmt.Printf("headChunkReader.chunk: Chunk is outside of specified range\n")
 		return nil, 0, storage.ErrNotFound
 	}
 
@@ -408,6 +430,7 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
 	}
 	s.Unlock()
 
+	fmt.Printf("headChunkReader.chunk: Successfully got chunk: %s\n", chk.Encoding())
 	return &safeHeadChunk{
 		Chunk: chk,
 		s:     s,
@@ -697,6 +720,9 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, c chunkenc.Chunk, isoState *
 	ix := int(id) - int(s.firstChunkID)
 
 	numSamples := c.NumSamples()
+	if numSamples == 0 {
+		fmt.Printf("memSeries.iterator: numSamples is 0, c: %T\n", c)
+	}
 	stopAfter := numSamples
 
 	if isoState != nil && !isoState.IsolationDisabled() {
@@ -744,6 +770,8 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, c chunkenc.Chunk, isoState *
 			}
 			stopAfter = numSamples - (appendIDsToConsider - index)
 			if stopAfter < 0 {
+				fmt.Printf("Stopped in a previous chunk, numSamples: %d, appendIDsToConsider: %d, index: %d\n",
+					numSamples, appendIDsToConsider, index)
 				stopAfter = 0 // Stopped in a previous chunk.
 			}
 			break
@@ -751,11 +779,14 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, c chunkenc.Chunk, isoState *
 	}
 
 	if stopAfter == 0 {
+		fmt.Printf("memSeries.iterator, creating nopIterator due to stopAfter == 0: numSamples: %d, c: %T\n", numSamples, c)
 		return chunkenc.NewNopIterator()
 	}
 	if stopAfter == numSamples {
+		fmt.Printf("memSeries.iterator, creating iterator since stopAfter == numSamples: %T\n", c)
 		return c.Iterator(it)
 	}
+	fmt.Printf("memSeries.iterator, making stop iterator since stopAfter != numSamples, stopAfter: %d\n", stopAfter)
 	return makeStopIterator(c, it, stopAfter)
 }
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index bd9f33237..5727a77f9 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -227,6 +227,7 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
 				}
 				return
 			}
+			level.Debug(h.logger).Log("msg", "decoded info metric samples from WAL", "count", len(samples))
 			decoded <- samples
 		case record.Metadata:
 			meta := metadataPool.Get()[:0]
@@ -630,6 +631,7 @@ func (wp *walSubsetProcessor) reuseInfoBuf() []record.RefInfoSample {
 func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (unknownRefs, unknownHistogramRefs, unknownInfoRefs, mmapOverlappingChunks uint64) {
 	defer close(wp.output)
 	defer close(wp.histogramsOutput)
+	defer close(wp.infoOutput)
 	minValidTime := h.minValidTime.Load()
 	mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
 
@@ -704,6 +706,10 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
 				mint = s.t
 			}
 		}
+		select {
+		case wp.output <- in.samples:
+		default:
+		}
+
+		select {
+		case wp.histogramsOutput <- in.histogramSamples:
+		default:
+		}
 
 		for _, s := range in.infoSamples {
 			if s.T < minValidTime {
@@ -729,16 +735,6 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
 				mint = s.T
 			}
 		}
-		select {
-		case wp.output <- in.samples:
-		default:
-		}
-
-		select {
-		case wp.histogramsOutput <- in.histogramSamples:
-		default:
-		}
-
 		select {
 		case wp.infoOutput <- in.infoSamples:
 		default:
diff --git a/tsdb/index/index.go b/tsdb/index/index.go
index 1d257d1fa..0aef1a502 100644
--- a/tsdb/index/index.go
+++ b/tsdb/index/index.go
@@ -36,6 +36,7 @@ import (
 	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
 	"github.com/prometheus/prometheus/tsdb/hashcache"
+	"github.com/prometheus/prometheus/util/annotations"
 )
 
 const (
@@ -162,7 +163,7 @@ type Writer struct {
 	postingsEncoder PostingsEncoder
 }
 
-// TOC represents the index Table Of Contents that states where each section of the index starts.
+// TOC represents the index Table Of Contents that states where each section of the index starts.
 type TOC struct {
 	Symbols uint64
 	Series  uint64
@@ -1568,6 +1569,15 @@ func (r *Reader) LabelValues(ctx context.Context, name string, matchers ...*labe
 	return values, err
 }
 
+func (r *Reader) InfoMetricDataLabels(ctx context.Context, lbls labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) {
+	d := encoding.NewDecbufUvarintAt(r.b, int(r.toc.Series), castagnoliTable)
+	if d.Err() != nil {
+		return labels.Labels{}, nil, d.Err()
+	}
+	lbls, err := r.dec.InfoMetricDataLabels(ctx, d.Get(), lbls, t, matchers...)
+	return lbls, nil, err
+}
+
 // LabelNamesFor returns all the label names for the series referred to by IDs.
 // The names returned are sorted.
func (r *Reader) LabelNamesFor(ctx context.Context, postings Postings) ([]string, error) { @@ -2104,6 +2114,70 @@ func (dec *Decoder) Series(b []byte, builder *labels.ScratchBuilder, chks *[]chu return d.Err() } +func (dec *Decoder) InfoMetricDataLabels(ctx context.Context, b []byte, lbls labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, error) { + d := encoding.Decbuf{B: b} + + // Skip past labels + k := d.Uvarint() + if d.Err() != nil { + return labels.Labels{}, fmt.Errorf("read series label count: %w", d.Err()) + } + for i := 0; i < k; i++ { + // Read label name and value offsets + d.Uvarint() + d.Uvarint() + if d.Err() != nil { + return labels.Labels{}, fmt.Errorf("read series label offsets: %w", d.Err()) + } + } + + // Read the chunks metadata. + k = d.Uvarint() + if d.Err() != nil { + return labels.Labels{}, fmt.Errorf("read series chunks count: %w", d.Err()) + } + if k == 0 { + return labels.Labels{}, nil + } + + t0 := d.Varint64() + maxt := int64(d.Uvarint64()) + t0 + ref0 := int64(d.Uvarint64()) + if d.Err() != nil { + return labels.Labels{}, fmt.Errorf("read series chunk base times: %w", d.Err()) + } + + /* + if t0 <= t && maxt >= t { + // TODO: Figure out whether this first chunk has info metric samples and whether + // there's a sample matching t, that has identifying labels contained in lbls. + } + */ + + t0 = maxt + + for i := 1; i < k; i++ { + mint := int64(d.Uvarint64()) + t0 + maxt := int64(d.Uvarint64()) + mint + + ref0 += d.Varint64() + t0 = maxt + + if d.Err() != nil { + return labels.Labels{}, fmt.Errorf("read meta for chunk %d: %w", i, d.Err()) + } + + /* + if mint <= t && maxt >= t { + // TODO: Figure out whether this chunk has info metric samples and whether + // there's a sample matching t, that has identifying labels contained in lbls. + } + */ + } + + return labels.Labels{}, d.Err() +} + func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) } diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index e1032ff12..c2bbed319 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -54,16 +54,18 @@ var ensureOrderBatchPool = sync.Pool{ // EnsureOrder() must be called once before any reads are done. This allows for quick // unordered batch fills on startup. type MemPostings struct { - mtx sync.RWMutex - m map[string]map[string][]storage.SeriesRef - ordered bool + mtx sync.RWMutex + m map[string]map[string][]storage.SeriesRef + ordered bool + infoMetrics map[storage.SeriesRef][]infoMetricEntry } // NewMemPostings returns a memPostings that's ready for reads and writes. func NewMemPostings() *MemPostings { return &MemPostings{ - m: make(map[string]map[string][]storage.SeriesRef, 512), - ordered: true, + m: make(map[string]map[string][]storage.SeriesRef, 512), + ordered: true, + infoMetrics: make(map[storage.SeriesRef][]infoMetricEntry, 512), } } @@ -71,8 +73,9 @@ func NewMemPostings() *MemPostings { // until EnsureOrder() was called once. 
 func NewUnorderedMemPostings() *MemPostings {
 	return &MemPostings{
-		m:       make(map[string]map[string][]storage.SeriesRef, 512),
-		ordered: false,
+		m:           make(map[string]map[string][]storage.SeriesRef, 512),
+		ordered:     false,
+		infoMetrics: make(map[storage.SeriesRef][]infoMetricEntry, 512),
 	}
 }
 
@@ -218,8 +221,14 @@ func (p *MemPostings) Get(name, value string) Postings {
 	p.mtx.RUnlock()
 
 	if lp == nil {
+		if name == "__name__" && value == "target_info" {
+			fmt.Printf("did not find postings for label %s=%s\n", name, value)
+		}
 		return EmptyPostings()
 	}
+	if name == "__name__" && value == "target_info" {
+		fmt.Printf("found postings for label %s=%s: %#v\n", name, value, lp)
+	}
 	return newListPostings(lp...)
 }
 
@@ -400,6 +409,59 @@ func (p *MemPostings) Add(id storage.SeriesRef, lset labels.Labels) {
 	p.mtx.Unlock()
 }
 
+// AddInfoMetric adds an info metric to the info metric index.
+func (p *MemPostings) AddInfoMetric(id storage.SeriesRef, lset labels.Labels, t int64, identifyingLabels []int) {
+	p.mtx.Lock()
+	defer p.mtx.Unlock()
+
+	ilsB := labels.NewScratchBuilder(len(identifyingLabels))
+	dlsB := labels.NewScratchBuilder(lset.Len() - len(identifyingLabels))
+	j := 0
+	i := 0
+	lset.Range(func(l labels.Label) {
+		// Guard the index lookup so we never read past the end of identifyingLabels.
+		if j < len(identifyingLabels) && i == identifyingLabels[j] {
+			j++
+			ilsB.Add(l.Name, l.Value)
+		} else if l.Name != "__name__" {
+			dlsB.Add(l.Name, l.Value)
+		}
+
+		i++
+	})
+
+	ils := ilsB.Labels()
+	dls := dlsB.Labels()
+
+	entries := p.infoMetrics[id]
+	if entries == nil {
+		fmt.Printf("Adding new info metric entry for series %d, t: %d, identifying labels: %s\n", id, t, ils)
+		p.infoMetrics[id] = []infoMetricEntry{
+			{
+				MinT:              t,
+				IdentifyingLabels: ils,
+				DataLabels:        dls,
+			},
+		}
+		return
+	}
+
+	if !labels.Equal(entries[len(entries)-1].IdentifyingLabels, ils) {
+		fmt.Printf("Adding info metric entry for series %d, t: %d, identifying labels: %s\n", id, t, ils)
+		entries[len(entries)-1].MaxT = t - 1
+		entries = append(entries, infoMetricEntry{
+			MinT:              t,
+			IdentifyingLabels: ils,
+			DataLabels:        dls,
+		})
+
+		p.infoMetrics[id] = entries
+	}
+}
+
 func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
 	nm, ok := p.m[l.Name]
 	if !ok {
@@ -424,6 +486,93 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
 	}
 }
 
+func (p *MemPostings) InfoMetricDataLabels(ctx context.Context, lbls labels.Labels, t int64, matchers ...*labels.Matcher) labels.Labels {
+	p.mtx.RLock()
+	defer p.mtx.RUnlock()
+
+	var infoMetrics map[storage.SeriesRef][]infoMetricEntry
+	if len(matchers) == 0 {
+		// Consider all info metrics.
+		infoMetrics = p.infoMetrics
+		fmt.Printf("MemPostings: not filtering info metrics\n")
+	} else {
+		// Pick only info metrics that have matching (potential data) labels.
+		infoMetrics = map[storage.SeriesRef][]infoMetricEntry{}
+		for _, m := range matchers {
+			for v, srs := range p.m[m.Name] {
+				if !m.Matches(v) {
+					continue
+				}
+
+				for _, sr := range srs {
+					entries := p.infoMetrics[sr]
+					if len(entries) > 0 {
+						infoMetrics[sr] = entries
+					}
+				}
+			}
+		}
+	}
+	if len(infoMetrics) == 0 {
+		fmt.Printf("MemPostings: there are no info metrics\n")
+		return labels.Labels{}
+	}
+
+	lblMap := make(map[string]labels.Label, lbls.Len())
+	lbls.Range(func(l labels.Label) {
+		lblMap[l.Name] = l
+	})
+
+	dataLabels := map[string]string{}
+	for _, entries := range infoMetrics {
+		for _, entry := range entries {
+			// entry.MaxT == 0 means the entry is current.
+			if entry.MinT > t || (entry.MaxT > 0 && entry.MaxT < t) {
+				fmt.Printf("MemPostings: Entry of MinT %d and MaxT %d doesn't fit timestamp %d\n", entry.MinT, entry.MaxT, t)
+				continue
+			}
+
+			fmt.Printf("MemPostings: Entry of MinT %d and MaxT %d fits timestamp %d\n", entry.MinT, entry.MaxT, t)
+
+			// This entry is for a time range corresponding to t.
+			isMatch := true
+			entry.IdentifyingLabels.Range(func(il labels.Label) {
+				if !isMatch {
+					return
+				}
+
+				l, ok := lblMap[il.Name]
+				if !ok || l.Value != il.Value {
+					isMatch = false
+					fmt.Printf("MemPostings: This entry doesn't have matching identifying labels: %s = %s\n", il.Name, il.Value)
+				}
+			})
+			if !isMatch {
+				// Entry doesn't have corresponding identifying labels.
+				continue
+			}
+
+			fmt.Printf("MemPostings: This entry matches: %s\n", entry.DataLabels.String())
+			entry.DataLabels.Range(func(l labels.Label) {
+				dataLabels[l.Name] = l.Value
+			})
+		}
+	}
+
+	dls := labels.NewScratchBuilder(len(dataLabels))
+	for n, v := range dataLabels {
+		dls.Add(n, v)
+	}
+	// Map iteration order is random, so sort before building the label set.
+	dls.Sort()
+	return dls.Labels()
+}
+
+type infoMetricEntry struct {
+	MinT              int64
+	MaxT              int64
+	IdentifyingLabels labels.Labels
+	DataLabels        labels.Labels
+}
+
 func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
 	// We'll copy the values into a slice and then match over that,
 	// this way we don't need to hold the mutex while we're matching,
diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go
index acc0d45f7..e899a9c6e 100644
--- a/tsdb/ooo_head_read.go
+++ b/tsdb/ooo_head_read.go
@@ -27,6 +27,7 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/index"
 	"github.com/prometheus/prometheus/tsdb/tombstones"
+	"github.com/prometheus/prometheus/util/annotations"
 )
 
 var _ IndexReader = &OOOHeadIndexReader{}
@@ -190,6 +191,11 @@ func (oh *OOOHeadIndexReader) LabelValues(ctx context.Context, name string, matc
 	return labelValuesWithMatchers(ctx, oh, name, matchers...)
} +func (oh *OOOHeadIndexReader) InfoMetricDataLabels(ctx context.Context, lbls labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) { + // TODO + panic("not implemented") +} + type chunkMetaAndChunkDiskMapperRef struct { meta chunks.Meta ref chunks.ChunkDiskMapperRef @@ -493,6 +499,10 @@ func (ir *OOOCompactionHeadIndexReader) LabelNames(context.Context, ...*labels.M return nil, errors.New("not implemented") } +func (ir *OOOCompactionHeadIndexReader) InfoMetricDataLabels(ctx context.Context, lbls labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) { + return labels.Labels{}, nil, errors.New("not implemented") +} + func (ir *OOOCompactionHeadIndexReader) LabelValueFor(context.Context, storage.SeriesRef, string) (string, error) { return "", errors.New("not implemented") } diff --git a/tsdb/querier.go b/tsdb/querier.go index 7fbbb0350..36760763c 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "math" + "runtime/debug" "slices" "github.com/oklog/ulid" @@ -77,6 +78,10 @@ func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, er }, nil } +func (q *blockBaseQuerier) InfoMetricDataLabels(ctx context.Context, lbls labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) { + return q.index.InfoMetricDataLabels(ctx, lbls, t, matchers...) +} + func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { res, err := q.index.SortedLabelValues(ctx, name, matchers...) return res, nil, err @@ -130,17 +135,29 @@ func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *stora p = q.index.SortedPostings(p) } + isTargetInfo := false + for _, m := range ms { + if m.Name == "__name__" && m.Value == "target_info" { + isTargetInfo = true + break + } + } + + if isTargetInfo { + fmt.Printf("blockQuerier: returning block series set for target_info\n") + } + if hints != nil { mint = hints.Start maxt = hints.End disableTrimming = hints.DisableTrimming if hints.Func == "series" { // When you're only looking up metadata (for example series API), you don't need to load any chunks. - return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt, disableTrimming) + return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt, disableTrimming, isTargetInfo) } } - return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming) + return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming, isTargetInfo) } // blockChunkQuerier provides chunk querying access to a single block database. 
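
As a rough sketch of how an `info()`-style evaluation could consume the `InfoMetricDataLabels` method that this patch adds to queriers: look up the union of data labels for the sample's labels at the sample's timestamp, then merge them in without overwriting the series' own labels. The helper `enrichWithInfoLabels` is hypothetical and not part of the patch.

```go
package promqlsketch

import (
	"context"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// enrichWithInfoLabels returns metric's labels plus the union of data labels
// from info metrics whose identifying labels are contained in metric at time t.
func enrichWithInfoLabels(ctx context.Context, q storage.Querier, metric labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, error) {
	dataLabels, _, err := q.InfoMetricDataLabels(ctx, metric, t, matchers...)
	if err != nil {
		return labels.EmptyLabels(), err
	}

	b := labels.NewBuilder(metric)
	dataLabels.Range(func(l labels.Label) {
		// Data labels must not overwrite the time series' own labels.
		if !metric.Has(l.Name) {
			b.Set(l.Name, l.Value)
		}
	})
	return b.Labels(), nil
}
```
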
@@ -178,6 +195,17 @@ func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints * if sortSeries { p = q.index.SortedPostings(p) } + + isTargetInfo := false + for _, m := range ms { + if m.Name == "__name__" && m.Value == "target_info" { + isTargetInfo = true + break + } + } + if isTargetInfo { + fmt.Printf("blockChunkQuerier: returning block series set for target_info\n") + } return NewBlockChunkSeriesSet(q.blockID, q.index, q.chunks, q.tombstones, p, mint, maxt, disableTrimming) } @@ -233,6 +261,13 @@ func PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, ms ...*lab return +1 }) + isTargetInfo := false + for _, m := range ms { + if m.Name == "__name__" && m.Value == "target_info" { + isTargetInfo = true + } + } + for _, m := range ms { if ctx.Err() != nil { return nil, ctx.Err() @@ -281,13 +316,22 @@ func PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, ms ...*lab its = append(its, it) default: // l="a" // Non-Not matcher, use normal postingsForMatcher. + if isTargetInfo { + fmt.Printf("Matcher %s\n", m.String()) + } it, err := postingsForMatcher(ctx, ix, m) if err != nil { return nil, err } if index.IsEmptyPostingsType(it) { + if isTargetInfo { + fmt.Printf("Matcher %s returned zero postings\n", m.String()) + } return index.EmptyPostings(), nil } + if isTargetInfo { + fmt.Printf("Matcher %s returned non-zero postings\n", m.String()) + } its = append(its, it) } default: // l="" @@ -581,6 +625,8 @@ type blockBaseSeriesSet struct { tombstones tombstones.Reader mint, maxt int64 disableTrimming bool + // XXX: Debugging + isDebug bool curr seriesData @@ -591,6 +637,7 @@ type blockBaseSeriesSet struct { func (b *blockBaseSeriesSet) Next() bool { for b.p.Next() { + fmt.Printf("blockBaseSeriesSet.Next: Getting series from %v (%T)\n", b.index, b.index) if err := b.index.Series(b.p.At(), &b.builder, &b.bufChks); err != nil { // Postings may be stale. Skip if no underlying series exists. if errors.Is(err, storage.ErrNotFound) { @@ -663,6 +710,7 @@ func (b *blockBaseSeriesSet) Next() bool { } b.curr.labels = b.builder.Labels() + // fmt.Printf("current labels: %s\n", b.curr.labels.String()) b.curr.chks = chks b.curr.intervals = intervals return true @@ -730,11 +778,13 @@ func (p *populateWithDelGenericSeriesIterator) reset(blockID ulid.ULID, cr Chunk // not copied irrespective of copyHeadChunk because it will be re-encoded later anyway. func (p *populateWithDelGenericSeriesIterator) next(copyHeadChunk bool) bool { if p.err != nil || p.i >= len(p.metas)-1 { + fmt.Printf("populateWithDelGenericSeriesIterator: nothing to return, err: %v\n", p.err) return false } p.i++ p.currMeta = p.metas[p.i] + fmt.Printf("populateWithDelGenericSeriesIterator: p.currMeta: %#v\n", p.currMeta) p.bufIter.Intervals = p.bufIter.Intervals[:0] for _, interval := range p.intervals { @@ -748,15 +798,18 @@ func (p *populateWithDelGenericSeriesIterator) next(copyHeadChunk bool) bool { if ok && copyHeadChunk && len(p.bufIter.Intervals) == 0 { // ChunkWithCopy will copy the head chunk. var maxt int64 + fmt.Printf("populateWithDelGenericSeriesIterator: Calling hcr.ChunkWithCopy\n") p.currMeta.Chunk, maxt, p.err = hcr.ChunkWithCopy(p.currMeta) // For the in-memory head chunk the index reader sets maxt as MaxInt64. We fix it here. 
p.currMeta.MaxTime = maxt } else { + fmt.Printf("populateWithDelGenericSeriesIterator: Calling p.cr.ChunkOrIterable: %T\n", p.cr) p.currMeta.Chunk, iterable, p.err = p.cr.ChunkOrIterable(p.currMeta) } if p.err != nil { p.err = fmt.Errorf("cannot populate chunk %d from block %s: %w", p.currMeta.Ref, p.blockID.String(), p.err) + fmt.Printf("populateWithDelGenericSeriesIterator: nothing to return, err: %v\n", p.err) return false } @@ -766,18 +819,22 @@ func (p *populateWithDelGenericSeriesIterator) next(copyHeadChunk bool) bool { // If there is no overlap with deletion intervals and a single chunk is // returned, we can take chunk as it is. p.currDelIter = nil + fmt.Printf("populateWithDelGenericSeriesIterator: Returning true because no intervals\n") return true } // Otherwise we need to iterate over the samples in the single chunk // and create new chunks. p.bufIter.Iter = p.currMeta.Chunk.Iterator(p.bufIter.Iter) p.currDelIter = &p.bufIter + fmt.Printf("populateWithDelGenericSeriesIterator: Returning true because we have an iterator: %T\n", p.bufIter) return true } + fmt.Printf("Don't have p.currMeta.Chunk\n") // Otherwise, use the iterable to create an iterator. p.bufIter.Iter = iterable.Iterator(p.bufIter.Iter) p.currDelIter = &p.bufIter + fmt.Printf("populateWithDelGenericSeriesIterator: Returning true because we created an iterator: %T\n", p.currDelIter) return true } @@ -839,11 +896,16 @@ func (p *populateWithDelSeriesIterator) Next() chunkenc.ValueType { for p.next(false) { if p.currDelIter != nil { p.curr = p.currDelIter + fmt.Printf("populateWithDelSeriesIterator.Next: Obtained current iterator: %T\n", p.curr) } else { p.curr = p.currMeta.Chunk.Iterator(p.curr) + fmt.Printf("populateWithDelSeriesIterator.Next: Obtained current iterator: %T\n", p.curr) } if valueType := p.curr.Next(); valueType != chunkenc.ValNone { + fmt.Printf("populateWithDelSeriesIterator: Value type is not none %s\n", valueType) return valueType + } else { + fmt.Printf("populateWithDelSeriesIterator: Value type is none :(\n") } } return chunkenc.ValNone @@ -1036,6 +1098,7 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { } var ils []int t, ils = p.currDelIter.AtInfoSample() + fmt.Printf("populateWithDelChunkSeriesIterator.populateCurrForSingleChunk: appending info sample; t: %d, ils: %#v\n", t, ils) app.AppendInfoSample(t, ils) } default: @@ -1135,6 +1198,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { { var ils []int t, ils = p.currDelIter.AtInfoSample() + fmt.Printf("populateWithDelChunkSeriesIterator.populateChunksFromIterable: appending info sample; t: %d, ils: %#v\n", t, ils) app.AppendInfoSample(t, ils) } default: @@ -1188,7 +1252,7 @@ type blockSeriesSet struct { blockBaseSeriesSet } -func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.SeriesSet { +func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming, isDebug bool) storage.SeriesSet { return &blockSeriesSet{ blockBaseSeriesSet{ index: i, @@ -1198,12 +1262,17 @@ func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p inde mint: mint, maxt: maxt, disableTrimming: disableTrimming, + isDebug: isDebug, }, } } func (b *blockSeriesSet) At() storage.Series { // At can be looped over before iterating, so save the current values locally. 
+	if b.isDebug {
+		fmt.Printf("blockSeriesSet: Returning series %s\n", b.curr.labels.String())
+		debug.PrintStack()
+	}
 	return &blockSeriesEntry{
 		chunks:  b.chunks,
 		blockID: b.blockID,
@@ -1353,6 +1422,7 @@ func (it *DeletedIterator) Seek(t int64) chunkenc.ValueType {
 }
 
 func (it *DeletedIterator) Next() chunkenc.ValueType {
+	fmt.Printf("DeletedIterator.Next, wrapping %T\n", it.Iter)
 Outer:
 	for valueType := it.Iter.Next(); valueType != chunkenc.ValNone; valueType = it.Iter.Next() {
 		ts := it.AtT()
diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go
index 61766e3ee..744ac07c1 100644
--- a/tsdb/querier_test.go
+++ b/tsdb/querier_test.go
@@ -2475,6 +2475,10 @@ func (m mockIndex) LabelNames(_ context.Context, matchers ...*labels.Matcher) ([
 	return l, nil
 }
 
+func (m mockIndex) InfoMetricDataLabels(ctx context.Context, lbls labels.Labels, t int64, matchers ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) {
+	panic("not implemented")
+}
+
 func BenchmarkQueryIterator(b *testing.B) {
 	cases := []struct {
 		numBlocks int
diff --git a/tsdb/record/record.go b/tsdb/record/record.go
index 6ffbb87b0..d771b7072 100644
--- a/tsdb/record/record.go
+++ b/tsdb/record/record.go
@@ -167,7 +167,7 @@ type RefMetadata struct {
 	Help string
 }
 
-// RefExemplar is an exemplar with the labels, timestamp, value the exemplar was collected/observed with, and a reference to a series.
+// RefExemplar is an exemplar with its labels, timestamp, value the exemplar was collected/observed with, and a reference to a series.
 type RefExemplar struct {
 	Ref chunks.HeadSeriesRef
 	T   int64
@@ -889,7 +889,7 @@ func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b
 	return buf.Get()
 }
 
-// EncodeFloatHistogram encodes the Float Histogram into a byte slice.
+// EncodeFloatHistogram encodes the Float Histogram into a byte slice.
 func EncodeFloatHistogram(buf *encoding.Encbuf, h *histogram.FloatHistogram) {
 	buf.PutByte(byte(h.CounterResetHint))
 
diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go
index db4990598..2b8c93a89 100644
--- a/tsdb/wlog/checkpoint.go
+++ b/tsdb/wlog/checkpoint.go
@@ -246,12 +246,13 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
 			}
 			stats.TotalSamples += len(floatHistogramSamples)
 			stats.DroppedSamples += len(floatHistogramSamples) - len(repl)
-
+		case record.InfoSamples:
 			infoSamples, err = dec.InfoSamples(rec, infoSamples)
 			if err != nil {
 				return nil, fmt.Errorf("decode info samples: %w", err)
 			}
+			level.Debug(logger).Log("msg", "Checkpointing info samples", "count", len(infoSamples))
 			// Drop irrelevant infoSamples in place.
 			repl := infoSamples[:0]
 			for _, s := range infoSamples {
@@ -262,6 +263,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
 			if len(repl) > 0 {
 				buf = enc.InfoSamples(repl, buf)
 			}
+			level.Debug(logger).Log("msg", "Kept filtered info samples in checkpoint", "count", len(repl))
 			stats.TotalSamples += len(infoSamples)
 			stats.DroppedSamples += len(infoSamples) - len(repl)
 
diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go
index 5c23fc5c7..830b97f8c 100644
--- a/tsdb/wlog/watcher.go
+++ b/tsdb/wlog/watcher.go
@@ -667,12 +667,12 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 			if !tail {
 				break
 			}
-			samples, err := dec.InfoSamples(rec, infoSamples[:0])
+			infoSamples, err := dec.InfoSamples(rec, infoSamples[:0])
 			if err != nil {
 				w.recordDecodeFailsMetric.Inc()
 				return err
 			}
-			for _, s := range samples {
+			for _, s := range infoSamples {
 				if s.T > w.startTimestamp {
 					if !w.sendSamples {
 						w.sendSamples = true
@@ -683,6 +683,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 				}
 			}
 			if len(infoSamplesToSend) > 0 {
+				level.Debug(w.logger).Log("msg", "read segment - appending info samples", "count", len(infoSamplesToSend))
 				w.writer.AppendInfoSamples(infoSamplesToSend)
 				infoSamplesToSend = infoSamplesToSend[:0]
 			}
diff --git a/util/jsonutil/marshal.go b/util/jsonutil/marshal.go
index d715eabe6..1f597072c 100644
--- a/util/jsonutil/marshal.go
+++ b/util/jsonutil/marshal.go
@@ -135,3 +135,18 @@ func MarshalHistogram(h *histogram.FloatHistogram, stream *jsoniter.Stream) {
 	}
 	stream.WriteObjectEnd()
 }
+
+// MarshalInfoSample marshals an info sample using the passed jsoniter stream.
+func MarshalInfoSample(identifyingLabels []int, stream *jsoniter.Stream) {
+	stream.WriteObjectStart()
+	stream.WriteObjectField(`identifyingLabels`)
+	stream.WriteArrayStart()
+	for i, idx := range identifyingLabels {
+		if i > 0 {
+			stream.WriteMore()
+		}
+		stream.WriteInt(idx)
+	}
+	stream.WriteArrayEnd()
+	stream.WriteObjectEnd()
+}
diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go
index e76a1a3d3..19a8f04da 100644
--- a/web/api/v1/errors_test.go
+++ b/web/api/v1/errors_test.go
@@ -189,6 +189,10 @@ func (t errorTestQuerier) Select(_ context.Context, sortSeries bool, hints *stor
 	return storage.ErrSeriesSet(t.err)
 }
 
+func (t errorTestQuerier) InfoMetricDataLabels(context.Context, labels.Labels, int64, ...*labels.Matcher) (labels.Labels, annotations.Annotations, error) {
+	return nil, nil, t.err
+}
+
 type errorTestSeriesSet struct {
 	err error
 }
diff --git a/web/api/v1/json_codec.go b/web/api/v1/json_codec.go
index ae5cc7358..bd5bce2ec 100644
--- a/web/api/v1/json_codec.go
+++ b/web/api/v1/json_codec.go
@@ -78,7 +78,6 @@ func marshalSeriesJSON(s promql.Series, stream *jsoniter.Stream) {
 	stream.WriteObjectField(`metric`)
 	marshalLabelsJSON(s.Metric, stream)
 
-	// TODO: Handle info metric samples
 	for i, p := range s.Floats {
 		stream.WriteMore()
 		if i == 0 {
@@ -101,6 +100,20 @@ func marshalSeriesJSON(s promql.Series, stream *jsoniter.Stream) {
 	if len(s.Histograms) > 0 {
 		stream.WriteArrayEnd()
 	}
+
+	if len(s.InfoSamples) > 0 {
+		stream.WriteMore()
+		stream.WriteObjectField(`infoSamples`)
+		stream.WriteArrayStart()
+		for i, p := range s.InfoSamples {
+			if i > 0 {
+				stream.WriteMore()
+			}
+			marshalInfoPointJSON(unsafe.Pointer(&p), stream)
+		}
+		stream.WriteArrayEnd()
+	}
+
 	stream.WriteObjectEnd()
 }
 
@@ -148,11 +161,13 @@ func marshalSampleJSON(s promql.Sample, stream *jsoniter.Stream) {
 	stream.WriteArrayStart()
 	jsonutil.MarshalTimestamp(s.T, stream)
 	stream.WriteMore()
-	// TODO: Handle info metric samples
-	if s.H == nil {
-		jsonutil.MarshalFloat(s.F, stream)
-	} else {
+	switch {
+	case s.H != nil:
 		jsonutil.MarshalHistogram(s.H, stream)
+	case s.IdentifyingLabels != nil:
+		jsonutil.MarshalInfoSample(s.IdentifyingLabels, stream)
+	default:
+		jsonutil.MarshalFloat(s.F, stream)
 	}
 	stream.WriteArrayEnd()
 	stream.WriteObjectEnd()
@@ -186,6 +201,28 @@ func marshalHPointJSON(p promql.HPoint, stream *jsoniter.Stream) {
 	stream.WriteArrayEnd()
 }
 
+// marshalInfoPointJSON writes {"timestamp": <ts>, "identifyingLabels": [<label index>, ...]}.
+func marshalInfoPointJSON(ptr unsafe.Pointer, stream *jsoniter.Stream) {
+	p := *((*promql.InfoPoint)(ptr))
+	stream.WriteObjectStart()
+
+	stream.WriteObjectField(`timestamp`)
+	jsonutil.MarshalTimestamp(p.T, stream)
+
+	stream.WriteMore()
+	stream.WriteObjectField(`identifyingLabels`)
+	stream.WriteArrayStart()
+	for i, idx := range p.IdentifyingLabels {
+		if i > 0 {
+			stream.WriteMore()
+		}
+		stream.WriteInt(idx)
+	}
+	stream.WriteArrayEnd()
+
+	stream.WriteObjectEnd()
+}
+
 // marshalExemplarJSON writes.
 //
 //	{
diff --git a/web/ui/module/codemirror-promql/src/complete/promql.terms.ts b/web/ui/module/codemirror-promql/src/complete/promql.terms.ts
index 9d5d55f60..55082939e 100644
--- a/web/ui/module/codemirror-promql/src/complete/promql.terms.ts
+++ b/web/ui/module/codemirror-promql/src/complete/promql.terms.ts
@@ -281,6 +281,12 @@ export const functionIdentifierTerms = [
     info: 'Calculate the increase in value over a range of time (for counters)',
     type: 'function',
  },
+  {
+    label: 'info',
+    detail: 'function',
+    info: 'Add data labels from corresponding info metrics',
+    type: 'function',
+  },
   {
     label: 'irate',
     detail: 'function',
diff --git a/web/ui/module/codemirror-promql/src/types/function.ts b/web/ui/module/codemirror-promql/src/types/function.ts
index 2505edc22..50fa400a3 100644
--- a/web/ui/module/codemirror-promql/src/types/function.ts
+++ b/web/ui/module/codemirror-promql/src/types/function.ts
@@ -50,6 +50,7 @@ import {
   Hour,
   Idelta,
   Increase,
+  Info,
   Irate,
   LabelJoin,
   LabelReplace,
@@ -336,6 +337,12 @@ const promqlFunctions: { [key: number]: PromQLFunction } = {
     variadic: 0,
     returnType: ValueType.vector,
   },
+  [Info]: {
+    name: 'info',
+    // The second argument is an optional label-selector string, matching the
+    // signature info(v instant-vector, [label-selector string]).
+    argTypes: [ValueType.vector, ValueType.string],
+    variadic: 1,
+    returnType: ValueType.vector,
+  },
   [Irate]: {
     name: 'irate',
     argTypes: [ValueType.matrix],
diff --git a/web/ui/module/lezer-promql/src/promql.grammar b/web/ui/module/lezer-promql/src/promql.grammar
index fd4edddf2..5e1a28c9a 100644
--- a/web/ui/module/lezer-promql/src/promql.grammar
+++ b/web/ui/module/lezer-promql/src/promql.grammar
@@ -143,6 +143,7 @@ FunctionIdentifier {
   Hour |
   Idelta |
   Increase |
+  Info |
   Irate |
   LabelReplace |
   LabelJoin |
@@ -384,6 +385,7 @@ NumberLiteral {
 Hour { condFn<"hour"> }
 Idelta { condFn<"idelta"> }
 Increase { condFn<"increase"> }
+Info { condFn<"info"> }
 Irate { condFn<"irate"> }
 LabelReplace { condFn<"label_replace"> }
 LabelJoin { condFn<"label_join"> }
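
To see the wire format the new codec paths produce, here is a small sketch exercising `MarshalInfoSample` from this patch's `util/jsonutil` changes; everything else is the stock jsoniter API. The indices refer to identifying-label positions within the series' sorted label set.

```go
package main

import (
	"fmt"

	jsoniter "github.com/json-iterator/go"

	"github.com/prometheus/prometheus/util/jsonutil"
)

func main() {
	cfg := jsoniter.ConfigCompatibleWithStandardLibrary
	stream := cfg.BorrowStream(nil)
	defer cfg.ReturnStream(stream)

	// Indices 1 and 2 would typically refer to instance and job within a
	// sorted target_info label set.
	jsonutil.MarshalInfoSample([]int{1, 2}, stream)

	fmt.Println(string(stream.Buffer())) // {"identifyingLabels":[1,2]}
}
```
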