Skip to content

Commit

Permalink
feat(plc4go/bacnet): initial skeleton of TransactionStateMachine
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Nov 3, 2022
1 parent 46cc1e1 commit 4955117
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 2 deletions.
82 changes: 82 additions & 0 deletions plc4go/internal/bacnetip/DeviceInventory.go
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package bacnetip

import (
readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
"github.com/pkg/errors"
"math"
"sync"
"time"
)

type DeviceInventory struct {
sync.RWMutex
devices map[string]DeviceEntry
}

func (d *DeviceInventory) getEntryForDestination(destination []uint8) (DeviceEntry, error) {
d.RLock()
defer d.RUnlock()
deviceKey := string(destination)
deviceEntry, ok := d.devices[deviceKey]
if !ok {
return NoDeviceEntry, errors.Errorf("no entry found for device key %s", deviceKey)
}
return deviceEntry, nil
}

var NoDeviceEntry = DeviceEntry{
MaximumApduLengthAcceptedLength: readWriteModel.NewBACnetTagPayloadUnsignedInteger(nil, nil, nil, nil, nil, nil, nil, func() *uint64 {
var maxUint64 uint64 = math.MaxUint64
return &maxUint64
}(), 4),
}

type DeviceEntry struct {
DeviceIdentifier readWriteModel.BACnetTagPayloadObjectIdentifier
MaximumApduLengthAcceptedLength readWriteModel.BACnetTagPayloadUnsignedInteger
SegmentationSupported bool
VendorId readWriteModel.BACnetVendorId
DeviceObjects []DeviceObject
}

func (d DeviceEntry) GetDeviceObjects(filter ...DeviceObjectFilter) []DeviceObject {
var deviceObjects []DeviceObject
for _, object := range d.DeviceObjects {
shouldBeAdded := true
for _, objectFilter := range filter {
shouldBeAdded = shouldBeAdded && objectFilter(object)
}
if shouldBeAdded {
deviceObjects = append(deviceObjects, object)
}
}
return deviceObjects
}

type DeviceObjectFilter func(DeviceObject) bool

type DeviceObject struct {
ObjectName string
ObjectIdentifier readWriteModel.BACnetTagPayloadObjectIdentifier
CachedObjectValue interface{}
TimeOfCache time.Time
}
3 changes: 2 additions & 1 deletion plc4go/internal/bacnetip/Driver.go
Expand Up @@ -42,6 +42,7 @@ type Driver struct {
tm spi.RequestTransactionManager
awaitSetupComplete bool
awaitDisconnectComplete bool
DeviceInventory DeviceInventory
}

func NewDriver() plc4go.PlcDriver {
Expand Down Expand Up @@ -121,7 +122,7 @@ func (m *Driver) GetConnection(transportUrl url.URL, transports map[string]trans
return ch
}

codec := NewMessageCodec(transportInstance)
codec := NewApplicationLayerMessageCodec(transportInstance, &m.DeviceInventory)
log.Debug().Msgf("working with codec %#v", codec)

// Create the new connection
Expand Down
14 changes: 14 additions & 0 deletions plc4go/internal/bacnetip/MessageCodec.go
Expand Up @@ -28,6 +28,20 @@ import (
"github.com/rs/zerolog/log"
)

// ApplicationLayerMessageCodec is a wrapper for MessageCodec which takes care of segmentation, retries etc.
type ApplicationLayerMessageCodec struct {
TransactionStateMachine
}

func NewApplicationLayerMessageCodec(transportInstance transports.TransportInstance, deviceInventory *DeviceInventory) *ApplicationLayerMessageCodec {
return &ApplicationLayerMessageCodec{
TransactionStateMachine{
MessageCodec: NewMessageCodec(transportInstance),
deviceInventory: deviceInventory,
},
}
}

type MessageCodec struct {
_default.DefaultCodec
}
Expand Down
2 changes: 1 addition & 1 deletion plc4go/internal/bacnetip/Reader.go
Expand Up @@ -189,7 +189,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
// Send the over the wire
log.Trace().Msg("Send ")
if err := m.messageCodec.SendRequest(ctx, bvlc, func(message spi.Message) bool {
bvlc, ok := message.(readWriteModel.BVLC)
bvlc, ok := message.(readWriteModel.BVLCExactly)
if !ok {
log.Debug().Msgf("Received strange type %T", bvlc)
return false
Expand Down
139 changes: 139 additions & 0 deletions plc4go/internal/bacnetip/TransactionStateMachine.go
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package bacnetip

import (
"context"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"time"
)

// TransactionStateMachine is the implementation of the bacnet transaction state machine
type TransactionStateMachine struct {
*MessageCodec
deviceInventory *DeviceInventory
retryCount int
segmentRetryCount int
duplicateCount int
sentAllSegments bool
lastSequenceNumber int
initialSequenceNumber int
actualWindowSize int
proposeWindowSize int
segmentTimer int
RequestTimer int
}

func (t *TransactionStateMachine) GetCodec() spi.MessageCodec {
return t
}

func (t *TransactionStateMachine) Send(message spi.Message) error {
if handled, err := t.handleOutboundMessage(message); handled {
return nil
} else if err != nil {
return errors.Wrap(err, "Error handling message")
} else {
return t.MessageCodec.Send(message)
}
}

func (t *TransactionStateMachine) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
// TODO: detect overflow
return t.MessageCodec.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl)
}

func (t *TransactionStateMachine) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
// Note: this code is copied on purpose from default codec as we want to call "this" `Send` and `Expect`
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "Not sending message as context is aborted")
}
log.Trace().Msg("Sending request")
// Send the actual message
err := t.Send(message)
if err != nil {
return errors.Wrap(err, "Error sending the request")
}
return t.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl)
}

func (t *TransactionStateMachine) handleOutboundMessage(message spi.Message) (handled bool, err error) {
switch message := message.(type) {
case readWriteModel.BVLCExactly:
bvlc := message
var npdu readWriteModel.NPDU
if npduRetriever, ok := bvlc.(interface{ GetNpdu() readWriteModel.NPDU }); ok {
npdu = npduRetriever.GetNpdu()
} else {
log.Debug().Msgf("bvlc has no way to give a npdu %T", bvlc)
return false, nil
}
if npdu.GetControl().GetMessageTypeFieldPresent() {
log.Trace().Msg("No message type field present")
return false, nil
}
var entryForDestination = NoDeviceEntry
if npdu.GetControl().GetDestinationSpecified() {
var err error
if entryForDestination, err = t.deviceInventory.getEntryForDestination(npdu.GetDestinationAddress()); err != nil {
// Get information from the device first
// TODO: get information with who-has maybe or directed... not sure now
// TODO: set entry once received
}
}
// TODO: should we continue if we don't have a destination
_ = entryForDestination
apdu := npdu.GetApdu()
switch apdu := apdu.(type) {
case readWriteModel.APDUConfirmedRequestExactly:
// TODO: check if adpu length is the magic number (it should be "unencoded")
maximumApduLengthForDevice := entryForDestination.MaximumApduLengthAcceptedLength.GetActualValue()
apduLengthDoesOverflow := uint64(apdu.GetLengthInBytes()) > maximumApduLengthForDevice
if apduLengthDoesOverflow && !entryForDestination.SegmentationSupported {
return false, errors.Errorf("We have a overflow. We need %d bytes, but device only supports a max of %d and no segmentation", apdu.GetLengthInBytes(), maximumApduLengthForDevice)
}
// TODO: handle potential retry
if apduLengthDoesOverflow {
// TODO: handle potential segmentation
}
return false, nil
case readWriteModel.APDUComplexAckExactly:
// TODO: check if adpu length is the magic number (it should be "unencoded")
maximumApduLengthForDevice := entryForDestination.MaximumApduLengthAcceptedLength.GetActualValue()
apduLengthDoesOverflow := uint64(apdu.GetLengthInBytes()) > maximumApduLengthForDevice
if apduLengthDoesOverflow && !entryForDestination.SegmentationSupported {
return false, errors.Errorf("We have a overflow. We need %d bytes, but device only supports a max of %d and no segmentation", apdu.GetLengthInBytes(), maximumApduLengthForDevice)
}
if apduLengthDoesOverflow {
// TODO: handle potential segmentation
}
return false, nil
default:
log.Trace().Msgf("APDU type not relevant %T present", apdu)
return false, nil
}
default:
log.Trace().Msgf("Message type not relevant %T present", message)
return false, nil
}
}

0 comments on commit 4955117

Please sign in to comment.