Skip to content

Commit

Permalink
feat(plc4go): added SO_REUSE support for udp
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Jun 23, 2022
1 parent bed9aa6 commit f219a65
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 48 deletions.
94 changes: 53 additions & 41 deletions plc4go/internal/bacnetip/Discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ import (

"github.com/apache/plc4x/plc4go/internal/spi"
"github.com/apache/plc4x/plc4go/internal/spi/options"
"github.com/apache/plc4x/plc4go/internal/spi/transports"
"github.com/apache/plc4x/plc4go/internal/spi/transports/udp"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
driverModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
)
Expand Down Expand Up @@ -86,14 +84,6 @@ func (d *Discoverer) Discover(callback func(event apiModel.PlcDiscoveryEvent), d
func broadcastAndDiscover(ctx context.Context, communicationChannels []communicationChannel, whoIsLowLimit *uint, whoIsHighLimit *uint) (chan receivedBvlcMessage, error) {
incomingBVLCChannel := make(chan receivedBvlcMessage, 0)
for _, communicationChannelInstance := range communicationChannels {
// Create a codec for sending and receiving messages.
codec := NewMessageCodec(communicationChannelInstance.unicastTransport)
// Explicitly start the worker
if err := codec.Connect(); err != nil {
log.Warn().Err(err).Msg("Error connecting")
continue
}

// Prepare the discovery packet data
var lowLimit driverModel.BACnetContextTagUnsignedInteger
if whoIsLowLimit != nil {
Expand All @@ -111,17 +101,42 @@ func broadcastAndDiscover(ctx context.Context, communicationChannels []communica
bvlc := driverModel.NewBVLCOriginalUnicastNPDU(npdu, 0)

// Send the search request.
if err := codec.Send(bvlc); err != nil {
wbbb := utils.NewWriteBufferByteBased()
if err := bvlc.Serialize(wbbb); err != nil {
panic(err)
}
if _, err := communicationChannelInstance.broadcastConnection.WriteTo(wbbb.GetBytes(), communicationChannelInstance.broadcastConnection.LocalAddr()); err != nil {
log.Debug().Err(err).Msg("Error sending broadcast")
}

go func(communicationChannelInstance communicationChannel) {
for {
blockingReadChan := make(chan bool, 0)
go func() {
buf := make([]byte, 4096)
n, addr, err := communicationChannelInstance.unicastConnection.ReadFrom(buf)
if err != nil {
log.Debug().Err(err).Msg("Ending unicast receive")
blockingReadChan <- false
return
}
log.Debug().Stringer("addr", addr).Msg("Received broadcast bvlc")
incomingBvlc, err := driverModel.BVLCParse(utils.NewReadBufferByteBased(buf[:n]))
if err != nil {
log.Warn().Err(err).Msg("Could not parse bvlc")
blockingReadChan <- true
return
}
incomingBVLCChannel <- receivedBvlcMessage{incomingBvlc, addr}
blockingReadChan <- true
}()
select {
case message := <-codec.GetDefaultIncomingMessageChannel():
if incomingBvlc, ok := message.(driverModel.BVLC); ok {
// TODO: how to get the receiverd ip from that?
incomingBVLCChannel <- receivedBvlcMessage{incomingBvlc, nil}
case ok := <-blockingReadChan:
if !ok {
log.Debug().Msg("Ending reading")
return
}
log.Trace().Msg("Received something")
case <-ctx.Done():
log.Debug().Err(ctx.Err()).Msg("Ending unicast receive")
return
Expand All @@ -136,18 +151,25 @@ func broadcastAndDiscover(ctx context.Context, communicationChannels []communica
buf := make([]byte, 4096)
n, addr, err := communicationChannelInstance.broadcastConnection.ReadFrom(buf)
if err != nil {
panic(err)
log.Debug().Err(err).Msg("Ending unicast receive")
blockingReadChan <- false
return
}
log.Debug().Stringer("addr", addr).Msg("Received broadcast bvlc")
incomingBvlc, err := driverModel.BVLCParse(utils.NewReadBufferByteBased(buf[:n]))
if err != nil {
panic(err)
log.Warn().Err(err).Msg("Could not parse bvlc")
blockingReadChan <- true
}
incomingBVLCChannel <- receivedBvlcMessage{incomingBvlc, addr}
blockingReadChan <- true
}()
select {
case <-blockingReadChan:
case ok := <-blockingReadChan:
if !ok {
log.Debug().Msg("Ending reading")
return
}
log.Trace().Msg("Received something")
case <-ctx.Done():
log.Debug().Err(ctx.Err()).Msg("Ending unicast receive")
Expand Down Expand Up @@ -206,7 +228,6 @@ func handleIncomingBVLCs(ctx context.Context, callback func(event apiModel.PlcDi
}

func buildupCommunicationChannels(interfaces []net.Interface, bacNetPort int) (communicationChannels []communicationChannel, err error) {
udpTransport := udp.NewTransport()
// Iterate over all network devices of this system.
for _, networkInterface := range interfaces {
unicastInterfaceAddress, err := networkInterface.Addrs()
Expand Down Expand Up @@ -242,37 +263,28 @@ func buildupCommunicationChannels(interfaces []net.Interface, bacNetPort int) (c
continue
}

_, cidr, _ := net.ParseCIDR(unicastAddress.String())
broadcastAddr := netaddr.BroadcastAddr(cidr)
udpAddr := &net.UDPAddr{IP: broadcastAddr, Port: bacNetPort}
connectionUrl, err := url.Parse(fmt.Sprintf("udp://%s", udpAddr))
if err != nil {
log.Debug().Err(err).Msg("error parsing url")
continue
}
localAddr := &net.UDPAddr{IP: ipAddr, Port: bacNetPort}
transportInstance, err :=
udpTransport.CreateTransportInstanceForLocalAddress(*connectionUrl, nil, localAddr)
// Handle undirected
unicastConnection, err := reuseport.ListenPacket("udp4", fmt.Sprintf("%v:%d", ipAddr, bacNetPort))
if err != nil {
return nil, errors.Wrap(err, "error creating transport instance")
}
if err := transportInstance.Connect(); err != nil {
log.Warn().Err(err).Msgf("Can't connect to %v", localAddr)
log.Debug().Err(err).Msg("Error building unicast Port")
continue
}

_, cidr, _ := net.ParseCIDR(unicastAddress.String())
broadcastAddr := netaddr.BroadcastAddr(cidr)
// Handle undirected
pc, err := reuseport.ListenPacket("udp4", broadcastAddr.String()+":47808")
broadcastConnection, err := reuseport.ListenPacket("udp4", fmt.Sprintf("%v:%d", broadcastAddr, bacNetPort))
if err != nil {
if err := transportInstance.Close(); err != nil {
if err := unicastConnection.Close(); err != nil {
log.Debug().Err(err).Msg("Error closing transport instance")
}
return nil, err
log.Debug().Err(err).Msg("Error building broadcast Port")
continue
}
communicationChannels = append(communicationChannels, communicationChannel{
networkInterface: networkInterface,
unicastTransport: transportInstance,
broadcastConnection: pc,
unicastConnection: unicastConnection,
broadcastConnection: broadcastConnection,
})
}
}
Expand All @@ -286,12 +298,12 @@ type receivedBvlcMessage struct {

type communicationChannel struct {
networkInterface net.Interface
unicastTransport transports.TransportInstance
unicastConnection net.PacketConn
broadcastConnection net.PacketConn
}

func (c communicationChannel) Close() error {
_ = c.unicastTransport.Close()
_ = c.unicastConnection.Close()
_ = c.broadcastConnection.Close()
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions plc4go/internal/bacnetip/Driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (m *Driver) GetConnection(transportUrl url.URL, transports map[string]trans
}
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
options["defaultUdpPort"] = []string{"47808"}
// Set so_reuse by default
if _, ok := options["so-reuse"]; !ok {
options["so-reuse"] = []string{"true"}
}
// Have the transport create a new transport-instance.
transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
if err != nil {
Expand Down
36 changes: 29 additions & 7 deletions plc4go/internal/spi/transports/udp/Transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"bufio"
"github.com/apache/plc4x/plc4go/internal/spi/transports"
"github.com/apache/plc4x/plc4go/internal/spi/utils"
"github.com/libp2p/go-reuseport"
"github.com/pkg/errors"
"net"
"net/url"
Expand Down Expand Up @@ -80,39 +81,47 @@ func (m Transport) CreateTransportInstanceForLocalAddress(transportUrl url.URL,
}
var connectTimeout uint32 = 1000
if val, ok := options["connect-timeout"]; ok {
parsedConnectTimeout, err := strconv.ParseUint(val[0], 10, 32)
if err != nil {
if parsedConnectTimeout, err := strconv.ParseUint(val[0], 10, 32); err != nil {
return nil, errors.Wrap(err, "error setting connect-timeout")
} else {
connectTimeout = uint32(parsedConnectTimeout)
}
}

// TODO: get reuse option from options
var soReUse bool
if val, ok := options["so-reuse"]; ok {
if parseBool, err := strconv.ParseBool(val[0]); err != nil {
return nil, errors.Wrap(err, "error setting so-reuse")
} else {
soReUse = parseBool
}
}

// Potentially resolve the ip address, if a hostname was provided
remoteAddress, err := net.ResolveUDPAddr("udp", remoteAddressString+":"+strconv.Itoa(remotePort))
if err != nil {
return nil, errors.Wrap(err, "error resolving typ address")
}

return NewTransportInstance(localAddress, remoteAddress, connectTimeout, &m), nil
return NewTransportInstance(localAddress, remoteAddress, connectTimeout, soReUse, &m), nil
}

type TransportInstance struct {
LocalAddress *net.UDPAddr
RemoteAddress *net.UDPAddr
ConnectTimeout uint32
SoReUse bool
transport *Transport
udpConn *net.UDPConn
reader *bufio.Reader
}

func NewTransportInstance(localAddress *net.UDPAddr, remoteAddress *net.UDPAddr, connectTimeout uint32, transport *Transport) *TransportInstance {
func NewTransportInstance(localAddress *net.UDPAddr, remoteAddress *net.UDPAddr, connectTimeout uint32, soReUse bool, transport *Transport) *TransportInstance {
return &TransportInstance{
LocalAddress: localAddress,
RemoteAddress: remoteAddress,
ConnectTimeout: connectTimeout,
SoReUse: soReUse,
transport: transport,
}
}
Expand All @@ -134,8 +143,21 @@ func (m *TransportInstance) Connect() error {

// "connect" to the remote
var err error
if m.udpConn, err = net.ListenUDP("udp", m.LocalAddress); err != nil {
return errors.Wrap(err, "error connecting to remote address")
if m.SoReUse {
if m.udpConn, err = net.ListenUDP("udp", m.LocalAddress); err != nil {
return errors.Wrap(err, "error connecting to remote address")
}
rawConn, err := m.udpConn.SyscallConn()
if err != nil {
return errors.Wrap(err, "Error getting syscall connection")
}
if err := reuseport.Control("", "", rawConn); err != nil {
return errors.Wrap(err, "Error setting re-use control")
}
} else {
if m.udpConn, err = net.ListenUDP("udp", m.LocalAddress); err != nil {
return errors.Wrap(err, "error connecting to remote address")
}
}

// TODO: Start a worker that uses m.udpConn.ReadFromUDP() to fill a buffer
Expand Down

0 comments on commit f219a65

Please sign in to comment.