Skip to content

Commit 498b05c

Browse files
authored
[connect-ip] further enhance packet splice and add high performance connection.Pipe implementation (#45)
Pipe is good to like 40gbit/s on my crummy old XPS laptop.
1 parent 0cb2c92 commit 498b05c

File tree

5 files changed

+364
-18
lines changed

5 files changed

+364
-18
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ require (
222222
github.com/hashicorp/mdns v1.0.1 // indirect
223223
github.com/hashicorp/memberlist v0.5.0 // indirect
224224
github.com/hashicorp/vic v1.5.1-0.20190403131502-bbfe86ec9443 // indirect
225+
github.com/hedzr/go-ringbuf/v2 v2.2.1 // indirect
225226
github.com/iancoleman/strcase v0.3.0 // indirect
226227
github.com/imdario/mergo v0.3.16 // indirect
227228
github.com/inconshreveable/mousetrap v1.1.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,8 @@ github.com/hashicorp/memberlist v0.5.0 h1:EtYPN8DpAURiapus508I4n9CzHs2W+8NZGbmmR
612612
github.com/hashicorp/memberlist v0.5.0/go.mod h1:yvyXLpo0QaGE59Y7hDTsTzDD25JYBZ4mHgHUZ8lrOI0=
613613
github.com/hashicorp/vic v1.5.1-0.20190403131502-bbfe86ec9443 h1:O/pT5C1Q3mVXMyuqg7yuAWUg/jMZR1/0QTzTRdNR6Uw=
614614
github.com/hashicorp/vic v1.5.1-0.20190403131502-bbfe86ec9443/go.mod h1:bEpDU35nTu0ey1EXjwNwPjI9xErAsoOCmcMb9GKvyxo=
615+
github.com/hedzr/go-ringbuf/v2 v2.2.1 h1:bnIRxSCWYt4vs5UCDCOYf+r1C8cQC7tkcOdjOTaVzNk=
616+
github.com/hedzr/go-ringbuf/v2 v2.2.1/go.mod h1:N3HsRpbHvPkX9GsykpkPoR2vD6WRR6GbU7tx/9GLE4M=
615617
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
616618
github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI=
617619
github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=

pkg/tunnel/connection/pipe.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package connection
2+
3+
import (
4+
"context"
5+
"errors"
6+
"runtime"
7+
"sync"
8+
9+
"github.com/hedzr/go-ringbuf/v2"
10+
"github.com/hedzr/go-ringbuf/v2/mpmc"
11+
)
12+
13+
var _ Connection = (*Pipe)(nil)
14+
15+
type Pipe struct {
16+
readRing mpmc.RingBuffer[[]byte]
17+
writeRing mpmc.RingBuffer[[]byte]
18+
ctx context.Context
19+
cancel context.CancelFunc
20+
}
21+
22+
var bufPool = sync.Pool{
23+
New: func() interface{} {
24+
b := make([]byte, 2048) // Adjust size if needed
25+
return &b
26+
},
27+
}
28+
29+
// NewPipe creates a pair of connected Pipe instances for bidirectional communication.
30+
func NewPipe(ctx context.Context) (*Pipe, *Pipe) {
31+
ringAtoB := ringbuf.New[[]byte](1024)
32+
ringBtoA := ringbuf.New[[]byte](1024)
33+
34+
ctx, cancel := context.WithCancel(ctx)
35+
36+
pipeA := &Pipe{
37+
readRing: ringBtoA,
38+
writeRing: ringAtoB,
39+
ctx: ctx,
40+
cancel: cancel,
41+
}
42+
pipeB := &Pipe{
43+
readRing: ringAtoB,
44+
writeRing: ringBtoA,
45+
ctx: ctx,
46+
cancel: cancel,
47+
}
48+
49+
return pipeA, pipeB
50+
}
51+
52+
// ReadPacket reads a packet into the provided buffer.
53+
func (p *Pipe) ReadPacket(buf []byte) (int, error) {
54+
select {
55+
case <-p.ctx.Done():
56+
return 0, errors.New("pipe closed")
57+
default:
58+
var item []byte
59+
var err error
60+
for {
61+
item, err = p.readRing.Dequeue()
62+
if err != nil {
63+
if errors.Is(err, mpmc.ErrQueueEmpty) {
64+
runtime.Gosched()
65+
66+
// Has the context been cancelled?
67+
select {
68+
case <-p.ctx.Done():
69+
bufPool.Put(&item)
70+
return 0, errors.New("pipe closed")
71+
default:
72+
// Continue to try to dequeue
73+
continue
74+
}
75+
}
76+
return 0, err
77+
}
78+
break
79+
}
80+
n := copy(buf, item)
81+
bufPool.Put(&item)
82+
return n, nil
83+
}
84+
}
85+
86+
// WritePacket writes a packet from the provided buffer.
87+
func (p *Pipe) WritePacket(b []byte) ([]byte, error) {
88+
select {
89+
case <-p.ctx.Done():
90+
return nil, errors.New("pipe closed")
91+
default:
92+
bufPtr := bufPool.Get().(*[]byte)
93+
buf := *bufPtr
94+
if cap(buf) < len(b) {
95+
buf = make([]byte, len(b))
96+
}
97+
buf = buf[:len(b)]
98+
copy(buf, b)
99+
100+
for {
101+
err := p.writeRing.Enqueue(buf)
102+
if err != nil {
103+
if errors.Is(err, mpmc.ErrQueueFull) {
104+
runtime.Gosched()
105+
106+
// Has the context been cancelled?
107+
select {
108+
case <-p.ctx.Done():
109+
bufPool.Put(&buf)
110+
return nil, errors.New("pipe closed")
111+
default:
112+
// Continue to try to enqueue
113+
continue
114+
}
115+
}
116+
return nil, err
117+
}
118+
break
119+
}
120+
121+
return nil, nil
122+
}
123+
}
124+
125+
// Close terminates the pipe.
126+
func (p *Pipe) Close() error {
127+
p.cancel()
128+
return nil
129+
}

pkg/tunnel/connection/pipe_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package connection_test
2+
3+
import (
4+
"bytes"
5+
"testing"
6+
"time"
7+
8+
"github.com/apoxy-dev/apoxy-cli/pkg/tunnel/connection"
9+
)
10+
11+
func TestPipeThroughput(t *testing.T) {
12+
const (
13+
packetSize = 1024 // 1 KB per packet
14+
numPackets = 1_000_000 // Total packets to send
15+
)
16+
17+
p1, p2 := connection.NewPipe(t.Context())
18+
19+
payload := bytes.Repeat([]byte("X"), packetSize)
20+
buf := make([]byte, packetSize)
21+
22+
done := make(chan struct{})
23+
24+
go func() {
25+
for i := 0; i < numPackets; i++ {
26+
if _, err := p2.ReadPacket(buf); err != nil {
27+
t.Fatal(err)
28+
}
29+
}
30+
close(done)
31+
}()
32+
33+
start := time.Now()
34+
35+
for i := 0; i < numPackets; i++ {
36+
if _, err := p1.WritePacket(payload); err != nil {
37+
t.Fatal(err)
38+
}
39+
}
40+
41+
<-done
42+
duration := time.Since(start)
43+
44+
throughputGbps := (float64(packetSize*numPackets*8) / 1e9) / duration.Seconds()
45+
t.Logf("Sent %d packets of %d bytes in %s", numPackets, packetSize, duration)
46+
t.Logf("Throughput: %.2f Gbps", throughputGbps)
47+
}

0 commit comments

Comments
 (0)