Skip to content

Commit

Permalink
refactor(plc4go/spi): slight cleanup of pool
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Jun 28, 2023
1 parent 3478e34 commit 3ea774c
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 52 deletions.
16 changes: 2 additions & 14 deletions plc4go/spi/pool/WorkerPool.go
Expand Up @@ -54,12 +54,7 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
w.lastReceived.Store(time.Time{})
workers[i] = &w
}
_executor := &executor{
queueDepth: queueDepth,
workItems: make(chan workItem, queueDepth),
worker: workers,
log: customLogger,
}
_executor := newExecutor(queueDepth, workers, customLogger)
_executor.traceWorkers, _ = options.ExtractTracerWorkers(_options...)
for i := 0; i < numberOfWorkers; i++ {
workers[i].executor = _executor
Expand All @@ -69,14 +64,7 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W

func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
_executor := &dynamicExecutor{
executor: &executor{
workItems: make(chan workItem, queueDepth),
worker: make([]*worker, 0),
log: customLogger,
},
maxNumberOfWorkers: maxNumberOfWorkers,
}
_executor := newDynamicExecutor(queueDepth, maxNumberOfWorkers, customLogger)
_executor.traceWorkers, _ = options.ExtractTracerWorkers(_options...)
// We spawn one initial worker
w := worker{
Expand Down
7 changes: 7 additions & 0 deletions plc4go/spi/pool/dynamicExecutor.go
Expand Up @@ -44,6 +44,13 @@ type dynamicExecutor struct {
dynamicWorkers sync.WaitGroup
}

func newDynamicExecutor(queueDepth, maxNumberOfWorkers int, log zerolog.Logger) *dynamicExecutor {
return &dynamicExecutor{
executor: newExecutor(queueDepth, make([]*worker, 0), log),
maxNumberOfWorkers: maxNumberOfWorkers,
}
}

func (e *dynamicExecutor) Start() {
e.dynamicStateChange.Lock()
defer e.dynamicStateChange.Unlock()
Expand Down
45 changes: 41 additions & 4 deletions plc4go/spi/pool/dynamicExecutor_test.go
Expand Up @@ -20,12 +20,49 @@
package pool

import (
"github.com/stretchr/testify/assert"
"sync/atomic"
"testing"
"time"

"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
)

func Test_newDynamicExecutor(t *testing.T) {
type args struct {
queueDepth int
maxNumberOfWorkers int
log zerolog.Logger
}
tests := []struct {
name string
args args
want *dynamicExecutor
manipulator func(t *testing.T, want *dynamicExecutor, got *dynamicExecutor)
}{
{
name: "just create it",
want: &dynamicExecutor{
executor: newExecutor(0, make([]*worker, 0), zerolog.Logger{}),
},
manipulator: func(t *testing.T, want *dynamicExecutor, got *dynamicExecutor) {
assert.NotNil(t, got.workItems)
want.workItems = got.workItems
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := newDynamicExecutor(tt.args.queueDepth, tt.args.maxNumberOfWorkers, tt.args.log)
want := tt.want
if tt.manipulator != nil {
tt.manipulator(t, want, got)
}
assert.Equalf(t, want, got, "newDynamicExecutor(%v, %v, %v)", tt.args.queueDepth, tt.args.maxNumberOfWorkers, tt.args.log)
})
}
}

func Test_dynamicExecutor_Start(t *testing.T) {
type fields struct {
executor *executor
Expand Down Expand Up @@ -208,9 +245,9 @@ func Test_dynamicExecutor_String(t *testing.T) {
║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║║║
║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║║
║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║║
║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║║
║║║0x0000000000000000 0║║0 element(s)║║ b0 false ║ ║║
║║╚════════════════════╝╚════════════╝╚═════════════╝ ║║
║║╔═workItems══╗╔═traceWorkers╗ ║║
║║║0 element(s)║║ b0 false ║ ║║
║║╚════════════╝╚═════════════╝ ║║
║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝║
║╔═maxNumberOfWorkers═╗╔═currentNumberOfWorkers╗╔═interrupter╗ ║
║║0x0000000000000003 3║║ 0x00000000 0 ║║0 element(s)║ ║
Expand Down
9 changes: 8 additions & 1 deletion plc4go/spi/pool/executor.go
Expand Up @@ -34,7 +34,6 @@ type executor struct {
shutdown bool

worker []*worker
queueDepth int
workItems chan workItem
traceWorkers bool

Expand All @@ -44,6 +43,14 @@ type executor struct {
log zerolog.Logger `ignore:"true"`
}

func newExecutor(queueDepth int, workers []*worker, log zerolog.Logger) *executor {
return &executor{
workItems: make(chan workItem, queueDepth),
worker: workers,
log: log,
}
}

func (e *executor) isTraceWorkers() bool {
return e.traceWorkers
}
Expand Down
4 changes: 0 additions & 4 deletions plc4go/spi/pool/executor_plc4xgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 46 additions & 26 deletions plc4go/spi/pool/executor_test.go
Expand Up @@ -31,12 +31,44 @@ import (
"github.com/stretchr/testify/assert"
)

func Test_newExecutor(t *testing.T) {
type args struct {
queueDepth int
workers []*worker
log zerolog.Logger
}
tests := []struct {
name string
args args
want *executor
manipulator func(t *testing.T, want *executor, got *executor)
}{
{
name: "just create it",
want: &executor{},
manipulator: func(t *testing.T, want *executor, got *executor) {
assert.NotNil(t, got.workItems)
want.workItems = got.workItems
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := newExecutor(tt.args.queueDepth, tt.args.workers, tt.args.log)
want := tt.want
if tt.manipulator != nil {
tt.manipulator(t, want, got)
}
assert.Equalf(t, want, got, "newExecutor(%v, %v, %v)", tt.args.queueDepth, tt.args.workers, tt.args.log)
})
}
}

func Test_executor_Close(t *testing.T) {
type fields struct {
running bool
shutdown bool
worker []*worker
queueDepth int
workItems chan workItem
traceWorkers bool
}
Expand All @@ -56,7 +88,6 @@ func Test_executor_Close(t *testing.T) {
running: tt.fields.running,
shutdown: tt.fields.shutdown,
worker: tt.fields.worker,
queueDepth: tt.fields.queueDepth,
workItems: tt.fields.workItems,
traceWorkers: tt.fields.traceWorkers,
log: produceTestingLogger(t),
Expand All @@ -71,7 +102,6 @@ func Test_executor_IsRunning(t *testing.T) {
running bool
shutdown bool
worker []*worker
queueDepth int
workItems chan workItem
traceWorkers bool
}
Expand All @@ -90,7 +120,6 @@ func Test_executor_IsRunning(t *testing.T) {
running: tt.fields.running,
shutdown: tt.fields.shutdown,
worker: tt.fields.worker,
queueDepth: tt.fields.queueDepth,
workItems: tt.fields.workItems,
traceWorkers: tt.fields.traceWorkers,
log: produceTestingLogger(t),
Expand Down Expand Up @@ -311,7 +340,6 @@ func Test_executor_getWorkerWaitGroup(t *testing.T) {
running bool
shutdown bool
worker []*worker
queueDepth int
workItems chan workItem
traceWorkers bool
}
Expand All @@ -331,7 +359,6 @@ func Test_executor_getWorkerWaitGroup(t *testing.T) {
running: tt.fields.running,
shutdown: tt.fields.shutdown,
worker: tt.fields.worker,
queueDepth: tt.fields.queueDepth,
workItems: tt.fields.workItems,
traceWorkers: tt.fields.traceWorkers,
log: produceTestingLogger(t),
Expand All @@ -346,7 +373,6 @@ func Test_executor_getWorksItems(t *testing.T) {
running bool
shutdown bool
worker []*worker
queueDepth int
workItems chan workItem
traceWorkers bool
log zerolog.Logger
Expand All @@ -366,7 +392,6 @@ func Test_executor_getWorksItems(t *testing.T) {
running: tt.fields.running,
shutdown: tt.fields.shutdown,
worker: tt.fields.worker,
queueDepth: tt.fields.queueDepth,
workItems: tt.fields.workItems,
traceWorkers: tt.fields.traceWorkers,
log: tt.fields.log,
Expand All @@ -381,7 +406,6 @@ func Test_executor_isTraceWorkers(t *testing.T) {
running bool
shutdown bool
worker []*worker
queueDepth int
workItems chan workItem
traceWorkers bool
}
Expand All @@ -400,7 +424,6 @@ func Test_executor_isTraceWorkers(t *testing.T) {
running: tt.fields.running,
shutdown: tt.fields.shutdown,
worker: tt.fields.worker,
queueDepth: tt.fields.queueDepth,
workItems: tt.fields.workItems,
traceWorkers: tt.fields.traceWorkers,
log: produceTestingLogger(t),
Expand All @@ -415,7 +438,6 @@ func Test_executor_String(t *testing.T) {
running bool
shutdown bool
worker []*worker
queueDepth int
workItems chan workItem
traceWorkers bool
}
Expand All @@ -441,23 +463,22 @@ func Test_executor_String(t *testing.T) {
}(),
},
},
queueDepth: 2,
traceWorkers: true,
},
want: `
╔═executor═══════════════════════════════════════════════════════════════════════════════════════════════╗
║╔═running╗╔═shutdown╗ ║
║║b1 true ║║ b1 true ║ ║
║╚════════╝╚═════════╝ ║
║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║
║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║
║║║0x0000000000000001 1║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║║
║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║
║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║
║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║
║║0x0000000000000002 2║║0 element(s)║║ b1 true ║ ║
║╚════════════════════╝╚════════════╝╚═════════════╝
╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
╔═executor═════════════════════════════════════════════════════════════════════════════════════════════════════════════
║╔═running╗╔═shutdown╗
║║b1 true ║║ b1 true ║
║╚════════╝╚═════════╝
║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗╔═workItems══╗
║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║0 element(s)║║
║║║0x0000000000000001 1║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║╚════════════╝
║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║
║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝
║╔═traceWorkers╗
║║ b1 true ║
║╚═════════════
╚══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
},
}
for _, tt := range tests {
Expand All @@ -466,7 +487,6 @@ func Test_executor_String(t *testing.T) {
running: tt.fields.running,
shutdown: tt.fields.shutdown,
worker: tt.fields.worker,
queueDepth: tt.fields.queueDepth,
workItems: tt.fields.workItems,
traceWorkers: tt.fields.traceWorkers,
log: produceTestingLogger(t),
Expand Down
6 changes: 3 additions & 3 deletions plc4go/spi/transactions/RequestTransactionManager_test.go
Expand Up @@ -654,9 +654,9 @@ func Test_requestTransactionManager_String(t *testing.T) {
║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║║ ║
║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║ ║
║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║ ║
║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║ ║
║║║0x0000000000000001 1║║0 element(s)║║ b0 false ║ ║ ║
║║╚════════════════════╝╚════════════╝╚═════════════╝ ║ ║
║║╔═workItems══╗╔═traceWorkers╗ ║ ║
║║║0 element(s)║║ b0 false ║ ║ ║
║║╚════════════╝╚═════════════╝ ║ ║
║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝ ║
║╔═traceTransactionManagerTransactions╗ ║
║║ b1 true ║ ║
Expand Down

0 comments on commit 3ea774c

Please sign in to comment.