
Promtail: Add a stream lagging metric #2618

Merged · 11 commits · Sep 18, 2020
11 changes: 11 additions & 0 deletions pkg/logentry/metric/metricvec.go
@@ -58,6 +58,17 @@ func (c *metricVec) With(labels model.LabelSet) prometheus.Metric {
return metric
}

// Delete removes the metric matching the given label set from the vector,
// returning true if it was present.
func (c *metricVec) Delete(labels model.LabelSet) bool {
c.mtx.Lock()
defer c.mtx.Unlock()
fp := labels.Fingerprint()
_, ok := c.metrics[fp]
if ok {
delete(c.metrics, fp)
}
return ok
}

// prune removes all metrics which implement the Expirable interface and have expired.
// It does not take the lock on the metrics map; callers are expected to hold it.
func (c *metricVec) prune() {
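Aside, not part of the diff: a minimal runnable sketch of the deletion pattern Delete uses, assuming only the prometheus/common/model package. The vec type and its float64 values are hypothetical stand-ins for the real metricVec and prometheus.Metric; label sets hash to a model.Fingerprint, which keys the map, so Delete only removes an exact label-set match.

package main

import (
	"fmt"
	"sync"

	"github.com/prometheus/common/model"
)

// vec is a stand-in for metricVec: a mutex-guarded map keyed by label fingerprint.
type vec struct {
	mtx     sync.Mutex
	metrics map[model.Fingerprint]float64
}

// Delete removes the entry for the exact label set and reports whether it existed.
func (v *vec) Delete(labels model.LabelSet) bool {
	v.mtx.Lock()
	defer v.mtx.Unlock()
	fp := labels.Fingerprint()
	_, ok := v.metrics[fp]
	if ok {
		delete(v.metrics, fp)
	}
	return ok
}

func main() {
	v := &vec{metrics: map[model.Fingerprint]float64{}}
	ls := model.LabelSet{"filename": "/var/log/app.log", "host": "loki:3100"}
	v.metrics[ls.Fingerprint()] = 1.5
	fmt.Println(v.Delete(ls)) // true: the exact label set was present
	fmt.Println(v.Delete(ls)) // false: already removed
}

Returning the bool lets callers distinguish a real removal from a no-op, mirroring the semantics of Delete on the Prometheus client's own metric vectors.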
5 changes: 5 additions & 0 deletions pkg/promtail/api/types.go
@@ -6,6 +6,11 @@ import (
"github.com/prometheus/common/model"
)

// InstrumentedEntryHandler is an EntryHandler that can additionally tear down
// per-stream instrumentation once a stream goes away.
type InstrumentedEntryHandler interface {
EntryHandler
UnregisterLatencyMetric(labels model.LabelSet)
}

// EntryHandler is something that can "handle" entries.
type EntryHandler interface {
Handle(labels model.LabelSet, time time.Time, entry string) error
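A hedged sketch of how this optional interface is meant to be used. The two interface definitions are copied from this file; nopHandler and everything in main are hypothetical illustrations, not PR code.

package main

import (
	"time"

	"github.com/prometheus/common/model"
)

// Copies of the two interfaces from pkg/promtail/api/types.go.
type EntryHandler interface {
	Handle(labels model.LabelSet, time time.Time, entry string) error
}

type InstrumentedEntryHandler interface {
	EntryHandler
	UnregisterLatencyMetric(labels model.LabelSet)
}

// nopHandler is a hypothetical handler that satisfies both interfaces.
type nopHandler struct{}

func (nopHandler) Handle(_ model.LabelSet, _ time.Time, _ string) error { return nil }
func (nopHandler) UnregisterLatencyMetric(_ model.LabelSet)             {}

// Compile-time check that nopHandler implements the instrumented interface.
var _ InstrumentedEntryHandler = nopHandler{}

func main() {
	var h EntryHandler = nopHandler{}
	// Consumers that only know about EntryHandler can opportunistically upgrade:
	if ih, ok := h.(InstrumentedEntryHandler); ok {
		ih.UnregisterLatencyMetric(model.LabelSet{"filename": "/var/log/app.log"})
	}
}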
54 changes: 48 additions & 6 deletions pkg/promtail/client/client.go
@@ -12,6 +12,9 @@ import (
"sync"
"time"

"github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/loki/pkg/logentry/metric"
"github.com/grafana/loki/pkg/promtail/api"

"github.com/cortexproject/cortex/pkg/util"
@@ -34,39 +37,43 @@ const (
// Label reserved to override the tenant ID while processing
// pipeline stages
ReservedLabelTenantID = "__tenant_id__"

LatencyLabel = "filename"
Review thread on LatencyLabel (Contributor): ?
Reply (Contributor): ho I see, I think we need a better name.
HostLabel = "host"
)

var (
encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "encoded_bytes_total",
Help: "Number of bytes encoded and ready to send.",
- }, []string{"host"})
+ }, []string{HostLabel})
sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_bytes_total",
Help: "Number of bytes sent.",
- }, []string{"host"})
+ }, []string{HostLabel})
droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_bytes_total",
Help: "Number of bytes dropped because they failed to be sent to the ingester after all retries.",
- }, []string{"host"})
+ }, []string{HostLabel})
sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_entries_total",
Help: "Number of log entries sent to the ingester.",
- }, []string{"host"})
+ }, []string{HostLabel})
droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_entries_total",
Help: "Number of log entries dropped because they failed to be sent to the ingester after all retries.",
- }, []string{"host"})
+ }, []string{HostLabel})
requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "promtail",
Name: "request_duration_seconds",
Help: "Duration of send requests.",
- }, []string{"status_code", "host"})
+ }, []string{"status_code", HostLabel})
streamLag *metric.Gauges

countersWithHost = []*prometheus.CounterVec{
encodedBytes, sentBytes, droppedBytes, sentEntries, droppedEntries,
@@ -82,6 +89,16 @@ func init() {
prometheus.MustRegister(sentEntries)
prometheus.MustRegister(droppedEntries)
prometheus.MustRegister(requestDuration)
var err error
streamLag, err = metric.NewGauges("promtail_stream_lag_seconds",
"Difference between current time and last batch timestamp for successful sends",
metric.GaugeConfig{Action: "set"},
int64(1*time.Minute.Seconds()), // This strips out files which update slowly and reduces noise in this metric.
)
if err != nil {
panic(err)
}
prometheus.MustRegister(streamLag)
}

// Client pushes entries to Loki and can be stopped
@@ -234,6 +251,26 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
if err == nil {
sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
for _, s := range batch.streams {
lbls, err := parser.ParseMetric(s.Labels)
if err != nil {
// is this possible?
level.Warn(c.logger).Log("msg", "error converting stream label string to labels.Labels, cannot update lagging metric", "error", err)
return
}
var lblSet model.LabelSet
for i := range lbls {
if lbls[i].Name == LatencyLabel {
lblSet = model.LabelSet{
model.LabelName(HostLabel): model.LabelValue(c.cfg.URL.Host),
model.LabelName(LatencyLabel): model.LabelValue(lbls[i].Value),
}
}
}
if lblSet != nil {
streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds())
}
}
return
}

@@ -330,3 +367,8 @@ func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error {
}}
return nil
}

// UnregisterLatencyMetric scopes the given labels to this client's host and
// deletes the matching stream lag gauge.
func (c *client) UnregisterLatencyMetric(labels model.LabelSet) {
labels[HostLabel] = model.LabelValue(c.cfg.URL.Host)
streamLag.Delete(labels)
}
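To make the new metric concrete, here is a self-contained sketch (illustrative values, not PR code) of the per-stream computation sendBatch performs after a successful push: parse the stream's label string with parser.ParseMetric, find the filename label, and set the gauge to now minus the last entry's timestamp, in seconds.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/promql/parser"
)

func main() {
	// A stream's labels as Promtail serializes them, plus a pretend last-entry time.
	streamLabels := `{filename="/var/log/app.log", job="app"}`
	lastEntry := time.Now().Add(-30 * time.Second)

	lbls, err := parser.ParseMetric(streamLabels)
	if err != nil {
		panic(err)
	}
	for _, l := range lbls {
		if l.Name == "filename" {
			// This is the value the promtail_stream_lag_seconds gauge would be set to.
			lag := time.Since(lastEntry).Seconds()
			fmt.Printf("stream_lag{host=%q, filename=%q} = %.1fs\n", "loki:3100", l.Value, lag)
		}
	}
}

Note also the one-minute value passed to metric.NewGauges in init(): per the inline comment there, it expires gauges for files that update slowly, reducing noise in the metric.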
2 changes: 1 addition & 1 deletion pkg/promtail/client/config.go
@@ -34,7 +34,7 @@ type Config struct {
func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.Var(&c.URL, prefix+"client.url", "URL of log server")
f.DurationVar(&c.BatchWait, prefix+"client.batch-wait", 1*time.Second, "Maximum wait period before sending batch.")
- f.IntVar(&c.BatchSize, prefix+"client.batch-size-bytes", 100*1024, "Maximum batch size to accrue before sending. ")
+ f.IntVar(&c.BatchSize, prefix+"client.batch-size-bytes", 1024*1024, "Maximum batch size to accrue before sending. ")
// Default backoff schedule: 0.5s, 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s (4.267m), for a total time of 511.5s (8.5m) before logs are lost
f.IntVar(&c.BackoffConfig.MaxRetries, prefix+"client.max-retries", 10, "Maximum number of retries when sending batches.")
f.DurationVar(&c.BackoffConfig.MinBackoff, prefix+"client.min-backoff", 500*time.Millisecond, "Initial backoff time between retries.")
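One behavioral note on this file: the default batch size rises from 100*1024 bytes (100 KiB) to 1024*1024 bytes (1 MiB), so deployments that relied on the old default will now buffer roughly ten times as much data per batch before sending.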
4 changes: 4 additions & 0 deletions pkg/promtail/targets/file/filetarget.go
@@ -17,6 +17,7 @@ import (

"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/client"
"github.com/grafana/loki/pkg/promtail/positions"
"github.com/grafana/loki/pkg/promtail/targets/target"
)
@@ -316,6 +317,9 @@ func (t *FileTarget) stopTailingAndRemovePosition(ps []string) {
t.positions.Remove(tailer.path)
delete(t.tails, p)
}
if h, ok := t.handler.(api.InstrumentedEntryHandler); ok {
h.UnregisterLatencyMetric(model.LabelSet{model.LabelName(client.LatencyLabel): model.LabelValue(p)})
}
}
}

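The type assertion above is what keeps the instrumentation optional: only handlers that also implement api.InstrumentedEntryHandler, such as the client extended earlier in this PR, receive the unregister call, while plain EntryHandler implementations are untouched. The net effect is that the promtail_stream_lag_seconds series for a file is deleted as soon as its target stops tailing it, rather than lingering at a stale value.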