/
http2.go
162 lines (155 loc) · 4.85 KB
/
http2.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package csiparser
import (
"bytes"
"io"
"log"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
)
func extractStreams(rfs map[string]rawFrames) []rawStreams {
var streams []rawStreams
for abID, rf := range rfs {
// Extract info from current src-dst pair, then delete its rawFrames
aIP, bIP := rf.srcIP, rf.dstIP
aPort, bPort := rf.srcPort, rf.dstPort
abFrames := rf.frames
delete(rfs, abID)
// Extract info from opposite src-dst pair, the delete that rawFrames
baID := srcdstIdentifier(bIP, aIP, bPort, aPort)
x, ok := rfs[baID]
if !ok {
continue
}
baFrames := x.frames
delete(rfs, baID)
// Construct rawStreams struct
rs := rawStreams{aIP, bIP, aPort, bPort, nil, nil}
rs.ab = framesToStreams(abFrames)
rs.ba = framesToStreams(baFrames)
// Append rawStreams struct to streams
streams = append(streams, rs)
}
return streams
}
type streaminfo = struct {
headersEnded bool
headersIndex int
stream stream
}
func framesToStreams(allFrames []frames) map[uint32]stream {
// Decoder with default max dynamic table size
decoder := hpack.NewDecoder(4096, nil)
// Magic
magic := http2.ClientPreface
// Initialize streaminfos struct
streaminfos := make(map[uint32]streaminfo)
for _, pktFrames := range allFrames {
timestamp := pktFrames.captureTimestamp
frameBytes := pktFrames.frames
// If there's a magic prefix in the beginning of frameBytes, remove it
if len(frameBytes) >= len(magic) &&
string(frameBytes[:len(magic)]) == magic {
frameBytes = frameBytes[len(magic):]
}
// Create new Framer to parse frames from frameBytes
r := bytes.NewReader(frameBytes)
framer := http2.NewFramer(nil, r)
// Parse
for true {
// Read the next frame
curFrame, err := framer.ReadFrame()
if err == io.EOF {
break
} else if err != nil {
log.Fatal(err)
}
// Get header of frame
hdr := copyHdr(curFrame.Header())
// Get stream ID of frame
sid := hdr.StreamID
// Create new streaminfo for sid if necessary
if _, ok := streaminfos[sid]; !ok {
streaminfos[sid] = streaminfo{true, -1, stream{}}
}
si := streaminfos[sid]
// Only store Data, Headers, and PushPromise frames (includes Continuations)
if hdr.Type == http2.FrameData {
// Append frame
body := copyBuf(curFrame.(*http2.DataFrame).Data())
si.stream.frames = append(si.stream.frames, frame{hdr, body, nil})
// Set/update captureTimestamp
si.stream.captureTimestamp = timestamp
} else if hdr.Type == http2.FrameHeaders || hdr.Type == http2.FramePushPromise {
// Append frame
var body []byte
if hdr.Type == http2.FrameHeaders {
body = copyBuf(curFrame.(*http2.HeadersFrame).HeaderBlockFragment())
} else {
body = copyBuf(curFrame.(*http2.PushPromiseFrame).HeaderBlockFragment())
}
si.stream.frames = append(si.stream.frames, frame{hdr, body, nil})
// Update headersEnded tracking
if hdr.Type == http2.FrameHeaders {
si.headersEnded = curFrame.(*http2.HeadersFrame).HeadersEnded()
} else {
si.headersEnded = curFrame.(*http2.PushPromiseFrame).HeadersEnded()
}
// Update headersIndex
si.headersIndex = len(si.stream.frames) - 1
// If headersEnded, decode header block
if si.headersEnded {
x, err := decoder.DecodeFull(si.stream.frames[si.headersIndex].body)
if err != nil {
log.Fatal(err)
}
si.stream.frames[si.headersIndex].headerFields = x
si.stream.frames[si.headersIndex].body = nil
}
// Set/update captureTimestamp
si.stream.captureTimestamp = timestamp
} else if hdr.Type == http2.FrameContinuation && !si.headersEnded {
// Append header block fragment to headers frame
fg := curFrame.(*http2.ContinuationFrame).HeaderBlockFragment()
x := append(si.stream.frames[si.headersIndex].body, fg...)
si.stream.frames[si.headersIndex].body = x
// Update headersEnded tracking
si.headersEnded = curFrame.(*http2.ContinuationFrame).HeadersEnded()
// Set/update captureTimestamp
si.stream.captureTimestamp = timestamp
// If headersEnded, decode header block
if si.headersEnded {
x, err := decoder.DecodeFull(si.stream.frames[si.headersIndex].body)
if err != nil {
log.Fatal(err)
}
si.stream.frames[si.headersIndex].headerFields = x
si.stream.frames[si.headersIndex].body = nil
}
}
streaminfos[sid] = si
}
}
// Create smap with all the streams from streaminfo
smap := make(map[uint32]stream)
for k, v := range streaminfos {
if v.stream.frames != nil {
smap[k] = v.stream
}
}
return smap
}
func copyHdr(hdr http2.FrameHeader) (copyhdr http2.FrameHeader) {
copyhdr.Type = hdr.Type
copyhdr.Flags = hdr.Flags
copyhdr.Length = hdr.Length
copyhdr.StreamID = hdr.StreamID
return
}
func copyBuf(buf []byte) []byte {
copybuf := make([]byte, len(buf))
n := copy(copybuf, buf)
if n != len(buf) {
log.Fatal("error when copying buffer")
}
return copybuf
}