From 52fd5fb7b00fbbe92dd6d5cfe1adabfac34148a6 Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Mon, 7 Aug 2023 11:29:18 +0800 Subject: [PATCH] fix: detach race --- connection_onevent.go | 13 ++++++++++--- connection_reactor.go | 26 ++++++++++---------------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/connection_onevent.go b/connection_onevent.go index 28045645..6ddbf31e 100644 --- a/connection_onevent.go +++ b/connection_onevent.go @@ -185,7 +185,7 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f if c.IsActive() { c.Close() } else { - c.closeCallback(false) + c.closeCallback(false, false) } } }() @@ -200,7 +200,8 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f } // Handling callback if connection has been closed. if !c.IsActive() { - c.closeCallback(false) + // connection if closed by user when processing, so it needs detach + c.closeCallback(false, true) panicked = false return } @@ -221,10 +222,16 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f // closeCallback . // It can be confirmed that closeCallback and onRequest will not be executed concurrently. // If onRequest is still running, it will trigger closeCallback on exit. -func (c *connection) closeCallback(needLock bool) (err error) { +func (c *connection) closeCallback(needLock bool, needDetach bool) (err error) { if needLock && !c.lock(processing) { return nil } + if needDetach && c.operator.poll != nil { // If Close is called during OnPrepare, poll is not registered. + // PollDetach only happen when user call conn.Close() or poller detect error + if err := c.operator.Control(PollDetach); err != nil { + logger.Printf("NETPOLL: onClose detach operator failed: %v", err) + } + } var latest = c.closeCallbacks.Load() if latest == nil { return nil diff --git a/connection_reactor.go b/connection_reactor.go index 6fca76e3..2acd45ce 100644 --- a/connection_reactor.go +++ b/connection_reactor.go @@ -28,7 +28,6 @@ func (c *connection) onHup(p Poll) error { if !c.closeBy(poller) { return nil } - // already PollDetach when call OnHup c.triggerRead(Exception(ErrEOF, "peer close")) c.triggerWrite(Exception(ErrConnClosed, "peer close")) // It depends on closing by user if OnConnect and OnRequest is nil, otherwise it needs to be released actively. @@ -37,35 +36,30 @@ func (c *connection) onHup(p Poll) error { var onConnect, _ = c.onConnectCallback.Load().(OnConnect) var onRequest, _ = c.onRequestCallback.Load().(OnRequest) if onConnect != nil || onRequest != nil { - c.closeCallback(true) + // already PollDetach when call OnHup + c.closeCallback(true, false) } return nil } // onClose means close by user. func (c *connection) onClose() error { + // user code close the connection if c.closeBy(user) { - // If Close is called during OnPrepare, poll is not registered. - if c.operator.poll != nil { - if err := c.operator.Control(PollDetach); err != nil { - logger.Printf("NETPOLL: onClose detach operator failed: %v", err) - } - } c.triggerRead(Exception(ErrConnClosed, "self close")) c.triggerWrite(Exception(ErrConnClosed, "self close")) - c.closeCallback(true) + // Detach from poller when processing finished, otherwise it will cause race + c.closeCallback(true, true) return nil } - closedByPoller := c.isCloseBy(poller) - // force change closed by user + // closed by poller + // still need to change closing status to `user` since OnProcess should not be processed again c.force(closing, user) - // If OnRequest is nil, relies on the user to actively close the connection to recycle resources. - if closedByPoller { - c.closeCallback(true) - } - return nil + // user code should actively close the connection to recycle resources. + // poller already detached operator + return c.closeCallback(true, false) } // closeBuffer recycle input & output LinkBuffer.