Skip to content

Commit

Permalink
refactor(plc4go/cbus): split up reader into multiple methods
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed May 2, 2023
1 parent df293b2 commit fcd62ce
Showing 1 changed file with 85 additions and 81 deletions.
166 changes: 85 additions & 81 deletions plc4go/internal/cbus/Reader.go
Expand Up @@ -101,7 +101,7 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
}
return
}
m.sendMessage(ctx, messageToSend, addResponseCode, tagName, addPlcValue)
m.createMessageTransactionAndWait(ctx, messageToSend, addResponseCode, tagName, addPlcValue)
}
readResponse := spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues)
result <- &spiModel.DefaultPlcReadRequestResult{
Expand All @@ -110,94 +110,98 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
}
}

func (m *Reader) sendMessage(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
transaction.Submit(func() {
// Send the over the wire
log.Trace().Msg("Send ")
if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
if !ok {
return false
}
messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
if !ok {
return false
}
// Check if this errored
if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
// This means we must handle this below
return true
}
m.sendMessageOverTheWire(ctx, transaction, messageToSend, addResponseCode, tagName, addPlcValue)
})
if err := transaction.AwaitCompletion(ctx); err != nil {
log.Warn().Err(err).Msg("Error while awaiting completion")
}
}

confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
if !ok {
return false
}
return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
}, func(receivedMessage spi.Message) error {
defer func(transaction spi.RequestTransaction) {
// This is just to make sure we don't forget to close the transaction here
_ = transaction.EndRequest()
}(transaction)
// Convert the response into an
log.Trace().Msg("convert response to ")
cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
log.Trace().Msg("We got a server failure")
addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
return transaction.EndRequest()
}
replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
var responseCode apiModel.PlcResponseCode
switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
responseCode = apiModel.PlcResponseCode_INVALID_DATA
case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
responseCode = apiModel.PlcResponseCode_INVALID_DATA
default:
return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
}
log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
addResponseCode(tagName, responseCode)
return transaction.EndRequest()
}
func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
// Send the over the wire
log.Trace().Msg("Send ")
if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
if !ok {
return false
}
messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
if !ok {
return false
}
// Check if this errored
if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
// This means we must handle this below
return true
}

alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
// TODO: it could be double confirmed but this is not implemented yet
embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
if !ok {
log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
return transaction.EndRequest()
confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
if !ok {
return false
}
return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
}, func(receivedMessage spi.Message) error {
defer func(transaction spi.RequestTransaction) {
// This is just to make sure we don't forget to close the transaction here
_ = transaction.EndRequest()
}(transaction)
// Convert the response into an
log.Trace().Msg("convert response to ")
cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
log.Trace().Msg("We got a server failure")
addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
return transaction.EndRequest()
}
replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
var responseCode apiModel.PlcResponseCode
switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
responseCode = apiModel.PlcResponseCode_INVALID_DATA
case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
responseCode = apiModel.PlcResponseCode_INVALID_DATA
default:
return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
}
log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
addResponseCode(tagName, responseCode)
return transaction.EndRequest()
}

log.Trace().Msg("Handling confirmed data")
// TODO: check if we can use a plcValueSerializer
encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
if err := MapEncodedReply(transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
return errors.Wrap(err, "error encoding reply")
}
alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
// TODO: it could be double confirmed but this is not implemented yet
embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
if !ok {
log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
return transaction.EndRequest()
}, func(err error) error {
addResponseCode(tagName, apiModel.PlcResponseCode_REQUEST_TIMEOUT)
return transaction.FailRequest(err)
}, time.Second*1); err != nil {
log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
if err := transaction.FailRequest(errors.Errorf("timeout after %ss", time.Second*1)); err != nil {
log.Debug().Err(err).Msg("Error failing request")
}
}
})
if err := transaction.AwaitCompletion(ctx); err != nil {
log.Warn().Err(err).Msg("Error while awaiting completion")

log.Trace().Msg("Handling confirmed data")
// TODO: check if we can use a plcValueSerializer
encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
if err := MapEncodedReply(transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
return errors.Wrap(err, "error encoding reply")
}
return transaction.EndRequest()
}, func(err error) error {
addResponseCode(tagName, apiModel.PlcResponseCode_REQUEST_TIMEOUT)
return transaction.FailRequest(err)
}, time.Second*1); err != nil {
log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
if err := transaction.FailRequest(errors.Errorf("timeout after %ss", time.Second*1)); err != nil {
log.Debug().Err(err).Msg("Error failing request")
}
}
}

0 comments on commit fcd62ce

Please sign in to comment.