Skip to content

Commit

Permalink
refactor(plc4go/bacnet): added more application code for protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Nov 15, 2022
1 parent c5639a6 commit 25c5e94
Show file tree
Hide file tree
Showing 9 changed files with 987 additions and 151 deletions.
137 changes: 69 additions & 68 deletions plc4go/internal/bacnetip/ApplicationLayer.go

Large diffs are not rendered by default.

219 changes: 207 additions & 12 deletions plc4go/internal/bacnetip/ApplicationModule.go
Expand Up @@ -23,6 +23,8 @@ import (
"bytes"
"encoding/binary"
"fmt"
"github.com/apache/plc4x/plc4go/internal/bacnetip/local"
"github.com/apache/plc4x/plc4go/internal/bacnetip/service"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -199,30 +201,223 @@ func (i *DeviceInfoCache) Release(deviceInfo DeviceInfo) error {
return nil
}

// TODO: implement
// TODO: finish
type Application struct {
ApplicationServiceElement
*ApplicationServiceElement
Collector

objectName map[string]*local.LocalDeviceObject
objectIdentifier map[string]*local.LocalDeviceObject
localDevice *local.LocalDeviceObject
deviceInfoCache *DeviceInfoCache
controllers map[string]interface{}
helpers map[string]func(apdu readWriteModel.APDU) error
}

func NewApplication(localDevice *local.LocalDeviceObject, localAddress net.Addr, deviceInfoCache *DeviceInfoCache, aseID *int) (*Application, error) {
log.Debug().Msgf("NewApplication %v %s deviceInfoCache=%v aseID=%d", localDevice, localAddress, deviceInfoCache, aseID)
a := &Application{}
var err error
a.ApplicationServiceElement, err = NewApplicationServiceElement(aseID, a)
if err != nil {
return nil, err
}

// local objects by ID and name
a.objectName = map[string]*local.LocalDeviceObject{}
a.objectIdentifier = map[string]*local.LocalDeviceObject{}

// keep track of the local device
if localDevice != nil {
a.localDevice = localDevice

// bind the device object to this application
localDevice.App = a

// local objects by ID and name
a.objectName[localDevice.ObjectName] = localDevice
a.objectName[localDevice.ObjectIdentifier] = localDevice
}

// use the provided cache or make a default one
if deviceInfoCache == nil {
var newDeviceInfoCache DeviceInfoCache
deviceInfoCache = &newDeviceInfoCache
}
a.deviceInfoCache = deviceInfoCache

// controllers for managing confirmed requests as a client
a.controllers = map[string]interface{}{}

// now set up the rest of the capabilities
a.Collector = Collector{}

// TODO: no idea how to handle the capabilities
return a, nil
}

func (a *Application) Request(apdu readWriteModel.APDU) error {
log.Debug().Msgf("Request\n%s", apdu)

// double-check the input is the right kind of APDU
switch apdu.(type) {
case readWriteModel.APDUUnconfirmedRequestExactly, readWriteModel.APDUConfirmedRequestExactly:
default:
return errors.New("APDU expected")
}
return a.ApplicationServiceElement.Request(apdu)
}

// TODO: implement
type IOController struct {
func (a *Application) Indication(apdu readWriteModel.APDU) error {
log.Debug().Msgf("Indication\n%s", apdu)

// get a helper function
helperName := fmt.Sprintf("do_%T", apdu)
helperFn := a.helpers[helperName]
log.Debug().Msgf("helperFn: %s == %t", helperName, helperFn != nil)

// send back a reject for unrecognized services
if helperFn == nil {
if _, ok := apdu.(readWriteModel.APDUConfirmedRequestExactly); ok {
return errors.Errorf("no function %s", helperName)
}
return nil
}

if err := helperFn(apdu); err != nil {
log.Debug().Err(err).Msgf("err result")
// TODO: do proper mapping
a.Response(readWriteModel.NewAPDUError(0, readWriteModel.BACnetConfirmedServiceChoice_CREATE_OBJECT, nil, 0))
}

return nil
}

// TODO: implement
// TODO: finish
type ApplicationIOController struct {
IOController
Application
*IOController
*Application
queueByAddress map[string]SieveQueue
}

func NewApplicationIOController(interface{}, interface{}, interface{}, *int) (*ApplicationIOController, error) {
return &ApplicationIOController{}, nil
func NewApplicationIOController(localDevice *local.LocalDeviceObject, localAddress net.Addr, deviceInfoCache *DeviceInfoCache, aseID *int) (*ApplicationIOController, error) {
a := &ApplicationIOController{
// queues for each address
queueByAddress: make(map[string]SieveQueue),
}
var err error
a.IOController, err = NewIOController("", a)
if err != nil {
return nil, errors.Wrap(err, "error creating io controller")
}
a.Application, err = NewApplication(localDevice, localAddress, deviceInfoCache, aseID)
if err != nil {
return nil, errors.Wrap(err, "error creating application")
}
return a, nil
}

func (a *ApplicationIOController) ProcessIO(iocb _IOCB) error {
log.Debug().Msgf("ProcessIO %s", iocb)

// get the destination address from the pdu
destinationAddress := iocb.getDestination()
log.Debug().Msgf("destinationAddress %s", destinationAddress)

// look up the queue
queue, ok := a.queueByAddress[destinationAddress.String()]
if !ok {
newQueue, _ := NewSieveQueue(a._AppRequest, destinationAddress)
queue = *newQueue
a.queueByAddress[destinationAddress.String()] = queue
}
log.Debug().Msgf("queue %v", queue)

// ask the queue to process the request
return queue.RequestIO(iocb)
}

func (a *ApplicationIOController) _AppComplete(address net.Addr, apdu readWriteModel.APDU) error {
log.Debug().Msgf("_AppComplete %s\n%s", address, apdu)

// look up the queue
queue, ok := a.queueByAddress[address.String()]
if !ok {
log.Debug().Msgf("no queue for %s", address)
return nil
}
log.Debug().Msgf("queue %v", queue)

// make sure it has an active iocb
if queue.activeIOCB == nil {
log.Debug().Msgf("no active request for %s", address)
return nil
}

// this request is complete
switch apdu.(type) {
case readWriteModel.APDUSimpleAckExactly, readWriteModel.APDUComplexAckExactly:
queue.CompleteIO(queue.activeIOCB, apdu)
case readWriteModel.APDUErrorExactly, readWriteModel.APDURejectExactly, readWriteModel.APDUAbortExactly:
// TODO: extract error
queue.AbortIO(queue.activeIOCB, errors.Errorf("%s", apdu))
default:
return errors.New("unrecognized APDU type")
}
log.Debug().Msg("controller finished")
// if the queue is empty and idle, forget about the controller
if len(queue.ioQueue.queue) == 0 && queue.activeIOCB == nil {
delete(a.queueByAddress, address.String())
}
return nil
}

func (a *ApplicationIOController) _AppRequest(apdu readWriteModel.APDU) {
log.Debug().Msgf("_AppRequest\n%s", apdu)

if err := a.Request(apdu); err != nil {
log.Error().Err(err).Msg("Uh oh")
return
}

// send it downstream, bypass the guard
if err := a.Application.Request(apdu); err != nil {
log.Error().Err(err).Msg("Uh oh")
return
}

// if this was an unconfirmed request, it's complete, no message
if _, ok := apdu.(readWriteModel.APDUUnconfirmedRequestExactly); ok {
// TODO: where to get the destination now again??
a._AppComplete(nil, apdu)
}
}

func (a *ApplicationIOController) Request(apdu readWriteModel.APDU) error {
log.Debug().Msgf("Request\n%s", apdu)

// if this is not unconfirmed request, tell the application to use the IOCB interface
if _, ok := apdu.(readWriteModel.APDUUnconfirmedRequestExactly); !ok {
return errors.New("use IOCB for confirmed requests")
}

// send it downstream
return a.Application.Request(apdu)
}

func (a *ApplicationIOController) Confirmation(apdu readWriteModel.APDU) error {
log.Debug().Msgf("Confirmation\n%s", apdu)

// this is an ack, error, reject or abort
// TODO: where to get the destination now again??
a._AppComplete(nil, apdu)
return nil
}

type BIPSimpleApplication struct {
*ApplicationIOController
*WhoIsIAmServices
*ReadWritePropertyServices
*service.WhoIsIAmServices
*service.ReadWritePropertyServices
localAddress interface{}
asap *ApplicationServiceAccessPoint
smap *StateMachineAccessPoint
Expand All @@ -233,7 +428,7 @@ type BIPSimpleApplication struct {
mux *UDPMultiplexer
}

func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress *net.UDPAddr, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
func NewBIPSimpleApplication(localDevice *local.LocalDeviceObject, localAddress net.Addr, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
b := &BIPSimpleApplication{}
var err error
b.ApplicationIOController, err = NewApplicationIOController(localDevice, localAddress, deviceInfoCache, aseID)
Expand Down
63 changes: 0 additions & 63 deletions plc4go/internal/bacnetip/Device.go

This file was deleted.

8 changes: 4 additions & 4 deletions plc4go/internal/bacnetip/Driver.go
Expand Up @@ -112,18 +112,18 @@ func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event ap

type ApplicationManager struct {
sync.Mutex
applications map[string]spi.MessageCodec
applications map[string]*ApplicationLayerMessageCodec
}

func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Transport, transportUrl url.URL, options map[string][]string) (spi.MessageCodec, error) {
func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Transport, transportUrl url.URL, options map[string][]string) (*ApplicationLayerMessageCodec, error) {
var localAddress *net.UDPAddr
var remoteAddr *net.UDPAddr
{
host := transportUrl.Host
port := transportUrl.Port()
if transportUrl.Port() == "" {
port = options["defaultUdpPort"][0]
}
var remoteAddr *net.UDPAddr
if resolvedRemoteAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", host, port)); err != nil {
panic(err)
} else {
Expand All @@ -141,7 +141,7 @@ func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Trans
defer a.Unlock()
messageCodec, ok := a.applications[localAddress.String()]
if !ok {
newMessageCodec, err := NewApplicationLayerMessageCodec(transport, transportUrl, options, localAddress)
newMessageCodec, err := NewApplicationLayerMessageCodec(transport, transportUrl, options, localAddress, remoteAddr)
if err != nil {
return nil, errors.Wrap(err, "error creating application layer code")
}
Expand Down

0 comments on commit 25c5e94

Please sign in to comment.