Skip to content

Commit

Permalink
feat(plc4go/bacnet): basic comm working
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Jan 16, 2023
1 parent 6038199 commit 574dd3f
Show file tree
Hide file tree
Showing 7 changed files with 381 additions and 64 deletions.
5 changes: 3 additions & 2 deletions plc4go/internal/bacnetip/ApplicationLayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1702,8 +1702,9 @@ func (s *StateMachineAccessPoint) Confirmation(apdu _PDU) error { // TODO: note
case readWriteModel.APDUSimpleAckExactly, readWriteModel.APDUComplexAckExactly, readWriteModel.APDUErrorExactly, readWriteModel.APDURejectExactly:
// find the client transaction this is acking
var tr *ClientSSM
for _, tr := range s.clientTransactions {
if apdu.(interface{ GetOriginalInvokeId() uint8 }).GetOriginalInvokeId() == tr.invokeId && pduSource.Equals(tr.pduAddress) {
for _, _tr := range s.clientTransactions {
if _apdu.(interface{ GetOriginalInvokeId() uint8 }).GetOriginalInvokeId() == _tr.invokeId && pduSource.Equals(_tr.pduAddress) {
tr = _tr
break
}
}
Expand Down
4 changes: 2 additions & 2 deletions plc4go/internal/bacnetip/ApplicationModule.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (a *ApplicationIOController) _AppComplete(address *Address, apdu _PDU) erro
}

// this request is complete
switch apdu.(type) {
switch apdu.GetMessage().(type) {
case readWriteModel.APDUSimpleAckExactly, readWriteModel.APDUComplexAckExactly:
if err := queue.CompleteIO(queue.activeIOCB, apdu); err != nil {
return err
Expand Down Expand Up @@ -420,7 +420,7 @@ func (a *ApplicationIOController) Confirmation(apdu _PDU) error {
log.Debug().Msgf("Confirmation\n%s", apdu)

// this is an ack, error, reject or abort
return a._AppComplete(apdu.GetPDUDestination(), apdu)
return a._AppComplete(apdu.GetPDUSource(), apdu)
}

type BIPSimpleApplication struct {
Expand Down
46 changes: 36 additions & 10 deletions plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (m *_MultiplexServer) Indication(pdu _PDU) error {
}

type UDPMultiplexer struct {
address Address
address *Address
addrTuple *AddressTuple[string, uint16]
addrBroadcastTuple *AddressTuple[string, uint16]
direct *_MultiplexClient
Expand All @@ -87,21 +87,21 @@ func NewUDPMultiplexer(address interface{}, noBroadcast bool) (*UDPMultiplexer,
specialBroadcast := false
if address == nil {
address, _ := NewAddress()
u.address = *address
u.address = address
u.addrTuple = &AddressTuple[string, uint16]{"", 47808}
u.addrBroadcastTuple = &AddressTuple[string, uint16]{"255.255.255.255", 47808}
} else {
// allow the address to be cast
if caddress, ok := address.(*Address); ok {
u.address = *caddress
} else if caddress, ok := address.(Address); ok {
u.address = caddress
} else if caddress, ok := address.(Address); ok {
u.address = &caddress
} else {
newAddress, err := NewAddress(address)
if err != nil {
return nil, errors.Wrap(err, "error parsing address")
}
u.address = *newAddress
u.address = newAddress
}

// promote the normal and broadcast tuples
Expand Down Expand Up @@ -210,7 +210,33 @@ func (m *UDPMultiplexer) Confirmation(client *_MultiplexClient, pdu _PDU) error
return nil
}

// TODO: it is getting to messy, we need to solve the source destination topic
// TODO: upstream this is a tuple but we don't have that here so we can work with what we have
src := pduSource
var dest *Address

// match the destination in case the stack needs it
if client == m.direct {
log.Debug().Msg("direct to us")
dest = m.address
} else if client == m.broadcast {
log.Debug().Msg("broadcast to us")
dest = NewLocalBroadcast(nil)
} else {
return errors.New("Confirmation missmatch")
}
log.Debug().Msgf("dest: %s", dest)

// must have at least one octet
if pdu.GetMessage() == nil {
log.Debug().Msg("no data")
return nil
}

// TODO: we only support 0x81 at the moment
if m.annexJ != nil {
return m.annexJ.Response(NewPDU(pdu.GetMessage(), WithPDUSource(src), WithPDUDestination(dest)))
}

return nil
}

Expand Down Expand Up @@ -242,7 +268,7 @@ func (b *AnnexJCodec) Indication(pdu _PDU) error {

func (b *AnnexJCodec) Confirmation(pdu _PDU) error {
// Note: our BVLC are all annexJ at the moment
return b.Request(pdu)
return b.Response(pdu)
}

type _BIPSAP interface {
Expand Down Expand Up @@ -347,14 +373,14 @@ func (b *BIPSimple) Confirmation(pdu _PDU) error {
return b.SapRequest(pdu)
case readWriteModel.BVLCOriginalUnicastNPDUExactly:
// build a vanilla PDU
xpdu := NewPDU(msg, WithPDUSource(pdu.GetPDUSource()), WithPDUDestination(pdu.GetPDUDestination()))
xpdu := NewPDU(msg.GetNpdu(), WithPDUSource(pdu.GetPDUSource()), WithPDUDestination(pdu.GetPDUDestination()))
log.Debug().Msgf("xpdu: %s", xpdu)

// send it upstream
return b.Response(xpdu)
case readWriteModel.BVLCOriginalBroadcastNPDUExactly:
// build a PDU with a local broadcast address
xpdu := NewPDU(msg, WithPDUSource(pdu.GetPDUSource()), WithPDUDestination(NewLocalBroadcast(nil)))
xpdu := NewPDU(msg.GetNpdu(), WithPDUSource(pdu.GetPDUSource()), WithPDUDestination(NewLocalBroadcast(nil)))
log.Debug().Msgf("xpdu: %s", xpdu)

// send it upstream
Expand All @@ -367,7 +393,7 @@ func (b *BIPSimple) Confirmation(pdu _PDU) error {
if err != nil {
return errors.Wrap(err, "error building a ip")
}
xpdu := NewPDU(msg, WithPDUSource(source), WithPDUDestination(NewLocalBroadcast(nil)))
xpdu := NewPDU(msg.GetNpdu(), WithPDUSource(source), WithPDUDestination(NewLocalBroadcast(nil)))
log.Debug().Msgf("xpdu: %s", xpdu)

// send it upstream
Expand Down
16 changes: 7 additions & 9 deletions plc4go/internal/bacnetip/IOCBModule.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"net"
"sync"
"time"
)
Expand Down Expand Up @@ -89,7 +88,7 @@ type _IOCB interface {
Trigger()
setIOError(err error)
getRequest() _PDU
getDestination() net.Addr
getDestination() *Address
getPriority() int
clearQueue()
Abort(err error) error
Expand All @@ -101,7 +100,7 @@ var _identLock sync.Mutex
type IOCB struct {
ioID int
request _PDU
destination net.Addr
destination *Address
ioState IOCBState
ioResponse _PDU
ioError error
Expand All @@ -115,7 +114,7 @@ type IOCB struct {
priority int
}

func NewIOCB(request _PDU, destination net.Addr) (*IOCB, error) {
func NewIOCB(request _PDU, destination *Address) (*IOCB, error) {
// lock the identity sequence number
_identLock.Lock()

Expand Down Expand Up @@ -276,7 +275,7 @@ func (i *IOCB) getRequest() _PDU {
return i.request
}

func (i *IOCB) getDestination() net.Addr {
func (i *IOCB) getDestination() *Address {
return i.destination
}

Expand Down Expand Up @@ -793,17 +792,16 @@ func (i *IOQController) _waitTrigger() error {
stateLog.Debug().Msgf("%s %s %s", time.Now(), i.name, "idle")

// look for more to do
i._trigger()
return nil
return i._trigger()
}

type SieveQueue struct {
*IOQController
requestFn func(apdu _PDU)
address net.Addr
address *Address
}

func NewSieveQueue(fn func(apdu _PDU), address net.Addr) (*SieveQueue, error) {
func NewSieveQueue(fn func(apdu _PDU), address *Address) (*SieveQueue, error) {
s := &SieveQueue{}
var err error
s.IOQController, err = NewIOQController(address.String(), s)
Expand Down
12 changes: 6 additions & 6 deletions plc4go/internal/bacnetip/MessageCodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ func (m *ApplicationLayerMessageCodec) IsRunning() bool {
}

func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
address, err2 := NewAddress(m.remoteAddress)
if err2 != nil {
panic(err2)
address, err := NewAddress(m.remoteAddress)
if err != nil {
return err
}
iocb, err := NewIOCB(NewPDU(message, WithPDUDestination(address)), m.remoteAddress)
iocb, err := NewIOCB(NewPDU(message, WithPDUDestination(address)), address)
if err != nil {
return errors.Wrap(err, "error creating IOCB")
}
Expand All @@ -115,10 +115,10 @@ func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
iocb.Wait()
if iocb.ioError != nil {
// TODO: handle error
println(iocb.ioError)
fmt.Printf("Err: %v\n", iocb.ioError)
} else if iocb.ioResponse != nil {
// TODO: response?
println(iocb.ioResponse)
fmt.Printf("Response: %v\n", iocb.ioResponse)
} else {
// TODO: what now?
}
Expand Down
Loading

0 comments on commit 574dd3f

Please sign in to comment.