Skip to content

Commit

Permalink
feat(plc4go/spi): use atomic.Bool for state changes on WorkerPool
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Mar 21, 2023
1 parent e3030a4 commit 48c398c
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 29 deletions.
2 changes: 1 addition & 1 deletion plc4go/spi/RequestTransactionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
var sharedExecutorInstance utils.Executor // shared instance

func init() {
sharedExecutorInstance = *utils.NewFixedSizeExecutor(runtime.NumCPU(), utils.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers))
sharedExecutorInstance = *utils.NewFixedSizeExecutor(runtime.NumCPU(), 100, utils.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers))
sharedExecutorInstance.Start()
}

Expand Down
52 changes: 24 additions & 28 deletions plc4go/spi/utils/WorkerPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"sync"
"sync/atomic"
"time"
)

type Runnable func()

type Worker struct {
id int
shutdown bool
shutdown atomic.Bool
runnable Runnable
interrupted bool
interrupted atomic.Bool
executor *Executor
}

Expand All @@ -43,7 +44,7 @@ func (w *Worker) work() {
if recovered := recover(); recovered != nil {
log.Error().Msgf("Recovering from panic()=%v", recovered)
}
if !w.shutdown {
if !w.shutdown.Load() {
// if we are not in shutdown we continue
w.work()
}
Expand All @@ -53,12 +54,12 @@ func (w *Worker) work() {
workerLog = zerolog.Nop()
}

for !w.shutdown {
for !w.shutdown.Load() {
workerLog.Debug().Msg("Working")
select {
case workItem := <-w.executor.queue:
workerLog.Debug().Msgf("Got work item %v", workItem)
if workItem.completionFuture.cancelRequested || (w.shutdown && w.interrupted) {
if workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
workerLog.Debug().Msg("We need to stop")
// TODO: do we need to complete with a error?
} else {
Expand Down Expand Up @@ -93,27 +94,22 @@ type Executor struct {
traceWorkers bool
}

func NewFixedSizeExecutor(numberOfWorkers int, options ...ExecutorOption) *Executor {
func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, options ...ExecutorOption) *Executor {
workers := make([]*Worker, numberOfWorkers)
for i := 0; i < numberOfWorkers; i++ {
workers[i] = &Worker{
id: i,
shutdown: false,
runnable: nil,
interrupted: false,
executor: nil,
id: i,
}
}
executor := &Executor{
queue: make(chan WorkItem, 100),
queue: make(chan WorkItem, queueDepth),
worker: workers,
}
for _, option := range options {
option(executor)
}
for i := 0; i < numberOfWorkers; i++ {
worker := workers[i]
worker.executor = executor
workers[i].executor = executor
}
return executor
}
Expand Down Expand Up @@ -163,8 +159,8 @@ func (e *Executor) Stop() {
close(e.queue)
for i := 0; i < len(e.worker); i++ {
worker := e.worker[i]
worker.shutdown = true
worker.interrupted = true
worker.shutdown.Store(true)
worker.interrupted.Store(true)
}
e.running = false
}
Expand All @@ -175,30 +171,30 @@ type CompletionFuture interface {
}

type future struct {
cancelRequested bool
interruptRequested bool
completed bool
errored bool
err error
cancelRequested atomic.Bool
interruptRequested atomic.Bool
completed atomic.Bool
errored atomic.Bool
err atomic.Value
}

func (f *future) Cancel(interrupt bool, err error) {
f.cancelRequested = true
f.interruptRequested = interrupt
f.errored = true
f.err = err
f.cancelRequested.Store(true)
f.interruptRequested.Store(interrupt)
f.errored.Store(true)
f.err.Store(err)
}

func (f *future) complete() {
f.completed = true
f.completed.Store(true)
}

func (f *future) AwaitCompletion(ctx context.Context) error {
for !f.completed && !f.errored && ctx.Err() != nil {
for !f.completed.Load() && !f.errored.Load() && ctx.Err() != nil {
time.Sleep(time.Millisecond * 10)
}
if err := ctx.Err(); err != nil {
return err
}
return f.err
return f.err.Load().(error)
}

0 comments on commit 48c398c

Please sign in to comment.