Skip to content

Commit

Permalink
refactor(plc4go/ads): Refactoring of the go ADS drier
Browse files Browse the repository at this point in the history
- Got a first working implementation of subscriptions for ADS in Go
  • Loading branch information
chrisdutz committed Nov 29, 2022
1 parent f2a3f75 commit 554d756
Show file tree
Hide file tree
Showing 21 changed files with 562 additions and 189 deletions.
7 changes: 4 additions & 3 deletions plc4go/internal/ads/Browser.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"strings"

"github.com/apache/plc4x/plc4go/internal/ads/model"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
driverModel "github.com/apache/plc4x/plc4go/protocols/ads/readwrite/model"
internalModel "github.com/apache/plc4x/plc4go/spi/model"
Expand Down Expand Up @@ -113,9 +114,9 @@ func (m *Connection) filterDataTypes(parentName string, currentType driverModel.
})
}
foundTag := &internalModel.DefaultPlcBrowseItem{
Tag: SymbolicPlcTag{
PlcTag: PlcTag{
arrayInfo: arrayInfo,
Tag: model.SymbolicPlcTag{
PlcTag: model.PlcTag{
ArrayInfo: arrayInfo,
},
SymbolicAddress: parentName,
},
Expand Down
68 changes: 53 additions & 15 deletions plc4go/internal/ads/Connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strconv"
"strings"

model2 "github.com/apache/plc4x/plc4go/internal/ads/model"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
Expand All @@ -45,12 +46,14 @@ type Connection struct {

messageCodec spi.MessageCodec
requestInterceptor interceptors.RequestInterceptor
configuration Configuration
configuration model2.Configuration
driverContext *DriverContext
tracer *spi.Tracer

subscriptions map[uint32]apiModel.PlcSubscriptionHandle
}

func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, options map[string][]string) (*Connection, error) {
func NewConnection(messageCodec spi.MessageCodec, configuration model2.Configuration, options map[string][]string) (*Connection, error) {
driverContext, err := NewDriverContext(configuration)
if err != nil {
return nil, err
Expand All @@ -59,6 +62,7 @@ func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, o
messageCodec: messageCodec,
configuration: configuration,
driverContext: driverContext,
subscriptions: map[uint32]apiModel.PlcSubscriptionHandle{},
}
if traceEnabledOption, ok := options["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
Expand Down Expand Up @@ -150,20 +154,54 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
return
}

// Start the worker for handling incoming messages
// (Messages that are not responses to outgoing messages)
defaultIncomingMessageChannel := m.messageCodec.GetDefaultIncomingMessageChannel()
go func() {
for message := range defaultIncomingMessageChannel {
switch message.(type) {
case model.AmsTCPPacket:
amsTCPPacket := message.(model.AmsTCPPacket)
switch amsTCPPacket.GetUserdata().(type) {
// Forward all device notification requests to the subscriber component.
case model.AdsDeviceNotificationRequest:
m.handleIncomingDeviceNotificationRequest(
amsTCPPacket.GetUserdata().(model.AdsDeviceNotificationRequest))
default:
log.Warn().Msgf("Got unexpected type of incoming ADS message %v", message)
}
default:
log.Warn().Msgf("Got unexpected type of incoming ADS message %v", message)
}
}
}()

// Subscribe for changes to the symbol or the offline-versions
/*versionChangeRequest, err := m.SubscriptionRequestBuilder().
versionChangeRequest, err := m.SubscriptionRequestBuilder().
AddChangeOfStateTagAddress("offlineVersion", "0xF008/0x0000:USINT").
AddPreRegisteredConsumer("offlineVersion", func(event apiModel.PlcSubscriptionEvent) {
err := m.readSymbolTableAndDatatypeTable(ctx)
if err != nil {
log.Error().Err(err).Msg("error updating data-type and symbol tables")
if event.GetResponseCode("offlineVersion") == apiModel.PlcResponseCode_OK {
newVersion := event.GetValue("offlineVersion").GetUint8()
if newVersion != m.driverContext.symbolVersion {
log.Info().Msg("detected offline version change: reloading symbol- and data-type-table.")
err := m.readSymbolTableAndDatatypeTable(ctx)
if err != nil {
log.Error().Err(err).Msg("error updating data-type and symbol tables")
}
}
}
}).
AddChangeOfStateTagAddress("onlineVersion", "TwinCAT_SystemInfoVarList._AppInfo.OnlineChangeCnt").
AddPreRegisteredConsumer("onlineVersion", func(event apiModel.PlcSubscriptionEvent) {
err := m.readSymbolTableAndDatatypeTable(ctx)
if err != nil {
log.Error().Err(err).Msg("error updating data-type and symbol tables")
if event.GetResponseCode("onlineVersion") == apiModel.PlcResponseCode_OK {
newVersion := event.GetValue("onlineVersion").GetUint32()
if newVersion != m.driverContext.onlineVersion {
log.Info().Msg("detected online version change: reloading symbol- and data-type-table.")
err := m.readSymbolTableAndDatatypeTable(ctx)
if err != nil {
log.Error().Err(err).Msg("error updating data-type and symbol tables")
}
}
}
}).
Build()
Expand All @@ -172,7 +210,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
if subscriptionRequestResult.GetErr() != nil {
ch <- _default.NewDefaultPlcConnectionCloseResult(nil, subscriptionRequestResult.GetErr())
return
}*/
}

// Return the finished connection
ch <- _default.NewDefaultPlcConnectionConnectResult(m, nil)
Expand Down Expand Up @@ -251,7 +289,7 @@ func (m *Connection) readSymbolTable(ctx context.Context, symbolTableSize uint32
return symbols, nil
}

func (m *Connection) resolveSymbolicTag(ctx context.Context, symbolicTag SymbolicPlcTag) (*DirectPlcTag, error) {
func (m *Connection) resolveSymbolicTag(ctx context.Context, symbolicTag model2.SymbolicPlcTag) (*model2.DirectPlcTag, error) {
// Find the initial datatype, based on the first to segments.
symbolicAddress := symbolicTag.SymbolicAddress
addressParts := strings.Split(symbolicAddress, ".")
Expand All @@ -278,7 +316,7 @@ func (m *Connection) resolveSymbolicTag(ctx context.Context, symbolicTag Symboli
return m.resolveSymbolicAddress(ctx, addressParts, dataType, symbol.GetGroup(), symbol.GetOffset())
}

func (m *Connection) resolveSymbolicAddress(ctx context.Context, addressParts []string, curDataType model.AdsDataTypeTableEntry, indexGroup uint32, indexOffset uint32) (*DirectPlcTag, error) {
func (m *Connection) resolveSymbolicAddress(ctx context.Context, addressParts []string, curDataType model.AdsDataTypeTableEntry, indexGroup uint32, indexOffset uint32) (*model2.DirectPlcTag, error) {
// If we've reached then end of the resolution, return the final entry.
if len(addressParts) == 0 {
var arrayInfo []apiModel.ArrayInfo
Expand All @@ -289,9 +327,9 @@ func (m *Connection) resolveSymbolicAddress(ctx context.Context, addressParts []
})
}
plcValueType, stringLength := m.getPlcValueForAdsDataTypeTableEntry(curDataType)
return &DirectPlcTag{
PlcTag: PlcTag{
arrayInfo: arrayInfo,
return &model2.DirectPlcTag{
PlcTag: model2.PlcTag{
ArrayInfo: arrayInfo,
},
IndexGroup: indexGroup,
IndexOffset: indexOffset,
Expand Down
3 changes: 2 additions & 1 deletion plc4go/internal/ads/Driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/url"
"strconv"

"github.com/apache/plc4x/plc4go/internal/ads/model"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
adsModel "github.com/apache/plc4x/plc4go/protocols/ads/readwrite/model"
Expand Down Expand Up @@ -71,7 +72,7 @@ func (m *Driver) GetConnection(transportUrl url.URL, transports map[string]trans
codec := NewMessageCodec(transportInstance)
log.Debug().Msgf("working with codec %#v", codec)

configuration, err := ParseFromOptions(options)
configuration, err := model.ParseFromOptions(options)
if err != nil {
log.Error().Err(err).Msgf("Invalid options")
ch := make(chan plc4go.PlcConnectionConnectResult)
Expand Down
13 changes: 7 additions & 6 deletions plc4go/internal/ads/DriverContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strings"
"sync/atomic"

model3 "github.com/apache/plc4x/plc4go/internal/ads/model"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
driverModel "github.com/apache/plc4x/plc4go/protocols/ads/readwrite/model"
Expand All @@ -45,7 +46,7 @@ type DriverContext struct {
awaitDisconnectComplete bool
}

func NewDriverContext(configuration Configuration) (*DriverContext, error) {
func NewDriverContext(configuration model3.Configuration) (*DriverContext, error) {
return &DriverContext{
invokeId: 0,
}, nil
Expand All @@ -64,7 +65,7 @@ func (m *DriverContext) clear() {
m.awaitDisconnectComplete = false
}

func (m *DriverContext) getDirectTagForSymbolTag(symbolicPlcTag SymbolicPlcTag) (*DirectPlcTag, error) {
func (m *DriverContext) getDirectTagForSymbolTag(symbolicPlcTag model3.SymbolicPlcTag) (*model3.DirectPlcTag, error) {
address := symbolicPlcTag.SymbolicAddress
addressSegments := strings.Split(address, ".")
var symbolName string
Expand All @@ -90,16 +91,16 @@ func (m *DriverContext) getDirectTagForSymbolTag(symbolicPlcTag SymbolicPlcTag)
return m.resolveDirectTag(remainingSegments, dataTypeEntry, symbolEntry.GetGroup(), symbolEntry.GetOffset())
}

func (m *DriverContext) resolveDirectTag(remainingSegments []string, currentDatatype driverModel.AdsDataTypeTableEntry, indexGroup uint32, indexOffset uint32) (*DirectPlcTag, error) {
func (m *DriverContext) resolveDirectTag(remainingSegments []string, currentDatatype driverModel.AdsDataTypeTableEntry, indexGroup uint32, indexOffset uint32) (*model3.DirectPlcTag, error) {
if len(remainingSegments) == 0 {
return &DirectPlcTag{
return &model3.DirectPlcTag{
IndexGroup: indexGroup,
IndexOffset: indexOffset,
ValueType: m.getDataTypeForDataTypeTableEntry(currentDatatype),
StringLength: m.getStringLengthForDataTypeTableEntry(currentDatatype),
DataType: currentDatatype,
PlcTag: PlcTag{
arrayInfo: m.getArrayInfoForDataTypeTableEntry(currentDatatype),
PlcTag: model3.PlcTag{
ArrayInfo: m.getArrayInfoForDataTypeTableEntry(currentDatatype),
},
}, nil
}
Expand Down
24 changes: 12 additions & 12 deletions plc4go/internal/ads/MessageTemplates.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,50 +23,50 @@ import adsModel "github.com/apache/plc4x/plc4go/protocols/ads/readwrite/model"

func (m *Connection) NewAdsReadDeviceInfoRequest() adsModel.AmsTCPPacket {
return adsModel.NewAmsTCPPacket(
adsModel.NewAdsReadDeviceInfoRequest(m.configuration.targetAmsNetId, uint16(adsModel.DefaultAmsPorts_RUNTIME_SYSTEM_01),
adsModel.NewAdsReadDeviceInfoRequest(m.configuration.TargetAmsNetId, uint16(adsModel.DefaultAmsPorts_RUNTIME_SYSTEM_01),
// TODO: Replace 800 with constant.
m.configuration.sourceAmsNetId, 800, 0, m.driverContext.getInvokeId()))
m.configuration.SourceAmsNetId, 800, 0, m.driverContext.getInvokeId()))
}

func (m *Connection) NewAdsReadRequest(indexGroup uint32, indexOffset uint32, length uint32) adsModel.AmsTCPPacket {
return adsModel.NewAmsTCPPacket(
adsModel.NewAdsReadRequest(indexGroup, indexOffset, length,
m.configuration.targetAmsNetId, m.configuration.targetAmsPort,
m.configuration.sourceAmsNetId, m.configuration.sourceAmsPort, 0, m.driverContext.getInvokeId()))
m.configuration.TargetAmsNetId, m.configuration.TargetAmsPort,
m.configuration.SourceAmsNetId, m.configuration.SourceAmsPort, 0, m.driverContext.getInvokeId()))
}

func (m *Connection) NewAdsWriteRequest(indexGroup uint32, indexOffset uint32, data []byte) adsModel.AmsTCPPacket {
return adsModel.NewAmsTCPPacket(
adsModel.NewAdsWriteRequest(
indexGroup, indexOffset, data,
m.configuration.targetAmsNetId, m.configuration.targetAmsPort,
m.configuration.sourceAmsNetId, m.configuration.sourceAmsPort,
m.configuration.TargetAmsNetId, m.configuration.TargetAmsPort,
m.configuration.SourceAmsNetId, m.configuration.SourceAmsPort,
0, m.driverContext.getInvokeId()))
}

func (m *Connection) NewAdsReadWriteRequest(indexGroup uint32, indexOffset uint32, readLength uint32, items []adsModel.AdsMultiRequestItem, writeData []byte) adsModel.AmsTCPPacket {
return adsModel.NewAmsTCPPacket(
adsModel.NewAdsReadWriteRequest(
indexGroup, indexOffset, readLength, items, writeData,
m.configuration.targetAmsNetId, m.configuration.targetAmsPort,
m.configuration.sourceAmsNetId, m.configuration.sourceAmsPort,
m.configuration.TargetAmsNetId, m.configuration.TargetAmsPort,
m.configuration.SourceAmsNetId, m.configuration.SourceAmsPort,
0, m.driverContext.getInvokeId()))
}

func (m *Connection) NewAdsAddDeviceNotificationRequest(indexGroup uint32, indexOffset uint32, length uint32, transmissionMode adsModel.AdsTransMode, maxDelay uint32, cycleTime uint32) adsModel.AmsTCPPacket {
return adsModel.NewAmsTCPPacket(
adsModel.NewAdsAddDeviceNotificationRequest(
indexGroup, indexOffset, length, transmissionMode, maxDelay, cycleTime,
m.configuration.targetAmsNetId, m.configuration.targetAmsPort,
m.configuration.sourceAmsNetId, m.configuration.sourceAmsPort,
m.configuration.TargetAmsNetId, m.configuration.TargetAmsPort,
m.configuration.SourceAmsNetId, m.configuration.SourceAmsPort,
0, m.driverContext.getInvokeId()))
}

func (m *Connection) NewAdsDeleteDeviceNotificationRequest(notificationHandle uint32) adsModel.AmsTCPPacket {
return adsModel.NewAmsTCPPacket(
adsModel.NewAdsDeleteDeviceNotificationRequest(
notificationHandle,
m.configuration.targetAmsNetId, m.configuration.targetAmsPort,
m.configuration.sourceAmsNetId, m.configuration.sourceAmsPort,
m.configuration.TargetAmsNetId, m.configuration.TargetAmsPort,
m.configuration.SourceAmsNetId, m.configuration.SourceAmsPort,
0, m.driverContext.getInvokeId()))
}
15 changes: 8 additions & 7 deletions plc4go/internal/ads/Reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"strings"

"github.com/apache/plc4x/plc4go/internal/ads/model"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
driverModel "github.com/apache/plc4x/plc4go/protocols/ads/readwrite/model"
Expand Down Expand Up @@ -66,8 +67,8 @@ func (m *Connection) singleRead(ctx context.Context, readRequest apiModel.PlcRea
// Here we can be sure that we're only handling a single request.
tagName := readRequest.GetTagNames()[0]
tag := readRequest.GetTag(tagName)
if needsResolving(tag) {
adsField, err := castToSymbolicPlcTagFromPlcTag(tag)
if model.NeedsResolving(tag) {
adsField, err := model.CastToSymbolicPlcTagFromPlcTag(tag)
if err != nil {
result <- &internalModel.DefaultPlcReadRequestResult{
Request: readRequest,
Expand All @@ -89,7 +90,7 @@ func (m *Connection) singleRead(ctx context.Context, readRequest apiModel.PlcRea
return
}
}
directAdsTag, ok := tag.(*DirectPlcTag)
directAdsTag, ok := tag.(*model.DirectPlcTag)
if !ok {
result <- &internalModel.DefaultPlcReadRequestResult{
Request: readRequest,
Expand Down Expand Up @@ -142,12 +143,12 @@ func (m *Connection) multiRead(ctx context.Context, readRequest apiModel.PlcRead
// Calculate the size of all tags together.
// Calculate the expected size of the response data.
expectedResponseDataSize := uint32(0)
directAdsTags := map[string]*DirectPlcTag{}
directAdsTags := map[string]*model.DirectPlcTag{}
requestItems := make([]driverModel.AdsMultiRequestItem, 0)
for _, tagName := range readRequest.GetTagNames() {
tag := readRequest.GetTag(tagName)
if needsResolving(tag) {
adsField, err := castToSymbolicPlcTagFromPlcTag(tag)
if model.NeedsResolving(tag) {
adsField, err := model.CastToSymbolicPlcTagFromPlcTag(tag)
if err != nil {
result <- &internalModel.DefaultPlcReadRequestResult{
Request: readRequest,
Expand All @@ -169,7 +170,7 @@ func (m *Connection) multiRead(ctx context.Context, readRequest apiModel.PlcRead
return
}
}
directAdsTag, ok := tag.(*DirectPlcTag)
directAdsTag, ok := tag.(*model.DirectPlcTag)
if !ok {
result <- &internalModel.DefaultPlcReadRequestResult{
Request: readRequest,
Expand Down
Loading

0 comments on commit 554d756

Please sign in to comment.