Skip to content

Commit

Permalink
feat: add Detach function to support detach connection from its poller (
Browse files Browse the repository at this point in the history
  • Loading branch information
jayantxie authored and firedtoad committed Jul 6, 2023
1 parent 4e7b59b commit e9b72f3
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 1 deletion.
6 changes: 6 additions & 0 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ func (c *connection) Close() error {
return c.onClose()
}

// Detach detaches the connection from poller but doesn't close it.
func (c *connection) Detach() error {
c.detaching = true
return c.onClose()
}

// ------------------------------------------ private ------------------------------------------

var barrierPool = sync.Pool{
Expand Down
59 changes: 59 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"errors"
"fmt"
"math/rand"
"net"
"os"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -433,6 +435,63 @@ func TestBookSizeLargerThanMaxSize(t *testing.T) {
rconn.Close()
}

func TestConnDetach(t *testing.T) {
ln, err := CreateListener("tcp", ":1234")
MustNil(t, err)

go func() {
for {
conn, err := ln.Accept()
if err != nil {
return
}
if conn == nil {
continue
}
go func() {
buf := make([]byte, 1024)
// slow read
for {
_, err := conn.Read(buf)
if err != nil {
return
}
time.Sleep(100 * time.Millisecond)
_, err = conn.Write(buf)
if err != nil {
return
}
}
}()
}
}()

c, err := DialConnection("tcp", ":1234", time.Second)
MustNil(t, err)

conn := c.(*TCPConnection)

err = conn.Detach()
MustNil(t, err)

f := os.NewFile(uintptr(conn.fd), "netpoll-connection")
defer f.Close()

gonetconn, err := net.FileConn(f)
MustNil(t, err)
buf := make([]byte, 1024)
_, err = gonetconn.Write(buf)
MustNil(t, err)
_, err = gonetconn.Read(buf)
MustNil(t, err)

err = gonetconn.Close()
MustNil(t, err)

err = ln.Close()
MustNil(t, err)
}

func BenchmarkConnectionLock(b *testing.B) {
b.ReportAllocs()
l := &locker{}
Expand Down
2 changes: 2 additions & 0 deletions net_netfd.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type netFD struct {
network string // tcp tcp4 tcp6, udp, udp4, udp6, ip, ip4, ip6, unix, unixgram, unixpacket
localAddr net.Addr
remoteAddr net.Addr
// for detaching conn from poller
detaching bool
}

func newNetFD(fd, family, sotype int, net string) *netFD {
Expand Down
2 changes: 1 addition & 1 deletion net_netfd_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c *netFD) Close() (err error) {
if atomic.AddUint32(&c.closed, 1) != 1 {
return nil
}
if c.fd > 2 {
if !c.detaching && c.fd > 2 {
err = syscall.Close(c.fd)
if err != nil {
logger.Printf("NETPOLL: netFD[%d] close error: %s", c.fd, err.Error())
Expand Down

0 comments on commit e9b72f3

Please sign in to comment.