-
Notifications
You must be signed in to change notification settings - Fork 480
/
push_translation.go
96 lines (82 loc) · 3.57 KB
/
push_translation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package gcplogtarget
// This code is copied from Promtail. The gcplogtarget package is used to
// configure and run the targets that can read log entries from cloud resource
// logs like bucket logs, load balancer logs, and Kubernetes cluster logs
// from GCP.
import (
"encoding/base64"
"fmt"
"strings"
"github.com/grafana/agent/component/common/loki"
"github.com/grafana/loki/pkg/util"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
)
// ReservedLabelTenantID reserved to override the tenant ID while processing
// pipeline stages
const ReservedLabelTenantID = "__tenant_id__"
// PushMessage is the POST body format sent by GCP PubSub push subscriptions.
// See https://cloud.google.com/pubsub/docs/push for details.
type PushMessage struct {
Message struct {
Attributes map[string]string `json:"attributes"`
Data string `json:"data"`
ID string `json:"message_id"`
PublishTimestamp string `json:"publish_time"`
} `json:"message"`
Subscription string `json:"subscription"`
}
// Validate checks that the required fields of a PushMessage are set.
func (pm PushMessage) Validate() error {
if pm.Message.Data == "" {
return fmt.Errorf("push message has no data")
}
if pm.Message.ID == "" {
return fmt.Errorf("push message has no ID")
}
if pm.Subscription == "" {
return fmt.Errorf("push message has no subscription")
}
return nil
}
// translate converts a GCP PushMessage into a loki.Entry. It parses the
// push-specific labels and delegates the rest to parseGCPLogsEntry.
func translate(m PushMessage, other model.LabelSet, useIncomingTimestamp bool, relabelConfigs []*relabel.Config, xScopeOrgID string) (loki.Entry, error) {
// Collect all push-specific labels. Every one of them is first configured
// as optional, and the user can relabel it if needed. The relabeling and
// internal drop is handled in parseGCPLogsEntry.
lbs := labels.NewBuilder(nil)
lbs.Set("__gcp_message_id", m.Message.ID)
lbs.Set("__gcp_subscription_name", m.Subscription)
for k, v := range m.Message.Attributes {
lbs.Set(fmt.Sprintf("__gcp_attributes_%s", convertToLokiCompatibleLabel(k)), v)
}
// Add fixed labels coming from the target configuration
fixedLabels := other.Clone()
// If the incoming request carries the tenant id, inject it as the reserved
// label, so it's used by the remote write client.
if xScopeOrgID != "" {
// Expose tenant ID through relabel to use as logs or metrics label.
lbs.Set(ReservedLabelTenantID, xScopeOrgID)
fixedLabels[ReservedLabelTenantID] = model.LabelValue(xScopeOrgID)
}
decodedData, err := base64.StdEncoding.DecodeString(m.Message.Data)
if err != nil {
return loki.Entry{}, fmt.Errorf("failed to decode data: %w", err)
}
entry, err := parseGCPLogsEntry(decodedData, fixedLabels, lbs.Labels(nil), useIncomingTimestamp, relabelConfigs)
if err != nil {
return loki.Entry{}, fmt.Errorf("failed to parse logs entry: %w", err)
}
return entry, nil
}
var separatorCharacterReplacer = strings.NewReplacer(".", "_", "-", "_", "/", "_")
// convertToLokiCompatibleLabel converts an incoming GCP Push message label to
// a loki compatible format. There are labels such as
// `logging.googleapis.com/timestamp`, which contain non-loki-compatible
// characters, which is just alphanumeric and _. The approach taken is to
// translate every non-alphanumeric separator character to an underscore.
func convertToLokiCompatibleLabel(label string) string {
return util.SnakeCase(separatorCharacterReplacer.Replace(label))
}