Skip to content

Commit

Permalink
feat(plc4go/cbus): added source to events
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Sep 6, 2022
1 parent 0e01d37 commit 31438eb
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 11 deletions.
11 changes: 7 additions & 4 deletions plc4go/internal/cbus/Subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ func NewSubscriber(connection *Connection) *Subscriber {
}
}

func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
// TODO: handle context
func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
result := make(chan apiModel.PlcSubscriptionRequestResult)
go func() {
internalPlcSubscriptionRequest := subscriptionRequest.(spiModel.DefaultPlcSubscriptionRequest)
Expand Down Expand Up @@ -112,6 +111,7 @@ func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
intervals := map[string]time.Duration{}
responseCodes := map[string]apiModel.PlcResponseCode{}
address := map[string]string{}
sources := map[string]string{}
plcValues := map[string]apiValues.PlcValue{}
fieldName := subscriptionHandle.fieldName

Expand All @@ -122,6 +122,7 @@ func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
continue
}
}
sources[fieldName] = unitAddressString

subscriptionType := subscriptionHandle.fieldType
// TODO: handle subscriptionType
Expand Down Expand Up @@ -217,7 +218,7 @@ func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {

// Assemble a PlcSubscription event
if len(plcValues) > 0 {
event := NewSubscriptionEvent(fields, types, intervals, responseCodes, address, plcValues)
event := NewSubscriptionEvent(fields, types, intervals, responseCodes, address, sources, plcValues)
consumer(event)
}
}
Expand All @@ -239,6 +240,7 @@ func (m *Subscriber) handleMonitoredSal(sal readWriteModel.MonitoredSAL) bool {
intervals := map[string]time.Duration{}
responseCodes := map[string]apiModel.PlcResponseCode{}
address := map[string]string{}
sources := map[string]string{}
plcValues := map[string]apiValues.PlcValue{}
fieldName := subscriptionHandle.fieldName

Expand Down Expand Up @@ -278,6 +280,7 @@ func (m *Subscriber) handleMonitoredSal(sal readWriteModel.MonitoredSAL) bool {
continue
}
}
sources[fieldName] = unitAddressString

if application := field.GetApplication(); application != nil {
if actualApplicationIdString := application.ApplicationId().String(); applicationString != actualApplicationIdString {
Expand All @@ -302,7 +305,7 @@ func (m *Subscriber) handleMonitoredSal(sal readWriteModel.MonitoredSAL) bool {

// Assemble a PlcSubscription event
if len(plcValues) > 0 {
event := NewSubscriptionEvent(fields, types, intervals, responseCodes, address, plcValues)
event := NewSubscriptionEvent(fields, types, intervals, responseCodes, address, sources, plcValues)
consumer(event)
}
}
Expand Down
21 changes: 17 additions & 4 deletions plc4go/internal/cbus/SubscriptionEvent.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,29 @@ import (
type SubscriptionEvent struct {
internalMode.DefaultPlcSubscriptionEvent
address map[string]string
sources map[string]string
}

func NewSubscriptionEvent(fields map[string]apiModel.PlcField, types map[string]internalMode.SubscriptionType,
intervals map[string]time.Duration, responseCodes map[string]apiModel.PlcResponseCode,
address map[string]string, values map[string]values.PlcValue) SubscriptionEvent {
subscriptionEvent := SubscriptionEvent{address: address}
func NewSubscriptionEvent(
fields map[string]apiModel.PlcField,
types map[string]internalMode.SubscriptionType,
intervals map[string]time.Duration,
responseCodes map[string]apiModel.PlcResponseCode,
address map[string]string,
sources map[string]string,
values map[string]values.PlcValue) SubscriptionEvent {
subscriptionEvent := SubscriptionEvent{
address: address,
sources: sources,
}
subscriptionEvent.DefaultPlcSubscriptionEvent = internalMode.NewDefaultPlcSubscriptionEvent(subscriptionEvent, fields, types, intervals, responseCodes, values)
return subscriptionEvent
}

func (m SubscriptionEvent) GetAddress(name string) string {
return m.address[name]
}

func (m SubscriptionEvent) GetSource(name string) string {
return m.sources[name]
}
60 changes: 57 additions & 3 deletions plc4go/spi/model/DefaultPlcSubscriptionEvent.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,66 @@ func (m DefaultPlcSubscriptionEvent) GetValue(name string) values.PlcValue {
}

func (m DefaultPlcSubscriptionEvent) Serialize(writeBuffer utils.WriteBuffer) error {
if err := writeBuffer.PushContext("PlcReadResponse"); err != nil {
if err := writeBuffer.PushContext("PlcSubscriptionEvent"); err != nil {
return err
}

if err := writeBuffer.WriteSerializable(m.DefaultResponse); err != nil {
return err
}

if err := writeBuffer.PushContext("fields"); err != nil {
return err
}
for _, fieldName := range m.GetFieldNames() {
if err := writeBuffer.PushContext(fieldName); err != nil {
return err
}
valueResponse := m.GetField(fieldName)
if err := writeBuffer.WriteString("addressString", uint32(len(valueResponse.GetAddressString())*8), "UTF-8", valueResponse.GetAddressString()); err != nil {
return err
}
if err := writeBuffer.WriteString("typeName", uint32(len(valueResponse.GetTypeName())*8), "UTF-8", valueResponse.GetTypeName()); err != nil {
return err
}
if err := writeBuffer.WriteUint16(fieldName, 8, uint16(valueResponse.GetQuantity())); err != nil {
return err
}
if err := writeBuffer.PopContext(fieldName); err != nil {
return err
}
}
if err := writeBuffer.PopContext("fields"); err != nil {
return err
}
if err := writeBuffer.PushContext("types"); err != nil {
return err
}
for _, fieldName := range m.GetFieldNames() {
fieldType := m.GetType(fieldName)
if err := writeBuffer.WriteUint8(fieldName, 8, uint8(fieldType), utils.WithAdditionalStringRepresentation(fieldType.String())); err != nil {
return err
}
}
if err := writeBuffer.PopContext("types"); err != nil {
return err
}
if err := writeBuffer.PushContext("intervals"); err != nil {
return err
}
for _, fieldName := range m.GetFieldNames() {
interval := m.GetInterval(fieldName)
if err := writeBuffer.WriteInt64(fieldName, 8, int64(interval), utils.WithAdditionalStringRepresentation(interval.String())); err != nil {
return err
}
}
if err := writeBuffer.PopContext("intervals"); err != nil {
return err
}

if err := writeBuffer.PushContext("values"); err != nil {
return err
}
for _, fieldName := range m.GetFieldNames() {
if err := writeBuffer.PushContext(fieldName); err != nil {
return err
Expand All @@ -104,10 +157,11 @@ func (m DefaultPlcSubscriptionEvent) Serialize(writeBuffer utils.WriteBuffer) er
return err
}
}
if err := writeBuffer.PopContext("fields"); err != nil {
if err := writeBuffer.PopContext("values"); err != nil {
return err
}
if err := writeBuffer.PopContext("PlcReadResponse"); err != nil {

if err := writeBuffer.PopContext("PlcSubscriptionEvent"); err != nil {
return err
}
return nil
Expand Down
13 changes: 13 additions & 0 deletions plc4go/spi/model/DefaultPlcSubscriptionRequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,19 @@ const (
SubscriptionEvent SubscriptionType = 0x03
)

func (s SubscriptionType) String() string {
switch s {
case SubscriptionCyclic:
return "SubscriptionCyclic"
case SubscriptionChangeOfState:
return "SubscriptionChangeOfState"
case SubscriptionEvent:
return "SubscriptionEvent"
default:
return "Unknown"
}
}

type DefaultPlcSubscriptionRequestBuilder struct {
subscriber spi.PlcSubscriber
fieldHandler spi.PlcFieldHandler
Expand Down

0 comments on commit 31438eb

Please sign in to comment.