Skip to content

Commit

Permalink
refactor(plc4go/spi): move pool option to other options
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Jun 16, 2023
1 parent c02f8f5 commit ac95770
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 87 deletions.
42 changes: 21 additions & 21 deletions plc4go/internal/cbus/CBusMessageMapper_test.go
Expand Up @@ -1544,7 +1544,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -1619,7 +1619,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -1683,7 +1683,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -1737,7 +1737,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -1794,7 +1794,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -1871,7 +1871,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -1966,7 +1966,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -2024,7 +2024,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -2078,7 +2078,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -2132,7 +2132,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -2186,7 +2186,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -2240,7 +2240,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -2314,7 +2314,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -2368,7 +2368,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -2422,7 +2422,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -2476,7 +2476,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -2530,7 +2530,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -2584,7 +2584,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -2654,7 +2654,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -2708,7 +2708,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down Expand Up @@ -2762,7 +2762,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
Expand Down
3 changes: 1 addition & 2 deletions plc4go/pkg/api/config/config.go
Expand Up @@ -21,7 +21,6 @@ package config

import (
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
"github.com/rs/zerolog"
)

Expand Down Expand Up @@ -67,7 +66,7 @@ func WithTraceDefaultMessageCodecWorker(traceWorkers bool) WithOption {

// WithExecutorOptionTracerWorkers sets a flag which extends logging for workers
func WithExecutorOptionTracerWorkers(traceWorkers bool) options.WithOption {
return pool.WithExecutorOptionTracerWorkers(traceWorkers)
return options.WithExecutorOptionTracerWorkers(traceWorkers)
}

// WithOption is a marker interface for options
Expand Down
21 changes: 21 additions & 0 deletions plc4go/spi/options/Option.go
Expand Up @@ -118,6 +118,22 @@ func ExtractTraceDefaultMessageCodecWorker(options ...WithOption) bool {
return false
}

// WithExecutorOptionTracerWorkers sets a flag which extends logging for workers
func WithExecutorOptionTracerWorkers(traceWorkers bool) WithOption {
return &withTracerExecutorWorkersOption{traceWorkers: traceWorkers}
}

// ExtractTracerWorkers returns the value from WithExecutorOptionTracerWorkers
func ExtractTracerWorkers(_options ...WithOption) (traceWorkers bool, found bool) {
for _, option := range _options {
switch option := option.(type) {
case *withTracerExecutorWorkersOption:
return option.traceWorkers, true
}
}
return false, false
}

// GetLoggerContextForModel returns a log context if the WithPassLoggerToModel WithOption is set
func GetLoggerContextForModel(ctx context.Context, log zerolog.Logger, options ...WithOption) context.Context {
passToModel := false
Expand Down Expand Up @@ -166,6 +182,11 @@ type withTraceDefaultMessageCodecWorker struct {
traceWorkers bool
}

type withTracerExecutorWorkersOption struct {
Option
traceWorkers bool
}

//
//
///////////////////////////////////////
Expand Down
37 changes: 2 additions & 35 deletions plc4go/spi/pool/WorkerPool.go
Expand Up @@ -60,7 +60,7 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
worker: workers,
log: customLogger,
}
_executor.traceWorkers, _ = ExtractTracerWorkers(_options...)
_executor.traceWorkers, _ = options.ExtractTracerWorkers(_options...)
for i := 0; i < numberOfWorkers; i++ {
workers[i].executor = _executor
}
Expand All @@ -77,7 +77,7 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.
},
maxNumberOfWorkers: maxNumberOfWorkers,
}
_executor.traceWorkers, _ = ExtractTracerWorkers(_options...)
_executor.traceWorkers, _ = options.ExtractTracerWorkers(_options...)
// We spawn one initial worker
w := worker{
id: 0,
Expand All @@ -89,36 +89,3 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.
_executor.worker = append(_executor.worker, &w)
return _executor
}

// WithExecutorOptionTracerWorkers sets a flag which extends logging for workers
func WithExecutorOptionTracerWorkers(traceWorkers bool) options.WithOption {
return &tracerWorkersOption{traceWorkers: traceWorkers}
}

// ExtractTracerWorkers returns the value from WithExecutorOptionTracerWorkers
func ExtractTracerWorkers(_options ...options.WithOption) (traceWorkers bool, found bool) {
for _, option := range _options {
switch option := option.(type) {
case *tracerWorkersOption:
return option.traceWorkers, true
}
}
return false, false
}

///////////////////////////////////////
///////////////////////////////////////
//
// Internal section
//

type tracerWorkersOption struct {
options.Option
traceWorkers bool
}

//
// Internal section
//
///////////////////////////////////////
///////////////////////////////////////
28 changes: 3 additions & 25 deletions plc4go/spi/pool/WorkerPool_test.go
Expand Up @@ -46,7 +46,7 @@ func TestNewFixedSizeExecutor(t *testing.T) {
args: args{
numberOfWorkers: 13,
queueDepth: 14,
options: []options.WithOption{WithExecutorOptionTracerWorkers(true)},
options: []options.WithOption{options.WithExecutorOptionTracerWorkers(true)},
},
setup: func(t *testing.T, args *args) {
args.options = append(args.options, options.WithCustomLogger(produceTestingLogger(t)))
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestNewDynamicExecutor(t *testing.T) {
args: args{
numberOfWorkers: 13,
queueDepth: 14,
options: []options.WithOption{WithExecutorOptionTracerWorkers(true)},
options: []options.WithOption{options.WithExecutorOptionTracerWorkers(true)},
},
setup: func(t *testing.T, args *args) {
args.options = append(args.options, options.WithCustomLogger(produceTestingLogger(t)))
Expand All @@ -104,7 +104,7 @@ func TestNewDynamicExecutor(t *testing.T) {
args: args{
numberOfWorkers: 2,
queueDepth: 2,
options: []options.WithOption{WithExecutorOptionTracerWorkers(true)},
options: []options.WithOption{options.WithExecutorOptionTracerWorkers(true)},
},
setup: func(t *testing.T, args *args) {
args.options = append(args.options, options.WithCustomLogger(produceTestingLogger(t)))
Expand Down Expand Up @@ -175,25 +175,3 @@ func TestNewDynamicExecutor(t *testing.T) {
})
}
}

func TestWithExecutorOptionTracerWorkers(t *testing.T) {
type args struct {
traceWorkers bool
}
tests := []struct {
name string
args args
executorValidator options.WithOption
}{
{
name: "option should set option",
args: args{traceWorkers: true},
executorValidator: &tracerWorkersOption{traceWorkers: true},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.executorValidator, WithExecutorOptionTracerWorkers(tt.args.traceWorkers))
})
}
}
5 changes: 2 additions & 3 deletions plc4go/spi/testutils/TestUtils.go
Expand Up @@ -22,7 +22,6 @@ package testutils
import (
"context"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
"github.com/apache/plc4x/plc4go/spi/utils"
"os"
"runtime/debug"
Expand Down Expand Up @@ -155,14 +154,14 @@ func ProduceTestingLogger(t *testing.T) zerolog.Logger {
// EnrichOptionsWithOptionsForTesting appends options useful for testing to config.WithOption s
func EnrichOptionsWithOptionsForTesting(t *testing.T, _options ...options.WithOption) []options.WithOption {
traceWorkers := true
if extractedTraceWorkers, found := pool.ExtractTracerWorkers(_options...); found {
if extractedTraceWorkers, found := options.ExtractTracerWorkers(_options...); found {
traceWorkers = extractedTraceWorkers
}
// TODO: apply to other options like above
return append(_options,
options.WithCustomLogger(ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
pool.WithExecutorOptionTracerWorkers(traceWorkers),
options.WithExecutorOptionTracerWorkers(traceWorkers),
)
}

Expand Down
8 changes: 7 additions & 1 deletion plc4go/spi/transactions/RequestTransactionManager.go
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
"github.com/rs/zerolog/log"
"io"
"runtime"
"sync"
Expand All @@ -39,7 +40,12 @@ import (
var sharedExecutorInstance pool.Executor // shared instance

func init() {
sharedExecutorInstance = pool.NewFixedSizeExecutor(runtime.NumCPU(), 100, pool.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers))
sharedExecutorInstance = pool.NewFixedSizeExecutor(
runtime.NumCPU(),
100,
options.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers),
config.WithCustomLogger(log.With().Str("executorInstance", "shared logger").Logger()),
)
sharedExecutorInstance.Start()
runtime.SetFinalizer(sharedExecutorInstance, func(sharedExecutorInstance pool.Executor) {
sharedExecutorInstance.Stop()
Expand Down

0 comments on commit ac95770

Please sign in to comment.