Skip to content

Commit

Permalink
feat: detach conn
Browse files Browse the repository at this point in the history
  • Loading branch information
jayantxie committed May 6, 2023
1 parent d5ac410 commit 800c2ce
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 44 deletions.
17 changes: 6 additions & 11 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,10 @@ func (c *connection) Close() error {
return c.onClose()
}

// Detach implements Connection.
func (c *connection) Detach() error {
return c.onDetach()
// DetachPoller detaches the connection from poller.
func (c *connection) DetachPoller() error {
c.detaching = true
return c.onClose()
}

// ------------------------------------------ private ------------------------------------------
Expand Down Expand Up @@ -376,14 +377,8 @@ func (c *connection) initFinalizer() {
c.AddCloseCallback(func(connection Connection) (err error) {
c.stop(flushing)
c.operator.Free()
if c.isCloseBy(rpal) {
if err = c.netFD.DummyClose(); err != nil {
logger.Printf("NETPOLL: netFD dummy close failed: %v", err)
}
} else {
if err = c.netFD.Close(); err != nil {
logger.Printf("NETPOLL: netFD close failed: %v", err)
}
if err = c.netFD.Close(); err != nil {
logger.Printf("NETPOLL: netFD close failed: %v", err)
}
c.closeBuffer()
return nil
Expand Down
1 change: 0 additions & 1 deletion connection_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ const (
none who = iota
user
poller
rpal
)

type key int32
Expand Down
2 changes: 1 addition & 1 deletion connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (c *connection) closeCallback(needLock bool) (err error) {
return nil
}
// If Close is called during OnPrepare, poll is not registered.
if (c.isCloseBy(user) || c.isCloseBy(rpal)) && c.operator.poll != nil {
if c.isCloseBy(user) && c.operator.poll != nil {
if err = c.operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: closeCallback detach operator failed: %v", err)
}
Expand Down
16 changes: 0 additions & 16 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,6 @@ func (c *connection) onClose() error {
return nil
}

// onDetach means detach by rpal.
func (c *connection) onDetach() error {
if c.closeBy(rpal) {
c.triggerRead()
c.triggerWrite(ErrConnClosed)
c.closeCallback(true)
return nil
}
if c.isCloseBy(poller) {
// Connection with OnRequest of nil
// relies on the user to actively close the connection to recycle resources.
c.closeCallback(true)
}
return nil
}

// closeBuffer recycle input & output LinkBuffer.
func (c *connection) closeBuffer() {
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
Expand Down
49 changes: 49 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"errors"
"fmt"
"math/rand"
"net"
"os"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -432,3 +434,50 @@ func TestBookSizeLargerThanMaxSize(t *testing.T) {
wg.Wait()
rconn.Close()
}

func TestConnDetachPoller(t *testing.T) {
ln, err := CreateListener("tcp", ":1234")
MustNil(t, err)

go func() {
for {
conn, err := ln.Accept()
if conn == nil && err == nil {
continue
}
if err != nil {
return
}
go func() {
buf := make([]byte, 1024)
// slow read
for {
_, err := conn.Read(buf)
MustNil(t, err)
time.Sleep(100 * time.Millisecond)
_, err = conn.Write(buf)
MustNil(t, err)
}
}()
}
}()

c, err := DialConnection("tcp", ":1234", time.Second)
MustNil(t, err)

conn := c.(*TCPConnection)

err = conn.DetachPoller()
MustNil(t, err)

f := os.NewFile(uintptr(conn.fd), "netpoll-connection")
defer f.Close()

gonetconn, err := net.FileConn(f)
MustNil(t, err)
buf := make([]byte, 1024)
_, err = gonetconn.Write(buf)
MustNil(t, err)
_, err = gonetconn.Read(buf)
MustNil(t, err)
}
2 changes: 2 additions & 0 deletions net_netfd.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type netFD struct {
network string // tcp tcp4 tcp6, udp, udp4, udp6, ip, ip4, ip6, unix, unixgram, unixpacket
localAddr net.Addr
remoteAddr net.Addr
// for detaching conn from poller
detaching bool
}

func newNetFD(fd, family, sotype int, net string) *netFD {
Expand Down
16 changes: 1 addition & 15 deletions net_netfd_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c *netFD) Close() (err error) {
if atomic.AddUint32(&c.closed, 1) != 1 {
return nil
}
if c.fd > 2 {
if !c.detaching && c.fd > 2 {
err = syscall.Close(c.fd)
if err != nil {
logger.Printf("NETPOLL: netFD[%d] close error: %s", c.fd, err.Error())
Expand All @@ -68,20 +68,6 @@ func (c *netFD) Close() (err error) {
return err
}

// DummyClose will be executed only once.
func (c *netFD) DummyClose() (err error) {
if atomic.AddUint32(&c.closed, 1) != 1 {
return nil
}
//if c.fd > 0 {
// err = syscall.Close(c.fd)
// if err != nil {
// log.Printf("netFD[%d] close error: %s", c.fd, err.Error())
// }
//}
return err
}

// LocalAddr implements Conn.
func (c *netFD) LocalAddr() (addr net.Addr) {
return c.localAddr
Expand Down

0 comments on commit 800c2ce

Please sign in to comment.