Skip to content

Commit

Permalink
feat(plc4go/spi): integrate ctx into DefaultCodec
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Aug 15, 2022
1 parent fbe964b commit aa93c27
Show file tree
Hide file tree
Showing 22 changed files with 1,398 additions and 1,527 deletions.
79 changes: 37 additions & 42 deletions plc4go/internal/ads/Reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
result := make(chan model.PlcReadRequestResult)
go func() {
if len(readRequest.GetFieldNames()) <= 1 {
m.singleRead(readRequest, result)
m.singleRead(ctx, readRequest, result)
} else {
m.multiRead(readRequest, result)
m.multiRead(ctx, readRequest, result)
}
}()
return result
}

func (m *Reader) singleRead(readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
func (m *Reader) singleRead(ctx context.Context, readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
if len(readRequest.GetFieldNames()) != 1 {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
Expand All @@ -96,7 +96,7 @@ func (m *Reader) singleRead(readRequest model.PlcReadRequest, result chan model.
log.Debug().Msgf("Invalid field item type %T", field)
return
}
field, err = m.resolveField(adsField)
field, err = m.resolveField(ctx, adsField)
if err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
Expand Down Expand Up @@ -150,10 +150,10 @@ func (m *Reader) singleRead(readRequest model.PlcReadRequest, result chan model.
}
userdata.Data = readWriteModel.NewAdsReadRequest(adsField.IndexGroup, adsField.IndexOffset, readLength)

m.sendOverTheWire(userdata, readRequest, result)
m.sendOverTheWire(ctx, userdata, readRequest, result)
}

func (m *Reader) multiRead(readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
func (m *Reader) multiRead(ctx context.Context, readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
// Calculate the size of all fields together.
// Calculate the expected size of the response data.
expectedResponseDataSize := uint32(0)
Expand Down Expand Up @@ -216,7 +216,7 @@ func (m *Reader) multiRead(readRequest model.PlcReadRequest, result chan model.P
log.Debug().Msgf("Invalid field item type %T", field)
return
}
field, err = m.resolveField(adsField)
field, err = m.resolveField(ctx, adsField)
if err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
Expand All @@ -242,10 +242,10 @@ func (m *Reader) multiRead(readRequest model.PlcReadRequest, result chan model.P
}
userdata.Data = readWriteModel.NewAdsReadWriteRequest(uint32(readWriteModel.ReservedIndexGroups_ADSIGRP_MULTIPLE_READ), uint32(len(readRequest.GetFieldNames())), expectedResponseDataSize, items, nil)

m.sendOverTheWire(userdata, readRequest, result)
m.sendOverTheWire(ctx, userdata, readRequest, result)
}

func (m *Reader) sendOverTheWire(userdata readWriteModel.AmsPacket, readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
func (m *Reader) sendOverTheWire(ctx context.Context, userdata readWriteModel.AmsPacket, readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
// Calculate a new transaction identifier
transactionIdentifier := atomic.AddUint32(&m.transactionIdentifier, 1)
if transactionIdentifier > math.MaxUint8 {
Expand All @@ -271,42 +271,37 @@ func (m *Reader) sendOverTheWire(userdata readWriteModel.AmsPacket, readRequest

// Send the TCP Paket over the wire
log.Trace().Msg("Send TCP Paket")
if err := m.messageCodec.SendRequest(
amsTcpPaket,
func(message spi.Message) bool {
paket := message.(readWriteModel.AmsTCPPacket)
return paket.GetUserdata().GetInvokeId() == transactionIdentifier
},
func(message spi.Message) error {
// Convert the response into an amsTcpPaket
log.Trace().Msg("convert response to amsTcpPaket")
receivedAmsTcpPaket := message.(readWriteModel.AmsTCPPacket)
// Convert the ads response into a PLC4X response
log.Trace().Msg("convert response to PLC4X response")
readResponse, err := m.ToPlc4xReadResponse(receivedAmsTcpPaket, readRequest)
if err := m.messageCodec.SendRequest(ctx, amsTcpPaket, func(message spi.Message) bool {
paket := message.(readWriteModel.AmsTCPPacket)
return paket.GetUserdata().GetInvokeId() == transactionIdentifier
}, func(message spi.Message) error {
// Convert the response into an amsTcpPaket
log.Trace().Msg("convert response to amsTcpPaket")
receivedAmsTcpPaket := message.(readWriteModel.AmsTCPPacket)
// Convert the ads response into a PLC4X response
log.Trace().Msg("convert response to PLC4X response")
readResponse, err := m.ToPlc4xReadResponse(receivedAmsTcpPaket, readRequest)

if err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
Err: errors.Wrap(err, "Error decoding response"),
}
// TODO: should we return the error here?
return nil
}
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: readResponse,
}
return nil
},
func(err error) error {
if err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
Err: errors.Wrap(err, "got timeout while waiting for response"),
Err: errors.Wrap(err, "Error decoding response"),
}
// TODO: should we return the error here?
return nil
},
time.Second*1); err != nil {
}
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: readResponse,
}
return nil
}, func(err error) error {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
Err: errors.Wrap(err, "got timeout while waiting for response"),
}
return nil
}, time.Second*1); err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: nil,
Expand All @@ -315,7 +310,7 @@ func (m *Reader) sendOverTheWire(userdata readWriteModel.AmsPacket, readRequest
}
}

func (m *Reader) resolveField(symbolicField SymbolicPlcField) (DirectPlcField, error) {
func (m *Reader) resolveField(ctx context.Context, symbolicField SymbolicPlcField) (DirectPlcField, error) {
if directPlcField, ok := m.fieldMapping[symbolicField]; ok {
return directPlcField, nil
}
Expand Down Expand Up @@ -346,7 +341,7 @@ func (m *Reader) resolveField(symbolicField SymbolicPlcField) (DirectPlcField, e
result := make(chan model.PlcReadRequestResult)
go func() {
dummyRequest := plc4goModel.NewDefaultPlcReadRequest(map[string]model.PlcField{"dummy": DirectPlcField{PlcField: PlcField{Datatype: readWriteModel.AdsDataType_UINT32}}}, []string{"dummy"}, nil, nil)
m.sendOverTheWire(userdata, dummyRequest, result)
m.sendOverTheWire(ctx, userdata, dummyRequest, result)
}()
// We wait synchronous for the resolution response before we can continue
response := <-result
Expand Down
55 changes: 25 additions & 30 deletions plc4go/internal/ads/Writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest)
log.Debug().Msgf("Invalid field item type %T", field)
return
}
field, err = m.reader.resolveField(adsField)
field, err = m.reader.resolveField(ctx, adsField)
if err != nil {
result <- &plc4goModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Expand Down Expand Up @@ -159,39 +159,34 @@ func (m *Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest)
amsTcpPaket := readWriteModel.NewAmsTCPPacket(userdata)

// Send the TCP Paket over the wire
err = m.messageCodec.SendRequest(
amsTcpPaket,
func(message spi.Message) bool {
paket := readWriteModel.CastAmsTCPPacket(message)
return paket.GetUserdata().GetInvokeId() == transactionIdentifier
},
func(message spi.Message) error {
// Convert the response into an responseAmsTcpPaket
responseAmsTcpPaket := readWriteModel.CastAmsTCPPacket(message)
// Convert the ads response into a PLC4X response
readResponse, err := m.ToPlc4xWriteResponse(amsTcpPaket, responseAmsTcpPaket, writeRequest)
err = m.messageCodec.SendRequest(ctx, amsTcpPaket, func(message spi.Message) bool {
paket := readWriteModel.CastAmsTCPPacket(message)
return paket.GetUserdata().GetInvokeId() == transactionIdentifier
}, func(message spi.Message) error {
// Convert the response into an responseAmsTcpPaket
responseAmsTcpPaket := readWriteModel.CastAmsTCPPacket(message)
// Convert the ads response into a PLC4X response
readResponse, err := m.ToPlc4xWriteResponse(amsTcpPaket, responseAmsTcpPaket, writeRequest)

if err != nil {
result <- &plc4goModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Err: errors.Wrap(err, "Error decoding response"),
}
} else {
result <- &plc4goModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: readResponse,
}
}
return nil
},
func(err error) error {
if err != nil {
result <- &plc4goModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Err: errors.New("got timeout while waiting for response"),
Err: errors.Wrap(err, "Error decoding response"),
}
} else {
result <- &plc4goModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: readResponse,
}
return nil
},
time.Second*1)
}
return nil
}, func(err error) error {
result <- &plc4goModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Err: errors.New("got timeout while waiting for response"),
}
return nil
}, time.Second*1)
}()
return result
}
Expand Down
95 changes: 45 additions & 50 deletions plc4go/internal/bacnetip/Reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,63 +188,58 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)

// Send the over the wire
log.Trace().Msg("Send ")
if err := m.messageCodec.SendRequest(
bvlc,
func(message spi.Message) bool {
bvlc, ok := message.(readWriteModel.BVLC)
if !ok {
log.Debug().Msgf("Received strange type %T", bvlc)
return false
}
var npdu readWriteModel.NPDU
if npduRetriever, ok := bvlc.(interface{ GetNpdu() readWriteModel.NPDU }); ok {
npdu = npduRetriever.GetNpdu()
} else {
log.Debug().Msgf("bvlc has no way to give a npdu %T", bvlc)
return false
}
if npdu.GetControl().GetMessageTypeFieldPresent() {
return false
}
if invokeIdFromApdu, err := getInvokeIdFromApdu(npdu.GetApdu()); err != nil {
log.Debug().Err(err).Msg("Error getting invoke id")
return false
} else {
return invokeIdFromApdu == invokeId
}
},
func(message spi.Message) error {
// Convert the response into an
log.Trace().Msg("convert response to ")
apdu := message.(readWriteModel.BVLC).(interface{ GetNpdu() readWriteModel.NPDU }).GetNpdu().GetApdu()
if err := m.messageCodec.SendRequest(ctx, bvlc, func(message spi.Message) bool {
bvlc, ok := message.(readWriteModel.BVLC)
if !ok {
log.Debug().Msgf("Received strange type %T", bvlc)
return false
}
var npdu readWriteModel.NPDU
if npduRetriever, ok := bvlc.(interface{ GetNpdu() readWriteModel.NPDU }); ok {
npdu = npduRetriever.GetNpdu()
} else {
log.Debug().Msgf("bvlc has no way to give a npdu %T", bvlc)
return false
}
if npdu.GetControl().GetMessageTypeFieldPresent() {
return false
}
if invokeIdFromApdu, err := getInvokeIdFromApdu(npdu.GetApdu()); err != nil {
log.Debug().Err(err).Msg("Error getting invoke id")
return false
} else {
return invokeIdFromApdu == invokeId
}
}, func(message spi.Message) error {
// Convert the response into an
log.Trace().Msg("convert response to ")
apdu := message.(readWriteModel.BVLC).(interface{ GetNpdu() readWriteModel.NPDU }).GetNpdu().GetApdu()

// TODO: implement segment handling
// TODO: implement segment handling

// Convert the bacnet response into a PLC4X response
log.Trace().Msg("convert response to PLC4X response")
readResponse, err := m.ToPlc4xReadResponse(apdu, readRequest)
// Convert the bacnet response into a PLC4X response
log.Trace().Msg("convert response to PLC4X response")
readResponse, err := m.ToPlc4xReadResponse(apdu, readRequest)

if err != nil {
result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Err: errors.Wrap(err, "Error decoding response"),
}
return transaction.EndRequest()
}
result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: readResponse,
}
return transaction.EndRequest()
},
func(err error) error {
if err != nil {
result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Err: errors.Wrap(err, "got timeout while waiting for response"),
Err: errors.Wrap(err, "Error decoding response"),
}
return transaction.EndRequest()
},
time.Second*1); err != nil {
}
result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: readResponse,
}
return transaction.EndRequest()
}, func(err error) error {
result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Err: errors.Wrap(err, "got timeout while waiting for response"),
}
return transaction.EndRequest()
}, time.Second*1); err != nil {
result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: nil,
Expand Down
Loading

0 comments on commit aa93c27

Please sign in to comment.