Skip to content

Commit 0187328

Browse files
dilyevskyclaude
andcommitted
[tunnel] eliminate per-packet copy in muxedConn ReadPacket path
Add tunOffset headroom to muxedConn pooled buffers so Splice can receive them directly via readPacketDirect/putPacketBuffer without an extra copy. Splice detects this via interface assertion and uses the zero-copy path, eliminating both the copy and its own redundant sync.Pool. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 0da1bed commit 0187328

File tree

2 files changed

+92
-42
lines changed

2 files changed

+92
-42
lines changed

pkg/tunnel/connection/muxed_conn.go

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,30 @@ type muxedConn struct {
2626
incomingPackets chan *[]byte
2727
packetBufferPool sync.Pool
2828

29+
// headroom is the number of bytes reserved before packet data in pooled
30+
// buffers. This allows callers using readPacketDirect to receive buffers
31+
// with pre-allocated headroom (e.g. for TUN transport headers), avoiding
32+
// an extra copy.
33+
headroom int
34+
2935
closeOnce sync.Once
3036
closed atomic.Bool
3137
}
3238

39+
const defaultMTU = 1500
40+
3341
// newMuxedConn creates a new *muxedConn.
3442
func newMuxedConn() *muxedConn {
43+
headroom := tunOffset
44+
bufSize := headroom + defaultMTU
3545
return &muxedConn{
3646
conns: iptrie.NewTrie(),
3747
prefixes: make(map[netip.Prefix]Connection),
3848
incomingPackets: make(chan *[]byte, 10000),
49+
headroom: headroom,
3950
packetBufferPool: sync.Pool{
4051
New: func() interface{} {
41-
b := make([]byte, 1500)
52+
b := make([]byte, bufSize)
4253
return &b
4354
},
4455
},
@@ -48,10 +59,10 @@ func newMuxedConn() *muxedConn {
4859
func (m *muxedConn) readFromConn(src netip.Prefix, conn Connection) {
4960
for {
5061
pkt := m.packetBufferPool.Get().(*[]byte)
51-
// Reset the buffer to its original size.
62+
// Reset the buffer to its full capacity.
5263
*pkt = (*pkt)[:cap(*pkt)]
5364

54-
n, err := conn.ReadPacket(*pkt)
65+
n, err := conn.ReadPacket((*pkt)[m.headroom:])
5566
if err != nil {
5667
// If the connection is closed, remove it from the multiplexer and quit
5768
// the read loop. Otherwise, treat it as transient error and just log it.
@@ -79,7 +90,7 @@ func (m *muxedConn) readFromConn(src netip.Prefix, conn Connection) {
7990
continue
8091
}
8192

82-
*pkt = (*pkt)[:n]
93+
*pkt = (*pkt)[:n+m.headroom]
8394
select {
8495
case m.incomingPackets <- pkt:
8596
default: // Channel is closed or full, return the buffer to the pool
@@ -209,13 +220,35 @@ func (m *muxedConn) ReadPacket(pkt []byte) (int, error) {
209220
return 0, net.ErrClosed
210221
}
211222

212-
n := copy(pkt, *p)
223+
n := copy(pkt, (*p)[m.headroom:])
213224

214225
m.packetBufferPool.Put(p)
215226

216227
return n, nil
217228
}
218229

230+
// readPacketDirect returns the next packet's pooled buffer directly, avoiding
231+
// a copy. Packet data starts at buf[m.headroom:]. The caller must call
232+
// putPacketBuffer when done with the buffer.
233+
func (m *muxedConn) readPacketDirect() (*[]byte, error) {
234+
if m.closed.Load() {
235+
return nil, net.ErrClosed
236+
}
237+
238+
p, ok := <-m.incomingPackets
239+
if !ok {
240+
return nil, net.ErrClosed
241+
}
242+
243+
return p, nil
244+
}
245+
246+
// putPacketBuffer returns a buffer obtained from readPacketDirect to the pool.
247+
func (m *muxedConn) putPacketBuffer(p *[]byte) {
248+
*p = (*p)[:cap(*p)]
249+
m.packetBufferPool.Put(p)
250+
}
251+
219252
func (m *muxedConn) writeToConn(addr netip.Addr, pkt []byte) []byte {
220253
if !addr.IsValid() || !addr.IsGlobalUnicast() {
221254
slog.Warn("Invalid IP", slog.String("ip", addr.String()))

pkg/tunnel/connection/splice.go

Lines changed: 54 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -21,39 +21,39 @@ const (
2121
tunOffset = device.MessageTransportHeaderSize
2222
)
2323

24-
// SpliceConfig holds configuration options for splice operations
24+
// SpliceConfig holds configuration options for splice operations.
2525
type SpliceConfig struct {
2626
recalculateChecksum bool
2727
verifyChecksum bool
2828
logChecksumErrors bool
2929
observer PacketObserver
3030
}
3131

32-
// SpliceOption is a function that configures splice behavior
32+
// SpliceOption is a function that configures splice behavior.
3333
type SpliceOption func(*SpliceConfig)
3434

35-
// WithChecksumRecalculation enables TCP checksum recalculation
35+
// WithChecksumRecalculation enables TCP checksum recalculation.
3636
func WithChecksumRecalculation() SpliceOption {
3737
return func(c *SpliceConfig) {
3838
c.recalculateChecksum = true
3939
}
4040
}
4141

42-
// WithChecksumVerification enables TCP checksum verification (for debugging)
42+
// WithChecksumVerification enables TCP checksum verification (for debugging).
4343
func WithChecksumVerification() SpliceOption {
4444
return func(c *SpliceConfig) {
4545
c.verifyChecksum = true
4646
}
4747
}
4848

49-
// WithChecksumErrorLogging enables detailed logging of checksum errors
49+
// WithChecksumErrorLogging enables detailed logging of checksum errors.
5050
func WithChecksumErrorLogging() SpliceOption {
5151
return func(c *SpliceConfig) {
5252
c.logChecksumErrors = true
5353
}
5454
}
5555

56-
// WithPacketObserver sets a packet observer for traffic monitoring
56+
// WithPacketObserver sets a packet observer for traffic monitoring.
5757
func WithPacketObserver(obs PacketObserver) SpliceOption {
5858
return func(c *SpliceConfig) {
5959
c.observer = obs
@@ -111,12 +111,10 @@ func Splice(tunDev tun.Device, conn Connection, opts ...SpliceOption) error {
111111
for i := 0; i < n; i++ {
112112
packetData := pkts[i][:sizes[i]]
113113

114-
// Notify observer if set
115114
if config.observer != nil {
116115
config.observer.OnPacket(ExtractPacketInfo(packetData, DirectionOutbound))
117116
}
118117

119-
// Recalculate TCP checksum if enabled and needed
120118
if config.recalculateChecksum {
121119
if err := recalculateChecksumIfNeeded(packetData, config); err != nil {
122120
slog.Debug("Failed to recalculate checksum", slog.Any("error", err))
@@ -142,15 +140,29 @@ func Splice(tunDev tun.Device, conn Connection, opts ...SpliceOption) error {
142140
})
143141

144142
// Connection -> TUN path
143+
//
144+
// When the connection is a muxedConn with matching headroom, we use
145+
// readPacketDirect to receive pooled buffers without an extra copy.
146+
// Otherwise we fall back to the standard ReadPacket-into-local-pool path.
147+
type directReader interface {
148+
readPacketDirect() (*[]byte, error)
149+
putPacketBuffer(*[]byte)
150+
}
151+
dr, zeroCopy := conn.(directReader)
152+
145153
g.Go(func() error {
146154
defer func() {
147155
slog.Debug("Stopped reading from connection")
148156
}()
149157

150-
var pktPool = sync.Pool{
151-
New: func() any {
152-
return ptr.To(make([]byte, netstack.IPv6MinMTU+tunOffset))
153-
},
158+
// Fallback pool only allocated when zero-copy is not available.
159+
var pktPool *sync.Pool
160+
if !zeroCopy {
161+
pktPool = &sync.Pool{
162+
New: func() any {
163+
return ptr.To(make([]byte, netstack.IPv6MinMTU+tunOffset))
164+
},
165+
}
154166
}
155167

156168
pktCh := make(chan *[]byte, batchSize)
@@ -159,26 +171,33 @@ func Splice(tunDev tun.Device, conn Connection, opts ...SpliceOption) error {
159171
defer close(pktCh)
160172

161173
for {
162-
pkt := pktPool.Get().(*[]byte)
163-
n, err := conn.ReadPacket((*pkt)[tunOffset:])
164-
if err != nil {
165-
slog.Error("Failed to read from connection", slog.Any("error", err))
166-
return fmt.Errorf("failed to read from connection: %w", err)
174+
var pkt *[]byte
175+
if zeroCopy {
176+
var err error
177+
pkt, err = dr.readPacketDirect()
178+
if err != nil {
179+
slog.Error("Failed to read from connection", slog.Any("error", err))
180+
return fmt.Errorf("failed to read from connection: %w", err)
181+
}
182+
} else {
183+
pkt = pktPool.Get().(*[]byte)
184+
*pkt = (*pkt)[:cap(*pkt)]
185+
n, err := conn.ReadPacket((*pkt)[tunOffset:])
186+
if err != nil {
187+
slog.Error("Failed to read from connection", slog.Any("error", err))
188+
return fmt.Errorf("failed to read from connection: %w", err)
189+
}
190+
*pkt = (*pkt)[:n+tunOffset]
167191
}
168192

169-
// Notify observer if set
170-
if config.observer != nil && n > 0 {
171-
config.observer.OnPacket(ExtractPacketInfo((*pkt)[tunOffset:tunOffset+n], DirectionInbound))
193+
if config.observer != nil && len(*pkt) > tunOffset {
194+
config.observer.OnPacket(ExtractPacketInfo((*pkt)[tunOffset:], DirectionInbound))
172195
}
173196

174-
*pkt = (*pkt)[:n+tunOffset]
175-
176-
// Recalculate TCP checksum if enabled and needed
177197
if config.recalculateChecksum {
178198
packetData := (*pkt)[tunOffset:]
179199
if err := recalculateChecksumIfNeeded(packetData, config); err != nil {
180200
slog.Debug("Failed to recalculate checksum on incoming packet", slog.Any("error", err))
181-
// Continue processing - not all packets are TCP
182201
}
183202
}
184203

@@ -225,8 +244,12 @@ func Splice(tunDev tun.Device, conn Connection, opts ...SpliceOption) error {
225244
}
226245

227246
for i := 0; i < batchCount; i++ {
228-
pkt := pkts[i][:cap(pkts[i])]
229-
pktPool.Put(&pkt)
247+
p := pkts[i][:cap(pkts[i])]
248+
if zeroCopy {
249+
dr.putPacketBuffer(&p)
250+
} else {
251+
pktPool.Put(&p)
252+
}
230253
}
231254
}
232255
}
@@ -247,17 +270,14 @@ func Splice(tunDev tun.Device, conn Connection, opts ...SpliceOption) error {
247270
return nil
248271
}
249272

250-
// recalculateChecksumIfNeeded recalculates TCP checksum if the packet is TCP
273+
// recalculateChecksumIfNeeded recalculates TCP checksum if the packet is TCP.
251274
func recalculateChecksumIfNeeded(packetData []byte, config *SpliceConfig) error {
252275
if len(packetData) < 20 {
253-
return nil // Packet too short to be IP
276+
return nil // Packet too short to be an IP packet.
254277
}
255278

256-
// Check if this is a TCP packet
257-
version := packetData[0] >> 4
258279
isTCP := false
259-
260-
switch version {
280+
switch packetData[0] >> 4 {
261281
case 4:
262282
if len(packetData) >= 20 {
263283
ihl := int(packetData[0]&0x0F) * 4
@@ -270,14 +290,12 @@ func recalculateChecksumIfNeeded(packetData []byte, config *SpliceConfig) error
270290
isTCP = true
271291
}
272292
default:
273-
return nil // Unknown IP version
293+
return nil // Unknown IP version.
274294
}
275-
276295
if !isTCP {
277-
return nil // Not a TCP packet
296+
return nil // Not a TCP packet.
278297
}
279298

280-
// Verify checksum before recalculation if enabled
281299
if config.verifyChecksum {
282300
valid, err := tunnet.VerifyTCPChecksum(packetData)
283301
if err != nil {
@@ -291,7 +309,6 @@ func recalculateChecksumIfNeeded(packetData []byte, config *SpliceConfig) error
291309
}
292310
}
293311

294-
// Recalculate the checksum
295312
if err := tunnet.RecalculateTCPChecksum(packetData); err != nil {
296313
if config.logChecksumErrors {
297314
slog.Error("Failed to recalculate TCP checksum", slog.Any("error", err))

0 commit comments

Comments
 (0)