forked from google/gopacket
-
Notifications
You must be signed in to change notification settings - Fork 0
/
afpacket.go
345 lines (320 loc) · 11.6 KB
/
afpacket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
// Copyright 2012 Google, Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style license
// that can be found in the LICENSE file in the root of the source
// tree.

// +build linux

// Package afpacket provides Go bindings for MMap'd AF_PACKET socket reading.
package afpacket
// Couldn't have done this without:
// http://lxr.free-electrons.com/source/Documentation/networking/packet_mmap.txt
// http://codemonkeytips.blogspot.co.uk/2011/07/asynchronous-packet-socket-reading-with.html
import (
	"errors"
	"fmt"
	"net"
	"runtime"
	"sync"
	"syscall"
	"time"
	"unsafe"

	"code.google.com/p/gopacket"
)
/*
#include <linux/if_packet.h> // AF_PACKET, sockaddr_ll
#include <linux/if_ether.h> // ETH_P_ALL
#include <sys/socket.h> // socket()
#include <unistd.h> // close()
#include <arpa/inet.h> // htons()
#include <sys/mman.h> // mmap(), munmap()
#include <poll.h> // poll()
*/
import "C"
// Platform values read from the C headers once, at package initialization.
var (
	// pageSize is the memory page size reported by getpagesize().
	pageSize = int(C.getpagesize())
	// tpacketAlignment is the C TPACKET_ALIGNMENT constant; tpacketAlign
	// rounds sizes up to a multiple of it.
	tpacketAlignment = uint(C.TPACKET_ALIGNMENT)
)
// tpacketAlign rounds v up to the next multiple of tpacketAlignment, matching
// the kernel's TPACKET_ALIGN(x) macro: (x + a - 1) & ~(a - 1).
// The masking trick requires tpacketAlignment to be a power of two.
func tpacketAlign(v int) int {
	// &^ is Go's AND NOT: it clears exactly the bits below the alignment,
	// i.e. x & ~(a - 1).
	return int((uint(v) + tpacketAlignment - 1) &^ (tpacketAlignment - 1))
}
// Stats is a set of counters detailing the work TPacket has done so far.
// A copy is returned by the TPacket Stats method.
type Stats struct {
// Packets is the total number of packets returned to the caller.
Packets int64
// Polls is the number of blocking poll() syscalls made waiting for packets.
// It is normally much lower than Packets, since with TPacket one syscall
// can (and often does) make many results available. It is not a strict
// bound, though: every poll call is counted, including ones that fail or
// wake up before the awaited frame/block is ready.
Polls int64
}
// TPacket reads packets from a Linux AF_PACKET socket through a ring buffer
// that is mmap'd and shared with the kernel. TPacket versions 1, 2 and 3 are
// supported. Create one with NewTPacket and release it with Close.
type TPacket struct {
// fd is the C file descriptor of the AF_PACKET socket; Close sets it to -1.
fd C.int
// ring points to the memory space of the ring buffer shared by tpacket and the kernel.
// It is set up by setUpRing and unmapped (then reset to nil) by Close.
ring unsafe.Pointer
// opts contains read-only options for the TPacket object.
opts options
// mu guards the fields declared below it.
// NOTE(review): Close does not take mu, so Close must not run concurrently
// with reads.
mu sync.Mutex // guards below
// offset is the offset into the ring of the current header: a frame index
// for TPacket v1/v2 and a block index for v3 (see getTPacketHeader).
offset int
// current is the current header; nil until the first read.
current header
// pollset is used by TPacket for its poll() call.
pollset C.struct_pollfd
// shouldReleasePacket is set to true once the current header has been handed
// to user space (see pollForFirstPacket), to make sure we remember to release
// that data back to the kernel before moving on.
shouldReleasePacket bool
// stats is simple statistics on TPacket's run.
stats Stats
// tpVersion is the version of TPacket actually in use, set by setRequestedTPacketVersion.
tpVersion OptTPacketVersion
// Hackity hack hack hack. We need to return a pointer to the header with
// getTPacketHeader, and we don't want to allocate a v3wrapper every time,
// so we leave it in the TPacket object and return a pointer to it.
v3 v3wrapper
}
// bindToInterface binds the TPacket socket to the network interface named
// ifaceName, requesting all Ethernet protocols (ETH_P_ALL) on it.
func (h *TPacket) bindToInterface(ifaceName string) error {
	ifc, lookupErr := net.InterfaceByName(ifaceName)
	if lookupErr != nil {
		return fmt.Errorf("InterfaceByName: %v", lookupErr)
	}
	addr := C.struct_sockaddr_ll{
		sll_family:   C.AF_PACKET,
		sll_protocol: C.__be16(C.htons(C.ETH_P_ALL)),
		sll_ifindex:  C.int(ifc.Index),
	}
	sockAddr := (*C.struct_sockaddr)(unsafe.Pointer(&addr))
	addrLen := C.socklen_t(unsafe.Sizeof(addr))
	if _, bindErr := C.bind(h.fd, sockAddr, addrLen); bindErr != nil {
		return fmt.Errorf("bindToInterface: %v", bindErr)
	}
	return nil
}
// setTPacketVersion asks the kernel, via setsockopt(SOL_PACKET, PACKET_VERSION),
// to switch the socket to the given TPacket version. It returns an error if
// the kernel rejects the request.
func (h *TPacket) setTPacketVersion(version OptTPacketVersion) error {
	v := C.int(version)
	optLen := C.socklen_t(unsafe.Sizeof(v))
	if _, e := C.setsockopt(h.fd, C.SOL_PACKET, C.PACKET_VERSION, unsafe.Pointer(&v), optLen); e != nil {
		return fmt.Errorf("setsockopt packet_version: %v", e)
	}
	return nil
}
// setRequestedTPacketVersion configures the socket with the TPacket version
// requested in h.opts.version and records the result in h.tpVersion.
// With TPacketVersionHighestAvailable, versions 3, 2 and 1 are tried in that
// order and the first one the kernel accepts is used. Otherwise only the
// requested version is attempted.
func (h *TPacket) setRequestedTPacketVersion() error {
	want := h.opts.version
	for _, candidate := range [...]OptTPacketVersion{TPacketVersion3, TPacketVersion2, TPacketVersion1} {
		if want != TPacketVersionHighestAvailable && want != candidate {
			continue
		}
		if h.setTPacketVersion(candidate) == nil {
			h.tpVersion = candidate
			return nil
		}
	}
	return errors.New("no known tpacket versions work on this machine")
}
// setUpRing sets up the shared-memory ring buffer between the user process and
// the kernel.
//
// It describes the ring to the kernel with setsockopt(PACKET_RX_RING). It uses
// struct tpacket_req for TPacket v1/v2, and struct tpacket_req3 for v3, which
// adds the block retire timeout. It then mmaps the ring and stores the
// mapping in h.ring.
//
// The mapping length is blockSize*numBlocks. This is the tp_block_size *
// tp_block_nr ring that was just requested, and the same length Close passes
// to munmap. It is computed in size_t so large rings are not truncated to 32
// bits. h.ring is assigned only after mmap succeeds, so a failed mapping never
// leaves MAP_FAILED behind for Close to unmap.
func (h *TPacket) setUpRing() error {
	blockSize := C.uint(h.opts.blockSize)
	blockNr := C.uint(h.opts.numBlocks)
	frameSize := C.uint(h.opts.frameSize)
	frameNr := C.uint(h.opts.framesPerBlock * h.opts.numBlocks)
	switch h.tpVersion {
	case TPacketVersion1, TPacketVersion2:
		var tp C.struct_tpacket_req
		tp.tp_block_size = blockSize
		tp.tp_block_nr = blockNr
		tp.tp_frame_size = frameSize
		tp.tp_frame_nr = frameNr
		if _, err := C.setsockopt(h.fd, C.SOL_PACKET, C.PACKET_RX_RING, unsafe.Pointer(&tp), C.socklen_t(unsafe.Sizeof(tp))); err != nil {
			return fmt.Errorf("setsockopt packet_rx_ring: %v", err)
		}
	case TPacketVersion3:
		var tp C.struct_tpacket_req3
		tp.tp_block_size = blockSize
		tp.tp_block_nr = blockNr
		tp.tp_frame_size = frameSize
		tp.tp_frame_nr = frameNr
		tp.tp_retire_blk_tov = C.uint(h.opts.blockTimeout / time.Millisecond)
		if _, err := C.setsockopt(h.fd, C.SOL_PACKET, C.PACKET_RX_RING, unsafe.Pointer(&tp), C.socklen_t(unsafe.Sizeof(tp))); err != nil {
			return fmt.Errorf("setsockopt packet_rx_ring v3: %v", err)
		}
	default:
		return errors.New("invalid tpVersion")
	}
	ringSize := C.size_t(h.opts.blockSize) * C.size_t(h.opts.numBlocks)
	ring, err := C.mmap(nil, ringSize, C.PROT_READ|C.PROT_WRITE, C.MAP_SHARED, h.fd, 0)
	if err != nil {
		return err
	}
	// mmap signals failure with MAP_FAILED ((void *)-1), not NULL; reject both.
	if ring == nil || uintptr(ring) == ^uintptr(0) {
		return errors.New("no ring")
	}
	h.ring = ring
	return nil
}
// Close releases the TPacket's ring mapping and socket. The TPacket must not
// be used after Close returns. A second call is a no-op. Close also removes
// the finalizer that NewTPacket installs.
func (h *TPacket) Close() {
	if h.fd == -1 {
		return // already closed.
	}
	if ring := h.ring; ring != nil {
		C.munmap(ring, C.size_t(h.opts.blockSize*h.opts.numBlocks))
	}
	h.ring = nil
	fd := h.fd
	h.fd = -1
	C.close(fd)
	runtime.SetFinalizer(h, nil)
}
// NewTPacket returns a new TPacket object for reading packets off the wire.
// Its behavior may be modified by passing in any/all of afpacket.Opt* to this
// function.
// If this function succeeds, the user should be sure to Close the returned
// TPacket when finished with it. A finalizer also calls Close if the TPacket
// is garbage collected while still open.
func NewTPacket(opts ...interface{}) (h *TPacket, err error) {
	tp := &TPacket{}
	if tp.opts, err = parseOptions(opts...); err != nil {
		return nil, err
	}
	sock, err := C.socket(C.AF_PACKET, C.int(tp.opts.socktype), C.int(C.htons(C.ETH_P_ALL)))
	if err != nil {
		return nil, err
	}
	tp.fd = sock
	// configure optionally binds to an interface, picks a TPacket version,
	// and maps the ring, stopping at the first failure.
	configure := func() error {
		if tp.opts.iface != "" {
			if e := tp.bindToInterface(tp.opts.iface); e != nil {
				return e
			}
		}
		if e := tp.setRequestedTPacketVersion(); e != nil {
			return e
		}
		return tp.setUpRing()
	}
	if err = configure(); err != nil {
		// Release the socket (and the ring, if it was mapped) on any setup failure.
		tp.Close()
		return nil, err
	}
	runtime.SetFinalizer(tp, (*TPacket).Close)
	return tp, nil
}
// releaseCurrentPacket clears the current header's status (presumably handing
// the frame/block back to the kernel; see header.clearStatus). It then marks
// that no release is pending and advances the ring offset to the next slot.
// It always returns nil.
func (h *TPacket) releaseCurrentPacket() error {
	h.current.clearStatus()
	h.shouldReleasePacket = false
	h.offset++
	return nil
}
// ZeroCopyReadPacketData reads the next packet off the wire, and returns its data.
// The slice returned by ZeroCopyReadPacketData points to bytes owned by the
// TPacket. Each call to ZeroCopyReadPacketData invalidates any data previously
// returned by ZeroCopyReadPacketData. Care must be taken not to keep pointers
// to old bytes when using ZeroCopyReadPacketData... if you need to keep data past
// the next time you call ZeroCopyReadPacketData, use ReadPacketData, which copies
// the bytes into a new buffer for you.
//
//	tp, _ := NewTPacket(...)
//	data1, _, _ := tp.ZeroCopyReadPacketData()
//	// do everything you want with data1 here, copying bytes out of it if you'd like to keep them around.
//	data2, _, _ := tp.ZeroCopyReadPacketData() // invalidates bytes in data1
//
// The call blocks until a packet is available. If waiting fails, the error is
// returned, and the next call waits on the same ring slot again.
func (h *TPacket) ZeroCopyReadPacketData() (data []byte, ci gopacket.CaptureInfo, err error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.current == nil || !h.current.next() {
		if h.shouldReleasePacket {
			h.releaseCurrentPacket()
		}
		h.current = h.getTPacketHeader()
		if err = h.pollForFirstPacket(h.current); err != nil {
			// This header was never confirmed as handed to user space, so the
			// next call must not consult it via next(). Dropping it makes that
			// call re-fetch the header at h.offset and wait again.
			h.current = nil
			return nil, gopacket.CaptureInfo{}, err
		}
	}
	data = h.current.getData()
	ci.Timestamp = h.current.getTime()
	ci.CaptureLength = len(data)
	ci.Length = h.current.getLength()
	h.stats.Packets++
	return data, ci, nil
}
// Stats returns a snapshot of the statistics the TPacket has gathered so far.
// The error result is always nil.
func (h *TPacket) Stats() (Stats, error) {
	h.mu.Lock()
	snapshot := h.stats
	h.mu.Unlock()
	return snapshot, nil
}
// ReadPacketDataTo reads packet data into a user-supplied buffer.
// It copies at most len(data) bytes of the next packet. The number of bytes
// copied is reported in ci.CaptureLength, which is the smaller of the buffer
// size and the captured packet size. On error, ci is whatever
// ZeroCopyReadPacketData returned and data is left untouched.
func (h *TPacket) ReadPacketDataTo(data []byte) (ci gopacket.CaptureInfo, err error) {
	src, ci, err := h.ZeroCopyReadPacketData()
	if err == nil {
		ci.CaptureLength = copy(data, src)
	}
	return ci, err
}
// ReadPacketData reads the next packet, copies it into a freshly allocated
// buffer, and returns that buffer. Because the caller owns the copy, it stays
// valid across later reads. This implements gopacket.PacketDataSource.
func (h *TPacket) ReadPacketData() (data []byte, ci gopacket.CaptureInfo, err error) {
	view, ci, err := h.ZeroCopyReadPacketData()
	if err == nil {
		data = make([]byte, len(view))
		copy(data, view)
	}
	return data, ci, err
}
// getTPacketHeader returns the header at ring position h.offset. It first
// wraps h.offset back to 0 if the offset has run past the end of the ring.
//
// For TPacket v1/v2, h.offset counts frames. Version 3 hands whole blocks to
// user space, so there h.offset counts blocks, and the returned header is a
// pointer to h.v3, which is reinitialized on every call.
//
// It panics if h.tpVersion is not a known version. setRequestedTPacketVersion
// only ever stores known versions, so reaching the panic would indicate a bug.
func (h *TPacket) getTPacketHeader() header {
	switch h.tpVersion {
	case TPacketVersion1, TPacketVersion2:
		if h.offset >= h.opts.framesPerBlock*h.opts.numBlocks {
			h.offset = 0
		}
		// Frames are assumed to be packed back to back across blocks, i.e.
		// blockSize == framesPerBlock*frameSize.
		// NOTE(review): presumably enforced when options are parsed — confirm.
		// The arithmetic stays inside a single unsafe.Pointer conversion, as
		// the unsafe package rules require.
		frame := unsafe.Pointer(uintptr(h.ring) + uintptr(h.opts.frameSize*h.offset))
		if h.tpVersion == TPacketVersion1 {
			return (*v1header)(frame)
		}
		return (*v2header)(frame)
	case TPacketVersion3:
		// TPacket3 uses each block to return values, instead of each frame.
		// Hence we rotate when we hit #blocks, not #frames.
		if h.offset >= h.opts.numBlocks {
			h.offset = 0
		}
		// Blocks start at multiples of blockSize, the tp_block_size requested
		// from the kernel in setUpRing.
		block := unsafe.Pointer(uintptr(h.ring) + uintptr(h.opts.blockSize*h.offset))
		h.v3 = initV3Wrapper(block)
		return &h.v3
	}
	panic("handle tpacket version is invalid")
}
// pollForFirstPacket blocks until hdr's status has TP_STATUS_USER set, that
// is, until the kernel has handed it to user space. It calls poll(2) on h.fd
// with no timeout as many times as needed, and every poll call is counted in
// h.stats.Polls. Once hdr is ready, it records that hdr must later be
// released back to the kernel.
//
// A poll interrupted by a signal (EINTR) is retried, because poll is never
// restarted automatically after a signal handler runs. Any other poll error
// is returned as-is.
func (h *TPacket) pollForFirstPacket(hdr header) error {
	for hdr.getStatus()&C.TP_STATUS_USER == 0 {
		h.pollset.fd = h.fd
		h.pollset.events = C.POLLIN
		h.pollset.revents = 0
		_, err := C.poll(&h.pollset, 1, -1)
		h.stats.Polls++
		if err == syscall.EINTR {
			continue
		}
		if err != nil {
			return err
		}
	}
	h.shouldReleasePacket = true
	return nil
}
// FanoutType determines the type of fanout to use with a TPacket SetFanout call.
// SetFanout passes the value to the kernel in the upper 16 bits of the
// PACKET_FANOUT socket option.
type FanoutType int

// Fanout modes accepted by SetFanout.
const (
	// FanoutHash selects the hash fanout mode.
	FanoutHash FanoutType = 0
	// FanoutLoadBalance selects the load-balancing fanout mode.
	FanoutLoadBalance FanoutType = 1
	// FanoutCPU selects the per-CPU fanout mode.
	FanoutCPU FanoutType = 2
	// FanoutHashWithDefrag is FanoutHash (0) with the 0x8000 defrag flag set.
	// It appears that defrag only works with FanoutHash, see:
	// http://lxr.free-electrons.com/source/net/packet/af_packet.c#L1204
	FanoutHashWithDefrag FanoutType = 0x8000
)
// SetFanout activates TPacket's fanout ability.
// To use fanout, create multiple TPacket objects and pass the same id/type to
// a SetFanout call on each. This can be done cross-process: if two different
// processes both call SetFanout with the same type/id, they'll share packets
// between them. The same should work for multiple TPacket objects within the
// same process.
func (h *TPacket) SetFanout(t FanoutType, id uint16) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	// PACKET_FANOUT takes a single int: the fanout type/flags in the high 16
	// bits and the group id in the low 16 bits.
	opt := C.int(t)<<16 | C.int(id)
	optLen := C.socklen_t(unsafe.Sizeof(opt))
	if _, err := C.setsockopt(h.fd, C.SOL_PACKET, C.PACKET_FANOUT, unsafe.Pointer(&opt), optLen); err != nil {
		return err
	}
	return nil
}