Skip to content

Commit

Permalink
fix(plc4go/spi): potential fix with request transaction manager produ…
Browse files Browse the repository at this point in the history
…cing race conditions
  • Loading branch information
sruehl committed Jun 16, 2023
1 parent e6c897c commit 32c5531
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 22 deletions.
10 changes: 5 additions & 5 deletions plc4go/spi/transactions/RequestTransactionManager.go
Expand Up @@ -24,10 +24,10 @@ import (
"context"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
"github.com/rs/zerolog/log"
"io"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/apache/plc4x/plc4go/pkg/api/config"
Expand All @@ -44,7 +44,7 @@ func init() {
runtime.NumCPU(),
100,
options.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers),
config.WithCustomLogger(log.With().Str("executorInstance", "shared logger").Logger()),
config.WithCustomLogger(zerolog.Nop()),
)
sharedExecutorInstance.Start()
runtime.SetFinalizer(sharedExecutorInstance, func(sharedExecutorInstance pool.Executor) {
Expand Down Expand Up @@ -117,7 +117,7 @@ type requestTransactionManager struct {

executor pool.Executor

shutdown bool // Indicates it this rtm is in shutdown
shutdown atomic.Bool // Indicates it this rtm is in shutdown

traceTransactionManagerTransactions bool // flag set to true if it should trace transactions

Expand Down Expand Up @@ -188,7 +188,7 @@ func (r *requestTransactionManager) StartTransaction() RequestTransaction {
transactionId: currentTransactionId,
transactionLog: transactionLogger,
}
if r.shutdown {
if r.shutdown.Load() {
transaction.completed = true
transaction.completionFuture = &completedFuture{errors.New("request transaction manager in shutdown")}
}
Expand Down Expand Up @@ -239,7 +239,7 @@ func (r *requestTransactionManager) Close() error {

func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
r.log.Debug().Msgf("closing with a timeout of %s", timeout)
r.shutdown = true
r.shutdown.Store(true)
if timeout > 0 {
timer := time.NewTimer(timeout)
defer utils.CleanupTimer(timer)
Expand Down
27 changes: 11 additions & 16 deletions plc4go/spi/transactions/RequestTransactionManager_test.go
Expand Up @@ -151,14 +151,14 @@ func Test_requestTransactionManager_StartTransaction(t *testing.T) {
currentTransactionId int32
workLog list.List
executor pool.Executor
shutdown bool
traceTransactionManagerTransactions bool
}
tests := []struct {
name string
fields fields
setup func(t *testing.T, fields *fields)
wantAssert func(t *testing.T, requestTransaction RequestTransaction) bool
name string
fields fields
setup func(t *testing.T, fields *fields)
manipulator func(t *testing.T, manager *requestTransactionManager)
wantAssert func(t *testing.T, requestTransaction RequestTransaction) bool
}{
{
name: "start one",
Expand All @@ -169,8 +169,8 @@ func Test_requestTransactionManager_StartTransaction(t *testing.T) {
},
{
name: "start one in shutdown",
fields: fields{
shutdown: true,
manipulator: func(t *testing.T, manager *requestTransactionManager) {
manager.shutdown.Store(true)
},
wantAssert: func(t *testing.T, requestTransaction RequestTransaction) bool {
assert.True(t, requestTransaction.IsCompleted())
Expand All @@ -190,10 +190,12 @@ func Test_requestTransactionManager_StartTransaction(t *testing.T) {
currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
shutdown: tt.fields.shutdown,
traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
log: testutils.ProduceTestingLogger(t),
}
if tt.manipulator != nil {
tt.manipulator(t, r)
}
if got := r.StartTransaction(); !assert.True(t, tt.wantAssert(t, got)) {
t.Errorf("StartTransaction() = %v", got)
}
Expand Down Expand Up @@ -448,7 +450,6 @@ func Test_requestTransactionManager_Close(t *testing.T) {
currentTransactionId int32
workLog list.List
executor pool.Executor
shutdown bool
traceTransactionManagerTransactions bool
}
tests := []struct {
Expand Down Expand Up @@ -478,7 +479,6 @@ func Test_requestTransactionManager_Close(t *testing.T) {
currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
shutdown: tt.fields.shutdown,
traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
log: testutils.ProduceTestingLogger(t),
}
Expand All @@ -494,7 +494,6 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
currentTransactionId int32
workLog list.List
executor pool.Executor
shutdown bool
traceTransactionManagerTransactions bool
log zerolog.Logger
}
Expand Down Expand Up @@ -558,7 +557,6 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
shutdown: tt.fields.shutdown,
traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
log: tt.fields.log,
}
Expand All @@ -574,7 +572,6 @@ func Test_requestTransactionManager_String(t *testing.T) {
currentTransactionId int32
workLog list.List
executor pool.Executor
shutdown bool
traceTransactionManagerTransactions bool
}
tests := []struct {
Expand All @@ -598,7 +595,6 @@ func Test_requestTransactionManager_String(t *testing.T) {
return v
}(),
executor: pool.NewFixedSizeExecutor(1, 1),
shutdown: true,
traceTransactionManagerTransactions: true,
},
want: `
Expand All @@ -609,7 +605,7 @@ func Test_requestTransactionManager_String(t *testing.T) {
║║ ╚══════════════╝╚══════════╝ ║ ║
║╚═════════════════════════════════════════╝ ║
║╔═executor/executor═══════════════════════════════════════════════════════════════════════════════════════╗╔═shutdown╗║
║║╔═running╗╔═shutdown╗ ║║ b1 true ║║
║║╔═running╗╔═shutdown╗ ║║b0 false ║║
║║║b0 false║║b0 false ║ ║╚═════════╝║
║║╚════════╝╚═════════╝ ║ ║
║║╔═worker/value/worker═══════════════════════════════════════════════════════════════════════════════════╗║ ║
Expand All @@ -635,7 +631,6 @@ func Test_requestTransactionManager_String(t *testing.T) {
currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
shutdown: tt.fields.shutdown,
traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
log: testutils.ProduceTestingLogger(t),
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 32c5531

Please sign in to comment.