Skip to content

Commit

Permalink
fix(plc4go): transaction should now be properly handled
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Jun 2, 2023
1 parent 2ff14f3 commit 25480b1
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 13 deletions.
2 changes: 1 addition & 1 deletion plc4go/internal/bacnetip/Connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
}

func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), NewReader(&c.invokeIdGenerator, c.messageCodec, c.tm))
return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), NewReader(&c.invokeIdGenerator, c.messageCodec, c.tm, options.WithCustomLogger(c.log)))
}

func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
Expand Down
12 changes: 10 additions & 2 deletions plc4go/internal/bacnetip/Reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ package bacnetip
import (
"context"
"fmt"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/rs/zerolog"
"time"

apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
Expand All @@ -42,16 +44,20 @@ type Reader struct {

maxSegmentsAccepted readWriteModel.MaxSegmentsAccepted
maxApduLengthAccepted readWriteModel.MaxApduLengthAccepted

log zerolog.Logger
}

func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager) *Reader {
func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager, _options ...options.WithOption) *Reader {
return &Reader{
invokeIdGenerator: invokeIdGenerator,
messageCodec: messageCodec,
tm: tm,

maxSegmentsAccepted: readWriteModel.MaxSegmentsAccepted_MORE_THAN_64_SEGMENTS,
maxApduLengthAccepted: readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1476,

log: options.ExtractCustomLogger(_options...),
}
}

Expand Down Expand Up @@ -190,7 +196,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
nil,
errors.Wrap(err, "error sending message"),
)
_ = transaction.EndRequest()
if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
m.log.Debug().Err(err).Msg("Error failing request")
}
}
})
}()
Expand Down
4 changes: 0 additions & 4 deletions plc4go/internal/cbus/Reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,6 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac
expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
return actualAlpha == expectedAlpha
}, func(receivedMessage spi.Message) error {
defer func(transaction transactions.RequestTransaction) {
// This is just to make sure we don't forget to close the transaction here
_ = transaction.EndRequest()
}(transaction)
// Convert the response into an
m.log.Trace().Msg("convert response to ")
messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
Expand Down
4 changes: 3 additions & 1 deletion plc4go/internal/cbus/Writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
}, time.Second*1); err != nil {
m.log.Debug().Err(err).Msgf("Error sending message for tag %s", tagNameCopy)
addResponseCode(tagNameCopy, apiModel.PlcResponseCode_INTERNAL_ERROR)
_ = transaction.EndRequest()
if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
m.log.Debug().Err(err).Msg("Error failing request")
}
}
})
}
Expand Down
4 changes: 3 additions & 1 deletion plc4go/internal/eip/Reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
nil,
errors.Wrap(err, "error sending message"),
)
_ = transaction.EndRequest()
if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
m.log.Debug().Err(err).Msg("Error failing request")
}
}
})
}
Expand Down
8 changes: 6 additions & 2 deletions plc4go/internal/eip/Writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
return transaction.EndRequest()
}, time.Second*1); err != nil {
result <- spiModel.NewDefaultPlcWriteRequestResult( writeRequest, nil, errors.Wrap(err, "error sending message"))
_ = transaction.EndRequest()
if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
m.log.Debug().Err(err).Msg("Error failing request")
}
}
})
} else {
Expand Down Expand Up @@ -263,7 +265,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
return transaction.EndRequest()
}, time.Second*1); err != nil {
result <- spiModel.NewDefaultPlcWriteRequestResult( writeRequest, nil, errors.Wrap(err, "error sending message"))
_ = transaction.EndRequest()
if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
m.log.Debug().Err(err).Msg("Error failing request")
}
}
})
}*/
Expand Down
4 changes: 3 additions & 1 deletion plc4go/internal/s7/Reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
nil,
errors.Wrap(err, "error sending message"),
)
_ = transaction.EndRequest()
if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
m.log.Debug().Err(err).Msg("Error failing request")
}
}
})
}()
Expand Down
4 changes: 3 additions & 1 deletion plc4go/internal/s7/Writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
return transaction.EndRequest()
}, time.Second*1); err != nil {
result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Wrap(err, "error sending message"))
_ = transaction.EndRequest()
if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
m.log.Debug().Err(err).Msg("Error failing request")
}
}
})
}()
Expand Down

0 comments on commit 25480b1

Please sign in to comment.