Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions pkg/tunnel/connection/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@ type Pipe struct {
writeRing mpmc.RingBuffer[[]byte]
ctx context.Context
cancel context.CancelFunc
}

var bufPool = sync.Pool{
New: func() interface{} {
b := make([]byte, 2048) // Adjust size if needed
return &b
},
bufPool *sync.Pool
}

// NewPipe creates a pair of connected Pipe instances for bidirectional communication.
func NewPipe(ctx context.Context) (*Pipe, *Pipe) {
// Note: I have seen packet loss on ARM64 platforms, I believe this is due to the
// weaker memory model of ARM64, we should really dig into this, but for now
// dropping 0.001% of packets is not a big deal, we can just retry.
func NewPipe(ctx context.Context, mtu int) (*Pipe, *Pipe) {
bufPool := sync.Pool{
New: func() interface{} {
b := make([]byte, mtu)
return &b
},
}

ringAtoB := ringbuf.New[[]byte](1024)
ringBtoA := ringbuf.New[[]byte](1024)

Expand All @@ -38,12 +42,14 @@ func NewPipe(ctx context.Context) (*Pipe, *Pipe) {
writeRing: ringAtoB,
ctx: ctx,
cancel: cancel,
bufPool: &bufPool,
}
pipeB := &Pipe{
readRing: ringAtoB,
writeRing: ringBtoA,
ctx: ctx,
cancel: cancel,
bufPool: &bufPool,
}

return pipeA, pipeB
Expand All @@ -66,7 +72,6 @@ func (p *Pipe) ReadPacket(buf []byte) (int, error) {
// Has the context been cancelled?
select {
case <-p.ctx.Done():
bufPool.Put(&item)
return 0, errors.New("pipe closed")
default:
// Continue to try to dequeue
Expand All @@ -78,7 +83,7 @@ func (p *Pipe) ReadPacket(buf []byte) (int, error) {
break
}
n := copy(buf, item)
bufPool.Put(&item)
p.bufPool.Put(&item)
return n, nil
}
}
Expand All @@ -89,11 +94,8 @@ func (p *Pipe) WritePacket(b []byte) ([]byte, error) {
case <-p.ctx.Done():
return nil, errors.New("pipe closed")
default:
bufPtr := bufPool.Get().(*[]byte)
bufPtr := p.bufPool.Get().(*[]byte)
buf := *bufPtr
if cap(buf) < len(b) {
buf = make([]byte, len(b))
}
buf = buf[:len(b)]
copy(buf, b)

Expand All @@ -106,7 +108,6 @@ func (p *Pipe) WritePacket(b []byte) ([]byte, error) {
// Has the context been cancelled?
select {
case <-p.ctx.Done():
bufPool.Put(&buf)
return nil, errors.New("pipe closed")
default:
// Continue to try to enqueue
Expand Down
47 changes: 37 additions & 10 deletions pkg/tunnel/connection/pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,73 @@ package connection_test

import (
"bytes"
"sync/atomic"
"testing"
"time"

"github.com/apoxy-dev/apoxy-cli/pkg/netstack"
"github.com/apoxy-dev/apoxy-cli/pkg/tunnel/connection"
)

func TestPipeThroughput(t *testing.T) {
const (
packetSize = 1024 // 1 KB per packet
numPackets = 1_000_000 // Total packets to send
packetSize = netstack.IPv6MinMTU
numPackets = 10_000_000
)

p1, p2 := connection.NewPipe(t.Context())
p1, p2 := connection.NewPipe(t.Context(), packetSize)

payload := bytes.Repeat([]byte("X"), packetSize)
buf := make([]byte, packetSize)

done := make(chan struct{})
var bytesTransferred int64
var packetsTransferred int64

// Reader goroutine
go func() {
for i := 0; i < numPackets; i++ {
if _, err := p2.ReadPacket(buf); err != nil {
t.Fatal(err)
select {
case <-t.Context().Done():
default:
t.Fatalf("Read error: %v", err)
}
return
}
atomic.AddInt64(&bytesTransferred, int64(packetSize))
atomic.AddInt64(&packetsTransferred, 1)
}
close(done)
}()

// Reporter goroutine
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

go func(startTime time.Time) {
lastTransferred := int64(0)
for range ticker.C {
currentTransferred := atomic.LoadInt64(&bytesTransferred)
bytesThisSecond := currentTransferred - lastTransferred
lastTransferred = currentTransferred

throughputGbps := (float64(bytesThisSecond*8) / 1e9)
elapsed := time.Since(startTime).Truncate(time.Second)
t.Logf("[+%s] Throughput: %.2f Gbps", elapsed, throughputGbps)
t.Logf("[+%s] Packets: %d", elapsed, atomic.LoadInt64(&packetsTransferred))
}
}(time.Now())

start := time.Now()

// Writer loop
for i := 0; i < numPackets; i++ {
if _, err := p1.WritePacket(payload); err != nil {
t.Fatal(err)
}
}

<-done
duration := time.Since(start)

throughputGbps := (float64(packetSize*numPackets*8) / 1e9) / duration.Seconds()
t.Logf("Sent %d packets of %d bytes in %s", numPackets, packetSize, duration)
t.Logf("Throughput: %.2f Gbps", throughputGbps)
totalThroughputGbps := (float64(packetSize*numPackets*8) / 1e9) / duration.Seconds()
t.Logf("Total Throughput: %.2f Gbps", totalThroughputGbps)
}
44 changes: 44 additions & 0 deletions pkg/tunnel/fasttun/fasttun.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Package fasttun implements a high-performance interface to Linux TUN devices
// with support for multi-queue and batched packet I/O.
package fasttun

import "io"

// Device represents a virtual TUN network interface.
// It provides methods to query device properties and create packet queues
// for reading and writing packets concurrently.
type Device interface {
io.Closer

// Name returns the name of the TUN device (e.g., "tun0").
Name() string

// MTU returns the device's Maximum Transmission Unit.
MTU() (int, error)

// BatchSize returns the recommended number of packets to process in one batch.
// This is useful for optimizing I/O performance.
BatchSize() int

// NewPacketQueue creates a new packet queue for the device.
// Each queue is associated with a file descriptor and can be used
// concurrently with others.
NewPacketQueue() (PacketQueue, error)
}

// PacketQueue represents a single queue for sending and receiving packets
// from a TUN device. It supports batch I/O for efficient packet processing.
type PacketQueue interface {
io.Closer

// Read reads packets into the provided buffer slices `pkts` and stores
// the size of each packet in `sizes`.
//
// It returns the number of packets successfully read and an error, if any.
// On timeout or no available packets, it may return (0, nil).
Read(pkts [][]byte, sizes []int) (n int, err error)

// Write writes the given packets to the TUN device.
// It returns the number of packets successfully written and an error, if any.
Write(pkts [][]byte) (int, error)
}
Loading