Skip to content

Commit

Permalink
fix(plc4go/spi): fix RequestTransationManager using the wrong executor
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Mar 21, 2023
1 parent 48c398c commit 23f07b6
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
3 changes: 1 addition & 2 deletions plc4go/spi/RequestTransactionManager.go
Expand Up @@ -134,8 +134,7 @@ func (r *RequestTransactionManager) processWorklog() {
next := front.Value.(*RequestTransaction)
log.Debug().Msgf("Handling next %v. (Adding to running requests (length: %d))", next, len(r.runningRequests))
r.runningRequests = append(r.runningRequests, next)
// TODO: use sharedInstance if none is present
completionFuture := sharedExecutorInstance.Submit(next.transactionId, next.operation)
completionFuture := r.executor.Submit(context.Background(), next.transactionId, next.operation)
next.completionFuture = completionFuture
r.workLog.Remove(front)
}
Expand Down
18 changes: 14 additions & 4 deletions plc4go/spi/utils/WorkerPool.go
Expand Up @@ -122,23 +122,28 @@ func WithExecutorOptionTracerWorkers(traceWorkers bool) ExecutorOption {
}
}

func (e *Executor) Submit(workItemId int32, runnable Runnable) CompletionFuture {
func (e *Executor) Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture {
log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
completionFuture := &future{}
// TODO: add select and timeout if queue is full
e.queue <- WorkItem{
select {
case e.queue <- WorkItem{
workItemId: workItemId,
runnable: runnable,
completionFuture: completionFuture,
}:
log.Trace().Msg("Item added")
case <-ctx.Done():
completionFuture.Cancel(false, ctx.Err())
}

log.Trace().Int32("workItemId", workItemId).Msg("runnable queued")
return completionFuture
}

func (e *Executor) Start() {
e.stateChange.Lock()
defer e.stateChange.Unlock()
if e.running {
if e.running || e.shutdown {
return
}
e.running = true
Expand All @@ -163,6 +168,11 @@ func (e *Executor) Stop() {
worker.interrupted.Store(true)
}
e.running = false
e.shutdown = false
}

func (e *Executor) IsRunning() bool {
return e.running && !e.shutdown
}

type CompletionFuture interface {
Expand Down

0 comments on commit 23f07b6

Please sign in to comment.