Skip to content

Commit

Permalink
fix(plc4go/spi): harden request transaction manager implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Jun 2, 2023
1 parent 25480b1 commit b9c89eb
Show file tree
Hide file tree
Showing 6 changed files with 613 additions and 39 deletions.
41 changes: 41 additions & 0 deletions plc4go/internal/cbus/mock_RequestTransaction_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 52 additions & 11 deletions plc4go/spi/transactions/RequestTransactionManager.go
Expand Up @@ -35,7 +35,6 @@ import (

"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

var sharedExecutorInstance pool.Executor // shared instance
Expand All @@ -58,6 +57,8 @@ type RequestTransaction interface {
Submit(operation RequestTransactionRunnable)
// AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful
AwaitCompletion(ctx context.Context) error
// IsCompleted indicates that the that this RequestTransaction is completed
IsCompleted() bool
}

// RequestTransactionManager handles transactions
Expand All @@ -79,6 +80,8 @@ func NewRequestTransactionManager(numberOfConcurrentRequests int, _options ...op
workLog: *list.New(),
executor: sharedExecutorInstance,

traceTransactionManagerTransactions: config.TraceTransactionManagerTransactions,

log: options.ExtractCustomLogger(_options...),
}
for _, option := range _options {
Expand Down Expand Up @@ -114,6 +117,9 @@ type requestTransaction struct {
operation pool.Runnable
completionFuture pool.CompletionFuture

stateChangeMutex sync.Mutex
completed bool

transactionLog zerolog.Logger
}

Expand All @@ -129,8 +135,12 @@ type requestTransactionManager struct {
workLogMutex sync.RWMutex
executor pool.Executor

// Indicates it this rtm is in shutdown
shutdown bool

// flag set to true if it should trace transactions
traceTransactionManagerTransactions bool

log zerolog.Logger
}

Expand Down Expand Up @@ -179,26 +189,35 @@ func (r *requestTransactionManager) processWorklog() {
}
}

type completedFuture struct {
err error
}

func (c completedFuture) AwaitCompletion(_ context.Context) error {
return c.err
}

func (completedFuture) Cancel(_ bool, _ error) {
// No op
}

func (r *requestTransactionManager) StartTransaction() RequestTransaction {
r.transactionMutex.Lock()
defer r.transactionMutex.Unlock()
currentTransactionId := r.transactionId
r.transactionId += 1
transactionLogger := log.With().Int32("transactionId", currentTransactionId).Logger()
if !config.TraceTransactionManagerTransactions {
transactionLogger := r.log.With().Int32("transactionId", currentTransactionId).Logger()
if !r.traceTransactionManagerTransactions {
transactionLogger = zerolog.Nop()
}
transaction := &requestTransaction{
r,
currentTransactionId,
nil,
nil,
transactionLogger,
parent: r,
transactionId: currentTransactionId,
transactionLog: transactionLogger,
}
if r.shutdown {
if err := r.failRequest(transaction, errors.New("request transaction manager in shutdown")); err != nil {
r.log.Error().Err(err).Msg("error shutting down transaction")
}
transaction.completed = true
transaction.completionFuture = completedFuture{errors.New("request transaction manager in shutdown")}
}
return transaction
}
Expand Down Expand Up @@ -271,17 +290,35 @@ func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
}

func (t *requestTransaction) FailRequest(err error) error {
t.stateChangeMutex.Lock()
defer t.stateChangeMutex.Unlock()
if t.completed {
return errors.Wrap(err, "calling fail on a already completed transaction")
}
t.transactionLog.Trace().Msg("Fail the request")
t.completed = true
return t.parent.failRequest(t, err)
}

func (t *requestTransaction) EndRequest() error {
t.stateChangeMutex.Lock()
defer t.stateChangeMutex.Unlock()
if t.completed {
return errors.New("calling end on a already completed transaction")
}
t.transactionLog.Trace().Msg("Ending the request")
t.completed = true
// Remove it from Running Requests
return t.parent.endRequest(t)
}

func (t *requestTransaction) Submit(operation RequestTransactionRunnable) {
t.stateChangeMutex.Lock()
defer t.stateChangeMutex.Unlock()
if t.completed {
t.transactionLog.Warn().Msg("calling submit on a already completed transaction")
return
}
if t.operation != nil {
t.transactionLog.Warn().Msg("Operation already set")
}
Expand Down Expand Up @@ -315,6 +352,10 @@ func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
return nil
}

func (t *requestTransaction) IsCompleted() bool {
return t.completed
}

func (t *requestTransaction) String() string {
return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
}

0 comments on commit b9c89eb

Please sign in to comment.