-
Notifications
You must be signed in to change notification settings - Fork 77
/
http2grpc_transform.go
254 lines (210 loc) · 6.65 KB
/
http2grpc_transform.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
package ebpfcommon
import (
"bytes"
"encoding/binary"
"net"
"strconv"
"strings"
"github.com/cilium/ebpf/ringbuf"
lru "github.com/hashicorp/golang-lru/v2"
"go.opentelemetry.io/otel/trace"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/internal/svc"
)
type BPFHTTP2Info bpfHttp2GrpcRequestT
type Protocol uint8
// The following consts need to coincide with some C identifiers:
// EVENT_HTTP_REQUEST, EVENT_GRPC_REQUEST, EVENT_HTTP_CLIENT, EVENT_GRPC_CLIENT, EVENT_SQL_CLIENT
const (
HTTP2 Protocol = iota + 1
GRPC
)
var hdec = hpack.NewDecoder(0, nil)
// not all requests for a given stream specify the protocol, but one must
// we remember if we see grpc mentioned and tag the rest of the streams for
// a given connection as grpc. default assumes plain HTTP2
var activeGRPCConnections, _ = lru.New[BPFConnInfo, Protocol](1024)
func byteFramer(data []uint8) *http2.Framer {
buf := bytes.NewBuffer(data)
fr := http2.NewFramer(buf, buf) // the write is same as read, but we never write
return fr
}
func defaultProtocol(conn *BPFConnInfo) Protocol {
proto, ok := activeGRPCConnections.Get(*conn)
if !ok {
proto = HTTP2
}
return proto
}
func protocolIsGRPC(conn *BPFConnInfo) {
activeGRPCConnections.Add(*conn, GRPC)
}
func readMetaFrame(conn *BPFConnInfo, fr *http2.Framer, hf *http2.HeadersFrame) (string, string, Protocol) {
method := ""
path := ""
proto := defaultProtocol(conn)
hdec.SetEmitFunc(func(hf hpack.HeaderField) {
hfKey := strings.ToLower(hf.Name)
switch hfKey {
case ":method":
method = hf.Value
case ":path":
path = hf.Value
case "content-type":
if strings.ToLower(hf.Value) == "application/grpc" {
protocolIsGRPC(conn)
proto = GRPC
}
}
})
// Lose reference to MetaHeadersFrame:
defer hdec.SetEmitFunc(func(_ hpack.HeaderField) {})
for {
frag := hf.HeaderBlockFragment()
if _, err := hdec.Write(frag); err != nil {
return method, path, proto
}
if hf.HeadersEnded() {
break
}
if _, err := fr.ReadFrame(); err != nil {
return method, path, proto
}
}
return method, path, proto
}
func http2grpcStatus(status int) int {
if status < 100 {
return status
}
if status < 400 {
return 0
}
return 2 // Unknown
}
func readRetMetaFrame(conn *BPFConnInfo, fr *http2.Framer, hf *http2.HeadersFrame) (int, Protocol) {
status := 0
proto := defaultProtocol(conn)
hdec.SetEmitFunc(func(hf hpack.HeaderField) {
hfKey := strings.ToLower(hf.Name)
// grpc requests may have :status and grpc-status. :status will be HTTP code.
// we prefer the grpc one if it exists, it's always later since : tagged headers
// end up first in the headers list.
switch hfKey {
case ":status":
status, _ = strconv.Atoi(hf.Value)
proto = HTTP2
case "grpc-status":
status, _ = strconv.Atoi(hf.Value)
protocolIsGRPC(conn)
proto = GRPC
}
})
// Lose reference to MetaHeadersFrame:
defer hdec.SetEmitFunc(func(_ hpack.HeaderField) {})
for {
frag := hf.HeaderBlockFragment()
if _, err := hdec.Write(frag); err != nil {
return status, proto
}
if hf.HeadersEnded() {
break
}
if _, err := fr.ReadFrame(); err != nil {
return status, proto
}
}
return status, proto
}
var genericServiceID = svc.ID{SDKLanguage: svc.InstrumentableGeneric}
func http2InfoToSpan(info *BPFHTTP2Info, method, path, peer, host string, status int, protocol Protocol) request.Span {
return request.Span{
Type: info.eventType(protocol),
ID: 0,
Method: method,
Path: removeQuery(path),
Peer: peer,
Host: host,
HostPort: int(info.ConnInfo.D_port),
ContentLength: int64(info.Len),
RequestStart: int64(info.StartMonotimeNs),
Start: int64(info.StartMonotimeNs),
End: int64(info.EndMonotimeNs),
Status: status,
ServiceID: genericServiceID, // set generic service to be overwritten later by the PID filters
TraceID: trace.TraceID(info.Tp.TraceId),
SpanID: trace.SpanID(info.Tp.SpanId),
ParentSpanID: trace.SpanID(info.Tp.ParentId),
Flags: info.Tp.Flags,
Pid: request.PidInfo{
HostPID: info.Pid.HostPid,
UserPID: info.Pid.UserPid,
Namespace: info.Pid.Ns,
},
}
}
// The eBPF kernel side gives us information only if the event type is server or client. We reuse what's
// done for HTTP 1.1. We figure out what the protocol is by looking at the response status, is it :grpc-status,
// or :status. Then we know what the protocol actually is.
func (event *BPFHTTP2Info) eventType(protocol Protocol) request.EventType {
eventType := request.EventType(event.Type)
switch protocol {
case HTTP2:
return eventType // just use HTTP as is, no special handling
case GRPC:
switch eventType {
case request.EventTypeHTTP:
return request.EventTypeGRPC
case request.EventTypeHTTPClient:
return request.EventTypeGRPCClient
}
}
return 0
}
func (event *BPFHTTP2Info) hostInfo() (source, target string) {
src := make(net.IP, net.IPv6len)
dst := make(net.IP, net.IPv6len)
copy(src, event.ConnInfo.S_addr[:])
copy(dst, event.ConnInfo.D_addr[:])
return src.String(), dst.String()
}
func ReadHTTP2InfoIntoSpan(record *ringbuf.Record) (request.Span, bool, error) {
var event BPFHTTP2Info
err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event)
if err != nil {
return request.Span{}, true, err
}
framer := byteFramer(event.Data[:])
retFramer := byteFramer(event.RetData[:])
// We don't set the framer.ReadMetaHeaders function to hpack.NewDecoder because
// the http2.MetaHeadersFrame code wants a full grpc buffer with all the fields,
// and if it sees our partially captured eBPF buffers, it will not parse the frame
// while returning a (nil, error) tuple. We read the meta frame ourselves as long as
// we can and terminate without an error when things fail to decode because of
// partial buffers.
retF, _ := retFramer.ReadFrame()
status := 0
eventType := HTTP2
if ff, ok := retF.(*http2.HeadersFrame); ok {
status, eventType = readRetMetaFrame((*BPFConnInfo)(&event.ConnInfo), retFramer, ff)
}
f, _ := framer.ReadFrame()
if ff, ok := f.(*http2.HeadersFrame); ok {
method, path, proto := readMetaFrame((*BPFConnInfo)(&event.ConnInfo), framer, ff)
if eventType != GRPC && proto == GRPC {
eventType = proto
status = http2grpcStatus(status)
}
peer := ""
host := ""
if event.ConnInfo.S_port != 0 || event.ConnInfo.D_port != 0 {
source, target := event.hostInfo()
host = target
peer = source
}
return http2InfoToSpan(&event, method, path, peer, host, status, eventType), false, nil
}
return request.Span{}, true, nil // ignore if we couldn't parse it
}