forked from grafana/loki
/
pack.go
220 lines (194 loc) · 5.48 KB
/
pack.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package stages
import (
"bytes"
"errors"
"fmt"
"reflect"
"sort"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
json "github.com/json-iterator/go"
"github.com/mitchellh/mapstructure"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/logqlmodel"
)
var (
reallyTrue = true
reallyFalse = false
)
type Packed struct {
Labels map[string]string `json:",inline"`
Entry string `json:"_entry"`
}
// UnmarshalJSON populates a Packed struct where every key except the _entry key is added to the Labels field
func (w *Packed) UnmarshalJSON(data []byte) error {
m := &map[string]interface{}{}
err := json.Unmarshal(data, m)
if err != nil {
return err
}
w.Labels = map[string]string{}
for k, v := range *m {
// _entry key goes to the Entry field, everything else becomes a label
if k == logqlmodel.PackedEntryKey {
if s, ok := v.(string); ok {
w.Entry = s
} else {
return errors.New("failed to unmarshal json, all values must be of type string")
}
} else {
if s, ok := v.(string); ok {
w.Labels[k] = s
} else {
return errors.New("failed to unmarshal json, all values must be of type string")
}
}
}
return nil
}
// MarshalJSON creates a Packed struct as JSON where the Labels are flattened into the top level of the object
func (w Packed) MarshalJSON() ([]byte, error) {
// Marshal the entry to properly escape if it's json or contains quotes
b, err := json.Marshal(w.Entry)
if err != nil {
return nil, err
}
// Creating a map and marshalling from a map results in a non deterministic ordering of the resulting json object
// This is functionally ok but really annoying to humans and automated tests.
// Instead we will build the json ourselves after sorting all the labels to get a consistent output
keys := make([]string, 0, len(w.Labels))
for k := range w.Labels {
keys = append(keys, k)
}
sort.Strings(keys)
var buf bytes.Buffer
buf.WriteString("{")
for i, k := range keys {
if i != 0 {
buf.WriteString(",")
}
// marshal key
key, err := json.Marshal(k)
if err != nil {
return nil, err
}
buf.Write(key)
buf.WriteString(":")
// marshal value
val, err := json.Marshal(w.Labels[k])
if err != nil {
return nil, err
}
buf.Write(val)
}
// Only add the comma if something exists in the buffer other than "{"
if buf.Len() > 1 {
buf.WriteString(",")
}
// Add the line entry
buf.WriteString("\"" + logqlmodel.PackedEntryKey + "\":")
buf.Write(b)
buf.WriteString("}")
return buf.Bytes(), nil
}
// PackConfig contains the configuration for a packStage
type PackConfig struct {
Labels []string `mapstrcuture:"labels"`
IngestTimestamp *bool `mapstructure:"ingest_timestamp"`
}
// validatePackConfig validates the PackConfig for the packStage
//
//nolint:unparam // Always returns nil until someone adds more validation and can remove this.
func validatePackConfig(cfg *PackConfig) error {
// Default the IngestTimestamp value to be true
if cfg.IngestTimestamp == nil {
cfg.IngestTimestamp = &reallyTrue
}
return nil
}
// newPackStage creates a DropStage from config
func newPackStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
cfg := &PackConfig{}
err := mapstructure.WeakDecode(config, cfg)
if err != nil {
return nil, err
}
err = validatePackConfig(cfg)
if err != nil {
return nil, err
}
return &packStage{
logger: log.With(logger, "component", "stage", "type", "pack"),
cfg: cfg,
dropCount: getDropCountMetric(registerer),
}, nil
}
// packStage applies Label matchers to determine if the include stages should be run
type packStage struct {
logger log.Logger
cfg *PackConfig
dropCount *prometheus.CounterVec
}
func (m *packStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)
for e := range in {
out <- m.pack(e)
}
}()
return out
}
func (m *packStage) pack(e Entry) Entry {
lbls := e.Labels
packedLabels := make(map[string]string, len(m.cfg.Labels))
foundLabels := []model.LabelName{}
// Iterate through all the extracted map (which also includes all the labels)
for lk, lv := range e.Extracted {
for _, wl := range m.cfg.Labels {
if lk == wl {
sv, err := getString(lv)
if err != nil {
if Debug {
level.Debug(m.logger).Log("msg", fmt.Sprintf("value for key: '%s' cannot be converted to a string and cannot be packed", lk), "err", err, "type", reflect.TypeOf(lv))
}
continue
}
packedLabels[wl] = sv
foundLabels = append(foundLabels, model.LabelName(lk))
}
}
}
// Embed the extracted labels into the wrapper object
w := Packed{
Labels: packedLabels,
Entry: e.Line,
}
// Marshal to json
wl, err := json.Marshal(w)
if err != nil {
if Debug {
level.Debug(m.logger).Log("msg", "pack stage failed to marshal packed object to json, packing will be skipped", "err", err)
}
return e
}
// Remove anything found which is also a label, do this after the marshalling to not remove labels until
// we are sure the line can be successfully packed.
for _, fl := range foundLabels {
delete(lbls, fl)
}
// Replace the labels and the line with new values
e.Labels = lbls
e.Line = string(wl)
// If the config says to re-write the timestamp to the ingested time, do that now
if m.cfg.IngestTimestamp != nil && *m.cfg.IngestTimestamp {
e.Timestamp = time.Now()
}
return e
}
// Name implements Stage
func (m *packStage) Name() string {
return StageTypePack
}