-
Notifications
You must be signed in to change notification settings - Fork 510
/
deduper.go
184 lines (162 loc) · 4.74 KB
/
deduper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package frontend
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"net/http"
"github.com/go-kit/kit/log"
"github.com/golang/protobuf/proto"
"github.com/opentracing/opentracing-go"
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
)
// warningTooManySpans is the error text returned by makeUniqueSpanID when no
// unused span ID can be found.
const warningTooManySpans = "cannot assign unique span ID, too many spans in the trace"

// maxSpanID is the exclusive upper bound of the range makeUniqueSpanID searches
// for an unused span ID (all 64 bits set).
var maxSpanID = ^uint64(0)
// Deduper returns a Middleware that wraps the next Handler in a spanIDDeduper
// configured with the given logger.
func Deduper(logger log.Logger) Middleware {
	wrap := func(next Handler) Handler {
		return spanIDDeduper{
			logger: logger,
			next:   next,
		}
	}
	return MiddlewareFunc(wrap)
}
// spanIDDeduper is the Handler returned by the Deduper middleware; its Do
// method rewrites duplicated span IDs in the trace carried by a response.
//
// This is copied over from Jaeger and modified to work for OpenTelemetry Trace data structure
// https://github.com/jaegertracing/jaeger/blob/12bba8c9b91cf4a29d314934bc08f4a80e43c042/model/adjuster/span_id_deduper.go
type spanIDDeduper struct {
// next is the wrapped Handler whose response is post-processed by Do.
next Handler
// logger is the logger passed to Deduper.
logger log.Logger
// trace is the trace currently being processed; Do sets it before calling dedupe.
trace *tempopb.Trace
// spansByID maps a span ID to every span carrying that ID; built by groupSpansByID.
spansByID map[uint64][]*v1.Span
// maxUsedID is the last ID handed out by makeUniqueSpanID.
maxUsedID uint64
}
// Do implements Handler.
//
// The request is forwarded to the next handler inside a
// "frontend.DedupeSpanIDs" tracing span. When the response status is 200 OK the
// body is read, unmarshalled as a tempopb.Trace, passed through dedupe and
// re-marshalled into a fresh response. Responses with any other status are
// returned untouched. Any error from the next handler, from reading the body or
// from (un)marshalling is returned with a nil response.
//
// The receiver is a value, so assigning s.trace here only affects this call's
// copy of the deduper.
func (s spanIDDeduper) Do(req *http.Request) (*http.Response, error) {
	span, ctx := opentracing.StartSpanFromContext(req.Context(), "frontend.DedupeSpanIDs")
	defer span.Finish()

	// context propagation
	req = req.WithContext(ctx)
	resp, err := s.next.Do(req)
	if err != nil {
		return nil, err
	}

	if resp.StatusCode != http.StatusOK {
		return resp, nil
	}

	// The upstream body is consumed here and replaced by a new one below.
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	traceObject := &tempopb.Trace{}
	if err := proto.Unmarshal(body, traceObject); err != nil {
		return nil, err
	}

	s.trace = traceObject
	s.dedupe()

	traceBytes, err := proto.Marshal(s.trace)
	if err != nil {
		return nil, err
	}

	return &http.Response{
		StatusCode: http.StatusOK,
		Body:       ioutil.NopCloser(bytes.NewReader(traceBytes)),
		Header:     http.Header{},
		// Deduping can change the encoded size (for example when a parent span
		// ID is added to a span that had none), so report the length of the
		// new body rather than the upstream Content-Length.
		ContentLength: int64(len(traceBytes)),
	}, nil
}
// dedupe rewrites duplicated span IDs in s.trace. The order of the two steps
// matters: groupSpansByID fills s.spansByID, which dedupeSpanIDs then consults.
func (s *spanIDDeduper) dedupe() {
s.groupSpansByID()
s.dedupeSpanIDs()
}
// groupSpansByID indexes every span of s.trace by its span ID and stores the
// resulting map (id -> []Span) in s.spansByID. The ID is the first 8 bytes of
// SpanId read as a big-endian uint64.
//
// Spans whose SpanId is shorter than 8 bytes cannot be decoded; they are left
// out of the index instead of making binary.BigEndian.Uint64 panic.
func (s *spanIDDeduper) groupSpansByID() {
	const spanIDLen = 8 // bytes read by binary.BigEndian.Uint64

	spansByID := make(map[uint64][]*v1.Span)
	for _, batch := range s.trace.Batches {
		for _, ils := range batch.InstrumentationLibrarySpans {
			for _, span := range ils.Spans {
				if len(span.SpanId) < spanIDLen {
					continue
				}
				id := binary.BigEndian.Uint64(span.SpanId)
				// TODO maybe return an error if more than 2 spans found
				// Appending to a missing key starts from a nil slice, so no
				// separate existence check is needed.
				spansByID[id] = append(spansByID[id], span)
			}
		}
	}
	s.spansByID = spansByID
}
// isSharedWithClientSpan reports whether at least one of the spans recorded in
// s.spansByID under spanID has kind CLIENT.
func (s *spanIDDeduper) isSharedWithClientSpan(spanID uint64) bool {
	candidates := s.spansByID[spanID]
	for i := range candidates {
		if candidates[i].GetKind() != v1.Span_SPAN_KIND_CLIENT {
			continue
		}
		return true
	}
	return false
}
// dedupeSpanIDs gives a fresh span ID to every SERVER span of s.trace whose ID
// is also used by a CLIENT span. The previously shared ID becomes the server
// span's parent ID. Afterwards swapParentIDs is called with the old -> new ID
// mapping so that the children of those server spans follow the rename.
//
// Spans whose SpanId is shorter than 8 bytes are skipped because their ID
// cannot be decoded. A ParentSpanId shorter than 8 bytes (including an empty
// one) is replaced by a new 8-byte slice before the parent ID is written, so
// binary.BigEndian.PutUint64 never writes out of range.
func (s *spanIDDeduper) dedupeSpanIDs() {
	const spanIDLen = 8 // bytes read/written by binary.BigEndian (Put)Uint64

	oldToNewSpanIDs := make(map[uint64]uint64)
	for _, batch := range s.trace.Batches {
		for _, ils := range batch.InstrumentationLibrarySpans {
			for _, span := range ils.Spans {
				if len(span.SpanId) < spanIDLen {
					continue
				}
				id := binary.BigEndian.Uint64(span.SpanId)
				// only replace span IDs for server-side spans that share the ID with something else
				if span.GetKind() != v1.Span_SPAN_KIND_SERVER || !s.isSharedWithClientSpan(id) {
					continue
				}
				newID, err := s.makeUniqueSpanID()
				if err != nil {
					// ignore this error condition where we have more than 2^64 unique span IDs
					continue
				}
				oldToNewSpanIDs[id] = newID
				if len(span.ParentSpanId) < spanIDLen {
					span.ParentSpanId = make([]byte, spanIDLen)
				}
				binary.BigEndian.PutUint64(span.ParentSpanId, id) // previously shared ID is the new parent
				binary.BigEndian.PutUint64(span.SpanId, newID)
			}
		}
	}
	s.swapParentIDs(oldToNewSpanIDs)
}
// swapParentIDs corrects ParentSpanID of all spans that are children of the server
// spans whose IDs we deduped.
//
// oldToNewSpanIDs maps a previously shared span ID to the new ID given to the
// server span; nothing is done when it is empty. A span is left alone when its
// own ID equals the new parent ID: that is the deduped server span itself,
// which must keep the old shared ID as its parent.
//
// Parent IDs shorter than 8 bytes cannot be decoded and are skipped rather than
// making binary.BigEndian.Uint64 panic.
func (s *spanIDDeduper) swapParentIDs(oldToNewSpanIDs map[uint64]uint64) {
	if len(oldToNewSpanIDs) == 0 {
		return
	}

	const spanIDLen = 8 // bytes read/written by binary.BigEndian (Put)Uint64

	for _, batch := range s.trace.Batches {
		for _, ils := range batch.InstrumentationLibrarySpans {
			for _, span := range ils.Spans {
				parent := span.GetParentSpanId()
				if len(parent) < spanIDLen {
					continue
				}
				newParentID, ok := oldToNewSpanIDs[binary.BigEndian.Uint64(parent)]
				if !ok {
					continue
				}
				// A span with an undecodable ID cannot be the renamed server
				// span, so its parent is still rewritten.
				if len(span.SpanId) >= spanIDLen && binary.BigEndian.Uint64(span.SpanId) == newParentID {
					continue
				}
				binary.BigEndian.PutUint64(span.ParentSpanId, newParentID)
			}
		}
	}
}
// makeUniqueSpanID hands out the smallest ID above s.maxUsedID (and below
// maxSpanID) that is not a key of s.spansByID, and remembers it in s.maxUsedID
// so the next call continues from there. If the search range is exhausted,
// which is unlikely given that the whole space of span IDs is 2^64, it returns
// 0 and an error carrying warningTooManySpans.
func (s *spanIDDeduper) makeUniqueSpanID() (uint64, error) {
	candidate := s.maxUsedID + 1
	for ; candidate < maxSpanID; candidate++ {
		if _, taken := s.spansByID[candidate]; taken {
			continue
		}
		s.maxUsedID = candidate
		return candidate, nil
	}
	return 0, fmt.Errorf(warningTooManySpans)
}