-
Notifications
You must be signed in to change notification settings - Fork 0
/
afring.go
448 lines (354 loc) · 13.4 KB
/
afring.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
//go:build linux
// +build linux
/*
Package afring implements a capture.Source and a capture.SourceZeroCopy that allows reading
network packets from Linux network interfaces via the AF_PACKET / TPacket ring buffer mechanism.
This implementation relies on performing optimized `PPOLL()` syscalls to the MMAP'ed socket to
fetch blocks of packets. The ring buffer is configurable (depending on the expected throughput).
This capture method is optimally suited for production-level packet capture since it achieves
blazing-fast capture rates (in particular in zero-copy mode).
*/
package afring
import (
"errors"
"fmt"
"sync"
"unsafe"
"github.com/fako1024/slimcap/capture"
"github.com/fako1024/slimcap/capture/afpacket/socket"
"github.com/fako1024/slimcap/event"
"github.com/fako1024/slimcap/link"
"golang.org/x/sys/unix"
)
// DefaultSnapLen is the default maximum number of bytes captured per packet (64 kiB).
const DefaultSnapLen = 1 << 16
// Source denotes an AF_PACKET capture source making use of a ring buffer
type Source struct {
	// eventHandler carries the AF_PACKET socket (Fd) and the event file descriptor (Efd)
	// used to stop / unblock blocking poll operations
	eventHandler *event.Handler

	// ipLayerOffset is the IP layer offset of the link type (link.Type.IPHeaderOffset())
	ipLayerOffset byte

	// snapLen is the capture length per packet (DefaultSnapLen unless changed by an Option)
	snapLen int

	// blockSize and nBlocks dimension the TPacket ring buffer request
	blockSize int
	nBlocks   int

	// isPromisc is the promiscuous mode flag handed to SetSocketOptions()
	isPromisc bool

	// link is the network link the source captures on
	link *link.Link

	// ipLayerOffsetNum mirrors ipLayerOffset as uint32 (presumably to avoid conversions
	// when doing uint32 offset arithmetic on the ring buffer)
	ipLayerOffsetNum uint32

	ringBuffer
	sync.Mutex // held by Stats() while retrieving the socket statistics
}
// NewSource instantiates a new AF_PACKET capture source making use of a ring buffer on the
// interface with the given name. The link is resolved via link.New() and the actual setup is
// delegated to NewSourceFromLink(), which receives all provided options.
func NewSource(iface string, options ...Option) (*Source, error) {
	if len(iface) == 0 {
		return nil, errors.New("no interface provided")
	}

	l, err := link.New(iface)
	if err != nil {
		return nil, fmt.Errorf("failed to set up link on %s: %w", iface, err)
	}

	return NewSourceFromLink(l, options...)
}
// NewSourceFromLink instantiates a new AF_PACKET capture source making use of a ring buffer
// taking an existing link instance.
//
// The link must be up. Options are applied after the defaults (DefaultSnapLen and the default
// TPacket block size / number) have been set. Resources acquired during setup (socket, event
// file descriptor, mmap'ed ring buffer) are released again if a later setup step fails.
func NewSourceFromLink(link *link.Link, options ...Option) (*Source, error) {
	// Fail if link is not up (or if its state cannot be determined)
	isUp, err := link.IsUp()
	if err != nil {
		return nil, fmt.Errorf("failed to determine state of link %s: %w", link.Name, err)
	}
	if !isUp {
		return nil, fmt.Errorf("link %s is not up", link.Name)
	}

	// Define new source
	src := &Source{
		eventHandler:  new(event.Handler),
		snapLen:       DefaultSnapLen,
		blockSize:     tPacketDefaultBlockSize,
		nBlocks:       tPacketDefaultBlockNr,
		ipLayerOffset: link.Type.IPHeaderOffset(),
		link:          link,
	}
	src.ipLayerOffsetNum = uint32(src.ipLayerOffset)

	for _, opt := range options {
		opt(src)
	}

	// Define a new TPacket request
	src.ringBuffer.tpReq, err = newTPacketRequestForBuffer(src.blockSize, src.nBlocks, src.snapLen)
	if err != nil {
		return nil, fmt.Errorf("failed to setup TPacket request on %s: %w", link.Name, err)
	}

	// Setup socket
	src.eventHandler.Fd, err = socket.New(link)
	if err != nil {
		return nil, fmt.Errorf("failed to setup AF_PACKET socket on %s: %w", link.Name, err)
	}

	// Set socket options (closing the socket again on failure to avoid leaking it).
	// Cleanup errors are deliberately ignored: the setup error is the one the caller needs.
	if err := src.eventHandler.Fd.SetSocketOptions(link, src.snapLen, src.isPromisc); err != nil {
		_ = src.eventHandler.Fd.Close()
		return nil, fmt.Errorf("failed to set AF_PACKET socket options on %s: %w", link.Name, err)
	}

	// Setup ring buffer (setupRingBuffer releases its own event file descriptor on failure)
	src.ringBuffer.curTPacketHeader = new(tPacketHeader)
	src.ringBuffer.ring, src.eventHandler.Efd, err = setupRingBuffer(src.eventHandler.Fd, src.tpReq)
	if err != nil {
		_ = src.eventHandler.Fd.Close()
		return nil, fmt.Errorf("failed to setup AF_PACKET mmap'ed ring buffer on %s: %w", link.Name, err)
	}

	// Clear socket stats (retrieving them resets the counters). On failure, release the
	// mapping, the event file descriptor and the socket before bailing out.
	if _, err := src.eventHandler.Fd.GetSocketStats(); err != nil {
		_ = unix.Munmap(src.ring)
		_ = unix.Close(int(src.eventHandler.Efd))
		_ = src.eventHandler.Fd.Close()
		return nil, fmt.Errorf("failed to clear AF_PACKET socket stats on %s: %w", link.Name, err)
	}

	return src, nil
}
// NewPacket creates an empty "buffer" packet to be used as destination for the NextPacket() / NextPayload() /
// NextIPPacket() methods (the latter two by calling .Payload() / .IPLayer() on the created buffer). The
// returned packet has room for the packet header (capture.PacketHdrOffset bytes) plus the configured
// snaplen of the source.
func (s *Source) NewPacket() capture.Packet {
	return make(capture.Packet, capture.PacketHdrOffset+s.snapLen)
}
// NextPacket receives the next packet from the source and returns it. The operation is blocking. In
// case a non-nil "buffer" Packet is provided it will be populated with the data (and returned). The
// buffer packet can be reused. Otherwise a new Packet is allocated.
//
// A provided buffer is only reused if its capacity can hold at least the packet header
// (capture.PacketHdrOffset bytes); smaller buffers are ignored and a new Packet is allocated
// instead. If a reused buffer is too small for the captured payload, the payload is truncated
// to the buffer's capacity.
func (s *Source) NextPacket(pBuf capture.Packet) (pkt capture.Packet, err error) {
	if err = s.nextPacket(); err != nil {
		return nil, err
	}
	pktHdr := s.curTPacketHeader

	// Parse the V3 TPacketHeader, the first byte of the payload and snaplen
	hdr := pktHdr.parseHeader()
	pos := pktHdr.ppos + uint32(hdr.pktMac)
	effectiveSnapLen := capture.PacketHdrOffset + int(hdr.snaplen)

	// Reuse the provided buffer (extended to its maximum capacity) if it can hold the packet
	// header, otherwise allocate a new capture.Packet (this also covers pBuf == nil). Writing
	// the header into a smaller buffer would panic with an out-of-range slice below.
	if cap(pBuf) >= capture.PacketHdrOffset {
		pkt = pBuf[:cap(pBuf)]
	} else {
		pkt = make(capture.Packet, effectiveSnapLen)
	}

	// Extract / copy all required data / header parameters: packet type (byte 0), IP layer
	// offset (byte 1), the packet length field (presumably the total length, via pktLenCopy)
	// and finally the captured payload
	pktHdr.pktLenCopy(pkt[2:capture.PacketHdrOffset])
	pkt[0] = pktHdr.data[pktHdr.ppos+58]
	pkt[1] = s.ipLayerOffset
	copy(pkt[capture.PacketHdrOffset:], pktHdr.data[pos:pos+hdr.snaplen])

	// Ensure correct packet length
	if effectiveSnapLen < len(pkt) {
		pkt = pkt[:effectiveSnapLen]
	}

	return pkt, nil
}
// NextPayload receives the raw payload of the next packet from the source and returns it. The operation
// is blocking. A non-nil pBuf is reused as destination (extended to its full capacity, so the payload is
// truncated if that capacity is insufficient); otherwise a new byte slice of the captured length is
// allocated. Alongside the payload, the packet type and the packet length from the TPacket header are
// returned. On error the packet type is capture.PacketUnknown.
func (s *Source) NextPayload(pBuf []byte) (payload []byte, pktType capture.PacketType, pktLen uint32, err error) {
	if err = s.nextPacket(); err != nil {
		return nil, capture.PacketUnknown, 0, err
	}

	cur := s.curTPacketHeader
	meta := cur.parseHeader()
	start := cur.ppos + uint32(meta.pktMac)
	n := int(meta.snaplen)

	// Pick the destination: fresh allocation or the caller's buffer at full capacity
	var dst []byte
	if pBuf == nil {
		dst = make([]byte, n)
	} else {
		dst = pBuf[:cap(pBuf)]
	}

	copy(dst, cur.data[start:start+meta.snaplen])

	// Trim any excess capacity of a reused buffer down to the captured length
	if len(dst) > n {
		dst = dst[:n]
	}

	return dst, cur.data[cur.ppos+58], meta.pktLen, nil
}
// NextIPPacket receives the IP layer of the next packet from the source and returns it. The operation is blocking.
// In case a non-nil "buffer" IPLayer is provided it will be populated with the data (and returned).
// The buffer can be reused. Otherwise a new IPLayer is allocated.
//
// The returned IP layer never extends beyond the data captured for the packet. Its length is bounded
// by two values: the captured length minus the IP layer offset of the link type, and the number of
// captured bytes following the network offset reported in the TPacket header. If fewer bytes than
// the link layer header were captured, an empty IP layer is returned.
func (s *Source) NextIPPacket(pBuf capture.IPLayer) (ipLayer capture.IPLayer, pktType capture.PacketType, pktLen uint32, err error) {
	if err = s.nextPacket(); err != nil {
		return nil, capture.PacketUnknown, 0, err
	}
	pktHdr := s.curTPacketHeader

	// Parse the V3 TPacketHeader and determine the start of the IP layer
	hdr := pktHdr.parseHeader()
	pos := pktHdr.ppos + uint32(hdr.pktNet)

	// End of the captured data, which spans snaplen bytes starting at the link layer
	captureEnd := pktHdr.ppos + uint32(hdr.pktMac) + hdr.snaplen

	// Adjust effective snaplen (subtracting any potential mac layer), guarding against uint32
	// underflow if fewer bytes than the link layer header were captured
	var effectiveSnapLen uint32
	if hdr.snaplen > s.ipLayerOffsetNum {
		effectiveSnapLen = hdr.snaplen - s.ipLayerOffsetNum
	}

	// Never read past the captured data (e.g. if the distance between the network and the
	// link layer offsets exceeds the nominal IP layer offset of the link type)
	switch {
	case pos >= captureEnd:
		effectiveSnapLen = 0
	case effectiveSnapLen > captureEnd-pos:
		effectiveSnapLen = captureEnd - pos
	}
	snapLen := int(effectiveSnapLen)

	// If a buffer was provided, extend it to maximum capacity
	if pBuf != nil {
		ipLayer = pBuf[:cap(pBuf)]
	} else {
		// Allocate new capture.IPLayer if no buffer was provided
		ipLayer = make([]byte, snapLen)
	}

	// Copy IP layer (skipped entirely if nothing was captured beyond the link layer)
	if effectiveSnapLen > 0 {
		copy(ipLayer, pktHdr.data[pos:pos+effectiveSnapLen])
	}

	// Ensure correct data length
	if snapLen < len(ipLayer) {
		ipLayer = ipLayer[:snapLen]
	}

	// Populate the packet type & total length from the TPacket header
	return ipLayer, pktHdr.data[pktHdr.ppos+58], hdr.pktLen, nil
}
// NextPacketFn executes the provided function on the next packet received on the source. The payload
// handed to fn is a zero-copy view into the ring buffer: it starts at the link layer and is snaplen bytes
// long. Because the view aliases ring buffer memory, all operations on the data must be completed prior
// to any subsequent call to any Next*() method. Besides the payload, fn receives the packet length from
// the TPacket header, the packet type and the IP layer offset of the source. The error returned by fn
// (or by fetching the next packet) is passed through.
func (s *Source) NextPacketFn(fn func(payload []byte, totalLen uint32, pktType capture.PacketType, ipLayerOffset byte) error) error {
	err := s.nextPacket()
	if err != nil {
		return err
	}

	cur := s.curTPacketHeader
	meta := cur.parseHeader()
	start := cur.ppos + uint32(meta.pktMac)

	// #nosec G103
	view := unsafe.Slice(&cur.data[start], meta.snaplen)
	typ := cur.data[cur.ppos+58]

	return fn(view, meta.pktLen, typ, s.ipLayerOffset)
}
// Stats returns (and clears) the packet counters of the underlying source. The socket statistics are
// fetched while holding the source's mutex; on error an empty capture.Stats is returned.
func (s *Source) Stats() (capture.Stats, error) {
	var stats capture.Stats

	s.Lock()
	sockStats, err := s.eventHandler.GetSocketStats()
	s.Unlock()

	if err != nil {
		return stats, err
	}

	stats.PacketsReceived = uint64(sockStats.Packets)
	stats.PacketsDropped = uint64(sockStats.Drops)
	stats.QueueFreezes = uint64(sockStats.QueueFreezes)

	return stats, nil
}
// Unblock ensures that a potentially ongoing blocking poll operation is released by sending
// event.SignalUnblock through the event file descriptor. Any call to Next*() that might currently
// be blocked is expected to return capture.ErrCaptureUnblocked. Calling Unblock() on a nil or
// closed source yields an error.
func (s *Source) Unblock() error {
	usable := s != nil && s.eventHandler.Efd >= 0 && s.eventHandler.Fd.IsOpen()
	if !usable {
		return errors.New("cannot call Unblock() on nil / closed capture source")
	}

	return s.eventHandler.Efd.Signal(event.SignalUnblock)
}
// Close stops / closes the capture source: it signals stop via the event file descriptor, closes the
// socket and finally unmaps the ring buffer. The first error encountered is returned.
func (s *Source) Close() error {
	if err := s.close(); err != nil {
		return err
	}

	return unix.Munmap(s.ring)
}
// close sends event.SignalStop via the event file descriptor (releasing any blocked poll) and then
// closes the socket. It fails on a nil or already closed source.
// NOTE(review): the event file descriptor itself is never closed here — confirm whether that is intended.
func (s *Source) close() error {
	if usable := s != nil && s.eventHandler.Efd >= 0 && s.eventHandler.Fd.IsOpen(); !usable {
		return errors.New("cannot call Close() on nil / closed capture source")
	}

	err := s.eventHandler.Efd.Signal(event.SignalStop)
	if err != nil {
		return err
	}

	return s.eventHandler.Fd.Close()
}
// closeAndUnmap closes the source (see close()) and, only if that succeeded, unmaps the ring buffer.
func (s *Source) closeAndUnmap() error {
	err := s.close()
	if err == nil {
		err = unix.Munmap(s.ring)
	}

	return err
}
// Link returns the network link this capture source was created for.
func (s *Source) Link() *link.Link { return s.link }
// nextPacket provides access to the next packet from either the current block or advances to the next
// one (fetching its first packet).
//
// On a nil return, s.curTPacketHeader.ppos holds the offset of the packet to be consumed within
// s.curTPacketHeader.data (pktHdr is a pointer, so the updates below are visible to the caller).
// If the event file descriptor fires while waiting, the result of handleEvent() is returned.
// handleEvent() also resets the block data so that a later call re-enters here. Poll errors other
// than EINTR (which is retried) are translated via handlePollError().
func (s *Source) nextPacket() error {
	pktHdr := s.curTPacketHeader

	// If there is an active block, attempt to simply consume a packet from it
	if pktHdr.data != nil {
		// If there are more packets remaining (i.e. there is a non-zero next offset), advance
		// the current position.
		// According to https://github.com/torvalds/linux/blame/master/net/packet/af_packet.c#L811 the
		// tp_next_offset field is guaranteed to be zero for the final packet of the block. In addition,
		// it cannot be zero otherwise (because that would be an invalid block).
		if nextPos := pktHdr.nextOffset(); nextPos != 0 {
			// Update position of next packet and jump to the end
			pktHdr.ppos += nextPos
			return nil
		}

		// If there is no next offset, release the TPacketHeader to the kernel and move on to the next block
		s.releaseAndAdvance()
	}

	// Load the data for the block
	// NOTE(review): loadTPacketHeader() is defined elsewhere; presumably it points pktHdr.data at
	// the current block of the ring — confirm.
	s.loadTPacketHeader()

	// Check if the block is free to access in userland (TP_STATUS_USER bit set); otherwise wait for it
	for pktHdr.getStatus()&unix.TP_STATUS_USER == 0 {
		// Run a PPOLL on the file descriptor (waiting for the block to become available)
		efdHasEvent, errno := s.eventHandler.Poll(unix.POLLIN | unix.POLLERR)

		// If an event was received, ensure that the respective error / code is returned
		// immediately
		if efdHasEvent {
			return s.handleEvent()
		}

		// Handle potential PPOLL errors (an interrupted syscall is simply retried)
		if errno != 0 {
			if errno == unix.EINTR {
				continue
			}
			return handlePollError(errno)
		}

		// Handle rare cases of runaway packets (this call will advance to the next block
		// as a side effect in case of a detection)
		if s.hasRunawayBlock() {
			continue
		}
	}

	// Set the position of the first packet in this block and jump to end
	pktHdr.ppos = pktHdr.offsetToFirstPkt()
	return nil
}
// handleEvent consumes the pending event from the event file descriptor and maps it to the error
// returned by the Next*() methods. A non-zero byte 7 of the event data means stop
// (capture.ErrCaptureStopped); anything else is treated as an unblock request
// (capture.ErrCaptureUnblocked).
func (s *Source) handleEvent() error {
	data, err := s.eventHandler.Efd.ReadEvent()
	if err != nil {
		return fmt.Errorf("error reading event: %w", err)
	}

	// Drop the current block data so that nextPacket[ZeroCopy]() can be re-entered where we left
	// off if required (e.g. on ErrCaptureUnblock)
	s.curTPacketHeader.data = nil

	switch {
	case data[7] > 0:
		return capture.ErrCaptureStopped
	default:
		return capture.ErrCaptureUnblocked
	}
}
// setupRingBuffer creates the event file descriptor used for stopping / unblocking the capture,
// configures the socket for PACKET_RX_RING using tPacketReq and memory-maps the resulting ring
// buffer (tPacketReq.blockSizeNr() bytes, read/write, shared).
//
// On success it returns the mapped buffer and the event file descriptor. On failure it returns a
// nil buffer, -1 and an error; an event file descriptor created along the way is closed again so
// that it does not leak.
func setupRingBuffer(sd socket.FileDescriptor, tPacketReq tPacketRequest) ([]byte, event.EvtFileDescriptor, error) {
	if !sd.IsOpen() {
		return nil, -1, errors.New("invalid socket")
	}

	// Setup event file descriptor used for stopping / unblocking the capture (we start with that to avoid
	// having to clean up the ring buffer in case the descriptor can't be created)
	eventFD, err := event.New()
	if err != nil {
		return nil, -1, fmt.Errorf("failed to setup event file descriptor: %w", err)
	}

	// Set socket option to use PACKET_RX_RING
	// #nosec G103
	if err := sd.SetupRingBuffer(unsafe.Pointer(&tPacketReq), unsafe.Sizeof(tPacketReq)); err != nil {
		_ = unix.Close(int(eventFD)) // best-effort cleanup, the setup error takes precedence
		return nil, -1, fmt.Errorf("failed to call ring buffer instruction: %w", err)
	}

	// Setup memory mapping
	buf, err := unix.Mmap(int(sd), 0, tPacketReq.blockSizeNr(), unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED)
	if err != nil {
		_ = unix.Close(int(eventFD))
		return nil, -1, fmt.Errorf("failed to set up mmap ring buffer: %w", err)
	}
	if buf == nil {
		// err is nil at this point, so there is nothing to wrap
		_ = unix.Close(int(eventFD))
		return nil, -1, errors.New("mmap ring buffer is nil")
	}

	return buf, eventFD, nil
}
// handlePollError translates a PPOLL errno into the error returned by the Next*() methods. EBADF
// (the socket has been closed) is reported as capture.ErrCaptureStopped. Any other errno is wrapped
// together with its numeric value.
func handlePollError(errno unix.Errno) error {
	switch errno {
	case unix.EBADF:
		return capture.ErrCaptureStopped
	default:
		return fmt.Errorf("error polling for next packet: %w (errno %d)", errno, int(errno))
	}
}