From fe482d9305136b0a012f3dc86a005fdbab875ab1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20R=C3=BChl?= Date: Tue, 2 May 2023 15:32:55 +0200 Subject: [PATCH] refactor(plc4go/spi): introduce RequestTransactionRunnable This runnable contains the current transaction as argument so it doesn't need to be passed from the outside --- plc4go/internal/bacnetip/Reader.go | 3 +- .../internal/cbus/CBusMessageMapper_test.go | 44 +++++++++---------- plc4go/internal/cbus/Reader.go | 2 +- plc4go/internal/cbus/Writer.go | 2 +- plc4go/internal/eip/Reader.go | 2 +- plc4go/internal/eip/Writer.go | 4 +- plc4go/internal/s7/Reader.go | 2 +- plc4go/internal/s7/Writer.go | 2 +- plc4go/spi/RequestTransactionManager.go | 10 +++-- plc4go/spi/RequestTransactionManager_test.go | 6 +-- 10 files changed, 39 insertions(+), 38 deletions(-) diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go index d67378150ed..e67149466d1 100644 --- a/plc4go/internal/bacnetip/Reader.go +++ b/plc4go/internal/bacnetip/Reader.go @@ -130,8 +130,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) // Start a new request-transaction (Is ended in the response-handler) transaction := m.tm.StartTransaction() - transaction.Submit(func() { - + transaction.Submit(func(transaction spi.RequestTransaction) { // Send the over the wire log.Trace().Msg("Send ") if err := m.messageCodec.SendRequest(ctx, apdu, func(message spi.Message) bool { diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go b/plc4go/internal/cbus/CBusMessageMapper_test.go index acf788b02bf..4da9984e321 100644 --- a/plc4go/internal/cbus/CBusMessageMapper_test.go +++ b/plc4go/internal/cbus/CBusMessageMapper_test.go @@ -1453,7 +1453,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -1478,7 +1478,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -1540,7 +1540,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -1602,7 +1602,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -1656,7 +1656,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -1700,7 +1700,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -1747,7 +1747,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -1814,7 +1814,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -1899,7 +1899,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -1947,7 +1947,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -1991,7 +1991,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -2035,7 +2035,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -2079,7 +2079,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -2123,7 +2123,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -2187,7 +2187,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -2231,7 +2231,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -2275,7 +2275,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -2319,7 +2319,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -2363,7 +2363,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -2407,7 +2407,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -2467,7 +2467,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction @@ -2511,7 +2511,7 @@ func TestMapEncodedReply(t *testing.T) { transaction: func() spi.RequestTransaction { transactionManager := spi.NewRequestTransactionManager(1) transaction := transactionManager.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // NO-OP }) return transaction diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go index 64a2baa7715..967ec3ae13e 100644 --- a/plc4go/internal/cbus/Reader.go +++ b/plc4go/internal/cbus/Reader.go @@ -113,7 +113,7 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) { // Start a new request-transaction (Is ended in the response-handler) transaction := m.tm.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { m.sendMessageOverTheWire(ctx, transaction, messageToSend, addResponseCode, tagName, addPlcValue) }) if err := transaction.AwaitCompletion(ctx); err != nil { diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go index 42d6fc8f3fe..46247c1ad46 100644 --- a/plc4go/internal/cbus/Writer.go +++ b/plc4go/internal/cbus/Writer.go @@ -101,7 +101,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques tagNameCopy := tagName // Start a new request-transaction (Is ended in the response-handler) transaction := m.tm.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // Send the over the wire log.Trace().Msg("Send ") if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool { diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go index 6a77f8f6cce..65260715427 100644 --- a/plc4go/internal/eip/Reader.go +++ b/plc4go/internal/eip/Reader.go @@ -86,7 +86,7 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c } request := readWriteModel.NewCipRRData(0, 0, typeIds, *m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), []byte(DefaultSenderContext), 0) transaction := m.tm.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { if err := m.messageCodec.SendRequest(ctx, request, func(message spi.Message) bool { eipPacket := message.(readWriteModel.EipPacket) diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go index b8d2d10d6ed..b77256611a6 100644 --- a/plc4go/internal/eip/Writer.go +++ b/plc4go/internal/eip/Writer.go @@ -120,7 +120,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) < ) // Start a new request-transaction (Is ended in the response-handler) transaction := m.tm.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // Send the over the wire if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool { eipPacket := message.(readWriteModel.EipPacket) @@ -212,7 +212,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) < ) // Start a new request-transaction (Is ended in the response-handler) transaction := m.tm.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // Send the over the wire if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool { eipPacket := message.(readWriteModel.EipPacket) diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go index 3379200e78d..92ae8a9686f 100644 --- a/plc4go/internal/s7/Reader.go +++ b/plc4go/internal/s7/Reader.go @@ -95,7 +95,7 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c ) // Start a new request-transaction (Is ended in the response-handler) transaction := m.tm.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // Send the over the wire log.Trace().Msg("Send ") diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go index e8bf2219538..5333146918f 100644 --- a/plc4go/internal/s7/Writer.go +++ b/plc4go/internal/s7/Writer.go @@ -100,7 +100,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) < // Start a new request-transaction (Is ended in the response-handler) transaction := m.tm.StartTransaction() - transaction.Submit(func() { + transaction.Submit(func(transaction spi.RequestTransaction) { // Send the over the wire if err := m.messageCodec.SendRequest(ctx, tpktPacket, func(message spi.Message) bool { tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly) diff --git a/plc4go/spi/RequestTransactionManager.go b/plc4go/spi/RequestTransactionManager.go index de9f461aa2a..cf14625ad67 100644 --- a/plc4go/spi/RequestTransactionManager.go +++ b/plc4go/spi/RequestTransactionManager.go @@ -42,6 +42,8 @@ func init() { sharedExecutorInstance.Start() } +type RequestTransactionRunnable func(transaction RequestTransaction) + // RequestTransaction represents a transaction type RequestTransaction interface { fmt.Stringer @@ -49,8 +51,8 @@ type RequestTransaction interface { FailRequest(err error) error // EndRequest signals that this transaction is done EndRequest() error - // Submit submits a Runnable to the RequestTransactionManager - Submit(operation utils.Runnable) + // Submit submits a RequestTransactionRunnable to the RequestTransactionManager + Submit(operation RequestTransactionRunnable) // AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful AwaitCompletion(ctx context.Context) error } @@ -224,14 +226,14 @@ func (t *requestTransaction) EndRequest() error { return t.parent.endRequest(t) } -func (t *requestTransaction) Submit(operation utils.Runnable) { +func (t *requestTransaction) Submit(operation RequestTransactionRunnable) { if t.operation != nil { log.Warn().Msg("Operation already set") } t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId) t.operation = func() { t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId) - operation() + operation(t) t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId) } t.parent.submitTransaction(t) diff --git a/plc4go/spi/RequestTransactionManager_test.go b/plc4go/spi/RequestTransactionManager_test.go index e746b7569ae..0a590bef890 100644 --- a/plc4go/spi/RequestTransactionManager_test.go +++ b/plc4go/spi/RequestTransactionManager_test.go @@ -622,7 +622,7 @@ func Test_requestTransaction_Submit(t1 *testing.T) { transactionLog zerolog.Logger } type args struct { - operation utils.Runnable + operation RequestTransactionRunnable } tests := []struct { name string @@ -635,7 +635,7 @@ func Test_requestTransaction_Submit(t1 *testing.T) { parent: &requestTransactionManager{}, }, args: args{ - operation: func() { + operation: func(_ RequestTransaction) { // NOOP }, }, @@ -649,7 +649,7 @@ func Test_requestTransaction_Submit(t1 *testing.T) { }, }, args: args{ - operation: func() { + operation: func(_ RequestTransaction) { // NOOP }, },