Skip to content

Commit

Permalink
feat: implement the gnet.Conn.AsyncWritev()
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Dec 4, 2021
1 parent f990be1 commit 469a86b
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 39 deletions.
68 changes: 68 additions & 0 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"golang.org/x/sys/unix"

"github.com/panjf2000/gnet/internal/io"
"github.com/panjf2000/gnet/internal/netpoll"
"github.com/panjf2000/gnet/internal/socket"
"github.com/panjf2000/gnet/pkg/mixedbuffer"
Expand All @@ -39,6 +40,7 @@ type conn struct {
loop *eventloop // connected event-loop
codec ICodec // codec for TCP
opened bool // connection opened event fired
packets [][]byte // reuse it for multiple byte slices
localAddr net.Addr // local addr
remoteAddr net.Addr // remote addr
inboundBuffer *ringbuffer.RingBuffer // buffer for leftover data from the peer
Expand Down Expand Up @@ -74,6 +76,7 @@ func (c *conn) releaseTCP() {
c.outboundBuffer.Release()
netpoll.PutPollAttachment(c.pollAttachment)
c.pollAttachment = nil
c.packets = c.packets[:0]
}

func newUDPConn(fd int, el *eventloop, localAddr net.Addr, sa unix.Sockaddr, connected bool) (c *conn) {
Expand Down Expand Up @@ -156,13 +159,74 @@ func (c *conn) write(buf []byte) (err error) {
return
}

func (c *conn) writev(bs [][]byte) (err error) {
defer func() {
for _, b := range bs {
c.loop.eventHandler.AfterWrite(c, b)
}
c.packets = c.packets[:0]
}()

var sum int
for _, b := range bs {
var packet []byte
if packet, err = c.codec.Encode(c, b); err != nil {
return
}
c.packets = append(c.packets, packet)
sum += len(packet)
c.loop.eventHandler.PreWrite(c)
}

// If there is pending data in outbound buffer, the current data ought to be appended to the outbound buffer
// for maintaining the sequence of network packets.
if !c.outboundBuffer.IsEmpty() {
_, _ = c.outboundBuffer.Writev(c.packets)
return
}

var n int
if n, err = io.Writev(c.fd, c.packets); err != nil {
// A temporary error occurs, append the data to outbound buffer, writing it back to the peer in the next round.
if err == unix.EAGAIN {
_, _ = c.outboundBuffer.Writev(c.packets)
err = c.loop.poller.ModReadWrite(c.pollAttachment)
return
}
return c.loop.loopCloseConn(c, os.NewSyscallError("write", err))
}
// Failed to send all data back to the peer, buffer the leftover data for the next round.
if n < sum {
var pos int
for i := range c.packets {
np := len(c.packets[i])
if n < np {
pos = i
c.packets[i] = c.packets[i][n:]
break
}
n -= np
}
_, _ = c.outboundBuffer.Writev(c.packets[pos:])
err = c.loop.poller.ModReadWrite(c.pollAttachment)
}
return
}

func (c *conn) asyncWrite(itf interface{}) error {
if !c.opened {
return nil
}
return c.write(itf.([]byte))
}

func (c *conn) asyncWritev(itf interface{}) error {
if !c.opened {
return nil
}
return c.writev(itf.([][]byte))
}

func (c *conn) sendTo(buf []byte) error {
c.loop.eventHandler.PreWrite(c)
defer c.loop.eventHandler.AfterWrite(c, buf)
Expand Down Expand Up @@ -238,6 +302,10 @@ func (c *conn) AsyncWrite(buf []byte) error {
return c.loop.poller.Trigger(c.asyncWrite, buf)
}

func (c *conn) AsyncWritev(bs [][]byte) error {
return c.loop.poller.Trigger(c.asyncWritev, bs)
}

func (c *conn) SendTo(buf []byte) error {
return c.sendTo(buf)
}
Expand Down
10 changes: 10 additions & 0 deletions connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,16 @@ func (c *stdConn) AsyncWrite(buf []byte) (err error) {
return
}

func (c *stdConn) AsyncWritev(bs [][]byte) (err error) {
for _, b := range bs {
err = c.AsyncWrite(b)
if err != nil {
return
}
}
return
}

func (c *stdConn) SendTo(buf []byte) (err error) {
c.loop.eventHandler.PreWrite(c)
_, err = c.loop.svr.ln.packetConn.WriteTo(buf, c.remoteAddr)
Expand Down
6 changes: 5 additions & 1 deletion gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,14 @@ type Conn interface {
// SendTo writes data for UDP sockets, it allows you to send data back to UDP socket in individual goroutines.
SendTo(buf []byte) error

// AsyncWrite writes data to peer asynchronously, usually you would call it in individual goroutines
// AsyncWrite writes one byte slice to peer asynchronously, usually you would call it in individual goroutines
// instead of the event-loop goroutines.
AsyncWrite(buf []byte) error

// AsyncWritev writes multiple byte slices to peer asynchronously, usually you would call it in individual goroutines
// instead of the event-loop goroutines.
AsyncWritev(bs [][]byte) error

// Wake triggers a React event for the connection.
Wake() error

Expand Down
102 changes: 64 additions & 38 deletions gnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,152 +323,168 @@ func TestServe(t *testing.T) {
t.Run("poll", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "tcp", ":9991", false, false, false, false, 10, RoundRobin)
testServe(t, "tcp", ":9991", false, false, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "tcp", ":9992", false, false, true, false, 10, LeastConnections)
testServe(t, "tcp", ":9992", false, false, true, false, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "tcp", ":9991", false, false, false, true, 10, RoundRobin)
testServe(t, "tcp", ":9991", false, false, false, true, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "tcp", ":9992", false, false, true, true, 10, LeastConnections)
testServe(t, "tcp", ":9992", false, false, true, true, false, 10, LeastConnections)
})
})
t.Run("tcp-async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "tcp", ":9991", false, false, false, true, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "tcp", ":9992", false, false, true, true, true, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "udp", ":9991", false, false, false, false, 10, RoundRobin)
testServe(t, "udp", ":9991", false, false, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "udp", ":9992", false, false, true, false, 10, LeastConnections)
testServe(t, "udp", ":9992", false, false, true, false, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "udp", ":9991", false, false, false, true, 10, RoundRobin)
testServe(t, "udp", ":9991", false, false, false, true, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "udp", ":9992", false, false, true, true, 10, LeastConnections)
testServe(t, "udp", ":9992", false, false, true, true, false, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "unix", "gnet1.sock", false, false, false, false, 10, RoundRobin)
testServe(t, "unix", "gnet1.sock", false, false, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "unix", "gnet2.sock", false, false, true, false, 10, SourceAddrHash)
testServe(t, "unix", "gnet2.sock", false, false, true, false, false, 10, SourceAddrHash)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "unix", "gnet1.sock", false, false, false, true, 10, RoundRobin)
testServe(t, "unix", "gnet1.sock", false, false, false, true, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "unix", "gnet2.sock", false, false, true, true, false, 10, SourceAddrHash)
})
})
t.Run("unix-async-writev", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "unix", "gnet1.sock", false, false, false, true, true, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "unix", "gnet2.sock", false, false, true, true, 10, SourceAddrHash)
testServe(t, "unix", "gnet2.sock", false, false, true, true, true, 10, SourceAddrHash)
})
})
})

t.Run("poll-reuseport", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "tcp", ":9991", true, true, false, false, 10, RoundRobin)
testServe(t, "tcp", ":9991", true, true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
testServe(t, "tcp", ":9992", true, true, true, false, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "tcp", ":9991", true, true, false, true, 10, RoundRobin)
testServe(t, "tcp", ":9991", true, true, false, true, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
testServe(t, "tcp", ":9992", true, true, true, false, false, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
testServe(t, "udp", ":9991", true, true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "udp", ":9992", true, true, true, false, 10, LeastConnections)
testServe(t, "udp", ":9992", true, true, true, false, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
testServe(t, "udp", ":9991", true, true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "udp", ":9992", true, true, true, true, 10, LeastConnections)
testServe(t, "udp", ":9992", true, true, true, true, false, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "unix", "gnet1.sock", true, true, false, false, 10, RoundRobin)
testServe(t, "unix", "gnet1.sock", true, true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "unix", "gnet2.sock", true, true, true, false, 10, LeastConnections)
testServe(t, "unix", "gnet2.sock", true, true, true, false, false, 10, LeastConnections)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "unix", "gnet1.sock", true, true, false, true, 10, RoundRobin)
testServe(t, "unix", "gnet1.sock", true, true, false, true, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "unix", "gnet2.sock", true, true, true, true, 10, LeastConnections)
testServe(t, "unix", "gnet2.sock", true, true, true, true, false, 10, LeastConnections)
})
})
})

t.Run("poll-reuseaddr", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "tcp", ":9991", false, true, false, false, 10, RoundRobin)
testServe(t, "tcp", ":9991", false, true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
testServe(t, "tcp", ":9992", false, true, true, false, false, 10, LeastConnections)
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "tcp", ":9991", false, true, false, true, 10, RoundRobin)
testServe(t, "tcp", ":9991", false, true, false, true, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
testServe(t, "tcp", ":9992", false, true, true, false, false, 10, LeastConnections)
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
testServe(t, "udp", ":9991", false, true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "udp", ":9992", false, true, true, false, 10, LeastConnections)
testServe(t, "udp", ":9992", false, true, true, false, false, 10, LeastConnections)
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
testServe(t, "udp", ":9991", false, true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "udp", ":9992", false, true, true, true, 10, LeastConnections)
testServe(t, "udp", ":9992", false, true, true, true, false, 10, LeastConnections)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "unix", "gnet1.sock", false, true, false, false, 10, RoundRobin)
testServe(t, "unix", "gnet1.sock", false, true, false, false, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "unix", "gnet2.sock", false, true, true, false, 10, LeastConnections)
testServe(t, "unix", "gnet2.sock", false, true, true, false, false, 10, LeastConnections)
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe(t, "unix", "gnet1.sock", false, true, false, true, 10, RoundRobin)
testServe(t, "unix", "gnet1.sock", false, true, false, true, false, 10, RoundRobin)
})
t.Run("N-loop", func(t *testing.T) {
testServe(t, "unix", "gnet2.sock", false, true, true, true, 10, LeastConnections)
testServe(t, "unix", "gnet2.sock", false, true, true, true, false, 10, LeastConnections)
})
})
})
Expand All @@ -482,6 +498,7 @@ type testServer struct {
addr string
multicore bool
async bool
writev bool
nclients int
started int32
connected int32
Expand Down Expand Up @@ -534,7 +551,15 @@ func (s *testServer) React(packet []byte, c Conn) (out []byte, action Action) {

_ = s.workerPool.Submit(
func() {
_ = c.AsyncWrite(buf.Bytes())
if s.writev {
mid := buf.Len() / 2
bs := make([][]byte, 2)
bs[0] = buf.B[:mid]
bs[1] = buf.B[mid:]
_ = c.AsyncWritev(bs)
} else {
_ = c.AsyncWrite(buf.Bytes())
}
})
return
} else if s.network == "udp" {
Expand Down Expand Up @@ -568,13 +593,14 @@ func (s *testServer) Tick() (delay time.Duration, action Action) {
return
}

func testServe(t *testing.T, network, addr string, reuseport, reuseaddr, multicore, async bool, nclients int, lb LoadBalancing) {
func testServe(t *testing.T, network, addr string, reuseport, reuseaddr, multicore, async, writev bool, nclients int, lb LoadBalancing) {
ts := &testServer{
tester: t,
network: network,
addr: addr,
multicore: multicore,
async: async,
writev: writev,
nclients: nclients,
workerPool: goroutine.Default(),
}
Expand Down
Loading

0 comments on commit 469a86b

Please sign in to comment.