Skip to content

Commit

Permalink
refactor(plc4go/spi): introduce RequestTransactionRunnable
Browse files Browse the repository at this point in the history
This runnable contains the current transaction as argument so it doesn't need to be passed from the outside
  • Loading branch information
sruehl committed May 2, 2023
1 parent fcd62ce commit fe482d9
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 38 deletions.
3 changes: 1 addition & 2 deletions plc4go/internal/bacnetip/Reader.go
Expand Up @@ -130,8 +130,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)

// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
transaction.Submit(func() {

transaction.Submit(func(transaction spi.RequestTransaction) {
// Send the over the wire
log.Trace().Msg("Send ")
if err := m.messageCodec.SendRequest(ctx, apdu, func(message spi.Message) bool {
Expand Down
44 changes: 22 additions & 22 deletions plc4go/internal/cbus/CBusMessageMapper_test.go
Expand Up @@ -1453,7 +1453,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand All @@ -1478,7 +1478,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -1540,7 +1540,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -1602,7 +1602,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -1656,7 +1656,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -1700,7 +1700,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -1747,7 +1747,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -1814,7 +1814,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -1899,7 +1899,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -1947,7 +1947,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -1991,7 +1991,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -2035,7 +2035,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -2079,7 +2079,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -2123,7 +2123,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -2187,7 +2187,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -2231,7 +2231,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -2275,7 +2275,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -2319,7 +2319,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -2363,7 +2363,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -2407,7 +2407,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -2467,7 +2467,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down Expand Up @@ -2511,7 +2511,7 @@ func TestMapEncodedReply(t *testing.T) {
transaction: func() spi.RequestTransaction {
transactionManager := spi.NewRequestTransactionManager(1)
transaction := transactionManager.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// NO-OP
})
return transaction
Expand Down
2 changes: 1 addition & 1 deletion plc4go/internal/cbus/Reader.go
Expand Up @@ -113,7 +113,7 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
m.sendMessageOverTheWire(ctx, transaction, messageToSend, addResponseCode, tagName, addPlcValue)
})
if err := transaction.AwaitCompletion(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion plc4go/internal/cbus/Writer.go
Expand Up @@ -101,7 +101,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
tagNameCopy := tagName
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// Send the over the wire
log.Trace().Msg("Send ")
if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
Expand Down
2 changes: 1 addition & 1 deletion plc4go/internal/eip/Reader.go
Expand Up @@ -86,7 +86,7 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
}
request := readWriteModel.NewCipRRData(0, 0, typeIds, *m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), []byte(DefaultSenderContext), 0)
transaction := m.tm.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
if err := m.messageCodec.SendRequest(ctx, request,
func(message spi.Message) bool {
eipPacket := message.(readWriteModel.EipPacket)
Expand Down
4 changes: 2 additions & 2 deletions plc4go/internal/eip/Writer.go
Expand Up @@ -120,7 +120,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
)
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// Send the over the wire
if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
eipPacket := message.(readWriteModel.EipPacket)
Expand Down Expand Up @@ -212,7 +212,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
)
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// Send the over the wire
if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
eipPacket := message.(readWriteModel.EipPacket)
Expand Down
2 changes: 1 addition & 1 deletion plc4go/internal/s7/Reader.go
Expand Up @@ -95,7 +95,7 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
)
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {

// Send the over the wire
log.Trace().Msg("Send ")
Expand Down
2 changes: 1 addition & 1 deletion plc4go/internal/s7/Writer.go
Expand Up @@ -100,7 +100,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <

// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
transaction.Submit(func() {
transaction.Submit(func(transaction spi.RequestTransaction) {
// Send the over the wire
if err := m.messageCodec.SendRequest(ctx, tpktPacket, func(message spi.Message) bool {
tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
Expand Down
10 changes: 6 additions & 4 deletions plc4go/spi/RequestTransactionManager.go
Expand Up @@ -42,15 +42,17 @@ func init() {
sharedExecutorInstance.Start()
}

type RequestTransactionRunnable func(transaction RequestTransaction)

// RequestTransaction represents a transaction
type RequestTransaction interface {
fmt.Stringer
// FailRequest signals that this transaction has failed
FailRequest(err error) error
// EndRequest signals that this transaction is done
EndRequest() error
// Submit submits a Runnable to the RequestTransactionManager
Submit(operation utils.Runnable)
// Submit submits a RequestTransactionRunnable to the RequestTransactionManager
Submit(operation RequestTransactionRunnable)
// AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful
AwaitCompletion(ctx context.Context) error
}
Expand Down Expand Up @@ -224,14 +226,14 @@ func (t *requestTransaction) EndRequest() error {
return t.parent.endRequest(t)
}

func (t *requestTransaction) Submit(operation utils.Runnable) {
func (t *requestTransaction) Submit(operation RequestTransactionRunnable) {
if t.operation != nil {
log.Warn().Msg("Operation already set")
}
t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId)
t.operation = func() {
t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
operation()
operation(t)
t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
}
t.parent.submitTransaction(t)
Expand Down
6 changes: 3 additions & 3 deletions plc4go/spi/RequestTransactionManager_test.go
Expand Up @@ -622,7 +622,7 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
transactionLog zerolog.Logger
}
type args struct {
operation utils.Runnable
operation RequestTransactionRunnable
}
tests := []struct {
name string
Expand All @@ -635,7 +635,7 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
parent: &requestTransactionManager{},
},
args: args{
operation: func() {
operation: func(_ RequestTransaction) {
// NOOP
},
},
Expand All @@ -649,7 +649,7 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
},
},
args: args{
operation: func() {
operation: func(_ RequestTransaction) {
// NOOP
},
},
Expand Down

0 comments on commit fe482d9

Please sign in to comment.