Skip to content

Commit

Permalink
feat(plc4go/bacnet): first comm went through new stack
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Jan 13, 2023
1 parent 609f6af commit 2265a6a
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 26 deletions.
4 changes: 3 additions & 1 deletion plc4go/internal/bacnetip/NetworkService.go
Expand Up @@ -459,6 +459,7 @@ func buildNPDU(hopCount uint8, source *Address, destination *Address, expectingR
var destinationNetworkAddress *uint16
var destinationLength *uint8
var destinationAddress []uint8
var destinationHopCount *uint8
if destinationSpecified {
destinationSpecified = true
destinationNetworkAddress = destination.AddrNet
Expand All @@ -473,9 +474,10 @@ func buildNPDU(hopCount uint8, source *Address, destination *Address, expectingR
// If we define the len 0 we must not send the array
destinationAddress = nil
}
destinationHopCount = &hopCount
}
control := readWriteModel.NewNPDUControl(false, destinationSpecified, sourceSpecified, expectingReply, networkPriority)
return readWriteModel.NewNPDU(1, control, destinationNetworkAddress, destinationLength, destinationAddress, sourceNetworkAddress, sourceLength, sourceAddress, &hopCount, nil, apdu, 0), nil
return readWriteModel.NewNPDU(1, control, destinationNetworkAddress, destinationLength, destinationAddress, sourceNetworkAddress, sourceLength, sourceAddress, destinationHopCount, nil, apdu, 0), nil
}

func (n *NetworkServiceAccessPoint) ProcessNPDU(adapter *NetworkAdapter, pdu _PDU) error {
Expand Down
46 changes: 44 additions & 2 deletions plc4go/internal/bacnetip/PDU.go
Expand Up @@ -29,6 +29,7 @@ import (
"net"
"reflect"
"regexp"
"strings"
)

type AddressType int
Expand Down Expand Up @@ -163,7 +164,10 @@ func (a *Address) decodeAddress(addr interface{}) error {
case net.Addr:
// TODO: hacked in udp support
udpAddr := addr.(*net.UDPAddr)
a.AddrAddress = udpAddr.IP
a.AddrAddress = udpAddr.IP.To4()
if a.AddrAddress == nil {
a.AddrAddress = udpAddr.IP.To16()
}
length := uint32(len(a.AddrAddress))
a.AddrLen = &length
port := uint16(udpAddr.Port)
Expand Down Expand Up @@ -300,7 +304,45 @@ func (a *Address) Equals(other interface{}) bool {
}

func (a *Address) String() string {
return fmt.Sprintf("%#v", a)
if a == nil {
return "<nil>"
}
var sb strings.Builder
sb.WriteString(a.AddrType.String())
if a.AddrNet != nil {
_, _ = fmt.Fprintf(&sb, ", net: %d", *a.AddrNet)
}
if len(a.AddrAddress) > 0 {
_, _ = fmt.Fprintf(&sb, ", address: %d", a.AddrAddress)
}
if a.AddrLen != nil {
_, _ = fmt.Fprintf(&sb, " with len %d", *a.AddrLen)
}
if a.AddrRoute != nil {
_, _ = fmt.Fprintf(&sb, ", route: %s", a.AddrRoute)
}
if a.AddrIP != nil {
_, _ = fmt.Fprintf(&sb, ", ip: %d", *a.AddrIP)
}
if a.AddrMask != nil {
_, _ = fmt.Fprintf(&sb, ", mask: %d", *a.AddrMask)
}
if a.AddrHost != nil {
_, _ = fmt.Fprintf(&sb, ", host: %d", *a.AddrHost)
}
if a.AddrSubnet != nil {
_, _ = fmt.Fprintf(&sb, ", subnet: %d", *a.AddrSubnet)
}
if a.AddrPort != nil {
_, _ = fmt.Fprintf(&sb, ", port: %d", *a.AddrPort)
}
if a.AddrTuple != nil {
_, _ = fmt.Fprintf(&sb, ", tuple: %s", a.AddrTuple)
}
if a.AddrBroadcastTuple != nil {
_, _ = fmt.Fprintf(&sb, ", broadcast tuple: %s", a.AddrBroadcastTuple)
}
return sb.String()
}

func portToUint16(port []byte) uint16 {
Expand Down
73 changes: 50 additions & 23 deletions plc4go/internal/bacnetip/UDPCommunicationsModule.go
Expand Up @@ -20,10 +20,9 @@
package bacnetip

import (
"bufio"
"fmt"
"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/spi/transports/udp"
"github.com/libp2p/go-reuseport"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"net"
Expand Down Expand Up @@ -109,7 +108,7 @@ type UDPDirector struct {
timeout uint32
reuse bool
address AddressTuple[string, uint16]
ti *udp.TransportInstance
udpConn *net.UDPConn

actorClass func(*UDPDirector, string) *UDPActor
request chan _PDU
Expand Down Expand Up @@ -149,9 +148,16 @@ func NewUDPDirector(address AddressTuple[string, uint16], timeout *int, reuse *b
if err != nil {
return nil, errors.Wrap(err, "error resolving udp address")
}
d.ti = udp.NewTransportInstance(resolvedAddress, nil, d.timeout, d.reuse, nil)
if err := d.ti.Connect(); err != nil {
return nil, errors.Wrap(err, "error connecting transport instance")
if d.reuse {
if packetConn, err := reuseport.ListenPacket("udp", resolvedAddress.String()); err != nil {
return nil, errors.Wrap(err, "error connecting to local address")
} else {
d.udpConn = packetConn.(*net.UDPConn)
}
} else {
if d.udpConn, err = net.ListenUDP("udp", resolvedAddress); err != nil {
return nil, errors.Wrap(err, "error connecting to local address")
}
}

d.running = true
Expand All @@ -164,7 +170,28 @@ func NewUDPDirector(address AddressTuple[string, uint16], timeout *int, reuse *b
// create the request queue
d.request = make(chan _PDU)
go func() {
// TODO: get requests and send them...
for {
pdu := <-d.request
serialize, err := pdu.GetMessage().Serialize()
if err != nil {
log.Error().Err(err).Msg("Error building message")
continue
}
// TODO: wonky address object
destination := pdu.GetPDUDestination()
addr := net.IPv4(destination.AddrAddress[0], destination.AddrAddress[1], destination.AddrAddress[2], destination.AddrAddress[3])
udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", addr, *destination.AddrPort))
if err != nil {
log.Error().Err(err).Msg("Error resolving address")
continue
}
writtenBytes, err := d.udpConn.WriteToUDP(serialize, udpAddr)
if err != nil {
log.Error().Err(err).Msg("Error writing bytes")
continue
}
log.Debug().Msgf("%d written bytes", writtenBytes)
}
}()

// start with an empty peer pool
Expand Down Expand Up @@ -213,31 +240,31 @@ func (d *UDPDirector) ActorError(err error) {

func (d *UDPDirector) Close() error {
d.running = false
return d.ti.Close()
return d.udpConn.Close()
}

func (d *UDPDirector) handleRead() {
log.Debug().Msgf("handleRead(%v)", d.address)

if err := d.ti.FillBuffer(func(pos uint, _ byte, _ *bufio.Reader) bool {
if pos >= 4 {
return false
}
return true
}); err != nil {
// pass along to a handler
d.handleError(errors.Wrap(err, "error filling buffer"))
firstFourBytes := make([]byte, 4)
if read, err := d.udpConn.Read(firstFourBytes); err != nil {
log.Error().Err(err).Msg("error reading")
return
}
peekedBytes, err := d.ti.PeekReadableBytes(4)
if err != nil {
// pass along to a handler
d.handleError(errors.Wrap(err, "error peeking 4 bytes"))
} else if read != 4 {
log.Error().Msgf("Not enough data %d", read)
return
}

length := uint32(peekedBytes[2])<<8 | uint32(peekedBytes[3])
readBytes, err := d.ti.Read(length)
length := uint32(firstFourBytes[2])<<8 | uint32(firstFourBytes[3])
remainingMessage := make([]byte, length-4)
if read, err := d.udpConn.Read(remainingMessage); err != nil {
log.Error().Err(err).Msg("error reading")
return
} else if read != int(length-4) {
log.Error().Msgf("Not enough data: actual: %d, wanted: %d", read, length-4)
return
}
readBytes := append(firstFourBytes, remainingMessage...)

bvlc, err := model.BVLCParse(readBytes)
if err != nil {
Expand Down

0 comments on commit 2265a6a

Please sign in to comment.