Skip to content

Commit

Permalink
feat: readslice api
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Dec 8, 2021
1 parent 48bd816 commit 83d492d
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 0 deletions.
17 changes: 17 additions & 0 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,23 @@ func (c *connection) Len() (length int) {
return c.inputBuffer.Len()
}

// ReadSlice implements Connection.
func (c *connection) ReadSlice(delim byte) (line []byte, err error) {
var n, l int
for {
if err = c.waitRead(n + 1); err != nil {
return
}
l = c.Reader().Len()
i := c.inputBuffer.indexByte(delim, n)
if i < 0 {
n = l
continue
}
return c.Next(i + 1)
}
}

// ReadString implements Connection.
func (c *connection) ReadString(n int) (s string, err error) {
if err = c.waitRead(n); err != nil {
Expand Down
32 changes: 32 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,35 @@ func TestSetTCPNoDelay(t *testing.T) {
n, _ = syscall.GetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY)
MustTrue(t, n == 0)
}

func TestConnectionReadSlice(t *testing.T) {
r, w := GetSysFdPairs()
var rconn, wconn = &connection{}, &connection{}
rconn.init(&netFD{fd: r}, nil)
wconn.init(&netFD{fd: w}, nil)

var msg = make([]byte, 1002)
msg[500] = '\n'
msg[1001] = '\n'
go func() {
for i := 0; i < 100000; i++ {
var pos int
for pos < len(msg) {
n, err := wconn.Write(msg[pos:])
MustNil(t, err)
pos += n
}
}
rconn.Close()
}()

for i := 0; i < 100000; i++ {
buf, err := rconn.Reader().ReadSlice('\n')
if err != nil && errors.Is(err, ErrConnClosed) || !rconn.IsActive() {
return
}
MustNil(t, err)
Equal(t, len(buf), 501)
rconn.Reader().Release()
}
}
8 changes: 8 additions & 0 deletions nocopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ type Reader interface {
//
ReadByte() (b byte, err error)

// ReadSlice reads until the first occurrence of delim in the input,
// returning a slice pointing at the bytes in the input buffer.
// The bytes stop being valid at the next read.
// If ReadSlice encounters an error before finding a delimiter,
// it returns all the data in the buffer and the error itself (often io.EOF).
// ReadSlice returns err != nil if and only if line does not end in delim.
ReadSlice(delim byte) (line []byte, err error)

// Slice returns a new Reader containing the next n bytes from this reader,
// the operation is zero-copy, similar to b = p [:n].
Slice(n int) (r Reader, err error)
Expand Down
36 changes: 36 additions & 0 deletions nocopy_linkbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package netpoll

import (
"bytes"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -245,6 +246,15 @@ func (b *LinkBuffer) ReadByte() (p byte, err error) {
}
}

// ReadSlice returns a slice ends with the delim in the buffer.
func (b *LinkBuffer) ReadSlice(delim byte) (line []byte, err error) {
n := b.indexByte(delim, 0)
if n < 0 {
return nil, fmt.Errorf("link buffer read slice cannot find: '%b'", delim)
}
return b.readBinary(n + 1), nil
}

// Slice returns a new LinkBuffer, which is a zero-copy slice of this LinkBuffer,
// and only holds the ability of Reader.
//
Expand Down Expand Up @@ -563,6 +573,32 @@ func (b *LinkBuffer) calcMaxSize() (sum int) {
return sum
}

// indexByte returns the index of the first instance of c in b, or -1 if c is not present in b.
func (b *LinkBuffer) indexByte(c byte, skip int) int {
var n, l, pos int
for node := b.read; node != nil; node = node.next {
l = node.Len()
pos = node.off
if l <= skip {
skip -= l
n += l
continue
} else if skip > 0 {
pos += skip
skip = 0
}

i := bytes.IndexByte(node.buf[pos:], c)
if i < 0 {
n += l
continue
}
n += pos - node.off + i
return n
}
return -1
}

// resetTail will reset tail node or add an empty tail node to
// guarantee the tail node is not larger than 8KB
func (b *LinkBuffer) resetTail(maxSize int) {
Expand Down
36 changes: 36 additions & 0 deletions nocopy_linkbuffer_race.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package netpoll

import (
"bytes"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -258,6 +259,15 @@ func (b *LinkBuffer) ReadByte() (p byte, err error) {
}
}

// ReadSlice returns a slice ends with the delim in the buffer.
func (b *LinkBuffer) ReadSlice(delim byte) (line []byte, err error) {
n := b.indexByte(delim, 0)
if n < 0 {
return nil, fmt.Errorf("link buffer read slice cannot find: '%b'", delim)
}
return b.readBinary(n + 1), nil
}

// Slice returns a new LinkBuffer, which is a zero-copy slice of this LinkBuffer,
// and only holds the ability of Reader.
//
Expand Down Expand Up @@ -603,6 +613,32 @@ func (b *LinkBuffer) calcMaxSize() (sum int) {
return sum
}

// indexByte returns the index of the first instance of c in b, or -1 if c is not present in b.
func (b *LinkBuffer) indexByte(c byte, skip int) int {
var n, l, pos int
for node := b.read; node != nil; node = node.next {
l = node.Len()
pos = node.off
if l <= skip {
skip -= l
n += l
continue
} else if skip > 0 {
pos += skip
skip = 0
}

i := bytes.IndexByte(node.buf[pos:], c)
if i < 0 {
n += l
continue
}
n += pos - node.off + i
return n
}
return -1
}

// resetTail will reset tail node or add an empty tail node to
// guarantee the tail node is not larger than 8KB
func (b *LinkBuffer) resetTail(maxSize int) {
Expand Down
19 changes: 19 additions & 0 deletions nocopy_linkbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,25 @@ func TestUnsafeStringToSlice(t *testing.T) {
Equal(t, string(bs), "hello world")
}

func TestLinkBufferIndexByte(t *testing.T) {
// clean & new
LinkBufferCap = 128

lb := NewLinkBuffer()
buf, err := lb.Malloc(1002)
buf[500] = '\n'
buf[1001] = '\n'
MustNil(t, err)
lb.Flush()

n := lb.indexByte('\n', 0)
Equal(t, n, 500)
n = lb.indexByte('\n', 500)
Equal(t, n, 500)
n = lb.indexByte('\n', 501)
Equal(t, n, 1001)
}

func BenchmarkStringToSliceByte(b *testing.B) {
b.StopTimer()
s := "hello world"
Expand Down
4 changes: 4 additions & 0 deletions nocopy_readwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func (r *zcReader) ReadByte() (b byte, err error) {
return r.buf.ReadByte()
}

func (r *zcReader) ReadSlice(delim byte) (line []byte, err error) {
return r.buf.ReadSlice(delim)
}

func (r *zcReader) waitRead(n int) (err error) {
for r.buf.Len() < n {
err = r.fill(n)
Expand Down

0 comments on commit 83d492d

Please sign in to comment.