-
Notifications
You must be signed in to change notification settings - Fork 486
/
reconciler_metrics.go
206 lines (180 loc) · 5.79 KB
/
reconciler_metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package operator
import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"os"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/go-jsonnet"
gragent "github.com/grafana/agent/pkg/operator/apis/monitoring/v1alpha1"
"github.com/grafana/agent/pkg/operator/clientutil"
"github.com/grafana/agent/pkg/operator/config"
apps_v1 "k8s.io/api/apps/v1"
core_v1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// createMetricsConfigurationSecret renders the Grafana Agent metrics
// configuration for d and stores it in a Secret named "<agent name>-config"
// in the agent's namespace. The actual work is delegated to
// createTelemetryConfigurationSecret with config.MetricsType.
func (r *reconciler) createMetricsConfigurationSecret(
	ctx context.Context,
	l log.Logger,
	d gragent.Deployment,
) error {
	secretName := d.Agent.Name + "-config"
	return r.createTelemetryConfigurationSecret(ctx, l, secretName, d, config.MetricsType)
}
// createTelemetryConfigurationSecret renders the agent configuration of
// telemetry type ty for d and reconciles it into the Secret called name in the
// agent's namespace.
//
// If d has no resources of type ty, the Secret is handed to
// deleteManagedResource for removal instead of being created. Rendered
// configurations larger than 100 KiB are gzip-compressed before being stored
// under the "agent.yml" key. The Secret carries the operator's configured
// labels merged with managedByOperatorLabels, and it is owned by d.Agent.
//
// An unknown ty, a config build failure, a compression failure, or a failed
// create/update all result in a wrapped error.
func (r *reconciler) createTelemetryConfigurationSecret(
	ctx context.Context,
	l log.Logger,
	name string,
	d gragent.Deployment,
	ty config.Type,
) error {
	key := types.NamespacedName{Namespace: d.Agent.Namespace, Name: name}

	var resourceCount int
	switch ty {
	case config.MetricsType:
		resourceCount = len(d.Metrics)
	case config.LogsType:
		resourceCount = len(d.Logs)
	case config.IntegrationsType:
		resourceCount = len(d.Integrations)
	default:
		return fmt.Errorf("unknown telemetry type %s", ty)
	}
	if resourceCount == 0 {
		// Nothing to configure for this type: drop any Secret left behind by
		// an earlier reconcile.
		return deleteManagedResource(ctx, r.Client, key, &core_v1.Secret{})
	}

	rawConfig, err := config.BuildConfig(&d, ty)
	if err != nil {
		var jsonnetErr jsonnet.RuntimeError
		if errors.As(err, &jsonnetErr) {
			// Jsonnet errors span multiple lines; print them verbatim so the
			// newlines survive and the message stays readable.
			fmt.Fprintf(os.Stderr, "%s", jsonnetErr.Error())
		}
		return fmt.Errorf("unable to build config: %w", err)
	}

	const maxUncompressed = 100 * 1024 // only compress secrets over 100kB
	payload := []byte(rawConfig)
	if len(payload) > maxUncompressed {
		var compressed bytes.Buffer
		zw := gzip.NewWriter(&compressed)
		if _, werr := zw.Write(payload); werr != nil {
			return fmt.Errorf("unable to compress config: %w", werr)
		}
		// Close flushes the remaining compressed data and the gzip footer.
		if cerr := zw.Close(); cerr != nil {
			return fmt.Errorf("closing gzip writer: %w", cerr)
		}
		payload = compressed.Bytes()
	}

	owner := v1.OwnerReference{
		APIVersion:         d.Agent.APIVersion,
		BlockOwnerDeletion: pointer.Bool(true),
		Kind:               d.Agent.Kind,
		Name:               d.Agent.Name,
		UID:                d.Agent.UID,
	}
	secret := core_v1.Secret{
		ObjectMeta: v1.ObjectMeta{
			Namespace:       key.Namespace,
			Name:            key.Name,
			Labels:          r.config.Labels.Merge(managedByOperatorLabels),
			OwnerReferences: []v1.OwnerReference{owner},
		},
		Data: map[string][]byte{"agent.yml": payload},
	}

	level.Info(l).Log("msg", "reconciling secret", "secret", secret.Name)
	if uerr := clientutil.CreateOrUpdateSecret(ctx, r.Client, &secret); uerr != nil {
		return fmt.Errorf("failed to reconcile secret: %w", uerr)
	}
	return nil
}
// createMetricsGoverningService reconciles the Service that governs the
// (eventual) metrics StatefulSet. It must be created before the StatefulSet.
//
// When d has no metrics instances, the Service is handed to
// deleteManagedResource for removal instead of being created or updated.
func (r *reconciler) createMetricsGoverningService(
	ctx context.Context,
	l log.Logger,
	d gragent.Deployment,
) error {
	svc := generateMetricsStatefulSetService(r.config, d)

	if len(d.Metrics) > 0 {
		level.Info(l).Log("msg", "reconciling statefulset service", "service", svc.Name)
		if err := clientutil.CreateOrUpdateService(ctx, r.Client, svc); err != nil {
			return fmt.Errorf("failed to reconcile statefulset governing service: %w", err)
		}
		return nil
	}

	// No metrics instances: remove any Service left over from a prior reconcile.
	key := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}
	return deleteManagedResource(ctx, r.Client, key, &core_v1.Service{})
}
// createMetricsStatefulSets creates a set of Grafana Agent StatefulSets, one
// per shard, and deletes stale StatefulSets previously generated for this agent.
//
// The shard count comes from Spec.Metrics.Shards when that value is set and
// greater than 1; otherwise minShards is used. Shard 0 is named after the agent
// itself, and every other shard N is named "<agent>-shard-N". When d has no
// metrics instances, no StatefulSets are generated. In that case the cleanup
// pass removes every operator-managed StatefulSet labelled with this agent's
// name.
func (r *reconciler) createMetricsStatefulSets(
	ctx context.Context,
	l log.Logger,
	d gragent.Deployment,
) error {
	shards := minShards
	if reqShards := d.Agent.Spec.Metrics.Shards; reqShards != nil && *reqShards > 1 {
		shards = *reqShards
	}

	// Keep track of generated stateful sets so we can delete ones that should
	// no longer exist.
	generated := make(map[string]struct{})

	// Don't generate anything if there weren't any instances. The check does
	// not depend on the shard, so it is evaluated once instead of per iteration.
	if len(d.Metrics) > 0 {
		for shard := int32(0); shard < shards; shard++ {
			name := d.Agent.Name
			if shard > 0 {
				name = fmt.Sprintf("%s-shard-%d", name, shard)
			}

			ss, err := generateMetricsStatefulSet(r.config, name, d, shard)
			if err != nil {
				return fmt.Errorf("failed to generate statefulset for shard %d: %w", shard, err)
			}

			level.Info(l).Log("msg", "reconciling statefulset", "statefulset", ss.Name)
			err = clientutil.CreateOrUpdateStatefulSet(ctx, r.Client, ss)
			if err != nil {
				return fmt.Errorf("failed to reconcile statefulset for shard %d: %w", shard, err)
			}
			generated[ss.Name] = struct{}{}
		}
	}

	// Clean up statefulsets that should no longer exist.
	var statefulSets apps_v1.StatefulSetList
	err := r.List(ctx, &statefulSets, &client.ListOptions{
		LabelSelector: labels.SelectorFromSet(labels.Set{
			managedByOperatorLabel: managedByOperatorLabelValue,
			agentNameLabelName:     d.Agent.Name,
		}),
	})
	if err != nil {
		return fmt.Errorf("failed to list statefulsets: %w", err)
	}

	// Index into Items instead of ranging by value. This avoids copying each
	// (large) StatefulSet and avoids taking the address of the range variable.
	for i := range statefulSets.Items {
		ss := &statefulSets.Items[i]
		if _, keep := generated[ss.Name]; keep || !isManagedResource(ss) {
			continue
		}
		level.Info(l).Log("msg", "deleting stale statefulset", "name", ss.Name)
		if err := r.Delete(ctx, ss); err != nil {
			return fmt.Errorf("failed to delete stale statefulset %s: %w", ss.Name, err)
		}
	}
	return nil
}