Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: fix waitReadSize and enhance trigger #74

Merged
merged 1 commit into from
Nov 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,11 +328,10 @@ func (c *connection) triggerWrite(err error) {

// waitRead will wait full n bytes.
func (c *connection) waitRead(n int) (err error) {
leftover := n - c.inputBuffer.Len()
if leftover <= 0 {
if n <= c.inputBuffer.Len() {
return nil
}
atomic.StoreInt32(&c.waitReadSize, int32(leftover))
atomic.StoreInt32(&c.waitReadSize, int32(n))
defer atomic.StoreInt32(&c.waitReadSize, 0)
if c.readTimeout > 0 {
return c.waitReadWithTimeout(n)
Expand Down
8 changes: 4 additions & 4 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ func (c *connection) onPrepare(prepare OnPrepare) (err error) {
}

// onRequest is also responsible for executing the callbacks after the connection has been closed.
func (c *connection) onRequest() (err error) {
func (c *connection) onRequest() (needTrigger bool) {
var process = c.process.Load()
if process == nil {
return nil
return true
}
// Buffer has been fully processed, or task already exists
if !c.lock(processing) {
return nil
return true
}
// add new task
var task = func() {
Expand Down Expand Up @@ -126,7 +126,7 @@ func (c *connection) onRequest() (err error) {
}
}
runTask(c.ctx, task)
return nil
return false
}

// closeCallback .
Expand Down
14 changes: 9 additions & 5 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,15 @@ func (c *connection) inputAck(n int) (err error) {
if n < 0 {
n = 0
}
leftover := atomic.AddInt32(&c.waitReadSize, int32(-n))
err = c.inputBuffer.BookAck(n, leftover <= 0)
//FIXME: always trigger reader since waitReadSize may not correct when waitReading
c.triggerRead()
c.onRequest()
waitReadSize := int(atomic.LoadInt32(&c.waitReadSize))
length := c.inputBuffer.BookAck(n, waitReadSize)
var needTrigger = true
if length == n {
needTrigger = c.onRequest()
}
if needTrigger && length >= waitReadSize {
c.triggerRead()
}
return err
}

Expand Down
2 changes: 1 addition & 1 deletion connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestConnectionWaitReadHalfPacket(t *testing.T) {
for atomic.LoadInt32(&rconn.waitReadSize) <= 0 {
runtime.Gosched()
}
Equal(t, atomic.LoadInt32(&rconn.waitReadSize), int32(size/2))
Equal(t, atomic.LoadInt32(&rconn.waitReadSize), int32(size))
syscall.Write(w, msg[size/2:])
wg.Wait()
}
Expand Down
17 changes: 7 additions & 10 deletions nocopy_linkbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ func (b *LinkBuffer) Book(min int, p [][]byte) (vs [][]byte) {
}

// BookAck will ack the first n malloc bytes and discard the rest.
func (b *LinkBuffer) BookAck(n int, isEnd bool) (err error) {
func (b *LinkBuffer) BookAck(n int, needSize int) (length int) {
var l int
for ack := n; ack > 0; ack = ack - l {
l = b.flush.malloc - len(b.flush.buf)
Expand All @@ -561,18 +561,16 @@ func (b *LinkBuffer) BookAck(n int, isEnd bool) (err error) {
node.off, node.malloc, node.refer, node.buf = 0, 0, 1, node.buf[:0]
}

// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
if isEnd && cap(b.flush.buf) > pagesize {
length = b.recalLen(n)
if length >= needSize && cap(b.flush.buf) > pagesize {
// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
if b.flush.next == nil {
b.flush.next = newLinkBufferNode(0)
}
b.flush = b.flush.next
}
b.write = b.flush

// re-cal length
b.recalLen(n)
return nil
return
}

// Reset resets the buffer to be empty,
Expand All @@ -587,9 +585,8 @@ func (b *LinkBuffer) BookAck(n int, isEnd bool) (err error) {
// }

// recalLen re-calculate the length
func (b *LinkBuffer) recalLen(delta int) (err error) {
atomic.AddInt32(&b.length, int32(delta))
return nil
func (b *LinkBuffer) recalLen(delta int) (length int) {
return int(atomic.AddInt32(&b.length, int32(delta)))
}

// ------------------------------------------ implement link node ------------------------------------------
Expand Down
16 changes: 7 additions & 9 deletions nocopy_linkbuffer_race.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ func (b *LinkBuffer) Book(min int, p [][]byte) (vs [][]byte) {
}

// BookAck will ack the first n malloc bytes and discard the rest.
func (b *LinkBuffer) BookAck(n int, isEnd bool) (err error) {
func (b *LinkBuffer) BookAck(n int, needSize int) (length int) {
b.Lock()
defer b.Unlock()
var l int
Expand All @@ -592,18 +592,17 @@ func (b *LinkBuffer) BookAck(n int, isEnd bool) (err error) {
for node := b.flush.next; node != nil; node = node.next {
node.off, node.malloc, node.refer, node.buf = 0, 0, 1, node.buf[:0]
}
// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
if isEnd && cap(b.flush.buf) > pagesize {
length = b.recalLen(n)
if length >= needSize && cap(b.flush.buf) > pagesize {
// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
if b.flush.next == nil {
b.flush.next = newLinkBufferNode(0)
}
b.flush = b.flush.next
}
b.write = b.flush

// re-cal length
b.recalLen(n)
return nil
return
}

// Reset resets the buffer to be empty,
Expand All @@ -618,9 +617,8 @@ func (b *LinkBuffer) BookAck(n int, isEnd bool) (err error) {
// }

// recalLen re-calculate the length
func (b *LinkBuffer) recalLen(delta int) (err error) {
atomic.AddInt32(&b.length, int32(delta))
return nil
func (b *LinkBuffer) recalLen(delta int) (length int) {
return int(atomic.AddInt32(&b.length, int32(delta)))
}

// recalMallocLen re-calculate the malloc length
Expand Down