Skip to content

Commit

Permalink
refactor(spi): optimize RequestTransactionManager
Browse files Browse the repository at this point in the history
+ fixed shared instance being set by every NewFixedSizeExecutor call
+ AwaitCompletion returns err
+ fixed typos
+ added AwaitCompletion to the RequestTransaction to wait for a transaction to finish
  • Loading branch information
sruehl committed Aug 29, 2022
1 parent 1216bbc commit 49809c5
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 39 deletions.
2 changes: 1 addition & 1 deletion plc4go/internal/bacnetip/Driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Driver struct {
func NewDriver() plc4go.PlcDriver {
return &Driver{
DefaultDriver: _default.NewDefaultDriver("bacnet-ip", "BACnet/IP", "udp", NewFieldHandler()),
tm: spi.NewRequestTransactionManager(math.MaxInt),
tm: *spi.NewRequestTransactionManager(math.MaxInt),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
Expand Down
2 changes: 1 addition & 1 deletion plc4go/internal/cbus/Driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Driver struct {
func NewDriver() plc4go.PlcDriver {
return &Driver{
DefaultDriver: _default.NewDefaultDriver("c-bus", "Clipsal Bus", "tcp", NewFieldHandler()),
tm: spi.NewRequestTransactionManager(1),
tm: *spi.NewRequestTransactionManager(1),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
Expand Down
2 changes: 1 addition & 1 deletion plc4go/internal/eip/Driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Driver struct {
func NewDriver() plc4go.PlcDriver {
return &Driver{
DefaultDriver: _default.NewDefaultDriver("eip", "EthernetIP", "tcp", NewFieldHandler()),
tm: spi.NewRequestTransactionManager(1),
tm: *spi.NewRequestTransactionManager(1),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
Expand Down
2 changes: 1 addition & 1 deletion plc4go/internal/s7/Driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Driver struct {
func NewDriver() plc4go.PlcDriver {
return &Driver{
DefaultDriver: _default.NewDefaultDriver("s7", "Siemens S7 (Basic)", "tcp", NewFieldHandler()),
tm: spi.NewRequestTransactionManager(1),
tm: *spi.NewRequestTransactionManager(1),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
Expand Down
94 changes: 59 additions & 35 deletions plc4go/spi/RequestTransactionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ import (
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"runtime"
"sync"
"time"
)

/** Executor that performs all operations */
var executor Executor // shared instance
var sharedExecutorInstance Executor // shared instance

func init() {
executor = *NewFixedSizeExecutor(4)
executor.start()
sharedExecutorInstance = *NewFixedSizeExecutor(runtime.NumCPU())
sharedExecutorInstance.start()
}

type Runnable func()
Expand Down Expand Up @@ -113,7 +113,7 @@ func NewFixedSizeExecutor(numberOfWorkers int) *Executor {
executor: nil,
}
}
executor = Executor{
executor := Executor{
queue: make(chan WorkItem, 100),
worker: workers,
}
Expand Down Expand Up @@ -186,10 +186,11 @@ func (f CompletionFuture) complete() {
f.completed = true
}

func (f CompletionFuture) AwaitCompletion() {
func (f CompletionFuture) AwaitCompletion() error {
for !f.completed || !f.errored {
time.Sleep(time.Millisecond * 10)
}
return f.err
}

type RequestTransaction struct {
Expand All @@ -207,30 +208,44 @@ func (t *RequestTransaction) String() string {
return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
}

// RequestTransactionManager handles transactions
type RequestTransactionManager struct {
runningRequests []*RequestTransaction
// How many Transactions are allowed to run at the same time?
numberOfConcurrentRequests int
// Assigns each request a Unique Transaction Id, especially important for failure handling
transactionId int32
transationMutex sync.RWMutex
transactionId int32
transactionMutex sync.RWMutex
// Important, this is a FIFO Queue for Fairness!
worklog list.List
worklogMutex sync.RWMutex
workLog list.List
workLogMutex sync.RWMutex
executor *Executor
}

func NewRequestTransactionManager(numberOfConcurrentRequests int) RequestTransactionManager {
return RequestTransactionManager{
// NewRequestTransactionManager creates a new RequestTransactionManager
func NewRequestTransactionManager(numberOfConcurrentRequests int, requestTransactionManagerOptions ...RequestTransactionManagerOption) *RequestTransactionManager {
requestTransactionManager := &RequestTransactionManager{
numberOfConcurrentRequests: numberOfConcurrentRequests,
transactionId: 0,
worklog: *list.New(),
workLog: *list.New(),
executor: &sharedExecutorInstance,
}
for _, requestTransactionManagerOption := range requestTransactionManagerOptions {
requestTransactionManagerOption(requestTransactionManager)
}
return requestTransactionManager
}

func (r *RequestTransactionManager) getNumberOfConcurrentRequests() int {
return r.numberOfConcurrentRequests
type RequestTransactionManagerOption func(requestTransactionManager *RequestTransactionManager)

// WithCustomExecutor sets a custom Executor for the RequestTransactionManager
func WithCustomExecutor(executor *Executor) RequestTransactionManagerOption {
return func(requestTransactionManager *RequestTransactionManager) {
requestTransactionManager.executor = executor
}
}

// SetNumberOfConcurrentRequests sets the number of concurrent requests that will be sent out to a device
func (r *RequestTransactionManager) SetNumberOfConcurrentRequests(numberOfConcurrentRequests int) {
log.Info().Msgf("Setting new number of concurrent requests %d", numberOfConcurrentRequests)
// If we reduced the number of concurrent requests and more requests are in-flight
Expand All @@ -251,34 +266,36 @@ func (r *RequestTransactionManager) submitHandle(handle *RequestTransaction) {
}
// Add this Request with this handle i the Worklog
// Put Transaction into Worklog
r.worklogMutex.Lock()
r.worklog.PushFront(handle)
r.worklogMutex.Unlock()
r.workLogMutex.Lock()
r.workLog.PushFront(handle)
r.workLogMutex.Unlock()
// Try to Process the Worklog
r.processWorklog()
}

func (r *RequestTransactionManager) processWorklog() {
r.worklogMutex.RLock()
defer r.worklogMutex.RUnlock()
log.Debug().Msgf("Processing work log with size of %d (%d concurrent requests allowed)", r.worklog.Len(), r.numberOfConcurrentRequests)
for len(r.runningRequests) < r.numberOfConcurrentRequests && r.worklog.Len() > 0 {
front := r.worklog.Front()
r.workLogMutex.RLock()
defer r.workLogMutex.RUnlock()
log.Debug().Msgf("Processing work log with size of %d (%d concurrent requests allowed)", r.workLog.Len(), r.numberOfConcurrentRequests)
for len(r.runningRequests) < r.numberOfConcurrentRequests && r.workLog.Len() > 0 {
front := r.workLog.Front()
if front == nil {
return
}
next := front.Value.(*RequestTransaction)
log.Debug().Msgf("Handling next %v. (Adding to running requests (length: %d))", next, len(r.runningRequests))
r.runningRequests = append(r.runningRequests, next)
completionFuture := executor.submit(next.transactionId, next.operation)
// TODO: use sharedInstance if none is present
completionFuture := sharedExecutorInstance.submit(next.transactionId, next.operation)
next.completionFuture = completionFuture
r.worklog.Remove(front)
r.workLog.Remove(front)
}
}

// StartTransaction starts a RequestTransaction
func (r *RequestTransactionManager) StartTransaction() *RequestTransaction {
r.transationMutex.Lock()
defer r.transationMutex.Unlock()
r.transactionMutex.Lock()
defer r.transactionMutex.Unlock()
currentTransactionId := r.transactionId
r.transactionId += 1
transactionLogger := log.With().Int32("transactionId", currentTransactionId).Logger()
Expand Down Expand Up @@ -322,36 +339,43 @@ func (r *RequestTransactionManager) endRequest(transaction *RequestTransaction)
}
transaction.transactionLog.Debug().Msg("Removing the existing transaction transaction")
r.runningRequests = append(r.runningRequests[:index], r.runningRequests[index+1:]...)
// Process the worklog, a slot should be free now
transaction.transactionLog.Debug().Msg("Processing the worklog")
// Process the workLog, a slot should be free now
transaction.transactionLog.Debug().Msg("Processing the workLog")
r.processWorklog()
return nil
}

// FailRequest signals that this transaction has failed
func (t *RequestTransaction) FailRequest(err error) error {
t.transactionLog.Trace().Msg("Fail the request")
return t.parent.failRequest(t, err)
}

// EndRequest signals that this transaction is done
func (t *RequestTransaction) EndRequest() error {
t.transactionLog.Trace().Msg("Ending the request")
// Remove it from Running Requests
return t.parent.endRequest(t)
}

// Submit submits a Runnable to the RequestTransactionManager
func (t *RequestTransaction) Submit(operation Runnable) {
if t.operation != nil {
panic("Operation already set")
}
t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId)
t.operation = t.NewTransactionOperation(operation)
t.operation = func() {
t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
operation()
t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
}
t.parent.submitHandle(t)
}

func (t *RequestTransaction) NewTransactionOperation(delegate Runnable) Runnable {
return func() {
t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
delegate()
t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
// AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful
func (t *RequestTransaction) AwaitCompletion() error {
for t.completionFuture == nil {
time.Sleep(time.Millisecond * 10)
}
return t.completionFuture.AwaitCompletion()
}

0 comments on commit 49809c5

Please sign in to comment.