From 8c89db4da560dcb6c088e05d2d480ea2a37dc929 Mon Sep 17 00:00:00 2001 From: Alexander Weaver Date: Wed, 29 Mar 2023 11:52:11 -0500 Subject: [PATCH] Alerting: No longer index state history log streams by instance labels (#65474) * Remove private labels * No longer index by instance labels * Labels are now invariant, only build them once * Remove bucketing since everything is in a single stream * Refactor statesToStreams to only return a single unified log stream * Don't query on labels that no longer exist * Move selector logic to loki layer, genericize client to work in terms of straight logQL * Add support for line-level label filters in query * Combine existing selector tests for better parallelism * Tests for logQL construction * Underscore instead of dot for unwrapping labels in logql (cherry picked from commit a416100abc5f649ada57201cb855d79116f40c8e) --- pkg/services/ngalert/state/historian/loki.go | 139 +++++++++++------- .../ngalert/state/historian/loki_http.go | 44 +----- .../ngalert/state/historian/loki_http_test.go | 8 +- .../ngalert/state/historian/loki_test.go | 137 ++++++++++------- 4 files changed, 167 insertions(+), 161 deletions(-) diff --git a/pkg/services/ngalert/state/historian/loki.go b/pkg/services/ngalert/state/historian/loki.go index 9dbd1937d8ad..a81a082ed8a1 100644 --- a/pkg/services/ngalert/state/historian/loki.go +++ b/pkg/services/ngalert/state/historian/loki.go @@ -40,7 +40,7 @@ const ( type remoteLokiClient interface { ping(context.Context) error push(context.Context, []stream) error - query(ctx context.Context, selectors []Selector, start, end int64) (QueryRes, error) + query(ctx context.Context, logQL string, start, end int64) (QueryRes, error) } // RemoteLokibackend is a state.Historian that records state history to an external Loki instance. @@ -70,10 +70,10 @@ func (h *RemoteLokiBackend) TestConnection(ctx context.Context) error { // Record writes a number of state transitions for a given rule to an external Loki instance. func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error { logger := h.log.FromContext(ctx) - streams := statesToStreams(rule, states, h.externalLabels, logger) + logStream := statesToStream(rule, states, h.externalLabels, logger) errCh := make(chan error, 1) - if len(streams) == 0 { + if len(logStream.Values) == 0 { close(errCh) return errCh } @@ -83,16 +83,12 @@ func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleM org := fmt.Sprint(rule.OrgID) h.metrics.WritesTotal.WithLabelValues(org, "loki").Inc() - samples := 0 - for _, s := range streams { - samples += len(s.Values) - } - h.metrics.TransitionsTotal.WithLabelValues(org).Add(float64(samples)) + h.metrics.TransitionsTotal.WithLabelValues(org).Add(float64(len(logStream.Values))) - if err := h.recordStreams(ctx, streams, logger); err != nil { + if err := h.recordStreams(ctx, []stream{logStream}, logger); err != nil { logger.Error("Failed to save alert state history batch", "error", err) h.metrics.WritesFailed.WithLabelValues(org, "loki").Inc() - h.metrics.TransitionsFailed.WithLabelValues(org).Add(float64(samples)) + h.metrics.TransitionsFailed.WithLabelValues(org).Add(float64(len(logStream.Values))) errCh <- fmt.Errorf("failed to save alert state history batch: %w", err) } }() @@ -101,12 +97,12 @@ func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleM // Query retrieves state history entries from an external Loki instance and formats the results into a dataframe. func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) { - selectors, err := buildSelectors(query) + logQL, err := buildLogQuery(query) if err != nil { - return nil, fmt.Errorf("failed to build the provided selectors: %w", err) + return nil, err } // Timestamps are expected in RFC3339Nano. - res, err := h.client.query(ctx, selectors, query.From.UnixNano(), query.To.UnixNano()) + res, err := h.client.query(ctx, logQL, query.From.UnixNano(), query.To.UnixNano()) if err != nil { return nil, err } @@ -114,8 +110,8 @@ func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery } func buildSelectors(query models.HistoryQuery) ([]Selector, error) { - // +2 as OrgID and the state history label will always be selectors at the API level. - selectors := make([]Selector, len(query.Labels)+2) + // OrgID and the state history label are static and will be included in all queries. + selectors := make([]Selector, 2) // Set the predefined selector orgID. selector, err := NewSelector(OrgIDLabel, "=", fmt.Sprintf("%d", query.OrgID)) @@ -131,17 +127,6 @@ func buildSelectors(query models.HistoryQuery) ([]Selector, error) { } selectors[1] = selector - // Set the label selectors - i := 2 - for label, val := range query.Labels { - selector, err = NewSelector(label, "=", val) - if err != nil { - return nil, err - } - selectors[i] = selector - i++ - } - // Set the optional special selector rule_id if query.RuleUID != "" { rsel, err := NewSelector(RuleUIDLabel, "=", query.RuleUID) @@ -237,26 +222,21 @@ func merge(res QueryRes, ruleUID string) (*data.Frame, error) { return frame, nil } -func statesToStreams(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) []stream { - buckets := make(map[string][]row) // label repr (JSON) -> entries +func statesToStream(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) stream { + labels := mergeLabels(make(map[string]string), externalLabels) + // System-defined labels take precedence over user-defined external labels. + labels[StateHistoryLabelKey] = StateHistoryLabelValue + labels[OrgIDLabel] = fmt.Sprint(rule.OrgID) + labels[RuleUIDLabel] = fmt.Sprint(rule.UID) + labels[GroupLabel] = fmt.Sprint(rule.Group) + labels[FolderUIDLabel] = fmt.Sprint(rule.NamespaceUID) + + samples := make([]row, 0, len(states)) for _, state := range states { if !shouldRecord(state) { continue } - labels := mergeLabels(removePrivateLabels(state.State.Labels), externalLabels) - labels[StateHistoryLabelKey] = StateHistoryLabelValue - labels[OrgIDLabel] = fmt.Sprint(rule.OrgID) - labels[RuleUIDLabel] = fmt.Sprint(rule.UID) - labels[GroupLabel] = fmt.Sprint(rule.Group) - labels[FolderUIDLabel] = fmt.Sprint(rule.NamespaceUID) - lblJsn, err := json.Marshal(labels) - if err != nil { - logger.Error("Failed to marshal labels to JSON", "error", err) - continue - } - repr := string(lblJsn) - entry := lokiEntry{ SchemaVersion: 1, Previous: state.PreviousFormatted(), @@ -264,7 +244,7 @@ func statesToStreams(rule history_model.RuleMeta, states []state.StateTransition Values: valuesAsDataBlob(state.State), DashboardUID: rule.DashboardUID, PanelID: rule.PanelID, - InstanceLabels: state.Labels, + InstanceLabels: removePrivateLabels(state.Labels), } if state.State.State == eval.Error { entry.Error = state.Error.Error() @@ -277,26 +257,16 @@ func statesToStreams(rule history_model.RuleMeta, states []state.StateTransition } line := string(jsn) - buckets[repr] = append(buckets[repr], row{ + samples = append(samples, row{ At: state.State.LastEvaluationTime, Val: line, }) } - result := make([]stream, 0, len(buckets)) - for repr, rows := range buckets { - labels, err := data.LabelsFromString(repr) - if err != nil { - logger.Error("Failed to parse frame labels, skipping state history batch: %w", err) - continue - } - result = append(result, stream{ - Stream: labels, - Values: rows, - }) + return stream{ + Stream: labels, + Values: samples, } - - return result } func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []stream, logger log.Logger) error { @@ -339,3 +309,60 @@ func jsonifyRow(line string) (json.RawMessage, error) { } return json.Marshal(entry) } + +type Selector struct { + // Label to Select + Label string + Op Operator + // Value that is expected + Value string +} + +func NewSelector(label, op, value string) (Selector, error) { + if !isValidOperator(op) { + return Selector{}, fmt.Errorf("'%s' is not a valid query operator", op) + } + return Selector{Label: label, Op: Operator(op), Value: value}, nil +} + +func selectorString(selectors []Selector) string { + if len(selectors) == 0 { + return "{}" + } + // Build the query selector. + query := "" + for _, s := range selectors { + query += fmt.Sprintf("%s%s%q,", s.Label, s.Op, s.Value) + } + // Remove the last comma, as we append one to every selector. + query = query[:len(query)-1] + return "{" + query + "}" +} + +func isValidOperator(op string) bool { + switch op { + case "=", "!=", "=~", "!~": + return true + } + return false +} + +func buildLogQuery(query models.HistoryQuery) (string, error) { + selectors, err := buildSelectors(query) + if err != nil { + return "", fmt.Errorf("failed to build the provided selectors: %w", err) + } + + logQL := selectorString(selectors) + + labelFilters := "" + for k, v := range query.Labels { + labelFilters += fmt.Sprintf(" | labels_%s=%q", k, v) + } + + if labelFilters != "" { + logQL = fmt.Sprintf("%s | json%s", logQL, labelFilters) + } + + return logQL, nil +} diff --git a/pkg/services/ngalert/state/historian/loki_http.go b/pkg/services/ngalert/state/historian/loki_http.go index e4c5f3370873..64e97f5ddf10 100644 --- a/pkg/services/ngalert/state/historian/loki_http.go +++ b/pkg/services/ngalert/state/historian/loki_http.go @@ -101,14 +101,6 @@ const ( NeqRegEx Operator = "!~" ) -type Selector struct { - // Label to Select - Label string - Op Operator - // Value that is expected - Value string -} - func newLokiClient(cfg LokiConfig, req client.Requester, metrics *metrics.Historian, logger log.Logger) *httpLokiClient { tc := client.NewTimedClient(req, metrics.WriteDuration) return &httpLokiClient{ @@ -216,11 +208,8 @@ func (c *httpLokiClient) setAuthAndTenantHeaders(req *http.Request) { req.Header.Add("X-Scope-OrgID", c.cfg.TenantID) } } -func (c *httpLokiClient) query(ctx context.Context, selectors []Selector, start, end int64) (QueryRes, error) { +func (c *httpLokiClient) query(ctx context.Context, logQL string, start, end int64) (QueryRes, error) { // Run the pre-flight checks for the query. - if len(selectors) == 0 { - return QueryRes{}, fmt.Errorf("at least one selector required to query") - } if start > end { return QueryRes{}, fmt.Errorf("start time cannot be after end time") } @@ -228,7 +217,7 @@ func (c *httpLokiClient) query(ctx context.Context, selectors []Selector, start, queryURL := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/query_range") values := url.Values{} - values.Set("query", selectorString(selectors)) + values.Set("query", logQL) values.Set("start", fmt.Sprintf("%d", start)) values.Set("end", fmt.Sprintf("%d", end)) @@ -276,35 +265,6 @@ func (c *httpLokiClient) query(ctx context.Context, selectors []Selector, start, return queryRes, nil } -func selectorString(selectors []Selector) string { - if len(selectors) == 0 { - return "{}" - } - // Build the query selector. - query := "" - for _, s := range selectors { - query += fmt.Sprintf("%s%s%q,", s.Label, s.Op, s.Value) - } - // Remove the last comma, as we append one to every selector. - query = query[:len(query)-1] - return "{" + query + "}" -} - -func NewSelector(label, op, value string) (Selector, error) { - if !isValidOperator(op) { - return Selector{}, fmt.Errorf("'%s' is not a valid query operator", op) - } - return Selector{Label: label, Op: Operator(op), Value: value}, nil -} - -func isValidOperator(op string) bool { - switch op { - case "=", "!=", "=~", "!~": - return true - } - return false -} - type Stream struct { Stream map[string]string `json:"stream"` Values [][2]string `json:"values"` diff --git a/pkg/services/ngalert/state/historian/loki_http_test.go b/pkg/services/ngalert/state/historian/loki_http_test.go index 82a4e0e48caf..562becf3951a 100644 --- a/pkg/services/ngalert/state/historian/loki_http_test.go +++ b/pkg/services/ngalert/state/historian/loki_http_test.go @@ -132,18 +132,14 @@ func TestLokiHTTPClient_Manual(t *testing.T) { // so the x-scope-orgid header is set. // client.cfg.TenantID = "" - // Create an array of selectors that should be used for the - // query. - selectors := []Selector{ - {Label: "probe", Op: Eq, Value: "Paris"}, - } + logQL := `{probe="Paris"}` // Define the query time range start := time.Now().Add(-30 * time.Minute).UnixNano() end := time.Now().UnixNano() // Authorized request should not fail against Grafana Cloud. - res, err := client.query(context.Background(), selectors, start, end) + res, err := client.query(context.Background(), logQL, start, end) require.NoError(t, err) require.NotNil(t, res) }) diff --git a/pkg/services/ngalert/state/historian/loki_test.go b/pkg/services/ngalert/state/historian/loki_test.go index a47c8a9cc71d..53527dff9e80 100644 --- a/pkg/services/ngalert/state/historian/loki_test.go +++ b/pkg/services/ngalert/state/historian/loki_test.go @@ -8,7 +8,6 @@ import ( "io" "net/http" "net/url" - "sort" "testing" "time" @@ -16,6 +15,7 @@ import ( "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/ngalert/eval" "github.com/grafana/grafana/pkg/services/ngalert/metrics" + "github.com/grafana/grafana/pkg/services/ngalert/models" "github.com/grafana/grafana/pkg/services/ngalert/state" history_model "github.com/grafana/grafana/pkg/services/ngalert/state/historian/model" "github.com/prometheus/client_golang/prometheus" @@ -25,15 +25,15 @@ import ( ) func TestRemoteLokiBackend(t *testing.T) { - t.Run("statesToStreams", func(t *testing.T) { + t.Run("statesToStream", func(t *testing.T) { t.Run("skips non-transitory states", func(t *testing.T) { rule := createTestRule() l := log.NewNopLogger() states := singleFromNormal(&state.State{State: eval.Normal}) - res := statesToStreams(rule, states, nil, l) + res := statesToStream(rule, states, nil, l) - require.Empty(t, res) + require.Empty(t, res.Values) }) t.Run("maps evaluation errors", func(t *testing.T) { @@ -41,7 +41,7 @@ func TestRemoteLokiBackend(t *testing.T) { l := log.NewNopLogger() states := singleFromNormal(&state.State{State: eval.Error, Error: fmt.Errorf("oh no")}) - res := statesToStreams(rule, states, nil, l) + res := statesToStream(rule, states, nil, l) entry := requireSingleEntry(t, res) require.Contains(t, entry.Error, "oh no") @@ -52,7 +52,7 @@ func TestRemoteLokiBackend(t *testing.T) { l := log.NewNopLogger() states := singleFromNormal(&state.State{State: eval.NoData}) - res := statesToStreams(rule, states, nil, l) + res := statesToStream(rule, states, nil, l) _ = requireSingleEntry(t, res) }) @@ -65,55 +65,16 @@ func TestRemoteLokiBackend(t *testing.T) { Labels: data.Labels{"a": "b"}, }) - res := statesToStreams(rule, states, nil, l) + res := statesToStream(rule, states, nil, l) - require.Len(t, res, 1) exp := map[string]string{ StateHistoryLabelKey: StateHistoryLabelValue, "folderUID": rule.NamespaceUID, "group": rule.Group, "orgID": fmt.Sprint(rule.OrgID), "ruleUID": rule.UID, - "a": "b", } - require.Equal(t, exp, res[0].Stream) - }) - - t.Run("groups streams based on combined labels", func(t *testing.T) { - rule := createTestRule() - l := log.NewNopLogger() - states := []state.StateTransition{ - { - PreviousState: eval.Normal, - State: &state.State{ - State: eval.Alerting, - Labels: data.Labels{"a": "b"}, - }, - }, - { - PreviousState: eval.Normal, - State: &state.State{ - State: eval.Alerting, - Labels: data.Labels{"a": "b"}, - }, - }, - { - PreviousState: eval.Normal, - State: &state.State{ - State: eval.Alerting, - Labels: data.Labels{"c": "d"}, - }, - }, - } - - res := statesToStreams(rule, states, nil, l) - - require.Len(t, res, 2) - sort.Slice(res, func(i, j int) bool { return len(res[i].Values) > len(res[j].Values) }) - require.Contains(t, res[0].Stream, "a") - require.Len(t, res[0].Values, 2) - require.Contains(t, res[1].Stream, "c") - require.Len(t, res[1].Values, 1) + require.Equal(t, exp, res.Stream) }) t.Run("excludes private labels", func(t *testing.T) { @@ -124,10 +85,9 @@ func TestRemoteLokiBackend(t *testing.T) { Labels: data.Labels{"__private__": "b"}, }) - res := statesToStreams(rule, states, nil, l) + res := statesToStream(rule, states, nil, l) - require.Len(t, res, 1) - require.NotContains(t, res[0].Stream, "__private__") + require.NotContains(t, res.Stream, "__private__") }) t.Run("includes instance labels in log line", func(t *testing.T) { @@ -138,7 +98,7 @@ func TestRemoteLokiBackend(t *testing.T) { Labels: data.Labels{"statelabel": "labelvalue"}, }) - res := statesToStreams(rule, states, nil, l) + res := statesToStream(rule, states, nil, l) entry := requireSingleEntry(t, res) require.Contains(t, entry.InstanceLabels, "statelabel") @@ -156,7 +116,7 @@ func TestRemoteLokiBackend(t *testing.T) { }, }) - res := statesToStreams(rule, states, nil, l) + res := statesToStream(rule, states, nil, l) entry := requireSingleEntry(t, res) require.Len(t, entry.InstanceLabels, 3) @@ -170,7 +130,7 @@ func TestRemoteLokiBackend(t *testing.T) { Values: map[string]float64{"A": 2.0, "B": 5.5}, }) - res := statesToStreams(rule, states, nil, l) + res := statesToStream(rule, states, nil, l) entry := requireSingleEntry(t, res) require.NotNil(t, entry.Values) @@ -180,6 +140,70 @@ func TestRemoteLokiBackend(t *testing.T) { require.InDelta(t, 5.5, entry.Values.Get("B").MustFloat64(), 1e-4) }) }) + + t.Run("selector string", func(t *testing.T) { + selectors := []Selector{{"name", "=", "Bob"}, {"age", "=~", "30"}} + expected := "{name=\"Bob\",age=~\"30\"}" + result := selectorString(selectors) + require.Equal(t, expected, result) + + selectors = []Selector{} + expected = "{}" + result = selectorString(selectors) + require.Equal(t, expected, result) + }) + + t.Run("new selector", func(t *testing.T) { + selector, err := NewSelector("label", "=", "value") + require.NoError(t, err) + require.Equal(t, "label", selector.Label) + require.Equal(t, Eq, selector.Op) + require.Equal(t, "value", selector.Value) + + selector, err = NewSelector("label", "invalid", "value") + require.Error(t, err) + }) + + t.Run("buildLogQuery", func(t *testing.T) { + cases := []struct { + name string + query models.HistoryQuery + exp string + }{ + { + name: "default includes state history label and orgID label", + query: models.HistoryQuery{}, + exp: `{orgID="0",from="state-history"}`, + }, + { + name: "adds stream label filter for ruleUID and orgID", + query: models.HistoryQuery{ + RuleUID: "rule-uid", + OrgID: 123, + }, + exp: `{orgID="123",from="state-history",ruleUID="rule-uid"}`, + }, + { + name: "filters instance labels in log line", + query: models.HistoryQuery{ + OrgID: 123, + Labels: map[string]string{ + "customlabel": "customvalue", + "labeltwo": "labelvaluetwo", + }, + }, + exp: `{orgID="123",from="state-history"} | json | labels_customlabel="customvalue" | labels_labeltwo="labelvaluetwo"`, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + res, err := buildLogQuery(tc.query) + require.NoError(t, err) + require.Equal(t, tc.exp, res) + }) + } + }) } func TestMerge(t *testing.T) { @@ -428,10 +452,9 @@ func createTestRule() history_model.RuleMeta { } } -func requireSingleEntry(t *testing.T, res []stream) lokiEntry { - require.Len(t, res, 1) - require.Len(t, res[0].Values, 1) - return requireEntry(t, res[0].Values[0]) +func requireSingleEntry(t *testing.T, res stream) lokiEntry { + require.Len(t, res.Values, 1) + return requireEntry(t, res.Values[0]) } func requireEntry(t *testing.T, row row) lokiEntry {