Refactor individual packet retrieval methods for optimal inlining
fako1024 committed Sep 18, 2023
1 parent 1259aca commit eeaf39e
Showing 4 changed files with 345 additions and 173 deletions.
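Aside (not part of the diff): since the stated goal is to keep the hot-path retrieval methods within the Go compiler's inlining budget, the effect of a change like this can be checked with the compiler's inlining diagnostics, e.g.

	go build -gcflags='-m' ./capture/afpacket/afring/

and looking for "can inline ..." / "inlining call to ..." messages for the methods touched below (the package path is taken from the file list in this commit).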
234 changes: 133 additions & 101 deletions capture/afpacket/afring/afring.go
@@ -28,18 +28,6 @@ const (
DefaultSnapLen = (1 << 16) // DefaultSnapLen : 64 kiB
)

type ringBuffer struct {
ring []byte

tpReq tPacketRequest
curTPacketHeader *tPacketHeader
offset int
}

func (b *ringBuffer) nextTPacketHeader() {
b.curTPacketHeader.data = b.ring[b.offset*int(b.tpReq.blockSize):]
}

// Source denotes an AF_PACKET capture source making use of a ring buffer
type Source struct {
eventHandler *event.Handler
@@ -51,8 +39,7 @@ type Source struct {
link *link.Link

ipLayerOffsetNum uint32

unblocked bool
filter byte

ringBuffer
sync.Mutex
@@ -89,6 +76,7 @@ func NewSourceFromLink(link *link.Link, options ...Option) (*Source, error) {
nBlocks: tPacketDefaultBlockNr,
ipLayerOffset: link.Type.IPHeaderOffset(),
link: link,
filter: link.FilterMask(),
}
src.ipLayerOffsetNum = uint32(src.ipLayerOffset)

@@ -148,13 +136,32 @@ func (s *Source) NextPacket(pBuf capture.Packet) (pkt capture.Packet, err error)
return
}

pktHdr := s.curTPacketHeader

// Parse the V3 TPacketHeader, the first byte of the payload and snaplen
hdr := pktHdr.parseHeader()
pos := pktHdr.ppos + uint32(hdr.pktMac)
effectiveSnapLen := capture.PacketHdrOffset + int(hdr.snaplen)

// If a buffer was provided, extend it to maximum capacity
if pBuf != nil {
if pBuf == nil {

// Allocate new capture.Packet if no buffer was provided
pkt = make(capture.Packet, effectiveSnapLen)
} else {
pkt = pBuf[:cap(pBuf)]
}

// Populate the packet / buffer
pkt = s.curTPacketHeader.packetPut(pkt, s.ipLayerOffset)
// Extract / copy all required data / header parameters
pktHdr.pktLenCopy(pkt[2:capture.PacketHdrOffset])
pkt[0] = pktHdr.data[pktHdr.ppos+58]
pkt[1] = s.ipLayerOffset
copy(pkt[capture.PacketHdrOffset:], pktHdr.data[pos:pos+hdr.snaplen])

// Ensure correct packet length
if effectiveSnapLen < len(pkt) {
pkt = pkt[:effectiveSnapLen]
}

return
}
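For context, a minimal consumer loop for NextPacket() could look as follows — a sketch, not part of this commit, assuming a *Source named src (e.g. obtained via NewSourceFromLink() above) and a hypothetical process() handler; errors.Is is from the standard library:

	// Reuse a single buffer sized for the header metadata plus the maximum snaplen
	buf := make(capture.Packet, capture.PacketHdrOffset+DefaultSnapLen)
	for {
		pkt, err := src.NextPacket(buf)
		if err != nil {
			if errors.Is(err, capture.ErrCaptureStopped) {
				break // capture was closed / stopped
			}
			return err
		}
		process(pkt) // hypothetical per-packet handler
	}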
@@ -169,28 +176,32 @@ func (s *Source) NextPayload(pBuf []byte) (payload []byte, pktType capture.Packe
return
}

pktHdr := s.curTPacketHeader

// Parse the V3 TPacketHeader, the first byte of the payload and snaplen
hdr := pktHdr.parseHeader()
pos := pktHdr.ppos + uint32(hdr.pktMac)
snapLen := int(hdr.snaplen)

// If a buffer was provided, extend it to maximum capacity
if pBuf != nil {
payload = pBuf[:cap(pBuf)]
}

// Populate the payload / buffer & parameters
payload, pktType, pktLen = s.curTPacketHeader.payloadPut(payload, 0)
} else {

return
}
// Allocate a new payload slice if no buffer was provided
payload = make([]byte, snapLen)
}

// NextPayloadZeroCopy receives the raw payload of the next packet from the source and returns it. The operation is blocking.
// The returned payload provides direct zero-copy access to the underlying data source (e.g. a ring buffer).
func (s *Source) NextPayloadZeroCopy() (payload []byte, pktType capture.PacketType, pktLen uint32, err error) {
// Copy payload / IP layer
copy(payload, pktHdr.data[pos:pos+hdr.snaplen])

if err = s.nextPacket(); err != nil {
pktType = capture.PacketUnknown
return
// Ensure correct data length
if snapLen < len(payload) {
payload = payload[:snapLen]
}

// Extract the payload (zero-copy) & parameters
payload, pktType, pktLen = s.curTPacketHeader.payloadZeroCopy(0)
// Extract the packet type & total packet length
pktType, pktLen = pktHdr.data[pktHdr.ppos+58], hdr.pktLen

return
}
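As the doc comment above states, NextPayloadZeroCopy() returns a slice that references the underlying data source (the ring buffer) directly. A usage sketch, not part of this commit; the assumption that the slice must be copied before the next retrieval call follows from blocks being handed back to the kernel and is not spelled out in this hunk:

	payload, pktType, pktLen, err := src.NextPayloadZeroCopy()
	if err != nil {
		return err
	}
	// payload points into the kernel ring buffer; copy it if it has to outlive
	// the next call into this Source
	retained := make([]byte, len(payload))
	copy(retained, payload)
	_, _ = pktType, pktLen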
@@ -205,28 +216,38 @@ func (s *Source) NextIPPacket(pBuf capture.IPLayer) (ipLayer capture.IPLayer, pk
return
}

pktHdr := s.curTPacketHeader

// Parse the V3 TPacketHeader and the first byte of the payload
hdr := pktHdr.parseHeader()
pos := pktHdr.ppos + uint32(hdr.pktNet)

// Adjust effective snaplen (subtracting any potential mac layer)
effectiveSnapLen := hdr.snaplen
if s.ipLayerOffsetNum > 0 {
effectiveSnapLen -= s.ipLayerOffsetNum
}
snapLen := int(effectiveSnapLen)

// If a buffer was provided, extend it to maximum capacity
if pBuf != nil {
ipLayer = pBuf[:cap(pBuf)]
}

// Populate the IP layer / buffer & parameters
ipLayer, pktType, pktLen = s.curTPacketHeader.payloadPut(ipLayer, s.ipLayerOffsetNum)
} else {

return
}
// Allocate a new IP layer buffer if no buffer was provided
ipLayer = make([]byte, snapLen)
}

// NextIPPacketZeroCopy receives the IP layer of the next packet from the source and returns it. The operation is blocking.
// The returned IPLayer provides direct zero-copy access to the underlying data source (e.g. a ring buffer).
func (s *Source) NextIPPacketZeroCopy() (ipLayer capture.IPLayer, pktType capture.PacketType, pktLen uint32, err error) {
// Copy payload / IP layer
copy(ipLayer, pktHdr.data[pos:pos+effectiveSnapLen])

if err = s.nextPacket(); err != nil {
pktType = capture.PacketUnknown
return
// Ensure correct data length
if snapLen < len(ipLayer) {
ipLayer = ipLayer[:snapLen]
}

// Extract the IP layer (zero-copy) & parameters
ipLayer, pktType, pktLen = s.curTPacketHeader.payloadZeroCopy(s.ipLayerOffsetNum)
// Extract the packet type & total packet length
pktType, pktLen = pktHdr.data[pktHdr.ppos+58], hdr.pktLen

return
}
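A corresponding sketch for NextIPPacket() (not part of the commit; capture.IPLayer is treated as a plain byte slice starting at the IP header, matching the allocation above), branching on the IP version nibble:

	ipLayer, pktType, pktLen, err := src.NextIPPacket(nil) // nil: let the method allocate
	if err != nil {
		return err
	}
	if len(ipLayer) > 0 {
		switch ipLayer[0] >> 4 { // IP version nibble of the first header byte
		case 4:
			// handle IPv4 ...
		case 6:
			// handle IPv6 ...
		}
	}
	_, _ = pktType, pktLen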
@@ -240,10 +261,17 @@ func (s *Source) NextPacketFn(fn func(payload []byte, totalLen uint32, pktType c
return err
}

// Extract the payload (zero-copy) & parameters
payload, pktType, pktLen := s.curTPacketHeader.payloadZeroCopy(0)
pktHdr := s.curTPacketHeader

return fn(payload, pktLen, pktType, s.ipLayerOffset)
// Parse the V3 TPacketHeader and the first byte of the payload
hdr := pktHdr.parseHeader()
pos := pktHdr.ppos + uint32(hdr.pktMac)

// #nosec G103
return fn(pktHdr.data[pos:pos+*(*uint32)(unsafe.Pointer(&pktHdr.data[pktHdr.ppos+12]))],
*(*uint32)(unsafe.Pointer(&pktHdr.data[pktHdr.ppos+16])),
pktHdr.data[pktHdr.ppos+58],
s.ipLayerOffset)
}
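The callback variant hands the payload to the caller without copying; a usage sketch (not part of the commit — the callback parameter list is inferred from the fn(...) call at the end of the hunk above):

	err := src.NextPacketFn(func(payload []byte, totalLen uint32, pktType capture.PacketType, ipLayerOffset byte) error {
		// payload is only valid for the duration of the callback (zero-copy access)
		fmt.Printf("type=%d len=%d captured=%d ipOffset=%d\n", pktType, totalLen, len(payload), ipLayerOffset)
		return nil
	})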

// Stats returns (and clears) the packet counters of the underlying source
@@ -304,74 +332,70 @@ func (s *Source) Link() *link.Link {
return s.link
}

// nextPacket provides access to the next packet from either the current block or advances to the next
// one (fetching its first packet).
func (s *Source) nextPacket() error {

// If the current TPacketHeader does not contain any more packets (or is uninitialized)
// fetch a new one from the ring buffer
fetch:
if s.curTPacketHeader.data == nil || s.unblocked {
if !s.unblocked {
s.nextTPacketHeader()
retry:
pktHdr := s.curTPacketHeader

// If there is an active block, attempt to simply consume a packet from it
if pktHdr.data != nil {

// If there are more packets remaining (i.e. there is a non-zero next offset), advance
// the current position.
// According to https://github.com/torvalds/linux/blame/master/net/packet/af_packet.c#L811 the
// tp_next_offset field is guaranteed to be zero for the final packet of the block. In addition,
// it cannot be zero otherwise (because that would be an invalid block).
if nextPos := pktHdr.nextOffset(); nextPos != 0 {

// Update position of next packet and jump to the end
pktHdr.ppos += nextPos
goto finalize
}
for s.curTPacketHeader.getStatus()&unix.TP_STATUS_USER == 0 || s.unblocked {

// Unset the bypass marker
if s.unblocked {
s.unblocked = false
}
// If there is no next offset, release the TPacketHeader to the kernel and move on to the next block
s.releaseAndAdvance()
}

// Run a PPOLL on the file descriptor, fetching a new block into the ring buffer
efdHasEvent, errno := s.eventHandler.Poll(unix.POLLIN | unix.POLLERR)
// Load the data for the block
s.loadTPacketHeader()

// If an event was received, ensure that the respective error is returned
// immediately (setting the `unblocked` marker to bypass checks done before
// upon next entry into this method)
if efdHasEvent {
return s.handleEvent()
}
// Check if the block is free to access in userland
for pktHdr.getStatus()&unix.TP_STATUS_USER == 0 {

// Handle errors
if errno != 0 {
if errno == unix.EINTR {
continue
}
if errno == unix.EBADF {
return capture.ErrCaptureStopped
}
return fmt.Errorf("error polling for next packet: %w (errno %d)", errno, int(errno))
}
// Run a PPOLL on the file descriptor (waiting for the block to become available)
efdHasEvent, errno := s.eventHandler.Poll(unix.POLLIN | unix.POLLERR)

// Handle rare cases of runaway packets
if s.curTPacketHeader.getStatus()&unix.TP_STATUS_COPY != 0 {
s.curTPacketHeader.setStatus(unix.TP_STATUS_KERNEL)
s.offset = (s.offset + 1) % int(s.tpReq.blockNr)
s.nextTPacketHeader()
// If an event was received, ensure that the respective error / code is returned
// immediately
if efdHasEvent {
return s.handleEvent()
}

// Handle potential PPOLL errors
if errno != 0 {
if errno == unix.EINTR {
continue
}
return handlePollError(errno)
}

// After fetching a new TPacketHeader, set the position of the first packet and the number of packets
// in this TPacketHeader
s.curTPacketHeader.ppos = s.curTPacketHeader.offsetToFirstPkt()
} else {

// If there is no next offset, release the TPacketHeader to the kernel and fetch a new one
nextPos := s.curTPacketHeader.nextOffset()
if nextPos == 0 {
s.curTPacketHeader.setStatus(unix.TP_STATUS_KERNEL)
s.offset = (s.offset + 1) % int(s.tpReq.blockNr)
s.curTPacketHeader.data = nil
goto fetch
// Handle rare cases of runaway packets (this call will advance to the next block
// as a side effect in case of a detection)
if s.hasRunawayBlock() {
continue
}

// Update position of next packet
s.curTPacketHeader.ppos += nextPos
}

// Set the position of the first packet in this block and jump to end
pktHdr.ppos = pktHdr.offsetToFirstPkt()

finalize:

// Apply filter (if any)
if filter := s.link.FilterMask(); filter > 0 && filter&s.curTPacketHeader.packetType() != 0 {
goto fetch
if s.filter > 0 && s.filter&pktHdr.data[pktHdr.ppos+58] != 0 {
goto retry
}

return nil
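The comments in nextPacket() above describe the TPACKET_V3 consumption protocol: packets within a block are chained via tp_next_offset, which is zero only for the last packet, after which the whole block is released back to the kernel. Stripped of polling, filtering and the runaway-block handling, the walk reduces to the following sketch (block, handle, packetAt and nextOffsetAt are hypothetical stand-ins, not part of the commit):

	for pos := block.offsetToFirstPkt(); ; {
		handle(block.packetAt(pos))     // hypothetical per-packet handler
		next := block.nextOffsetAt(pos) // tp_next_offset; zero marks the last packet
		if next == 0 {
			block.setStatus(unix.TP_STATUS_KERNEL) // hand the block back to the kernel
			break
		}
		pos += next
	}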
@@ -385,9 +409,10 @@ func (s *Source) handleEvent() error {
return fmt.Errorf("error reading event: %w", err)
}

// Set the bypass marker to allow for re-entry in nextPacket() where we left off if
// Unset the current block data to allow for re-entry in nextPacket[ZeroCopy]() where we left off if
// required (e.g. on ErrCaptureUnblock)
s.unblocked = true
s.curTPacketHeader.data = nil

if efdData[7] > 0 {
return capture.ErrCaptureStopped
}
Expand Down Expand Up @@ -424,3 +449,10 @@ func setupRingBuffer(sd socket.FileDescriptor, tPacketReq tPacketRequest) ([]byt

return buf, eventFD, nil
}

func handlePollError(errno unix.Errno) error {
if errno == unix.EBADF {
return capture.ErrCaptureStopped
}
return fmt.Errorf("error polling for next packet: %w (errno %d)", errno, int(errno))
}