Skip to content

Commit 0f86e0f

Browse files
dilyevskyclaude
andcommitted
[tunnel] reduce hot-path allocations in packet processing pipeline
Remove per-packet slog.Debug calls that eagerly evaluated addr.String() even when debug logging was disabled, inline Geneve header check to avoid full deserialization on every packet, pre-allocate batch classification slices in the bifurcator, and use netip.Addr as sync.Map key in peerMACForIP to avoid ip.String() allocation on every lookup. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent e98a2b9 commit 0f86e0f

File tree

5 files changed

+79
-37
lines changed

5 files changed

+79
-37
lines changed

pkg/tunnel/bifurcate/bifurcate.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"net"
77
"sync"
88

9-
"github.com/apoxy-dev/icx/geneve"
109
"gvisor.dev/gvisor/pkg/tcpip/header"
1110

1211
"github.com/apoxy-dev/apoxy/pkg/tunnel/batchpc"
@@ -35,6 +34,9 @@ func Bifurcate(pc batchpc.BatchPacketConn) (batchpc.BatchPacketConn, batchpc.Bat
3534
msgs := make([]batchpc.Message, batchpc.MaxBatchSize)
3635
// Shadow array of pooled message pointers we own & recycle.
3736
pm := make([]*batchpc.Message, batchpc.MaxBatchSize)
37+
// Pre-allocated classification slices, reused via [:0] each iteration.
38+
gBatch := make([]*batchpc.Message, 0, batchpc.MaxBatchSize)
39+
oBatch := make([]*batchpc.Message, 0, batchpc.MaxBatchSize)
3840

3941
for {
4042
// If both sides are gone, stop.
@@ -86,9 +88,9 @@ func Bifurcate(pc batchpc.BatchPacketConn) (batchpc.BatchPacketConn, batchpc.Bat
8688
continue
8789
}
8890

89-
// Classify into destination batches (slices referencing pooled messages).
90-
gBatch := make([]*batchpc.Message, 0, n)
91-
oBatch := make([]*batchpc.Message, 0, n)
91+
// Classify into destination batches (reset pre-allocated slices).
92+
gBatch = gBatch[:0]
93+
oBatch = oBatch[:0]
9294

9395
for i := 0; i < n; i++ {
9496
m := pm[i]
@@ -134,24 +136,24 @@ func Bifurcate(pc batchpc.BatchPacketConn) (batchpc.BatchPacketConn, batchpc.Bat
134136
return geneveConn, otherConn
135137
}
136138

139+
// isGeneve performs a fast inline check of the Geneve header without full
140+
// deserialization. The fixed Geneve header is 8 bytes:
141+
//
142+
// byte 0: version (2 bits) | opt len (6 bits)
143+
// byte 1: flags (8 bits)
144+
// byte 2-3: protocol type (big-endian)
145+
// byte 4-7: VNI (24 bits) | reserved (8 bits)
137146
func isGeneve(b []byte) bool {
138-
var hdr geneve.Header
139-
_, err := hdr.UnmarshalBinary(b)
140-
if err != nil {
147+
if len(b) < 8 {
141148
return false
142149
}
143-
144150
// Only Geneve version 0 is defined.
145-
if hdr.Version != 0 {
151+
if b[0]>>6 != 0 {
146152
return false
147153
}
148-
149154
// Check for valid protocol types (IPv4 or IPv6) or EtherType 0 (mgmt / oob).
150-
if hdr.ProtocolType != uint16(header.IPv4ProtocolNumber) &&
151-
hdr.ProtocolType != uint16(header.IPv6ProtocolNumber) &&
152-
hdr.ProtocolType != 0 {
153-
return false
154-
}
155-
156-
return true
155+
proto := uint16(b[2])<<8 | uint16(b[3])
156+
return proto == uint16(header.IPv4ProtocolNumber) ||
157+
proto == uint16(header.IPv6ProtocolNumber) ||
158+
proto == 0
157159
}

pkg/tunnel/bifurcate/bifurcate_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,58 @@ func TestBifurcate_BubblesTransientErrorAndContinues(t *testing.T) {
218218
require.True(t, bytes.Equal(buf[:n], genevePkt), "expected geneve packet after transient error")
219219
}
220220

221+
func TestBifurcate_IsGeneveEdgeCases(t *testing.T) {
222+
remote := &net.UDPAddr{IP: net.IPv4(10, 1, 1, 1), Port: 9999}
223+
224+
tests := []struct {
225+
name string
226+
pkt []byte
227+
isGeneve bool
228+
}{
229+
{"too short (7 bytes)", []byte{0, 0, 0x08, 0x00, 0, 0, 0}, false},
230+
{"valid IPv4 proto", makeGeneveHeader(0, uint16(header.IPv4ProtocolNumber)), true},
231+
{"valid IPv6 proto", makeGeneveHeader(0, uint16(header.IPv6ProtocolNumber)), true},
232+
{"valid proto 0 (mgmt)", makeGeneveHeader(0, 0), true},
233+
{"invalid version 1", makeGeneveHeader(1, uint16(header.IPv4ProtocolNumber)), false},
234+
{"invalid version 3", makeGeneveHeader(3, uint16(header.IPv4ProtocolNumber)), false},
235+
{"unsupported proto 0xFFFF", makeGeneveHeader(0, 0xFFFF), false},
236+
{"empty", []byte{}, false},
237+
}
238+
239+
for _, tt := range tests {
240+
t.Run(tt.name, func(t *testing.T) {
241+
mockConn := newMockBatchPacketConn()
242+
mockConn.enqueue(tt.pkt, remote)
243+
244+
geneveConn, otherConn := bifurcate.Bifurcate(mockConn)
245+
defer geneveConn.Close()
246+
defer otherConn.Close()
247+
248+
buf := make([]byte, 1024)
249+
if tt.isGeneve {
250+
n, _, err := geneveConn.ReadFrom(buf)
251+
require.NoError(t, err)
252+
require.Equal(t, tt.pkt, buf[:n])
253+
} else {
254+
n, _, err := otherConn.ReadFrom(buf)
255+
require.NoError(t, err)
256+
require.Equal(t, tt.pkt, buf[:n])
257+
}
258+
})
259+
}
260+
}
261+
262+
// makeGeneveHeader builds a minimal 8-byte Geneve header with given version and protocol type.
263+
func makeGeneveHeader(version uint8, proto uint16) []byte {
264+
buf := make([]byte, 8)
265+
buf[0] = version << 6 // version in top 2 bits, optLen=0
266+
buf[1] = 0 // flags
267+
buf[2] = byte(proto >> 8)
268+
buf[3] = byte(proto)
269+
// bytes 4-7: VNI + reserved (zeros)
270+
return buf
271+
}
272+
221273
func createGenevePacket(t *testing.T) []byte {
222274
h := geneve.Header{
223275
Version: 0,

pkg/tunnel/connection/muxed_conn.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,6 @@ func (m *muxedConn) readFromConn(src netip.Prefix, conn Connection) {
7979
continue
8080
}
8181

82-
slog.Debug("Read packet from connection", slog.Int("bytes", n))
83-
8482
*pkt = (*pkt)[:n]
8583
select {
8684
case m.incomingPackets <- pkt:
@@ -242,8 +240,6 @@ func (m *muxedConn) writeToConn(addr netip.Addr, pkt []byte) []byte {
242240
return nil
243241
}
244242

245-
slog.Debug("Writing packet to connection", slog.String("ip", addr.String()), slog.String("conn", conn.String()))
246-
247243
metrics.TunnelPacketsSent.Inc()
248244
metrics.TunnelBytesSent.Add(float64(len(pkt)))
249245

@@ -284,8 +280,6 @@ func (m *SrcMuxedConn) WritePacket(pkt []byte) ([]byte, error) {
284280
return nil, net.ErrClosed
285281
}
286282

287-
slog.Debug("Write packet to connection", slog.Int("bytes", len(pkt)))
288-
289283
var srcAddr netip.Addr
290284
switch pkt[0] >> 4 {
291285
case 6:
@@ -310,8 +304,6 @@ func (m *SrcMuxedConn) WritePacket(pkt []byte) ([]byte, error) {
310304
return nil, fmt.Errorf("unknown packet type: %d", pkt[0]>>4)
311305
}
312306

313-
slog.Debug("Packet source", slog.String("ip", srcAddr.String()))
314-
315307
return m.writeToConn(srcAddr, pkt), nil
316308
}
317309

@@ -339,8 +331,6 @@ func (m *DstMuxedConn) WritePacket(pkt []byte) ([]byte, error) {
339331
return nil, net.ErrClosed
340332
}
341333

342-
slog.Debug("Write packet to connection", slog.Int("bytes", len(pkt)))
343-
344334
var dstAddr netip.Addr
345335
switch pkt[0] >> 4 {
346336
case 6:
@@ -365,7 +355,5 @@ func (m *DstMuxedConn) WritePacket(pkt []byte) ([]byte, error) {
365355
return nil, fmt.Errorf("unknown packet type: %d", pkt[0]>>4)
366356
}
367357

368-
slog.Debug("Packet destination", slog.String("ip", dstAddr.String()))
369-
370358
return m.writeToConn(dstAddr, pkt), nil
371359
}

pkg/tunnel/connection/splice.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ func Splice(tunDev tun.Device, conn Connection, opts ...SpliceOption) error {
110110

111111
for i := 0; i < n; i++ {
112112
packetData := pkts[i][:sizes[i]]
113-
slog.Debug("Read packet from TUN", slog.Int("len", sizes[i]))
114113

115114
// Notify observer if set
116115
if config.observer != nil {
@@ -167,8 +166,6 @@ func Splice(tunDev tun.Device, conn Connection, opts ...SpliceOption) error {
167166
return fmt.Errorf("failed to read from connection: %w", err)
168167
}
169168

170-
slog.Debug("Read packet from connection", slog.Int("len", n))
171-
172169
// Notify observer if set
173170
if config.observer != nil && n > 0 {
174171
config.observer.OnPacket(ExtractPacketInfo((*pkt)[tunOffset:tunOffset+n], DirectionInbound))
@@ -217,8 +214,6 @@ func Splice(tunDev tun.Device, conn Connection, opts ...SpliceOption) error {
217214
}
218215
}
219216

220-
slog.Debug("Write packet to TUN", slog.Int("batch_size", batchCount))
221-
222217
if _, err := tunDev.Write(pkts[:batchCount], tunOffset); err != nil {
223218
if strings.Contains(err.Error(), "closed") {
224219
slog.Debug("TUN device closed")

pkg/tunnel/l2pc/l2pc.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,14 +250,19 @@ func (c *L2PacketConn) LocalMAC() tcpip.LinkAddress {
250250
// peerMACForIP returns a cached random, locally-administered unicast MAC
251251
// for the given remote IP, creating it on first use.
252252
func (c *L2PacketConn) peerMACForIP(ip net.IP) tcpip.LinkAddress {
253-
key := ip.String()
254-
if v, ok := c.peerMACCache.Load(key); ok {
253+
// Use netip.Addr as map key to avoid ip.String() allocation on every lookup.
254+
addr, ok := netip.AddrFromSlice(ip)
255+
if !ok {
256+
// Fallback for malformed IPs — should not happen in practice.
257+
addr = netip.Addr{}
258+
}
259+
if v, ok := c.peerMACCache.Load(addr); ok {
255260
return v.(tcpip.LinkAddress)
256261
}
257262
// Create and publish a new random MAC.
258263
newMAC := tcpip.GetRandMacAddr()
259264
// Use LoadOrStore to avoid races/duplication.
260-
if v, loaded := c.peerMACCache.LoadOrStore(key, newMAC); loaded {
265+
if v, loaded := c.peerMACCache.LoadOrStore(addr, newMAC); loaded {
261266
return v.(tcpip.LinkAddress)
262267
}
263268
return newMAC

0 commit comments

Comments
 (0)