forked from google/gopacket
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reader.go
209 lines (195 loc) · 6.31 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
// Copyright 2012 Google, Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style license
// that can be found in the LICENSE file in the root of the source
// tree.
// Package tcpreader provides an implementation for tcpassembly.Stream which presents
// the caller with an io.Reader for easy processing.
//
// The assembly package handles packet data reordering, but its output is
// library-specific, thus not usable by the majority of external Go libraries.
// The io.Reader interface, on the other hand, is used throughout much of Go
// code as an easy mechanism for reading in data streams and decoding them. For
// example, the net/http package provides the ReadRequest function, which can
// parase an HTTP request from a live data stream, just what we'd want when
// sniffing HTTP traffic. Using ReaderStream, this is relatively easy to set
// up:
//
// // Create our StreamFactory
// type httpStreamFactory struct {}
// func (f *httpStreamFactory) New(a, b gopacket.Flow) {
// r := tcpreader.NewReaderStream(false)
// go printRequests(r)
// return &r
// }
// func printRequests(r io.Reader) {
// // Convert to bufio, since that's what ReadRequest wants.
// buf := bufio.NewReader(r)
// for {
// if req, err := http.ReadRequest(buf); err == io.EOF {
// return
// } else if err != nil {
// log.Println("Error parsing HTTP requests:", err)
// } else {
// fmt.Println("HTTP REQUEST:", req)
// fmt.Println("Body contains", tcpreader.DiscardBytesToEOF(req.Body), "bytes")
// }
// }
// }
//
// Using just this code, we're able to reference a powerful, built-in library
// for HTTP request parsing to do all the dirty-work of parsing requests from
// the wire in real-time. Pass this stream factory to an tcpassembly.StreamPool,
// start up an tcpassembly.Assembler, and you're good to go!
package tcpreader
import (
"code.google.com/p/gopacket/tcpassembly"
"errors"
"io"
)
var discardBuffer = make([]byte, 4096)
// DiscardBytesToFirstError will read in all bytes up to the first error
// reported by the given reader, then return the number of bytes discarded
// and the error encountered.
func DiscardBytesToFirstError(r io.Reader) (discarded int, err error) {
for {
n, e := r.Read(discardBuffer)
discarded += n
if e != nil {
return discarded, e
}
}
}
// DiscardBytesToEOF will read in all bytes from a Reader until it
// encounters an io.EOF, then return the number of bytes. Be careful
// of this... if used on a Reader that returns a non-io.EOF error
// consistently, this will loop forever discarding that error while
// it waits for an EOF.
func DiscardBytesToEOF(r io.Reader) (discarded int) {
for {
n, e := DiscardBytesToFirstError(r)
discarded += n
if e == io.EOF {
return
}
}
}
// ReaderStream implements both tcpassembly.Stream and io.Reader. You can use it
// as a building block to make simple, easy stream handlers.
//
// IMPORTANT: If you use a ReaderStream, you MUST read ALL BYTES from it,
// quickly. Not reading available bytes will block TCP stream reassembly. It's
// a common pattern to do this by starting a goroutine in the factory's New
// method:
//
// type myStreamHandler struct {
// r ReaderStream
// }
// func (m *myStreamHandler) run() {
// // Do something here that reads all of the ReaderStream, or your assembly
// // will block.
// fmt.Println(tcpreader.DiscardBytesToEOF(&m.r))
// }
// func (f *myStreamFactory) New(a, b gopacket.Flow) tcpassembly.Stream {
// s := &myStreamHandler{}
// go s.run()
// // Return the ReaderStream as the stream that assembly should populate.
// return &s.r
// }
type ReaderStream struct {
ReaderStreamOptions
reassembled chan []tcpassembly.Reassembly
done chan bool
current []tcpassembly.Reassembly
closed bool
lossReported bool
first bool
initiated bool
}
type ReaderStreamOptions struct {
// LossErrors determines whether this stream will return
// ReaderStreamDataLoss errors from its Read function whenever it
// determines data has been lost.
LossErrors bool
}
// NewReaderStream returns a new ReaderStream object.
func NewReaderStream() ReaderStream {
r := ReaderStream{
reassembled: make(chan []tcpassembly.Reassembly),
done: make(chan bool),
first: true,
initiated: true,
}
return r
}
// Reassembled implements tcpassembly.Stream's Reassembled function.
func (r *ReaderStream) Reassembled(reassembly []tcpassembly.Reassembly) {
if !r.initiated {
panic("ReaderStream not created via NewReaderStream")
}
r.reassembled <- reassembly
<-r.done
}
// ReassemblyComplete implements tcpassembly.Stream's ReassemblyComplete function.
func (r *ReaderStream) ReassemblyComplete() {
close(r.reassembled)
close(r.done)
}
// stripEmpty strips empty reassembly slices off the front of its current set of
// slices.
func (r *ReaderStream) stripEmpty() {
for len(r.current) > 0 && len(r.current[0].Bytes) == 0 {
r.current = r.current[1:]
r.lossReported = false
}
}
// DataLost is returned by the ReaderStream's Read function when it encounters
// a Reassembly with Skip != 0.
var DataLost error = errors.New("lost data")
// Read implements io.Reader's Read function.
// Given a byte slice, it will either copy a non-zero number of bytes into
// that slice and return the number of bytes and a nil error, or it will
// leave slice p as is and return 0, io.EOF.
func (r *ReaderStream) Read(p []byte) (int, error) {
if !r.initiated {
panic("ReaderStream not created via NewReaderStream")
}
var ok bool
r.stripEmpty()
for !r.closed && len(r.current) == 0 {
if r.first {
r.first = false
} else {
r.done <- true
}
if r.current, ok = <-r.reassembled; ok {
r.stripEmpty()
} else {
r.closed = true
}
}
if len(r.current) > 0 {
current := &r.current[0]
if r.LossErrors && !r.lossReported && current.Skip != 0 {
r.lossReported = true
return 0, DataLost
}
length := copy(p, current.Bytes)
current.Bytes = current.Bytes[length:]
return length, nil
}
return 0, io.EOF
}
// Close implements io.Closer's Close function, making ReaderStream a
// io.ReadCloser. It discards all remaining bytes in the reassembly in a
// manner that's safe for the assembler (IE: it doesn't block).
func (r *ReaderStream) Close() error {
r.current = nil
r.closed = true
for {
if _, ok := <-r.reassembled; !ok {
return nil
}
r.done <- true
}
}