Skip to content
Permalink
Browse files
Imp: add task pool options
  • Loading branch information
AlexStocks committed Jun 9, 2019
1 parent 1991056 commit ac39a3bac521836d6993bc847f48961a8e27b7d3
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 42 deletions.
@@ -80,11 +80,7 @@ func newClient(t EndPointType, opts ...ClientOption) *client {
c.ssMap = make(map[Session]struct{}, c.number)

if c.tQPoolSize > 0 {
qLen := c.tQLen
if qLen == 0 {
qLen = defaultTaskQLen
}
c.tQPool = newTaskPool(c.tQPoolSize, qLen)
c.tQPool = newTaskPool(c.taskPoolOptions)
}

return c
@@ -14,18 +14,13 @@ package getty
// Server Options
/////////////////////////////////////////

const (
defaultTaskQLen = 128
)

type ServerOption func(*ServerOptions)

type ServerOptions struct {
addr string

// task pool
tQLen int32
tQPoolSize int32
taskPoolOptions

// websocket
path string
@@ -44,14 +39,21 @@ func WithLocalAddress(addr string) ServerOption {
// @size is the task queue pool size
func WithServerTaskPoolSize(size int32) ServerOption {
return func(o *ServerOptions) {
o.tQPoolSize = size
o.taskPoolOptions.tQPoolSize = size
}
}

// @length is the task queue length
func WithServerTaskQueueLength(length int32) ServerOption {
return func(o *ServerOptions) {
o.tQLen = length
o.taskPoolOptions.tQLen = length
}
}

// @number is the task queue number
func WithServerTaskQueueNumber(number int32) ServerOption {
return func(o *ServerOptions) {
o.taskPoolOptions.tQNumber = number
}
}

@@ -95,8 +97,7 @@ type ClientOptions struct {
reconnectInterval int // reConnect Interval

// task pool
tQLen int32
tQPoolSize int32
taskPoolOptions

// the cert file of wss server which may contain server domain, server ip, the starting effective date, effective
// duration, the hash alg, the len of the private key.
@@ -128,14 +129,21 @@ func WithConnectionNumber(num int) ClientOption {
// @size is the task queue pool size
func WithClientTaskPoolSize(size int32) ClientOption {
return func(o *ClientOptions) {
o.tQPoolSize = size
o.taskPoolOptions.tQPoolSize = size
}
}

// @length is the task queue length
func WithClientTaskQueueLength(length int32) ClientOption {
return func(o *ClientOptions) {
o.tQLen = length
o.taskPoolOptions.tQLen = length
}
}

// @number is the task queue number
func WithClientTaskQueueNumber(number int32) ClientOption {
return func(o *ClientOptions) {
o.taskPoolOptions.tQNumber = number
}
}

@@ -68,11 +68,7 @@ func newServer(t EndPointType, opts ...ServerOption) *server {
}

if s.tQPoolSize > 0 {
qLen := s.tQLen
if qLen == 0 {
qLen = defaultTaskQLen
}
s.tQPool = newTaskPool(s.tQPoolSize, qLen)
s.tQPool = newTaskPool(s.taskPoolOptions)
}

return s
@@ -8,6 +8,7 @@ import (

const (
defaultTaskQNumber = 10
defaultTaskQLen = 128
)

// task t
@@ -16,11 +17,35 @@ type task struct {
pkg interface{}
}

type taskPoolOptions struct {
tQLen int32 // task queue length
tQNumber int32 // task queue number
tQPoolSize int32 // task pool size
}

func (o *taskPoolOptions) Validate() {
if o.tQPoolSize == 0 {
panic(fmt.Sprintf("[getty][task_pool] illegal pool size %d", o.tQPoolSize))
}

if o.tQLen == 0 {
o.tQLen = defaultTaskQLen
}

if o.tQNumber < 1 {
o.tQNumber = defaultTaskQNumber
}

if o.tQNumber > o.tQPoolSize {
o.tQNumber = o.tQPoolSize
}
}

// task pool: manage task ts
type taskPool struct {
idx uint32
qLen int32 // task queue length
size int32 // task queue pool size
taskPoolOptions

idx uint32 // round robin index
qArray []chan task
wg sync.WaitGroup

@@ -29,35 +54,28 @@ type taskPool struct {
}

// build a task pool
func newTaskPool(poolSize int32, taskQLen int32) *taskPool {
func newTaskPool(opts taskPoolOptions) *taskPool {
opts.Validate()

p := &taskPool{
size: poolSize,
qLen: taskQLen,
qArray: make([]chan task, defaultTaskQNumber),
done: make(chan struct{}),
taskPoolOptions: opts,
qArray: make([]chan task, opts.tQNumber),
done: make(chan struct{}),
}

for i := 0; i < defaultTaskQNumber; i++ {
p.qArray[i] = make(chan task, taskQLen)
for i := int32(0); i < p.tQNumber; i++ {
p.qArray[i] = make(chan task, p.tQLen)
}

return p
}

// start task pool
func (p *taskPool) start() {
if p.size == 0 {
panic(fmt.Sprintf("[getty][task_pool] illegal pool size %d", p.size))
}

if p.qLen == 0 {
panic(fmt.Sprintf("[getty][task_pool] illegal t queue length %d", p.qLen))
}

for i := int32(0); i < p.size; i++ {
for i := int32(0); i < p.tQPoolSize; i++ {
p.wg.Add(1)
workerID := i
q := p.qArray[workerID%defaultTaskQNumber]
q := p.qArray[workerID%p.tQNumber]
go p.run(int(workerID), q)
}
}

0 comments on commit ac39a3b

Please sign in to comment.