From 55066a78fe4f2dad5fce76fd54da0afa9457144c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20R=C3=BChl?= Date: Wed, 22 Mar 2023 10:56:51 +0100 Subject: [PATCH] fix(plc4go/knx): use queues for discovery to not overwhelm small devices --- plc4go/internal/knxnetip/Discoverer.go | 206 ++++++++++++++----------- 1 file changed, 117 insertions(+), 89 deletions(-) diff --git a/plc4go/internal/knxnetip/Discoverer.go b/plc4go/internal/knxnetip/Discoverer.go index 8a31da2de81..44aea6a1544 100644 --- a/plc4go/internal/knxnetip/Discoverer.go +++ b/plc4go/internal/knxnetip/Discoverer.go @@ -23,31 +23,38 @@ import ( "bytes" "context" "fmt" + "github.com/apache/plc4x/plc4go/spi/utils" + "github.com/rs/zerolog/log" "net" "net/url" + "sync" "time" - "github.com/apache/plc4x/plc4go/spi/options" - "github.com/pkg/errors" - apiModel "github.com/apache/plc4x/plc4go/pkg/api/model" driverModel "github.com/apache/plc4x/plc4go/protocols/knxnetip/readwrite/model" - "github.com/apache/plc4x/plc4go/spi" internalModel "github.com/apache/plc4x/plc4go/spi/model" + "github.com/apache/plc4x/plc4go/spi/options" "github.com/apache/plc4x/plc4go/spi/transports" "github.com/apache/plc4x/plc4go/spi/transports/udp" ) type Discoverer struct { - messageCodec spi.MessageCodec + transportInstanceCreationQueue utils.Executor + deviceScanningQueue utils.Executor } func NewDiscoverer() *Discoverer { - return &Discoverer{} + return &Discoverer{ + // TODO: maybe a dynamic executor would be better to not waste cycles when not in use + transportInstanceCreationQueue: utils.NewFixedSizeExecutor(50, 100), + deviceScanningQueue: utils.NewFixedSizeExecutor(50, 100), + } } func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error { - // TODO: handle ctx + d.transportInstanceCreationQueue.Start() + d.deviceScanningQueue.Start() + udpTransport := udp.NewTransport() // Create a connection string for the KNX broadcast discovery address. @@ -79,66 +86,87 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel. interfaces = allInterfaces } - var transportInstances []transports.TransportInstance + transportInstances := make(chan transports.TransportInstance) + wg := &sync.WaitGroup{} // Iterate over all network devices of this system. - for _, interf := range interfaces { - addrs, err := interf.Addrs() + for _, netInterface := range interfaces { + addrs, err := netInterface.Addrs() if err != nil { return err } - // Iterate over all addresses the current interface has configured - // For KNX we're only interested in IPv4 addresses, as it doesn't - // seem to work with IPv6. - for _, addr := range addrs { - var ipv4Addr net.IP - switch addr.(type) { - // If the device is configured to communicate with a subnet - case *net.IPNet: - ipv4Addr = addr.(*net.IPNet).IP.To4() - - // If the device is configured for a point-to-point connection - case *net.IPAddr: - ipv4Addr = addr.(*net.IPAddr).IP.To4() - } - - // If we found an IPv4 address and this is not a loopback address, - // add it to the list of devices we will open ports and send discovery - // messages from. - if ipv4Addr != nil && !ipv4Addr.IsLoopback() { - // Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address) - transportInstance, err := - udpTransport.CreateTransportInstanceForLocalAddress(*connectionUrl, nil, - &net.UDPAddr{IP: ipv4Addr, Port: 0}) - if err != nil { - return err + wg.Add(1) + go func(netInterface net.Interface) { + defer func() { wg.Done() }() + // Iterate over all addresses the current interface has configured + // For KNX we're only interested in IPv4 addresses, as it doesn't + // seem to work with IPv6. + for _, addr := range addrs { + var ipv4Addr net.IP + switch addr.(type) { + // If the device is configured to communicate with a subnet + case *net.IPNet: + ipv4Addr = addr.(*net.IPNet).IP.To4() + + // If the device is configured for a point-to-point connection + case *net.IPAddr: + ipv4Addr = addr.(*net.IPAddr).IP.To4() } - err = transportInstance.ConnectWithContext(ctx) - if err != nil { + + // If we found an IPv4 address and this is not a loopback address, + // add it to the list of devices we will open ports and send discovery + // messages from. + if ipv4Addr == nil || ipv4Addr.IsLoopback() { continue } - - transportInstances = append(transportInstances, transportInstance) + d.transportInstanceCreationQueue.Submit(ctx, 0, d.createTransportInstanceDispatcher(ctx, wg, connectionUrl, ipv4Addr, udpTransport, transportInstances)) } - } + }(netInterface) } + go func() { + wg.Wait() + log.Trace().Msg("Closing transport instance channel") + close(transportInstances) + }() + + go func() { + for transportInstance := range transportInstances { + d.deviceScanningQueue.Submit(ctx, 0, d.createDeviceScanDispatcher(transportInstance.(*udp.TransportInstance), callback)) + } + }() + return nil +} - if len(transportInstances) <= 0 { - return nil +func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *sync.WaitGroup, connectionUrl *url.URL, ipv4Addr net.IP, udpTransport *udp.Transport, transportInstances chan transports.TransportInstance) utils.Runnable { + wg.Add(1) + return func() { + defer wg.Done() + // Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address) + transportInstance, err := + udpTransport.CreateTransportInstanceForLocalAddress(*connectionUrl, nil, + &net.UDPAddr{IP: ipv4Addr, Port: 0}) + if err != nil { + log.Error().Err(err).Msg("error creating transport instance") + return + } + err = transportInstance.ConnectWithContext(ctx) + if err != nil { + log.Debug().Err(err).Msg("Error Connecting") + return + } + transportInstances <- transportInstance } +} - for _, transportInstance := range transportInstances { +func (d *Discoverer) createDeviceScanDispatcher(udpTransportInstance *udp.TransportInstance, callback func(event apiModel.PlcDiscoveryItem)) utils.Runnable { + return func() { // Create a codec for sending and receiving messages. - codec := NewMessageCodec(transportInstance, nil) + codec := NewMessageCodec(udpTransportInstance, nil) // Explicitly start the worker if err := codec.Connect(); err != nil { - return errors.Wrap(err, "Error connecting") + log.Error().Err(err).Msg("Error connecting") + return } - // Cast to the UDP transport instance, so we can access information on the local port. - udpTransportInstance, ok := transportInstance.(*udp.TransportInstance) - if !ok { - return errors.New("couldn't cast transport instance to UDP transport instance") - } localAddress := udpTransportInstance.LocalAddress localAddr := driverModel.NewIPAddress(localAddress.IP) @@ -147,49 +175,49 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel. driverModel.HostProtocolCode_IPV4_UDP, localAddr, uint16(localAddress.Port)) searchRequestMessage := driverModel.NewSearchRequest(discoveryEndpoint) // Send the search request. - err = codec.Send(searchRequestMessage) - go func() { - // Keep on reading responses till the timeout is done. - // TODO: Make this configurable - timeout := time.NewTimer(time.Second * 1) - timeout.Stop() - for start := time.Now(); time.Since(start) < time.Second*5; { - timeout.Reset(time.Second * 1) - select { - case message := <-codec.GetDefaultIncomingMessageChannel(): - { - if !timeout.Stop() { - <-timeout.C + if err := codec.Send(searchRequestMessage); err != nil { + log.Debug().Err(err).Msgf("Error sending message:\n%s", searchRequestMessage) + return + } + // Keep on reading responses till the timeout is done. + // TODO: Make this configurable + timeout := time.NewTimer(time.Second * 1) + timeout.Stop() + for start := time.Now(); time.Since(start) < time.Second*5; { + timeout.Reset(time.Second * 1) + select { + case message := <-codec.GetDefaultIncomingMessageChannel(): + { + if !timeout.Stop() { + <-timeout.C + } + searchResponse := message.(driverModel.SearchResponse) + if searchResponse != nil { + addr := searchResponse.GetHpaiControlEndpoint().GetIpAddress().GetAddr() + remoteUrl, err := url.Parse(fmt.Sprintf("udp://%d.%d.%d.%d:%d", + addr[0], addr[1], addr[2], addr[3], searchResponse.GetHpaiControlEndpoint().GetIpPort())) + if err != nil { + continue } - searchResponse := message.(driverModel.SearchResponse) - if searchResponse != nil { - addr := searchResponse.GetHpaiControlEndpoint().GetIpAddress().GetAddr() - remoteUrl, err := url.Parse(fmt.Sprintf("udp://%d.%d.%d.%d:%d", - addr[0], addr[1], addr[2], addr[3], searchResponse.GetHpaiControlEndpoint().GetIpPort())) - if err != nil { - continue - } - deviceName := string(bytes.Trim(searchResponse.GetDibDeviceInfo().GetDeviceFriendlyName(), "\x00")) - discoveryEvent := &internalModel.DefaultPlcDiscoveryItem{ - ProtocolCode: "knxnet-ip", - TransportCode: "udp", - TransportUrl: *remoteUrl, - Options: nil, - Name: deviceName, - } - // Pass the event back to the callback - callback(discoveryEvent) + deviceName := string(bytes.Trim(searchResponse.GetDibDeviceInfo().GetDeviceFriendlyName(), "\x00")) + discoveryEvent := &internalModel.DefaultPlcDiscoveryItem{ + ProtocolCode: "knxnet-ip", + TransportCode: "udp", + TransportUrl: *remoteUrl, + Options: nil, + Name: deviceName, } - continue - } - case <-timeout.C: - { - timeout.Stop() - continue + // Pass the event back to the callback + callback(discoveryEvent) } + continue + } + case <-timeout.C: + { + timeout.Stop() + continue } } - }() + } } - return nil }