Skip to content

Commit

Permalink
fix(plc4go/bacnet): fixed a bunch of broken code parts
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Jan 13, 2023
1 parent 75872b4 commit 609f6af
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 117 deletions.
55 changes: 36 additions & 19 deletions plc4go/internal/bacnetip/ApplicationLayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,14 @@ type SSMSAPRequirements interface {
GetDefaultMaximumApduLengthAccepted() readWriteModel.MaxApduLengthAccepted
}

type SSMProcessingRequirements interface {
processTask() error
}

// SSM - Segmentation State Machine
type SSM struct {
*OneShotTask
SSMProcessingRequirements

ssmSAP SSMSAPRequirements

Expand Down Expand Up @@ -120,7 +125,10 @@ type SSM struct {
maxApduLengthAccepted readWriteModel.MaxApduLengthAccepted
}

func NewSSM(sap SSMSAPRequirements, pduAddress *Address) (SSM, error) {
func NewSSM(sap interface {
SSMSAPRequirements
SSMProcessingRequirements
}, pduAddress *Address) (*SSM, error) {
log.Debug().Interface("sap", sap).Interface("pdu_address", pduAddress).Msg("init")
var deviceInfo *DeviceInfo
deviceInfoTemp, ok := sap.GetDeviceInfoCache().GetDeviceInfo(DeviceInfoCacheKey{PduSource: pduAddress})
Expand Down Expand Up @@ -162,8 +170,7 @@ func NewSSM(sap SSMSAPRequirements, pduAddress *Address) (SSM, error) {
} else {
maxApduLengthAccepted = sap.GetDefaultMaximumApduLengthAccepted()
}
return SSM{
OneShotTask: NewOneShotTask(nil),
ssm := &SSM{
ssmSAP: sap,
pduAddress: pduAddress,
deviceInfo: deviceInfo,
Expand All @@ -174,7 +181,10 @@ func NewSSM(sap SSMSAPRequirements, pduAddress *Address) (SSM, error) {
segmentTimeout: segmentTimeout,
maxSegmentsAccepted: maxSegmentsAccepted,
maxApduLengthAccepted: maxApduLengthAccepted,
}, nil
}
ssm.OneShotTask = NewOneShotTask(ssm, nil)
ssm.SSMProcessingRequirements = sap
return ssm, nil
}

func (s *SSM) StartTimer(millis uint) {
Expand Down Expand Up @@ -363,7 +373,7 @@ func (s *SSM) fillWindow(sequenceNumber uint8) error {
if err != nil {
return errors.Wrapf(err, "Error sending out segment %d", i)
}
if err := s.ssmSAP.Request(NewPDU(apdu, WithPDUDestination(s.pduAddress))); err != nil {
if err := s.ssmSAP.Request(NewPDU(apdu.GetMessage(), WithPDUDestination(s.pduAddress))); err != nil {
log.Debug().Err(err).Msg("error sending request")
}
if moreFollows {
Expand All @@ -374,12 +384,16 @@ func (s *SSM) fillWindow(sequenceNumber uint8) error {
}

type ClientSSM struct {
SSM
*SSM
}

func NewClientSSM(sap SSMSAPRequirements, pduAddress *Address) (*ClientSSM, error) {
log.Debug().Interface("sap", sap).Interface("pduAddress", pduAddress).Msg("init")
ssm, err := NewSSM(sap, pduAddress)
c := &ClientSSM{}
ssm, err := NewSSM(struct {
SSMSAPRequirements
SSMProcessingRequirements
}{sap, c}, pduAddress)
if err != nil {
return nil, err
}
Expand All @@ -388,14 +402,13 @@ func NewClientSSM(sap SSMSAPRequirements, pduAddress *Address) (*ClientSSM, erro
// TODO: get entry for device, store it in inventory
log.Debug().Msg("Accquire device information")
}
return &ClientSSM{
SSM: ssm,
}, nil
c.SSM = ssm
return c, nil
}

// setState This function is called when the client wants to change state
func (c *ClientSSM) setState(newState SSMState, timer *uint) error {
log.Debug().Msgf("setState %c timer=%d", newState, timer)
log.Debug().Msgf("setState %s timer=%d", newState, timer)
// do the regular state change
if err := c.SSM.setState(newState, timer); err != nil {
return errors.Wrap(err, "error during SSM state transition")
Expand Down Expand Up @@ -556,7 +569,7 @@ func (c *ClientSSM) Confirmation(apdu _PDU) error {

// processTask This function is called when something has taken too long
func (c *ClientSSM) processTask() error {
log.Debug().Msg("processTask")
log.Debug().Msgf("processTask (currentState: %s)", c.state)
switch c.state {
case SSMState_SEGMENTED_REQUEST:
return c.segmentedRequestTimeout()
Expand Down Expand Up @@ -826,7 +839,7 @@ func (c *ClientSSM) awaitConfirmationTimeout() error {
}
c.retryCount = saveCount
} else {
log.Debug().Msg("retry count exceeded")
log.Debug().Msgf("retry count exceeded: %d >= %d", c.retryCount, c.numberOfApduRetries)

abort, err := c.abort(readWriteModel.BACnetAbortReason(65)) // Note: this is a proprietary code used by bacpypes for no response. We just use that here too to keep consistent
if err != nil {
Expand Down Expand Up @@ -945,13 +958,19 @@ func (c *ClientSSM) segmentedConfirmationTimeout() error {
}

type ServerSSM struct {
SSM
*SSM
segmentedResponseAccepted bool
}

func NewServerSSM(sap SSMSAPRequirements, pduAddress *Address) (*ServerSSM, error) {
log.Debug().Interface("sap", sap).Interface("pduAddress", pduAddress).Msg("init")
ssm, err := NewSSM(sap, pduAddress)
s := &ServerSSM{
segmentedResponseAccepted: true,
}
ssm, err := NewSSM(struct {
SSMSAPRequirements
SSMProcessingRequirements
}{sap, s}, pduAddress)
if err != nil {
return nil, err
}
Expand All @@ -960,10 +979,8 @@ func NewServerSSM(sap SSMSAPRequirements, pduAddress *Address) (*ServerSSM, erro
// TODO: get entry for device, store it in inventory
log.Debug().Msg("Accquire device information")
}
return &ServerSSM{
SSM: ssm,
segmentedResponseAccepted: true,
}, nil
s.SSM = ssm
return s, nil
}

// setState This function is called when the client wants to change state
Expand Down
29 changes: 17 additions & 12 deletions plc4go/internal/bacnetip/ApplicationModule.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"hash/fnv"
"net"
)

type DeviceInfo struct {
Expand Down Expand Up @@ -214,7 +213,7 @@ type Application struct {
controllers map[string]interface{}
helpers map[string]func(pdu _PDU) error

_startup_disabled bool
_startupDisabled bool
}

func NewApplication(localDevice *LocalDeviceObject, localAddress Address, deviceInfoCache *DeviceInfoCache, aseID *int) (*Application, error) {
Expand Down Expand Up @@ -256,7 +255,7 @@ func NewApplication(localDevice *LocalDeviceObject, localAddress Address, device
a.Collector = Collector{}

// if starting up is enabled, find all the startup functions
if !a._startup_disabled {
if !a._startupDisabled {
for _, fn := range a.CapabilityFunctions("startup") {
log.Debug().Msgf("startup fn %t", fn != nil)
fn()
Expand Down Expand Up @@ -296,7 +295,9 @@ func (a *Application) Indication(apdu _PDU) error {
if err := helperFn(apdu); err != nil {
log.Debug().Err(err).Msgf("err result")
// TODO: do proper mapping
a.Response(NewPDU(readWriteModel.NewAPDUError(0, readWriteModel.BACnetConfirmedServiceChoice_CREATE_OBJECT, nil, 0)))
if err := a.Response(NewPDU(readWriteModel.NewAPDUError(0, readWriteModel.BACnetConfirmedServiceChoice_CREATE_OBJECT, nil, 0))); err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -346,7 +347,7 @@ func (a *ApplicationIOController) ProcessIO(iocb _IOCB) error {
return queue.RequestIO(iocb)
}

func (a *ApplicationIOController) _AppComplete(address net.Addr, apdu _PDU) error {
func (a *ApplicationIOController) _AppComplete(address *Address, apdu _PDU) error {
log.Debug().Msgf("_AppComplete %s\n%s", address, apdu)

// look up the queue
Expand All @@ -366,10 +367,14 @@ func (a *ApplicationIOController) _AppComplete(address net.Addr, apdu _PDU) erro
// this request is complete
switch apdu.(type) {
case readWriteModel.APDUSimpleAckExactly, readWriteModel.APDUComplexAckExactly:
queue.CompleteIO(queue.activeIOCB, apdu)
if err := queue.CompleteIO(queue.activeIOCB, apdu); err != nil {
return err
}
case readWriteModel.APDUErrorExactly, readWriteModel.APDURejectExactly, readWriteModel.APDUAbortExactly:
// TODO: extract error
queue.AbortIO(queue.activeIOCB, errors.Errorf("%s", apdu))
if err := queue.AbortIO(queue.activeIOCB, errors.Errorf("%s", apdu)); err != nil {
return err
}
default:
return errors.New("unrecognized APDU type")
}
Expand All @@ -392,8 +397,10 @@ func (a *ApplicationIOController) _AppRequest(apdu _PDU) {

// if this was an unconfirmed request, it's complete, no message
if _, ok := apdu.(readWriteModel.APDUUnconfirmedRequestExactly); ok {
// TODO: where to get the destination now again??
a._AppComplete(nil, apdu)
if err := a._AppComplete(apdu.GetPDUDestination(), apdu); err != nil {
log.Error().Err(err).Msg("AppRequest failed")
return
}
}
}

Expand All @@ -413,9 +420,7 @@ func (a *ApplicationIOController) Confirmation(apdu _PDU) error {
log.Debug().Msgf("Confirmation\n%s", apdu)

// this is an ack, error, reject or abort
// TODO: where to get the destination now again??
a._AppComplete(nil, apdu)
return nil
return a._AppComplete(apdu.GetPDUDestination(), apdu)
}

type BIPSimpleApplication struct {
Expand Down
5 changes: 4 additions & 1 deletion plc4go/internal/bacnetip/Core.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package bacnetip

func Deferred(fn func()) {
import "github.com/rs/zerolog/log"

func Deferred(fn func() error) {
log.Debug().Msg("Deferred")
// TODO: implement me
}
16 changes: 9 additions & 7 deletions plc4go/internal/bacnetip/IOCBModule.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,19 +741,19 @@ func (i *IOQController) AbortIO(iocb _IOCB, err error) error {
}

// _trigger Called to launch the next request in the queue
func (i *IOQController) _trigger() {
func (i *IOQController) _trigger() error {
log.Debug().Msg("_trigger")

// if we are busy, do nothing
if i.state != IOQControllerStates_CTRL_IDLE {
log.Debug().Msg("not idle")
return
return nil
}

// if there is nothing to do, return
if len(i.ioQueue.queue) == 0 {
log.Debug().Msg("empty queue")
return
return nil
}

// get the next iocb
Expand All @@ -766,25 +766,26 @@ func (i *IOQController) _trigger() {
// if there was an error, abort the request
if err := i.Abort(err); err != nil {
log.Debug().Err(err).Msg("error aborting")
return
return nil
}
return
return nil
}

// if we're idle, call again
if i.state == IOQControllerStates_CTRL_IDLE {
Deferred(i._trigger)
}
return nil
}

// _waitTrigger is called to launch the next request in the queue
func (i *IOQController) _waitTrigger() {
func (i *IOQController) _waitTrigger() error {
log.Debug().Msg("_waitTrigger")

// make sure we are waiting
if i.state != IOQControllerStates_CTRL_WAITING {
log.Debug().Msg("not waiting")
return
return nil
}

// change our state
Expand All @@ -793,6 +794,7 @@ func (i *IOQController) _waitTrigger() {

// look for more to do
i._trigger()
return nil
}

type SieveQueue struct {
Expand Down
28 changes: 19 additions & 9 deletions plc4go/internal/bacnetip/MessageCodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ type ApplicationLayerMessageCodec struct {
}

func NewApplicationLayerMessageCodec(udpTransport *udp.Transport, transportUrl url.URL, options map[string][]string, localAddress *net.UDPAddr, remoteAddress *net.UDPAddr) (*ApplicationLayerMessageCodec, error) {
// TODO: currently this is done by the BIP down below
// Have the transport create a new transport-instance.
transportInstance, err := udpTransport.CreateTransportInstanceForLocalAddress(transportUrl, options, localAddress)
if err != nil {
return nil, errors.Wrap(err, "error creating transport instance")
}
//transportInstance, err := udpTransport.CreateTransportInstanceForLocalAddress(transportUrl, options, localAddress)
//if err != nil {
// return nil, errors.Wrap(err, "error creating transport instance")
//}
a := &ApplicationLayerMessageCodec{
localAddress: localAddress,
remoteAddress: remoteAddress,
Expand All @@ -61,12 +62,15 @@ func NewApplicationLayerMessageCodec(udpTransport *udp.Transport, transportUrl u
// TODO: workaround for strange address parsing
at := AddressTuple[string, uint16]{fmt.Sprintf("%d.%d.%d.%d", address.AddrAddress[0], address.AddrAddress[1], address.AddrAddress[2], address.AddrAddress[3]), *address.AddrPort}
address.AddrTuple = &at
application, err := NewBIPSimpleApplication(&LocalDeviceObject{}, *address, &a.deviceInfoCache, nil)
application, err := NewBIPSimpleApplication(&LocalDeviceObject{
NumberOfAPDURetries: func() *uint { retries := uint(10); return &retries }(),
}, *address, &a.deviceInfoCache, nil)
if err != nil {
return nil, err
}
a.bipSimpleApplication = application
a.messageCode = NewMessageCodec(transportInstance)
// TODO: this is currently done by the BIP
//a.messageCode = NewMessageCodec(transportInstance)
return a, nil
}

Expand All @@ -75,11 +79,15 @@ func (m *ApplicationLayerMessageCodec) GetCodec() spi.MessageCodec {
}

func (m *ApplicationLayerMessageCodec) Connect() error {
return m.messageCode.Connect()
// TODO: this is currently done by the BIP
// return m.messageCode.Connect()
return nil
}

func (m *ApplicationLayerMessageCodec) ConnectWithContext(ctx context.Context) error {
return m.messageCode.ConnectWithContext(ctx)
// TODO: this is currently done by the BIP
// return m.messageCode.ConnectWithContext(ctx)
return nil
}

func (m *ApplicationLayerMessageCodec) Disconnect() error {
Expand Down Expand Up @@ -132,7 +140,9 @@ func (m *ApplicationLayerMessageCodec) SendRequest(ctx context.Context, message
}

func (m *ApplicationLayerMessageCodec) GetDefaultIncomingMessageChannel() chan spi.Message {
return m.messageCode.GetDefaultIncomingMessageChannel()
// TODO: this is currently done by the BIP
//return m.messageCode.GetDefaultIncomingMessageChannel()
return make(chan spi.Message)
}

type MessageCodec struct {
Expand Down
5 changes: 3 additions & 2 deletions plc4go/internal/bacnetip/NetworkService.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,13 +568,14 @@ func NewNetworkServiceElement(eid *int) (*NetworkServiceElement, error) {
return n, nil
}

func (n *NetworkServiceElement) Startup() {
func (n *NetworkServiceElement) Startup() error {
log.Debug().Msg("Startup")

// reference the service access point
sap := n.elementService.(*NetworkServiceAccessPoint) // TODO: hard cast but seems like adapters apears first in network service access point (so hard binding)
sap := n.elementService.(*NetworkServiceAccessPoint) // TODO: hard cast but seems like adapters appears first in network service access point (so hard binding)
log.Debug().Msgf("sap: %v", sap)

// loop through all the adapters
// TODO: no adapters yet
return nil
}
Loading

0 comments on commit 609f6af

Please sign in to comment.