/
otlp_trace.go
102 lines (85 loc) · 2.65 KB
/
otlp_trace.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package route
import (
"context"
"errors"
"net/http"
huskyotlp "github.com/honeycombio/husky/otlp"
"github.com/honeycombio/refinery/types"
collectortrace "go.opentelemetry.io/proto/otlp/collector/trace/v1"
)
// postOTLP is the HTTP handler for OTLP trace exports.
//
// It builds the request info from the HTTP headers, validates it, translates
// the request body into batches, and passes those batches to
// processTraceRequest. Each failure is reported through
// handlerReturnWithError: a header validation failure maps to
// ErrInvalidContentType when the content type is the problem and to
// ErrAuthNeeded otherwise, while translation and processing failures map to
// ErrUpstreamFailed. On success this function writes nothing to w itself.
func (r *Router) postOTLP(w http.ResponseWriter, req *http.Request) {
	requestInfo := huskyotlp.GetRequestInfoFromHttpHeaders(req.Header)

	headerErr := requestInfo.ValidateTracesHeaders()
	if headerErr != nil {
		switch {
		case errors.Is(headerErr, huskyotlp.ErrInvalidContentType):
			r.handlerReturnWithError(w, ErrInvalidContentType, headerErr)
		default:
			// Every other validation failure is reported as missing auth.
			r.handlerReturnWithError(w, ErrAuthNeeded, headerErr)
		}
		return
	}

	translated, translateErr := huskyotlp.TranslateTraceRequestFromReader(req.Body, requestInfo)
	if translateErr != nil {
		r.handlerReturnWithError(w, ErrUpstreamFailed, translateErr)
		return
	}

	processErr := processTraceRequest(req.Context(), r, translated.Batches, requestInfo.ApiKey)
	if processErr == nil {
		return
	}
	r.handlerReturnWithError(w, ErrUpstreamFailed, processErr)
}
// TraceServer is the gRPC-facing handler for OTLP trace exports. Its Export
// method forwards translated batches to processTraceRequest using the Router
// stored here.
type TraceServer struct {
// router is the Router that Export passes to processTraceRequest.
router *Router
// Embedded type from the OTLP collector trace package.
// NOTE(review): presumably supplies default implementations of the trace
// service methods this type does not define itself — confirm against the
// generated code.
collectortrace.UnimplementedTraceServiceServer
}
// NewTraceServer returns a pointer to a new TraceServer whose router field is
// set to the given Router.
func NewTraceServer(router *Router) *TraceServer {
	return &TraceServer{router: router}
}
// Export handles an OTLP trace export request received over gRPC.
//
// It builds the request info from the gRPC metadata carried in ctx, validates
// it, translates req into batches, and passes those batches to
// processTraceRequest along with the server's Router. Any failure along the
// way is converted with huskyotlp.AsGRPCError and returned with a nil
// response; on success an empty ExportTraceServiceResponse is returned.
func (t *TraceServer) Export(ctx context.Context, req *collectortrace.ExportTraceServiceRequest) (*collectortrace.ExportTraceServiceResponse, error) {
	requestInfo := huskyotlp.GetRequestInfoFromGrpcMetadata(ctx)

	headerErr := requestInfo.ValidateTracesHeaders()
	if headerErr != nil {
		return nil, huskyotlp.AsGRPCError(headerErr)
	}

	translated, translateErr := huskyotlp.TranslateTraceRequest(req, requestInfo)
	if translateErr != nil {
		return nil, huskyotlp.AsGRPCError(translateErr)
	}

	processErr := processTraceRequest(ctx, t.router, translated.Batches, requestInfo.ApiKey)
	if processErr != nil {
		return nil, huskyotlp.AsGRPCError(processErr)
	}

	return &collectortrace.ExportTraceServiceResponse{}, nil
}
// processTraceRequest converts every event in the given batches into a
// types.Event and passes it to router.processEvent.
//
// Parameters:
//   - ctx: stored on each generated event's Context field.
//   - router: supplies the config, logger, environment lookup and event
//     processing used here.
//   - batches: the translated OTLP batches; each batch's Dataset is applied to
//     all of its events.
//   - apiKey: stored on each event and used to look up the environment name.
//
// It returns an error when the Honeycomb API host cannot be read from the
// config or when the environment name lookup fails; in both cases no events
// are processed. A failure to process an individual event is logged and does
// not stop the remaining events or change the return value.
func processTraceRequest(
	ctx context.Context,
	router *Router,
	batches []huskyotlp.Batch,
	apiKey string) error {

	// The zero value is what gets passed along with every event.
	var requestID types.RequestIDContextKey

	apiHost, err := router.Config.GetHoneycombAPI()
	if err != nil {
		router.Logger.Error().Logf("Unable to retrieve APIHost from config while processing OTLP batch")
		return err
	}

	// get environment name - will be empty for legacy keys
	environment, err := router.getEnvironmentName(apiKey)
	if err != nil {
		// Surface the failure to the caller. Returning nil here would drop
		// the whole request while reporting success to the client.
		return err
	}

	for _, batch := range batches {
		for _, ev := range batch.Events {
			event := &types.Event{
				Context:     ctx,
				APIHost:     apiHost,
				APIKey:      apiKey,
				Dataset:     batch.Dataset,
				Environment: environment,
				SampleRate:  uint(ev.SampleRate),
				Timestamp:   ev.Timestamp,
				Data:        ev.Attributes,
			}
			if err := router.processEvent(event, requestID); err != nil {
				// Use a constant format string so a '%' in the error text is
				// logged literally instead of being read as a verb.
				router.Logger.Error().Logf("Error processing event: %s", err.Error())
			}
		}
	}

	return nil
}