From 18c2c1f498d2f763e7fc498554703d928332cccc Mon Sep 17 00:00:00 2001 From: Joway Date: Mon, 4 Mar 2024 15:39:19 +0800 Subject: [PATCH 01/11] fix: delay accept new connections if out of fds (#311) --- net_listener.go | 10 +++++- netpoll_server.go | 92 +++++++++++++++++++++++++++++++++++------------ netpoll_test.go | 74 ++++++++++++++++++++++++++++++++++++++ test_conns.sh | 13 +++++++ 4 files changed, 166 insertions(+), 23 deletions(-) create mode 100755 test_conns.sh diff --git a/net_listener.go b/net_listener.go index 4bebcac6..e7f9edc1 100644 --- a/net_listener.go +++ b/net_listener.go @@ -91,7 +91,15 @@ func (ln *listener) Accept() (net.Conn, error) { // tcp var fd, sa, err = syscall.Accept(ln.fd) if err != nil { - if err == syscall.EAGAIN { + /* https://man7.org/linux/man-pages/man2/accept.2.html + EAGAIN or EWOULDBLOCK + The socket is marked nonblocking and no connections are + present to be accepted. POSIX.1-2001 and POSIX.1-2008 + allow either error to be returned for this case, and do + not require these constants to have the same value, so a + portable application should check for both possibilities. + */ + if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK { return nil, nil } return nil, err diff --git a/netpoll_server.go b/netpoll_server.go index 2d6c5709..78f26b59 100644 --- a/netpoll_server.go +++ b/netpoll_server.go @@ -22,6 +22,7 @@ import ( "errors" "strings" "sync" + "syscall" "time" ) @@ -92,39 +93,86 @@ func (s *server) Close(ctx context.Context) error { func (s *server) OnRead(p Poll) error { // accept socket conn, err := s.ln.Accept() - if err != nil { - // shut down - if strings.Contains(err.Error(), "closed") { - s.operator.Control(PollDetach) - s.onQuit(err) + if err == nil { + if conn != nil { + s.onAccept(conn.(Conn)) + } + // EAGAIN | EWOULDBLOCK if conn and err both nil + return nil + } + logger.Printf("NETPOLL: accept conn failed: %v", err) + + // delay accept when too many open files + if isOutOfFdErr(err) { + // since we use Epoll LT, we have to detach listener fd from epoll first + // and re-register it when accept successfully or there is no available connection + cerr := s.operator.Control(PollDetach) + if cerr != nil { + logger.Printf("NETPOLL: detach listener fd failed: %v", cerr) return err } - logger.Println("NETPOLL: accept conn failed:", err.Error()) - return err + go func() { + retryTimes := []time.Duration{0, 10, 50, 100, 200, 500, 1000} // ms + retryTimeIndex := 0 + for { + if retryTimeIndex > 0 { + time.Sleep(retryTimes[retryTimeIndex] * time.Millisecond) + } + conn, err := s.ln.Accept() + if err == nil { + if conn == nil { + // recovery accept poll loop + s.operator.Control(PollReadable) + return + } + s.onAccept(conn.(Conn)) + logger.Println("NETPOLL: re-accept conn success:", conn.RemoteAddr()) + retryTimeIndex = 0 + continue + } + if retryTimeIndex+1 < len(retryTimes) { + retryTimeIndex++ + } + logger.Printf("NETPOLL: re-accept conn failed, err=[%s] and next retrytime=%dms", err.Error(), retryTimes[retryTimeIndex]) + } + }() } - if conn == nil { - return nil + + // shut down + if strings.Contains(err.Error(), "closed") { + s.operator.Control(PollDetach) + s.onQuit(err) + return err } + + return err +} + +// OnHup implements FDOperator. 
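+// OnHup is triggered when the poller reports that the listener fd has been hung up or
+// closed; the server treats this as a signal to quit with a "listener close" error.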
+func (s *server) OnHup(p Poll) error { + s.onQuit(errors.New("listener close")) + return nil +} + +func (s *server) onAccept(conn Conn) { // store & register connection - var connection = &connection{} - connection.init(conn.(Conn), s.opts) - if !connection.IsActive() { - return nil + var nconn = new(connection) + nconn.init(conn, s.opts) + if !nconn.IsActive() { + return } - var fd = conn.(Conn).Fd() - connection.AddCloseCallback(func(connection Connection) error { + var fd = conn.Fd() + nconn.AddCloseCallback(func(connection Connection) error { s.connections.Delete(fd) return nil }) - s.connections.Store(fd, connection) + s.connections.Store(fd, nconn) // trigger onConnect asynchronously - connection.onConnect() - return nil + nconn.onConnect() } -// OnHup implements FDOperator. -func (s *server) OnHup(p Poll) error { - s.onQuit(errors.New("listener close")) - return nil +func isOutOfFdErr(err error) bool { + se, ok := err.(syscall.Errno) + return ok && (se == syscall.EMFILE || se == syscall.ENFILE) } diff --git a/netpoll_test.go b/netpoll_test.go index fb985604..b01f0a95 100644 --- a/netpoll_test.go +++ b/netpoll_test.go @@ -21,9 +21,11 @@ import ( "context" "errors" "math/rand" + "os" "runtime" "sync" "sync/atomic" + "syscall" "testing" "time" ) @@ -507,6 +509,78 @@ func TestClientWriteAndClose(t *testing.T) { MustNil(t, err) } +func TestServerAcceptWhenTooManyOpenFiles(t *testing.T) { + if os.Getenv("N_LOCAL") == "" { + t.Skip("Only test for debug purpose") + return + } + + var originalRlimit syscall.Rlimit + err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &originalRlimit) + MustNil(t, err) + t.Logf("Original RLimit: %v", originalRlimit) + + rlimit := syscall.Rlimit{Cur: 32, Max: originalRlimit.Max} + err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rlimit) + MustNil(t, err) + err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlimit) + MustNil(t, err) + t.Logf("New RLimit: %v", rlimit) + defer func() { // reset + err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &originalRlimit) + MustNil(t, err) + }() + + var network, address = "tcp", ":18888" + var connected int32 + var loop = newTestEventLoop(network, address, + func(ctx context.Context, connection Connection) error { + buf, err := connection.Reader().Next(connection.Reader().Len()) + connection.Writer().WriteBinary(buf) + connection.Writer().Flush() + return err + }, + WithOnConnect(func(ctx context.Context, connection Connection) context.Context { + atomic.AddInt32(&connected, 1) + t.Logf("Conn[%s] accpeted", connection.RemoteAddr()) + return ctx + }), + WithOnDisconnect(func(ctx context.Context, connection Connection) { + t.Logf("Conn[%s] disconnected", connection.RemoteAddr()) + }), + ) + time.Sleep(time.Millisecond * 10) + + // out of fds + files := make([]*os.File, 0) + for { + f, err := os.Open("/dev/null") + if err != nil { + Assert(t, isOutOfFdErr(errors.Unwrap(err)), err) + break + } + files = append(files, f) + } + go func() { + time.Sleep(time.Second * 10) + t.Logf("close all files") + for _, f := range files { + f.Close() + } + }() + + // we should use telnet manually + var connections = 1 + for atomic.LoadInt32(&connected) < int32(connections) { + t.Logf("connected=%d", atomic.LoadInt32(&connected)) + time.Sleep(time.Second) + } + time.Sleep(time.Second * 10) + + err = loop.Shutdown(context.Background()) + MustNil(t, err) +} + func createTestListener(network, address string) (Listener, error) { for { ln, err := CreateListener(network, address) diff --git a/test_conns.sh b/test_conns.sh new file mode 100755 index 
00000000..33127f52 --- /dev/null +++ b/test_conns.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +ip="$1" +port="$2" +conns="$3" +timeout="$4" + +for i in $(seq 1 $conns); +do + nc -v -w $timeout $ip $port < /dev/null & +done + +wait From bb9c3f74620c19b139d13c20793f9ff016d302c0 Mon Sep 17 00:00:00 2001 From: QihengZhou Date: Mon, 11 Mar 2024 20:04:04 +0800 Subject: [PATCH 02/11] feat: allow GetBytes to get all bytes (#313) --- nocopy_linkbuffer.go | 9 +++++++++ nocopy_linkbuffer_race.go | 9 +++++++++ nocopy_linkbuffer_test.go | 25 +++++++++++++++++++++++++ 3 files changed, 43 insertions(+) diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go index 555ba5ce..c888c3c8 100644 --- a/nocopy_linkbuffer.go +++ b/nocopy_linkbuffer.go @@ -557,8 +557,17 @@ func (b *LinkBuffer) Bytes() []byte { } // GetBytes will read and fill the slice p as much as possible. +// If p is not passed, return all readable bytes. func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte) { node, flush := b.read, b.flush + if len(p) == 0 { + n := 0 + for ; node != flush; node = node.next { + n++ + } + node = b.read + p = make([][]byte, n) + } var i int for i = 0; node != flush && i < len(p); node = node.next { if node.Len() > 0 { diff --git a/nocopy_linkbuffer_race.go b/nocopy_linkbuffer_race.go index 4b3635d0..4166222e 100644 --- a/nocopy_linkbuffer_race.go +++ b/nocopy_linkbuffer_race.go @@ -599,10 +599,19 @@ func (b *LinkBuffer) Bytes() []byte { } // GetBytes will read and fill the slice p as much as possible. +// If p is not passed, return all readable bytes. func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte) { b.Lock() defer b.Unlock() node, flush := b.read, b.flush + if len(p) == 0 { + n := 0 + for ; node != flush; node = node.next { + n++ + } + node = b.read + p = make([][]byte, n) + } var i int for i = 0; node != flush && i < len(p); node = node.next { if node.Len() > 0 { diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go index c3f9b9d8..a376b7bf 100644 --- a/nocopy_linkbuffer_test.go +++ b/nocopy_linkbuffer_test.go @@ -84,6 +84,31 @@ func TestLinkBuffer(t *testing.T) { Equal(t, buf.Len(), 100) } +func TestGetBytes(t *testing.T) { + buf := NewLinkBuffer() + var ( + num = 10 + b = 1 + expectedLen = 0 + ) + for i := 0; i < num; i++ { + expectedLen += b + n, err := buf.WriteBinary(make([]byte, b)) + MustNil(t, err) + Equal(t, n, b) + b *= 10 + } + buf.Flush() + Equal(t, int(buf.length), expectedLen) + bs := buf.GetBytes(nil) + actualLen := 0 + for i := 0; i < len(bs); i++ { + actualLen += len(bs[i]) + } + Equal(t, actualLen, expectedLen) + +} + // TestLinkBufferWithZero test more case with n is invalid. func TestLinkBufferWithInvalid(t *testing.T) { // clean & new From 05a1094e4b54be84b006e42ab385baf7ec6b9b33 Mon Sep 17 00:00:00 2001 From: Joway Date: Tue, 23 Apr 2024 11:04:37 +0800 Subject: [PATCH 03/11] perf: nocopy read for ReadString and ReadBinary API if possible (#315) --- nocopy.go | 62 ++- nocopy_linkbuffer.go | 266 ++++++------ nocopy_linkbuffer_norace.go | 20 + nocopy_linkbuffer_race.go | 821 ++++-------------------------------- nocopy_linkbuffer_test.go | 142 +++++++ 5 files changed, 424 insertions(+), 887 deletions(-) create mode 100644 nocopy_linkbuffer_norace.go diff --git a/nocopy.go b/nocopy.go index 80df5f9b..6df02e8f 100644 --- a/nocopy.go +++ b/nocopy.go @@ -16,6 +16,10 @@ package netpoll import ( "io" + "reflect" + "unsafe" + + "github.com/bytedance/gopkg/lang/mcache" ) // Reader is a collection of operations for nocopy reads. 
@@ -108,9 +112,9 @@ type Reader interface { // The usage of the design is a two-step operation, first apply for a section of memory, // fill it and then submit. E.g: // -// var buf, _ = Malloc(n) -// buf = append(buf[:0], ...) -// Flush() +// var buf, _ = Malloc(n) +// buf = append(buf[:0], ...) +// Flush() // // Note that it is not recommended to submit self-managed buffers to Writer. // Since the writer is processed asynchronously, if the self-managed buffer is used and recycled after submission, @@ -244,10 +248,52 @@ func NewIOReadWriter(rw ReadWriter) io.ReadWriter { } const ( - block1k = 1 * 1024 - block2k = 2 * 1024 - block4k = 4 * 1024 - block8k = 8 * 1024 + block1k = 1 * 1024 + block2k = 2 * 1024 + block4k = 4 * 1024 + block8k = 8 * 1024 + block32k = 32 * 1024 + + pagesize = block8k + mallocMax = block8k * block1k // mallocMax is 8MB + + minReuseBytes = 64 // only reuse bytes if n >= minReuseBytes + + defaultLinkBufferMode = 0 + // readonly mode indicate that the buffer node memory is not controlled by itself, + // so we cannot reuse the buffer or nocopy read it, default value is false. + readonlyMask uint8 = 1 << 0 // 0000 0001 + // nocopyRead mode indicate that the buffer node has been no copy read and cannot reuse the buffer, default value is false. + nocopyReadMask uint8 = 1 << 1 // 0000 0010 ) -const pagesize = block8k +// zero-copy slice convert to string +func unsafeSliceToString(b []byte) string { + return *(*string)(unsafe.Pointer(&b)) +} + +// zero-copy slice convert to string +func unsafeStringToSlice(s string) (b []byte) { + p := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data) + hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + hdr.Data = uintptr(p) + hdr.Cap = len(s) + hdr.Len = len(s) + return b +} + +// malloc limits the cap of the buffer from mcache. +func malloc(size, capacity int) []byte { + if capacity > mallocMax { + return make([]byte, size, capacity) + } + return mcache.Malloc(size, capacity) +} + +// free limits the cap of the buffer from mcache. +func free(buf []byte) { + if cap(buf) > mallocMax { + return + } + mcache.Free(buf) +} diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go index c888c3c8..cb48cad1 100644 --- a/nocopy_linkbuffer.go +++ b/nocopy_linkbuffer.go @@ -1,4 +1,4 @@ -// Copyright 2022 CloudWeGo Authors +// Copyright 2024 CloudWeGo Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,21 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build !race -// +build !race - package netpoll import ( "bytes" "errors" "fmt" - "reflect" "sync" "sync/atomic" - "unsafe" - - "github.com/bytedance/gopkg/lang/mcache" ) // BinaryInplaceThreshold marks the minimum value of the nocopy slice length, @@ -36,6 +29,9 @@ const BinaryInplaceThreshold = block4k // LinkBufferCap that can be modified marks the minimum value of each node of LinkBuffer. var LinkBufferCap = block4k +var _ Reader = &LinkBuffer{} +var _ Writer = &LinkBuffer{} + // NewLinkBuffer size defines the initial capacity, but there is no readable data. func NewLinkBuffer(size ...int) *LinkBuffer { var buf = &LinkBuffer{} @@ -48,8 +44,8 @@ func NewLinkBuffer(size ...int) *LinkBuffer { return buf } -// LinkBuffer implements ReadWriter. -type LinkBuffer struct { +// UnsafeLinkBuffer implements ReadWriter. 
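+// UnsafeLinkBuffer is not concurrency safe by itself: LinkBuffer aliases it when the race
+// detector is disabled (see nocopy_linkbuffer_norace.go), while -race builds alias LinkBuffer
+// to SafeLinkBuffer, which wraps every UnsafeLinkBuffer method with a mutex.
+//
+// A minimal write/read round trip (illustrative sketch, not part of this patch; payload is
+// assumed to be any 128-byte slice):
+//
+//	buf := NewLinkBuffer()
+//	p, _ := buf.Malloc(128)
+//	copy(p, payload)
+//	buf.Flush()
+//	data, _ := buf.ReadBinary(128) // n >= minReuseBytes, so this read may reference the node buffer without copying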
+type UnsafeLinkBuffer struct { length int64 mallocSize int @@ -61,24 +57,21 @@ type LinkBuffer struct { caches [][]byte // buf allocated by Next when cross-package, which should be freed when release } -var _ Reader = &LinkBuffer{} -var _ Writer = &LinkBuffer{} - // Len implements Reader. -func (b *LinkBuffer) Len() int { +func (b *UnsafeLinkBuffer) Len() int { l := atomic.LoadInt64(&b.length) return int(l) } // IsEmpty check if this LinkBuffer is empty. -func (b *LinkBuffer) IsEmpty() (ok bool) { +func (b *UnsafeLinkBuffer) IsEmpty() (ok bool) { return b.Len() == 0 } // ------------------------------------------ implement zero-copy reader ------------------------------------------ // Next implements Reader. -func (b *LinkBuffer) Next(n int) (p []byte, err error) { +func (b *UnsafeLinkBuffer) Next(n int) (p []byte, err error) { if n <= 0 { return } @@ -117,7 +110,7 @@ func (b *LinkBuffer) Next(n int) (p []byte, err error) { // Peek does not have an independent lifecycle, and there is no signal to // indicate that Peek content can be released, so Peek will not introduce mcache for now. -func (b *LinkBuffer) Peek(n int) (p []byte, err error) { +func (b *UnsafeLinkBuffer) Peek(n int) (p []byte, err error) { if n <= 0 { return } @@ -154,7 +147,7 @@ func (b *LinkBuffer) Peek(n int) (p []byte, err error) { } // Skip implements Reader. -func (b *LinkBuffer) Skip(n int) (err error) { +func (b *UnsafeLinkBuffer) Skip(n int) (err error) { if n <= 0 { return } @@ -178,7 +171,7 @@ func (b *LinkBuffer) Skip(n int) (err error) { // Release the node that has been read. // b.flush == nil indicates that this LinkBuffer is created by LinkBuffer.Slice -func (b *LinkBuffer) Release() (err error) { +func (b *UnsafeLinkBuffer) Release() (err error) { for b.read != b.flush && b.read.Len() == 0 { b.read = b.read.next } @@ -196,7 +189,7 @@ func (b *LinkBuffer) Release() (err error) { } // ReadString implements Reader. -func (b *LinkBuffer) ReadString(n int) (s string, err error) { +func (b *UnsafeLinkBuffer) ReadString(n int) (s string, err error) { if n <= 0 { return } @@ -208,7 +201,7 @@ func (b *LinkBuffer) ReadString(n int) (s string, err error) { } // ReadBinary implements Reader. -func (b *LinkBuffer) ReadBinary(n int) (p []byte, err error) { +func (b *UnsafeLinkBuffer) ReadBinary(n int) (p []byte, err error) { if n <= 0 { return } @@ -220,15 +213,30 @@ func (b *LinkBuffer) ReadBinary(n int) (p []byte, err error) { } // readBinary cannot use mcache, because the memory allocated by readBinary will not be recycled. -func (b *LinkBuffer) readBinary(n int) (p []byte) { +func (b *UnsafeLinkBuffer) readBinary(n int) (p []byte) { b.recalLen(-n) // re-cal length // single node - p = make([]byte, n) if b.isSingleNode(n) { + // we cannot nocopy read a readonly mode buffer, since readonly buffer's memory is not control by itself + if !b.read.getMode(readonlyMask) { + // if readBinary use no-copy mode, it will cause more memory used but get higher memory access efficiently + // for example, if user's codec need to decode 10 strings and each have 100 bytes, here could help the codec + // no need to malloc 10 times and the string slice could have the compact memory allocation. 
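+			// concretely: a node already marked nocopyRead is always handed out without copying; otherwise
+			// the nocopy path is taken only when n >= minReuseBytes and the node buffer is at most block32k,
+			// so small reads and oversized buffers still fall back to the copying path below.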
+ if b.read.getMode(nocopyReadMask) { + return b.read.Next(n) + } + if n >= minReuseBytes && cap(b.read.buf) <= block32k { + b.read.setMode(nocopyReadMask, true) + return b.read.Next(n) + } + } + // if the underlying buffer too large, we shouldn't use no-copy mode + p = make([]byte, n) copy(p, b.read.Next(n)) return p } + p = make([]byte, n) // multiple nodes var pIdx int var l int @@ -247,7 +255,7 @@ func (b *LinkBuffer) readBinary(n int) (p []byte) { } // ReadByte implements Reader. -func (b *LinkBuffer) ReadByte() (p byte, err error) { +func (b *UnsafeLinkBuffer) ReadByte() (p byte, err error) { // check whether enough or not. if b.Len() < 1 { return p, errors.New("link buffer read byte is empty") @@ -262,7 +270,7 @@ func (b *LinkBuffer) ReadByte() (p byte, err error) { } // Until returns a slice ends with the delim in the buffer. -func (b *LinkBuffer) Until(delim byte) (line []byte, err error) { +func (b *UnsafeLinkBuffer) Until(delim byte) (line []byte, err error) { n := b.indexByte(delim, 0) if n < 0 { return nil, fmt.Errorf("link buffer read slice cannot find: '%b'", delim) @@ -274,7 +282,7 @@ func (b *LinkBuffer) Until(delim byte) (line []byte, err error) { // and only holds the ability of Reader. // // Slice will automatically execute a Release. -func (b *LinkBuffer) Slice(n int) (r Reader, err error) { +func (b *UnsafeLinkBuffer) Slice(n int) (r Reader, err error) { if n <= 0 { return NewLinkBuffer(0), nil } @@ -285,9 +293,9 @@ func (b *LinkBuffer) Slice(n int) (r Reader, err error) { b.recalLen(-n) // re-cal length // just use for range - p := &LinkBuffer{ - length: int64(n), - } + p := new(LinkBuffer) + p.length = int64(n) + defer func() { // set to read-only p.flush = p.flush.next @@ -324,7 +332,7 @@ func (b *LinkBuffer) Slice(n int) (r Reader, err error) { // ------------------------------------------ implement zero-copy writer ------------------------------------------ // Malloc pre-allocates memory, which is not readable, and becomes readable data after submission(e.g. Flush). -func (b *LinkBuffer) Malloc(n int) (buf []byte, err error) { +func (b *UnsafeLinkBuffer) Malloc(n int) (buf []byte, err error) { if n <= 0 { return } @@ -334,12 +342,12 @@ func (b *LinkBuffer) Malloc(n int) (buf []byte, err error) { } // MallocLen implements Writer. -func (b *LinkBuffer) MallocLen() (length int) { +func (b *UnsafeLinkBuffer) MallocLen() (length int) { return b.mallocSize } // MallocAck will keep the first n malloc bytes and discard the rest. -func (b *LinkBuffer) MallocAck(n int) (err error) { +func (b *UnsafeLinkBuffer) MallocAck(n int) (err error) { if n < 0 { return fmt.Errorf("link buffer malloc ack[%d] invalid", n) } @@ -363,7 +371,7 @@ func (b *LinkBuffer) MallocAck(n int) (err error) { } // Flush will submit all malloc data and must confirm that the allocated bytes have been correctly assigned. -func (b *LinkBuffer) Flush() (err error) { +func (b *UnsafeLinkBuffer) Flush() (err error) { b.mallocSize = 0 // FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory. if cap(b.write.buf) > pagesize { @@ -385,7 +393,7 @@ func (b *LinkBuffer) Flush() (err error) { } // Append implements Writer. -func (b *LinkBuffer) Append(w Writer) (err error) { +func (b *UnsafeLinkBuffer) Append(w Writer) (err error) { var buf, ok = w.(*LinkBuffer) if !ok { return errors.New("unsupported writer which is not LinkBuffer") @@ -396,7 +404,7 @@ func (b *LinkBuffer) Append(w Writer) (err error) { // WriteBuffer will not submit(e.g. Flush) data to ensure normal use of MallocLen. 
// you must actively submit before read the data. // The argument buf can't be used after calling WriteBuffer. (set it to nil) -func (b *LinkBuffer) WriteBuffer(buf *LinkBuffer) (err error) { +func (b *UnsafeLinkBuffer) WriteBuffer(buf *LinkBuffer) (err error) { if buf == nil { return } @@ -435,7 +443,7 @@ func (b *LinkBuffer) WriteBuffer(buf *LinkBuffer) (err error) { } // WriteString implements Writer. -func (b *LinkBuffer) WriteString(s string) (n int, err error) { +func (b *UnsafeLinkBuffer) WriteString(s string) (n int, err error) { if len(s) == 0 { return } @@ -444,7 +452,7 @@ func (b *LinkBuffer) WriteString(s string) (n int, err error) { } // WriteBinary implements Writer. -func (b *LinkBuffer) WriteBinary(p []byte) (n int, err error) { +func (b *UnsafeLinkBuffer) WriteBinary(p []byte) (n int, err error) { n = len(p) if n == 0 { return @@ -466,8 +474,8 @@ func (b *LinkBuffer) WriteBinary(p []byte) (n int, err error) { } // WriteDirect cannot be mixed with WriteString or WriteBinary functions. -func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error { - n := len(p) +func (b *UnsafeLinkBuffer) WriteDirect(extra []byte, remainLen int) error { + n := len(extra) if n == 0 || remainLen < 0 { return nil } @@ -479,20 +487,26 @@ func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error { origin = origin.next } // Add the buf length of the original node + // `malloc` is the origin buffer offset that already malloced, the extra buffer should be inserted after that offset. malloc += len(origin.buf) // Create dataNode and newNode and insert them into the chain - dataNode := newLinkBufferNode(0) - dataNode.buf, dataNode.malloc = p[:0], n + // dataNode wrap the user buffer extra, and newNode wrap the origin left netpoll buffer + // - originNode{buf=origin, off=0, malloc=malloc, readonly=true} : non-reusable + // - dataNode{buf=extra, off=0, malloc=len(extra), readonly=true} : non-reusable + // - newNode{buf=origin, off=malloc, malloc=origin.malloc, readonly=false} : reusable + dataNode := newLinkBufferNode(0) // zero node will be set by readonly mode + dataNode.buf, dataNode.malloc = extra[:0], n if remainLen > 0 { + // split a single buffer node to originNode and newNode newNode := newLinkBufferNode(0) newNode.off = malloc newNode.buf = origin.buf[:malloc] newNode.malloc = origin.malloc - newNode.readonly = false + newNode.setMode(readonlyMask, false) origin.malloc = malloc - origin.readonly = true + origin.setMode(readonlyMask, true) // link nodes dataNode.next = newNode @@ -514,7 +528,7 @@ func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error { } // WriteByte implements Writer. -func (b *LinkBuffer) WriteByte(p byte) (err error) { +func (b *UnsafeLinkBuffer) WriteByte(p byte) (err error) { dst, err := b.Malloc(1) if len(dst) == 1 { dst[0] = p @@ -523,7 +537,7 @@ func (b *LinkBuffer) WriteByte(p byte) (err error) { } // Close will recycle all buffer. -func (b *LinkBuffer) Close() (err error) { +func (b *UnsafeLinkBuffer) Close() (err error) { atomic.StoreInt64(&b.length, 0) b.mallocSize = 0 // just release all @@ -540,7 +554,7 @@ func (b *LinkBuffer) Close() (err error) { // ------------------------------------------ implement connection interface ------------------------------------------ // Bytes returns all the readable bytes of this LinkBuffer. 
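// When all readable data sits in a single node, Bytes returns that node's memory directly
// without copying; when it spans multiple nodes, the data is copied into a new slice.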
-func (b *LinkBuffer) Bytes() []byte { +func (b *UnsafeLinkBuffer) Bytes() []byte { node, flush := b.read, b.flush if node == flush { return node.buf[node.off:] @@ -558,7 +572,7 @@ func (b *LinkBuffer) Bytes() []byte { // GetBytes will read and fill the slice p as much as possible. // If p is not passed, return all readable bytes. -func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte) { +func (b *UnsafeLinkBuffer) GetBytes(p [][]byte) (vs [][]byte) { node, flush := b.read, b.flush if len(p) == 0 { n := 0 @@ -588,7 +602,7 @@ func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte) { // maxSize: The maximum size of data between two Release(). In some cases, this can // // guarantee all data allocated in one node to reduce copy. -func (b *LinkBuffer) book(bookSize, maxSize int) (p []byte) { +func (b *UnsafeLinkBuffer) book(bookSize, maxSize int) (p []byte) { l := cap(b.write.buf) - b.write.malloc // grow linkBuffer if l == 0 { @@ -605,7 +619,7 @@ func (b *LinkBuffer) book(bookSize, maxSize int) (p []byte) { // bookAck will ack the first n malloc bytes and discard the rest. // // length: The size of data in inputBuffer. It is used to calculate the maxSize -func (b *LinkBuffer) bookAck(n int) (length int, err error) { +func (b *UnsafeLinkBuffer) bookAck(n int) (length int, err error) { b.write.malloc = n + len(b.write.buf) b.write.buf = b.write.buf[:b.write.malloc] b.flush = b.write @@ -616,7 +630,7 @@ func (b *LinkBuffer) bookAck(n int) (length int, err error) { } // calcMaxSize will calculate the data size between two Release() -func (b *LinkBuffer) calcMaxSize() (sum int) { +func (b *UnsafeLinkBuffer) calcMaxSize() (sum int) { for node := b.head; node != b.read; node = node.next { sum += len(node.buf) } @@ -624,8 +638,24 @@ func (b *LinkBuffer) calcMaxSize() (sum int) { return sum } +// resetTail will reset tail node or add an empty tail node to +// guarantee the tail node is not larger than 8KB +func (b *UnsafeLinkBuffer) resetTail(maxSize int) { + // FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory. + if maxSize <= pagesize { + b.write.Reset() + return + } + + // set nil tail + b.write.next = newLinkBufferNode(0) + b.write = b.write.next + b.flush = b.write + return +} + // indexByte returns the index of the first instance of c in buffer, or -1 if c is not present in buffer. -func (b *LinkBuffer) indexByte(c byte, skip int) int { +func (b *UnsafeLinkBuffer) indexByte(c byte, skip int) int { size := b.Len() if skip >= size { return -1 @@ -656,25 +686,41 @@ func (b *LinkBuffer) indexByte(c byte, skip int) int { return -1 } -// resetTail will reset tail node or add an empty tail node to -// guarantee the tail node is not larger than 8KB -func (b *LinkBuffer) resetTail(maxSize int) { - // FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory. - if maxSize <= pagesize { - b.write.Reset() +// ------------------------------------------ private function ------------------------------------------ + +// recalLen re-calculate the length +func (b *UnsafeLinkBuffer) recalLen(delta int) (length int) { + return int(atomic.AddInt64(&b.length, int64(delta))) +} + +// growth directly create the next node, when b.write is not enough. 
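+// Nodes in readonly mode are skipped because their memory is not owned by this buffer and
+// must not be written into.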
+func (b *UnsafeLinkBuffer) growth(n int) { + if n <= 0 { return } - - // set nil tail - b.write.next = newLinkBufferNode(0) - b.write = b.write.next - b.flush = b.write - return + // the memory of readonly node if not malloc by us so should skip them + for b.write.getMode(readonlyMask) || cap(b.write.buf)-b.write.malloc < n { + if b.write.next == nil { + b.write.next = newLinkBufferNode(n) + b.write = b.write.next + return + } + b.write = b.write.next + } } -// recalLen re-calculate the length -func (b *LinkBuffer) recalLen(delta int) (length int) { - return int(atomic.AddInt64(&b.length, int64(delta))) +// isSingleNode determines whether reading needs to cross nodes. +// Must require b.Len() > 0 +func (b *UnsafeLinkBuffer) isSingleNode(readN int) (single bool) { + if readN <= 0 { + return true + } + l := b.read.Len() + for l == 0 && b.read != b.flush { + b.read = b.read.next + l = b.read.Len() + } + return l >= readN } // ------------------------------------------ implement link node ------------------------------------------ @@ -684,9 +730,9 @@ func (b *LinkBuffer) recalLen(delta int) (length int) { func newLinkBufferNode(size int) *linkBufferNode { var node = linkedPool.Get().(*linkBufferNode) // reset node offset - node.off, node.malloc, node.refer, node.readonly = 0, 0, 1, false + node.off, node.malloc, node.refer, node.mode = 0, 0, 1, defaultLinkBufferMode if size <= 0 { - node.readonly = true + node.setMode(readonlyMask, true) return node } if size < LinkBufferCap { @@ -705,13 +751,13 @@ var linkedPool = sync.Pool{ } type linkBufferNode struct { - buf []byte // buffer - off int // read-offset - malloc int // write-offset - refer int32 // reference count - readonly bool // read-only node, introduced by Refer, WriteString, WriteBinary, etc., default false - origin *linkBufferNode // the root node of the extends - next *linkBufferNode // the next node of the linked buffer + buf []byte // buffer + off int // read-offset + malloc int // write-offset + refer int32 // reference count + mode uint8 // mode store all bool bit status + origin *linkBufferNode // the root node of the extends + next *linkBufferNode // the next node of the linked buffer } func (node *linkBufferNode) Len() (l int) { @@ -772,7 +818,7 @@ func (node *linkBufferNode) Release() (err error) { // release self if atomic.AddInt32(&node.refer, -1) == 0 { // readonly nodes cannot recycle node.buf, other node.buf are recycled to mcache. - if !node.readonly { + if node.reusable() { free(node.buf) } node.buf, node.origin, node.next = nil, nil, nil @@ -781,68 +827,18 @@ func (node *linkBufferNode) Release() (err error) { return nil } -// ------------------------------------------ private function ------------------------------------------ - -// growth directly create the next node, when b.write is not enough. -func (b *LinkBuffer) growth(n int) { - if n <= 0 { - return - } - // Must skip read-only node. - for b.write.readonly || cap(b.write.buf)-b.write.malloc < n { - if b.write.next == nil { - b.write.next = newLinkBufferNode(n) - b.write = b.write.next - return - } - b.write = b.write.next - } +func (node *linkBufferNode) getMode(mask uint8) bool { + return node.mode&mask > 0 } -// isSingleNode determines whether reading needs to cross nodes. 
-// Must require b.Len() > 0 -func (b *LinkBuffer) isSingleNode(readN int) (single bool) { - if readN <= 0 { - return true - } - l := b.read.Len() - for l == 0 && b.read != b.flush { - b.read = b.read.next - l = b.read.Len() - } - return l >= readN -} - -// zero-copy slice convert to string -func unsafeSliceToString(b []byte) string { - return *(*string)(unsafe.Pointer(&b)) -} - -// zero-copy slice convert to string -func unsafeStringToSlice(s string) (b []byte) { - p := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data) - hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - hdr.Data = uintptr(p) - hdr.Cap = len(s) - hdr.Len = len(s) - return b -} - -// mallocMax is 8MB -const mallocMax = block8k * block1k - -// malloc limits the cap of the buffer from mcache. -func malloc(size, capacity int) []byte { - if capacity > mallocMax { - return make([]byte, size, capacity) +func (node *linkBufferNode) setMode(mask uint8, enable bool) { + if enable { + node.mode = node.mode | mask + } else { + node.mode = node.mode & ^mask } - return mcache.Malloc(size, capacity) } -// free limits the cap of the buffer from mcache. -func free(buf []byte) { - if cap(buf) > mallocMax { - return - } - mcache.Free(buf) +func (node *linkBufferNode) reusable() bool { + return node.mode&(nocopyReadMask|readonlyMask) == 0 } diff --git a/nocopy_linkbuffer_norace.go b/nocopy_linkbuffer_norace.go new file mode 100644 index 00000000..8a07382f --- /dev/null +++ b/nocopy_linkbuffer_norace.go @@ -0,0 +1,20 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !race +// +build !race + +package netpoll + +type LinkBuffer = UnsafeLinkBuffer diff --git a/nocopy_linkbuffer_race.go b/nocopy_linkbuffer_race.go index 4166222e..642b01c6 100644 --- a/nocopy_linkbuffer_race.go +++ b/nocopy_linkbuffer_race.go @@ -18,612 +18,175 @@ package netpoll import ( - "bytes" - "errors" - "fmt" - "reflect" "sync" - "sync/atomic" - "unsafe" - - "github.com/bytedance/gopkg/lang/mcache" ) -// BinaryInplaceThreshold marks the minimum value of the nocopy slice length, -// which is the threshold to use copy to minimize overhead. -const BinaryInplaceThreshold = block4k - -// LinkBufferCap that can be modified marks the minimum value of each node of LinkBuffer. -var LinkBufferCap = block4k - -// NewLinkBuffer size defines the initial capacity, but there is no readable data. -func NewLinkBuffer(size ...int) *LinkBuffer { - var buf = &LinkBuffer{} - var l int - if len(size) > 0 { - l = size[0] - } - var node = newLinkBufferNode(l) - buf.head, buf.read, buf.flush, buf.write = node, node, node, node - return buf -} +type LinkBuffer = SafeLinkBuffer -// LinkBuffer implements ReadWriter. 
-type LinkBuffer struct { +// SafeLinkBuffer only used to in go tests with -race +type SafeLinkBuffer struct { sync.Mutex - length int32 - mallocSize int - - head *linkBufferNode // release head - read *linkBufferNode // read head - flush *linkBufferNode // malloc head - write *linkBufferNode // malloc tail - - caches [][]byte // buf allocated by Next when cross-package, which should be freed when release -} - -var _ Reader = &LinkBuffer{} -var _ Writer = &LinkBuffer{} - -// Len implements Reader. -func (b *LinkBuffer) Len() int { - l := atomic.LoadInt32(&b.length) - return int(l) -} - -// IsEmpty check if this LinkBuffer is empty. -func (b *LinkBuffer) IsEmpty() (ok bool) { - return b.Len() == 0 + UnsafeLinkBuffer } // ------------------------------------------ implement zero-copy reader ------------------------------------------ // Next implements Reader. -func (b *LinkBuffer) Next(n int) (p []byte, err error) { +func (b *SafeLinkBuffer) Next(n int) (p []byte, err error) { b.Lock() defer b.Unlock() - if n <= 0 { - return - } - // check whether enough or not. - if b.Len() < n { - return p, fmt.Errorf("link buffer next[%d] not enough", n) - } - b.recalLen(-n) // re-cal length - - // single node - if b.isSingleNode(n) { - return b.read.Next(n), nil - } - // multiple nodes - var pIdx int - if block1k < n && n <= mallocMax { - p = malloc(n, n) - b.caches = append(b.caches, p) - } else { - p = make([]byte, n) - } - var l int - for ack := n; ack > 0; ack = ack - l { - l = b.read.Len() - if l >= ack { - pIdx += copy(p[pIdx:], b.read.Next(ack)) - break - } else if l > 0 { - pIdx += copy(p[pIdx:], b.read.Next(l)) - } - b.read = b.read.next - } - _ = pIdx - return p, nil + return b.UnsafeLinkBuffer.Next(n) } -// Peek does not have an independent lifecycle, and there is no signal to -// indicate that Peek content can be released, so Peek will not introduce mcache for now. -func (b *LinkBuffer) Peek(n int) (p []byte, err error) { +// Peek implements Reader. +func (b *SafeLinkBuffer) Peek(n int) (p []byte, err error) { b.Lock() defer b.Unlock() - if n <= 0 { - return - } - // check whether enough or not. - if b.Len() < n { - return p, fmt.Errorf("link buffer peek[%d] not enough", n) - } - // single node - if b.isSingleNode(n) { - return b.read.Peek(n), nil - } - // multiple nodes - var pIdx int - if block1k < n && n <= mallocMax { - p = malloc(n, n) - b.caches = append(b.caches, p) - } else { - p = make([]byte, n) - } - var node = b.read - var l int - for ack := n; ack > 0; ack = ack - l { - l = node.Len() - if l >= ack { - pIdx += copy(p[pIdx:], node.Peek(ack)) - break - } else if l > 0 { - pIdx += copy(p[pIdx:], node.Peek(l)) - } - node = node.next - } - _ = pIdx - return p, nil + return b.UnsafeLinkBuffer.Peek(n) } // Skip implements Reader. -func (b *LinkBuffer) Skip(n int) (err error) { +func (b *SafeLinkBuffer) Skip(n int) (err error) { b.Lock() defer b.Unlock() - if n <= 0 { - return - } - // check whether enough or not. - if b.Len() < n { - return fmt.Errorf("link buffer skip[%d] not enough", n) - } - b.recalLen(-n) // re-cal length - - var l int - for ack := n; ack > 0; ack = ack - l { - l = b.read.Len() - if l >= ack { - b.read.off += ack - break - } - b.read = b.read.next - } - return nil + return b.UnsafeLinkBuffer.Skip(n) } -// Until returns a slice ends with the delim in the buffer. -func (b *LinkBuffer) Until(delim byte) (line []byte, err error) { +// Until implements Reader. 
+func (b *SafeLinkBuffer) Until(delim byte) (line []byte, err error) { b.Lock() defer b.Unlock() - n := b.indexByte(delim, 0) - if n < 0 { - return nil, fmt.Errorf("link buffer cannot find delim: '%b'", delim) - } - return b.Next(n + 1) + return b.UnsafeLinkBuffer.Until(delim) } -// Release the node that has been read. -// b.flush == nil indicates that this LinkBuffer is created by LinkBuffer.Slice -func (b *LinkBuffer) Release() (err error) { +// Release implements Reader. +func (b *SafeLinkBuffer) Release() (err error) { b.Lock() defer b.Unlock() - return b.release() -} - -func (b *LinkBuffer) release() (err error) { - for b.read != b.flush && b.read.Len() == 0 { - b.read = b.read.next - } - for b.head != b.read { - node := b.head - b.head = b.head.next - node.Release() - } - for i := range b.caches { - free(b.caches[i]) - b.caches[i] = nil - } - b.caches = b.caches[:0] - return nil + return b.UnsafeLinkBuffer.Release() } // ReadString implements Reader. -func (b *LinkBuffer) ReadString(n int) (s string, err error) { +func (b *SafeLinkBuffer) ReadString(n int) (s string, err error) { b.Lock() defer b.Unlock() - if n <= 0 { - return - } - // check whether enough or not. - if b.Len() < n { - return s, fmt.Errorf("link buffer read string[%d] not enough", n) - } - return unsafeSliceToString(b.readBinary(n)), nil + return b.UnsafeLinkBuffer.ReadString(n) } // ReadBinary implements Reader. -func (b *LinkBuffer) ReadBinary(n int) (p []byte, err error) { +func (b *SafeLinkBuffer) ReadBinary(n int) (p []byte, err error) { b.Lock() defer b.Unlock() - if n <= 0 { - return - } - // check whether enough or not. - if b.Len() < n { - return p, fmt.Errorf("link buffer read binary[%d] not enough", n) - } - return b.readBinary(n), nil -} - -// readBinary cannot use mcache, because the memory allocated by readBinary will not be recycled. -func (b *LinkBuffer) readBinary(n int) (p []byte) { - b.recalLen(-n) // re-cal length - - // single node - p = make([]byte, n) - if b.isSingleNode(n) { - copy(p, b.read.Next(n)) - return p - } - // multiple nodes - var pIdx int - var l int - for ack := n; ack > 0; ack = ack - l { - l = b.read.Len() - if l >= ack { - pIdx += copy(p[pIdx:], b.read.Next(ack)) - break - } else if l > 0 { - pIdx += copy(p[pIdx:], b.read.Next(l)) - } - b.read = b.read.next - } - _ = pIdx - return p + return b.UnsafeLinkBuffer.ReadBinary(n) } // ReadByte implements Reader. -func (b *LinkBuffer) ReadByte() (p byte, err error) { +func (b *SafeLinkBuffer) ReadByte() (p byte, err error) { b.Lock() defer b.Unlock() - // check whether enough or not. - if b.Len() < 1 { - return p, errors.New("link buffer read byte is empty") - } - b.recalLen(-1) // re-cal length - for { - if b.read.Len() >= 1 { - return b.read.Next(1)[0], nil - } - b.read = b.read.next - } + return b.UnsafeLinkBuffer.ReadByte() } -// Slice returns a new LinkBuffer, which is a zero-copy slice of this LinkBuffer, -// and only holds the ability of Reader. -// -// Slice will automatically execute a Release. -func (b *LinkBuffer) Slice(n int) (r Reader, err error) { +// Slice implements Reader. +func (b *SafeLinkBuffer) Slice(n int) (r Reader, err error) { b.Lock() defer b.Unlock() - if n <= 0 { - return NewLinkBuffer(0), nil - } - // check whether enough or not. 
- if b.Len() < n { - return r, fmt.Errorf("link buffer readv[%d] not enough", n) - } - b.recalLen(-n) // re-cal length - - // just use for range - p := &LinkBuffer{ - length: int32(n), - } - defer func() { - // set to read-only - p.flush = p.flush.next - p.write = p.flush - }() - - // single node - if b.isSingleNode(n) { - node := b.read.Refer(n) - p.head, p.read, p.flush = node, node, node - return p, nil - } - // multiple nodes - var l = b.read.Len() - node := b.read.Refer(l) - b.read = b.read.next - - p.head, p.read, p.flush = node, node, node - for ack := n - l; ack > 0; ack = ack - l { - l = b.read.Len() - if l >= ack { - p.flush.next = b.read.Refer(ack) - p.flush = p.flush.next - break - } else if l > 0 { - p.flush.next = b.read.Refer(l) - p.flush = p.flush.next - } - b.read = b.read.next - } - return p, b.release() + return b.UnsafeLinkBuffer.Slice(n) } // ------------------------------------------ implement zero-copy writer ------------------------------------------ -// Malloc pre-allocates memory, which is not readable, and becomes readable data after submission(e.g. Flush). -func (b *LinkBuffer) Malloc(n int) (buf []byte, err error) { +// Malloc implements Writer. +func (b *SafeLinkBuffer) Malloc(n int) (buf []byte, err error) { b.Lock() defer b.Unlock() - if n <= 0 { - return - } - b.mallocSize += n - b.growth(n) - return b.write.Malloc(n), nil + return b.UnsafeLinkBuffer.Malloc(n) } // MallocLen implements Writer. -func (b *LinkBuffer) MallocLen() (length int) { +func (b *SafeLinkBuffer) MallocLen() (length int) { b.Lock() defer b.Unlock() - length = b.mallocSize - return length + return b.UnsafeLinkBuffer.MallocLen() } -// MallocAck will keep the first n malloc bytes and discard the rest. -func (b *LinkBuffer) MallocAck(n int) (err error) { +// MallocAck implements Writer. +func (b *SafeLinkBuffer) MallocAck(n int) (err error) { b.Lock() defer b.Unlock() - if n < 0 { - return fmt.Errorf("link buffer malloc ack[%d] invalid", n) - } - b.mallocSize = n - b.write = b.flush - - var l int - for ack := n; ack > 0; ack = ack - l { - l = b.write.malloc - len(b.write.buf) - if l >= ack { - b.write.malloc = ack + len(b.write.buf) - break - } - b.write = b.write.next - } - // discard the rest - for node := b.write.next; node != nil; node = node.next { - node.off, node.malloc, node.refer, node.buf = 0, 0, 1, node.buf[:0] - } - return nil + return b.UnsafeLinkBuffer.MallocAck(n) } -// Flush will submit all malloc data and must confirm that the allocated bytes have been correctly assigned. -func (b *LinkBuffer) Flush() (err error) { +// Flush implements Writer. +func (b *SafeLinkBuffer) Flush() (err error) { b.Lock() defer b.Unlock() - b.mallocSize = 0 - // FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory. - if cap(b.write.buf) > pagesize { - b.write.next = newLinkBufferNode(0) - b.write = b.write.next - } - var n int - for node := b.flush; node != b.write.next; node = node.next { - delta := node.malloc - len(node.buf) - if delta > 0 { - n += delta - node.buf = node.buf[:node.malloc] - } - } - b.flush = b.write - // re-cal length - b.recalLen(n) - return nil + return b.UnsafeLinkBuffer.Flush() } // Append implements Writer. 
-func (b *LinkBuffer) Append(w Writer) (err error) { - var buf, ok = w.(*LinkBuffer) - if !ok { - return errors.New("unsupported writer which is not LinkBuffer") - } - return b.WriteBuffer(buf) +func (b *SafeLinkBuffer) Append(w Writer) (err error) { + b.Lock() + defer b.Unlock() + return b.UnsafeLinkBuffer.Append(w) } -// WriteBuffer will not submit(e.g. Flush) data to ensure normal use of MallocLen. -// you must actively submit before read the data. -// The argument buf can't be used after calling WriteBuffer. (set it to nil) -func (b *LinkBuffer) WriteBuffer(buf *LinkBuffer) (err error) { +// WriteBuffer implements Writer. +func (b *SafeLinkBuffer) WriteBuffer(buf *LinkBuffer) (err error) { b.Lock() defer b.Unlock() - if buf == nil { - return - } - bufLen, bufMallocLen := buf.Len(), buf.MallocLen() - if bufLen+bufMallocLen <= 0 { - return nil - } - b.write.next = buf.read - b.write = buf.write - - // close buf, prevents reuse. - for buf.head != buf.read { - nd := buf.head - buf.head = buf.head.next - nd.Release() - } - for buf.write = buf.write.next; buf.write != nil; { - nd := buf.write - buf.write = buf.write.next - nd.Release() - } - buf.length, buf.mallocSize, buf.head, buf.read, buf.flush, buf.write = 0, 0, nil, nil, nil, nil - - // DON'T MODIFY THE CODE BELOW UNLESS YOU KNOW WHAT YOU ARE DOING ! - // - // You may encounter a chain of bugs and not be able to - // find out within a week that they are caused by modifications here. - // - // After release buf, continue to adjust b. - b.write.next = nil - if bufLen > 0 { - b.recalLen(bufLen) - } - b.mallocSize += bufMallocLen - return nil + return b.UnsafeLinkBuffer.WriteBuffer(buf) } // WriteString implements Writer. -func (b *LinkBuffer) WriteString(s string) (n int, err error) { - if len(s) == 0 { - return - } - buf := unsafeStringToSlice(s) - return b.WriteBinary(buf) +func (b *SafeLinkBuffer) WriteString(s string) (n int, err error) { + b.Lock() + defer b.Unlock() + return b.UnsafeLinkBuffer.WriteString(s) } // WriteBinary implements Writer. -func (b *LinkBuffer) WriteBinary(p []byte) (n int, err error) { +func (b *SafeLinkBuffer) WriteBinary(p []byte) (n int, err error) { b.Lock() defer b.Unlock() - n = len(p) - if n == 0 { - return - } - b.mallocSize += n - - // TODO: Verify that all nocopy is possible under mcache. - if n > BinaryInplaceThreshold { - // expand buffer directly with nocopy - b.write.next = newLinkBufferNode(0) - b.write = b.write.next - b.write.buf, b.write.malloc = p[:0], n - return n, nil - } - // here will copy - b.growth(n) - buf := b.write.Malloc(n) - return copy(buf, p), nil + return b.UnsafeLinkBuffer.WriteBinary(p) } // WriteDirect cannot be mixed with WriteString or WriteBinary functions. 
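// WriteDirect splices the caller's slice into the unflushed region without copying it; the
// slice is inserted remainLen bytes before the end of the data malloced so far.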
-func (b *LinkBuffer) WriteDirect(p []byte, remainLen int) error { +func (b *SafeLinkBuffer) WriteDirect(p []byte, remainLen int) error { b.Lock() defer b.Unlock() - n := len(p) - if n == 0 || remainLen < 0 { - return nil - } - // find origin - origin := b.flush - malloc := b.mallocSize - remainLen // calculate the remaining malloc length - for t := origin.malloc - len(origin.buf); t < malloc; t = origin.malloc - len(origin.buf) { - malloc -= t - origin = origin.next - } - // Add the buf length of the original node - malloc += len(origin.buf) - - // Create dataNode and newNode and insert them into the chain - dataNode := newLinkBufferNode(0) - dataNode.buf, dataNode.malloc = p[:0], n - - if remainLen > 0 { - newNode := newLinkBufferNode(0) - newNode.off = malloc - newNode.buf = origin.buf[:malloc] - newNode.malloc = origin.malloc - newNode.readonly = false - origin.malloc = malloc - origin.readonly = true - - // link nodes - dataNode.next = newNode - newNode.next = origin.next - origin.next = dataNode - } else { - // link nodes - dataNode.next = origin.next - origin.next = dataNode - } - - // adjust b.write - for b.write.next != nil { - b.write = b.write.next - } - - b.mallocSize += n - return nil + return b.UnsafeLinkBuffer.WriteDirect(p, remainLen) } // WriteByte implements Writer. -func (b *LinkBuffer) WriteByte(p byte) (err error) { - dst, err := b.Malloc(1) - if len(dst) == 1 { - dst[0] = p - } - return err +func (b *SafeLinkBuffer) WriteByte(p byte) (err error) { + b.Lock() + defer b.Unlock() + return b.UnsafeLinkBuffer.WriteByte(p) } // Close will recycle all buffer. -func (b *LinkBuffer) Close() (err error) { +func (b *SafeLinkBuffer) Close() (err error) { b.Lock() defer b.Unlock() - atomic.StoreInt32(&b.length, 0) - b.mallocSize = 0 - // just release all - b.release() - for node := b.head; node != nil; { - nd := node - node = node.next - nd.Release() - } - b.head, b.read, b.flush, b.write = nil, nil, nil, nil - return nil + return b.UnsafeLinkBuffer.Close() } // ------------------------------------------ implement connection interface ------------------------------------------ -// Bytes returns all the readable bytes of this LinkBuffer. -func (b *LinkBuffer) Bytes() []byte { +// Bytes returns all the readable bytes of this SafeLinkBuffer. +func (b *SafeLinkBuffer) Bytes() []byte { b.Lock() defer b.Unlock() - node, flush := b.read, b.flush - if node == flush { - return node.buf[node.off:] - } - n := 0 - p := make([]byte, b.Len()) - for ; node != flush; node = node.next { - if node.Len() > 0 { - n += copy(p[n:], node.buf[node.off:]) - } - } - n += copy(p[n:], flush.buf[flush.off:]) - return p[:n] + return b.UnsafeLinkBuffer.Bytes() } // GetBytes will read and fill the slice p as much as possible. -// If p is not passed, return all readable bytes. -func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte) { +func (b *SafeLinkBuffer) GetBytes(p [][]byte) (vs [][]byte) { b.Lock() defer b.Unlock() - node, flush := b.read, b.flush - if len(p) == 0 { - n := 0 - for ; node != flush; node = node.next { - n++ - } - node = b.read - p = make([][]byte, n) - } - var i int - for i = 0; node != flush && i < len(p); node = node.next { - if node.Len() > 0 { - p[i] = node.buf[node.off:] - i++ - } - } - if i < len(p) { - p[i] = flush.buf[flush.off:] - i++ - } - return p[:i] + return b.UnsafeLinkBuffer.GetBytes(p) } // book will grow and malloc buffer to hold data. @@ -632,266 +195,36 @@ func (b *LinkBuffer) GetBytes(p [][]byte) (vs [][]byte) { // maxSize: The maximum size of data between two Release(). 
In some cases, this can // // guarantee all data allocated in one node to reduce copy. -func (b *LinkBuffer) book(bookSize, maxSize int) (p []byte) { +func (b *SafeLinkBuffer) book(bookSize, maxSize int) (p []byte) { b.Lock() defer b.Unlock() - l := cap(b.write.buf) - b.write.malloc - // grow linkBuffer - if l == 0 { - l = maxSize - b.write.next = newLinkBufferNode(maxSize) - b.write = b.write.next - } - if l > bookSize { - l = bookSize - } - return b.write.Malloc(l) + return b.UnsafeLinkBuffer.book(bookSize, maxSize) } // bookAck will ack the first n malloc bytes and discard the rest. // // length: The size of data in inputBuffer. It is used to calculate the maxSize -func (b *LinkBuffer) bookAck(n int) (length int, err error) { +func (b *SafeLinkBuffer) bookAck(n int) (length int, err error) { b.Lock() defer b.Unlock() - b.write.malloc = n + len(b.write.buf) - b.write.buf = b.write.buf[:b.write.malloc] - b.flush = b.write - - // re-cal length - length = b.recalLen(n) - return length, nil + return b.UnsafeLinkBuffer.bookAck(n) } // calcMaxSize will calculate the data size between two Release() -func (b *LinkBuffer) calcMaxSize() (sum int) { - for node := b.head; node != b.read; node = node.next { - sum += len(node.buf) - } - sum += len(b.read.buf) - return sum -} - -func (b *LinkBuffer) indexByte(c byte, skip int) int { +func (b *SafeLinkBuffer) calcMaxSize() (sum int) { b.Lock() defer b.Unlock() - size := b.Len() - if skip >= size { - return -1 - } - var unread, n, l int - node := b.read - for unread = size; unread > 0; unread -= n { - l = node.Len() - if l >= unread { // last node - n = unread - } else { // read full node - n = l - } - - // skip current node - if skip >= n { - skip -= n - node = node.next - continue - } - i := bytes.IndexByte(node.Peek(n)[skip:], c) - if i >= 0 { - return (size - unread) + skip + i // past_read + skip_read + index - } - skip = 0 // no skip bytes - node = node.next - } - return -1 -} - -// resetTail will reset tail node or add an empty tail node to -// guarantee the tail node is not larger than 8KB -func (b *LinkBuffer) resetTail(maxSize int) { - // FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory. - if maxSize <= pagesize { - b.write.Reset() - return - } - - // set nil tail - b.write.next = newLinkBufferNode(0) - b.write = b.write.next - b.flush = b.write - return -} - -// recalLen re-calculate the length -func (b *LinkBuffer) recalLen(delta int) (length int) { - return int(atomic.AddInt32(&b.length, int32(delta))) -} - -// ------------------------------------------ implement link node ------------------------------------------ - -// newLinkBufferNode create or reuse linkBufferNode. -// Nodes with size <= 0 are marked as readonly, which means the node.buf is not allocated by this mcache. 
-func newLinkBufferNode(size int) *linkBufferNode { - var node = linkedPool.Get().(*linkBufferNode) - // reset node offset - node.off, node.malloc, node.refer, node.readonly = 0, 0, 1, false - if size <= 0 { - node.readonly = true - return node - } - if size < LinkBufferCap { - size = LinkBufferCap - } - node.buf = malloc(0, size) - return node -} - -var linkedPool = sync.Pool{ - New: func() interface{} { - return &linkBufferNode{ - refer: 1, // 自带 1 引用 - } - }, + return b.UnsafeLinkBuffer.calcMaxSize() } -type linkBufferNode struct { - buf []byte // buffer - off int // read-offset - malloc int // write-offset - refer int32 // reference count - readonly bool // read-only node, introduced by Refer, WriteString, WriteBinary, etc., default false - origin *linkBufferNode // the root node of the extends - next *linkBufferNode // the next node of the linked buffer -} - -func (node *linkBufferNode) Len() (l int) { - return len(node.buf) - node.off -} - -func (node *linkBufferNode) IsEmpty() (ok bool) { - return node.off == len(node.buf) -} - -func (node *linkBufferNode) Reset() { - if node.origin != nil || atomic.LoadInt32(&node.refer) != 1 { - return - } - node.off, node.malloc = 0, 0 - node.buf = node.buf[:0] - return -} - -func (node *linkBufferNode) Next(n int) (p []byte) { - off := node.off - node.off += n - return node.buf[off:node.off] -} - -func (node *linkBufferNode) Peek(n int) (p []byte) { - return node.buf[node.off : node.off+n] -} - -func (node *linkBufferNode) Malloc(n int) (buf []byte) { - malloc := node.malloc - node.malloc += n - return node.buf[malloc:node.malloc] -} - -// Refer holds a reference count at the same time as Next, and releases the real buffer after Release. -// The node obtained by Refer is read-only. -func (node *linkBufferNode) Refer(n int) (p *linkBufferNode) { - p = newLinkBufferNode(0) - p.buf = node.Next(n) - - if node.origin != nil { - p.origin = node.origin - } else { - p.origin = node - } - atomic.AddInt32(&p.origin.refer, 1) - return p -} - -// Release consists of two parts: -// 1. reduce the reference count of itself and origin. -// 2. recycle the buf when the reference count is 0. -func (node *linkBufferNode) Release() (err error) { - if node.origin != nil { - node.origin.Release() - } - // release self - if atomic.AddInt32(&node.refer, -1) == 0 { - // readonly nodes cannot recycle node.buf, other node.buf are recycled to mcache. - if !node.readonly { - free(node.buf) - } - node.buf, node.origin, node.next = nil, nil, nil - linkedPool.Put(node) - } - return nil -} - -// ------------------------------------------ private function ------------------------------------------ - -// growth directly create the next node, when b.write is not enough. -func (b *LinkBuffer) growth(n int) { - if n <= 0 { - return - } - // Must skip read-only node. - for b.write.readonly || cap(b.write.buf)-b.write.malloc < n { - if b.write.next == nil { - b.write.next = newLinkBufferNode(n) - b.write = b.write.next - return - } - b.write = b.write.next - } -} - -// isSingleNode determines whether reading needs to cross nodes. 
-// Must require b.Len() > 0 -func (b *LinkBuffer) isSingleNode(readN int) (single bool) { - if readN <= 0 { - return true - } - l := b.read.Len() - for l == 0 && b.read != b.flush { - b.read = b.read.next - l = b.read.Len() - } - return l >= readN -} - -// zero-copy slice convert to string -func unsafeSliceToString(b []byte) string { - return *(*string)(unsafe.Pointer(&b)) -} - -// zero-copy slice convert to string -func unsafeStringToSlice(s string) (b []byte) { - p := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data) - hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - hdr.Data = uintptr(p) - hdr.Cap = len(s) - hdr.Len = len(s) - return b -} - -// mallocMax is 8MB -const mallocMax = block8k * block1k - -// malloc limits the cap of the buffer from mcache. -func malloc(size, capacity int) []byte { - if capacity > mallocMax { - return make([]byte, size, capacity) - } - return mcache.Malloc(size, capacity) +func (b *SafeLinkBuffer) resetTail(maxSize int) { + b.Lock() + defer b.Unlock() + b.UnsafeLinkBuffer.resetTail(maxSize) } -// free limits the cap of the buffer from mcache. -func free(buf []byte) { - if cap(buf) > mallocMax { - return - } - mcache.Free(buf) +func (b *SafeLinkBuffer) indexByte(c byte, skip int) int { + b.Lock() + defer b.Unlock() + return b.UnsafeLinkBuffer.indexByte(c, skip) } diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go index a376b7bf..25770762 100644 --- a/nocopy_linkbuffer_test.go +++ b/nocopy_linkbuffer_test.go @@ -20,6 +20,7 @@ package netpoll import ( "bytes" "fmt" + "runtime" "sync/atomic" "testing" ) @@ -491,6 +492,107 @@ func TestWriteDirect(t *testing.T) { } } +func TestNoCopyWriteAndRead(t *testing.T) { + // [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B] + const ( + mallocLen = 4096 * 2 + originLen = 4096 + dataLen = 512 + newLen = 16 + normalLen = 4096 + ) + buf := NewLinkBuffer() + bt, _ := buf.Malloc(mallocLen) + originBuf := bt[:originLen] + newBuf := bt[originLen : originLen+newLen] + + // write origin_node + for i := 0; i < originLen; i++ { + bt[i] = 'a' + } + // write data_node + userBuf := make([]byte, dataLen) + for i := 0; i < len(userBuf); i++ { + userBuf[i] = 'b' + } + buf.WriteDirect(userBuf, mallocLen-originLen) // nocopy write + // write new_node + for i := 0; i < newLen; i++ { + bt[originLen+i] = 'c' + } + buf.MallocAck(originLen + dataLen + newLen) + buf.Flush() + // write normal_node + normalBuf, _ := buf.Malloc(normalLen) + for i := 0; i < normalLen; i++ { + normalBuf[i] = 'd' + } + buf.Flush() + Equal(t, buf.Len(), originLen+dataLen+newLen+normalLen) + + // copy read origin_node + bt, _ = buf.ReadBinary(originLen) + for i := 0; i < len(bt); i++ { + MustTrue(t, bt[i] == 'a') + } + MustTrue(t, &bt[0] != &originBuf[0]) + // next read node is data node and must be readonly and non-reusable + MustTrue(t, buf.read.next.getMode(readonlyMask) && !buf.read.next.reusable()) + // copy read data_node + bt, _ = buf.ReadBinary(dataLen) + for i := 0; i < len(bt); i++ { + MustTrue(t, bt[i] == 'b') + } + MustTrue(t, &bt[0] != &userBuf[0]) + // copy read new_node + bt, _ = buf.ReadBinary(newLen) + for i := 0; i < len(bt); i++ { + MustTrue(t, bt[i] == 'c') + } + MustTrue(t, &bt[0] != &newBuf[0]) + // current read node is the new node and must not be reusable + newnode := buf.read + t.Log("newnode", newnode.getMode(readonlyMask), newnode.getMode(nocopyReadMask)) + MustTrue(t, newnode.reusable()) + var nodeReleased int32 + runtime.SetFinalizer(&newnode.buf[0], func(_ *byte) { + 
atomic.AddInt32(&nodeReleased, 1) + }) + // nocopy read normal_node + bt, _ = buf.ReadBinary(normalLen) + for i := 0; i < len(bt); i++ { + MustTrue(t, bt[i] == 'd') + } + MustTrue(t, &bt[0] == &normalBuf[0]) + // normal buffer never should be released + runtime.SetFinalizer(&bt[0], func(_ *byte) { + atomic.AddInt32(&nodeReleased, 1) + }) + _ = buf.Release() + MustTrue(t, newnode.buf == nil) + for atomic.LoadInt32(&nodeReleased) == 0 { + runtime.GC() + t.Log("newnode release check failed") + } + Equal(t, atomic.LoadInt32(&nodeReleased), int32(1)) + runtime.KeepAlive(normalBuf) +} + +func TestBufferMode(t *testing.T) { + bufnode := newLinkBufferNode(0) + MustTrue(t, !bufnode.getMode(nocopyReadMask)) + MustTrue(t, bufnode.getMode(readonlyMask)) + MustTrue(t, !bufnode.reusable()) + + bufnode = newLinkBufferNode(1) + MustTrue(t, !bufnode.getMode(nocopyReadMask)) + MustTrue(t, !bufnode.getMode(readonlyMask)) + bufnode.setMode(nocopyReadMask, false) + MustTrue(t, !bufnode.getMode(nocopyReadMask)) + bufnode.setMode(nocopyReadMask, true) + MustTrue(t, bufnode.getMode(nocopyReadMask)) +} + func BenchmarkLinkBufferConcurrentReadWrite(b *testing.B) { b.StopTimer() @@ -653,3 +755,43 @@ func BenchmarkCopyString(b *testing.B) { } }) } + +func BenchmarkNoCopyRead(b *testing.B) { + totalSize := 0 + minSize := 32 + maxSize := minSize << 9 + for size := minSize; size <= maxSize; size = size << 1 { + totalSize += size + } + b.ReportAllocs() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + var buffer = NewLinkBuffer(pagesize) + for pb.Next() { + buf, err := buffer.Malloc(totalSize) + if len(buf) != totalSize || err != nil { + b.Fatal(err) + } + err = buffer.MallocAck(totalSize) + if err != nil { + b.Fatal(err) + } + err = buffer.Flush() + if err != nil { + b.Fatal(err) + } + + for size := minSize; size <= maxSize; size = size << 1 { + buf, err = buffer.ReadBinary(size) + if len(buf) != size || err != nil { + b.Fatal(err) + } + } + // buffer.Release will not reuse memory since we use no copy mode here + err = buffer.Release() + if err != nil { + b.Fatal(err) + } + } + }) +} From 67865906a4ef908acf0323969d59688916e59173 Mon Sep 17 00:00:00 2001 From: hakusai22 Date: Mon, 6 May 2024 11:05:28 +0800 Subject: [PATCH 04/11] fix: _typos.toml and some typos (#327) --- _typos.toml | 10 ++++++++++ connection_test.go | 4 ++-- net_dialer.go | 2 +- netpoll_test.go | 2 +- 4 files changed, 14 insertions(+), 4 deletions(-) create mode 100644 _typos.toml diff --git a/_typos.toml b/_typos.toml new file mode 100644 index 00000000..b4b3d5bd --- /dev/null +++ b/_typos.toml @@ -0,0 +1,10 @@ +# Typo check: https://github.com/crate-ci/typos + +[files] +extend-exclude = ["go.sum"] + +[default.extend-identifiers] +# *sigh* this just isn't worth the cost of fixing +nd = "nd" +paniced = "paniced" +write_datas = "write_datas" diff --git a/connection_test.go b/connection_test.go index 548d98a2..f79b2484 100644 --- a/connection_test.go +++ b/connection_test.go @@ -601,7 +601,7 @@ func TestConnectionServerClose(t *testing.T) { go func() { err := el.Serve(ln) if err != nil { - t.Logf("servce end with error: %v", err) + t.Logf("service end with error: %v", err) } }() @@ -658,7 +658,7 @@ func TestConnectionDailTimeoutAndClose(t *testing.T) { go func() { err := el.Serve(ln) if err != nil { - t.Logf("servce end with error: %v", err) + t.Logf("service end with error: %v", err) } }() diff --git a/net_dialer.go b/net_dialer.go index 4c4e8dd2..f39245ff 100644 --- a/net_dialer.go +++ b/net_dialer.go @@ -54,7 +54,7 @@ func (d *dialer) 
DialConnection(network, address string, timeout time.Duration) switch network { case "tcp", "tcp4", "tcp6": return d.dialTCP(ctx, network, address) - // case "udp", "udp4", "udp6": // TODO: unsupport now + // case "udp", "udp4", "udp6": // TODO: unsupported now case "unix", "unixgram", "unixpacket": raddr := &UnixAddr{ UnixAddr: net.UnixAddr{Name: address, Net: network}, diff --git a/netpoll_test.go b/netpoll_test.go index b01f0a95..843be8cd 100644 --- a/netpoll_test.go +++ b/netpoll_test.go @@ -542,7 +542,7 @@ func TestServerAcceptWhenTooManyOpenFiles(t *testing.T) { }, WithOnConnect(func(ctx context.Context, connection Connection) context.Context { atomic.AddInt32(&connected, 1) - t.Logf("Conn[%s] accpeted", connection.RemoteAddr()) + t.Logf("Conn[%s] accepted", connection.RemoteAddr()) return ctx }), WithOnDisconnect(func(ctx context.Context, connection Connection) { From cd1474871eccfb699d228eadd28d4f8e433a2fdc Mon Sep 17 00:00:00 2001 From: Jayant Date: Tue, 7 May 2024 17:50:07 +0800 Subject: [PATCH 05/11] perf: use dirtmake to reduce memclr cost (#321) --- go.mod | 4 ++-- go.sum | 13 +++++++++---- nocopy.go | 3 ++- nocopy_linkbuffer.go | 12 +++++++----- nocopy_linkbuffer_test.go | 3 +++ 5 files changed, 23 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index 2e73daa5..b9a2a4bf 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,6 @@ module github.com/cloudwego/netpoll go 1.15 require ( - github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7 - golang.org/x/sys v0.0.0-20220110181412-a018aaa089fe + github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3 + golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 ) diff --git a/go.sum b/go.sum index 32a454e1..49445cde 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7 h1:PtwsQyQJGxf8iaPptPNaduEIu9BnrNms+pcRdHAxZaM= -github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7/go.mod h1:2ZlV9BaUH4+NXIBF0aMdKKAnHTzqH+iMU4KUjAbL23Q= +github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3 h1:ZKUHguI38SRQJkq7hhmwn8lAv3xM6B5qkj1IneS15YY= +github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -7,9 +7,14 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20220110181412-a018aaa089fe h1:W8vbETX/n8S6EmY0Pu4Ix7VvpsJUESTwl0oCK8MJOgk= -golang.org/x/sys v0.0.0-20220110181412-a018aaa089fe/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= diff --git a/nocopy.go b/nocopy.go index 6df02e8f..53acab2c 100644 --- a/nocopy.go +++ b/nocopy.go @@ -19,6 +19,7 @@ import ( "reflect" "unsafe" + "github.com/bytedance/gopkg/lang/dirtmake" "github.com/bytedance/gopkg/lang/mcache" ) @@ -285,7 +286,7 @@ func unsafeStringToSlice(s string) (b []byte) { // malloc limits the cap of the buffer from mcache. func malloc(size, capacity int) []byte { if capacity > mallocMax { - return make([]byte, size, capacity) + return dirtmake.Bytes(size, capacity) } return mcache.Malloc(size, capacity) } diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go index cb48cad1..2f4f3364 100644 --- a/nocopy_linkbuffer.go +++ b/nocopy_linkbuffer.go @@ -20,6 +20,8 @@ import ( "fmt" "sync" "sync/atomic" + + "github.com/bytedance/gopkg/lang/dirtmake" ) // BinaryInplaceThreshold marks the minimum value of the nocopy slice length, @@ -91,7 +93,7 @@ func (b *UnsafeLinkBuffer) Next(n int) (p []byte, err error) { p = malloc(n, n) b.caches = append(b.caches, p) } else { - p = make([]byte, n) + p = dirtmake.Bytes(n, n) } var l int for ack := n; ack > 0; ack = ack - l { @@ -128,7 +130,7 @@ func (b *UnsafeLinkBuffer) Peek(n int) (p []byte, err error) { p = malloc(n, n) b.caches = append(b.caches, p) } else { - p = make([]byte, n) + p = dirtmake.Bytes(n, n) } var node = b.read var l int @@ -232,11 +234,11 @@ func (b *UnsafeLinkBuffer) readBinary(n int) (p []byte) { } } // if the underlying buffer too large, we shouldn't use no-copy mode - p = make([]byte, n) + p = dirtmake.Bytes(n, n) copy(p, b.read.Next(n)) return p } - p = make([]byte, n) + p = dirtmake.Bytes(n, n) // multiple nodes var pIdx int var l int @@ -560,7 +562,7 @@ func (b *UnsafeLinkBuffer) Bytes() []byte { return node.buf[node.off:] } n := 0 - p := make([]byte, b.Len()) + p := dirtmake.Bytes(b.Len(), b.Len()) for ; node != flush; node = node.next { if node.Len() > 0 { n += copy(p[n:], node.buf[node.off:]) diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go index 25770762..8a9f8cfe 100644 --- a/nocopy_linkbuffer_test.go +++ b/nocopy_linkbuffer_test.go @@ -668,9 +668,12 @@ func TestLinkBufferIndexByte(t *testing.T) { trigger := make(chan struct{}, 16) lb := NewLinkBuffer() + empty := make([]byte, 1002) go func() { for i := 0; i < loopSize; i++ { buf, err := lb.Malloc(1002) + // need clear buffer + copy(buf, empty) buf[500] = '\n' buf[1001] = '\n' MustNil(t, err) From 37e4d10ba7120db89eed1829b64ee77545216061 Mon Sep 17 00:00:00 2001 From: Joway Date: Thu, 9 May 2024 14:22:17 +0800 Subject: [PATCH 06/11] chore: replace external dns server to private tcp server (#329) --- poll_default_linux_test.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/poll_default_linux_test.go b/poll_default_linux_test.go index 072963d7..58517d47 100644 --- a/poll_default_linux_test.go +++ b/poll_default_linux_test.go @@ -18,6 +18,7 @@ 
package netpoll import ( + "context" "errors" "syscall" "testing" @@ -239,6 +240,18 @@ func TestEpollETDel(t *testing.T) { } func TestEpollConnectSameFD(t *testing.T) { + addr := syscall.SockaddrInet4{ + Port: 12345, + Addr: [4]byte{127, 0, 0, 1}, + } + var loop = newTestEventLoop("tcp", "127.0.0.1:12345", + func(ctx context.Context, connection Connection) error { + _, err := connection.Reader().Next(connection.Reader().Len()) + return err + }, + ) + defer loop.Shutdown(context.Background()) + var epollfd, err = EpollCreate(0) MustNil(t, err) defer syscall.Close(epollfd) @@ -257,10 +270,6 @@ func TestEpollConnectSameFD(t *testing.T) { events: syscall.EPOLLIN | syscall.EPOLLRDHUP | syscall.EPOLLERR, data: eventdata1, } - addr := syscall.SockaddrInet4{ - Port: 53, - Addr: [4]byte{8, 8, 8, 8}, - } // connect non-block socket fd1, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP) @@ -271,12 +280,12 @@ func TestEpollConnectSameFD(t *testing.T) { err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd1, event1) MustNil(t, err) err = syscall.Connect(fd1, &addr) - t.Log(err) + t.Log(err) // EINPROGRESS _, err = epollWaitUntil(epollfd, events, -1) MustNil(t, err) Assert(t, events[0].events&syscall.EPOLLOUT != 0) - //Assert(t, events[0].events&syscall.EPOLLRDHUP == 0) - //Assert(t, events[0].events&syscall.EPOLLERR == 0) + Assert(t, events[0].events&syscall.EPOLLRDHUP == 0) + Assert(t, events[0].events&syscall.EPOLLERR == 0) // forget to del fd //err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, fd1, event1) //MustNil(t, err) @@ -292,7 +301,7 @@ func TestEpollConnectSameFD(t *testing.T) { err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd2, event2) MustNil(t, err) err = syscall.Connect(fd2, &addr) - t.Log(err) + t.Log(err) // EINPROGRESS _, err = epollWaitUntil(epollfd, events, -1) MustNil(t, err) Assert(t, events[0].events&syscall.EPOLLOUT != 0) @@ -313,7 +322,7 @@ func TestEpollConnectSameFD(t *testing.T) { err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, fd3, event1) MustNil(t, err) err = syscall.Connect(fd3, &addr) - t.Log(err) + t.Log(err) // EINPROGRESS _, err = epollWaitUntil(epollfd, events, -1) MustNil(t, err) Assert(t, events[0].events&syscall.EPOLLOUT != 0) From a9a224c3e494b10cc15e08a55aa1a4de7f7e6593 Mon Sep 17 00:00:00 2001 From: Liu <46311996+gh-liu@users.noreply.github.com> Date: Thu, 16 May 2024 11:00:22 +0800 Subject: [PATCH 07/11] docs: clearer doc for linkBufferNode mode (#331) --- nocopy.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/nocopy.go b/nocopy.go index 53acab2c..ad74634a 100644 --- a/nocopy.go +++ b/nocopy.go @@ -261,10 +261,12 @@ const ( minReuseBytes = 64 // only reuse bytes if n >= minReuseBytes defaultLinkBufferMode = 0 - // readonly mode indicate that the buffer node memory is not controlled by itself, - // so we cannot reuse the buffer or nocopy read it, default value is false. + // readonlyMask is used to set readonly mode, + // which indicate that the buffer node memory is not controlled by itself, + // so we cannot reuse the buffer or nocopy read it. readonlyMask uint8 = 1 << 0 // 0000 0001 - // nocopyRead mode indicate that the buffer node has been no copy read and cannot reuse the buffer, default value is false. + // readonlyMask is used to set nocopyRead mode, + // which indicate that the buffer node has been no copy read and cannot reuse the buffer. 
nocopyReadMask uint8 = 1 << 1 // 0000 0010 ) From 3e411b105a0b7672cda9c0e3ca682b9d8e5b5f8c Mon Sep 17 00:00:00 2001 From: Xin ZF <43563921+TremblingV5@users.noreply.github.com> Date: Wed, 29 May 2024 17:13:07 +0800 Subject: [PATCH 08/11] fix: return a correct error when failed to access concurrent connections (#332) Co-authored-by: TremblingV5 --- connection_errors.go | 17 ++++++++++------- connection_impl.go | 13 +++++++++++-- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/connection_errors.go b/connection_errors.go index 1edfa21d..f14070ab 100644 --- a/connection_errors.go +++ b/connection_errors.go @@ -36,6 +36,8 @@ const ( ErrEOF = syscall.Errno(0x106) // Write I/O buffer timeout, calling by Connection.Writer ErrWriteTimeout = syscall.Errno(0x107) + // Concurrent connection access error + ErrConcurrentAccess = syscall.Errno(0x108) ) const ErrnoMask = 0xFF @@ -110,11 +112,12 @@ func (e *exception) Temporary() bool { // Errors defined in netpoll var errnos = [...]string{ - ErrnoMask & ErrConnClosed: "connection has been closed", - ErrnoMask & ErrReadTimeout: "connection read timeout", - ErrnoMask & ErrDialTimeout: "dial wait timeout", - ErrnoMask & ErrDialNoDeadline: "dial no deadline", - ErrnoMask & ErrUnsupported: "netpoll dose not support", - ErrnoMask & ErrEOF: "EOF", - ErrnoMask & ErrWriteTimeout: "connection write timeout", + ErrnoMask & ErrConnClosed: "connection has been closed", + ErrnoMask & ErrReadTimeout: "connection read timeout", + ErrnoMask & ErrDialTimeout: "dial wait timeout", + ErrnoMask & ErrDialNoDeadline: "dial no deadline", + ErrnoMask & ErrUnsupported: "netpoll does not support", + ErrnoMask & ErrEOF: "EOF", + ErrnoMask & ErrWriteTimeout: "connection write timeout", + ErrnoMask & ErrConcurrentAccess: "concurrent connection access", } diff --git a/connection_impl.go b/connection_impl.go index b683b4df..bb2609b6 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -220,10 +220,15 @@ func (c *connection) MallocLen() (length int) { // If empty, it will call syscall.Write to send data directly, // otherwise the buffer will be sent asynchronously by the epoll trigger. func (c *connection) Flush() error { - if !c.IsActive() || !c.lock(flushing) { + if !c.IsActive() { return Exception(ErrConnClosed, "when flush") } + + if !c.lock(flushing) { + return Exception(ErrConcurrentAccess, "when flush") + } defer c.unlock(flushing) + c.outputBuffer.Flush() return c.flush() } @@ -282,9 +287,13 @@ func (c *connection) Read(p []byte) (n int, err error) { // Write will Flush soon. 
func (c *connection) Write(p []byte) (n int, err error) { - if !c.IsActive() || !c.lock(flushing) { + if !c.IsActive() { return 0, Exception(ErrConnClosed, "when write") } + + if !c.lock(flushing) { + return 0, Exception(ErrConcurrentAccess, "when write") + } defer c.unlock(flushing) dst, _ := c.outputBuffer.Malloc(len(p)) From c4ec256b49d696ecf9092db2833cce78571f640b Mon Sep 17 00:00:00 2001 From: Kyle Xiao Date: Fri, 7 Jun 2024 14:28:21 +0800 Subject: [PATCH 09/11] fix: Peek OOM and performance issue (#335) --- nocopy_linkbuffer.go | 76 ++++++++++++++++++++++++++++----------- nocopy_linkbuffer_test.go | 52 +++++++++++++++++++++------ 2 files changed, 97 insertions(+), 31 deletions(-) diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go index 2f4f3364..bfa80d38 100644 --- a/nocopy_linkbuffer.go +++ b/nocopy_linkbuffer.go @@ -56,7 +56,12 @@ type UnsafeLinkBuffer struct { flush *linkBufferNode // malloc head write *linkBufferNode // malloc tail - caches [][]byte // buf allocated by Next when cross-package, which should be freed when release + // buf allocated by Next when cross-package, which should be freed when release + caches [][]byte + + // for `Peek` only, avoid creating too many []byte in `caches` + // fix the issue when we have a large buffer and we call `Peek` multiple times + cachePeek []byte } // Len implements Reader. @@ -124,28 +129,50 @@ func (b *UnsafeLinkBuffer) Peek(n int) (p []byte, err error) { if b.isSingleNode(n) { return b.read.Peek(n), nil } + // multiple nodes - var pIdx int - if block1k < n && n <= mallocMax { - p = malloc(n, n) - b.caches = append(b.caches, p) - } else { - p = dirtmake.Bytes(n, n) - } - var node = b.read - var l int - for ack := n; ack > 0; ack = ack - l { - l = node.Len() - if l >= ack { - pIdx += copy(p[pIdx:], node.Peek(ack)) - break - } else if l > 0 { - pIdx += copy(p[pIdx:], node.Peek(l)) + + // try to make use of the cap of b.cachePeek, if can't, free it. + if b.cachePeek != nil && cap(b.cachePeek) < n { + free(b.cachePeek) + b.cachePeek = nil + } + if b.cachePeek == nil { + b.cachePeek = malloc(0, n) // init with zero len, will append later + } + p = b.cachePeek + if len(p) >= n { + // in case we peek smaller than last time, + // we can return cache data directly. + // we will reset cachePeek when Next or Skip, no worries about stale data + return p[:n], nil + } + + // How it works >>>>>> + // [ -------- node0 -------- ][ --------- node1 --------- ] <- b.read + // [ --------------- p --------------- ] + // ^ len(p) ^ n here + // ^ scanned + // `scanned` var is the len of last nodes which we scanned and already copied to p + // `len(p) - scanned` is the start pos of current node for p to copy from + // `n - len(p)` is the len of bytes we're going to append to p + // we copy `len(node1)` - `len(p) - scanned` bytes in case node1 doesn't have enough data + for scanned, node := 0, b.read; len(p) < n; node = node.next { + l := node.Len() + if scanned+l <= len(p) { // already copied in p, skip + scanned += l + continue } - node = node.next + start := len(p) - scanned // `start` must be smaller than l coz `scanned+l <= len(p)` is false + copyn := n - len(p) + if nodeLeftN := l - start; copyn > nodeLeftN { + copyn = nodeLeftN + } + p = append(p, node.Peek(l)[start:start+copyn]...) + scanned += l } - _ = pIdx - return p, nil + b.cachePeek = p + return p[:n], nil } // Skip implements Reader. 
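Note on the Peek rework above: the patch adds a reusable scratch slice (cachePeek) so that repeated cross-node peeks stop allocating a fresh buffer into b.caches on every call; the scratch only grows when a larger peek arrives, a smaller follow-up peek is served from it directly, and it is dropped once data is consumed or the buffer is released (see the hunks below). The following is a minimal, self-contained sketch of that caching pattern in plain Go — the peeker type, its nodes field, and main are illustrative stand-ins, not netpoll's API.

package main

import "fmt"

// peeker mimics a segmented buffer with a reusable peek cache.
type peeker struct {
	nodes     [][]byte // segmented storage, like linked buffer nodes
	cachePeek []byte   // reusable scratch for cross-node peeks
}

// Peek returns the first n bytes without consuming them.
func (p *peeker) Peek(n int) []byte {
	if len(p.cachePeek) >= n {
		// smaller than (or equal to) the last peek: serve from the cache, no copy
		return p.cachePeek[:n]
	}
	if cap(p.cachePeek) < n {
		// not enough capacity: start a new scratch slice
		// (the patch above frees the old one back to mcache)
		p.cachePeek = make([]byte, 0, n)
	}
	// append only the bytes that previous peeks have not copied yet
	scanned := 0
	for _, node := range p.nodes {
		if len(p.cachePeek) >= n {
			break
		}
		if scanned+len(node) <= len(p.cachePeek) {
			scanned += len(node) // this node was fully covered by an earlier peek
			continue
		}
		start := len(p.cachePeek) - scanned
		copyn := n - len(p.cachePeek)
		if left := len(node) - start; copyn > left {
			copyn = left
		}
		p.cachePeek = append(p.cachePeek, node[start:start+copyn]...)
		scanned += len(node)
	}
	return p.cachePeek[:n]
}

func main() {
	p := &peeker{nodes: [][]byte{[]byte("hello "), []byte("world")}}
	fmt.Printf("%s\n", p.Peek(8))  // "hello wo": copies across both nodes into the scratch
	fmt.Printf("%s\n", p.Peek(4))  // "hell": served straight from the existing scratch
	fmt.Printf("%s\n", p.Peek(11)) // "hello world": scratch is regrown and refilled across both nodes
}

As the remaining hunks of this patch show, the real implementation additionally resets cachePeek in recalLen whenever bytes are read out of the buffer, so a later Peek never returns stale data, and frees it in Release.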
@@ -187,6 +214,10 @@ func (b *UnsafeLinkBuffer) Release() (err error) { b.caches[i] = nil } b.caches = b.caches[:0] + if b.cachePeek != nil { + free(b.cachePeek) + b.cachePeek = nil + } return nil } @@ -692,6 +723,11 @@ func (b *UnsafeLinkBuffer) indexByte(c byte, skip int) int { // recalLen re-calculate the length func (b *UnsafeLinkBuffer) recalLen(delta int) (length int) { + if delta < 0 && len(b.cachePeek) > 0 { + // b.cachePeek will contain stale data if we read out even a single byte from buffer, + // so we need to reset it or the next Peek call will return invalid bytes. + b.cachePeek = b.cachePeek[:0] + } return int(atomic.AddInt64(&b.length, int64(delta))) } diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go index 8a9f8cfe..8abbebe0 100644 --- a/nocopy_linkbuffer_test.go +++ b/nocopy_linkbuffer_test.go @@ -85,7 +85,7 @@ func TestLinkBuffer(t *testing.T) { Equal(t, buf.Len(), 100) } -func TestGetBytes(t *testing.T) { +func TestLinkBufferGetBytes(t *testing.T) { buf := NewLinkBuffer() var ( num = 10 @@ -195,8 +195,7 @@ func TestLinkBufferWithInvalid(t *testing.T) { } } -// cross-block operation test -func TestLinkBufferIndex(t *testing.T) { +func TestLinkBufferMultiNode(t *testing.T) { // clean & new LinkBufferCap = 8 @@ -206,6 +205,9 @@ func TestLinkBufferIndex(t *testing.T) { var p []byte p, _ = buf.Malloc(15) + for i := 0; i < len(p); i++ { // updates p[0] - p[14] to 0 - 14 + p[i] = byte(i) + } Equal(t, len(p), 15) MustTrue(t, buf.read == buf.flush) Equal(t, buf.read.off, 0) @@ -215,6 +217,9 @@ func TestLinkBufferIndex(t *testing.T) { Equal(t, cap(buf.write.buf), 16) // mcache up-aligned to the power of 2 p, _ = buf.Malloc(7) + for i := 0; i < len(p); i++ { // updates p[0] - p[6] to 15 - 21 + p[i] = byte(i + 15) + } Equal(t, len(p), 7) MustTrue(t, buf.read == buf.flush) Equal(t, buf.read.off, 0) @@ -236,19 +241,44 @@ func TestLinkBufferIndex(t *testing.T) { p, _ = buf.Next(13) Equal(t, len(p), 13) + Equal(t, p[0], byte(0)) + Equal(t, p[12], byte(12)) MustTrue(t, buf.read != buf.flush) Equal(t, buf.read.off, 13) Equal(t, buf.read.Len(), 2) + Equal(t, buf.read.next.Len(), 7) Equal(t, buf.flush.off, 0) Equal(t, buf.flush.malloc, 7) + // Peek p, _ = buf.Peek(4) Equal(t, len(p), 4) + Equal(t, p[0], byte(13)) + Equal(t, p[1], byte(14)) + Equal(t, p[2], byte(15)) + Equal(t, p[3], byte(16)) + Equal(t, len(buf.cachePeek), 4) + p, _ = buf.Peek(3) // case: smaller than the last call + Equal(t, len(p), 3) + Equal(t, p[0], byte(13)) + Equal(t, p[2], byte(15)) + Equal(t, len(buf.cachePeek), 4) + p, _ = buf.Peek(5) // case: Peek than the max call, and cap(buf.cachePeek) < n + Equal(t, len(p), 5) + Equal(t, p[0], byte(13)) + Equal(t, p[4], byte(17)) + Equal(t, len(buf.cachePeek), 5) + p, _ = buf.Peek(6) // case: Peek than the last call, and cap(buf.cachePeek) > n + Equal(t, len(p), 6) + Equal(t, p[0], byte(13)) + Equal(t, p[5], byte(18)) + Equal(t, len(buf.cachePeek), 6) MustTrue(t, buf.read != buf.flush) Equal(t, buf.read.off, 13) Equal(t, buf.read.Len(), 2) Equal(t, buf.flush.off, 0) Equal(t, buf.flush.malloc, 7) + // Peek ends buf.book(block8k, block8k) MustTrue(t, buf.flush == buf.write) @@ -377,7 +407,7 @@ func TestLinkBufferResetTail(t *testing.T) { Equal(t, got, except) } -func TestWriteBuffer(t *testing.T) { +func TestLinkBufferWriteBuffer(t *testing.T) { buf1 := NewLinkBuffer() buf2 := NewLinkBuffer() b2, _ := buf2.Malloc(1) @@ -414,7 +444,7 @@ func TestLinkBufferCheckSingleNode(t *testing.T) { buf.isSingleNode(1) } -func TestWriteMultiFlush(t *testing.T) { +func 
TestLinkBufferWriteMultiFlush(t *testing.T) { buf := NewLinkBuffer() b1, _ := buf.Malloc(4) b1[0] = 1 @@ -444,7 +474,7 @@ func TestWriteMultiFlush(t *testing.T) { MustTrue(t, len(buf.Bytes()) == 4) } -func TestWriteBinary(t *testing.T) { +func TestLinkBufferWriteBinary(t *testing.T) { // clean & new LinkBufferCap = 8 @@ -465,7 +495,7 @@ func TestWriteBinary(t *testing.T) { Equal(t, b[9], byte(0)) } -func TestWriteDirect(t *testing.T) { +func TestLinkBufferWriteDirect(t *testing.T) { // clean & new LinkBufferCap = 32 @@ -492,7 +522,7 @@ func TestWriteDirect(t *testing.T) { } } -func TestNoCopyWriteAndRead(t *testing.T) { +func TestLinkBufferNoCopyWriteAndRead(t *testing.T) { // [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B] const ( mallocLen = 4096 * 2 @@ -578,7 +608,7 @@ func TestNoCopyWriteAndRead(t *testing.T) { runtime.KeepAlive(normalBuf) } -func TestBufferMode(t *testing.T) { +func TestLinkBufferBufferMode(t *testing.T) { bufnode := newLinkBufferNode(0) MustTrue(t, !bufnode.getMode(nocopyReadMask)) MustTrue(t, bufnode.getMode(readonlyMask)) @@ -726,7 +756,7 @@ func BenchmarkStringToCopy(b *testing.B) { _ = bs } -func BenchmarkPoolGet(b *testing.B) { +func BenchmarkLinkBufferPoolGet(b *testing.B) { var v *linkBufferNode if false { b.Logf("bs = %v", v) @@ -759,7 +789,7 @@ func BenchmarkCopyString(b *testing.B) { }) } -func BenchmarkNoCopyRead(b *testing.B) { +func BenchmarkLinkBufferNoCopyRead(b *testing.B) { totalSize := 0 minSize := 32 maxSize := minSize << 9 From b383e39c4a22d89f627f7e8b347e7b01cfa1850a Mon Sep 17 00:00:00 2001 From: Joway Date: Mon, 10 Jun 2024 13:44:06 +0800 Subject: [PATCH 10/11] chore: use 127.0.0.1 and unique port for all test listener (#336) --- connection_test.go | 30 ++++++++++++++++++------------ mux/shard_queue_test.go | 4 ++-- net_dialer.go | 2 +- net_dialer_test.go | 31 ++++++++++++++++++------------- net_listener_test.go | 2 +- net_polldesc_test.go | 5 +++-- netpoll_test.go | 32 ++++++++++++++++++++------------ 7 files changed, 63 insertions(+), 43 deletions(-) diff --git a/connection_test.go b/connection_test.go index f79b2484..890c3239 100644 --- a/connection_test.go +++ b/connection_test.go @@ -212,7 +212,8 @@ func writeAll(fd int, buf []byte) error { // Large packet write test. The socket buffer is 2MB by default, here to verify // whether Connection.Close can be executed normally after socket output buffer is full. 
func TestLargeBufferWrite(t *testing.T) { - ln, err := createTestListener("tcp", ":12345") + address := getTestAddress() + ln, err := createTestListener("tcp", address) MustNil(t, err) trigger := make(chan int) @@ -231,7 +232,7 @@ func TestLargeBufferWrite(t *testing.T) { } }() - conn, err := DialConnection("tcp", ":12345", time.Second) + conn, err := DialConnection("tcp", address, time.Second) MustNil(t, err) rfd := <-trigger @@ -267,7 +268,8 @@ func TestLargeBufferWrite(t *testing.T) { } func TestWriteTimeout(t *testing.T) { - ln, err := createTestListener("tcp", ":1234") + address := getTestAddress() + ln, err := createTestListener("tcp", address) MustNil(t, err) interval := time.Millisecond * 100 @@ -296,7 +298,7 @@ func TestWriteTimeout(t *testing.T) { } }() - conn, err := DialConnection("tcp", ":1234", time.Second) + conn, err := DialConnection("tcp", address, time.Second) MustNil(t, err) _, err = conn.Writer().Malloc(1024) @@ -440,7 +442,8 @@ func TestBookSizeLargerThanMaxSize(t *testing.T) { } func TestConnDetach(t *testing.T) { - ln, err := createTestListener("tcp", ":1234") + address := getTestAddress() + ln, err := createTestListener("tcp", address) MustNil(t, err) go func() { @@ -470,7 +473,7 @@ func TestConnDetach(t *testing.T) { } }() - c, err := DialConnection("tcp", ":1234", time.Second) + c, err := DialConnection("tcp", address, time.Second) MustNil(t, err) conn := c.(*TCPConnection) @@ -497,7 +500,8 @@ func TestConnDetach(t *testing.T) { } func TestParallelShortConnection(t *testing.T) { - ln, err := createTestListener("tcp", ":12345") + address := getTestAddress() + ln, err := createTestListener("tcp", address) MustNil(t, err) defer ln.Close() @@ -523,7 +527,7 @@ func TestParallelShortConnection(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - conn, err := DialConnection("tcp", ":12345", time.Second) + conn, err := DialConnection("tcp", address, time.Second) MustNil(t, err) n, err := conn.Writer().WriteBinary(make([]byte, sizePerConn)) MustNil(t, err) @@ -546,7 +550,8 @@ func TestParallelShortConnection(t *testing.T) { } func TestConnectionServerClose(t *testing.T) { - ln, err := createTestListener("tcp", ":12345") + address := getTestAddress() + ln, err := createTestListener("tcp", address) MustNil(t, err) defer ln.Close() @@ -628,7 +633,7 @@ func TestConnectionServerClose(t *testing.T) { wg.Add(conns * 6) for i := 0; i < conns; i++ { go func() { - conn, err := DialConnection("tcp", ":12345", time.Second) + conn, err := DialConnection("tcp", address, time.Second) MustNil(t, err) err = conn.SetOnRequest(clientOnRequest) MustNil(t, err) @@ -644,7 +649,8 @@ func TestConnectionServerClose(t *testing.T) { } func TestConnectionDailTimeoutAndClose(t *testing.T) { - ln, err := createTestListener("tcp", ":12345") + address := getTestAddress() + ln, err := createTestListener("tcp", address) MustNil(t, err) defer ln.Close() @@ -670,7 +676,7 @@ func TestConnectionDailTimeoutAndClose(t *testing.T) { for i := 0; i < conns; i++ { go func() { defer wg.Done() - conn, err := DialConnection("tcp", ":12345", time.Nanosecond) + conn, err := DialConnection("tcp", address, time.Nanosecond) Assert(t, err == nil || strings.Contains(err.Error(), "i/o timeout")) _ = conn }() diff --git a/mux/shard_queue_test.go b/mux/shard_queue_test.go index 7a595d21..e5384a24 100644 --- a/mux/shard_queue_test.go +++ b/mux/shard_queue_test.go @@ -29,8 +29,8 @@ func TestShardQueue(t *testing.T) { var svrConn net.Conn accepted := make(chan struct{}) - network, address := "tcp", ":18888" - ln, err := 
net.Listen("tcp", ":18888") + network, address := "tcp", "localhost:12345" + ln, err := net.Listen("tcp", address) MustNil(t, err) stop := make(chan int, 1) defer close(stop) diff --git a/net_dialer.go b/net_dialer.go index f39245ff..85f1b7c6 100644 --- a/net_dialer.go +++ b/net_dialer.go @@ -75,7 +75,7 @@ func (d *dialer) dialTCP(ctx context.Context, network, address string) (connecti return nil, err } var ipaddrs []net.IPAddr - // host maybe empty if address is ":1234" + // host maybe empty if address is :12345 if host == "" { ipaddrs = []net.IPAddr{{}} } else { diff --git a/net_dialer_test.go b/net_dialer_test.go index 7383fd0d..64fa50cf 100644 --- a/net_dialer_test.go +++ b/net_dialer_test.go @@ -31,11 +31,12 @@ import ( func TestDialerTCP(t *testing.T) { dialer := NewDialer() - conn, err := dialer.DialTimeout("tcp", ":1234", time.Second) + address := getTestAddress() + conn, err := dialer.DialTimeout("tcp", address, time.Second) MustTrue(t, err != nil) MustTrue(t, conn.(*TCPConnection) == nil) - ln, err := CreateListener("tcp", ":1234") + ln, err := CreateListener("tcp", address) MustNil(t, err) stop := make(chan int, 1) @@ -57,10 +58,10 @@ func TestDialerTCP(t *testing.T) { } }() - conn, err = dialer.DialTimeout("tcp", ":1234", time.Second) + conn, err = dialer.DialTimeout("tcp", address, time.Second) MustNil(t, err) MustTrue(t, strings.HasPrefix(conn.LocalAddr().String(), "127.0.0.1:")) - Equal(t, conn.RemoteAddr().String(), "127.0.0.1:1234") + Equal(t, conn.RemoteAddr().String(), address) } func TestDialerUnix(t *testing.T) { @@ -106,7 +107,8 @@ func TestDialerUnix(t *testing.T) { } func TestDialerFdAlloc(t *testing.T) { - ln, err := CreateListener("tcp", ":1234") + address := getTestAddress() + ln, err := CreateListener("tcp", address) MustNil(t, err) defer ln.Close() el1, _ := NewEventLoop(func(ctx context.Context, connection Connection) error { @@ -121,7 +123,7 @@ func TestDialerFdAlloc(t *testing.T) { defer el1.Shutdown(ctx1) for i := 0; i < 100; i++ { - conn, err := DialConnection("tcp", ":1234", time.Second) + conn, err := DialConnection("tcp", address, time.Second) MustNil(t, err) fd := conn.(*TCPConnection).fd conn.Write([]byte("hello world")) @@ -134,7 +136,8 @@ func TestDialerFdAlloc(t *testing.T) { } func TestFDClose(t *testing.T) { - ln, err := CreateListener("tcp", ":1234") + address := getTestAddress() + ln, err := CreateListener("tcp", address) MustNil(t, err) defer ln.Close() el1, _ := NewEventLoop(func(ctx context.Context, connection Connection) error { @@ -150,13 +153,13 @@ func TestFDClose(t *testing.T) { var fd int var conn Connection - conn, err = DialConnection("tcp", ":1234", time.Second) + conn, err = DialConnection("tcp", address, time.Second) MustNil(t, err) fd = conn.(*TCPConnection).fd syscall.SetNonblock(fd, true) conn.Close() - conn, err = DialConnection("tcp", ":1234", time.Second) + conn, err = DialConnection("tcp", address, time.Second) MustNil(t, err) fd = conn.(*TCPConnection).fd syscall.SetNonblock(fd, true) @@ -166,8 +169,10 @@ func TestFDClose(t *testing.T) { // fd data package race test, use two servers and two dialers. 
func TestDialerThenClose(t *testing.T) { + address1 := getTestAddress() + address2 := getTestAddress() // server 1 - ln1, _ := createTestListener("tcp", ":1231") + ln1, _ := createTestListener("tcp", address1) el1 := mockDialerEventLoop(1) go func() { el1.Serve(ln1) @@ -177,7 +182,7 @@ func TestDialerThenClose(t *testing.T) { defer el1.Shutdown(ctx1) // server 2 - ln2, _ := createTestListener("tcp", ":1232") + ln2, _ := createTestListener("tcp", address2) el2 := mockDialerEventLoop(2) go func() { el2.Serve(ln2) @@ -194,12 +199,12 @@ func TestDialerThenClose(t *testing.T) { defer wg.Done() for i := 0; i < 50; i++ { // send server 1 - conn, err := DialConnection("tcp", ":1231", time.Second) + conn, err := DialConnection("tcp", address1, time.Second) if err == nil { mockDialerSend(1, &conn.(*TCPConnection).connection) } // send server 2 - conn, err = DialConnection("tcp", ":1232", time.Second) + conn, err = DialConnection("tcp", address2, time.Second) if err == nil { mockDialerSend(2, &conn.(*TCPConnection).connection) } diff --git a/net_listener_test.go b/net_listener_test.go index 9516f8e5..71983028 100644 --- a/net_listener_test.go +++ b/net_listener_test.go @@ -27,7 +27,7 @@ import ( func TestListenerDialer(t *testing.T) { network := "tcp" - addr := ":1234" + addr := getTestAddress() ln, err := CreateListener(network, addr) MustNil(t, err) defer ln.Close() diff --git a/net_polldesc_test.go b/net_polldesc_test.go index 40804b62..9850a39c 100644 --- a/net_polldesc_test.go +++ b/net_polldesc_test.go @@ -27,7 +27,8 @@ func TestZeroTimer(t *testing.T) { } func TestRuntimePoll(t *testing.T) { - ln, err := CreateListener("tcp", ":1234") + address := getTestAddress() + ln, err := CreateListener("tcp", address) MustNil(t, err) stop := make(chan int, 1) @@ -50,7 +51,7 @@ func TestRuntimePoll(t *testing.T) { }() for i := 0; i < 10; i++ { - conn, err := DialConnection("tcp", ":1234", time.Second) + conn, err := DialConnection("tcp", address, time.Second) MustNil(t, err) conn.Close() } diff --git a/netpoll_test.go b/netpoll_test.go index 843be8cd..4156a0b8 100644 --- a/netpoll_test.go +++ b/netpoll_test.go @@ -20,6 +20,7 @@ package netpoll import ( "context" "errors" + "fmt" "math/rand" "os" "runtime" @@ -64,6 +65,13 @@ func Assert(t *testing.T, cond bool, val ...interface{}) { } } +var testPort int32 = 10000 + +// getTestAddress return a unique port for every tests, so all tests will not share a same listerner +func getTestAddress() string { + return fmt.Sprintf("127.0.0.1:%d", atomic.AddInt32(&testPort, 1)) +} + func TestEqual(t *testing.T) { var err error MustNil(t, err) @@ -73,7 +81,7 @@ func TestEqual(t *testing.T) { } func TestOnConnect(t *testing.T) { - var network, address = "tcp", ":8888" + var network, address = "tcp", getTestAddress() req, resp := "ping", "pong" var loop = newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { @@ -117,7 +125,7 @@ func TestOnConnect(t *testing.T) { } func TestOnConnectWrite(t *testing.T) { - var network, address = "tcp", ":8888" + var network, address = "tcp", getTestAddress() var loop = newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { return nil @@ -140,7 +148,7 @@ func TestOnConnectWrite(t *testing.T) { func TestOnDisconnect(t *testing.T) { type ctxKey struct{} - var network, address = "tcp", ":8888" + var network, address = "tcp", getTestAddress() var canceled, closed int32 var conns int32 = 100 req := "ping" @@ -201,7 +209,7 @@ func TestOnDisconnect(t *testing.T) { func 
TestOnDisconnectWhenOnConnect(t *testing.T) { type ctxPrepareKey struct{} type ctxConnectKey struct{} - var network, address = "tcp", ":8888" + var network, address = "tcp", getTestAddress() var conns int32 = 100 var wg sync.WaitGroup wg.Add(int(conns) * 3) @@ -249,7 +257,7 @@ func TestOnDisconnectWhenOnConnect(t *testing.T) { } func TestGracefulExit(t *testing.T) { - var network, address = "tcp", ":8888" + var network, address = "tcp", getTestAddress() // exit without processing connections var eventLoop1 = newTestEventLoop(network, address, @@ -306,7 +314,7 @@ func TestGracefulExit(t *testing.T) { } func TestCloseCallbackWhenOnRequest(t *testing.T) { - var network, address = "tcp", ":8888" + var network, address = "tcp", getTestAddress() var requested, closed = make(chan struct{}), make(chan struct{}) var loop = newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { @@ -337,7 +345,7 @@ func TestCloseCallbackWhenOnRequest(t *testing.T) { } func TestCloseCallbackWhenOnConnect(t *testing.T) { - var network, address = "tcp", ":8888" + var network, address = "tcp", getTestAddress() var connected, closed = make(chan struct{}), make(chan struct{}) var loop = newTestEventLoop(network, address, nil, @@ -364,7 +372,7 @@ func TestCloseCallbackWhenOnConnect(t *testing.T) { } func TestCloseConnWhenOnConnect(t *testing.T) { - var network, address = "tcp", ":8888" + var network, address = "tcp", "localhost:8888" conns := 10 var wg sync.WaitGroup wg.Add(conns) @@ -399,7 +407,7 @@ func TestCloseConnWhenOnConnect(t *testing.T) { } func TestServerReadAndClose(t *testing.T) { - var network, address = "tcp", ":18888" + var network, address = "tcp", getTestAddress() var sendMsg = []byte("hello") var closed int32 var loop = newTestEventLoop(network, address, @@ -435,7 +443,7 @@ func TestServerReadAndClose(t *testing.T) { } func TestServerPanicAndClose(t *testing.T) { - var network, address = "tcp", ":18888" + var network, address = "tcp", getTestAddress() var sendMsg = []byte("hello") var paniced int32 var loop = newTestEventLoop(network, address, @@ -467,7 +475,7 @@ func TestServerPanicAndClose(t *testing.T) { func TestClientWriteAndClose(t *testing.T) { var ( - network, address = "tcp", ":18889" + network, address = "tcp", getTestAddress() connnum = 10 packetsize, packetnum = 1000 * 5, 1 recvbytes int32 = 0 @@ -531,7 +539,7 @@ func TestServerAcceptWhenTooManyOpenFiles(t *testing.T) { MustNil(t, err) }() - var network, address = "tcp", ":18888" + var network, address = "tcp", getTestAddress() var connected int32 var loop = newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { From ae18a62cacc0e5da7f2ed9be29234fce4398acf5 Mon Sep 17 00:00:00 2001 From: Joway Date: Mon, 10 Jun 2024 13:56:54 +0800 Subject: [PATCH 11/11] chore: add memory-leak test for cross nodes peek (#337) --- nocopy_linkbuffer.go | 12 ++++++++++ nocopy_linkbuffer_test.go | 48 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go index bfa80d38..0dbe5831 100644 --- a/nocopy_linkbuffer.go +++ b/nocopy_linkbuffer.go @@ -761,6 +761,18 @@ func (b *UnsafeLinkBuffer) isSingleNode(readN int) (single bool) { return l >= readN } +// memorySize return the real memory size in bytes the LinkBuffer occupied +func (b *LinkBuffer) memorySize() (bytes int) { + for node := b.head; node != nil; node = node.next { + bytes += cap(node.buf) + } + for _, c := range b.caches { + bytes += cap(c) + } + bytes += 
cap(b.cachePeek) + return bytes +} + // ------------------------------------------ implement link node ------------------------------------------ // newLinkBufferNode create or reuse linkBufferNode. diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go index 8abbebe0..bf3d2cc1 100644 --- a/nocopy_linkbuffer_test.go +++ b/nocopy_linkbuffer_test.go @@ -19,6 +19,7 @@ package netpoll import ( "bytes" + "encoding/binary" "fmt" "runtime" "sync/atomic" @@ -724,6 +725,53 @@ func TestLinkBufferIndexByte(t *testing.T) { } } +func TestLinkBufferPeekOutOfMemory(t *testing.T) { + bufCap := 1024 * 8 + bufNodes := 100 + magicN := uint64(2024) + buf := NewLinkBuffer(bufCap) + MustTrue(t, buf.IsEmpty()) + Equal(t, cap(buf.write.buf), bufCap) + Equal(t, buf.memorySize(), bufCap) + + var p []byte + var err error + // write data that cross multi nodes + for n := 0; n < bufNodes; n++ { + p, err = buf.Malloc(bufCap) + MustNil(t, err) + Equal(t, len(p), bufCap) + binary.BigEndian.PutUint64(p, magicN) + } + Equal(t, buf.MallocLen(), bufCap*bufNodes) + buf.Flush() + Equal(t, buf.MallocLen(), 0) + + // peak data that in single node + for i := 0; i < 10; i++ { + p, err = buf.Peek(bufCap) + Equal(t, binary.BigEndian.Uint64(p), magicN) + MustNil(t, err) + Equal(t, len(p), bufCap) + Equal(t, buf.memorySize(), bufCap*bufNodes) + } + + // peak data that cross nodes + memorySize := 0 + for i := 0; i < 1024; i++ { + p, err = buf.Peek(bufCap + 1) + MustNil(t, err) + Equal(t, binary.BigEndian.Uint64(p), magicN) + Equal(t, len(p), bufCap+1) + if memorySize == 0 { + memorySize = buf.memorySize() + t.Logf("after Peek: memorySize=%d", memorySize) + } else { + Equal(t, buf.memorySize(), memorySize) + } + } +} + func BenchmarkStringToSliceByte(b *testing.B) { b.StopTimer() s := "hello world"