Skip to content

Commit

Permalink
fix: promtail; clean up metrics generated from logs after a config re…
Browse files Browse the repository at this point in the history
…load. (#11882)
  • Loading branch information
ptodev committed Apr 24, 2024
1 parent 1933f0e commit 39a7181
Show file tree
Hide file tree
Showing 19 changed files with 121 additions and 5 deletions.
6 changes: 6 additions & 0 deletions clients/pkg/logentry/metric/metricvec.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ func (c *metricVec) Delete(labels model.LabelSet) bool {
return ok
}

func (c *metricVec) DeleteAll() {
c.mtx.Lock()
defer c.mtx.Unlock()
c.metrics = map[model.Fingerprint]prometheus.Metric{}
}

// prune will remove all metrics which implement the Expirable interface and have expired
// it does not take out a lock on the metrics map so whoever calls this function should do so.
func (c *metricVec) prune() {
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/decolorize.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,8 @@ func (m *decolorizeStage) Run(in chan Entry) chan Entry {
func (m *decolorizeStage) Name() string {
return StageTypeDecolorize
}

// Cleanup implements Stage.
func (*decolorizeStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,8 @@ func (m *dropStage) shouldDrop(e Entry) bool {
func (m *dropStage) Name() string {
return StageTypeDrop
}

// Cleanup implements Stage.
func (*dropStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/eventlogmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ func (m *eventLogMessageStage) Name() string {
return StageTypeEventLogMessage
}

// Cleanup implements Stage.
func (*eventLogMessageStage) Cleanup() {
// no-op
}

// Sanitize a input string to convert it into a valid prometheus label
// TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName
func SanitizeFullLabelName(input string) string {
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func (c *cri) Name() string {
return "cri"
}

// Cleanup implements Stage.
func (*cri) Cleanup() {
// no-op
}

// implements Stage interface
func (c *cri) Run(entry chan Entry) chan Entry {
entry = c.base.Run(entry)
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/geoip.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ func (g *geoIPStage) Name() string {
return StageTypeGeoIP
}

// Cleanup implements Stage.
func (*geoIPStage) Cleanup() {
// no-op
}

func (g *geoIPStage) process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, _ *string) {
var ip net.IP
if g.cfgs.Source != nil {
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,8 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string
func (j *jsonStage) Name() string {
return StageTypeJSON
}

// Cleanup implements Stage.
func (*jsonStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ func (m *limitStage) Name() string {
return StageTypeLimit
}

// Cleanup implements Stage.
func (*limitStage) Cleanup() {
// no-op
}

func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec {
return util.RegisterCounterVec(registerer, "logentry", "dropped_lines_by_label_total",
"A count of all log lines dropped as a result of a pipeline stage",
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,8 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) {
func (m *matcherStage) Name() string {
return StageTypeMatch
}

// Cleanup implements Stage.
func (*matcherStage) Cleanup() {
// no-op
}
31 changes: 29 additions & 2 deletions clients/pkg/logentry/stages/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ func newMetricStage(logger log.Logger, config interface{}, registry prometheus.R
metrics[name] = collector
}
}
return toStage(&metricStage{
return &metricStage{
logger: logger,
cfg: *cfgs,
metrics: metrics,
}), nil
}, nil
}

// metricStage creates and updates prometheus metrics based on extracted pipeline data
Expand All @@ -142,6 +142,19 @@ type metricStage struct {
metrics map[string]prometheus.Collector
}

func (m *metricStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)

for e := range in {
m.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line)
out <- e
}
}()
return out
}

// Process implements Stage
func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, entry *string) {
for name, collector := range m.metrics {
Expand Down Expand Up @@ -178,6 +191,20 @@ func (m *metricStage) Name() string {
return StageTypeMetric
}

// Cleanup implements Stage.
func (m *metricStage) Cleanup() {
for _, collector := range m.metrics {
switch vec := collector.(type) {
case *metric.Counters:
vec.DeleteAll()
case *metric.Gauges:
vec.DeleteAll()
case *metric.Histograms:
vec.DeleteAll()
}
}
}

// recordCounter will update a counter metric
// nolint:goconst
func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) {
Expand Down
9 changes: 8 additions & 1 deletion clients/pkg/logentry/stages/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ func TestMetricsPipeline(t *testing.T) {
strings.NewReader(expectedMetrics)); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}

pl.Cleanup()

if err := testutil.GatherAndCompare(registry,
strings.NewReader("")); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}
}

func TestNegativeGauge(t *testing.T) {
Expand Down Expand Up @@ -435,7 +442,7 @@ func TestDefaultIdleDuration(t *testing.T) {
if err != nil {
t.Fatalf("failed to create stage with metrics: %v", err)
}
assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*stageProcessor).Processor.(*metricStage).cfg["total_keys"].maxIdleSec)
assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*metricStage).cfg["total_keys"].maxIdleSec)
}

var (
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,8 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) {
func (m *multilineStage) Name() string {
return StageTypeMultiline
}

// Cleanup implements Stage.
func (*multilineStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,8 @@ func (m *packStage) pack(e Entry) Entry {
func (m *packStage) Name() string {
return StageTypePack
}

// Cleanup implements Stage.
func (*packStage) Cleanup() {
// no-op
}
8 changes: 8 additions & 0 deletions clients/pkg/logentry/stages/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ type Pipeline struct {
dropCount *prometheus.CounterVec
}

// Cleanup implements Stage.
func (p *Pipeline) Cleanup() {
for _, s := range p.stages {
s.Cleanup()
}
}

// NewPipeline creates a new log entry pipeline from a configuration
func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, registerer prometheus.Registerer) (*Pipeline, error) {
st := []Stage{}
Expand Down Expand Up @@ -169,6 +176,7 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
return api.NewEntryHandler(handlerIn, func() {
once.Do(func() { close(handlerIn) })
wg.Wait()
p.Cleanup()
})
}

Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,8 @@ func (m *samplingStage) randomNumber() uint64 {
func (m *samplingStage) Name() string {
return StageTypeSampling
}

// Cleanup implements Stage.
func (*samplingStage) Cleanup() {
// no-op
}
6 changes: 6 additions & 0 deletions clients/pkg/logentry/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Entry struct {
type Stage interface {
Name() string
Run(chan Entry) chan Entry
Cleanup()
}

func (entry *Entry) copy() *Entry {
Expand Down Expand Up @@ -228,3 +229,8 @@ func New(logger log.Logger, jobName *string, stageType string,
}
return creator(params)
}

// Cleanup implements Stage.
func (*stageProcessor) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/structuredmetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (s *structuredMetadataStage) Name() string {
return StageTypeStructuredMetadata
}

// Cleanup implements Stage.
func (*structuredMetadataStage) Cleanup() {
// no-op
}

func (s *structuredMetadataStage) Run(in chan Entry) chan Entry {
return RunWith(in, func(e Entry) Entry {
processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {
Expand Down
3 changes: 2 additions & 1 deletion docs/sources/send-data/promtail/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,8 @@ The metrics stage allows for defining metrics from the extracted data.

Created metrics are not pushed to Loki and are instead exposed via Promtail's
`/metrics` endpoint. Prometheus should be configured to scrape Promtail to be
able to retrieve the metrics configured by this stage.
able to retrieve the metrics configured by this stage.
If Promtail's configuration is reloaded, all metrics will be reset.


```yaml
Expand Down
3 changes: 2 additions & 1 deletion docs/sources/send-data/promtail/stages/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ The `metrics` stage is an action stage that allows for defining and updating
metrics based on data from the extracted map. Note that created metrics are not
pushed to Loki and are instead exposed via Promtail's `/metrics` endpoint.
Prometheus should be configured to scrape Promtail to be able to retrieve the
metrics configured by this stage.
metrics configured by this stage. If Promtail's configuration is reloaded,
all metrics will be reset.

## Schema

Expand Down

0 comments on commit 39a7181

Please sign in to comment.