From c5639a65fddbfa6f159649330ebe0cf31742e928 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20R=C3=BChl?= Date: Mon, 14 Nov 2022 17:03:26 +0100 Subject: [PATCH] refactor(plc4go/bacnet): restructure code to hook in application layer --- plc4go/internal/bacnetip/ApplicationModule.go | 3 +- plc4go/internal/bacnetip/Device.go | 25 +++- plc4go/internal/bacnetip/DeviceInventory.go | 96 ------------ plc4go/internal/bacnetip/Driver.go | 84 ++++++----- plc4go/internal/bacnetip/MessageCodec.go | 65 +++++++- .../bacnetip/TransactionStateMachine.go | 141 ------------------ 6 files changed, 127 insertions(+), 287 deletions(-) delete mode 100644 plc4go/internal/bacnetip/DeviceInventory.go delete mode 100644 plc4go/internal/bacnetip/TransactionStateMachine.go diff --git a/plc4go/internal/bacnetip/ApplicationModule.go b/plc4go/internal/bacnetip/ApplicationModule.go index 0ae673addde..41d0107507b 100644 --- a/plc4go/internal/bacnetip/ApplicationModule.go +++ b/plc4go/internal/bacnetip/ApplicationModule.go @@ -27,6 +27,7 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog/log" "hash/fnv" + "net" ) type DeviceInfo struct { @@ -232,7 +233,7 @@ type BIPSimpleApplication struct { mux *UDPMultiplexer } -func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) { +func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress *net.UDPAddr, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) { b := &BIPSimpleApplication{} var err error b.ApplicationIOController, err = NewApplicationIOController(localDevice, localAddress, deviceInfoCache, aseID) diff --git a/plc4go/internal/bacnetip/Device.go b/plc4go/internal/bacnetip/Device.go index cf0e825dddc..33c36205e67 100644 --- a/plc4go/internal/bacnetip/Device.go +++ b/plc4go/internal/bacnetip/Device.go @@ -29,6 +29,19 @@ func NewWhoIsIAmServices() (*WhoIsIAmServices, error) { return nil, nil } +var defaultMaxApduLength = readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024 +var defaultMaxSegmentsAccepted = readWriteModel.MaxSegmentsAccepted_NUM_SEGMENTS_16 + +// _LocalDeviceObjectDefault is a device entry with default entries +var _LocalDeviceObjectDefault = LocalDeviceObject{ + MaximumApduLengthAccepted: &defaultMaxApduLength, + SegmentationSupported: readWriteModel.BACnetSegmentation_SEGMENTED_BOTH, + MaxSegmentsAccepted: &defaultMaxSegmentsAccepted, + APDUSegmentTimeout: 5000, + APDUTimeout: 3000, + NumberOfAPDURetries: 3, +} + type LocalDeviceObject struct { NumberOfAPDURetries uint APDUTimeout uint @@ -40,11 +53,11 @@ type LocalDeviceObject struct { func NewLocalDeviceObject() *LocalDeviceObject { return &LocalDeviceObject{ - NumberOfAPDURetries: 0, - APDUTimeout: 0, - SegmentationSupported: 0, - APDUSegmentTimeout: 0, - MaxSegmentsAccepted: nil, - MaximumApduLengthAccepted: nil, + NumberOfAPDURetries: _LocalDeviceObjectDefault.NumberOfAPDURetries, + APDUTimeout: _LocalDeviceObjectDefault.APDUTimeout, + SegmentationSupported: _LocalDeviceObjectDefault.SegmentationSupported, + APDUSegmentTimeout: _LocalDeviceObjectDefault.APDUSegmentTimeout, + MaxSegmentsAccepted: _LocalDeviceObjectDefault.MaxSegmentsAccepted, + MaximumApduLengthAccepted: _LocalDeviceObjectDefault.MaximumApduLengthAccepted, } } diff --git a/plc4go/internal/bacnetip/DeviceInventory.go b/plc4go/internal/bacnetip/DeviceInventory.go deleted file mode 100644 index 86f3f5c6903..00000000000 --- a/plc4go/internal/bacnetip/DeviceInventory.go +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package bacnetip - -import ( - readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model" - "github.com/pkg/errors" - "sync" - "time" -) - -// TODO: migrate into device info cache -type DeviceInventory struct { - sync.RWMutex - devices map[string]DeviceEntry -} - -func (d *DeviceInventory) getEntryForDestination(destination []uint8) (*DeviceEntry, error) { - d.RLock() - defer d.RUnlock() - deviceKey := string(destination) - deviceEntry, ok := d.devices[deviceKey] - if !ok { - return nil, errors.Errorf("no entry found for device key %s", deviceKey) - } - return &deviceEntry, nil -} - -var defaultMaxApduLength = readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024 -var defaultMaxSegmentsAccepted = readWriteModel.MaxSegmentsAccepted_NUM_SEGMENTS_16 - -// DeviceEntryDefault is a device entry with default entries -var DeviceEntryDefault = DeviceEntry{ - DeviceIdentifier: nil, - MaximumApduLengthAccepted: &defaultMaxApduLength, - MaximumNpduLength: nil, //note as we are ip we don't care about this - SegmentationSupported: readWriteModel.BACnetSegmentation_SEGMENTED_BOTH, - MaxSegmentsAccepted: &defaultMaxSegmentsAccepted, - APDUSegmentTimeout: 5000, - APDUTimeout: 3000, - NumberOfAPDURetries: 3, -} - -// TODO: switch that to a pointer and all entries that might be missing too -type DeviceEntry struct { - DeviceIdentifier readWriteModel.BACnetTagPayloadObjectIdentifier - MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted - MaximumNpduLength *uint - SegmentationSupported readWriteModel.BACnetSegmentation - MaxSegmentsAccepted *readWriteModel.MaxSegmentsAccepted - APDUSegmentTimeout uint - APDUTimeout uint - NumberOfAPDURetries uint - VendorId readWriteModel.BACnetVendorId - DeviceObjects []DeviceObject -} - -func (d DeviceEntry) GetDeviceObjects(filter ...DeviceObjectFilter) []DeviceObject { - var deviceObjects []DeviceObject - for _, object := range d.DeviceObjects { - shouldBeAdded := true - for _, objectFilter := range filter { - shouldBeAdded = shouldBeAdded && objectFilter(object) - } - if shouldBeAdded { - deviceObjects = append(deviceObjects, object) - } - } - return deviceObjects -} - -type DeviceObjectFilter func(DeviceObject) bool - -type DeviceObject struct { - ObjectName string - ObjectIdentifier readWriteModel.BACnetTagPayloadObjectIdentifier - CachedObjectValue interface{} - TimeOfCache time.Time -} diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go index a37aca13d71..330939d8bbf 100644 --- a/plc4go/internal/bacnetip/Driver.go +++ b/plc4go/internal/bacnetip/Driver.go @@ -26,6 +26,7 @@ import ( "net" "net/url" "strconv" + "sync" "github.com/apache/plc4x/plc4go/pkg/api" apiModel "github.com/apache/plc4x/plc4go/pkg/api/model" @@ -41,15 +42,16 @@ import ( type Driver struct { _default.DefaultDriver + applicationManager ApplicationManager tm spi.RequestTransactionManager awaitSetupComplete bool awaitDisconnectComplete bool - DeviceInventory DeviceInventory } func NewDriver() plc4go.PlcDriver { return &Driver{ DefaultDriver: _default.NewDefaultDriver("bacnet-ip", "BACnet/IP", "udp", NewTagHandler()), + applicationManager: ApplicationManager{}, tm: *spi.NewRequestTransactionManager(math.MaxInt), awaitSetupComplete: true, awaitDisconnectComplete: true, @@ -87,44 +89,7 @@ func (m *Driver) GetConnection(transportUrl url.URL, transports map[string]trans return ch } - var localAddr *net.UDPAddr - { - host := transportUrl.Host - port := transportUrl.Port() - if transportUrl.Port() == "" { - port = options["defaultUdpPort"][0] - } - var remoteAddr *net.UDPAddr - if resolvedRemoteAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", host, port)); err != nil { - panic(err) - } else { - remoteAddr = resolvedRemoteAddr - } - if dial, err := net.DialUDP("udp", nil, remoteAddr); err != nil { - log.Error().Stringer("transportUrl", &transportUrl).Msg("host unreachable") - ch := make(chan plc4go.PlcConnectionConnectResult) - go func() { - ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't dial to host %#v", transportUrl.Host)) - }() - return ch - } else { - localAddr = dial.LocalAddr().(*net.UDPAddr) - localAddr.Port, _ = strconv.Atoi(port) - _ = dial.Close() - } - } - // Have the transport create a new transport-instance. - transportInstance, err := udpTransport.CreateTransportInstanceForLocalAddress(transportUrl, options, localAddr) - if err != nil { - log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultUdpPort"]) - ch := make(chan plc4go.PlcConnectionConnectResult) - go func() { - ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't initialize transport configuration for given transport url %v", transportUrl)) - }() - return ch - } - - codec := NewApplicationLayerMessageCodec(transportInstance, &m.DeviceInventory) + codec, _ := m.applicationManager.getApplicationLayerMessageCode(udpTransport, transportUrl, options) log.Debug().Msgf("working with codec %#v", codec) // Create the new connection @@ -144,3 +109,44 @@ func (m *Driver) Discover(callback func(event apiModel.PlcDiscoveryItem), discov func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error { return NewDiscoverer().Discover(ctx, callback, discoveryOptions...) } + +type ApplicationManager struct { + sync.Mutex + applications map[string]spi.MessageCodec +} + +func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Transport, transportUrl url.URL, options map[string][]string) (spi.MessageCodec, error) { + var localAddress *net.UDPAddr + { + host := transportUrl.Host + port := transportUrl.Port() + if transportUrl.Port() == "" { + port = options["defaultUdpPort"][0] + } + var remoteAddr *net.UDPAddr + if resolvedRemoteAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", host, port)); err != nil { + panic(err) + } else { + remoteAddr = resolvedRemoteAddr + } + if dial, err := net.DialUDP("udp", nil, remoteAddr); err != nil { + return nil, errors.Errorf("couldn't dial to host %#v", transportUrl.Host) + } else { + localAddress = dial.LocalAddr().(*net.UDPAddr) + localAddress.Port, _ = strconv.Atoi(port) + _ = dial.Close() + } + } + a.Lock() + defer a.Unlock() + messageCodec, ok := a.applications[localAddress.String()] + if !ok { + newMessageCodec, err := NewApplicationLayerMessageCodec(transport, transportUrl, options, localAddress) + if err != nil { + return nil, errors.Wrap(err, "error creating application layer code") + } + a.applications[localAddress.String()] = newMessageCodec + return newMessageCodec, nil + } + return messageCodec, nil +} diff --git a/plc4go/internal/bacnetip/MessageCodec.go b/plc4go/internal/bacnetip/MessageCodec.go index 528884a7e8f..670cc00ddc5 100644 --- a/plc4go/internal/bacnetip/MessageCodec.go +++ b/plc4go/internal/bacnetip/MessageCodec.go @@ -20,23 +20,80 @@ package bacnetip import ( + "context" "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model" "github.com/apache/plc4x/plc4go/spi" "github.com/apache/plc4x/plc4go/spi/default" "github.com/apache/plc4x/plc4go/spi/transports" + "github.com/apache/plc4x/plc4go/spi/transports/udp" "github.com/pkg/errors" "github.com/rs/zerolog/log" + "net" + "net/url" + "time" ) // ApplicationLayerMessageCodec is a wrapper for MessageCodec which takes care of segmentation, retries etc. type ApplicationLayerMessageCodec struct { - TransactionStateMachine + bipSimpleApplication *BIPSimpleApplication + messageCode *MessageCodec + deviceInfoCache DeviceInfoCache } -func NewApplicationLayerMessageCodec(transportInstance transports.TransportInstance, deviceInventory *DeviceInventory) *ApplicationLayerMessageCodec { - return &ApplicationLayerMessageCodec{ - NewTransactionStateMachine(NewMessageCodec(transportInstance), deviceInventory), +func NewApplicationLayerMessageCodec(udpTransport *udp.Transport, transportUrl url.URL, options map[string][]string, localAddress *net.UDPAddr) (*ApplicationLayerMessageCodec, error) { + // Have the transport create a new transport-instance. + transportInstance, err := udpTransport.CreateTransportInstanceForLocalAddress(transportUrl, options, localAddress) + if err != nil { + return nil, errors.Wrap(err, "error creating transport instance") + } + _ = transportInstance + a := &ApplicationLayerMessageCodec{} + application, err := NewBIPSimpleApplication(LocalDeviceObject{}, localAddress, &a.deviceInfoCache, nil) + if err != nil { + return nil, err } + a.bipSimpleApplication = application + a.messageCode = NewMessageCodec(transportInstance) + return a, nil +} + +func (m *ApplicationLayerMessageCodec) GetCodec() spi.MessageCodec { + return m +} + +func (m *ApplicationLayerMessageCodec) Connect() error { + return m.messageCode.Connect() +} + +func (m *ApplicationLayerMessageCodec) ConnectWithContext(ctx context.Context) error { + return m.messageCode.ConnectWithContext(ctx) +} + +func (m *ApplicationLayerMessageCodec) Disconnect() error { + if err := m.bipSimpleApplication.Close(); err != nil { + log.Error().Err(err).Msg("error closing application") + } + return m.messageCode.Disconnect() +} + +func (m *ApplicationLayerMessageCodec) IsRunning() bool { + return m.messageCode.IsRunning() +} + +func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error { + panic("not yet mapped") +} + +func (m *ApplicationLayerMessageCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error { + panic("not yet mapped") +} + +func (m *ApplicationLayerMessageCodec) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error { + panic("not yet mapped") +} + +func (m *ApplicationLayerMessageCodec) GetDefaultIncomingMessageChannel() chan spi.Message { + return m.messageCode.GetDefaultIncomingMessageChannel() } type MessageCodec struct { diff --git a/plc4go/internal/bacnetip/TransactionStateMachine.go b/plc4go/internal/bacnetip/TransactionStateMachine.go deleted file mode 100644 index 2298b8f2311..00000000000 --- a/plc4go/internal/bacnetip/TransactionStateMachine.go +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package bacnetip - -import ( - "context" - readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model" - "github.com/apache/plc4x/plc4go/spi" - "github.com/pkg/errors" - "github.com/rs/zerolog/log" - "time" -) - -// TransactionStateMachine is the implementation of the bacnet transaction state machine -type TransactionStateMachine struct { - *MessageCodec - deviceInventory *DeviceInventory - retryCount int - segmentRetryCount int - duplicateCount int - sentAllSegments bool - lastSequenceNumber int - initialSequenceNumber int - actualWindowSize int - proposeWindowSize int - segmentTimer int - RequestTimer int -} - -func NewTransactionStateMachine(messageCodec *MessageCodec, deviceInventory *DeviceInventory) TransactionStateMachine { - return TransactionStateMachine{ - MessageCodec: messageCodec, - deviceInventory: deviceInventory, - retryCount: 3, - segmentRetryCount: 3, - duplicateCount: 0, - sentAllSegments: false, - lastSequenceNumber: 0, - initialSequenceNumber: 0, - actualWindowSize: 0, - proposeWindowSize: 2, - segmentTimer: 1500, - RequestTimer: 3000, - } -} - -func (t *TransactionStateMachine) GetCodec() spi.MessageCodec { - return t -} - -func (t *TransactionStateMachine) Send(message spi.Message) error { - if handled, err := t.handleOutboundMessage(message); handled { - return nil - } else if err != nil { - return errors.Wrap(err, "Error handling message") - } else { - return t.MessageCodec.Send(message) - } -} - -func (t *TransactionStateMachine) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error { - // TODO: detect overflow - return t.MessageCodec.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl) -} - -func (t *TransactionStateMachine) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error { - // Note: this code is copied on purpose from default codec as we want to call "this" `Send` and `Expect` - if err := ctx.Err(); err != nil { - return errors.Wrap(err, "Not sending message as context is aborted") - } - log.Trace().Msg("Sending request") - // Send the actual message - err := t.Send(message) - if err != nil { - return errors.Wrap(err, "Error sending the request") - } - return t.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl) -} - -func (t *TransactionStateMachine) handleOutboundMessage(message spi.Message) (handled bool, err error) { - switch message := message.(type) { - case readWriteModel.BVLCExactly: - bvlc := message - var npdu readWriteModel.NPDU - if npduRetriever, ok := bvlc.(interface{ GetNpdu() readWriteModel.NPDU }); ok { - npdu = npduRetriever.GetNpdu() - } else { - log.Debug().Msgf("bvlc has no way to give a npdu %T", bvlc) - return false, nil - } - if npdu.GetControl().GetMessageTypeFieldPresent() { - log.Trace().Msg("Message type field present") - return false, nil - } - var entryForDestination = DeviceEntryDefault - if npdu.GetControl().GetDestinationSpecified() { - if retrievedEntry, err := t.deviceInventory.getEntryForDestination(npdu.GetDestinationAddress()); err != nil { - // Get information from the device first - // TODO: get information with who-has maybe or directed... not sure now - // TODO: set entry once received - _ = retrievedEntry - } - } - // TODO: should we continue if we don't have a destination - _ = entryForDestination - apdu := npdu.GetApdu() - switch apdu := apdu.(type) { - case readWriteModel.APDUConfirmedRequestExactly: - // TODO: this is a "client" request - // TODO: check if adpu length is the magic number (it should be "unencoded") - return false, nil - case readWriteModel.APDUComplexAckExactly: - // TODO: this is a "server" response - // TODO: check if adpu length is the magic number (it should be "unencoded") - return false, nil - default: - log.Trace().Msgf("APDU type not relevant %T present", apdu) - return false, nil - } - default: - log.Trace().Msgf("Message type not relevant %T present", message) - return false, nil - } -}