/
target_group.go
178 lines (153 loc) · 4.29 KB
/
target_group.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package docker
import (
"fmt"
"net/http"
"net/url"
"sync"
"time"
"github.com/docker/docker/client"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/relabel"
"github.com/grafana/loki/v3/pkg/util/build"
"github.com/grafana/loki/v3/clients/pkg/promtail/api"
"github.com/grafana/loki/v3/clients/pkg/promtail/positions"
"github.com/grafana/loki/v3/clients/pkg/promtail/targets/target"
)
// DockerSource is the Source value a targetgroup.Group must carry for
// targetGroup.sync to process it; groups with any other source are skipped.
const DockerSource = "Docker"
// targetGroup manages all container targets of one Docker daemon.
//
// Targets are created from discovered label sets in sync/addTarget and are
// kept in a map keyed by container ID.
type targetGroup struct {
// metrics is handed to every Target created by this group.
metrics *Metrics
// logger is used for the group's own messages and as the base for the
// per-target loggers.
logger log.Logger
// positions is handed to every Target created by this group.
positions positions.Positions
// entryHandler is handed to every Target and is stopped by Stop.
entryHandler api.EntryHandler
// defaultLabels is merged with the discovered labels of each new target.
defaultLabels model.LabelSet
// relabelConfig is handed to every Target created by this group.
relabelConfig []*relabel.Config
// host is the Docker daemon address; it is parsed as a URL to decide
// whether HTTP client options apply and is passed to the Docker client.
host string
// httpClientConfig is used to build the HTTP round tripper when host uses
// the http or https scheme.
httpClientConfig config.HTTPClientConfig
// client is the Docker API client; it is created on the first addTarget
// call and then shared by all targets.
client client.APIClient
// refreshInterval is used as the timeout of the HTTP client built for
// http/https hosts.
refreshInterval model.Duration
// mtx is held by sync, Ready, Stop and ActiveTargets while they access
// targets.
mtx sync.Mutex
// targets maps container ID to its Target.
targets map[string]*Target
}
// sync walks the discovered groups and registers a target for every container
// found in groups whose Source is DockerSource. Label sets without a container
// ID label are skipped with a debug message; failures to add a target are
// logged and do not stop the remaining targets from being processed.
//
// The group mutex is held for the whole call.
func (tg *targetGroup) sync(groups []*targetgroup.Group) {
	tg.mtx.Lock()
	defer tg.mtx.Unlock()

	for _, g := range groups {
		if g.Source == DockerSource {
			for _, labels := range g.Targets {
				id, found := labels[dockerLabelContainerID]
				if !found {
					level.Debug(tg.logger).Log("msg", "Docker target did not include container ID")
					continue
				}

				if err := tg.addTarget(string(id), labels); err != nil {
					level.Error(tg.logger).Log("msg", "could not add target", "containerID", id, "err", err)
				}
			}
		}
	}
}
// addTarget checks whether the container with the given id is already known.
// If it is, the existing target is (re)started when not running; otherwise a
// new target is created from the discovered labels merged with the group's
// default labels and added to this group.
//
// The Docker client is created lazily on the first call. An error is returned
// when the client cannot be created or when the new target cannot be built.
//
// addTarget mutates tg.targets and tg.client without locking, so callers are
// expected to hold tg.mtx, as sync does.
func (tg *targetGroup) addTarget(id string, discoveredLabels model.LabelSet) error {
	if err := tg.ensureClient(); err != nil {
		return err
	}

	if t, ok := tg.targets[id]; ok {
		level.Debug(tg.logger).Log("msg", "container target already exists", "container", id)
		t.startIfNotRunning()
		return nil
	}

	t, err := NewTarget(
		tg.metrics,
		log.With(tg.logger, "target", fmt.Sprintf("docker/%s", id)),
		tg.entryHandler,
		tg.positions,
		id,
		discoveredLabels.Merge(tg.defaultLabels),
		tg.relabelConfig,
		tg.client,
	)
	if err != nil {
		return err
	}
	tg.targets[id] = t
	level.Info(tg.logger).Log("msg", "added Docker target", "containerID", id)
	return nil
}

// ensureClient creates the Docker API client for tg.host if it does not exist
// yet. tg.client is only assigned once construction succeeded, so a failed
// attempt is retried on the next call.
func (tg *targetGroup) ensureClient() error {
	if tg.client != nil {
		return nil
	}

	hostURL, err := url.Parse(tg.host)
	if err != nil {
		return err
	}

	opts := []client.Opt{
		client.WithHost(tg.host),
		client.WithAPIVersionNegotiation(),
	}

	// There are other protocols than HTTP supported by the Docker daemon, like
	// unix, which are not supported by the HTTP client. Passing HTTP client
	// options to the Docker client makes those non-HTTP requests fail.
	if hostURL.Scheme == "http" || hostURL.Scheme == "https" {
		rt, err := config.NewRoundTripperFromConfig(tg.httpClientConfig, "docker_sd")
		if err != nil {
			return err
		}
		opts = append(opts,
			client.WithHTTPClient(&http.Client{
				Transport: rt,
				Timeout:   time.Duration(tg.refreshInterval),
			}),
			client.WithScheme(hostURL.Scheme),
			client.WithHTTPHeaders(map[string]string{
				"User-Agent": fmt.Sprintf("loki-promtail/%s", build.Version),
			}),
		)
	}

	// Keep the result in a local first: storing a nil *client.Client directly
	// in the interface-typed tg.client would make it compare non-nil, so later
	// calls would skip creation and use an unusable client.
	c, err := client.NewClientWithOpts(opts...)
	if err != nil {
		level.Error(tg.logger).Log("msg", "could not create new Docker client", "err", err)
		return err
	}
	tg.client = c
	return nil
}
// Ready returns true if at least one target of this group is ready. It
// returns false when the group has no targets or none of them is ready.
func (tg *targetGroup) Ready() bool {
	tg.mtx.Lock()
	defer tg.mtx.Unlock()

	for _, t := range tg.targets {
		if t.Ready() {
			return true
		}
	}
	// No ready target was found. Returning true here would make the loop
	// above pointless and report readiness unconditionally.
	return false
}
// Stop stops every target of this group and then stops the entry handler.
// The group mutex is held for the whole call.
func (tg *targetGroup) Stop() {
	tg.mtx.Lock()
	defer tg.mtx.Unlock()

	for id := range tg.targets {
		tg.targets[id].Stop()
	}

	tg.entryHandler.Stop()
}
// ActiveTargets returns the targets of this group that report being ready.
// The returned slice is never nil; its order is unspecified because it follows
// map iteration order.
func (tg *targetGroup) ActiveTargets() []target.Target {
	tg.mtx.Lock()
	defer tg.mtx.Unlock()

	active := make([]target.Target, 0, len(tg.targets))
	for _, candidate := range tg.targets {
		if !candidate.Ready() {
			continue
		}
		active = append(active, candidate)
	}
	return active
}
// AllTargets returns all targets of this group, ready or not. The returned
// slice is never nil; its order is unspecified because it follows map
// iteration order.
func (tg *targetGroup) AllTargets() []target.Target {
	// The targets map is written under tg.mtx when targets are added, so it
	// must be read under the same lock to avoid a concurrent map read/write.
	tg.mtx.Lock()
	defer tg.mtx.Unlock()

	result := make([]target.Target, 0, len(tg.targets))
	for _, t := range tg.targets {
		result = append(result, t)
	}
	return result
}