Skip to content

Commit

Permalink
fix(plc4go/cbus): reworked connection to use ack
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Aug 1, 2022
1 parent f20adc9 commit e5acb84
Show file tree
Hide file tree
Showing 6 changed files with 329 additions and 31 deletions.
4 changes: 1 addition & 3 deletions plc4go/internal/cbus/Configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ type Configuration struct {
}

func ParseFromOptions(options map[string][]string) (Configuration, error) {
configuration := Configuration{
srchk: true,
}
configuration := Configuration{}
if srchk := getFromOptions(options, "srchk"); srchk != "" {
parseBool, err := strconv.ParseBool(srchk)
if err != nil {
Expand Down
160 changes: 136 additions & 24 deletions plc4go/internal/cbus/Connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/apache/plc4x/plc4go/internal/spi"
"github.com/apache/plc4x/plc4go/internal/spi/default"
internalModel "github.com/apache/plc4x/plc4go/internal/spi/model"
"github.com/apache/plc4x/plc4go/internal/spi/plcerrors"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
Expand Down Expand Up @@ -179,59 +180,90 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
requestContext := &c.messageCodec.(*MessageCodec).requestContext

{
log.Debug().Msg("Send a reset Request")
log.Debug().Msg("Send a reset")
requestTypeReset := readWriteModel.RequestType_RESET
requestTypeResetByte := byte(readWriteModel.RequestType_RESET)
requestReset := readWriteModel.NewRequestReset(requestTypeReset, &requestTypeResetByte, requestTypeReset, &requestTypeResetByte, requestTypeReset, nil, &requestTypeReset, requestTypeReset, readWriteModel.NewRequestTermination(), *cbusOptions)
if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(requestReset, *requestContext, *cbusOptions)); err != nil {
c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
cBusMessage := readWriteModel.NewCBusMessageToServer(requestReset, *requestContext, *cbusOptions)

receivedResetEchoChan := make(chan bool)
receivedResetEchoErrorChan := make(chan error)
if err := c.messageCodec.SendRequest(
cBusMessage,
func(message spi.Message) bool {
cbusMessageToServer, ok := message.(readWriteModel.CBusMessageToServerExactly)
if !ok {
return false
}
_, ok = cbusMessageToServer.GetRequest().(readWriteModel.RequestResetExactly)
return ok
},
func(message spi.Message) error {
receivedResetEchoChan <- true
return nil
},
func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
log.Warn().Msg("Timeout during Connection establishing, closing channel...")
c.Close()
}
receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request")
return nil
},
c.GetTtl(),
); err != nil {
c.fireConnectionError(errors.Wrap(err, "Error during sending of Reset Request"), ch)
return
}

select {
case receivedResetEcho := <-receivedResetEchoChan:
log.Debug().Msgf("We received the echo {}", receivedResetEcho)
case err := <-receivedResetEchoErrorChan:
c.fireConnectionError(errors.Wrap(err, "Error receiving of Reset"), ch)
return
case timeout := <-time.After(time.Second * 2):
c.fireConnectionError(errors.Errorf("Timeout after %v", timeout), ch)
return
}
time.Sleep(time.Millisecond * 100)
}
{
log.Debug().Msg("Set application filter to all")
applicationAddress1 := readWriteModel.NewParameterValueApplicationAddress1(readWriteModel.NewApplicationAddress1(0xFF), 1)
calData := readWriteModel.NewCALDataWrite(readWriteModel.Parameter_APPLICATION_ADDRESS_1, 0x0, applicationAddress1, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)); err != nil {
c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
if !c.sendCalDataWrite(ch, readWriteModel.Parameter_APPLICATION_ADDRESS_1, applicationAddress1, requestContext, cbusOptions) {
return
}
time.Sleep(time.Millisecond * 100)
}
{
log.Debug().Msg("Set interface options 3")
interfaceOptions3 := readWriteModel.NewParameterValueInterfaceOptions3(readWriteModel.NewInterfaceOptions3(true, false, true, false), 1)
calData := readWriteModel.NewCALDataWrite(readWriteModel.Parameter_INTERFACE_OPTIONS_3, 0x0, interfaceOptions3, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)); err != nil {
c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
var newCBusOptions readWriteModel.CBusOptions
newCBusOptions = readWriteModel.NewCBusOptions(false, false, false, true, false, false, false, false, false)
cbusOptions = &newCBusOptions
if !c.sendCalDataWrite(ch, readWriteModel.Parameter_INTERFACE_OPTIONS_3, interfaceOptions3, requestContext, cbusOptions) {
return
}
time.Sleep(time.Millisecond * 100)
}
{
log.Debug().Msg("Set interface options 1 power up settings")
var newCBusOptions readWriteModel.CBusOptions
newCBusOptions = readWriteModel.NewCBusOptions(false, true, true, true, true, false, false, false, true)
cbusOptions = &newCBusOptions
interfaceOptions1PowerUpSettings := readWriteModel.NewParameterValueInterfaceOptions1PowerUpSettings(readWriteModel.NewInterfaceOptions1PowerUpSettings(readWriteModel.NewInterfaceOptions1(true, true, true, true, false, true)), 1)
calData := readWriteModel.NewCALDataWrite(readWriteModel.Parameter_INTERFACE_OPTIONS_1_POWER_UP_SETTINGS, 0x0, interfaceOptions1PowerUpSettings, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)); err != nil {
c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
if !c.sendCalDataWrite(ch, readWriteModel.Parameter_INTERFACE_OPTIONS_1_POWER_UP_SETTINGS, interfaceOptions1PowerUpSettings, requestContext, cbusOptions) {
return
}
time.Sleep(time.Millisecond * 100)
}
{
log.Debug().Msg("Set interface options 1")
var newCBusOptions readWriteModel.CBusOptions
newCBusOptions = readWriteModel.NewCBusOptions(false, true, true, true, true, false, false, false, true)
cbusOptions = &newCBusOptions
interfaceOptions1 := readWriteModel.NewParameterValueInterfaceOptions1(readWriteModel.NewInterfaceOptions1(true, true, true, true, false, true), 1)
calData := readWriteModel.NewCALDataWrite(readWriteModel.Parameter_INTERFACE_OPTIONS_1, 0x0, interfaceOptions1, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)); err != nil {
c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
if !c.sendCalDataWrite(ch, readWriteModel.Parameter_INTERFACE_OPTIONS_1, interfaceOptions1, requestContext, cbusOptions) {
return
}
time.Sleep(time.Millisecond * 100)
}
c.fireConnected(ch)

Expand All @@ -248,6 +280,86 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
}()
}

func (c *Connection) sendCalDataWrite(ch chan plc4go.PlcConnectionConnectResult, paramNo readWriteModel.Parameter, parameterValue readWriteModel.ParameterValue, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) bool {
// TODO: we assume that is always a one byte request otherwise we need to map the length here
calData := readWriteModel.NewCALDataWrite(paramNo, 0x0, parameterValue, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
cBusMessage := readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)

directCommandAckChan := make(chan bool)
directCommandAckErrorChan := make(chan error)
if err := c.messageCodec.SendRequest(
cBusMessage,
func(message spi.Message) bool {
switch message := message.(type) {
case readWriteModel.CBusMessageToClientExactly:
switch reply := message.GetReply().(type) {
case readWriteModel.ReplyOrConfirmationReplyExactly:
switch reply := reply.GetReply().(type) {
case readWriteModel.ReplyEncodedReplyExactly:
switch encodedReply := reply.GetEncodedReply().(type) {
case readWriteModel.EncodedReplyCALReplyExactly:
switch data := encodedReply.GetCalReply().GetCalData().(type) {
case readWriteModel.CALDataAcknowledgeExactly:
if data.GetParamNo() == paramNo {
return true
}
}
}
}
}
}
return false
},
func(message spi.Message) error {
switch message := message.(type) {
case readWriteModel.CBusMessageToClientExactly:
switch reply := message.GetReply().(type) {
case readWriteModel.ReplyOrConfirmationReplyExactly:
switch reply := reply.GetReply().(type) {
case readWriteModel.ReplyEncodedReplyExactly:
switch encodedReply := reply.GetEncodedReply().(type) {
case readWriteModel.EncodedReplyCALReplyExactly:
switch data := encodedReply.GetCalReply().GetCalData().(type) {
case readWriteModel.CALDataAcknowledgeExactly:
if data.GetParamNo() == paramNo {
directCommandAckChan <- true
}
}
}
}
}
}
return nil
},
func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
log.Warn().Msg("Timeout during Connection establishing, closing channel...")
c.Close()
}
directCommandAckErrorChan <- errors.Wrap(err, "got error processing request")
return nil
},
c.GetTtl(),
); err != nil {
c.fireConnectionError(errors.Wrap(err, "Error during sending of write request"), ch)
return false
}

select {
case receivedResetEcho := <-directCommandAckChan:
log.Debug().Msgf("We received the ack {}", receivedResetEcho)
case err := <-directCommandAckErrorChan:
c.fireConnectionError(errors.Wrap(err, "Error receiving of ack"), ch)
return false
case timeout := <-time.After(time.Second * 2):
c.fireConnectionError(errors.Errorf("Timeout after %v", timeout), ch)
return false
}
return true
}

func (c *Connection) fireConnectionError(err error, ch chan<- plc4go.PlcConnectionConnectResult) {
if c.driverContext.awaitSetupComplete {
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Error during connection"))
Expand Down
3 changes: 2 additions & 1 deletion plc4go/internal/cbus/MessageCodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ lookingForTheEnd:
// This means a <cr> is directly followed by a <lf> which means that we know for sure this is a response
pciResponse = true
}
if !requestToPci && indexOfLF < 0 {
if !pciResponse && !requestToPci && indexOfLF < 0 {
// To be sure we might receive that package later we hash the bytes and check if we might receive one
hash := crc32.NewIEEE()
_, _ = hash.Write(peekedBytes)
Expand All @@ -153,6 +153,7 @@ lookingForTheEnd:
} else {
// after 90ms we give up finding a lf
m.lastPackageHash, m.hashEncountered = 0, 0
requestToPci = true
}
}
if !pciResponse && !requestToPci {
Expand Down
8 changes: 5 additions & 3 deletions plc4go/pkg/api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
package config

// TraceTransactionManagerWorkers when set to true the transaction manager displays worker states in log
var TraceTransactionManagerWorkers bool
var TraceTransactionManagerTransactions bool
var TraceDefaultMessageCodecWorker bool
var (
TraceTransactionManagerWorkers bool
TraceTransactionManagerTransactions bool
TraceDefaultMessageCodecWorker bool
)

func init() {
TraceTransactionManagerWorkers = false
Expand Down
Loading

0 comments on commit e5acb84

Please sign in to comment.