Skip to content

Commit

Permalink
removed send and forward queues (temp change)
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed May 12, 2012
1 parent d3486c1 commit 43a1397
Showing 1 changed file with 6 additions and 50 deletions.
56 changes: 6 additions & 50 deletions src/org/jgroups/protocols/DAISYCHAIN.java
Expand Up @@ -44,9 +44,6 @@ public class DAISYCHAIN extends Protocol {
protected int view_size=0;
protected Executor default_pool=null;
protected Executor oob_pool=null;
protected BlockingQueue<Message> send_queue;
protected BlockingQueue<Message> forward_queue;
protected volatile boolean forward=false; // flipped between true and false, to ensure fairness
protected volatile boolean running=true;

@ManagedAttribute
Expand All @@ -55,12 +52,6 @@ public class DAISYCHAIN extends Protocol {
@ManagedAttribute
public int msgs_sent=0;

@ManagedAttribute
public int getElementsInForwardQueue() {return forward_queue.size();}

@ManagedAttribute
public int getElementsInSendQueue() {return send_queue.size();}



public void resetStats() {
Expand All @@ -71,8 +62,6 @@ public void resetStats() {
public void init() throws Exception {
default_pool=getTransport().getDefaultThreadPool();
oob_pool=getTransport().getOOBThreadPool();
send_queue=new ConcurrentLinkedBlockingQueue<Message>(send_queue_size);
forward_queue=new ConcurrentLinkedBlockingQueue<Message>(forward_queue_size);
}

public void start() throws Exception {
Expand Down Expand Up @@ -103,14 +92,8 @@ public Object down(final Event evt) {
copy.setDest(next);
copy.putHeader(getId(), hdr);

try {
msgs_sent++;
send_queue.put(copy);
}
catch(InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}

msgs_sent++;

if(loopback) {
if(log.isTraceEnabled()) log.trace(new StringBuilder("looping back message ").append(msg));
Expand All @@ -124,8 +107,7 @@ public void run() {
}
});
}

return processQueues();
return down_prot.down(new Event(Event.MSG, copy));


case Event.VIEW_CHANGE:
Expand Down Expand Up @@ -161,8 +143,9 @@ public Object up(Event evt) {
copy.setDest(next);
copy.putHeader(getId(), new DaisyHeader(ttl));
msgs_forwarded++;
if(forward_queue.offer(copy)) // we don't want incoming threads to block
processQueues();
if(log.isTraceEnabled())
log.trace(local_addr + ": forwarding message to " + next + " with ttl=" + ttl);
down_prot.down(new Event(Event.MSG, copy));
}

// 2. Pass up
Expand All @@ -173,33 +156,6 @@ public Object up(Event evt) {
}


protected Object processQueues() {
int cnt=0;
while(running && cnt++ < 10000) { // cnt is a second line of defense against loops and should never be used !
try {
Message msg=forward? forward_queue.poll() : send_queue.poll();
if(msg == null) {
msg=forward? send_queue.poll() : forward_queue.poll();
if(msg == null)
continue;
}
if(log.isTraceEnabled()) {
DaisyHeader hdr=(DaisyHeader)msg.getHeader(getId());
log.trace(local_addr + ": " + (forward? " forwarding" : " sending") + " message with ttl=" + hdr.getTTL() + " to " + next);
}
return down_prot.down(new Event(Event.MSG, msg));
}
catch(Throwable t) {
log.error("failed sending message down", t);
return null;
}
finally {
forward=!forward;
}
}
return null;
}


protected void handleView(View view) {
view_size=view.size();
Expand Down

0 comments on commit 43a1397

Please sign in to comment.