Skip to content

Commit

Permalink
removed DIAG_GROUP
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Dec 20, 2006
1 parent 0e185a1 commit e363e99
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 106 deletions.
120 changes: 16 additions & 104 deletions src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* The {@link #receive(Address, Address, byte[], int, int)} method must
* be called by subclasses when a unicast or multicast message has been received.
* @author Bela Ban
* @version $Id: TP.java,v 1.96 2006/12/20 16:28:56 belaban Exp $
* @version $Id: TP.java,v 1.97 2006/12/20 16:42:04 belaban Exp $
*/
public abstract class TP extends Protocol {

Expand Down Expand Up @@ -583,11 +583,11 @@ public void stop() {
// 3. Stop the thread pools

if(oob_thread_pool instanceof ThreadPoolExecutor) {
shutdownThreadPool((ThreadPoolExecutor)oob_thread_pool, "oob_thread_pool");
shutdownThreadPool((ThreadPoolExecutor)oob_thread_pool);
}

if(thread_pool instanceof ThreadPoolExecutor) {
shutdownThreadPool((ThreadPoolExecutor)thread_pool, "thread_pool");
shutdownThreadPool((ThreadPoolExecutor)thread_pool);
}
}

Expand Down Expand Up @@ -1003,11 +1003,8 @@ protected final void receive(Address dest, Address sender, byte[] data, int offs
}

try {
// determine whether OOB or not by looking at first byte of 'data'
boolean oob=false;


// todo: determine whether OOB or not by looking at first byte of 'data'

byte oob_flag=data[Global.SHORT_SIZE]; // we need to skip the first 2 bytes (version)
if((oob_flag & OOB) == OOB)
oob=true;
Expand Down Expand Up @@ -1158,8 +1155,7 @@ private void handleIncomingMessage(Message msg) {

// Discard if message's group name is not the same as our group name unless the
// message is a diagnosis message (special group name DIAG_GROUP)
if(ch_name != null && channel_name != null && !channel_name.equals(ch_name) &&
!ch_name.equals(Util.DIAG_GROUP)) {
if(channel_name != null && !channel_name.equals(ch_name)) {
if(warn)
log.warn(new StringBuffer("discarded message from different group \"").append(ch_name).
append("\" (our group is \"").append(channel_name).append("\"). Sender was ").append(msg.getSrc()));
Expand Down Expand Up @@ -1505,7 +1501,7 @@ else if(rejection_policy.equals("discardoldest"))
}


private void shutdownThreadPool(ThreadPoolExecutor thread_pool, String name) {
private void shutdownThreadPool(ThreadPoolExecutor thread_pool) {
thread_pool.shutdownNow();
try {
thread_pool.awaitTermination(POOL_SHUTDOWN_WAIT_TIME, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -1537,9 +1533,9 @@ class IncomingPacket implements Runnable {

/** Code copied from handleIncomingPacket */
public void run() {
short version;
boolean is_message_list, multicast;
byte flags;
short version;
boolean is_message_list, multicast;
byte flags;
ExposedByteArrayInputStream in_stream=null;
ExposedBufferedInputStream buf_in_stream=null;
DataInputStream dis=null;
Expand Down Expand Up @@ -1598,9 +1594,6 @@ public void run() {


private void handleMyMessage(Message msg, boolean multicast) {
Event evt;
TpHeader hdr;

if(stats) {
num_msgs_received++;
num_bytes_received+=msg.getLength();
Expand All @@ -1611,27 +1604,12 @@ private void handleMyMessage(Message msg, boolean multicast) {
return; // drop message that was already looped back and delivered
}

evt=new Event(Event.MSG, msg);
if(trace) {
StringBuffer sb=new StringBuffer("message is ").append(msg).append(", headers are ").append(msg.getHeaders());
log.trace(sb);
}

/* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
This allows e.g. PerfObserver to get the time of reception of a message */
if(observer != null)
observer.up(evt);

hdr=(TpHeader)msg.getHeader(name); // replaced removeHeader() with getHeader()
TpHeader hdr=(TpHeader)msg.getHeader(name); // replaced removeHeader() with getHeader()
if(hdr != null) {

/* Discard all messages destined for a channel with a different name */
String ch_name=hdr.channel_name;

// Discard if message's group name is not the same as our group name unless the
// message is a diagnosis message (special group name DIAG_GROUP)
if(ch_name != null && channel_name != null && !channel_name.equals(ch_name) &&
!ch_name.equals(Util.DIAG_GROUP)) {
// Discard if message's group name is not the same as our group name
if(channel_name != null && !channel_name.equals(ch_name)) {
if(warn)
log.warn(new StringBuffer("discarded message from different group \"").append(ch_name).
append("\" (our group is \"").append(channel_name).append("\"). Sender was ").append(msg.getSrc()));
Expand All @@ -1644,55 +1622,8 @@ private void handleMyMessage(Message msg, boolean multicast) {
append(", headers are ").append(msg.getHeaders()).append(", will be discarded"));
return;
}
passUp(evt);
}


private void submitToThreadPool(Message msg, boolean multicast) {
Address src=msg.getSrc();
if(loopback && multicast && src != null && local_addr.equals(src)) {
return; // drop message that was already looped back and delivered
}
boolean oob_flag=msg.isFlagSet(Message.OOB);
if(oob_flag) {
num_oob_msgs_received++;

oob_thread_pool.execute(new IncomingMessage(msg));
}
else {
num_incoming_msgs_received++;
thread_pool.execute(new IncomingMessage(msg));
}
}
}


public class IncomingMessage implements Runnable {
Message msg;

public IncomingMessage(Message msg) {
this.msg=msg;
}

public int getLength() {
return msg.getLength();
}

public Message getMessage() {
return msg;
}

/** Code copied from handleIncomingMessage */
public void run() {
Event evt;
TpHeader hdr;

if(stats) {
num_msgs_received++;
num_bytes_received+=msg.getLength();
}

evt=new Event(Event.MSG, msg);
Event evt=new Event(Event.MSG, msg);
if(trace) {
StringBuffer sb=new StringBuffer("message is ").append(msg).append(", headers are ").append(msg.getHeaders());
log.trace(sb);
Expand All @@ -1703,35 +1634,16 @@ public void run() {
if(observer != null)
observer.up(evt);

hdr=(TpHeader)msg.getHeader(name); // replaced removeHeader() with getHeader()
if(hdr != null) {

/* Discard all messages destined for a channel with a different name */
String ch_name=hdr.channel_name;

// Discard if message's group name is not the same as our group name unless the
// message is a diagnosis message (special group name DIAG_GROUP)
if(ch_name != null && channel_name != null && !channel_name.equals(ch_name) &&
!ch_name.equals(Util.DIAG_GROUP)) {
if(warn)
log.warn(new StringBuffer("discarded message from different group \"").append(ch_name).
append("\" (our group is \"").append(channel_name).append("\"). Sender was ").append(msg.getSrc()));
return;
}
}
else {
if(trace)
log.trace(new StringBuffer("message does not have a transport header, msg is ").append(msg).
append(", headers are ").append(msg.getHeaders()).append(", will be discarded"));
return;
}
passUp(evt);
}


}





/**
* This thread fetches byte buffers from the packet_queue, converts them into messages and passes them up
* to the higher layer (done in handleIncomingUdpPacket()).
Expand Down
3 changes: 1 addition & 2 deletions src/org/jgroups/util/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
/**
* Collection of various utility routines that can not be assigned to other classes.
* @author Bela Ban
* @version $Id: Util.java,v 1.108 2006/12/15 17:04:11 belaban Exp $
* @version $Id: Util.java,v 1.109 2006/12/20 16:41:20 belaban Exp $
*/
public class Util {
private static final ByteArrayOutputStream out_stream=new ByteArrayOutputStream(512);
Expand All @@ -51,7 +51,6 @@ public class Util {

// constants
public static final int MAX_PORT=65535; // highest port allocatable
public static final String DIAG_GROUP="DIAG_GROUP-BELA-322649"; // unique
static boolean resolve_dns=false;

static boolean JGROUPS_COMPAT=false;
Expand Down

0 comments on commit e363e99

Please sign in to comment.