From e5998906aac330bb1652d8a992c408f6b22aff47 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Thu, 15 Dec 2016 13:40:48 +0100 Subject: [PATCH] Dynamically change UDP.{ucast,mcast}_receiver_threads --- src/org/jgroups/protocols/UDP.java | 132 ++++++++++++++++++----------- 1 file changed, 82 insertions(+), 50 deletions(-) diff --git a/src/org/jgroups/protocols/UDP.java b/src/org/jgroups/protocols/UDP.java index 60d710e1e62..83626e39f36 100644 --- a/src/org/jgroups/protocols/UDP.java +++ b/src/org/jgroups/protocols/UDP.java @@ -11,6 +11,7 @@ import org.jgroups.util.SuppressLog; import org.jgroups.util.Util; +import java.io.Closeable; import java.io.IOException; import java.lang.reflect.Method; import java.net.*; @@ -62,6 +63,9 @@ public class UDP extends TP { @Property(description="Traffic class for sending unicast and multicast datagrams. Default is 8") protected int tos=8; // valid values: 2, 4, 8 (default), 16 + protected static final String UCAST_NAME="unicast receiver"; + protected static final String MCAST_NAME="multicast receiver"; + @Property(name="mcast_addr", description="The multicast address used for sending and receiving packets", defaultValueIPv4="228.8.8.8", defaultValueIPv6="ff0e::8:8:8", systemProperty=Global.UDP_MCAST_ADDR,writable=false) @@ -98,13 +102,13 @@ public class UDP extends TP { "a datagram packet") protected long suppress_time_out_of_buffer_space=60000; - @Property(description="Number of unicast receiver threads, all reading from the same DatagramSocket. " + - "If de-serialization is slow, increasing the number of receiver threads might yield better performance.") - protected int ucast_receiver_threads=1; + // @Property(description="Number of unicast receiver threads, all reading from the same DatagramSocket. " + + // "If de-serialization is slow, increasing the number of receiver threads might yield better performance.") + protected int unicast_receiver_threads=1; - @Property(description="Number of multicast receiver threads, all reading from the same MulticastSocket. " + - "If de-serialization is slow, increasing the number of receiver threads might yield better performance.") - protected int mcast_receiver_threads=1; + //@Property(description="Number of multicast receiver threads, all reading from the same MulticastSocket. " + + // "If de-serialization is slow, increasing the number of receiver threads might yield better performance.") + protected int multicast_receiver_threads=1; /* --------------------------------------------- Fields ------------------------------------------------ */ @@ -192,6 +196,44 @@ public void clearDroppedMessagesCache() { suppress_log_out_of_buffer_space.getCache().clear(); } + @Property(description="Number of unicast receiver threads, all reading from the same DatagramSocket. " + + "If de-serialization is slow, increasing the number of receiver threads might yield better performance.") + public void setUcastReceiverThreads(int num) { + if(unicast_receiver_threads != num) { + unicast_receiver_threads=num; + if(ucast_receivers != null) { + stopUcastReceiverThreads(); + ucast_receivers=createReceivers(unicast_receiver_threads, sock, UCAST_NAME); + startUcastReceiverThreads(); + } + } + } + + @Property(description="Number of unicast receiver threads, all reading from the same DatagramSocket. " + + "If de-serialization is slow, increasing the number of receiver threads might yield better performance.") + public int getUcastReceiverThreads() { + return unicast_receiver_threads; + } + + @Property(description="Number of multicast receiver threads, all reading from the same MulticastSocket. " + + "If de-serialization is slow, increasing the number of receiver threads might yield better performance.") + public void setMcastReceiverThreads(int num) { + if(multicast_receiver_threads != num) { + multicast_receiver_threads=num; + if(mcast_receivers != null) { + stopMcastReceiverThreads(); + mcast_receivers=createReceivers(multicast_receiver_threads, mcast_sock, MCAST_NAME); + startMcastReceiverThreads(); + } + } + } + + @Property(description="Number of multicast receiver threads, all reading from the same MulticastSocket. " + + "If de-serialization is slow, increasing the number of receiver threads might yield better performance.") + public int getMcastReceiverThreads() { + return multicast_receiver_threads; + } + public String getInfo() { StringBuilder sb=new StringBuilder(); sb.append("group_addr=").append(mcast_group_addr.getHostName()).append(':').append(mcast_port).append("\n"); @@ -264,37 +306,23 @@ public void start() throws Exception { destroySockets(); throw ex; } - ucast_receivers=new PacketReceiver[ucast_receiver_threads]; - for(int i=0; i < ucast_receivers.length; i++) - ucast_receivers[i]=new PacketReceiver(sock, "unicast receiver", this::closeUnicastSocket); - - if(ip_mcast) { - mcast_receivers=new PacketReceiver[mcast_receiver_threads]; - for(int i=0; i < mcast_receivers.length; i++) - mcast_receivers[i]=new PacketReceiver(mcast_sock, "multicast receiver", this::closeMulticastSocket); - } + ucast_receivers=createReceivers(unicast_receiver_threads, sock, UCAST_NAME); + if(ip_mcast) + mcast_receivers=createReceivers(multicast_receiver_threads, mcast_sock, MCAST_NAME); } public void stop() { log.debug("closing sockets and stopping threads"); - stopThreads(); // will close sockets, closeSockets() is not really needed anymore, but... - super.stop(); - } - - public void destroy() { - super.destroy(); destroySockets(); + stopThreads(); + super.stop(); } protected void handleConnect() throws Exception { startThreads(); } - protected void handleDisconnect() { - stopThreads(); - } - /*--------------------------- End of Protocol interface -------------------------- */ @@ -399,6 +427,14 @@ protected void destroySockets() { closeUnicastSocket(); } + protected PacketReceiver[] createReceivers(int num, DatagramSocket sock, String name) { + PacketReceiver[] receivers=new PacketReceiver[num]; + for(int i=0; i < num; i++) + receivers[i]=new PacketReceiver(sock, name); + return receivers; + } + + protected IpAddress createLocalAddress() { if(sock == null || sock.isClosed()) return null; @@ -573,23 +609,29 @@ protected void closeUnicastSocket() { protected void startThreads() throws Exception { - for(PacketReceiver r: ucast_receivers) - r.start(); + startUcastReceiverThreads(); + startMcastReceiverThreads(); + } + + protected void startUcastReceiverThreads() { + if(ucast_receivers != null) + for(PacketReceiver r: ucast_receivers) + r.start(); + } + + protected void startMcastReceiverThreads() { if(mcast_receivers != null) for(PacketReceiver r: mcast_receivers) r.start(); } - protected void stopThreads() { - if(mcast_receivers != null) - for(PacketReceiver r: mcast_receivers) - r.stop(); - if(ucast_receivers != null) - for(PacketReceiver r: ucast_receivers) - r.stop(); + stopMcastReceiverThreads(); + stopUcastReceiverThreads(); } + protected void stopUcastReceiverThreads() {Util.close(ucast_receivers);} + protected void stopMcastReceiverThreads() {Util.close(mcast_receivers);} protected void handleConfigEvent(Map map) { boolean set_buffers=false; @@ -616,16 +658,14 @@ protected void handleConfigEvent(Map map) { /* ----------------------------- Inner Classes ---------------------------------------- */ - public class PacketReceiver implements Runnable { + public class PacketReceiver implements Runnable, Closeable { private Thread thread=null; private final DatagramSocket receiver_socket; private final String name; - private final Runnable close_strategy; - public PacketReceiver(DatagramSocket socket, String name, Runnable close_strategy) { + public PacketReceiver(DatagramSocket socket, String name) { this.receiver_socket=socket; this.name=name; - this.close_strategy=close_strategy; } public synchronized void start() { @@ -635,17 +675,11 @@ public synchronized void start() { } } + public void close() throws IOException {stop();} + public synchronized void stop() { Thread tmp=thread; thread=null; - try { - close_strategy.run(); - } - catch(Exception e1) { - } - finally { - Util.close(receiver_socket); // second line of defense - } if(tmp != null && tmp.isAlive()) { tmp.interrupt(); @@ -665,7 +699,6 @@ public void run() { while(thread != null && Thread.currentThread().equals(thread)) { try { - // solves Android ISSUE #24748 - DatagramPacket truncated UDP in ICS if(is_android) packet.setLength(receive_buf.length); @@ -680,14 +713,13 @@ public void run() { } catch(SocketException sock_ex) { if(receiver_socket.isClosed()) { - if(log.isDebugEnabled()) log.debug("receiver socket is closed, exception=" + sock_ex); + log.debug("receiver socket is closed, exception=" + sock_ex); break; } log.error(Util.getMessage("FailedReceivingPacket"), sock_ex); } catch(Throwable ex) { - if(log.isErrorEnabled()) - log.error(Util.getMessage("FailedReceivingPacket"), ex); + log.error(Util.getMessage("FailedReceivingPacket"), ex); } } if(log.isDebugEnabled()) log.debug(name + " thread terminated");