Skip to content

Commit

Permalink
Alerting: No longer index state history log streams by instance labels (
Browse files Browse the repository at this point in the history
#65474)

* Remove private labels

* No longer index by instance labels

* Labels are now invariant, only build them once

* Remove bucketing since everything is in a single stream

* Refactor statesToStreams to only return a single unified log stream

* Don't query on labels that no longer exist

* Move selector logic to loki layer, genericize client to work in terms of straight logQL

* Add support for line-level label filters in query

* Combine existing selector tests for better parallelism

* Tests for logQL construction

* Underscore instead of dot for unwrapping labels in logql

(cherry picked from commit a416100)
  • Loading branch information
alexweav committed Mar 29, 2023
1 parent 266a740 commit 8c89db4
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 161 deletions.
139 changes: 83 additions & 56 deletions pkg/services/ngalert/state/historian/loki.go
Expand Up @@ -40,7 +40,7 @@ const (
type remoteLokiClient interface {
ping(context.Context) error
push(context.Context, []stream) error
query(ctx context.Context, selectors []Selector, start, end int64) (QueryRes, error)
query(ctx context.Context, logQL string, start, end int64) (QueryRes, error)
}

// RemoteLokibackend is a state.Historian that records state history to an external Loki instance.
Expand Down Expand Up @@ -70,10 +70,10 @@ func (h *RemoteLokiBackend) TestConnection(ctx context.Context) error {
// Record writes a number of state transitions for a given rule to an external Loki instance.
func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error {
logger := h.log.FromContext(ctx)
streams := statesToStreams(rule, states, h.externalLabels, logger)
logStream := statesToStream(rule, states, h.externalLabels, logger)

errCh := make(chan error, 1)
if len(streams) == 0 {
if len(logStream.Values) == 0 {
close(errCh)
return errCh
}
Expand All @@ -83,16 +83,12 @@ func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleM

org := fmt.Sprint(rule.OrgID)
h.metrics.WritesTotal.WithLabelValues(org, "loki").Inc()
samples := 0
for _, s := range streams {
samples += len(s.Values)
}
h.metrics.TransitionsTotal.WithLabelValues(org).Add(float64(samples))
h.metrics.TransitionsTotal.WithLabelValues(org).Add(float64(len(logStream.Values)))

if err := h.recordStreams(ctx, streams, logger); err != nil {
if err := h.recordStreams(ctx, []stream{logStream}, logger); err != nil {
logger.Error("Failed to save alert state history batch", "error", err)
h.metrics.WritesFailed.WithLabelValues(org, "loki").Inc()
h.metrics.TransitionsFailed.WithLabelValues(org).Add(float64(samples))
h.metrics.TransitionsFailed.WithLabelValues(org).Add(float64(len(logStream.Values)))
errCh <- fmt.Errorf("failed to save alert state history batch: %w", err)
}
}()
Expand All @@ -101,21 +97,21 @@ func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleM

// Query retrieves state history entries from an external Loki instance and formats the results into a dataframe.
func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
selectors, err := buildSelectors(query)
logQL, err := buildLogQuery(query)
if err != nil {
return nil, fmt.Errorf("failed to build the provided selectors: %w", err)
return nil, err
}
// Timestamps are expected in RFC3339Nano.
res, err := h.client.query(ctx, selectors, query.From.UnixNano(), query.To.UnixNano())
res, err := h.client.query(ctx, logQL, query.From.UnixNano(), query.To.UnixNano())
if err != nil {
return nil, err
}
return merge(res, query.RuleUID)
}

func buildSelectors(query models.HistoryQuery) ([]Selector, error) {
// +2 as OrgID and the state history label will always be selectors at the API level.
selectors := make([]Selector, len(query.Labels)+2)
// OrgID and the state history label are static and will be included in all queries.
selectors := make([]Selector, 2)

// Set the predefined selector orgID.
selector, err := NewSelector(OrgIDLabel, "=", fmt.Sprintf("%d", query.OrgID))
Expand All @@ -131,17 +127,6 @@ func buildSelectors(query models.HistoryQuery) ([]Selector, error) {
}
selectors[1] = selector

// Set the label selectors
i := 2
for label, val := range query.Labels {
selector, err = NewSelector(label, "=", val)
if err != nil {
return nil, err
}
selectors[i] = selector
i++
}

// Set the optional special selector rule_id
if query.RuleUID != "" {
rsel, err := NewSelector(RuleUIDLabel, "=", query.RuleUID)
Expand Down Expand Up @@ -237,34 +222,29 @@ func merge(res QueryRes, ruleUID string) (*data.Frame, error) {
return frame, nil
}

func statesToStreams(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) []stream {
buckets := make(map[string][]row) // label repr (JSON) -> entries
func statesToStream(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) stream {
labels := mergeLabels(make(map[string]string), externalLabels)
// System-defined labels take precedence over user-defined external labels.
labels[StateHistoryLabelKey] = StateHistoryLabelValue
labels[OrgIDLabel] = fmt.Sprint(rule.OrgID)
labels[RuleUIDLabel] = fmt.Sprint(rule.UID)
labels[GroupLabel] = fmt.Sprint(rule.Group)
labels[FolderUIDLabel] = fmt.Sprint(rule.NamespaceUID)

samples := make([]row, 0, len(states))
for _, state := range states {
if !shouldRecord(state) {
continue
}

labels := mergeLabels(removePrivateLabels(state.State.Labels), externalLabels)
labels[StateHistoryLabelKey] = StateHistoryLabelValue
labels[OrgIDLabel] = fmt.Sprint(rule.OrgID)
labels[RuleUIDLabel] = fmt.Sprint(rule.UID)
labels[GroupLabel] = fmt.Sprint(rule.Group)
labels[FolderUIDLabel] = fmt.Sprint(rule.NamespaceUID)
lblJsn, err := json.Marshal(labels)
if err != nil {
logger.Error("Failed to marshal labels to JSON", "error", err)
continue
}
repr := string(lblJsn)

entry := lokiEntry{
SchemaVersion: 1,
Previous: state.PreviousFormatted(),
Current: state.Formatted(),
Values: valuesAsDataBlob(state.State),
DashboardUID: rule.DashboardUID,
PanelID: rule.PanelID,
InstanceLabels: state.Labels,
InstanceLabels: removePrivateLabels(state.Labels),
}
if state.State.State == eval.Error {
entry.Error = state.Error.Error()
Expand All @@ -277,26 +257,16 @@ func statesToStreams(rule history_model.RuleMeta, states []state.StateTransition
}
line := string(jsn)

buckets[repr] = append(buckets[repr], row{
samples = append(samples, row{
At: state.State.LastEvaluationTime,
Val: line,
})
}

result := make([]stream, 0, len(buckets))
for repr, rows := range buckets {
labels, err := data.LabelsFromString(repr)
if err != nil {
logger.Error("Failed to parse frame labels, skipping state history batch: %w", err)
continue
}
result = append(result, stream{
Stream: labels,
Values: rows,
})
return stream{
Stream: labels,
Values: samples,
}

return result
}

func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []stream, logger log.Logger) error {
Expand Down Expand Up @@ -339,3 +309,60 @@ func jsonifyRow(line string) (json.RawMessage, error) {
}
return json.Marshal(entry)
}

type Selector struct {
// Label to Select
Label string
Op Operator
// Value that is expected
Value string
}

func NewSelector(label, op, value string) (Selector, error) {
if !isValidOperator(op) {
return Selector{}, fmt.Errorf("'%s' is not a valid query operator", op)
}
return Selector{Label: label, Op: Operator(op), Value: value}, nil
}

func selectorString(selectors []Selector) string {
if len(selectors) == 0 {
return "{}"
}
// Build the query selector.
query := ""
for _, s := range selectors {
query += fmt.Sprintf("%s%s%q,", s.Label, s.Op, s.Value)
}
// Remove the last comma, as we append one to every selector.
query = query[:len(query)-1]
return "{" + query + "}"
}

func isValidOperator(op string) bool {
switch op {
case "=", "!=", "=~", "!~":
return true
}
return false
}

func buildLogQuery(query models.HistoryQuery) (string, error) {
selectors, err := buildSelectors(query)
if err != nil {
return "", fmt.Errorf("failed to build the provided selectors: %w", err)
}

logQL := selectorString(selectors)

labelFilters := ""
for k, v := range query.Labels {
labelFilters += fmt.Sprintf(" | labels_%s=%q", k, v)
}

if labelFilters != "" {
logQL = fmt.Sprintf("%s | json%s", logQL, labelFilters)
}

return logQL, nil
}
44 changes: 2 additions & 42 deletions pkg/services/ngalert/state/historian/loki_http.go
Expand Up @@ -101,14 +101,6 @@ const (
NeqRegEx Operator = "!~"
)

type Selector struct {
// Label to Select
Label string
Op Operator
// Value that is expected
Value string
}

func newLokiClient(cfg LokiConfig, req client.Requester, metrics *metrics.Historian, logger log.Logger) *httpLokiClient {
tc := client.NewTimedClient(req, metrics.WriteDuration)
return &httpLokiClient{
Expand Down Expand Up @@ -216,19 +208,16 @@ func (c *httpLokiClient) setAuthAndTenantHeaders(req *http.Request) {
req.Header.Add("X-Scope-OrgID", c.cfg.TenantID)
}
}
func (c *httpLokiClient) query(ctx context.Context, selectors []Selector, start, end int64) (QueryRes, error) {
func (c *httpLokiClient) query(ctx context.Context, logQL string, start, end int64) (QueryRes, error) {
// Run the pre-flight checks for the query.
if len(selectors) == 0 {
return QueryRes{}, fmt.Errorf("at least one selector required to query")
}
if start > end {
return QueryRes{}, fmt.Errorf("start time cannot be after end time")
}

queryURL := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/query_range")

values := url.Values{}
values.Set("query", selectorString(selectors))
values.Set("query", logQL)
values.Set("start", fmt.Sprintf("%d", start))
values.Set("end", fmt.Sprintf("%d", end))

Expand Down Expand Up @@ -276,35 +265,6 @@ func (c *httpLokiClient) query(ctx context.Context, selectors []Selector, start,
return queryRes, nil
}

func selectorString(selectors []Selector) string {
if len(selectors) == 0 {
return "{}"
}
// Build the query selector.
query := ""
for _, s := range selectors {
query += fmt.Sprintf("%s%s%q,", s.Label, s.Op, s.Value)
}
// Remove the last comma, as we append one to every selector.
query = query[:len(query)-1]
return "{" + query + "}"
}

func NewSelector(label, op, value string) (Selector, error) {
if !isValidOperator(op) {
return Selector{}, fmt.Errorf("'%s' is not a valid query operator", op)
}
return Selector{Label: label, Op: Operator(op), Value: value}, nil
}

func isValidOperator(op string) bool {
switch op {
case "=", "!=", "=~", "!~":
return true
}
return false
}

type Stream struct {
Stream map[string]string `json:"stream"`
Values [][2]string `json:"values"`
Expand Down
8 changes: 2 additions & 6 deletions pkg/services/ngalert/state/historian/loki_http_test.go
Expand Up @@ -132,18 +132,14 @@ func TestLokiHTTPClient_Manual(t *testing.T) {
// so the x-scope-orgid header is set.
// client.cfg.TenantID = "<your_tenant_id>"

// Create an array of selectors that should be used for the
// query.
selectors := []Selector{
{Label: "probe", Op: Eq, Value: "Paris"},
}
logQL := `{probe="Paris"}`

// Define the query time range
start := time.Now().Add(-30 * time.Minute).UnixNano()
end := time.Now().UnixNano()

// Authorized request should not fail against Grafana Cloud.
res, err := client.query(context.Background(), selectors, start, end)
res, err := client.query(context.Background(), logQL, start, end)
require.NoError(t, err)
require.NotNil(t, res)
})
Expand Down

0 comments on commit 8c89db4

Please sign in to comment.