Skip to content

Commit

Permalink
fix: OnRequest should wait all readable data consumed when sender clo…
Browse files Browse the repository at this point in the history
…se connection (#278)
  • Loading branch information
joway committed Aug 4, 2023
1 parent 68ebb21 commit 68b5b99
Show file tree
Hide file tree
Showing 15 changed files with 287 additions and 159 deletions.
22 changes: 11 additions & 11 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,23 @@ jobs:

steps:
- name: Checkout repository
uses: actions/checkout@v2
uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: "1.20"

- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
# - uses: actions/cache@v2
# with:
# path: ~/go/pkg/mod
# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
# restore-keys: |
# ${{ runner.os }}-go-

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
Expand All @@ -66,7 +66,7 @@ jobs:
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v1
uses: github/codeql-action/autobuild@v2

# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
Expand All @@ -80,4 +80,4 @@ jobs:
# make release

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1
uses: github/codeql-action/analyze@v2
26 changes: 13 additions & 13 deletions .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: Push and Pull Request Check

on: [ push, pull_request ]
on: [ push ]

jobs:
compatibility-test:
Expand All @@ -15,12 +15,12 @@ jobs:
uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go }}
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
# - uses: actions/cache@v2
# with:
# path: ~/go/pkg/mod
# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
# restore-keys: |
# ${{ runner.os }}-go-
- name: Unit Test
run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./...
- name: Benchmark
Expand All @@ -33,12 +33,12 @@ jobs:
uses: actions/setup-go@v3
with:
go-version: "1.20"
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
# - uses: actions/cache@v2
# with:
# path: ~/go/pkg/mod
# key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
# restore-keys: |
# ${{ runner.os }}-go-
- name: Build Test
run: go vet -v ./...
style-test:
Expand Down
66 changes: 37 additions & 29 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type connection struct {
operator *FDOperator
readTimeout time.Duration
readTimer *time.Timer
readTrigger chan struct{}
readTrigger chan error
waitReadSize int64
writeTimeout time.Duration
writeTimer *time.Timer
Expand Down Expand Up @@ -319,7 +319,7 @@ var barrierPool = sync.Pool{
// init initialize the connection with options
func (c *connection) init(conn Conn, opts *options) (err error) {
// init buffer, barrier, finalizer
c.readTrigger = make(chan struct{}, 1)
c.readTrigger = make(chan error, 1)
c.writeTrigger = make(chan error, 1)
c.bookSize, c.maxSize = pagesize, pagesize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
Expand Down Expand Up @@ -357,19 +357,12 @@ func (c *connection) initNetFD(conn Conn) {
}

func (c *connection) initFDOperator() {
var op *FDOperator
if c.pd != nil && c.pd.operator != nil {
// reuse operator created at connect step
op = c.pd.operator
} else {
poll := pollmanager.Pick()
op = poll.Alloc()
}
poll := pollmanager.Pick()
op := poll.Alloc()
op.FD = c.fd
op.OnRead, op.OnWrite, op.OnHup = nil, nil, c.onHup
op.Inputs, op.InputAck = c.inputs, c.inputAck
op.Outputs, op.OutputAck = c.outputs, c.outputAck

c.operator = op
}

Expand All @@ -385,9 +378,9 @@ func (c *connection) initFinalizer() {
})
}

func (c *connection) triggerRead() {
func (c *connection) triggerRead(err error) {
select {
case c.readTrigger <- struct{}{}:
case c.readTrigger <- err:
default:
}
}
Expand All @@ -411,10 +404,17 @@ func (c *connection) waitRead(n int) (err error) {
}
// wait full n
for c.inputBuffer.Len() < n {
if !c.IsActive() {
switch c.status(closing) {
case poller:
return Exception(ErrEOF, "wait read")
case user:
return Exception(ErrConnClosed, "wait read")
default:
err = <-c.readTrigger
if err != nil {
return err
}
}
<-c.readTrigger
}
return nil
}
Expand All @@ -429,24 +429,32 @@ func (c *connection) waitReadWithTimeout(n int) (err error) {
}

for c.inputBuffer.Len() < n {
if !c.IsActive() {
// cannot return directly, stop timer before !
switch c.status(closing) {
case poller:
// cannot return directly, stop timer first!
err = Exception(ErrEOF, "wait read")
goto RET
case user:
// cannot return directly, stop timer first!
err = Exception(ErrConnClosed, "wait read")
break
}

select {
case <-c.readTimer.C:
// double check if there is enough data to be read
if c.inputBuffer.Len() >= n {
return nil
goto RET
default:
select {
case <-c.readTimer.C:
// double check if there is enough data to be read
if c.inputBuffer.Len() >= n {
return nil
}
return Exception(ErrReadTimeout, c.remoteAddr.String())
case err = <-c.readTrigger:
if err != nil {
return err
}
continue
}
return Exception(ErrReadTimeout, c.remoteAddr.String())
case <-c.readTrigger:
continue
}
}

RET:
// clean timer.C
if !c.readTimer.Stop() {
<-c.readTimer.C
Expand Down
10 changes: 9 additions & 1 deletion connection_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"sync/atomic"
)

type who int32
type who = int32

const (
none who = iota
Expand Down Expand Up @@ -65,6 +65,14 @@ func (l *locker) isCloseBy(w who) (yes bool) {
return atomic.LoadInt32(&l.keychain[closing]) == int32(w)
}

func (l *locker) status(k key) int32 {
return atomic.LoadInt32(&l.keychain[k])
}

func (l *locker) force(k key, v int32) {
atomic.StoreInt32(&l.keychain[k], v)
}

func (l *locker) lock(k key) (success bool) {
return atomic.CompareAndSwapInt32(&l.keychain[k], 0, 1)
}
Expand Down
17 changes: 2 additions & 15 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (c *connection) onProcess(isProcessable func(c *connection) bool, process f
if isProcessable(c) {
process(c)
}
for c.IsActive() && isProcessable(c) {
for !c.isCloseBy(user) && isProcessable(c) {
process(c)
}
// Handling callback if connection has been closed.
Expand Down Expand Up @@ -225,12 +225,6 @@ func (c *connection) closeCallback(needLock bool) (err error) {
if needLock && !c.lock(processing) {
return nil
}
// If Close is called during OnPrepare, poll is not registered.
if c.isCloseBy(user) && c.operator.poll != nil {
if err = c.operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: closeCallback detach operator failed: %v", err)
}
}
var latest = c.closeCallbacks.Load()
if latest == nil {
return nil
Expand All @@ -243,14 +237,7 @@ func (c *connection) closeCallback(needLock bool) (err error) {

// register only use for connection register into poll.
func (c *connection) register() (err error) {
if c.operator.isUnused() {
// operator is not registered
err = c.operator.Control(PollReadable)
} else {
// operator is already registered
// change event to wait read new data
err = c.operator.Control(PollModReadable)
}
err = c.operator.Control(PollReadable)
if err != nil {
logger.Printf("NETPOLL: connection register failed: %v", err)
c.Close()
Expand Down
46 changes: 29 additions & 17 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,44 @@ import (

// onHup means close by poller.
func (c *connection) onHup(p Poll) error {
if c.closeBy(poller) {
c.triggerRead()
c.triggerWrite(ErrConnClosed)
// It depends on closing by user if OnConnect and OnRequest is nil, otherwise it needs to be released actively.
// It can be confirmed that the OnRequest goroutine has been exited before closecallback executing,
// and it is safe to close the buffer at this time.
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
if onConnect != nil || onRequest != nil {
c.closeCallback(true)
}
if !c.closeBy(poller) {
return nil
}
// already PollDetach when call OnHup
c.triggerRead(Exception(ErrEOF, "peer close"))
c.triggerWrite(Exception(ErrConnClosed, "peer close"))
// It depends on closing by user if OnConnect and OnRequest is nil, otherwise it needs to be released actively.
// It can be confirmed that the OnRequest goroutine has been exited before closecallback executing,
// and it is safe to close the buffer at this time.
var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
if onConnect != nil || onRequest != nil {
c.closeCallback(true)
}
return nil
}

// onClose means close by user.
func (c *connection) onClose() error {
if c.closeBy(user) {
c.triggerRead()
c.triggerWrite(ErrConnClosed)
// If Close is called during OnPrepare, poll is not registered.
if c.operator.poll != nil {
if err := c.operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: onClose detach operator failed: %v", err)
}
}
c.triggerRead(Exception(ErrConnClosed, "self close"))
c.triggerWrite(Exception(ErrConnClosed, "self close"))
c.closeCallback(true)
return nil
}
if c.isCloseBy(poller) {
// Connection with OnRequest of nil
// relies on the user to actively close the connection to recycle resources.

closedByPoller := c.isCloseBy(poller)
// force change closed by user
c.force(closing, user)

// If OnRequest is nil, relies on the user to actively close the connection to recycle resources.
if closedByPoller {
c.closeCallback(true)
}
return nil
Expand Down Expand Up @@ -103,7 +115,7 @@ func (c *connection) inputAck(n int) (err error) {
needTrigger = c.onRequest()
}
if needTrigger && length >= int(atomic.LoadInt64(&c.waitReadSize)) {
c.triggerRead()
c.triggerRead(nil)
}
return nil
}
Expand Down
Loading

0 comments on commit 68b5b99

Please sign in to comment.