Skip to content

Commit e98a2b9

Browse files
committed
[tunnel] reduce bfdl hot-path allocations
Add MarshalTo/UnmarshalInto zero-alloc variants that write into caller-provided buffers. Change ProcessRx/BuildTx to fill caller-owned Packet instead of returning heap-allocated pointers. Use sync.Pool for packets crossing the client read goroutine channel boundary. Cache the UDP destination address in Client to avoid per-send allocation.
1 parent eb7733d commit e98a2b9

File tree

5 files changed

+111
-78
lines changed

5 files changed

+111
-78
lines changed

pkg/tunnel/bfdl/client.go

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,18 @@ import (
66
"math/rand/v2"
77
"net"
88
"net/netip"
9+
"sync"
910
"time"
1011

1112
apoxynet "github.com/apoxy-dev/apoxy/pkg/tunnel/net"
1213
)
1314

15+
// pktPool recycles Packet objects between the read goroutine and the
16+
// main Run loop to avoid per-packet heap allocations.
17+
var pktPool = sync.Pool{
18+
New: func() any { return new(Packet) },
19+
}
20+
1421
// BFDServerAddr is the well-known server-side BFD address (proxySourceAddr).
1522
var BFDServerAddr = netip.MustParseAddr(apoxynet.ApoxyULAAddr)
1623

@@ -19,6 +26,7 @@ type Client struct {
1926
session *Session
2027
conn net.PacketConn
2128
serverAddr netip.AddrPort
29+
dst *net.UDPAddr // cached WriteTo target
2230

2331
// downCh is closed when the session transitions to Down from Up
2432
// (detect timer expired). Consumers should select on this to
@@ -36,6 +44,7 @@ func NewClient(conn net.PacketConn, serverAddr netip.AddrPort) *Client {
3644
c := &Client{
3745
conn: conn,
3846
serverAddr: serverAddr,
47+
dst: net.UDPAddrFromAddrPort(serverAddr),
3948
downCh: make(chan struct{}),
4049
}
4150

@@ -88,8 +97,9 @@ func (c *Client) Run(ctx context.Context) error {
8897
BFDPacketErrors.WithLabelValues("client", "rx").Inc()
8998
continue
9099
}
91-
pkt, err := Unmarshal(buf[:n])
92-
if err != nil {
100+
pkt := pktPool.Get().(*Packet)
101+
if err := UnmarshalInto(pkt, buf[:n]); err != nil {
102+
pktPool.Put(pkt)
93103
slog.Debug("BFD client unmarshal error", "error", err)
94104
BFDPacketErrors.WithLabelValues("client", "rx").Inc()
95105
continue
@@ -98,28 +108,29 @@ func (c *Client) Run(ctx context.Context) error {
98108
select {
99109
case readCh <- pkt:
100110
default:
111+
pktPool.Put(pkt)
101112
}
102113
}
103114
}()
104115

105116
// Send initial packet immediately.
106117
c.sendTx()
107118

108-
dst := net.UDPAddrFromAddrPort(c.serverAddr)
109119
for {
110120
select {
111121
case <-ctx.Done():
112122
return nil
113123
case pkt := <-readCh:
114-
resp := c.session.ProcessRx(pkt)
115-
if resp != nil {
116-
out := Marshal(resp)
117-
if _, err := c.conn.WriteTo(out, dst); err != nil {
118-
slog.Debug("BFD client write error", "error", err)
119-
BFDPacketErrors.WithLabelValues("client", "tx").Inc()
120-
} else {
121-
BFDPacketsTx.WithLabelValues("client").Inc()
122-
}
124+
var resp Packet
125+
c.session.ProcessRx(pkt, &resp)
126+
pktPool.Put(pkt)
127+
var out [bfdPacketLen]byte
128+
MarshalTo(out[:], &resp)
129+
if _, err := c.conn.WriteTo(out[:], c.dst); err != nil {
130+
slog.Debug("BFD client write error", "error", err)
131+
BFDPacketErrors.WithLabelValues("client", "tx").Inc()
132+
} else {
133+
BFDPacketsTx.WithLabelValues("client").Inc()
123134
}
124135
case <-txTicker.C:
125136
c.sendTx()
@@ -149,10 +160,11 @@ func (c *Client) Run(ctx context.Context) error {
149160
}
150161

151162
func (c *Client) sendTx() {
152-
pkt := c.session.BuildTx()
153-
out := Marshal(pkt)
154-
dst := net.UDPAddrFromAddrPort(c.serverAddr)
155-
if _, err := c.conn.WriteTo(out, dst); err != nil {
163+
var pkt Packet
164+
c.session.BuildTx(&pkt)
165+
var out [bfdPacketLen]byte
166+
MarshalTo(out[:], &pkt)
167+
if _, err := c.conn.WriteTo(out[:], c.dst); err != nil {
156168
slog.Debug("BFD client write error", "error", err)
157169
BFDPacketErrors.WithLabelValues("client", "tx").Inc()
158170
} else {

pkg/tunnel/bfdl/packet.go

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ type Packet struct {
6363
RequiredMinEcho uint32 // Microseconds, always 0 (no echo mode).
6464
}
6565

66-
// Marshal encodes a BFD control packet into a 24-byte wire format.
66+
// MarshalTo encodes a BFD control packet into buf, which must be at least
67+
// bfdPacketLen (24) bytes. Use with a stack-allocated [bfdPacketLen]byte
68+
// to avoid heap allocation.
6769
//
6870
// Wire format (RFC 5880 section 4.1):
6971
//
@@ -82,9 +84,7 @@ type Packet struct {
8284
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
8385
// | Required Min Echo RX Interval |
8486
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
85-
func Marshal(p *Packet) []byte {
86-
buf := make([]byte, bfdPacketLen)
87-
87+
func MarshalTo(buf []byte, p *Packet) {
8888
// Byte 0: Version (3 bits) | Diag (5 bits).
8989
buf[0] = (p.Version << 5) | (p.Diag & 0x1f)
9090

@@ -110,32 +110,36 @@ func Marshal(p *Packet) []byte {
110110
binary.BigEndian.PutUint32(buf[12:16], p.DesiredMinTx)
111111
binary.BigEndian.PutUint32(buf[16:20], p.RequiredMinRx)
112112
binary.BigEndian.PutUint32(buf[20:24], p.RequiredMinEcho)
113+
}
113114

115+
// Marshal encodes a BFD control packet into a newly allocated 24-byte slice.
116+
func Marshal(p *Packet) []byte {
117+
buf := make([]byte, bfdPacketLen)
118+
MarshalTo(buf, p)
114119
return buf
115120
}
116121

117-
// Unmarshal decodes a BFD control packet from wire format.
118-
func Unmarshal(b []byte) (*Packet, error) {
122+
// UnmarshalInto decodes a BFD control packet from wire format into p.
123+
// Use with a stack-allocated Packet to avoid heap allocation.
124+
func UnmarshalInto(p *Packet, b []byte) error {
119125
if len(b) < bfdPacketLen {
120-
return nil, fmt.Errorf("packet too short: %d < %d", len(b), bfdPacketLen)
126+
return fmt.Errorf("packet too short: %d < %d", len(b), bfdPacketLen)
121127
}
122128

123-
p := &Packet{
124-
Version: b[0] >> 5,
125-
Diag: b[0] & 0x1f,
126-
State: State(b[1] >> 6),
127-
Poll: b[1]&0x20 != 0,
128-
Final: b[1]&0x10 != 0,
129-
DetectMult: b[2],
130-
}
129+
p.Version = b[0] >> 5
130+
p.Diag = b[0] & 0x1f
131+
p.State = State(b[1] >> 6)
132+
p.Poll = b[1]&0x20 != 0
133+
p.Final = b[1]&0x10 != 0
134+
p.DetectMult = b[2]
131135

132136
if p.Version != 1 {
133-
return nil, fmt.Errorf("unsupported BFD version: %d", p.Version)
137+
return fmt.Errorf("unsupported BFD version: %d", p.Version)
134138
}
135139

136140
length := b[3]
137141
if length < bfdPacketLen {
138-
return nil, fmt.Errorf("invalid packet length field: %d", length)
142+
return fmt.Errorf("invalid packet length field: %d", length)
139143
}
140144

141145
p.MyDiscr = binary.BigEndian.Uint32(b[4:8])
@@ -144,5 +148,15 @@ func Unmarshal(b []byte) (*Packet, error) {
144148
p.RequiredMinRx = binary.BigEndian.Uint32(b[16:20])
145149
p.RequiredMinEcho = binary.BigEndian.Uint32(b[20:24])
146150

151+
return nil
152+
}
153+
154+
// Unmarshal decodes a BFD control packet from wire format into a newly
155+
// allocated Packet.
156+
func Unmarshal(b []byte) (*Packet, error) {
157+
p := &Packet{}
158+
if err := UnmarshalInto(p, b); err != nil {
159+
return nil, err
160+
}
147161
return p, nil
148162
}

pkg/tunnel/bfdl/server.go

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ func (s *Server) Start(ctx context.Context) error {
134134

135135
// RX loop: receive and respond to incoming BFD packets.
136136
buf := make([]byte, 128)
137+
var rxPkt, resp Packet
138+
var out [bfdPacketLen]byte
137139
for {
138140
n, raddr, err := conn.ReadFromUDP(buf)
139141
if err != nil {
@@ -144,16 +146,15 @@ func (s *Server) Start(ctx context.Context) error {
144146
continue
145147
}
146148

147-
pkt, err := Unmarshal(buf[:n])
148-
if err != nil {
149+
if err := UnmarshalInto(&rxPkt, buf[:n]); err != nil {
149150
slog.Debug("BFD unmarshal error", "error", err, "src", raddr)
150151
BFDPacketErrors.WithLabelValues("server", "rx").Inc()
151152
continue
152153
}
153154

154155
BFDPacketsRx.WithLabelValues("server").Inc()
155156

156-
resp, connID := s.handlePacket(pkt, raddr)
157+
connID, ok := s.handlePacket(&rxPkt, raddr, &resp)
157158

158159
// Check blackhole: drop response and skip onAlive.
159160
s.mu.RLock()
@@ -163,9 +164,9 @@ func (s *Server) Start(ctx context.Context) error {
163164
continue
164165
}
165166

166-
if resp != nil {
167-
out := Marshal(resp)
168-
if _, err := conn.WriteToUDP(out, raddr); err != nil {
167+
if ok {
168+
MarshalTo(out[:], &resp)
169+
if _, err := conn.WriteToUDP(out[:], raddr); err != nil {
169170
slog.Warn("BFD write error", "error", err, "dst", raddr)
170171
BFDPacketErrors.WithLabelValues("server", "tx").Inc()
171172
} else {
@@ -192,9 +193,8 @@ func (s *Server) txLoop(ctx context.Context, conn *net.UDPConn) {
192193
case <-ticker.C:
193194
s.mu.Lock()
194195
type txTarget struct {
195-
pkt *Packet
196-
addr net.UDPAddr
197-
connID string
196+
pkt Packet
197+
addr net.UDPAddr
198198
}
199199
targets := make([]txTarget, 0, len(s.sessions))
200200
for _, ss := range s.sessions {
@@ -219,17 +219,19 @@ func (s *Server) txLoop(ctx context.Context, conn *net.UDPConn) {
219219
continue
220220
}
221221

222+
var pkt Packet
223+
ss.BuildTx(&pkt)
222224
targets = append(targets, txTarget{
223-
pkt: ss.BuildTx(),
224-
addr: ss.peerAddr,
225-
connID: ss.connID,
225+
pkt: pkt,
226+
addr: ss.peerAddr,
226227
})
227228
}
228229
s.mu.Unlock()
229230

231+
var out [bfdPacketLen]byte
230232
for _, t := range targets {
231-
out := Marshal(t.pkt)
232-
if _, err := conn.WriteToUDP(out, &t.addr); err != nil {
233+
MarshalTo(out[:], &t.pkt)
234+
if _, err := conn.WriteToUDP(out[:], &t.addr); err != nil {
233235
slog.Debug("BFD server TX error", "error", err, "dst", t.addr.String())
234236
BFDPacketErrors.WithLabelValues("server", "tx").Inc()
235237
} else {
@@ -240,23 +242,25 @@ func (s *Server) txLoop(ctx context.Context, conn *net.UDPConn) {
240242
}
241243
}
242244

243-
// handlePacket processes an incoming BFD packet and returns a response.
244-
func (s *Server) handlePacket(pkt *Packet, raddr *net.UDPAddr) (*Packet, string) {
245+
// handlePacket processes an incoming BFD packet. If a session exists or can
246+
// be created, it writes the response into resp and returns (connID, true).
247+
// Returns ("", false) when the packet should be dropped.
248+
func (s *Server) handlePacket(rx *Packet, raddr *net.UDPAddr, resp *Packet) (string, bool) {
245249
s.mu.Lock()
246250
defer s.mu.Unlock()
247251

248252
var ss *serverSession
249253

250254
// Fast path: lookup by YourDiscr (remote knows our discriminator).
251-
if pkt.YourDiscr != 0 {
252-
ss = s.sessions[pkt.YourDiscr]
255+
if rx.YourDiscr != 0 {
256+
ss = s.sessions[rx.YourDiscr]
253257
}
254258

255259
// Slow path: lookup by source IP (initial packet).
256260
if ss == nil {
257261
srcIP, ok := netip.AddrFromSlice(raddr.IP)
258262
if !ok {
259-
return nil, ""
263+
return "", false
260264
}
261265
srcIP = srcIP.Unmap()
262266

@@ -265,7 +269,7 @@ func (s *Server) handlePacket(pkt *Packet, raddr *net.UDPAddr) (*Packet, string)
265269
// No existing session. Check if peer is registered.
266270
connID, registered := s.peers[srcIP]
267271
if !registered {
268-
return nil, ""
272+
return "", false
269273
}
270274

271275
// Create new session.
@@ -303,6 +307,6 @@ func (s *Server) handlePacket(pkt *Packet, raddr *net.UDPAddr) (*Packet, string)
303307
// Update peer address (port may change).
304308
ss.peerAddr = *raddr
305309

306-
resp := ss.ProcessRx(pkt)
307-
return resp, ss.connID
310+
ss.ProcessRx(rx, resp)
311+
return ss.connID, true
308312
}

pkg/tunnel/bfdl/session.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (s *Session) SetOnStateChange(fn OnStateChangeFunc) {
4040
s.onStateChange = fn
4141
}
4242

43-
// ProcessRx handles an incoming BFD packet and returns a response packet (or nil).
43+
// ProcessRx handles an incoming BFD packet and writes the response into resp.
4444
//
4545
// Simplified RFC 5880 section 6.8.6 state transitions:
4646
//
@@ -49,30 +49,30 @@ func (s *Session) SetOnStateChange(fn OnStateChangeFunc) {
4949
// Local=Init + Remote=Init -> Up
5050
// Local=Init + Remote=Up -> Up
5151
// Local=Up + Remote=Down -> Down
52-
func (s *Session) ProcessRx(pkt *Packet) *Packet {
52+
func (s *Session) ProcessRx(rx, resp *Packet) {
5353
s.mu.Lock()
5454
defer s.mu.Unlock()
5555

5656
s.lastRx = time.Now()
57-
s.remoteDiscr = pkt.MyDiscr
58-
s.remoteState = pkt.State
57+
s.remoteDiscr = rx.MyDiscr
58+
s.remoteState = rx.State
5959

6060
oldState := s.localState
6161
switch s.localState {
6262
case StateDown:
63-
switch pkt.State {
63+
switch rx.State {
6464
case StateDown:
6565
s.localState = StateInit
6666
case StateInit:
6767
s.localState = StateUp
6868
}
6969
case StateInit:
70-
switch pkt.State {
70+
switch rx.State {
7171
case StateInit, StateUp:
7272
s.localState = StateUp
7373
}
7474
case StateUp:
75-
if pkt.State == StateDown {
75+
if rx.State == StateDown {
7676
s.localState = StateDown
7777
}
7878
}
@@ -81,19 +81,19 @@ func (s *Session) ProcessRx(pkt *Packet) *Packet {
8181
s.onStateChange(oldState, s.localState)
8282
}
8383

84-
// Build response.
85-
return s.buildTxLocked()
84+
s.buildTxLocked(resp)
8685
}
8786

88-
// BuildTx creates an outgoing BFD control packet for periodic transmission.
89-
func (s *Session) BuildTx() *Packet {
87+
// BuildTx writes an outgoing BFD control packet into pkt for periodic
88+
// transmission.
89+
func (s *Session) BuildTx(pkt *Packet) {
9090
s.mu.Lock()
9191
defer s.mu.Unlock()
92-
return s.buildTxLocked()
92+
s.buildTxLocked(pkt)
9393
}
9494

95-
func (s *Session) buildTxLocked() *Packet {
96-
return &Packet{
95+
func (s *Session) buildTxLocked(pkt *Packet) {
96+
*pkt = Packet{
9797
Version: 1,
9898
State: s.localState,
9999
DetectMult: s.detectMult,

0 commit comments

Comments
 (0)