Skip to content

Commit

Permalink
Merge branch 'develop' into feat/read-slice
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Jan 12, 2022
2 parents f4570c4 + 7e0d07b commit 6c00df3
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 9 deletions.
5 changes: 5 additions & 0 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func (c *connection) Release() (err error) {
// c.operator.do competes with c.inputs/c.inputAck
if c.inputBuffer.Len() == 0 && c.operator.do() {
maxSize := c.inputBuffer.calcMaxSize()
// Set the maximum value of maxsize equal to mallocMax to prevent GC pressure.
if maxSize > mallocMax {
maxSize = mallocMax
}

if maxSize > c.maxSize {
c.maxSize = maxSize
}
Expand Down
7 changes: 5 additions & 2 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,18 @@ func (c *connection) inputAck(n int) (err error) {
if n < 0 {
n = 0
}
const maxBookSize = 16 * pagesize
// Auto size bookSize.
if n == c.bookSize && c.bookSize < maxBookSize {
if n == c.bookSize && c.bookSize < mallocMax {
c.bookSize <<= 1
}

length, _ := c.inputBuffer.bookAck(n)
if c.maxSize < length {
c.maxSize = length
}
if c.maxSize > mallocMax {
c.maxSize = mallocMax
}

var needTrigger = true
if length == n {
Expand Down
48 changes: 41 additions & 7 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -68,28 +69,26 @@ func TestConnectionRead(t *testing.T) {
wconn.init(&netFD{fd: w}, nil)

var size = 256
var cycleTime = 100000
var msg = make([]byte, size)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
for i := 0; i < cycleTime; i++ {
buf, err := rconn.Reader().Next(size)
if err != nil && errors.Is(err, ErrConnClosed) || !rconn.IsActive() {
return
}
rconn.Reader().Release()
MustNil(t, err)
rconn.Reader().Release()
Equal(t, len(buf), size)
}
}()
for i := 0; i < 100000; i++ {
for i := 0; i < cycleTime; i++ {
n, err := wconn.Write(msg)
MustNil(t, err)
Equal(t, n, len(msg))
}
rconn.Close()
wg.Wait()
rconn.Close()
}

func TestConnectionReadAfterClosed(t *testing.T) {
Expand Down Expand Up @@ -312,3 +311,38 @@ func TestConnectionUntil(t *testing.T) {
Equal(t, len(buf), 100)
MustTrue(t, errors.Is(err, ErrEOF))
}

func TestBookSizeLargerThanMaxSize(t *testing.T) {
r, w := GetSysFdPairs()
var rconn, wconn = &connection{}, &connection{}
rconn.init(&netFD{fd: r}, nil)
wconn.init(&netFD{fd: w}, nil)

var length = 25
dataCollection := make([][]byte, length)
for i := 0; i < length; i++ {
dataCollection[i] = make([]byte, 2<<i)
for j := 0; j < 2<<i; j++ {
dataCollection[i][j] = byte(rand.Intn(256))
}
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < length; i++ {
buf, err := rconn.Reader().Next(2 << i)
MustNil(t, err)
rconn.Reader().Release()
Equal(t, string(buf), string(dataCollection[i]))
}
}()
for i := 0; i < length; i++ {
n, err := wconn.Write(dataCollection[i])
MustNil(t, err)
Equal(t, n, 2<<i)
}
wg.Wait()
rconn.Close()
}

0 comments on commit 6c00df3

Please sign in to comment.