-
Notifications
You must be signed in to change notification settings - Fork 479
/
write.go
127 lines (106 loc) · 3.06 KB
/
write.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package write
import (
"context"
"fmt"
"sync"
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/component/common/loki/client"
"github.com/grafana/agent/pkg/build"
)
// streamLagLabels lists the label names handed to both client.NewMetrics and
// client.New when the component builds its metrics and clients.
var streamLagLabels = []string{
	"filename",
}
// init registers the loki.write component with the component registry and
// sets the user agent of the shared Loki client package to the running
// agent's build version.
func init() {
	// newComponent adapts New to the generic build signature expected by the
	// registry; the arguments are always decoded as Arguments.
	newComponent := func(opts component.Options, args component.Arguments) (component.Component, error) {
		return New(opts, args.(Arguments))
	}

	registration := component.Registration{
		Name:    "loki.write",
		Args:    Arguments{},
		Exports: Exports{},
		Build:   newComponent,
	}
	component.Register(registration)

	client.UserAgent = fmt.Sprintf("GrafanaAgent/%s", build.Version)
}
// Arguments holds values which are used to configure the loki.write component.
// All fields are optional in the River configuration.
type Arguments struct {
// Endpoints is populated from the repeated "endpoint" blocks.
// NOTE(review): presumably each endpoint yields one client config via
// convertClientConfigs, which is defined outside this view — confirm.
Endpoints []EndpointOptions `river:"endpoint,block,optional"`
// ExternalLabels is populated from the "external_labels" attribute.
// NOTE(review): how these labels are applied is not visible here —
// presumably inside convertClientConfigs; confirm.
ExternalLabels map[string]string `river:"external_labels,attr,optional"`
// MaxStreams is populated from the "max_streams" attribute and is passed
// unchanged to client.New for every client created in Update.
MaxStreams int `river:"max_streams,attr,optional"`
}
// Exports holds the receiver that is used to send log entries to the
// loki.write component.
type Exports struct {
// Receiver is created once in New and published through OnStateChange;
// Run reads entries from its channel and forwards them to the clients.
Receiver loki.LogsReceiver `river:"receiver,attr"`
}
// Compile-time check that *Component satisfies component.Component.
var _ component.Component = (*Component)(nil)
// Component implements the loki.write component.
type Component struct {
// opts are the options the component was constructed with; its Logger is
// handed to every client created in Update.
opts component.Options
// metrics is built once in New from opts.Registerer and shared by all
// clients.
metrics *client.Metrics
// mut is held by Update while it replaces args and clients.
mut sync.RWMutex
// args is the most recent set of arguments passed to Update.
args Arguments
// receiver is the exported log receiver; it is created in New and stays the
// same for the component's lifetime.
receiver loki.LogsReceiver
// clients are the Loki clients that Run forwards each received entry to.
// Update stops the existing clients and rebuilds this slice.
clients []client.Client
}
// New creates a new loki.write component.
//
// It builds the shared client metrics, creates the log receiver and exports
// it immediately, then applies args through Update. If Update fails, its
// error is returned and no component is produced.
func New(o component.Options, args Arguments) (*Component, error) {
	metrics := client.NewMetrics(o.Registerer, streamLagLabels)

	// The receiver never changes after construction, so it can be exported
	// right away, before any client exists.
	receiver := loki.NewLogsReceiver()

	c := &Component{
		opts:     o,
		metrics:  metrics,
		receiver: receiver,
	}
	o.OnStateChange(Exports{Receiver: receiver})

	// Run the first Update so the clients are in place before Run starts.
	err := c.Update(args)
	if err != nil {
		return nil, err
	}
	return c, nil
}
// Run implements component.Component.
//
// It forwards every entry read from the component's receiver to each
// configured client, in order, until ctx is cancelled. It always returns nil.
// All clients are stopped before Run returns.
func (c *Component) Run(ctx context.Context) error {
	// Stop the clients on shutdown so they do not outlive the component. The
	// slice is cleared afterwards so the same clients are never stopped twice.
	defer func() {
		c.mut.Lock()
		defer c.mut.Unlock()
		for _, cl := range c.clients {
			if cl != nil {
				cl.Stop()
			}
		}
		c.clients = nil
	}()

	for {
		select {
		case <-ctx.Done():
			return nil
		case entry := <-c.receiver.Chan():
			// Update replaces c.clients while holding c.mut, so the slice must
			// be read under the lock too. Keeping the read lock for the whole
			// fan-out also prevents Update from stopping a client while an
			// entry is still being sent to it; the trade-off is that Update
			// waits for an in-flight fan-out to finish.
			c.mut.RLock()
			for _, cl := range c.clients {
				if cl == nil {
					continue
				}
				select {
				case <-ctx.Done():
					// Release the read lock before returning so the deferred
					// shutdown above can take the write lock.
					c.mut.RUnlock()
					return nil
				case cl.Chan() <- entry:
					// Entry handed off to this client.
				}
			}
			c.mut.RUnlock()
		}
	}
}
// Update implements component.Component.
//
// It stores the new arguments, stops every existing client and builds a fresh
// client for each config derived from the arguments. The whole operation runs
// under c.mut. If creating a client fails, the error is returned as-is and
// the clients created so far remain in place.
//
// args must be of type Arguments; any other type makes the assertion panic.
func (c *Component) Update(args component.Arguments) error {
	newArgs := args.(Arguments)

	c.mut.Lock()
	defer c.mut.Unlock()
	c.args = newArgs

	for _, old := range c.clients {
		if old != nil {
			old.Stop()
		}
	}

	cfgs := newArgs.convertClientConfigs()

	// Start from an empty slice with room for every client. Allocating it
	// with a non-zero length and then appending would leave leading nil
	// entries in front of the real clients.
	c.clients = make([]client.Client, 0, len(cfgs))

	// TODO (@tpaschalis) We could use a client.NewMulti here to push the
	// fanout logic back to the client layer, but I opted to keep it explicit
	// here a) for easier debugging and b) possible improvements in the future.
	for _, cfg := range cfgs {
		// Named cl rather than client so the client package is not shadowed.
		cl, err := client.New(c.metrics, cfg, streamLagLabels, newArgs.MaxStreams, c.opts.Logger)
		if err != nil {
			return err
		}
		c.clients = append(c.clients, cl)
	}

	return nil
}