/
loki.go
149 lines (123 loc) · 4.3 KB
/
loki.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Package loki provides an otelcol.receiver.loki component.
package loki
import (
"context"
"path"
"strings"
"sync"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/internal/fanoutconsumer"
loki_translator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
)
func init() {
component.Register(component.Registration{
Name: "otelcol.receiver.loki",
Args: Arguments{},
Exports: Exports{},
Build: func(o component.Options, a component.Arguments) (component.Component, error) {
return New(o, a.(Arguments))
},
})
}
var hintAttributes = "loki.attribute.labels"
// Arguments configures the otelcol.receiver.loki component.
type Arguments struct {
// Output configures where to send received data. Required.
Output *otelcol.ConsumerArguments `river:"output,block"`
}
// Exports holds the receiver that is used to send log entries to the
// loki.write component.
type Exports struct {
Receiver loki.LogsReceiver `river:"receiver,attr"`
}
// Component is the otelcol.receiver.loki component.
type Component struct {
log log.Logger
opts component.Options
mut sync.RWMutex
receiver loki.LogsReceiver
logsSink consumer.Logs
}
var _ component.Component = (*Component)(nil)
// New creates a new otelcol.receiver.loki component.
func New(o component.Options, c Arguments) (*Component, error) {
// TODO(@tpaschalis) Create a metrics struct to count
// total/successful/errored log entries?
res := &Component{
log: o.Logger,
opts: o,
}
// Create and immediately export the receiver which remains the same for
// the component's lifetime.
res.receiver = loki.NewLogsReceiver()
o.OnStateChange(Exports{Receiver: res.receiver})
if err := res.Update(c); err != nil {
return nil, err
}
return res, nil
}
// Run implements Component.
func (c *Component) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case entry := <-c.receiver.Chan():
logs := convertLokiEntryToPlog(entry)
// TODO(@tpaschalis) Is there any more handling to be done here?
err := c.logsSink.ConsumeLogs(ctx, logs)
if err != nil {
level.Error(c.opts.Logger).Log("msg", "failed to consume log entries", "err", err)
}
}
}
}
// Update implements Component.
func (c *Component) Update(newConfig component.Arguments) error {
c.mut.Lock()
defer c.mut.Unlock()
cfg := newConfig.(Arguments)
c.logsSink = fanoutconsumer.Logs(cfg.Output.Logs)
return nil
}
// Create a new Otlp Logs entry from a Promtail entry
func convertLokiEntryToPlog(lokiEntry loki.Entry) plog.Logs {
logs := plog.NewLogs()
lr := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
if filename, exists := lokiEntry.Labels["filename"]; exists {
filenameStr := string(filename)
// The `promtailreceiver` from the opentelemetry-collector-contrib
// repo adds these two labels based on these "semantic conventions
// for log media".
// https://opentelemetry.io/docs/reference/specification/logs/semantic_conventions/media/
// We're keeping them as well, but we're also adding the `filename`
// attribute so that it can be used from the
// `loki.attribute.labels` hint for when the opposite OTel -> Loki
// transformation happens.
lr.Attributes().PutStr("log.file.path", filenameStr)
lr.Attributes().PutStr("log.file.name", path.Base(filenameStr))
// TODO(@tpaschalis) Remove the addition of "log.file.path" and "log.file.name",
// because the Collector doesn't do it and we would be more in line with it.
}
var lbls []string
for key := range lokiEntry.Labels {
keyStr := string(key)
lbls = append(lbls, keyStr)
}
if len(lbls) > 0 {
// This hint is defined in the pkg/translator/loki package and the
// opentelemetry-collector-contrib repo, but is not exported so we
// re-define it.
// It is used to detect which attributes should be promoted to labels
// when transforming back from OTel -> Loki.
lr.Attributes().PutStr(hintAttributes, strings.Join(lbls, ","))
}
loki_translator.ConvertEntryToLogRecord(&lokiEntry.Entry, &lr, lokiEntry.Labels, true)
return logs
}