Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: promtail; clean up metrics generated from logs after a config reload. #11882

Merged
merged 2 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions clients/pkg/logentry/metric/metricvec.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ func (c *metricVec) Delete(labels model.LabelSet) bool {
return ok
}

func (c *metricVec) DeleteAll() {
c.mtx.Lock()
defer c.mtx.Unlock()
c.metrics = map[model.Fingerprint]prometheus.Metric{}
}

// prune will remove all metrics which implement the Expirable interface and have expired
// it does not take out a lock on the metrics map so whoever calls this function should do so.
func (c *metricVec) prune() {
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/decolorize.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,8 @@ func (m *decolorizeStage) Run(in chan Entry) chan Entry {
func (m *decolorizeStage) Name() string {
return StageTypeDecolorize
}

// Cleanup implements Stage.
func (*decolorizeStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,8 @@ func (m *dropStage) shouldDrop(e Entry) bool {
func (m *dropStage) Name() string {
return StageTypeDrop
}

// Cleanup implements Stage.
func (*dropStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/eventlogmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ func (m *eventLogMessageStage) Name() string {
return StageTypeEventLogMessage
}

// Cleanup implements Stage.
func (*eventLogMessageStage) Cleanup() {
// no-op
}

// Sanitize a input string to convert it into a valid prometheus label
// TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName
func SanitizeFullLabelName(input string) string {
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func (c *cri) Name() string {
return "cri"
}

// Cleanup implements Stage.
func (*cri) Cleanup() {
// no-op
}

// implements Stage interface
func (c *cri) Run(entry chan Entry) chan Entry {
entry = c.base.Run(entry)
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/geoip.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ func (g *geoIPStage) Name() string {
return StageTypeGeoIP
}

// Cleanup implements Stage.
func (*geoIPStage) Cleanup() {
// no-op
}

func (g *geoIPStage) process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, _ *string) {
var ip net.IP
if g.cfgs.Source != nil {
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,8 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string
func (j *jsonStage) Name() string {
return StageTypeJSON
}

// Cleanup implements Stage.
func (*jsonStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ func (m *limitStage) Name() string {
return StageTypeLimit
}

// Cleanup implements Stage.
func (*limitStage) Cleanup() {
// no-op
}

func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec {
return util.RegisterCounterVec(registerer, "logentry", "dropped_lines_by_label_total",
"A count of all log lines dropped as a result of a pipeline stage",
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,8 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) {
func (m *matcherStage) Name() string {
return StageTypeMatch
}

// Cleanup implements Stage.
func (*matcherStage) Cleanup() {
// no-op
}
31 changes: 29 additions & 2 deletions clients/pkg/logentry/stages/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ func newMetricStage(logger log.Logger, config interface{}, registry prometheus.R
metrics[name] = collector
}
}
return toStage(&metricStage{
return &metricStage{
logger: logger,
cfg: *cfgs,
metrics: metrics,
}), nil
}, nil
}

// metricStage creates and updates prometheus metrics based on extracted pipeline data
Expand All @@ -142,6 +142,19 @@ type metricStage struct {
metrics map[string]prometheus.Collector
}

func (m *metricStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)

for e := range in {
m.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line)
out <- e
}
}()
return out
}
ptodev marked this conversation as resolved.
Show resolved Hide resolved

// Process implements Stage
func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, entry *string) {
for name, collector := range m.metrics {
Expand Down Expand Up @@ -178,6 +191,20 @@ func (m *metricStage) Name() string {
return StageTypeMetric
}

// Cleanup implements Stage.
func (m *metricStage) Cleanup() {
for _, collector := range m.metrics {
switch vec := collector.(type) {
case *metric.Counters:
vec.DeleteAll()
case *metric.Gauges:
vec.DeleteAll()
case *metric.Histograms:
vec.DeleteAll()
}
}
}

// recordCounter will update a counter metric
// nolint:goconst
func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) {
Expand Down
9 changes: 8 additions & 1 deletion clients/pkg/logentry/stages/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ func TestMetricsPipeline(t *testing.T) {
strings.NewReader(expectedMetrics)); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}

pl.Cleanup()

if err := testutil.GatherAndCompare(registry,
strings.NewReader("")); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}
Comment on lines +131 to +136
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we specifically check for the absence of the metric?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After pl.Cleanup(), no metrics should be visible. I don't see an advantage in checking individual metrics when there shouldn't be any of them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if there's originally two metrics stages, and after we reload there should be only one? we should still check for it's existence and correct value, I think right now we're deleting metrics for stages that will still exist after the reload which would not be correct

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd argue that deleting all the metrics is the right thing to do. The fact that a metric exists doesn't mean that it's used in the same way in the new config. It's possible that the user changed the meaning of each metric, but retained the metric names. In that case it would make sense to start with fresh metrics.

In the future we could make the code smart and reload only the stages which changed, but I think that's a separate issue. To do that, we could avoid calling the Cleanup method for stages which are completely unchanged as a whole.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should document this clearly in the metrics config section

and maybe as a note here as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this line to both doc pages:

If Promtail's configuration is reloaded, all metrics will be reset.

}

func TestNegativeGauge(t *testing.T) {
Expand Down Expand Up @@ -435,7 +442,7 @@ func TestDefaultIdleDuration(t *testing.T) {
if err != nil {
t.Fatalf("failed to create stage with metrics: %v", err)
}
assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*stageProcessor).Processor.(*metricStage).cfg["total_keys"].maxIdleSec)
assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*metricStage).cfg["total_keys"].maxIdleSec)
}

var (
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,8 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) {
func (m *multilineStage) Name() string {
return StageTypeMultiline
}

// Cleanup implements Stage.
func (*multilineStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,8 @@ func (m *packStage) pack(e Entry) Entry {
func (m *packStage) Name() string {
return StageTypePack
}

// Cleanup implements Stage.
func (*packStage) Cleanup() {
// no-op
}
8 changes: 8 additions & 0 deletions clients/pkg/logentry/stages/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ type Pipeline struct {
dropCount *prometheus.CounterVec
}

// Cleanup implements Stage.
func (p *Pipeline) Cleanup() {
for _, s := range p.stages {
s.Cleanup()
}
}

// NewPipeline creates a new log entry pipeline from a configuration
func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, registerer prometheus.Registerer) (*Pipeline, error) {
st := []Stage{}
Expand Down Expand Up @@ -169,6 +176,7 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
return api.NewEntryHandler(handlerIn, func() {
once.Do(func() { close(handlerIn) })
wg.Wait()
p.Cleanup()
})
}

Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,8 @@ func (m *samplingStage) randomNumber() uint64 {
func (m *samplingStage) Name() string {
return StageTypeSampling
}

// Cleanup implements Stage.
func (*samplingStage) Cleanup() {
// no-op
}
6 changes: 6 additions & 0 deletions clients/pkg/logentry/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Entry struct {
type Stage interface {
Name() string
Run(chan Entry) chan Entry
Cleanup()
}

func (entry *Entry) copy() *Entry {
Expand Down Expand Up @@ -228,3 +229,8 @@ func New(logger log.Logger, jobName *string, stageType string,
}
return creator(params)
}

// Cleanup implements Stage.
func (*stageProcessor) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/structuredmetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (s *structuredMetadataStage) Name() string {
return StageTypeStructuredMetadata
}

// Cleanup implements Stage.
func (*structuredMetadataStage) Cleanup() {
// no-op
}

func (s *structuredMetadataStage) Run(in chan Entry) chan Entry {
return RunWith(in, func(e Entry) Entry {
processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {
Expand Down
3 changes: 2 additions & 1 deletion docs/sources/send-data/promtail/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,8 @@ The metrics stage allows for defining metrics from the extracted data.

Created metrics are not pushed to Loki and are instead exposed via Promtail's
`/metrics` endpoint. Prometheus should be configured to scrape Promtail to be
able to retrieve the metrics configured by this stage.
able to retrieve the metrics configured by this stage.
If Promtail's configuration is reloaded, all metrics will be reset.


```yaml
Expand Down
3 changes: 2 additions & 1 deletion docs/sources/send-data/promtail/stages/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ The `metrics` stage is an action stage that allows for defining and updating
metrics based on data from the extracted map. Note that created metrics are not
pushed to Loki and are instead exposed via Promtail's `/metrics` endpoint.
Prometheus should be configured to scrape Promtail to be able to retrieve the
metrics configured by this stage.
metrics configured by this stage. If Promtail's configuration is reloaded,
all metrics will be reset.

## Schema

Expand Down
Loading