Skip to content

Commit

Permalink
fix(plc4go/knx): use queues for discovery to not overwhelm small devices
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Mar 22, 2023
1 parent 8be4de1 commit 55066a7
Showing 1 changed file with 117 additions and 89 deletions.
206 changes: 117 additions & 89 deletions plc4go/internal/knxnetip/Discoverer.go
Expand Up @@ -23,31 +23,38 @@ import (
"bytes"
"context"
"fmt"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/rs/zerolog/log"
"net"
"net/url"
"sync"
"time"

"github.com/apache/plc4x/plc4go/spi/options"
"github.com/pkg/errors"

apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
driverModel "github.com/apache/plc4x/plc4go/protocols/knxnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
internalModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/transports/udp"
)

type Discoverer struct {
messageCodec spi.MessageCodec
transportInstanceCreationQueue utils.Executor
deviceScanningQueue utils.Executor
}

func NewDiscoverer() *Discoverer {
return &Discoverer{}
return &Discoverer{
// TODO: maybe a dynamic executor would be better to not waste cycles when not in use
transportInstanceCreationQueue: utils.NewFixedSizeExecutor(50, 100),
deviceScanningQueue: utils.NewFixedSizeExecutor(50, 100),
}
}

func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
// TODO: handle ctx
d.transportInstanceCreationQueue.Start()
d.deviceScanningQueue.Start()

udpTransport := udp.NewTransport()

// Create a connection string for the KNX broadcast discovery address.
Expand Down Expand Up @@ -79,66 +86,87 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
interfaces = allInterfaces
}

var transportInstances []transports.TransportInstance
transportInstances := make(chan transports.TransportInstance)
wg := &sync.WaitGroup{}
// Iterate over all network devices of this system.
for _, interf := range interfaces {
addrs, err := interf.Addrs()
for _, netInterface := range interfaces {
addrs, err := netInterface.Addrs()
if err != nil {
return err
}
// Iterate over all addresses the current interface has configured
// For KNX we're only interested in IPv4 addresses, as it doesn't
// seem to work with IPv6.
for _, addr := range addrs {
var ipv4Addr net.IP
switch addr.(type) {
// If the device is configured to communicate with a subnet
case *net.IPNet:
ipv4Addr = addr.(*net.IPNet).IP.To4()

// If the device is configured for a point-to-point connection
case *net.IPAddr:
ipv4Addr = addr.(*net.IPAddr).IP.To4()
}

// If we found an IPv4 address and this is not a loopback address,
// add it to the list of devices we will open ports and send discovery
// messages from.
if ipv4Addr != nil && !ipv4Addr.IsLoopback() {
// Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
transportInstance, err :=
udpTransport.CreateTransportInstanceForLocalAddress(*connectionUrl, nil,
&net.UDPAddr{IP: ipv4Addr, Port: 0})
if err != nil {
return err
wg.Add(1)
go func(netInterface net.Interface) {
defer func() { wg.Done() }()
// Iterate over all addresses the current interface has configured
// For KNX we're only interested in IPv4 addresses, as it doesn't
// seem to work with IPv6.
for _, addr := range addrs {
var ipv4Addr net.IP
switch addr.(type) {
// If the device is configured to communicate with a subnet
case *net.IPNet:
ipv4Addr = addr.(*net.IPNet).IP.To4()

// If the device is configured for a point-to-point connection
case *net.IPAddr:
ipv4Addr = addr.(*net.IPAddr).IP.To4()
}
err = transportInstance.ConnectWithContext(ctx)
if err != nil {

// If we found an IPv4 address and this is not a loopback address,
// add it to the list of devices we will open ports and send discovery
// messages from.
if ipv4Addr == nil || ipv4Addr.IsLoopback() {
continue
}

transportInstances = append(transportInstances, transportInstance)
d.transportInstanceCreationQueue.Submit(ctx, 0, d.createTransportInstanceDispatcher(ctx, wg, connectionUrl, ipv4Addr, udpTransport, transportInstances))
}
}
}(netInterface)
}
go func() {
wg.Wait()
log.Trace().Msg("Closing transport instance channel")
close(transportInstances)
}()

go func() {
for transportInstance := range transportInstances {
d.deviceScanningQueue.Submit(ctx, 0, d.createDeviceScanDispatcher(transportInstance.(*udp.TransportInstance), callback))
}
}()
return nil
}

if len(transportInstances) <= 0 {
return nil
func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *sync.WaitGroup, connectionUrl *url.URL, ipv4Addr net.IP, udpTransport *udp.Transport, transportInstances chan transports.TransportInstance) utils.Runnable {
wg.Add(1)
return func() {
defer wg.Done()
// Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
transportInstance, err :=
udpTransport.CreateTransportInstanceForLocalAddress(*connectionUrl, nil,
&net.UDPAddr{IP: ipv4Addr, Port: 0})
if err != nil {
log.Error().Err(err).Msg("error creating transport instance")
return
}
err = transportInstance.ConnectWithContext(ctx)
if err != nil {
log.Debug().Err(err).Msg("Error Connecting")
return
}
transportInstances <- transportInstance
}
}

for _, transportInstance := range transportInstances {
func (d *Discoverer) createDeviceScanDispatcher(udpTransportInstance *udp.TransportInstance, callback func(event apiModel.PlcDiscoveryItem)) utils.Runnable {
return func() {
// Create a codec for sending and receiving messages.
codec := NewMessageCodec(transportInstance, nil)
codec := NewMessageCodec(udpTransportInstance, nil)
// Explicitly start the worker
if err := codec.Connect(); err != nil {
return errors.Wrap(err, "Error connecting")
log.Error().Err(err).Msg("Error connecting")
return
}

// Cast to the UDP transport instance, so we can access information on the local port.
udpTransportInstance, ok := transportInstance.(*udp.TransportInstance)
if !ok {
return errors.New("couldn't cast transport instance to UDP transport instance")
}
localAddress := udpTransportInstance.LocalAddress
localAddr := driverModel.NewIPAddress(localAddress.IP)

Expand All @@ -147,49 +175,49 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
driverModel.HostProtocolCode_IPV4_UDP, localAddr, uint16(localAddress.Port))
searchRequestMessage := driverModel.NewSearchRequest(discoveryEndpoint)
// Send the search request.
err = codec.Send(searchRequestMessage)
go func() {
// Keep on reading responses till the timeout is done.
// TODO: Make this configurable
timeout := time.NewTimer(time.Second * 1)
timeout.Stop()
for start := time.Now(); time.Since(start) < time.Second*5; {
timeout.Reset(time.Second * 1)
select {
case message := <-codec.GetDefaultIncomingMessageChannel():
{
if !timeout.Stop() {
<-timeout.C
if err := codec.Send(searchRequestMessage); err != nil {
log.Debug().Err(err).Msgf("Error sending message:\n%s", searchRequestMessage)
return
}
// Keep on reading responses till the timeout is done.
// TODO: Make this configurable
timeout := time.NewTimer(time.Second * 1)
timeout.Stop()
for start := time.Now(); time.Since(start) < time.Second*5; {
timeout.Reset(time.Second * 1)
select {
case message := <-codec.GetDefaultIncomingMessageChannel():
{
if !timeout.Stop() {
<-timeout.C
}
searchResponse := message.(driverModel.SearchResponse)
if searchResponse != nil {
addr := searchResponse.GetHpaiControlEndpoint().GetIpAddress().GetAddr()
remoteUrl, err := url.Parse(fmt.Sprintf("udp://%d.%d.%d.%d:%d",
addr[0], addr[1], addr[2], addr[3], searchResponse.GetHpaiControlEndpoint().GetIpPort()))
if err != nil {
continue
}
searchResponse := message.(driverModel.SearchResponse)
if searchResponse != nil {
addr := searchResponse.GetHpaiControlEndpoint().GetIpAddress().GetAddr()
remoteUrl, err := url.Parse(fmt.Sprintf("udp://%d.%d.%d.%d:%d",
addr[0], addr[1], addr[2], addr[3], searchResponse.GetHpaiControlEndpoint().GetIpPort()))
if err != nil {
continue
}
deviceName := string(bytes.Trim(searchResponse.GetDibDeviceInfo().GetDeviceFriendlyName(), "\x00"))
discoveryEvent := &internalModel.DefaultPlcDiscoveryItem{
ProtocolCode: "knxnet-ip",
TransportCode: "udp",
TransportUrl: *remoteUrl,
Options: nil,
Name: deviceName,
}
// Pass the event back to the callback
callback(discoveryEvent)
deviceName := string(bytes.Trim(searchResponse.GetDibDeviceInfo().GetDeviceFriendlyName(), "\x00"))
discoveryEvent := &internalModel.DefaultPlcDiscoveryItem{
ProtocolCode: "knxnet-ip",
TransportCode: "udp",
TransportUrl: *remoteUrl,
Options: nil,
Name: deviceName,
}
continue
}
case <-timeout.C:
{
timeout.Stop()
continue
// Pass the event back to the callback
callback(discoveryEvent)
}
continue
}
case <-timeout.C:
{
timeout.Stop()
continue
}
}
}()
}
}
return nil
}

0 comments on commit 55066a7

Please sign in to comment.