Skip to content

Commit

Permalink
fix(plc4go/cbus): change handling of error responses
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Aug 10, 2022
1 parent dde3b01 commit aae82e9
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 28 deletions.
102 changes: 79 additions & 23 deletions plc4go/internal/cbus/MessageCodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type MessageCodec struct {
monitoredSALs chan readwriteModel.MonitoredSAL
lastPackageHash uint32
hashEncountered uint

currentlyReportedServerErrors uint
}

func NewMessageCodec(transportInstance transports.TransportInstance) *MessageCodec {
Expand Down Expand Up @@ -99,26 +101,37 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
log.Trace().Msg("receiving")

ti := m.GetTransportInstance()
if err := ti.FillBuffer(func(_ uint, currentByte byte, reader *bufio.Reader) bool {
hitCr := currentByte == '\r'
if hitCr {
// Make sure we peek one more
_, _ = reader.Peek(1)
return false
// Fill the buffer
{
if err := ti.FillBuffer(func(_ uint, currentByte byte, reader *bufio.Reader) bool {
hitCr := currentByte == '\r'
if hitCr {
// Make sure we peek one more
_, _ = reader.Peek(1)
return false
}
return true
}); err != nil {
return nil, err
}
return true
}); err != nil {
return nil, err
}
readableBytes, err := ti.GetNumBytesAvailableInBuffer()
if err != nil {
log.Warn().Err(err).Msg("Got error reading")
return nil, nil
}
if readableBytes == 0 {
log.Trace().Msg("Nothing to read")
return nil, nil

// Check how many readable bytes we have
var readableBytes uint32
{
numBytesAvailableInBuffer, err := ti.GetNumBytesAvailableInBuffer()
if err != nil {
log.Warn().Err(err).Msg("Got error reading")
return nil, nil
}
if numBytesAvailableInBuffer == 0 {
log.Trace().Msg("Nothing to read")
return nil, nil
}
readableBytes = numBytesAvailableInBuffer
}

// Check for an isolated error
if bytes, err := ti.PeekReadableBytes(1); err != nil && (bytes[0] == '!') {
_, _ = ti.Read(1)
return readwriteModel.CBusMessageParse(utils.NewReadBufferByteBased(bytes), true, m.requestContext, m.cbusOptions)
Expand Down Expand Up @@ -179,6 +192,7 @@ lookingForTheEnd:
return nil, nil
}

// Build length
packetLength := indexOfCR + 1
if pciResponse {
packetLength = indexOfLF + 1
Expand All @@ -189,16 +203,58 @@ lookingForTheEnd:
panic("Invalid state... Can not be response and request at the same time")
}

read, err := ti.Read(uint32(packetLength))
if err != nil {
panic("Invalid state... If we have peeked that before we should be able to read that now")
// We need to ensure that there is no ! till the first /r
{
peekedBytes, err := ti.PeekReadableBytes(readableBytes)
if err != nil {
return nil, err
}
// We check in the current stream for reported errors
foundErrors := uint(0)
for _, peekedByte := range peekedBytes {
if peekedByte == '!' {
foundErrors++
}
if peekedByte == '\r' {
// We only look for errors within
}
}
// Now we report the errors one by one so for every request we get a proper rejection
if foundErrors > m.currentlyReportedServerErrors {
log.Debug().Msgf("We found %d errors in the current message. We have %d reported already", foundErrors, m.currentlyReportedServerErrors)
m.currentlyReportedServerErrors++
return readwriteModel.CBusMessageParse(utils.NewReadBufferByteBased([]byte{'!'}), true, m.requestContext, m.cbusOptions)
}
if foundErrors > 0 {
log.Debug().Msgf("We should have reported all errors by now (%d in total which we reported %d), so we resetting the count", foundErrors, m.currentlyReportedServerErrors)
m.currentlyReportedServerErrors = 0
}
log.Trace().Msgf("currentlyReportedServerErrors %d should be 0", m.currentlyReportedServerErrors)
}

var rawInput []byte
{
read, err := ti.Read(uint32(packetLength))
if err != nil {
panic("Invalid state... If we have peeked that before we should be able to read that now")
}
rawInput = read
}
var sanitizedInput []byte
// We remove every error marker we find
{
for _, b := range rawInput {
if b != '!' {
sanitizedInput = append(sanitizedInput, b)
}
}
}
rb := utils.NewReadBufferByteBased(read)
rb := utils.NewReadBufferByteBased(sanitizedInput)
cBusMessage, err := readwriteModel.CBusMessageParse(rb, pciResponse, m.requestContext, m.cbusOptions)
if err != nil {
log.Debug().Err(err).Msg("First Parse Failed")
{ // Try SAL
rb := utils.NewReadBufferByteBased(read)
rb := utils.NewReadBufferByteBased(sanitizedInput)
cBusMessage, secondErr := readwriteModel.CBusMessageParse(rb, pciResponse, readwriteModel.NewRequestContext(false), m.cbusOptions)
if secondErr == nil {
return cBusMessage, nil
Expand All @@ -209,7 +265,7 @@ lookingForTheEnd:
{ // Try MMI
requestContext := readwriteModel.NewRequestContext(false)
cbusOptions := readwriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false)
rb := utils.NewReadBufferByteBased(read)
rb := utils.NewReadBufferByteBased(sanitizedInput)
cBusMessage, secondErr := readwriteModel.CBusMessageParse(rb, true, requestContext, cbusOptions)
if secondErr == nil {
return cBusMessage, nil
Expand Down
26 changes: 21 additions & 5 deletions plc4go/protocols/knxnetip/readwrite/model/KnxManufacturer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit aae82e9

Please sign in to comment.