Skip to content

Commit

Permalink
feat(plc4go/spi): initial dynamic executor for worker pool
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Mar 21, 2023
1 parent e4e92b0 commit ea79a34
Showing 1 changed file with 87 additions and 17 deletions.
104 changes: 87 additions & 17 deletions plc4go/spi/utils/WorkerPool.go
Expand Up @@ -33,11 +33,21 @@ import (
type Runnable func()

type worker struct {
id int
shutdown atomic.Bool
interrupted atomic.Bool
executor *executor
hasEnded bool
id int
shutdown atomic.Bool
interrupted atomic.Bool
interrupter chan struct{}
executor *executor
hasEnded bool
lastReceived time.Time
}

func (w *worker) initialize() {
w.shutdown.Store(false)
w.interrupted.Store(false)
w.interrupter = make(chan struct{})
w.hasEnded = false
w.lastReceived = time.Now()
}

func (w *worker) work() {
Expand All @@ -60,6 +70,7 @@ func (w *worker) work() {
workerLog.Debug().Msg("Working")
select {
case _workItem := <-w.executor.queue:
w.lastReceived = time.Now()
workerLog.Debug().Msgf("Got work item %v", _workItem)
if _workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
workerLog.Debug().Msg("We need to stop")
Expand All @@ -70,6 +81,8 @@ func (w *worker) work() {
_workItem.completionFuture.complete()
workerLog.Debug().Msgf("work item %v completed", _workItem)
}
case <-w.interrupter:
log.Debug().Msg("We got interrupted")
default:
workerLog.Debug().Msgf("Idling")
time.Sleep(time.Millisecond * 10)
Expand All @@ -96,12 +109,13 @@ type Executor interface {
}

type executor struct {
running bool
shutdown bool
stateChange sync.Mutex
worker []*worker
queue chan workItem
traceWorkers bool
maxNumberOfWorkers int
running bool
shutdown bool
stateChange sync.Mutex
worker []*worker
queue chan workItem
traceWorkers bool
}

func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, options ...ExecutorOption) Executor {
Expand All @@ -111,17 +125,71 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, options ...ExecutorOp
id: i,
}
}
executor := &executor{
queue: make(chan workItem, queueDepth),
worker: workers,
_executor := &executor{
maxNumberOfWorkers: numberOfWorkers,
queue: make(chan workItem, queueDepth),
worker: workers,
}
for _, option := range options {
option(executor)
option(_executor)
}
for i := 0; i < numberOfWorkers; i++ {
workers[i].executor = executor
workers[i].executor = _executor
}
return _executor
}

func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorOption) Executor {
_executor := &executor{
maxNumberOfWorkers: maxNumberOfWorkers,
queue: make(chan workItem, queueDepth),
worker: make([]*worker, 0),
}
for _, option := range options {
option(_executor)
}
return executor
// We spawn one initial worker
_executor.worker = append(_executor.worker, &worker{
id: 0,
executor: _executor,
})
mutex := sync.Mutex{}
// Worker spawner
go func() {
for {
time.Sleep(100 * time.Millisecond)
mutex.Lock()
if len(_executor.queue) > len(_executor.worker) && len(_executor.worker) < maxNumberOfWorkers {
worker := &worker{
id: len(_executor.worker) - 1,
executor: _executor,
}
_executor.worker = append(_executor.worker, worker)
worker.initialize()
go worker.work()
}
mutex.Unlock()
}
}()
// Worker killer
go func() {
for {
time.Sleep(5 * time.Second)
mutex.Lock()
newWorkers := make([]*worker, 0)
for _, worker := range _executor.worker {
if worker.lastReceived.Before(time.Now().Add(-5 * time.Second)) {
worker.interrupted.Store(true)
close(worker.interrupter)
} else {
newWorkers = append(newWorkers, worker)
}
}
_executor.worker = newWorkers
mutex.Unlock()
}
}()
return _executor
}

type ExecutorOption func(*executor)
Expand Down Expand Up @@ -160,6 +228,7 @@ func (e *executor) Start() {
e.shutdown = false
for i := 0; i < len(e.worker); i++ {
worker := e.worker[i]
worker.initialize()
go worker.work()
}
}
Expand All @@ -176,6 +245,7 @@ func (e *executor) Stop() {
worker := e.worker[i]
worker.shutdown.Store(true)
worker.interrupted.Store(true)
close(worker.interrupter)
}
e.running = false
e.shutdown = false
Expand Down

0 comments on commit ea79a34

Please sign in to comment.