Skip to content

Commit 2bab37c

Browse files
committed
[tunnel] eliminate intermediate channel in Splice Connection->TUN path
Remove the producer goroutine and pktCh channel from the zero-copy Splice path. Packets now flow directly from muxedConn.incomingPackets (10k-deep channel) to TUN batch writes via readPacketDirect, skipping a channel hop and goroutine context switch per packet. Add tryReadPacketDirect() for non-blocking batch accumulation so the write loop drains all queued packets before issuing a TUN write syscall. Profiling showed TUN writes at 28-35% of CPU during bulk transfer. The channel elimination reduces Go-level overhead but TUN syscalls remain the dominant cost.
1 parent 6dd4b43 commit 2bab37c

File tree

2 files changed

+103
-41
lines changed

2 files changed

+103
-41
lines changed

pkg/tunnel/connection/muxed_conn.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,24 @@ func (m *muxedConn) readPacketDirect() (*[]byte, error) {
243243
return p, nil
244244
}
245245

246+
// tryReadPacketDirect attempts a non-blocking read from the incoming packet
247+
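// channel. Returns (nil, false) if the connection is closed, the channel is closed, or no packet is immediately available; callers cannot distinguish these cases from this call alone.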
248+
func (m *muxedConn) tryReadPacketDirect() (*[]byte, bool) {
249+
if m.closed.Load() {
250+
return nil, false
251+
}
252+
253+
select {
254+
case p, ok := <-m.incomingPackets:
255+
if !ok {
256+
return nil, false
257+
}
258+
return p, true
259+
default:
260+
return nil, false
261+
}
262+
}
263+
246264
// putPacketBuffer returns a buffer obtained from readPacketDirect to the pool.
247265
func (m *muxedConn) putPacketBuffer(p *[]byte) {
248266
*p = (*p)[:cap(*p)]

pkg/tunnel/connection/splice.go

Lines changed: 85 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,13 @@ func Splice(tunDev tun.Device, conn Connection, opts ...SpliceOption) error {
142142
// Connection -> TUN path
143143
//
144144
// When the connection is a muxedConn with matching headroom, we use
145-
// readPacketDirect to receive pooled buffers without an extra copy.
145+
// readPacketDirect to receive pooled buffers without an extra copy, and
146+
// tryReadPacketDirect for non-blocking batch accumulation — eliminating
147+
// the intermediate channel and goroutine.
146148
// Otherwise we fall back to the standard ReadPacket-into-local-pool path.
147149
type directReader interface {
148150
readPacketDirect() (*[]byte, error)
151+
tryReadPacketDirect() (*[]byte, bool)
149152
putPacketBuffer(*[]byte)
150153
}
151154
dr, zeroCopy := conn.(directReader)
@@ -155,48 +158,98 @@ func Splice(tunDev tun.Device, conn Connection, opts ...SpliceOption) error {
155158
slog.Debug("Stopped reading from connection")
156159
}()
157160

158-
// Fallback pool only allocated when zero-copy is not available.
159-
var pktPool *sync.Pool
160-
if !zeroCopy {
161-
pktPool = &sync.Pool{
162-
New: func() any {
163-
return ptr.To(make([]byte, netstack.IPv6MinMTU+tunOffset))
164-
},
161+
pkts := make([][]byte, batchSize)
162+
163+
if zeroCopy {
164+
// Zero-copy path: read directly from the muxedConn's internal
165+
// packet channel without an intermediate goroutine or channel.
166+
// This reduces per-packet overhead (no channel hop, no extra
167+
// goroutine switch) and improves batching by draining all
168+
// available packets from the 10k-deep internal queue.
169+
for {
170+
// Block for the first packet.
171+
pkt, err := dr.readPacketDirect()
172+
if err != nil {
173+
if errors.Is(err, net.ErrClosed) {
174+
return nil
175+
}
176+
slog.Error("Failed to read from connection", slog.Any("error", err))
177+
return fmt.Errorf("failed to read from connection: %w", err)
178+
}
179+
180+
if config.observer != nil && len(*pkt) > tunOffset {
181+
config.observer.OnPacket(ExtractPacketInfo((*pkt)[tunOffset:], DirectionInbound))
182+
}
183+
if config.recalculateChecksum {
184+
if err := recalculateChecksumIfNeeded((*pkt)[tunOffset:], config); err != nil {
185+
slog.Debug("Failed to recalculate checksum on incoming packet", slog.Any("error", err))
186+
}
187+
}
188+
189+
pkts[0] = *pkt
190+
batchCount := 1
191+
192+
// Drain any already-queued packets without blocking to maximize batch size.
193+
for batchCount < batchSize {
194+
pkt, ok := dr.tryReadPacketDirect()
195+
if !ok {
196+
break
197+
}
198+
if config.observer != nil && len(*pkt) > tunOffset {
199+
config.observer.OnPacket(ExtractPacketInfo((*pkt)[tunOffset:], DirectionInbound))
200+
}
201+
if config.recalculateChecksum {
202+
if err := recalculateChecksumIfNeeded((*pkt)[tunOffset:], config); err != nil {
203+
slog.Debug("Failed to recalculate checksum on incoming packet", slog.Any("error", err))
204+
}
205+
}
206+
pkts[batchCount] = *pkt
207+
batchCount++
208+
}
209+
210+
if _, err := tunDev.Write(pkts[:batchCount], tunOffset); err != nil {
211+
if strings.Contains(err.Error(), "closed") {
212+
slog.Debug("TUN device closed")
213+
return net.ErrClosed
214+
}
215+
slog.Error("Failed to write to TUN", slog.Any("error", err))
216+
return fmt.Errorf("failed to write to TUN: %w", err)
217+
}
218+
219+
for i := 0; i < batchCount; i++ {
220+
p := pkts[i][:cap(pkts[i])]
221+
dr.putPacketBuffer(&p)
222+
}
165223
}
166224
}
167225

226+
// Non-zero-copy fallback: use intermediate channel for batching.
227+
pktPool := &sync.Pool{
228+
New: func() any {
229+
return ptr.To(make([]byte, netstack.IPv6MinMTU+tunOffset))
230+
},
231+
}
232+
168233
pktCh := make(chan *[]byte, batchSize)
169234

170235
g.Go(func() error {
171236
defer close(pktCh)
172237

173238
for {
174-
var pkt *[]byte
175-
if zeroCopy {
176-
var err error
177-
pkt, err = dr.readPacketDirect()
178-
if err != nil {
179-
slog.Error("Failed to read from connection", slog.Any("error", err))
180-
return fmt.Errorf("failed to read from connection: %w", err)
181-
}
182-
} else {
183-
pkt = pktPool.Get().(*[]byte)
184-
*pkt = (*pkt)[:cap(*pkt)]
185-
n, err := conn.ReadPacket((*pkt)[tunOffset:])
186-
if err != nil {
187-
slog.Error("Failed to read from connection", slog.Any("error", err))
188-
return fmt.Errorf("failed to read from connection: %w", err)
189-
}
190-
*pkt = (*pkt)[:n+tunOffset]
239+
pkt := pktPool.Get().(*[]byte)
240+
*pkt = (*pkt)[:cap(*pkt)]
241+
n, err := conn.ReadPacket((*pkt)[tunOffset:])
242+
if err != nil {
243+
slog.Error("Failed to read from connection", slog.Any("error", err))
244+
return fmt.Errorf("failed to read from connection: %w", err)
191245
}
246+
*pkt = (*pkt)[:n+tunOffset]
192247

193-
if config.observer != nil && len(*pkt) > tunOffset {
248+
if config.observer != nil && n > 0 {
194249
config.observer.OnPacket(ExtractPacketInfo((*pkt)[tunOffset:], DirectionInbound))
195250
}
196-
197251
if config.recalculateChecksum {
198-
packetData := (*pkt)[tunOffset:]
199-
if err := recalculateChecksumIfNeeded(packetData, config); err != nil {
252+
if err := recalculateChecksumIfNeeded((*pkt)[tunOffset:], config); err != nil {
200253
slog.Debug("Failed to recalculate checksum on incoming packet", slog.Any("error", err))
201254
}
202255
}
@@ -205,8 +258,6 @@ func Splice(tunDev tun.Device, conn Connection, opts ...SpliceOption) error {
205258
}
206259
})
207260

208-
pkts := make([][]byte, batchSize)
209-
210261
for {
211262
select {
212263
case pkt, ok := <-pktCh:
@@ -217,14 +268,12 @@ func Splice(tunDev tun.Device, conn Connection, opts ...SpliceOption) error {
217268
pkts[0] = *pkt
218269
batchCount := 1
219270

220-
closed := false
221271
gatherBatch:
222-
for batchCount < batchSize && !closed {
272+
for batchCount < batchSize {
223273
select {
224274
case pkt, ok := <-pktCh:
225275
if !ok {
226-
closed = true
227-
break
276+
break gatherBatch
228277
}
229278
pkts[batchCount] = *pkt
230279
batchCount++
@@ -238,18 +287,13 @@ func Splice(tunDev tun.Device, conn Connection, opts ...SpliceOption) error {
238287
slog.Debug("TUN device closed")
239288
return net.ErrClosed
240289
}
241-
242290
slog.Error("Failed to write to TUN", slog.Any("error", err))
243291
return fmt.Errorf("failed to write to TUN: %w", err)
244292
}
245293

246294
for i := 0; i < batchCount; i++ {
247295
p := pkts[i][:cap(pkts[i])]
248-
if zeroCopy {
249-
dr.putPacketBuffer(&p)
250-
} else {
251-
pktPool.Put(&p)
252-
}
296+
pktPool.Put(&p)
253297
}
254298
}
255299
}

0 commit comments

Comments
 (0)