/
grpc.go
151 lines (143 loc) · 4.04 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package csiparser
import (
	"net"
	"strings"

	"github.com/google/gopacket/layers"
	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
)
// parseRPCs walks every entry of rawstreams and pairs up streams that share
// a stream ID in both directions (ab and ba). A pair is kept only when one
// side passes isGRPCRequest and the opposite side passes isGRPCResponse; the
// address of the requesting side is recorded as the client. Pairs for which
// parseGRPC reports failure are dropped.
//
// The result is nil when no pair qualifies.
func parseRPCs(rawstreams []rawStreams) []CsiRPC {
	var parsed []CsiRPC
	for _, pair := range rawstreams {
		for id, forward := range pair.ab {
			reverse, found := pair.ba[id]
			if !found {
				// No stream with this ID exists in the opposite direction.
				continue
			}

			// Decide which direction carries the request and which the
			// response; the requester's endpoint becomes the client.
			var (
				req, resp stream
				ip        net.IP
				port      layers.TCPPort
			)
			switch {
			case isGRPCRequest(forward) && isGRPCResponse(reverse):
				req, resp = forward, reverse
				ip, port = pair.aIP, pair.aPort
			case isGRPCRequest(reverse) && isGRPCResponse(forward):
				req, resp = reverse, forward
				ip, port = pair.bIP, pair.bPort
			default:
				// Neither orientation looks like a request/response pair.
				continue
			}

			if rpc, good := parseGRPC(req, resp, ip, port); good {
				parsed = append(parsed, rpc)
			}
		}
	}
	return parsed
}
// isGRPCRequest reports whether strm has the shape this parser accepts as a
// gRPC request: one HEADERS frame followed by one or more DATA frames (and
// nothing else), where the HEADERS frame carries all of
//
//	:method      = POST
//	:scheme      = http
//	content-type = application/grpc
//
// The header values are compared exactly.
func isGRPCRequest(strm stream) bool {
	frames := strm.frames

	// Need at least HEADERS + one DATA frame, with HEADERS first.
	if len(frames) < 2 || frames[0].header.Type != http2.FrameHeaders {
		return false
	}
	// Everything after the first frame has to be DATA.
	for _, f := range frames[1:] {
		if f.header.Type != http2.FrameData {
			return false
		}
	}

	// All three required header fields must appear in the first frame.
	var sawPost, sawHTTP, sawGRPC bool
	for _, field := range frames[0].headerFields {
		switch {
		case field.Name == ":method" && field.Value == "POST":
			sawPost = true
		case field.Name == ":scheme" && field.Value == "http":
			sawHTTP = true
		case field.Name == "content-type" && field.Value == "application/grpc":
			sawGRPC = true
		}
	}
	return sawPost && sawHTTP && sawGRPC
}
// isGRPCResponse reports whether strm has the shape this parser accepts as a
// successful gRPC response: a leading HEADERS frame, one or more DATA frames,
// and a closing HEADERS frame, where the leading HEADERS frame carries both
//
//	:status      = 200
//	content-type = application/grpc
//
// The header values are compared exactly. Fields of the closing HEADERS frame
// are not inspected.
func isGRPCResponse(strm stream) bool {
	frames := strm.frames

	// Need at least HEADERS + DATA + HEADERS.
	if len(frames) < 3 {
		return false
	}
	last := len(frames) - 1

	if frames[0].header.Type != http2.FrameHeaders {
		return false
	}
	// Every frame strictly between the first and last must be DATA.
	for _, f := range frames[1:last] {
		if f.header.Type != http2.FrameData {
			return false
		}
	}
	if frames[last].header.Type != http2.FrameHeaders {
		return false
	}

	// Both required header fields must appear in the first frame.
	var sawOK, sawGRPC bool
	for _, field := range frames[0].headerFields {
		switch {
		case field.Name == ":status" && field.Value == "200":
			sawOK = true
		case field.Name == "content-type" && field.Value == "application/grpc":
			sawGRPC = true
		}
	}
	return sawOK && sawGRPC
}
// parseGRPC builds a CsiRPC from a matched request/response stream pair.
//
// The :path and :authority fields are read from the request's first frame;
// :path is split into service and RPC names by parseRPCPath. The body of
// frame 1 of each stream is handed to parseCSIRequest / parseCSIResponse
// (only that one frame is read, even if more DATA frames follow).
//
// The returned bool is true only when both parsers yield a non-nil result.
// The CsiRPC is populated and returned either way.
//
// The function indexes request.frames[0], request.frames[1] and
// response.frames[1] without checking lengths, so callers must pass streams
// that have at least two frames each.
func parseGRPC(request, response stream, clientIP net.IP,
	clientPort layers.TCPPort) (CsiRPC, bool) {
	reqHeaders := request.frames[0].headerFields
	path := findField(reqHeaders, ":path")
	clientName := findField(reqHeaders, ":authority")

	service, rpc := parseRPCPath(path)

	// Payloads are taken from the frame right after the leading HEADERS.
	reqData := request.frames[1].body
	repData := response.frames[1].body

	req := parseCSIRequest(reqData, service, rpc)
	rep := parseCSIResponse(repData, service, rpc)

	result := CsiRPC{clientIP, clientPort, clientName, path, req, rep,
		request.captureTimestamp, response.captureTimestamp}
	return result, req != nil && rep != nil
}
// findField returns the Value of the first entry in headerFields whose Name
// equals field exactly, or the empty string when there is no such entry.
func findField(headerFields []hpack.HeaderField, field string) string {
	for i := 0; i < len(headerFields); i++ {
		if headerFields[i].Name != field {
			continue
		}
		return headerFields[i].Value
	}
	return ""
}
// parseRPCPath splits a gRPC :path of the form "/csi.v1.<Service>/<RPC>"
// into its service and RPC names. For example,
// "/csi.v1.Controller/CreateVolume" yields ("Controller", "CreateVolume").
// When the remainder contains several slashes, the split happens at the
// last one.
//
// Paths that do not start with "/csi.v1." or that have no '/' after the
// prefix yield two empty strings. Previously such input (including the empty
// string produced by a missing :path header) caused a slice-bounds panic.
func parseRPCPath(path string) (service, rpc string) {
	const prefix = "/csi.v1."
	if !strings.HasPrefix(path, prefix) {
		return "", ""
	}
	rest := path[len(prefix):]

	split := strings.LastIndexByte(rest, '/')
	if split < 0 {
		// No separator between service and RPC name.
		return "", ""
	}
	return rest[:split], rest[split+1:]
}