Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add tpacket tx-ring support #1

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -6,6 +6,7 @@
# Folders
_obj
_test
.vscode

# Architecture specific extensions/prefixes
*.[568vq]
Expand Down
269 changes: 214 additions & 55 deletions afpacket/afpacket.go
Expand Up @@ -49,6 +49,9 @@ var ErrPoll = errors.New("packet poll failed")
// ErrTimeout returned on poll timeout
var ErrTimeout = errors.New("packet poll timeout expired")

// ErrFill returned on filling packet to tx ring
var ErrFill = errors.New("no available frames for filling packet")

// AncillaryVLAN structures are used to pass the captured VLAN
// as ancillary data via CaptureInfo.
type AncillaryVLAN struct {
Expand Down Expand Up @@ -103,17 +106,28 @@ type TPacket struct {
stats Stats
// fd is the C file descriptor.
fd int
// ring points to the memory space of the ring buffer shared by tpacket and the kernel.
// ring points to the memory space which is a continuous buffer for both rx and tx ring.
ring []byte
// rawring is the unsafe pointer that we use to poll for packets
rawring unsafe.Pointer
// rxRing points to the memory space of the rx ring buffer shared by tpacket and the kernel.
rxRing []byte
// rawRxRing is the unsafe pointer that we use to poll for incoming packets
rawRxRing unsafe.Pointer
// txRing points to the memory space of the tx ring buffer shared by tpacket and the kernel.
txRing []byte
// rawTxRing is the unsafe pointer that we use to fill outgoing packets
rawTxRing unsafe.Pointer
// opts contains read-only options for the TPacket object.
opts options
mu sync.Mutex // guards below
// offset is the offset into the ring of the current header.
offset int
// current is the current header.
current header
// guard rx
rxMu sync.Mutex
// rxOffset is the offset into the rx ring of the current header.
rxOffset int
// rxCurrent is the current header for rx.
rxCurrent header
// guards tx
txMu sync.Mutex
// txOffset is the offset into the tx ring of the current header.
txOffset int
// shouldReleasePacket is set to true whenever we return packet data, to make sure we remember to release that data back to the kernel.
shouldReleasePacket bool
// headerNextNeeded is set to true when header need to move to the next packet. No need to move it case of poll error.
Expand All @@ -123,7 +137,9 @@ type TPacket struct {
// Hackity hack hack hack. We need to return a pointer to the header with
// getTPacketHeader, and we don't want to allocate a v3wrapper every time,
// so we leave it in the TPacket object and return a pointer to it.
v3 v3wrapper
v3Rx v3wrapper
// v3Tx holds pointer to frame in tx-ring for transmitting packet
v3Tx v3wrapper

statsMu sync.Mutex // guards stats below
// socketStats contains stats from the socket
Expand Down Expand Up @@ -177,38 +193,83 @@ func (h *TPacket) setRequestedTPacketVersion() error {

// setUpRing sets up the shared-memory ring buffer between the user process and the kernel.
func (h *TPacket) setUpRing() (err error) {
totalSize := int(h.opts.framesPerBlock * h.opts.numBlocks * h.opts.frameSize)
totalRxSize := int(h.opts.rxRing.framesPerBlock * h.opts.rxRing.NumBlocks * h.opts.rxRing.FrameSize)
totalTxSize := int(h.opts.txRing.framesPerBlock * h.opts.txRing.NumBlocks * h.opts.txRing.FrameSize)
totalSize := totalRxSize + totalTxSize

switch h.tpVersion {
case TPacketVersion1, TPacketVersion2:
var tp C.struct_tpacket_req
tp.tp_block_size = C.uint(h.opts.blockSize)
tp.tp_block_nr = C.uint(h.opts.numBlocks)
tp.tp_frame_size = C.uint(h.opts.frameSize)
tp.tp_frame_nr = C.uint(h.opts.framesPerBlock * h.opts.numBlocks)
if err := setsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_RX_RING, unsafe.Pointer(&tp), unsafe.Sizeof(tp)); err != nil {
return fmt.Errorf("setsockopt packet_rx_ring: %v", err)
// setup rx ring
if h.opts.rxRing.NumBlocks > 0 {
var tp C.struct_tpacket_req
tp.tp_block_size = C.uint(h.opts.rxRing.BlockSize)
tp.tp_block_nr = C.uint(h.opts.rxRing.NumBlocks)
tp.tp_frame_size = C.uint(h.opts.rxRing.FrameSize)
tp.tp_frame_nr = C.uint(h.opts.rxRing.framesPerBlock * h.opts.rxRing.NumBlocks)
if err := setsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_RX_RING, unsafe.Pointer(&tp), unsafe.Sizeof(tp)); err != nil {
return fmt.Errorf("setsockopt packet_rx_ring: %v", err)
}
}

// setup tx ring
if h.opts.txRing.NumBlocks > 0 {
var tp C.struct_tpacket_req
tp.tp_block_size = C.uint(h.opts.txRing.BlockSize)
tp.tp_block_nr = C.uint(h.opts.txRing.NumBlocks)
tp.tp_frame_size = C.uint(h.opts.txRing.FrameSize)
tp.tp_frame_nr = C.uint(h.opts.txRing.framesPerBlock * h.opts.txRing.NumBlocks)
if err := setsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_TX_RING, unsafe.Pointer(&tp), unsafe.Sizeof(tp)); err != nil {
return fmt.Errorf("setsockopt packet_tx_ring: %v", err)
}
}

case TPacketVersion3:
var tp C.struct_tpacket_req3
tp.tp_block_size = C.uint(h.opts.blockSize)
tp.tp_block_nr = C.uint(h.opts.numBlocks)
tp.tp_frame_size = C.uint(h.opts.frameSize)
tp.tp_frame_nr = C.uint(h.opts.framesPerBlock * h.opts.numBlocks)
tp.tp_retire_blk_tov = C.uint(h.opts.blockTimeout / time.Millisecond)
if err := setsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_RX_RING, unsafe.Pointer(&tp), unsafe.Sizeof(tp)); err != nil {
return fmt.Errorf("setsockopt packet_rx_ring v3: %v", err)
// setup v3 rx ring
if h.opts.rxRing.NumBlocks > 0 {
var tp C.struct_tpacket_req3
tp.tp_block_size = C.uint(h.opts.rxRing.BlockSize)
tp.tp_block_nr = C.uint(h.opts.rxRing.NumBlocks)
tp.tp_frame_size = C.uint(h.opts.rxRing.FrameSize)
tp.tp_frame_nr = C.uint(h.opts.rxRing.framesPerBlock * h.opts.rxRing.NumBlocks)
tp.tp_retire_blk_tov = C.uint(h.opts.blockTimeout / time.Millisecond)
if err := setsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_RX_RING, unsafe.Pointer(&tp), unsafe.Sizeof(tp)); err != nil {
return fmt.Errorf("setsockopt packet_rx_ring v3: %v", err)
}
}

// setup v3 tx ring
if h.opts.txRing.NumBlocks > 0 {
var tp C.struct_tpacket_req3
tp.tp_block_size = C.uint(h.opts.txRing.BlockSize)
tp.tp_block_nr = C.uint(h.opts.txRing.NumBlocks)
tp.tp_frame_size = C.uint(h.opts.txRing.FrameSize)
tp.tp_frame_nr = C.uint(h.opts.txRing.framesPerBlock * h.opts.txRing.NumBlocks)
tp.tp_retire_blk_tov = C.uint(h.opts.blockTimeout / time.Millisecond)
if err := setsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_TX_RING, unsafe.Pointer(&tp), unsafe.Sizeof(tp)); err != nil {
return fmt.Errorf("setsockopt packet_rx_ring v3: %v", err)
}
}

default:
return errors.New("invalid tpVersion")
}

// https://www.kernel.org/doc/html/latest/networking/packet_mmap.html
// To use one socket for capture and transmission, the mapping of both the RX and TX buffer ring
// has to be done with one call to mmap
h.ring, err = unix.Mmap(h.fd, 0, totalSize, unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED)
if err != nil {
return err
}
if h.ring == nil {
return errors.New("no ring")
}
h.rawring = unsafe.Pointer(&h.ring[0])
h.rxRing = h.ring[:totalRxSize]
h.rawRxRing = unsafe.Pointer(&h.rxRing[0])

h.txRing = h.ring[totalRxSize:]
h.rawTxRing = unsafe.Pointer(&h.txRing[0])

return nil
}

Expand All @@ -221,6 +282,8 @@ func (h *TPacket) Close() {
unix.Munmap(h.ring)
}
h.ring = nil
h.rxRing = nil
h.txRing = nil
unix.Close(h.fd)
h.fd = -1
runtime.SetFinalizer(h, nil)
Expand All @@ -236,7 +299,13 @@ func NewTPacket(opts ...interface{}) (h *TPacket, err error) {
if h.opts, err = parseOptions(opts...); err != nil {
return nil, err
}
fd, err := unix.Socket(unix.AF_PACKET, int(h.opts.socktype), int(htons(unix.ETH_P_ALL)))
var proto int = int(htons(unix.ETH_P_ALL))
if h.opts.rxRing.NumBlocks == 0 {
// only created for transmission, set protocol to zero to avoid an expensive call
// to packet_rcv() in kernel
proto = 0
}
fd, err := unix.Socket(unix.AF_PACKET, int(h.opts.socktype), proto)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -274,8 +343,8 @@ func (h *TPacket) SetBPF(filter []bpf.RawInstruction) error {
}

func (h *TPacket) releaseCurrentPacket() error {
h.current.clearStatus()
h.offset++
h.rxCurrent.clearStatus()
h.rxOffset++
h.shouldReleasePacket = false
return nil
}
Expand All @@ -292,35 +361,35 @@ func (h *TPacket) releaseCurrentPacket() error {
// // do everything you want with data1 here, copying bytes out of it if you'd like to keep them around.
// data2, _, _ := tp.ZeroCopyReadPacketData() // invalidates bytes in data1
func (h *TPacket) ZeroCopyReadPacketData() (data []byte, ci gopacket.CaptureInfo, err error) {
h.mu.Lock()
h.rxMu.Lock()
retry:
if h.current == nil || !h.headerNextNeeded || !h.current.next() {
if h.rxCurrent == nil || !h.headerNextNeeded || !h.rxCurrent.next() {
if h.shouldReleasePacket {
h.releaseCurrentPacket()
}
h.current = h.getTPacketHeader()
if err = h.pollForFirstPacket(h.current); err != nil {
h.rxCurrent = h.getTPacketHeader(true)
if err = h.pollForFirstPacket(h.rxCurrent); err != nil {
h.headerNextNeeded = false
h.mu.Unlock()
h.rxMu.Unlock()
return
}
// We received an empty block
if h.current.getLength() == 0 {
if h.rxCurrent.getLength() == 0 {
goto retry
}
}
data = h.current.getData(&h.opts)
ci.Timestamp = h.current.getTime()
data = h.rxCurrent.getData(&h.opts)
ci.Timestamp = h.rxCurrent.getTime()
ci.CaptureLength = len(data)
ci.Length = h.current.getLength()
ci.InterfaceIndex = h.current.getIfaceIndex()
vlan := h.current.getVLAN()
ci.Length = h.rxCurrent.getLength()
ci.InterfaceIndex = h.rxCurrent.getIfaceIndex()
vlan := h.rxCurrent.getVLAN()
if vlan >= 0 {
ci.AncillaryData = append(ci.AncillaryData, AncillaryVLAN{vlan})
}
atomic.AddInt64(&h.stats.Packets, 1)
h.headerNextNeeded = true
h.mu.Unlock()
h.rxMu.Unlock()

return
}
Expand Down Expand Up @@ -422,28 +491,53 @@ func (h *TPacket) ReadPacketData() (data []byte, ci gopacket.CaptureInfo, err er
return
}

func (h *TPacket) getTPacketHeader() header {
func (h *TPacket) getTPacketHeader(rx bool) header {
var position uintptr
switch h.tpVersion {
case TPacketVersion1:
if h.offset >= h.opts.framesPerBlock*h.opts.numBlocks {
h.offset = 0
if rx {
if h.rxOffset >= h.opts.rxRing.framesPerBlock*h.opts.rxRing.NumBlocks {
h.rxOffset = 0
}
position = uintptr(h.rawRxRing) + uintptr(h.opts.rxRing.FrameSize*h.rxOffset)
} else {
if h.txOffset >= h.opts.txRing.framesPerBlock*h.opts.txRing.NumBlocks {
h.txOffset = 0
}
position = uintptr(h.rawTxRing) + uintptr(h.opts.txRing.FrameSize*h.txOffset)
}
position := uintptr(h.rawring) + uintptr(h.opts.frameSize*h.offset)
return (*v1header)(unsafe.Pointer(position))
case TPacketVersion2:
if h.offset >= h.opts.framesPerBlock*h.opts.numBlocks {
h.offset = 0
if rx {
if h.rxOffset >= h.opts.rxRing.framesPerBlock*h.opts.rxRing.NumBlocks {
h.rxOffset = 0
}
position = uintptr(h.rawRxRing) + uintptr(h.opts.rxRing.FrameSize*h.rxOffset)
} else {
if h.txOffset >= h.opts.txRing.framesPerBlock*h.opts.txRing.NumBlocks {
h.txOffset = 0
}
position = uintptr(h.rawTxRing) + uintptr(h.opts.txRing.FrameSize*h.txOffset)
}
position := uintptr(h.rawring) + uintptr(h.opts.frameSize*h.offset)
return (*v2header)(unsafe.Pointer(position))
case TPacketVersion3:
// TPacket3 uses each block to return values, instead of each frame. Hence we need to rotate when we hit #blocks, not #frames.
if h.offset >= h.opts.numBlocks {
h.offset = 0
if rx {
if h.rxOffset >= h.opts.rxRing.NumBlocks {
h.rxOffset = 0
}
position = uintptr(h.rawRxRing) + uintptr(h.opts.rxRing.FrameSize*h.rxOffset*h.opts.rxRing.framesPerBlock)
h.v3Rx = initV3Wrapper(unsafe.Pointer(position), true)
return &h.v3Rx
} else {
if h.txOffset >= h.opts.txRing.framesPerBlock*h.opts.txRing.NumBlocks {
h.txOffset = 0
}
// NOTE: txOffset points to #frame in tx ring, instead of #block like rxOffset in rx ring
position = uintptr(h.rawTxRing) + uintptr(h.opts.txRing.FrameSize*h.txOffset)
h.v3Tx = initV3Wrapper(unsafe.Pointer(position), false)
return &h.v3Tx
}
position := uintptr(h.rawring) + uintptr(h.opts.frameSize*h.offset*h.opts.framesPerBlock)
h.v3 = initV3Wrapper(unsafe.Pointer(position))
return &h.v3
}
panic("handle tpacket version is invalid")
}
Expand Down Expand Up @@ -503,8 +597,8 @@ const (
// packets between them. The same should work for multiple TPacket objects within
// the same process.
func (h *TPacket) SetFanout(t FanoutType, id uint16) error {
h.mu.Lock()
defer h.mu.Unlock()
h.rxMu.Lock()
defer h.rxMu.Unlock()
arg := C.int(t) << 16
arg |= C.int(id)
return setsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_FANOUT, unsafe.Pointer(&arg), unsafe.Sizeof(arg))
Expand All @@ -515,3 +609,68 @@ func (h *TPacket) WritePacketData(pkt []byte) error {
_, err := unix.Write(h.fd, pkt)
return err
}

// BypassQdisc enables bypassing qdisc layer and forces pushing packets to driver directly.
func (h *TPacket) BypassQdisc() error {
h.txMu.Lock()
defer h.txMu.Unlock()
v := C.int(1)
return setsockopt(h.fd, unix.SOL_SOCKET, unix.PACKET_QDISC_BYPASS, unsafe.Pointer(&v), unsafe.Sizeof(v))
}

// Fill a packet's data to the tx ring
func (h *TPacket) FillPacketData(pkt []byte) error {
h.txMu.Lock()
defer h.txMu.Unlock()

for h.txOffset = 0; h.txOffset < h.opts.txRing.framesPerBlock*h.opts.txRing.NumBlocks; h.txOffset++ {
// find an available frame for holding the packet data
txCurrent := h.getTPacketHeader(false)
if txCurrent.getStatus() != unix.TP_STATUS_AVAILABLE {
continue
}

// put the packet data into the frame
txCurrent.putData(pkt)

// call sendto in non-blocking mode to notify kernel to send the packet to output device
unix.Sendto(h.fd, nil, unix.MSG_DONTWAIT, nil)

return nil
}

// not found an available frame to fill the packet to tx ring
return ErrFill
}

// Fill a packet to the TX ring
func (h *TPacket) FillPacket(pkt gopacket.Packet) error {
return h.FillPacketData(pkt.Data())
}

// Fill multiple packets to the TX ring at a time
func (h *TPacket) FillPackets(pkts []gopacket.Packet) (numFilled int) {
if len(pkts) == 0 {
return
}

h.txMu.Lock()
defer h.txMu.Unlock()

// iterate frames in tx ring to find available slots for holding the packet data
for h.txOffset = 0; h.txOffset < h.opts.txRing.framesPerBlock*h.opts.txRing.NumBlocks && numFilled < len(pkts); h.txOffset++ {
txCurrent := h.getTPacketHeader(false)
if txCurrent.getStatus() != unix.TP_STATUS_AVAILABLE {
continue
}

// put the packet data into the frame
pkt := pkts[numFilled]
txCurrent.putData(pkt.Data())
}

// call sendto in non-blocking mode to notify kernel to send the packet to output device
unix.Sendto(h.fd, nil, unix.MSG_DONTWAIT, nil)

return
}