Skip to content

Commit

Permalink
fix(plc4go/cbus): fix error when reader doesn't get a alpha capable r…
Browse files Browse the repository at this point in the history
…esponse
  • Loading branch information
sruehl committed Jun 16, 2023
1 parent 04662cf commit bf275e2
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 57 deletions.
15 changes: 11 additions & 4 deletions plc4go/internal/cbus/Reader.go
Expand Up @@ -139,14 +139,16 @@ func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToS
func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transactions.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
// Send the over the wire
m.log.Trace().Msg("send over the wire")
ttl := time.Second * 5
if deadline, ok := ctx.Deadline(); ok {
m.log.Debug().Msgf("Message expires in %s", deadline.Sub(time.Now()))
ttl = -time.Since(deadline)
m.log.Debug().Msgf("setting ttl to %s", ttl)
}
if err := m.messageCodec.SendRequest(
ctx,
messageToSend,
func(cbusMessage spi.Message) bool {
m.log.Trace().Msgf("Checking\n%T", cbusMessage)
m.log.Trace().Msgf("Checking %T", cbusMessage)
messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
if !ok {
m.log.Trace().Msg("Not a message to client")
Expand All @@ -166,7 +168,12 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac
}
actualAlpha := confirmation.GetConfirmation().GetAlpha().GetCharacter()
// TODO: assert that this is a CBusMessageToServer indeed (by changing param for example)
expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
alphaRetriever, ok := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha })
if !ok {
m.log.Trace().Msg("no alpha there")
return false
}
expectedAlpha := alphaRetriever.GetAlpha().GetCharacter()
m.log.Trace().Msgf("Comparing expected alpha '%c' to actual alpha '%c'", expectedAlpha, actualAlpha)
return actualAlpha == expectedAlpha
},
Expand Down Expand Up @@ -223,7 +230,7 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac
addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
return transaction.FailRequest(err)
},
time.Second*5); err != nil {
ttl); err != nil {
m.log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
Expand Down
67 changes: 14 additions & 53 deletions plc4go/internal/cbus/Reader_test.go
Expand Up @@ -234,11 +234,7 @@ func TestReader_readSync(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
ctx: func() context.Context {
timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
return timeout
}(),
ctx: testutils.TestContext(t),
readRequest: spiModel.NewDefaultPlcReadRequest(
map[string]apiModel.PlcTag{
"blub": NewCALIdentifyTag(readWriteModel.NewUnitAddress(2), nil, readWriteModel.Attribute_Type, 1),
Expand Down Expand Up @@ -397,11 +393,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
ctx: func() context.Context {
timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
return timeout
}(),
ctx: testutils.TestContext(t),
messageToSend: nil,
addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
return func(name string, responseCode apiModel.PlcResponseCode) {
Expand Down Expand Up @@ -440,16 +432,12 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
},
},
{
name: "Send message which responds with message to client",
name: "Send message which responds with message to server",
fields: fields{
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
ctx: func() context.Context {
timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
return timeout
}(),
ctx: testutils.TestContext(t),
messageToSend: readWriteModel.NewCBusMessageToServer(
readWriteModel.NewRequestReset(
readWriteModel.RequestType_RESET,
Expand All @@ -470,7 +458,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
return func(name string, responseCode apiModel.PlcResponseCode) {
t.Logf("Got response code %s for %s", responseCode, name)
assert.Equal(t, "horst", name)
assert.Equal(t, apiModel.PlcResponseCode_REQUEST_TIMEOUT, responseCode)
assert.Equal(t, apiModel.PlcResponseCode_INTERNAL_ERROR, responseCode)
}
},
tagName: "horst",
Expand Down Expand Up @@ -527,11 +515,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
ctx: func() context.Context {
timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
return timeout
}(),
ctx: testutils.TestContext(t),
messageToSend: readWriteModel.NewCBusMessageToServer(
readWriteModel.NewRequestReset(
readWriteModel.RequestType_RESET,
Expand Down Expand Up @@ -610,11 +594,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
ctx: func() context.Context {
timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
return timeout
}(),
ctx: testutils.TestContext(t),
messageToSend: readWriteModel.NewCBusMessageToServer(
readWriteModel.NewRequestDirectCommandAccess(
readWriteModel.NewCALDataIdentify(
Expand Down Expand Up @@ -695,11 +675,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
ctx: func() context.Context {
timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
return timeout
}(),
ctx: testutils.TestContext(t),
messageToSend: readWriteModel.NewCBusMessageToServer(
readWriteModel.NewRequestDirectCommandAccess(
readWriteModel.NewCALDataIdentify(
Expand Down Expand Up @@ -780,11 +756,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
ctx: func() context.Context {
timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
return timeout
}(),
ctx: testutils.TestContext(t),
messageToSend: readWriteModel.NewCBusMessageToServer(
readWriteModel.NewRequestDirectCommandAccess(
readWriteModel.NewCALDataIdentify(
Expand Down Expand Up @@ -865,11 +837,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
ctx: func() context.Context {
timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
return timeout
}(),
ctx: testutils.TestContext(t),
messageToSend: readWriteModel.NewCBusMessageToServer(
readWriteModel.NewRequestDirectCommandAccess(
readWriteModel.NewCALDataIdentify(
Expand Down Expand Up @@ -950,11 +918,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
ctx: func() context.Context {
timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
return timeout
}(),
ctx: testutils.TestContext(t),
messageToSend: readWriteModel.NewCBusMessageToServer(
readWriteModel.NewRequestDirectCommandAccess(
readWriteModel.NewCALDataIdentify(
Expand Down Expand Up @@ -1035,11 +999,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
},
args: args{
ctx: func() context.Context {
timeout, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
return timeout
}(),
ctx: testutils.TestContext(t),
messageToSend: readWriteModel.NewCBusMessageToServer(
readWriteModel.NewRequestDirectCommandAccess(
readWriteModel.NewCALDataIdentify(
Expand Down Expand Up @@ -1125,10 +1085,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
alphaGenerator: tt.fields.alphaGenerator,
messageCodec: tt.fields.messageCodec,
tm: tt.fields.tm,
log: testutils.ProduceTestingLogger(t),
}
m.sendMessageOverTheWire(tt.args.ctx, tt.args.transaction, tt.args.messageToSend, tt.args.addResponseCode(t), tt.args.tagName, tt.args.addPlcValue(t))
t.Log("Waiting now")
timer := time.NewTimer(3 * time.Second)
timer := time.NewTimer(10 * time.Second)
defer utils.CleanupTimer(timer)
select {
case <-ch:
Expand Down

0 comments on commit bf275e2

Please sign in to comment.