Skip to content

Commit

Permalink
feat: Add metrics for number of patterns detected & evicted (#12918)
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive authored May 15, 2024
1 parent a1b1eeb commit bc53b33
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 42 deletions.
17 changes: 13 additions & 4 deletions pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ type Config struct {
ParamString string
}

func createLogClusterCache(maxSize int) *LogClusterCache {
func createLogClusterCache(maxSize int, onEvict func(int, *LogCluster)) *LogClusterCache {
if maxSize == 0 {
maxSize = math.MaxInt
}
cache, _ := simplelru.NewLRU[int, *LogCluster](maxSize, nil)
cache, _ := simplelru.NewLRU[int, *LogCluster](maxSize, onEvict)
return &LogClusterCache{
cache: cache,
}
Expand Down Expand Up @@ -146,16 +146,21 @@ func DefaultConfig() *Config {
}
}

func New(config *Config) *Drain {
func New(config *Config, metrics *Metrics) *Drain {
if config.LogClusterDepth < 3 {
panic("depth argument must be at least 3")
}
config.maxNodeDepth = config.LogClusterDepth - 2
var evictFn func(int, *LogCluster)
if metrics != nil {
evictFn = func(int, *LogCluster) { metrics.PatternsEvictedTotal.Inc() }
}

d := &Drain{
config: config,
rootNode: createNode(),
idToCluster: createLogClusterCache(config.MaxClusters),
idToCluster: createLogClusterCache(config.MaxClusters, evictFn),
metrics: metrics,
}
return d
}
Expand All @@ -165,6 +170,7 @@ type Drain struct {
rootNode *Node
idToCluster *LogClusterCache
clustersCounter int
metrics *Metrics
}

func (d *Drain) Clusters() []*LogCluster {
Expand Down Expand Up @@ -195,6 +201,9 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64)
matchCluster.append(model.TimeFromUnixNano(ts))
d.idToCluster.Set(clusterID, matchCluster)
d.addSeqToPrefixTree(d.rootNode, matchCluster)
if d.metrics != nil {
d.metrics.PatternsDetectedTotal.Inc()
}
} else {
newTemplateTokens := d.createTemplate(tokens, matchCluster.Tokens)
matchCluster.Tokens = newTemplateTokens
Expand Down
56 changes: 27 additions & 29 deletions pkg/pattern/drain/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
}{
{
// High variation leads to many patterns including some that are too generic (many tokens matched) and some that are too specific (too few matchers)
name: "Generate patterns on high variation logfmt logs",
drain: New(DefaultConfig()),
inputFile: "testdata/agent-logfmt.txt",
name: `Generate patterns on high variation logfmt logs`,
drain: New(DefaultConfig(), nil),
inputFile: `testdata/agent-logfmt.txt`,
patterns: []string{
"ts=2024-04-16T15:10:43.192290389Z caller=filetargetmanager.go:361 level=info component=logs logs_config=default msg=\"Adding target\" key=\"/var/log/pods/*19a1cce8-5f04-46e0-a124-292b0dd9b343/testcoordinator/*.log:{batch_kubernetes_io_controller_uid=\\\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\\\", batch_kubernetes_io_job_name=\\\"testcoordinator-job-2665838\\\", container=\\\"testcoordinator\\\", controller_uid=\\\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\\\", job=\\\"k6-cloud/testcoordinator\\\", job_name=\\\"testcoordinator-job-2665838\\\", name=\\\"testcoordinator\\\", namespace=\\\"k6-cloud\\\", pod=\\\"testcoordinator-job-2665838-9g8ds\\\"}\"",
"<_> <_> level=info component=logs logs_config=default <_> target\" <_> <_> <_> <_> <_> <_>",
Expand All @@ -42,9 +42,9 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
// Lower variation leads to fewer patterns including some with limited value (single lines, no matchers)
name: "Generate patterns on low variation logfmt logs",
drain: New(DefaultConfig()),
inputFile: "testdata/ingester-logfmt.txt",
name: `Generate patterns on low variation logfmt logs`,
drain: New(DefaultConfig(), nil),
inputFile: `testdata/ingester-logfmt.txt`,
patterns: []string{
"<_> caller=head.go:216 level=debug tenant=987678 msg=\"profile is empty after delta computation\" metricName=memory",
"ts=2024-04-17T09:52:46.363974185Z caller=http.go:194 level=debug traceID=1b48f5156a61ca69 msg=\"GET /debug/pprof/delta_mutex (200) 1.161082ms\"",
Expand All @@ -53,9 +53,9 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
// Lower variation logs in json leads to a high number of patterns with very few matchers
name: "Generate patterns on json formatted logs",
drain: New(DefaultConfig()),
inputFile: "testdata/drone-json.txt",
name: `Generate patterns on json formatted logs`,
drain: New(DefaultConfig(), nil),
inputFile: `testdata/drone-json.txt`,
patterns: []string{
"<_> capacity <_>",
"<_> capacity changes <_>",
Expand Down Expand Up @@ -96,15 +96,15 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
name: "Patterns for distributor logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/distributor-logfmt.txt",
patterns: []string{
`<_> caller=http.go:194 level=debug <_> <_> msg="POST <_> <_> <_>`,
},
},
{
name: "Patterns for journald logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/journald.txt",
patterns: []string{
"2024-05-07T11:59:43.484606Z INFO ExtHandler ExtHandler Downloading agent manifest",
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
name: "Patterns for kafka logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/kafka.txt",
patterns: []string{
`[2024-05-07 <_> INFO [LocalLog partition=mimir-dev-09-aggregations-offsets-0, dir=/bitnami/kafka/data] Deleting segment files <_> size=948, <_> <_> (kafka.log.LocalLog$)`,
Expand All @@ -219,7 +219,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
name: "Patterns for kubernetes logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/kubernetes.txt",
patterns: []string{
"I0507 12:04:17.596484 1 highnodeutilization.go:107] \"Criteria for a node below target utilization\" CPU=50 Mem=50 Pods=100",
Expand Down Expand Up @@ -252,15 +252,15 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
name: "Patterns for vault logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/vault.txt",
patterns: []string{
"<_> [INFO] expiration: revoked lease: <_>",
},
},
{
name: "Patterns for calico logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/calico.txt",
patterns: []string{
`2024-05-08 <_> [DEBUG][216945] felix/table.go 870: Found forward-reference <_> ipVersion=0x4 <_> <_> [0:0]" table="nat"`,
Expand Down Expand Up @@ -383,8 +383,8 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) {
inputLines []string
}{
{
name: `should match each line against a pattern`,
drain: New(DefaultConfig()),
name: "should match each line against a pattern",
drain: New(DefaultConfig(), nil),
inputLines: []string{
"test test test",
"test test test",
Expand All @@ -393,8 +393,8 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) {
},
},
{
name: `should also match newlines`,
drain: New(DefaultConfig()),
name: "should also match newlines",
drain: New(DefaultConfig(), nil),
inputLines: []string{
`test test test
`,
Expand All @@ -413,7 +413,6 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) {
for _, line := range tt.inputLines {
tt.drain.Train(line, 0)
}
t.Log(`Learned clusters`, tt.drain.Clusters())

for _, line := range tt.inputLines {
match := tt.drain.Match(line)
Expand All @@ -432,8 +431,8 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
inputLines []string
}{
{
name: `should extract patterns that all lines match`,
drain: New(DefaultConfig()),
name: "should extract patterns that all lines match",
drain: New(DefaultConfig(), nil),
inputLines: []string{
"test 1 test",
"test 2 test",
Expand All @@ -442,8 +441,8 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
},
{
name: `should extract patterns that match if line ends with newlines`,
drain: New(DefaultConfig()),
name: "should extract patterns that match if line ends with newlines",
drain: New(DefaultConfig(), nil),
inputLines: []string{
`test 1 test
`,
Expand All @@ -456,8 +455,8 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
},
{
name: `should extract patterns that match if line ends with empty space`,
drain: New(DefaultConfig()),
name: "should extract patterns that match if line ends with empty space",
drain: New(DefaultConfig(), nil),
inputLines: []string{
`test 1 test `,
`test 2 test `,
Expand All @@ -466,8 +465,8 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
},
{
name: `should extract patterns that match if line starts with empty space`,
drain: New(DefaultConfig()),
name: "should extract patterns that match if line starts with empty space",
drain: New(DefaultConfig(), nil),
inputLines: []string{
` test 1 test`,
` test 2 test`,
Expand All @@ -484,7 +483,6 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
}
require.Equal(t, 1, len(tt.drain.Clusters()))
cluster := tt.drain.Clusters()[0]
t.Log(`Extracted cluster: `, cluster)

matcher, err := pattern.ParseLineFilter([]byte(cluster.String()))
require.NoError(t, err)
Expand Down
8 changes: 8 additions & 0 deletions pkg/pattern/drain/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package drain

import "github.com/prometheus/client_golang/prometheus"

// Metrics holds the Prometheus counters Drain uses to report pattern
// activity. A nil *Metrics is tolerated by Drain, so both counters are
// optional.
type Metrics struct {
	// PatternsEvictedTotal counts patterns evicted from the LRU cluster cache.
	PatternsEvictedTotal prometheus.Counter
	// PatternsDetectedTotal counts new patterns detected from incoming log lines.
	PatternsDetectedTotal prometheus.Counter
}
2 changes: 1 addition & 1 deletion pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
inst, ok = i.instances[instanceID]
if !ok {
var err error
inst, err = newInstance(instanceID, i.logger)
inst, err = newInstance(instanceID, i.logger, i.metrics)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pattern/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatte
}

func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int) *logproto.QueryPatternsResponse {
d := drain.New(drain.DefaultConfig())
d := drain.New(drain.DefaultConfig(), nil)
for _, p := range resp.Series {
d.TrainPattern(p.Pattern, p.Samples)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pattern/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

func TestInstancePushQuery(t *testing.T) {
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
inst, err := newInstance("foo", log.NewNopLogger())
inst, err := newInstance("foo", log.NewNopLogger(), newIngesterMetrics(nil, "test"))
require.NoError(t, err)

err = inst.Push(context.Background(), &push.PushRequest{
Expand Down
6 changes: 4 additions & 2 deletions pkg/pattern/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ type instance struct {
streams *streamsMap
index *index.BitPrefixInvertedIndex
logger log.Logger
metrics *ingesterMetrics
}

func newInstance(instanceID string, logger log.Logger) (*instance, error) {
func newInstance(instanceID string, logger log.Logger, metrics *ingesterMetrics) (*instance, error) {
index, err := index.NewBitPrefixWithShards(indexShards)
if err != nil {
return nil, err
Expand All @@ -43,6 +44,7 @@ func newInstance(instanceID string, logger log.Logger) (*instance, error) {
instanceID: instanceID,
streams: newStreamsMap(),
index: index,
metrics: metrics,
}
i.mapper = ingester.NewFPMapper(i.getLabelsFromFingerprint)
return i, nil
Expand Down Expand Up @@ -138,7 +140,7 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream
}
fp := i.getHashForLabels(labels)
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)
s, err := newStream(fp, sortedLabels)
s, err := newStream(fp, sortedLabels, i.metrics)
if err != nil {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
Expand Down
16 changes: 15 additions & 1 deletion pkg/pattern/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
)

type ingesterMetrics struct {
flushQueueLength prometheus.Gauge
flushQueueLength prometheus.Gauge
patternsDiscardedTotal prometheus.Counter
patternsDetectedTotal prometheus.Counter
}

func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics {
Expand All @@ -17,5 +19,17 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
Name: "flush_queue_length",
Help: "The total number of series pending in the flush queue.",
}),
patternsDiscardedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "patterns_evicted_total",
Help: "The total number of patterns evicted from the LRU cache.",
}),
patternsDetectedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "patterns_detected_total",
Help: "The total number of patterns detected from incoming log lines.",
}),
}
}
6 changes: 5 additions & 1 deletion pkg/pattern/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ type stream struct {
func newStream(
fp model.Fingerprint,
labels labels.Labels,
metrics *ingesterMetrics,
) (*stream, error) {
return &stream{
fp: fp,
labels: labels,
labelsString: labels.String(),
labelHash: labels.Hash(),
patterns: drain.New(drain.DefaultConfig()),
patterns: drain.New(drain.DefaultConfig(), &drain.Metrics{
PatternsEvictedTotal: metrics.patternsDiscardedTotal,
PatternsDetectedTotal: metrics.patternsDetectedTotal,
}),
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/pattern/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

func TestAddStream(t *testing.T) {
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs)
stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs, newIngesterMetrics(nil, "test"))
require.NoError(t, err)

err = stream.Push(context.Background(), []push.Entry{
Expand Down Expand Up @@ -44,7 +44,7 @@ func TestAddStream(t *testing.T) {

func TestPruneStream(t *testing.T) {
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs)
stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs, newIngesterMetrics(nil, "test"))
require.NoError(t, err)

err = stream.Push(context.Background(), []push.Entry{
Expand Down

0 comments on commit bc53b33

Please sign in to comment.