Skip to content

Commit

Permalink
refactor(plc4go/spi): clean up interfaces of WorkerPool
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Mar 21, 2023
1 parent 7efcb36 commit 12596b0
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
7 changes: 4 additions & 3 deletions plc4go/spi/RequestTransactionManager.go
Expand Up @@ -21,6 +21,7 @@ package spi

import (
"container/list"
"context"
"fmt"
"runtime"
"sync"
Expand All @@ -47,7 +48,7 @@ type RequestTransaction struct {

/** The initial operation to perform to kick off the request */
operation utils.Runnable
completionFuture *utils.CompletionFuture
completionFuture utils.CompletionFuture

transactionLog zerolog.Logger
}
Expand Down Expand Up @@ -221,11 +222,11 @@ func (t *RequestTransaction) Submit(operation utils.Runnable) {
}

// AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful
func (t *RequestTransaction) AwaitCompletion() error {
func (t *RequestTransaction) AwaitCompletion(ctx context.Context) error {
for t.completionFuture == nil {
time.Sleep(time.Millisecond * 10)
}
if err := t.completionFuture.AwaitCompletion(); err != nil {
if err := t.completionFuture.AwaitCompletion(ctx); err != nil {
return err
}
stillActive := true
Expand Down
27 changes: 18 additions & 9 deletions plc4go/spi/utils/WorkerPool.go
Expand Up @@ -20,6 +20,7 @@
package utils

import (
"context"
"fmt"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (w *Worker) work() {
} else {
workerLog.Debug().Msgf("Running work item %v", workItem)
workItem.runnable()
workItem.completionFuture.Complete()
workItem.completionFuture.complete()
workerLog.Debug().Msgf("work item %v completed", workItem)
}
default:
Expand All @@ -76,7 +77,7 @@ func (w *Worker) work() {
type WorkItem struct {
workItemId int32
runnable Runnable
completionFuture *CompletionFuture
completionFuture *future
}

func (w *WorkItem) String() string {
Expand Down Expand Up @@ -125,9 +126,9 @@ func WithExecutorOptionTracerWorkers(traceWorkers bool) ExecutorOption {
}
}

func (e *Executor) Submit(workItemId int32, runnable Runnable) *CompletionFuture {
func (e *Executor) Submit(workItemId int32, runnable Runnable) CompletionFuture {
log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
completionFuture := &CompletionFuture{}
completionFuture := &future{}
// TODO: add select and timeout if queue is full
e.queue <- WorkItem{
workItemId: workItemId,
Expand Down Expand Up @@ -168,28 +169,36 @@ func (e *Executor) Stop() {
e.running = false
}

type CompletionFuture struct {
type CompletionFuture interface {
AwaitCompletion(ctx context.Context) error
Cancel(interrupt bool, err error)
}

type future struct {
cancelRequested bool
interruptRequested bool
completed bool
errored bool
err error
}

func (f *CompletionFuture) Cancel(interrupt bool, err error) {
func (f *future) Cancel(interrupt bool, err error) {
f.cancelRequested = true
f.interruptRequested = interrupt
f.errored = true
f.err = err
}

func (f *CompletionFuture) Complete() {
func (f *future) complete() {
f.completed = true
}

func (f *CompletionFuture) AwaitCompletion() error {
for !f.completed && !f.errored {
func (f *future) AwaitCompletion(ctx context.Context) error {
for !f.completed && !f.errored && ctx.Err() != nil {
time.Sleep(time.Millisecond * 10)
}
if err := ctx.Err(); err != nil {
return err
}
return f.err
}

0 comments on commit 12596b0

Please sign in to comment.