-
Notifications
You must be signed in to change notification settings - Fork 4
/
loki_e2e.go
129 lines (113 loc) · 2.98 KB
/
loki_e2e.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Package lokie2e provides scripts for E2E testing of the Loki API implementation.
package lokie2e
import (
"github.com/go-faster/errors"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"github.com/go-faster/oteldb/internal/logstorage"
"github.com/go-faster/oteldb/internal/otelstorage"
)
// BatchSet is a set of batches.
//
// It accumulates the log batches passed to Append together with the labels,
// records and timestamp bounds collected from them.
type BatchSet struct {
// Batches holds every batch passed to Append, in call order.
Batches []plog.Logs
// Labels groups the collected label values by label name.
Labels map[string][]logstorage.Label
// Records indexes the collected log records by their timestamp.
// Append reports an error when two records share a timestamp.
Records map[pcommon.Timestamp]plog.LogRecord
// Start is the smallest record timestamp seen so far.
// A zero value is treated as "not set yet" when records are added.
Start otelstorage.Timestamp
// End is the largest record timestamp seen so far.
End otelstorage.Timestamp
}
// NewBatchSet creates and initializes a new BatchSet.
//
// The returned set already has an empty (non-nil) entry in Labels for each
// common label name, and one severity label value for every severity number
// listed below.
func NewBatchSet() *BatchSet {
	// Common labels that must be present even if no record carries them,
	// so that they resolve to blank values.
	commonNames := [...]string{
		logstorage.LabelBody,
		logstorage.LabelTraceID,
		logstorage.LabelSpanID,
		logstorage.LabelServiceInstanceID,
		logstorage.LabelServiceName,
		logstorage.LabelServiceNamespace,
	}
	// Severity values registered up front under the severity label.
	severities := [...]plog.SeverityNumber{
		plog.SeverityNumberUnspecified,
		plog.SeverityNumberTrace,
		plog.SeverityNumberDebug,
		plog.SeverityNumberInfo,
		plog.SeverityNumberWarn,
		plog.SeverityNumberError,
		plog.SeverityNumberFatal,
	}

	set := &BatchSet{
		Labels: map[string][]logstorage.Label{},
	}
	for _, name := range commonNames {
		set.Labels[name] = []logstorage.Label{}
	}
	for _, severity := range severities {
		set.addLabel(logstorage.Label{
			Name:  logstorage.LabelSeverity,
			Value: severity.String(),
			Type:  int32(pcommon.ValueTypeStr),
		})
	}
	return set
}
// Append appends a batch to the set.
//
// The batch is stored in Batches, then walked resource by resource, scope by
// scope and record by record: attributes found at each level are collected
// as labels, and every log record is registered by timestamp.
//
// It returns a wrapped error as soon as a record cannot be added; the batch
// and anything collected before that point remain in the set.
func (s *BatchSet) Append(raw plog.Logs) error {
	s.Batches = append(s.Batches, raw)

	resources := raw.ResourceLogs()
	for ri := 0; ri < resources.Len(); ri++ {
		resource := resources.At(ri)
		s.addLabels(resource.Resource().Attributes())

		scopes := resource.ScopeLogs()
		for si := 0; si < scopes.Len(); si++ {
			scope := scopes.At(si)
			s.addLabels(scope.Scope().Attributes())

			logRecords := scope.LogRecords()
			for li := 0; li < logRecords.Len(); li++ {
				logRecord := logRecords.At(li)
				// Register the record first: its attributes are only
				// collected when the record itself is accepted.
				if err := s.addRecord(logRecord); err != nil {
					return errors.Wrap(err, "add record")
				}
				s.addLabels(logRecord.Attributes())
			}
		}
	}
	return nil
}
// addRecord registers record under its timestamp and widens the Start/End
// bounds of the set to include that timestamp.
//
// It returns an error, leaving the set unchanged, if a record with the same
// timestamp is already present.
func (s *BatchSet) addRecord(record plog.LogRecord) error {
	if s.Records == nil {
		s.Records = map[pcommon.Timestamp]plog.LogRecord{}
	}

	stamp := record.Timestamp()
	if _, seen := s.Records[stamp]; seen {
		return errors.Errorf("duplicate record with timestamp %v", stamp)
	}
	s.Records[stamp] = record

	// A zero Start means no lower bound has been recorded yet.
	if unset := s.Start == 0; unset || stamp < s.Start {
		s.Start = stamp
	}
	if s.End < stamp {
		s.End = stamp
	}
	return nil
}
// addLabels collects every attribute of m as a label, using the attribute
// key as the label name, its string form as the value and its value type as
// the label type.
//
// Attributes holding a map or a slice are skipped.
func (s *BatchSet) addLabels(m pcommon.Map) {
	m.Range(func(key string, val pcommon.Value) bool {
		kind := val.Type()
		if kind == pcommon.ValueTypeMap || kind == pcommon.ValueTypeSlice {
			// Composite values are not turned into labels; keep iterating.
			return true
		}
		s.addLabel(logstorage.Label{
			Name:  key,
			Value: val.AsString(),
			Type:  int32(kind),
		})
		return true
	})
}
// addLabel appends label to the values stored under its name, where the
// name is first passed through otelstorage.KeyToLabel. The Labels map is
// created on first use, so the method also works on a zero BatchSet.
func (s *BatchSet) addLabel(label logstorage.Label) {
	if s.Labels == nil {
		s.Labels = make(map[string][]logstorage.Label)
	}
	key := otelstorage.KeyToLabel(label.Name)
	values := append(s.Labels[key], label)
	s.Labels[key] = values
}