Skip to content

Commit

Permalink
refactor(plc4go/spi): move worker related code into WorkerPool
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Mar 21, 2023
1 parent 93341ca commit 23f19db
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 168 deletions.
184 changes: 16 additions & 168 deletions plc4go/spi/RequestTransactionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,184 +22,32 @@ package spi
import (
"container/list"
"fmt"
"runtime"
"sync"
"time"

"github.com/apache/plc4x/plc4go/pkg/api/config"
"github.com/apache/plc4x/plc4go/spi/utils"

"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"runtime"
"sync"
"time"
)

var sharedExecutorInstance Executor // shared instance
var sharedExecutorInstance utils.Executor // shared instance

func init() {
sharedExecutorInstance = *NewFixedSizeExecutor(runtime.NumCPU())
sharedExecutorInstance.start()
}

type Runnable func()

type Worker struct {
id int
shutdown bool
runnable Runnable
interrupted bool
executor *Executor
}

func (w *Worker) work() {
defer func() {
if recovered := recover(); recovered != nil {
log.Error().Msgf("Recovering from panic()=%v", recovered)
}
if !w.shutdown {
// TODO: if we are not in shutdown we continue
w.work()
}
}()
workerLog := log.With().Int("Worker id", w.id).Logger()
if !config.TraceTransactionManagerWorkers {
workerLog = zerolog.Nop()
}

for !w.shutdown {
workerLog.Debug().Msg("Working")
select {
case workItem := <-w.executor.queue:
workerLog.Debug().Msgf("Got work item %v", workItem)
if workItem.completionFuture.cancelRequested || (w.shutdown && w.interrupted) {
workerLog.Debug().Msg("We need to stop")
// TODO: do we need to complete with a error?
} else {
workerLog.Debug().Msgf("Running work item %v", workItem)
workItem.runnable()
workItem.completionFuture.complete()
workerLog.Debug().Msgf("work item %v completed", workItem)
}
default:
workerLog.Debug().Msgf("Idling")
time.Sleep(time.Millisecond * 10)
}
}
}

type WorkItem struct {
transactionId int32
runnable Runnable
completionFuture *CompletionFuture
}

func (w *WorkItem) String() string {
return fmt.Sprintf("Workitem{tid:%d}", w.transactionId)
}

type Executor struct {
running bool
shutdown bool
stateChange sync.Mutex
worker []*Worker
queue chan WorkItem
}

func NewFixedSizeExecutor(numberOfWorkers int) *Executor {
workers := make([]*Worker, numberOfWorkers)
for i := 0; i < numberOfWorkers; i++ {
workers[i] = &Worker{
id: i,
shutdown: false,
runnable: nil,
interrupted: false,
executor: nil,
}
}
executor := Executor{
queue: make(chan WorkItem, 100),
worker: workers,
}
for i := 0; i < numberOfWorkers; i++ {
worker := workers[i]
worker.executor = &executor
}
return &executor
}

func (e *Executor) submit(transactionId int32, runnable Runnable) *CompletionFuture {
log.Trace().Int32("transactionId", transactionId).Msg("Submitting runnable")
completionFuture := &CompletionFuture{}
// TODO: add select and timeout if queue is full
e.queue <- WorkItem{
transactionId: transactionId,
runnable: runnable,
completionFuture: completionFuture,
}
log.Trace().Int32("transactionId", transactionId).Msg("runnable queued")
return completionFuture
}

func (e *Executor) start() {
e.stateChange.Lock()
defer e.stateChange.Unlock()
if e.running {
return
}
e.running = true
e.shutdown = false
for i := 0; i < len(e.worker); i++ {
worker := e.worker[i]
go worker.work()
}
}

func (e *Executor) stop() {
e.stateChange.Lock()
defer e.stateChange.Unlock()
if !e.running {
return
}
e.shutdown = true
close(e.queue)
for i := 0; i < len(e.worker); i++ {
worker := e.worker[i]
worker.shutdown = true
worker.interrupted = true
}
e.running = false
}

type CompletionFuture struct {
cancelRequested bool
interruptRequested bool
completed bool
errored bool
err error
}

func (f *CompletionFuture) cancel(interrupt bool, err error) {
f.cancelRequested = true
f.interruptRequested = interrupt
f.errored = true
f.err = err
}

func (f *CompletionFuture) complete() {
f.completed = true
}

func (f *CompletionFuture) AwaitCompletion() error {
for !f.completed && !f.errored {
time.Sleep(time.Millisecond * 10)
}
return f.err
sharedExecutorInstance = *utils.NewFixedSizeExecutor(runtime.NumCPU())
sharedExecutorInstance.Start()
}

type RequestTransaction struct {
parent *RequestTransactionManager
transactionId int32

/** The initial operation to perform to kick off the request */
operation Runnable
completionFuture *CompletionFuture
operation utils.Runnable
completionFuture *utils.CompletionFuture

transactionLog zerolog.Logger
}
Expand All @@ -219,7 +67,7 @@ type RequestTransactionManager struct {
// Important, this is a FIFO Queue for Fairness!
workLog list.List
workLogMutex sync.RWMutex
executor *Executor
executor *utils.Executor
}

// NewRequestTransactionManager creates a new RequestTransactionManager
Expand All @@ -239,7 +87,7 @@ func NewRequestTransactionManager(numberOfConcurrentRequests int, requestTransac
type RequestTransactionManagerOption func(requestTransactionManager *RequestTransactionManager)

// WithCustomExecutor sets a custom Executor for the RequestTransactionManager
func WithCustomExecutor(executor *Executor) RequestTransactionManagerOption {
func WithCustomExecutor(executor *utils.Executor) RequestTransactionManagerOption {
return func(requestTransactionManager *RequestTransactionManager) {
requestTransactionManager.executor = executor
}
Expand Down Expand Up @@ -286,7 +134,7 @@ func (r *RequestTransactionManager) processWorklog() {
log.Debug().Msgf("Handling next %v. (Adding to running requests (length: %d))", next, len(r.runningRequests))
r.runningRequests = append(r.runningRequests, next)
// TODO: use sharedInstance if none is present
completionFuture := sharedExecutorInstance.submit(next.transactionId, next.operation)
completionFuture := sharedExecutorInstance.Submit(next.transactionId, next.operation)
next.completionFuture = completionFuture
r.workLog.Remove(front)
}
Expand Down Expand Up @@ -317,7 +165,7 @@ func (r *RequestTransactionManager) getNumberOfActiveRequests() int {

func (r *RequestTransactionManager) failRequest(transaction *RequestTransaction, err error) error {
// Try to fail it!
transaction.completionFuture.cancel(true, err)
transaction.completionFuture.Cancel(true, err)
// End it
return r.endRequest(transaction)
}
Expand Down Expand Up @@ -359,7 +207,7 @@ func (t *RequestTransaction) EndRequest() error {
}

// Submit submits a Runnable to the RequestTransactionManager
func (t *RequestTransaction) Submit(operation Runnable) {
func (t *RequestTransaction) Submit(operation utils.Runnable) {
if t.operation != nil {
panic("Operation already set")
}
Expand Down
Loading

0 comments on commit 23f19db

Please sign in to comment.