@@ -11,10 +11,12 @@ import (
11
11
"sync"
12
12
13
13
"connectrpc.com/connect"
14
-
14
+ "github.com/go-kit/log"
15
15
"github.com/gorilla/mux"
16
+ pyroutil "github.com/grafana/alloy/internal/component/pyroscope/util"
16
17
"github.com/prometheus/client_golang/prometheus"
17
18
"github.com/prometheus/prometheus/model/labels"
19
+ "go.opentelemetry.io/otel/trace"
18
20
19
21
"github.com/grafana/alloy/internal/component"
20
22
fnet "github.com/grafana/alloy/internal/component/common/net"
@@ -35,7 +37,8 @@ func init() {
35
37
Stability : featuregate .StabilityGenerallyAvailable ,
36
38
Args : Arguments {},
37
39
Build : func (opts component.Options , args component.Arguments ) (component.Component , error ) {
38
- return New (opts , args .(Arguments ))
40
+ tracer := opts .Tracer .Tracer ("pyroscope.receive_http" )
41
+ return New (opts .Logger , tracer , opts .Registerer , args .(Arguments ))
39
42
},
40
43
})
41
44
}
@@ -54,20 +57,22 @@ func (a *Arguments) SetToDefault() {
54
57
}
55
58
56
59
type Component struct {
57
- opts component.Options
58
60
server * fnet.TargetServer
59
61
serverConfig * fnet.HTTPConfig
60
62
uncheckedCollector * util.UncheckedCollector
61
63
appendables []pyroscope.Appendable
62
64
mut sync.Mutex
65
+ logger log.Logger
66
+ tracer trace.Tracer
63
67
}
64
68
65
- func New (opts component. Options , args Arguments ) (* Component , error ) {
69
+ func New (logger log. Logger , tracer trace. Tracer , reg prometheus. Registerer , args Arguments ) (* Component , error ) {
66
70
uncheckedCollector := util .NewUncheckedCollector (nil )
67
- opts . Registerer .MustRegister (uncheckedCollector )
71
+ reg .MustRegister (uncheckedCollector )
68
72
69
73
c := & Component {
70
- opts : opts ,
74
+ logger : logger ,
75
+ tracer : tracer ,
71
76
uncheckedCollector : uncheckedCollector ,
72
77
appendables : args .ForwardTo ,
73
78
}
@@ -87,7 +92,7 @@ func (c *Component) Run(ctx context.Context) error {
87
92
}()
88
93
89
94
<- ctx .Done ()
90
- level .Info (c .opts . Logger ).Log ("msg" , "terminating due to context done" )
95
+ level .Info (c .logger ).Log ("msg" , "terminating due to context done" )
91
96
return nil
92
97
}
93
98
@@ -122,7 +127,7 @@ func (c *Component) update(args component.Arguments) (bool, error) {
122
127
serverRegistry := prometheus .NewRegistry ()
123
128
c .uncheckedCollector .SetCollector (serverRegistry )
124
129
125
- srv , err := fnet .NewTargetServer (c .opts . Logger , "pyroscope_receive_http" , serverRegistry , newArgs .Server )
130
+ srv , err := fnet .NewTargetServer (c .logger , "pyroscope_receive_http" , serverRegistry , newArgs .Server )
126
131
if err != nil {
127
132
return shutdown , fmt .Errorf ("failed to create server: %w" , err )
128
133
}
@@ -163,6 +168,10 @@ func (c *Component) Push(ctx context.Context, req *connect.Request[pushv1.PushRe
163
168
164
169
appendables := c .getAppendables ()
165
170
171
+ ctx , sp := c .tracer .Start (ctx , "/push.v1.PusherService/Push" )
172
+ defer sp .End ()
173
+ l := pyroutil .TraceLog (c .logger , sp )
174
+
166
175
var wg sync.WaitGroup
167
176
var errs error
168
177
var errorMut sync.Mutex
@@ -182,7 +191,7 @@ func (c *Component) Push(ctx context.Context, req *connect.Request[pushv1.PushRe
182
191
lbls := ensureServiceName (lb .Labels ())
183
192
err := appendable .Append (ctx , lbls , apiToAlloySamples (req .Msg .Series [idx ].Samples ))
184
193
if err != nil {
185
- util .ErrorsJoinConcurrent (
194
+ pyroutil .ErrorsJoinConcurrent (
186
195
& errs ,
187
196
fmt .Errorf ("unable to append series %s to appendable %d: %w" , lb .Labels ().String (), i , err ),
188
197
& errorMut ,
@@ -193,11 +202,10 @@ func (c *Component) Push(ctx context.Context, req *connect.Request[pushv1.PushRe
193
202
}
194
203
wg .Wait ()
195
204
if errs != nil {
196
- level .Error ( c . opts . Logger ).Log ("msg" , "Failed to forward profiles requests" , "err" , errs )
205
+ level .Warn ( l ).Log ("msg" , "Failed to forward profiles requests" , "err" , errs )
197
206
return nil , connect .NewError (connect .CodeInternal , errs )
198
207
}
199
208
200
- level .Debug (c .opts .Logger ).Log ("msg" , "Profiles successfully forwarded" )
201
209
return connect .NewResponse (& pushv1.PushResponse {}), nil
202
210
}
203
211
@@ -211,12 +219,19 @@ func (c *Component) getAppendables() []pyroscope.Appendable {
211
219
func (c * Component ) handleIngest (w http.ResponseWriter , r * http.Request ) {
212
220
appendables := c .getAppendables ()
213
221
222
+ ctx := r .Context ()
223
+
224
+ ctx , sp := c .tracer .Start (ctx , "/ingest" )
225
+ defer sp .End ()
226
+
227
+ l := pyroutil .TraceLog (c .logger , sp )
228
+
214
229
// Parse labels early
215
230
var lbls labels.Labels
216
231
if nameParam := r .URL .Query ().Get ("name" ); nameParam != "" {
217
232
ls , err := labelset .Parse (nameParam )
218
233
if err != nil {
219
- level .Warn (c . opts . Logger ).Log (
234
+ level .Warn (l ).Log (
220
235
"msg" , "Failed to parse labels from name parameter" ,
221
236
"name" , nameParam ,
222
237
"err" , err ,
@@ -229,7 +244,7 @@ func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) {
229
244
}
230
245
lbls = labels .New (labelPairs ... )
231
246
}
232
- }
247
+ } // todo this is a required parameter, treat absence as error
233
248
234
249
// Ensure service_name label is set
235
250
lbls = ensureServiceName (lbls )
@@ -239,7 +254,7 @@ func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) {
239
254
// but means the entire profile will be held in memory
240
255
var buf bytes.Buffer
241
256
if _ , err := io .Copy (& buf , r .Body ); err != nil {
242
- level .Error ( c . opts . Logger ).Log ("msg" , "Failed to read request body" , "err" , err )
257
+ level .Warn ( l ).Log ("msg" , "Failed to read request body" , "err" , err )
243
258
w .WriteHeader (http .StatusInternalServerError )
244
259
return
245
260
}
@@ -260,23 +275,21 @@ func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) {
260
275
Labels : lbls ,
261
276
}
262
277
263
- if err := appendable .Appender ().AppendIngest (r . Context () , profile ); err != nil {
264
- level . Error ( c . opts . Logger ). Log ( "msg" , "Failed to append profile" , " appendable" , i , "err" , err )
265
- util .ErrorsJoinConcurrent (& errs , err , & errorMut )
278
+ if err := appendable .Appender ().AppendIngest (ctx , profile ); err != nil {
279
+ err = fmt . Errorf ( "failed to ingest profile to appendable %d: %w " , i , err )
280
+ pyroutil .ErrorsJoinConcurrent (& errs , err , & errorMut )
266
281
}
267
-
268
- level .Debug (c .opts .Logger ).Log ("msg" , "Profile appended successfully" , "appendable" , i )
269
282
}()
270
283
}
271
284
272
285
wg .Wait ()
273
286
274
287
if errs != nil {
288
+ level .Warn (l ).Log ("msg" , "Failed to ingest profiles" , "err" , errs )
275
289
var writeErr * write.PyroscopeWriteError
276
290
if errors .As (errs , & writeErr ) {
277
291
http .Error (w , http .StatusText (writeErr .StatusCode ), writeErr .StatusCode )
278
292
} else {
279
- level .Error (c .opts .Logger ).Log ("msg" , "Failed to process request" , "err" , errs )
280
293
http .Error (w , "Failed to process request" , http .StatusInternalServerError )
281
294
}
282
295
return
0 commit comments