Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: detach operator race #280

Merged
merged 1 commit into from Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 10 additions & 3 deletions connection_onevent.go
Expand Up @@ -185,7 +185,7 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
if c.IsActive() {
c.Close()
} else {
c.closeCallback(false)
c.closeCallback(false, false)
}
}
}()
Expand All @@ -200,7 +200,8 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
}
// Handling callback if connection has been closed.
if !c.IsActive() {
c.closeCallback(false)
// connection if closed by user when processing, so it needs detach
c.closeCallback(false, true)
panicked = false
return
}
Expand All @@ -221,10 +222,16 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
// closeCallback .
// It can be confirmed that closeCallback and onRequest will not be executed concurrently.
// If onRequest is still running, it will trigger closeCallback on exit.
func (c *connection) closeCallback(needLock bool) (err error) {
func (c *connection) closeCallback(needLock bool, needDetach bool) (err error) {
if needLock && !c.lock(processing) {
return nil
}
if needDetach && c.operator.poll != nil { // If Close is called during OnPrepare, poll is not registered.
// PollDetach only happen when user call conn.Close() or poller detect error
if err := c.operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: onClose detach operator failed: %v", err)
}
}
var latest = c.closeCallbacks.Load()
if latest == nil {
return nil
Expand Down
26 changes: 10 additions & 16 deletions connection_reactor.go
Expand Up @@ -28,7 +28,6 @@ func (c *connection) onHup(p Poll) error {
if !c.closeBy(poller) {
return nil
}
// already PollDetach when call OnHup
c.triggerRead(Exception(ErrEOF, "peer close"))
c.triggerWrite(Exception(ErrConnClosed, "peer close"))
// It depends on closing by user if OnConnect and OnRequest is nil, otherwise it needs to be released actively.
Expand All @@ -37,35 +36,30 @@ func (c *connection) onHup(p Poll) error {
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
if onConnect != nil || onRequest != nil {
c.closeCallback(true)
// already PollDetach when call OnHup
c.closeCallback(true, false)
}
return nil
}

// onClose means close by user.
func (c *connection) onClose() error {
// user code close the connection
if c.closeBy(user) {
// If Close is called during OnPrepare, poll is not registered.
if c.operator.poll != nil {
if err := c.operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: onClose detach operator failed: %v", err)
}
}
c.triggerRead(Exception(ErrConnClosed, "self close"))
c.triggerWrite(Exception(ErrConnClosed, "self close"))
c.closeCallback(true)
// Detach from poller when processing finished, otherwise it will cause race
c.closeCallback(true, true)
return nil
}

closedByPoller := c.isCloseBy(poller)
// force change closed by user
// closed by poller
// still need to change closing status to `user` since OnProcess should not be processed again
c.force(closing, user)

// If OnRequest is nil, relies on the user to actively close the connection to recycle resources.
if closedByPoller {
c.closeCallback(true)
}
return nil
// user code should actively close the connection to recycle resources.
// poller already detached operator
return c.closeCallback(true, false)
}

// closeBuffer recycle input & output LinkBuffer.
Expand Down