Skip to content

Commit

Permalink
refactor(plc4go/bacnet): restructure code to hook in application layer
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Nov 15, 2022
1 parent a331d50 commit c5639a6
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 287 deletions.
3 changes: 2 additions & 1 deletion plc4go/internal/bacnetip/ApplicationModule.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"hash/fnv"
"net"
)

type DeviceInfo struct {
Expand Down Expand Up @@ -232,7 +233,7 @@ type BIPSimpleApplication struct {
mux *UDPMultiplexer
}

func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress *net.UDPAddr, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
b := &BIPSimpleApplication{}
var err error
b.ApplicationIOController, err = NewApplicationIOController(localDevice, localAddress, deviceInfoCache, aseID)
Expand Down
25 changes: 19 additions & 6 deletions plc4go/internal/bacnetip/Device.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ func NewWhoIsIAmServices() (*WhoIsIAmServices, error) {
return nil, nil
}

var defaultMaxApduLength = readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024
var defaultMaxSegmentsAccepted = readWriteModel.MaxSegmentsAccepted_NUM_SEGMENTS_16

// _LocalDeviceObjectDefault is a device entry with default entries
var _LocalDeviceObjectDefault = LocalDeviceObject{
MaximumApduLengthAccepted: &defaultMaxApduLength,
SegmentationSupported: readWriteModel.BACnetSegmentation_SEGMENTED_BOTH,
MaxSegmentsAccepted: &defaultMaxSegmentsAccepted,
APDUSegmentTimeout: 5000,
APDUTimeout: 3000,
NumberOfAPDURetries: 3,
}

type LocalDeviceObject struct {
NumberOfAPDURetries uint
APDUTimeout uint
Expand All @@ -40,11 +53,11 @@ type LocalDeviceObject struct {

func NewLocalDeviceObject() *LocalDeviceObject {
return &LocalDeviceObject{
NumberOfAPDURetries: 0,
APDUTimeout: 0,
SegmentationSupported: 0,
APDUSegmentTimeout: 0,
MaxSegmentsAccepted: nil,
MaximumApduLengthAccepted: nil,
NumberOfAPDURetries: _LocalDeviceObjectDefault.NumberOfAPDURetries,
APDUTimeout: _LocalDeviceObjectDefault.APDUTimeout,
SegmentationSupported: _LocalDeviceObjectDefault.SegmentationSupported,
APDUSegmentTimeout: _LocalDeviceObjectDefault.APDUSegmentTimeout,
MaxSegmentsAccepted: _LocalDeviceObjectDefault.MaxSegmentsAccepted,
MaximumApduLengthAccepted: _LocalDeviceObjectDefault.MaximumApduLengthAccepted,
}
}
96 changes: 0 additions & 96 deletions plc4go/internal/bacnetip/DeviceInventory.go

This file was deleted.

84 changes: 45 additions & 39 deletions plc4go/internal/bacnetip/Driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net"
"net/url"
"strconv"
"sync"

"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
Expand All @@ -41,15 +42,16 @@ import (

type Driver struct {
_default.DefaultDriver
applicationManager ApplicationManager
tm spi.RequestTransactionManager
awaitSetupComplete bool
awaitDisconnectComplete bool
DeviceInventory DeviceInventory
}

func NewDriver() plc4go.PlcDriver {
return &Driver{
DefaultDriver: _default.NewDefaultDriver("bacnet-ip", "BACnet/IP", "udp", NewTagHandler()),
applicationManager: ApplicationManager{},
tm: *spi.NewRequestTransactionManager(math.MaxInt),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
Expand Down Expand Up @@ -87,44 +89,7 @@ func (m *Driver) GetConnection(transportUrl url.URL, transports map[string]trans
return ch
}

var localAddr *net.UDPAddr
{
host := transportUrl.Host
port := transportUrl.Port()
if transportUrl.Port() == "" {
port = options["defaultUdpPort"][0]
}
var remoteAddr *net.UDPAddr
if resolvedRemoteAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", host, port)); err != nil {
panic(err)
} else {
remoteAddr = resolvedRemoteAddr
}
if dial, err := net.DialUDP("udp", nil, remoteAddr); err != nil {
log.Error().Stringer("transportUrl", &transportUrl).Msg("host unreachable")
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't dial to host %#v", transportUrl.Host))
}()
return ch
} else {
localAddr = dial.LocalAddr().(*net.UDPAddr)
localAddr.Port, _ = strconv.Atoi(port)
_ = dial.Close()
}
}
// Have the transport create a new transport-instance.
transportInstance, err := udpTransport.CreateTransportInstanceForLocalAddress(transportUrl, options, localAddr)
if err != nil {
log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultUdpPort"])
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't initialize transport configuration for given transport url %v", transportUrl))
}()
return ch
}

codec := NewApplicationLayerMessageCodec(transportInstance, &m.DeviceInventory)
codec, _ := m.applicationManager.getApplicationLayerMessageCode(udpTransport, transportUrl, options)
log.Debug().Msgf("working with codec %#v", codec)

// Create the new connection
Expand All @@ -144,3 +109,44 @@ func (m *Driver) Discover(callback func(event apiModel.PlcDiscoveryItem), discov
func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
return NewDiscoverer().Discover(ctx, callback, discoveryOptions...)
}

type ApplicationManager struct {
sync.Mutex
applications map[string]spi.MessageCodec
}

func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Transport, transportUrl url.URL, options map[string][]string) (spi.MessageCodec, error) {
var localAddress *net.UDPAddr
{
host := transportUrl.Host
port := transportUrl.Port()
if transportUrl.Port() == "" {
port = options["defaultUdpPort"][0]
}
var remoteAddr *net.UDPAddr
if resolvedRemoteAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", host, port)); err != nil {
panic(err)
} else {
remoteAddr = resolvedRemoteAddr
}
if dial, err := net.DialUDP("udp", nil, remoteAddr); err != nil {
return nil, errors.Errorf("couldn't dial to host %#v", transportUrl.Host)
} else {
localAddress = dial.LocalAddr().(*net.UDPAddr)
localAddress.Port, _ = strconv.Atoi(port)
_ = dial.Close()
}
}
a.Lock()
defer a.Unlock()
messageCodec, ok := a.applications[localAddress.String()]
if !ok {
newMessageCodec, err := NewApplicationLayerMessageCodec(transport, transportUrl, options, localAddress)
if err != nil {
return nil, errors.Wrap(err, "error creating application layer code")
}
a.applications[localAddress.String()] = newMessageCodec
return newMessageCodec, nil
}
return messageCodec, nil
}
65 changes: 61 additions & 4 deletions plc4go/internal/bacnetip/MessageCodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,80 @@
package bacnetip

import (
"context"
"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/transports/udp"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"net"
"net/url"
"time"
)

// ApplicationLayerMessageCodec is a wrapper for MessageCodec which takes care of segmentation, retries etc.
type ApplicationLayerMessageCodec struct {
TransactionStateMachine
bipSimpleApplication *BIPSimpleApplication
messageCode *MessageCodec
deviceInfoCache DeviceInfoCache
}

func NewApplicationLayerMessageCodec(transportInstance transports.TransportInstance, deviceInventory *DeviceInventory) *ApplicationLayerMessageCodec {
return &ApplicationLayerMessageCodec{
NewTransactionStateMachine(NewMessageCodec(transportInstance), deviceInventory),
func NewApplicationLayerMessageCodec(udpTransport *udp.Transport, transportUrl url.URL, options map[string][]string, localAddress *net.UDPAddr) (*ApplicationLayerMessageCodec, error) {
// Have the transport create a new transport-instance.
transportInstance, err := udpTransport.CreateTransportInstanceForLocalAddress(transportUrl, options, localAddress)
if err != nil {
return nil, errors.Wrap(err, "error creating transport instance")
}
_ = transportInstance
a := &ApplicationLayerMessageCodec{}
application, err := NewBIPSimpleApplication(LocalDeviceObject{}, localAddress, &a.deviceInfoCache, nil)
if err != nil {
return nil, err
}
a.bipSimpleApplication = application
a.messageCode = NewMessageCodec(transportInstance)
return a, nil
}

func (m *ApplicationLayerMessageCodec) GetCodec() spi.MessageCodec {
return m
}

func (m *ApplicationLayerMessageCodec) Connect() error {
return m.messageCode.Connect()
}

func (m *ApplicationLayerMessageCodec) ConnectWithContext(ctx context.Context) error {
return m.messageCode.ConnectWithContext(ctx)
}

func (m *ApplicationLayerMessageCodec) Disconnect() error {
if err := m.bipSimpleApplication.Close(); err != nil {
log.Error().Err(err).Msg("error closing application")
}
return m.messageCode.Disconnect()
}

func (m *ApplicationLayerMessageCodec) IsRunning() bool {
return m.messageCode.IsRunning()
}

func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
panic("not yet mapped")
}

func (m *ApplicationLayerMessageCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
panic("not yet mapped")
}

func (m *ApplicationLayerMessageCodec) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
panic("not yet mapped")
}

func (m *ApplicationLayerMessageCodec) GetDefaultIncomingMessageChannel() chan spi.Message {
return m.messageCode.GetDefaultIncomingMessageChannel()
}

type MessageCodec struct {
Expand Down
Loading

0 comments on commit c5639a6

Please sign in to comment.