Skip to content

Commit

Permalink
Imp: just treat task pool as a independent role
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexStocks committed Jun 9, 2019
1 parent ac39a3b commit 1e0ac26
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 114 deletions.
12 changes: 0 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ type client struct {
newSession NewSessionCallback
ssMap map[Session]struct{}

// task queue pool
tQPool *taskPool

sync.Once
done chan struct{}
wg sync.WaitGroup
Expand All @@ -79,10 +76,6 @@ func newClient(t EndPointType, opts ...ClientOption) *client {

c.ssMap = make(map[Session]struct{}, c.number)

if c.tQPoolSize > 0 {
c.tQPool = newTaskPool(c.taskPoolOptions)
}

return c
}

Expand Down Expand Up @@ -357,7 +350,6 @@ func (c *client) connect() {
}
err = c.newSession(ss)
if err == nil {
ss.(*session).SetTaskPool(c.tQPool)
ss.(*session).run()
c.Lock()
if c.ssMap == nil {
Expand Down Expand Up @@ -429,10 +421,6 @@ func (c *client) stop() {
}
c.ssMap = nil

if c.tQPool != nil {
c.tQPool.close()
c.tQPool = nil
}
c.Unlock()
})
}
Expand Down
91 changes: 49 additions & 42 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@

package getty

import (
"fmt"
)

/////////////////////////////////////////
// Server Options
/////////////////////////////////////////
Expand All @@ -19,9 +23,6 @@ type ServerOption func(*ServerOptions)
type ServerOptions struct {
addr string

// task pool
taskPoolOptions

// websocket
path string
cert string
Expand All @@ -36,27 +37,6 @@ func WithLocalAddress(addr string) ServerOption {
}
}

// @size is the task queue pool size
func WithServerTaskPoolSize(size int32) ServerOption {
return func(o *ServerOptions) {
o.taskPoolOptions.tQPoolSize = size
}
}

// @length is the task queue length
func WithServerTaskQueueLength(length int32) ServerOption {
return func(o *ServerOptions) {
o.taskPoolOptions.tQLen = length
}
}

// @number is the task queue number
func WithServerTaskQueueNumber(number int32) ServerOption {
return func(o *ServerOptions) {
o.taskPoolOptions.tQNumber = number
}
}

// @path: websocket request url path
func WithWebsocketServerPath(path string) ServerOption {
return func(o *ServerOptions) {
Expand Down Expand Up @@ -96,9 +76,6 @@ type ClientOptions struct {
number int
reconnectInterval int // reConnect Interval

// task pool
taskPoolOptions

// the cert file of wss server which may contain server domain, server ip, the starting effective date, effective
// duration, the hash alg, the len of the private key.
// wss client will use it.
Expand Down Expand Up @@ -126,30 +103,60 @@ func WithConnectionNumber(num int) ClientOption {
}
}

// @size is the task queue pool size
func WithClientTaskPoolSize(size int32) ClientOption {
// @cert is client certificate file. it can be empty.
func WithRootCertificateFile(cert string) ClientOption {
return func(o *ClientOptions) {
o.taskPoolOptions.tQPoolSize = size
o.cert = cert
}
}

// @length is the task queue length
func WithClientTaskQueueLength(length int32) ClientOption {
return func(o *ClientOptions) {
o.taskPoolOptions.tQLen = length
/////////////////////////////////////////
// Task Pool Options
/////////////////////////////////////////

type TaskPoolOptions struct {
tQLen int // task queue length
tQNumber int // task queue number
tQPoolSize int // task pool size
}

func (o *TaskPoolOptions) validate() {
if o.tQPoolSize == 0 {
panic(fmt.Sprintf("[getty][task_pool] illegal pool size %d", o.tQPoolSize))
}

if o.tQLen == 0 {
o.tQLen = defaultTaskQLen
}

if o.tQNumber < 1 {
o.tQNumber = defaultTaskQNumber
}

if o.tQNumber > o.tQPoolSize {
o.tQNumber = o.tQPoolSize
}
}

// @number is the task queue number
func WithClientTaskQueueNumber(number int32) ClientOption {
return func(o *ClientOptions) {
o.taskPoolOptions.tQNumber = number
type TaskPoolOption func(*TaskPoolOptions)

// @size is the task queue pool size
func WithTaskPoolTaskPoolSize(size int) TaskPoolOption {
return func(o *TaskPoolOptions) {
o.tQPoolSize = size
}
}

// @cert is client certificate file. it can be empty.
func WithRootCertificateFile(cert string) ClientOption {
return func(o *ClientOptions) {
o.cert = cert
// @length is the task queue length
func WithTaskPoolTaskQueueLength(length int) TaskPoolOption {
return func(o *TaskPoolOptions) {
o.tQLen = length
}
}

// @number is the task queue number
func WithTaskPoolTaskQueueNumber(number int) TaskPoolOption {
return func(o *TaskPoolOptions) {
o.tQNumber = number
}
}
14 changes: 0 additions & 14 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ type server struct {
endPointType EndPointType
server *http.Server // for ws or wss server

// task queue pool
tQPool *taskPool

sync.Once
done chan struct{}
wg sync.WaitGroup
Expand All @@ -67,10 +64,6 @@ func newServer(t EndPointType, opts ...ServerOption) *server {
panic(fmt.Sprintf("@addr:%s", s.addr))
}

if s.tQPoolSize > 0 {
s.tQPool = newTaskPool(s.taskPoolOptions)
}

return s
}

Expand Down Expand Up @@ -137,10 +130,6 @@ func (s *server) stop() {
s.pktListener.Close()
s.pktListener = nil
}
if s.tQPool != nil {
s.tQPool.close()
s.tQPool = nil
}
})
}
}
Expand Down Expand Up @@ -263,7 +252,6 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
continue
}
delay = 0
client.(*session).SetTaskPool(s.tQPool)
client.(*session).run()
}
}()
Expand All @@ -278,7 +266,6 @@ func (s *server) runUDPEventLoop(newSession NewSessionCallback) {
if err := newSession(ss); err != nil {
panic(err.Error())
}
ss.(*session).SetTaskPool(s.tQPool)
ss.(*session).run()
}

Expand Down Expand Up @@ -335,7 +322,6 @@ func (s *wsHandler) serveWSRequest(w http.ResponseWriter, r *http.Request) {
if ss.(*session).maxMsgLen > 0 {
conn.SetReadLimit(int64(ss.(*session).maxMsgLen))
}
ss.(*session).SetTaskPool(s.server.tQPool)
ss.(*session).run()
}

Expand Down
13 changes: 6 additions & 7 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type session struct {
// handle logic
maxMsgLen int32
// task queue
tQPool *taskPool
tPool *TaskPool

// heartbeat
period time.Duration
Expand All @@ -89,7 +89,6 @@ func newSession(endPoint EndPoint, conn Connection) *session {
Connection: conn,

maxMsgLen: maxReadBufLen,
tQLen: defaultTaskQLen,

period: period,

Expand Down Expand Up @@ -302,11 +301,11 @@ func (s *session) SetWaitTime(waitTime time.Duration) {
}

// set task pool
func (s *session) SetTaskPool(p *taskPool) {
func (s *session) SetTaskPool(p *TaskPool) {
s.lock.Lock()
defer s.lock.Unlock()

s.tQPool = p
s.tPool = p
}

// set attribute of key @session:key
Expand Down Expand Up @@ -457,7 +456,7 @@ func (s *session) run() {
s.wQ = make(chan interface{}, defaultQLen)
}

if s.rQ == nil && s.tQPool == nil {
if s.rQ == nil && s.tPool == nil {
s.rQ = make(chan interface{}, defaultQLen)
}

Expand Down Expand Up @@ -564,8 +563,8 @@ LOOP:
}

func (s *session) addTask(pkg interface{}) {
if s.tQPool != nil {
s.tQPool.AddTask(task{session: s, pkg: pkg})
if s.tPool != nil {
s.tPool.AddTask(task{session: s, pkg: pkg})
} else {
s.rQ <- pkg
}
Expand Down
Loading

0 comments on commit 1e0ac26

Please sign in to comment.