Skip to content

Commit

Permalink
Dynamically change UDP.{ucast,mcast}_receiver_threads
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Dec 15, 2016
1 parent ae79baf commit e599890
Showing 1 changed file with 82 additions and 50 deletions.
132 changes: 82 additions & 50 deletions src/org/jgroups/protocols/UDP.java
Expand Up @@ -11,6 +11,7 @@
import org.jgroups.util.SuppressLog;
import org.jgroups.util.Util;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.*;
Expand Down Expand Up @@ -62,6 +63,9 @@ public class UDP extends TP {
@Property(description="Traffic class for sending unicast and multicast datagrams. Default is 8")
protected int tos=8; // valid values: 2, 4, 8 (default), 16

protected static final String UCAST_NAME="unicast receiver";
protected static final String MCAST_NAME="multicast receiver";

@Property(name="mcast_addr", description="The multicast address used for sending and receiving packets",
defaultValueIPv4="228.8.8.8", defaultValueIPv6="ff0e::8:8:8",
systemProperty=Global.UDP_MCAST_ADDR,writable=false)
Expand Down Expand Up @@ -98,13 +102,13 @@ public class UDP extends TP {
"a datagram packet")
protected long suppress_time_out_of_buffer_space=60000;

@Property(description="Number of unicast receiver threads, all reading from the same DatagramSocket. " +
"If de-serialization is slow, increasing the number of receiver threads might yield better performance.")
protected int ucast_receiver_threads=1;
// @Property(description="Number of unicast receiver threads, all reading from the same DatagramSocket. " +
// "If de-serialization is slow, increasing the number of receiver threads might yield better performance.")
protected int unicast_receiver_threads=1;

@Property(description="Number of multicast receiver threads, all reading from the same MulticastSocket. " +
"If de-serialization is slow, increasing the number of receiver threads might yield better performance.")
protected int mcast_receiver_threads=1;
//@Property(description="Number of multicast receiver threads, all reading from the same MulticastSocket. " +
// "If de-serialization is slow, increasing the number of receiver threads might yield better performance.")
protected int multicast_receiver_threads=1;


/* --------------------------------------------- Fields ------------------------------------------------ */
Expand Down Expand Up @@ -192,6 +196,44 @@ public void clearDroppedMessagesCache() {
suppress_log_out_of_buffer_space.getCache().clear();
}

@Property(description="Number of unicast receiver threads, all reading from the same DatagramSocket. " +
"If de-serialization is slow, increasing the number of receiver threads might yield better performance.")
public void setUcastReceiverThreads(int num) {
if(unicast_receiver_threads != num) {
unicast_receiver_threads=num;
if(ucast_receivers != null) {
stopUcastReceiverThreads();
ucast_receivers=createReceivers(unicast_receiver_threads, sock, UCAST_NAME);
startUcastReceiverThreads();
}
}
}

@Property(description="Number of unicast receiver threads, all reading from the same DatagramSocket. " +
"If de-serialization is slow, increasing the number of receiver threads might yield better performance.")
public int getUcastReceiverThreads() {
return unicast_receiver_threads;
}

@Property(description="Number of multicast receiver threads, all reading from the same MulticastSocket. " +
"If de-serialization is slow, increasing the number of receiver threads might yield better performance.")
public void setMcastReceiverThreads(int num) {
if(multicast_receiver_threads != num) {
multicast_receiver_threads=num;
if(mcast_receivers != null) {
stopMcastReceiverThreads();
mcast_receivers=createReceivers(multicast_receiver_threads, mcast_sock, MCAST_NAME);
startMcastReceiverThreads();
}
}
}

@Property(description="Number of multicast receiver threads, all reading from the same MulticastSocket. " +
"If de-serialization is slow, increasing the number of receiver threads might yield better performance.")
public int getMcastReceiverThreads() {
return multicast_receiver_threads;
}

public String getInfo() {
StringBuilder sb=new StringBuilder();
sb.append("group_addr=").append(mcast_group_addr.getHostName()).append(':').append(mcast_port).append("\n");
Expand Down Expand Up @@ -264,37 +306,23 @@ public void start() throws Exception {
destroySockets();
throw ex;
}
ucast_receivers=new PacketReceiver[ucast_receiver_threads];
for(int i=0; i < ucast_receivers.length; i++)
ucast_receivers[i]=new PacketReceiver(sock, "unicast receiver", this::closeUnicastSocket);

if(ip_mcast) {
mcast_receivers=new PacketReceiver[mcast_receiver_threads];
for(int i=0; i < mcast_receivers.length; i++)
mcast_receivers[i]=new PacketReceiver(mcast_sock, "multicast receiver", this::closeMulticastSocket);
}
ucast_receivers=createReceivers(unicast_receiver_threads, sock, UCAST_NAME);
if(ip_mcast)
mcast_receivers=createReceivers(multicast_receiver_threads, mcast_sock, MCAST_NAME);
}


public void stop() {
log.debug("closing sockets and stopping threads");
stopThreads(); // will close sockets, closeSockets() is not really needed anymore, but...
super.stop();
}

public void destroy() {
super.destroy();
destroySockets();
stopThreads();
super.stop();
}

protected void handleConnect() throws Exception {
startThreads();
}

protected void handleDisconnect() {
stopThreads();
}

/*--------------------------- End of Protocol interface -------------------------- */


Expand Down Expand Up @@ -399,6 +427,14 @@ protected void destroySockets() {
closeUnicastSocket();
}

protected PacketReceiver[] createReceivers(int num, DatagramSocket sock, String name) {
PacketReceiver[] receivers=new PacketReceiver[num];
for(int i=0; i < num; i++)
receivers[i]=new PacketReceiver(sock, name);
return receivers;
}


protected IpAddress createLocalAddress() {
if(sock == null || sock.isClosed())
return null;
Expand Down Expand Up @@ -573,23 +609,29 @@ protected void closeUnicastSocket() {


protected void startThreads() throws Exception {
for(PacketReceiver r: ucast_receivers)
r.start();
startUcastReceiverThreads();
startMcastReceiverThreads();
}

protected void startUcastReceiverThreads() {
if(ucast_receivers != null)
for(PacketReceiver r: ucast_receivers)
r.start();
}

protected void startMcastReceiverThreads() {
if(mcast_receivers != null)
for(PacketReceiver r: mcast_receivers)
r.start();
}


protected void stopThreads() {
if(mcast_receivers != null)
for(PacketReceiver r: mcast_receivers)
r.stop();
if(ucast_receivers != null)
for(PacketReceiver r: ucast_receivers)
r.stop();
stopMcastReceiverThreads();
stopUcastReceiverThreads();
}

protected void stopUcastReceiverThreads() {Util.close(ucast_receivers);}
protected void stopMcastReceiverThreads() {Util.close(mcast_receivers);}

protected void handleConfigEvent(Map<String,Object> map) {
boolean set_buffers=false;
Expand All @@ -616,16 +658,14 @@ protected void handleConfigEvent(Map<String,Object> map) {
/* ----------------------------- Inner Classes ---------------------------------------- */


public class PacketReceiver implements Runnable {
public class PacketReceiver implements Runnable, Closeable {
private Thread thread=null;
private final DatagramSocket receiver_socket;
private final String name;
private final Runnable close_strategy;

public PacketReceiver(DatagramSocket socket, String name, Runnable close_strategy) {
public PacketReceiver(DatagramSocket socket, String name) {
this.receiver_socket=socket;
this.name=name;
this.close_strategy=close_strategy;
}

public synchronized void start() {
Expand All @@ -635,17 +675,11 @@ public synchronized void start() {
}
}

public void close() throws IOException {stop();}

public synchronized void stop() {
Thread tmp=thread;
thread=null;
try {
close_strategy.run();
}
catch(Exception e1) {
}
finally {
Util.close(receiver_socket); // second line of defense
}

if(tmp != null && tmp.isAlive()) {
tmp.interrupt();
Expand All @@ -665,7 +699,6 @@ public void run() {

while(thread != null && Thread.currentThread().equals(thread)) {
try {

// solves Android ISSUE #24748 - DatagramPacket truncated UDP in ICS
if(is_android)
packet.setLength(receive_buf.length);
Expand All @@ -680,14 +713,13 @@ public void run() {
}
catch(SocketException sock_ex) {
if(receiver_socket.isClosed()) {
if(log.isDebugEnabled()) log.debug("receiver socket is closed, exception=" + sock_ex);
log.debug("receiver socket is closed, exception=" + sock_ex);
break;
}
log.error(Util.getMessage("FailedReceivingPacket"), sock_ex);
}
catch(Throwable ex) {
if(log.isErrorEnabled())
log.error(Util.getMessage("FailedReceivingPacket"), ex);
log.error(Util.getMessage("FailedReceivingPacket"), ex);
}
}
if(log.isDebugEnabled()) log.debug(name + " thread terminated");
Expand Down

0 comments on commit e599890

Please sign in to comment.