Skip to content

Commit

Permalink
feat(plc4go/cbus): improve logging of discoverer
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Apr 18, 2023
1 parent 9643831 commit 61045e6
Showing 1 changed file with 42 additions and 19 deletions.
61 changes: 42 additions & 19 deletions plc4go/internal/cbus/Discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"github.com/apache/plc4x/plc4go/spi/transports/tcp"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"net"
"net/url"
"sync"
Expand Down Expand Up @@ -66,8 +67,8 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}
if log.Debug().Enabled() {
for _, provider := range interfaces {
log.Debug().Msgf("Discover on %v", provider.name())
log.Debug().Msgf("Discover on %#v", provider.containedInterface())
log.Debug().Msgf("Discover on %s", provider)
log.Trace().Msgf("Discover on %#v", provider.containedInterface())
}
}

Expand All @@ -76,15 +77,19 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
tcpTransport := tcp.NewTransport()
// Iterate over all network devices of this system.
for _, netInterface := range interfaces {
interfaceLog := log.With().Stringer("interface", netInterface).Logger()
interfaceLog.Debug().Msg("Scanning")
addrs, err := netInterface.Addrs()
if err != nil {
return err
}
wg.Add(1)
go func(netInterface addressProvider) {
go func(netInterface addressProvider, interfaceLog zerolog.Logger) {
defer func() { wg.Done() }()
// Iterate over all addresses the current interface has configured
for _, addr := range addrs {
addressLogger := interfaceLog.With().Stringer("address", addr).Logger()
addressLogger.Debug().Msg("looking into")
var ipv4Addr net.IP
switch addr.(type) {
// If the device is configured to communicate with a subnet
Expand All @@ -104,19 +109,31 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}
addresses, err := utils.GetIPAddresses(ctx, netInterface.containedInterface(), false)
if err != nil {
log.Warn().Err(err).Msgf("Can't get addresses for %v", netInterface)
addressLogger.Warn().Err(err).Msgf("Can't get addresses for %v", netInterface)
continue
}
wg.Add(1)
go func() {
go func(addressLogger zerolog.Logger) {
defer func() { wg.Done() }()
for ip := range addresses {
log.Trace().Msgf("Handling found ip %v", ip)
d.transportInstanceCreationQueue.Submit(ctx, d.transportInstanceCreationWorkItemId.Add(1), d.createTransportInstanceDispatcher(ctx, wg, ip, tcpTransport, transportInstances, readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT))
addressLogger.Trace().Msgf("Handling found ip %v", ip)
d.transportInstanceCreationQueue.Submit(
ctx,
d.transportInstanceCreationWorkItemId.Add(1),
d.createTransportInstanceDispatcher(
ctx,
wg,
ip,
tcpTransport,
transportInstances,
readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT,
addressLogger,
),
)
}
}()
}(addressLogger)
}
}(netInterface)
}(netInterface, interfaceLog)
}
go func() {
wg.Wait()
Expand All @@ -132,7 +149,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
return nil
}

func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *sync.WaitGroup, ip net.IP, tcpTransport *tcp.Transport, transportInstances chan transports.TransportInstance, cBusPort uint16) utils.Runnable {
func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *sync.WaitGroup, ip net.IP, tcpTransport *tcp.Transport, transportInstances chan transports.TransportInstance, cBusPort uint16, addressLogger zerolog.Logger) utils.Runnable {
wg.Add(1)
return func() {
defer wg.Done()
Expand All @@ -141,39 +158,40 @@ func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *
{
connectionUrlParsed, err := url.Parse(fmt.Sprintf("tcp://%s:%d", ip, cBusPort))
if err != nil {
log.Error().Err(err).Msgf("Error parsing url for lookup")
addressLogger.Error().Err(err).Msgf("Error parsing url for lookup")
return
}
connectionUrl = *connectionUrlParsed
}

transportInstance, err := tcpTransport.CreateTransportInstance(connectionUrl, nil)
if err != nil {
log.Error().Err(err).Msgf("Error creating transport instance")
addressLogger.Error().Err(err).Msgf("Error creating transport instance")
return
}
log.Trace().Msgf("trying %v", connectionUrl)
addressLogger.Trace().Msgf("trying %v", connectionUrl)
err = transportInstance.ConnectWithContext(ctx)
if err != nil {
secondErr := transportInstance.ConnectWithContext(ctx)
if secondErr != nil {
log.Trace().Err(err).Msgf("Error connecting transport instance")
addressLogger.Trace().Err(err).Msgf("Error connecting transport instance")
return
}
}
log.Debug().Msgf("Adding transport instance to scan %v", transportInstance)
addressLogger.Debug().Msgf("Adding transport instance to scan %v", transportInstance)
transportInstances <- transportInstance
}
}

func (d *Discoverer) createDeviceScanDispatcher(tcpTransportInstance *tcp.TransportInstance, callback func(event apiModel.PlcDiscoveryItem)) utils.Runnable {
return func() {
log.Debug().Msgf("Scanning %v", tcpTransportInstance)
transportInstanceLogger := log.With().Stringer("transportInstance", tcpTransportInstance).Logger()
transportInstanceLogger.Debug().Msgf("Scanning %v", tcpTransportInstance)
// Create a codec for sending and receiving messages.
codec := NewMessageCodec(tcpTransportInstance)
// Explicitly start the worker
if err := codec.Connect(); err != nil {
log.Debug().Err(err).Msg("Error connecting")
transportInstanceLogger.Debug().Err(err).Msg("Error connecting")
return
}

Expand All @@ -186,7 +204,7 @@ func (d *Discoverer) createDeviceScanDispatcher(tcpTransportInstance *tcp.Transp
cBusMessageToServer := readWriteModel.NewCBusMessageToServer(request, requestContext, cBusOptions)
// Send the search request.
if err := codec.Send(cBusMessageToServer); err != nil {
log.Debug().Err(err).Msgf("Error sending message:\n%s", cBusMessageToServer)
transportInstanceLogger.Debug().Err(err).Msgf("Error sending message:\n%s", cBusMessageToServer)
return
}
// Keep on reading responses till the timeout is done.
Expand Down Expand Up @@ -241,7 +259,7 @@ func (d *Discoverer) createDeviceScanDispatcher(tcpTransportInstance *tcp.Transp
// TODO: we could check for the exact response
remoteUrlParse, err := url.Parse(fmt.Sprintf("tcp://%s", tcpTransportInstance.RemoteAddress))
if err != nil {
log.Error().Err(err).Msg("Error creating url")
transportInstanceLogger.Error().Err(err).Msg("Error creating url")
continue
}
remoteUrl = *remoteUrlParse
Expand Down Expand Up @@ -277,6 +295,7 @@ func (d *Discoverer) extractDeviceNames(discoveryOptions ...options.WithDiscover

// addressProvider is used to make discover testable
type addressProvider interface {
fmt.Stringer
// Addrs is implemented by net.Interface#Addrs
Addrs() ([]net.Addr, error)
name() string
Expand All @@ -296,6 +315,10 @@ func (w *wrappedInterface) containedInterface() net.Interface {
return *w.Interface
}

func (w *wrappedInterface) String() string {
return w.name()
}

// allInterfaceRetriever can be exchanged in tests
var allInterfaceRetriever = func() ([]addressProvider, error) {
interfaces, err := net.Interfaces()
Expand Down

0 comments on commit 61045e6

Please sign in to comment.