feat: expose config and features control #349

Merged · 3 commits · Jul 23, 2024
Changes from 1 commit
4 changes: 2 additions & 2 deletions connection_impl.go
@@ -330,8 +330,8 @@ func (c *connection) init(conn Conn, opts *options) (err error) {
// init buffer, barrier, finalizer
c.readTrigger = make(chan error, 1)
c.writeTrigger = make(chan error, 1)
c.bookSize, c.maxSize = pagesize, pagesize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
c.bookSize, c.maxSize = defaultLinkBufferSize, defaultLinkBufferSize
c.inputBuffer, c.outputBuffer = NewLinkBuffer(defaultLinkBufferSize), NewLinkBuffer()
c.outputBarrier = barrierPool.Get().(*barrier)
c.state = 0

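For context, connection init now sizes bookSize/maxSize and the input LinkBuffer from the configurable defaultLinkBufferSize instead of the fixed pagesize. A minimal sketch of how an application would change that default through the new API; the 32 KiB value is purely illustrative:

package main

import "github.com/cloudwego/netpoll"

// Sketch only: Config.BufferSize feeds the defaultLinkBufferSize global that
// connection.init reads above; the concrete size is an arbitrary example.
func init() {
	if err := netpoll.Configure(netpoll.Config{BufferSize: 32 * 1024}); err != nil {
		panic(err)
	}
}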
85 changes: 76 additions & 9 deletions netpoll_options.go
@@ -20,9 +20,75 @@ package netpoll
import (
"context"
"io"
"log"
"os"
"runtime"
"time"
)

var (
pollmanager = newManager(runtime.GOMAXPROCS(0)/20 + 1) // pollmanager manages all pollers
logger = log.New(os.Stderr, "", log.LstdFlags)

// global config
defaultLinkBufferSize = pagesize
featureAlwaysNoCopyRead = false
)

// Config exposes some tuning parameters that control the internal behaviors of netpoll.
// Every parameter left at its zero value keeps netpoll's default behavior.
type Config struct {
PollerNum int // number of pollers
BufferSize int // default size of a new connection's LinkBuffer
Runner func(ctx context.Context, f func()) // runner for event handlers; typically backed by a goroutine pool
LoggerOutput io.Writer // logger output
LoadBalance LoadBalance // load balance for poller picker
Feature // features that are not enabled by default
}

// Feature exposes experimental features that may later be promoted to default behavior, but are not yet.
type Feature struct {
// AlwaysNoCopyRead makes copy-read functions such as ReadBinary/ReadString
// use no-copy reads and not reuse the underlying buffer.
// It brings a performance benefit when a codec needs to read many large strings/byte slices.
AlwaysNoCopyRead bool
}

// Configure sets the internal behaviors of netpoll.
// Configure must be called in an init() function, because the pollers read these global variables after init() has finished.
func Configure(config Config) (err error) {
if config.PollerNum > 0 {
if err = pollmanager.SetNumLoops(config.PollerNum); err != nil {
return err
}
}
if config.BufferSize > 0 {
defaultLinkBufferSize = config.BufferSize
}

if config.Runner != nil {
setRunner(config.Runner)
}
if config.LoggerOutput != nil {
logger = log.New(config.LoggerOutput, "", log.LstdFlags)
}
if config.LoadBalance >= 0 {
if err = pollmanager.SetLoadBalance(config.LoadBalance); err != nil {
return err
}
}

featureAlwaysNoCopyRead = config.AlwaysNoCopyRead
return nil
}

// Initialize the pollers actively. By default, they are initialized lazily.
// It is safe to call it multiple times.
func Initialize() {
// The first call of Pick() will init pollers
_ = pollmanager.Pick()
}

// SetNumLoops sets the number of pollers; generally it does not need to be set explicitly.
// By default, the number of pollers is equal to runtime.GOMAXPROCS(0)/20+1.
// If your service process has fewer than 20 cores, theoretically only one poller is needed.
@@ -34,28 +100,28 @@ import (
// func init() {
// netpoll.SetNumLoops(...)
// }
//
// Deprecated: use Configure instead.
func SetNumLoops(numLoops int) error {
return setNumLoops(numLoops)
return pollmanager.SetNumLoops(numLoops)
}

// SetLoadBalance sets the load balancing method. Load balancing is always a best effort to attempt
// to distribute the incoming connections between multiple polls.
// This option only works when numLoops is set.
// Deprecated: use Configure instead.
func SetLoadBalance(lb LoadBalance) error {
return setLoadBalance(lb)
}

// Initialize the pollers actively. By default, it's lazy initialized.
// It's safe to call it multi times.
func Initialize() {
initialize()
return pollmanager.SetLoadBalance(lb)
}

// SetLoggerOutput sets the logger output target.
// Deprecated: use Configure instead.
func SetLoggerOutput(w io.Writer) {
setLoggerOutput(w)
logger = log.New(w, "", log.LstdFlags)
}

// SetRunner set the runner function for every OnRequest/OnConnect callback
// Deprecated: use Configure instead.
func SetRunner(f func(ctx context.Context, f func())) {
setRunner(f)
}
@@ -65,6 +131,7 @@ func SetRunner(f func(ctx context.Context, f func())) {
// Usually, OnRequest will cause stack expansion, which can be solved by reusing goroutine.
// But if you can confirm that the OnRequest will not cause stack expansion,
// it is recommended to use DisableGopool to reduce redundancy and improve performance.
// Deprecated: use Configure instead.
func DisableGopool() error {
return disableGopool()
}
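Putting the new surface together, a hedged sketch of a full Configure call in init(), plus an optional eager Initialize in main(). All concrete values are illustrative, and the inline Runner stands in for the goroutine pool an application would normally plug in:

package main

import (
	"context"
	"os"

	"github.com/cloudwego/netpoll"
)

func init() {
	err := netpoll.Configure(netpoll.Config{
		PollerNum:    4,         // overrides the runtime.GOMAXPROCS(0)/20+1 default
		BufferSize:   64 * 1024, // initial LinkBuffer size for new connections
		LoggerOutput: os.Stdout, // redirect netpoll's internal logger
		LoadBalance:  netpoll.RoundRobin,
		// Runner replaces the default event-handler runner; a real service
		// would typically hand f to a goroutine pool here.
		Runner: func(ctx context.Context, f func()) { go f() },
		Feature: netpoll.Feature{
			AlwaysNoCopyRead: true, // opt in to no-copy ReadBinary/ReadString
		},
	})
	if err != nil {
		panic(err)
	}
}

func main() {
	// Optional: initialize the pollers eagerly instead of on first use.
	netpoll.Initialize()
}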
26 changes: 13 additions & 13 deletions nocopy_linkbuffer.go
@@ -252,19 +252,19 @@
// single node
if b.isSingleNode(n) {
// TODO: enable nocopy read mode when ensure no legacy depend on copy-read
//// we cannot nocopy read a readonly mode buffer, since readonly buffer's memory is not control by itself
//if !b.read.getMode(readonlyMask) {
// // if readBinary use no-copy mode, it will cause more memory used but get higher memory access efficiently
// // for example, if user's codec need to decode 10 strings and each have 100 bytes, here could help the codec
// // no need to malloc 10 times and the string slice could have the compact memory allocation.
// if b.read.getMode(nocopyReadMask) {
// return b.read.Next(n)
// }
// if n >= minReuseBytes && cap(b.read.buf) <= block32k {
// b.read.setMode(nocopyReadMask, true)
// return b.read.Next(n)
// }
//}
// we cannot nocopy-read a readonly buffer, since a readonly buffer's memory is not controlled by itself
if !b.read.getMode(readonlyMask) {
// if ReadBinary uses no-copy mode, it uses more memory but accesses that memory more efficiently:
// for example, if a user's codec needs to decode 10 strings of 100 bytes each, this lets the codec
// avoid 10 separate mallocs and keep the strings in one compact allocation.
if b.read.getMode(nocopyReadMask) {
return b.read.Next(n)
}
if featureAlwaysNoCopyRead && n >= minReuseBytes {
b.read.setMode(nocopyReadMask, true)
return b.read.Next(n)
}
}
// if the underlying buffer too large, we shouldn't use no-copy mode
p = dirtmake.Bytes(n, n)
copy(p, b.read.Next(n))
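To make the uncommented branch concrete: with Feature.AlwaysNoCopyRead enabled, a copy-read call such as ReadBinary with n >= minReuseBytes flips the node into no-copy mode, so the returned slice aliases the LinkBuffer node and that node is simply not reused. A hedged sketch of the codec pattern the comment above describes; decodeFields and its sizes are hypothetical names for illustration:

// decodeFields reads a sequence of sizeable fields from a netpoll.Reader.
// With AlwaysNoCopyRead on, each large ReadBinary avoids a malloc+copy and the
// returned slices share the buffer node's compact allocation; with the feature
// off, each call copies into freshly allocated bytes.
func decodeFields(r netpoll.Reader, sizes []int) ([][]byte, error) {
	fields := make([][]byte, 0, len(sizes))
	for _, n := range sizes {
		b, err := r.ReadBinary(n) // no-copy path when the feature is enabled and n is large enough
		if err != nil {
			return nil, err
		}
		fields = append(fields, b)
	}
	return fields, nil
}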
177 changes: 92 additions & 85 deletions nocopy_linkbuffer_test.go
@@ -21,6 +21,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"runtime"
"sync/atomic"
"testing"
)
@@ -522,91 +523,97 @@ func TestLinkBufferWriteDirect(t *testing.T) {
}
}

//func TestLinkBufferNoCopyWriteAndRead(t *testing.T) {
// // [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B]
// const (
// mallocLen = 4096 * 2
// originLen = 4096
// dataLen = 512
// newLen = 16
// normalLen = 4096
// )
// buf := NewLinkBuffer()
// bt, _ := buf.Malloc(mallocLen)
// originBuf := bt[:originLen]
// newBuf := bt[originLen : originLen+newLen]
//
// // write origin_node
// for i := 0; i < originLen; i++ {
// bt[i] = 'a'
// }
// // write data_node
// userBuf := make([]byte, dataLen)
// for i := 0; i < len(userBuf); i++ {
// userBuf[i] = 'b'
// }
// buf.WriteDirect(userBuf, mallocLen-originLen) // nocopy write
// // write new_node
// for i := 0; i < newLen; i++ {
// bt[originLen+i] = 'c'
// }
// buf.MallocAck(originLen + dataLen + newLen)
// buf.Flush()
// // write normal_node
// normalBuf, _ := buf.Malloc(normalLen)
// for i := 0; i < normalLen; i++ {
// normalBuf[i] = 'd'
// }
// buf.Flush()
// Equal(t, buf.Len(), originLen+dataLen+newLen+normalLen)
//
// // copy read origin_node
// bt, _ = buf.ReadBinary(originLen)
// for i := 0; i < len(bt); i++ {
// MustTrue(t, bt[i] == 'a')
// }
// MustTrue(t, &bt[0] != &originBuf[0])
// // next read node is data node and must be readonly and non-reusable
// MustTrue(t, buf.read.next.getMode(readonlyMask) && !buf.read.next.reusable())
// // copy read data_node
// bt, _ = buf.ReadBinary(dataLen)
// for i := 0; i < len(bt); i++ {
// MustTrue(t, bt[i] == 'b')
// }
// MustTrue(t, &bt[0] != &userBuf[0])
// // copy read new_node
// bt, _ = buf.ReadBinary(newLen)
// for i := 0; i < len(bt); i++ {
// MustTrue(t, bt[i] == 'c')
// }
// MustTrue(t, &bt[0] != &newBuf[0])
// // current read node is the new node and must not be reusable
// newnode := buf.read
// t.Log("newnode", newnode.getMode(readonlyMask), newnode.getMode(nocopyReadMask))
// MustTrue(t, newnode.reusable())
// var nodeReleased int32
// runtime.SetFinalizer(&newnode.buf[0], func(_ *byte) {
// atomic.AddInt32(&nodeReleased, 1)
// })
// // nocopy read normal_node
// bt, _ = buf.ReadBinary(normalLen)
// for i := 0; i < len(bt); i++ {
// MustTrue(t, bt[i] == 'd')
// }
// MustTrue(t, &bt[0] == &normalBuf[0])
// // normal buffer never should be released
// runtime.SetFinalizer(&bt[0], func(_ *byte) {
// atomic.AddInt32(&nodeReleased, 1)
// })
// _ = buf.Release()
// MustTrue(t, newnode.buf == nil)
// for atomic.LoadInt32(&nodeReleased) == 0 {
// runtime.GC()
// t.Log("newnode release checking")
// }
// Equal(t, atomic.LoadInt32(&nodeReleased), int32(1))
// runtime.KeepAlive(normalBuf)
//}
func TestLinkBufferNoCopyWriteAndRead(t *testing.T) {
err := Configure(Config{Feature: Feature{AlwaysNoCopyRead: true}})
MustNil(t, err)
defer func() {
err = Configure(Config{Feature: Feature{AlwaysNoCopyRead: false}})
MustNil(t, err)
}()
// [origin_node:4096B] + [data_node:512B] + [new_node:16B] + [normal_node:4096B]
const (
mallocLen = 4096 * 2
originLen = 4096
dataLen = 512
newLen = 16
normalLen = 4096
)
buf := NewLinkBuffer()
bt, _ := buf.Malloc(mallocLen)
originBuf := bt[:originLen]
newBuf := bt[originLen : originLen+newLen]

// write origin_node
for i := 0; i < originLen; i++ {
bt[i] = 'a'
}
// write data_node
userBuf := make([]byte, dataLen)
for i := 0; i < len(userBuf); i++ {
userBuf[i] = 'b'
}
buf.WriteDirect(userBuf, mallocLen-originLen) // nocopy write
// write new_node
for i := 0; i < newLen; i++ {
bt[originLen+i] = 'c'
}
buf.MallocAck(originLen + dataLen + newLen)
buf.Flush()
// write normal_node
normalBuf, _ := buf.Malloc(normalLen)
for i := 0; i < normalLen; i++ {
normalBuf[i] = 'd'
}
buf.Flush()
Equal(t, buf.Len(), originLen+dataLen+newLen+normalLen)

// copy read origin_node
bt, _ = buf.ReadBinary(originLen)
for i := 0; i < len(bt); i++ {
MustTrue(t, bt[i] == 'a')
}
MustTrue(t, &bt[0] != &originBuf[0])
// next read node is data node and must be readonly and non-reusable
MustTrue(t, buf.read.next.getMode(readonlyMask) && !buf.read.next.reusable())
// copy read data_node
bt, _ = buf.ReadBinary(dataLen)
for i := 0; i < len(bt); i++ {
MustTrue(t, bt[i] == 'b')
}
MustTrue(t, &bt[0] != &userBuf[0])
// copy read new_node
bt, _ = buf.ReadBinary(newLen)
for i := 0; i < len(bt); i++ {
MustTrue(t, bt[i] == 'c')
}
MustTrue(t, &bt[0] != &newBuf[0])
// current read node is the new node and must not be reusable
newnode := buf.read
t.Log("newnode", newnode.getMode(readonlyMask), newnode.getMode(nocopyReadMask))
MustTrue(t, newnode.reusable())
var nodeReleased int32
runtime.SetFinalizer(&newnode.buf[0], func(_ *byte) {
atomic.AddInt32(&nodeReleased, 1)
})
// nocopy read normal_node
bt, _ = buf.ReadBinary(normalLen)
for i := 0; i < len(bt); i++ {
MustTrue(t, bt[i] == 'd')
}
MustTrue(t, &bt[0] == &normalBuf[0])
// normal buffer never should be released
runtime.SetFinalizer(&bt[0], func(_ *byte) {
atomic.AddInt32(&nodeReleased, 1)
})
_ = buf.Release()
MustTrue(t, newnode.buf == nil)
for atomic.LoadInt32(&nodeReleased) == 0 {
runtime.GC()
t.Log("newnode release checking")
}
Equal(t, atomic.LoadInt32(&nodeReleased), int32(1))
runtime.KeepAlive(normalBuf)
}

func TestLinkBufferBufferMode(t *testing.T) {
bufnode := newLinkBufferNode(0)
12 changes: 6 additions & 6 deletions poll_loadbalance.go
@@ -24,28 +24,28 @@ import (
type LoadBalance int

const (
// Random requests that connections are randomly distributed.
Random LoadBalance = iota
// RoundRobin requests that connections are distributed to a Poll
// in a round-robin fashion.
RoundRobin
RoundRobin LoadBalance = iota
// Random requests that connections are randomly distributed.
Random
)

// loadbalance sets the load balancing method for []*polls
type loadbalance interface {
LoadBalance() LoadBalance
// Choose the most qualified Poll
// Pick chooses the most qualified Poll
Pick() (poll Poll)

Rebalance(polls []Poll)
}

func newLoadbalance(lb LoadBalance, polls []Poll) loadbalance {
switch lb {
case Random:
return newRandomLB(polls)
case RoundRobin:
return newRoundRobinLB(polls)
case Random:
return newRandomLB(polls)
}
return newRoundRobinLB(polls)
}
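Note that the reordering above makes RoundRobin the zero value of LoadBalance and the fallback in newLoadbalance, so it acts as the effective default picker. A sketch of opting into random distribution instead through the same Configure entry point (remembering that Configure has to run inside init()):

package main

import "github.com/cloudwego/netpoll"

// Sketch only: explicitly select the Random picker instead of the RoundRobin default.
func init() {
	if err := netpoll.Configure(netpoll.Config{LoadBalance: netpoll.Random}); err != nil {
		panic(err)
	}
}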