Skip to content

Commit

Permalink
refactor(plc4go/spi): split up request transaction into separate file
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Jun 13, 2023
1 parent ff18104 commit d7d5491
Show file tree
Hide file tree
Showing 5 changed files with 501 additions and 401 deletions.
148 changes: 148 additions & 0 deletions plc4go/spi/transactions/RequestTransaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package transactions

import (
"context"
"fmt"
"github.com/apache/plc4x/plc4go/spi/pool"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"sync"
"time"
)

// RequestTransaction represents a transaction
type RequestTransaction interface {
fmt.Stringer
// FailRequest signals that this transaction has failed
FailRequest(err error) error
// EndRequest signals that this transaction is done
EndRequest() error
// Submit submits a RequestTransactionRunnable to the RequestTransactionManager
Submit(operation RequestTransactionRunnable)
// AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful
AwaitCompletion(ctx context.Context) error
// IsCompleted indicates that the that this RequestTransaction is completed
IsCompleted() bool
}

///////////////////////////////////////
///////////////////////////////////////
//
// Internal section
//

type requestTransaction struct {
parent *requestTransactionManager
transactionId int32

/** The initial operation to perform to kick off the request */
operation pool.Runnable
completionFuture pool.CompletionFuture

stateChangeMutex sync.Mutex
completed bool

transactionLog zerolog.Logger
}

//
// Internal section
//
///////////////////////////////////////
///////////////////////////////////////

func (t *requestTransaction) FailRequest(err error) error {
t.stateChangeMutex.Lock()
defer t.stateChangeMutex.Unlock()
if t.completed {
return errors.Wrap(err, "calling fail on a already completed transaction")
}
t.transactionLog.Trace().Msg("Fail the request")
t.completed = true
return t.parent.failRequest(t, err)
}

func (t *requestTransaction) EndRequest() error {
t.stateChangeMutex.Lock()
defer t.stateChangeMutex.Unlock()
if t.completed {
return errors.New("calling end on a already completed transaction")
}
t.transactionLog.Trace().Msg("Ending the request")
t.completed = true
// Remove it from Running Requests
return t.parent.endRequest(t)
}

func (t *requestTransaction) Submit(operation RequestTransactionRunnable) {
t.stateChangeMutex.Lock()
defer t.stateChangeMutex.Unlock()
if t.completed {
t.transactionLog.Warn().Msg("calling submit on a already completed transaction")
return
}
if t.operation != nil {
t.transactionLog.Warn().Msg("Operation already set")
}
t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId)
t.operation = func() {
t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
operation(t)
t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
}
t.parent.submitTransaction(t)
}

func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
timeout, cancelFunc := context.WithTimeout(ctx, time.Minute*30) // This is intentionally set very high
defer cancelFunc()
for t.completionFuture == nil {
time.Sleep(time.Millisecond * 10)
if err := timeout.Err(); err != nil {
log.Error().Msg("Timout after a long time. This means something is very of here")
return errors.Wrap(err, "Error waiting for completion future to be set")
}
}
if err := t.completionFuture.AwaitCompletion(ctx); err != nil {
return err
}
stillActive := true
for stillActive {
stillActive = false
for _, runningRequest := range t.parent.runningRequests {
if runningRequest.transactionId == t.transactionId {
stillActive = true
break
}
}
}
return nil
}

func (t *requestTransaction) IsCompleted() bool {
return t.completed
}

func (t *requestTransaction) String() string {
return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
}
113 changes: 0 additions & 113 deletions plc4go/spi/transactions/RequestTransactionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package transactions
import (
"container/list"
"context"
"fmt"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
"io"
Expand All @@ -46,21 +45,6 @@ func init() {

type RequestTransactionRunnable func(RequestTransaction)

// RequestTransaction represents a transaction
type RequestTransaction interface {
fmt.Stringer
// FailRequest signals that this transaction has failed
FailRequest(err error) error
// EndRequest signals that this transaction is done
EndRequest() error
// Submit submits a RequestTransactionRunnable to the RequestTransactionManager
Submit(operation RequestTransactionRunnable)
// AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful
AwaitCompletion(ctx context.Context) error
// IsCompleted indicates that the that this RequestTransaction is completed
IsCompleted() bool
}

// RequestTransactionManager handles transactions
type RequestTransactionManager interface {
io.Closer
Expand Down Expand Up @@ -109,20 +93,6 @@ type withCustomExecutor struct {
executor pool.Executor
}

type requestTransaction struct {
parent *requestTransactionManager
transactionId int32

/** The initial operation to perform to kick off the request */
operation pool.Runnable
completionFuture pool.CompletionFuture

stateChangeMutex sync.Mutex
completed bool

transactionLog zerolog.Logger
}

type requestTransactionManager struct {
runningRequests []*requestTransaction
// How many transactions are allowed to run at the same time?
Expand Down Expand Up @@ -189,18 +159,6 @@ func (r *requestTransactionManager) processWorklog() {
}
}

type completedFuture struct {
err error
}

func (c completedFuture) AwaitCompletion(_ context.Context) error {
return c.err
}

func (completedFuture) Cancel(_ bool, _ error) {
// No op
}

func (r *requestTransactionManager) StartTransaction() RequestTransaction {
r.transactionMutex.Lock()
defer r.transactionMutex.Unlock()
Expand Down Expand Up @@ -288,74 +246,3 @@ func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
r.runningRequests = nil
return r.executor.Close()
}

func (t *requestTransaction) FailRequest(err error) error {
t.stateChangeMutex.Lock()
defer t.stateChangeMutex.Unlock()
if t.completed {
return errors.Wrap(err, "calling fail on a already completed transaction")
}
t.transactionLog.Trace().Msg("Fail the request")
t.completed = true
return t.parent.failRequest(t, err)
}

func (t *requestTransaction) EndRequest() error {
t.stateChangeMutex.Lock()
defer t.stateChangeMutex.Unlock()
if t.completed {
return errors.New("calling end on a already completed transaction")
}
t.transactionLog.Trace().Msg("Ending the request")
t.completed = true
// Remove it from Running Requests
return t.parent.endRequest(t)
}

func (t *requestTransaction) Submit(operation RequestTransactionRunnable) {
t.stateChangeMutex.Lock()
defer t.stateChangeMutex.Unlock()
if t.completed {
t.transactionLog.Warn().Msg("calling submit on a already completed transaction")
return
}
if t.operation != nil {
t.transactionLog.Warn().Msg("Operation already set")
}
t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId)
t.operation = func() {
t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
operation(t)
t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
}
t.parent.submitTransaction(t)
}

func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
for t.completionFuture == nil {
time.Sleep(time.Millisecond * 10)
// TODO: this should timeout and not loop infinite...
}
if err := t.completionFuture.AwaitCompletion(ctx); err != nil {
return err
}
stillActive := true
for stillActive {
stillActive = false
for _, runningRequest := range t.parent.runningRequests {
if runningRequest.transactionId == t.transactionId {
stillActive = true
break
}
}
}
return nil
}

func (t *requestTransaction) IsCompleted() bool {
return t.completed
}

func (t *requestTransaction) String() string {
return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
}
Loading

0 comments on commit d7d5491

Please sign in to comment.