Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

- Added ADD_TO_XMIT_TABLE

- Handling of the above event in NAKACK
  • Loading branch information...
commit cb6916007d05785236391c103d7f6c3e5e2572e6 1 parent df490e7
Bela Ban authored November 24, 2011
2  src/org/jgroups/Event.java
@@ -56,6 +56,7 @@
56 56
     public static final int LOCK_AWAIT                         = 98; // arg=LockInfo
57 57
     public static final int LOCK_SIGNAL                        = 99; // arg=AwaitInfo
58 58
     public static final int IS_MERGE_IN_PROGRESS               = 100; // returns true or false
  59
+    public static final int ADD_TO_XMIT_TABLE                  = 101; // arg=Message
59 60
 
60 61
 
61 62
     public static final int USER_DEFINED                       = 1000; // arg = <user def., e.g. evt type + data>
@@ -137,6 +138,7 @@ public static String type2String(int t) {
137 138
             case LOCK_AWAIT:             return "LOCK_AWAIT";
138 139
             case LOCK_SIGNAL:            return "LOCK_SIGNAL";
139 140
             case IS_MERGE_IN_PROGRESS:   return "IS_MERGE_IN_PROGRESS";
  141
+            case ADD_TO_XMIT_TABLE:      return "ADD_TO_XMIT_TABLE";
140 142
             
141 143
             case USER_DEFINED:           return "USER_DEFINED";
142 144
             default:                     return "UNDEFINED(" + t + ")";
25  src/org/jgroups/protocols/pbcast/NAKACK.java
@@ -523,6 +523,15 @@ public Object down(Event evt) {
523 523
                     }
524 524
                 }
525 525
                 return null;
  526
+
  527
+            case Event.ADD_TO_XMIT_TABLE:
  528
+                msg=(Message)evt.getArg();
  529
+                dest=msg.getDest();
  530
+                if(dest != null || msg.isFlagSet(Message.NO_RELIABILITY))
  531
+                    return null; // unicast address: not null and not mcast, pass down unchanged
  532
+
  533
+                send(evt, msg, false); // add to retransmit window, but don't send (we want to avoid the unneeded traffic)
  534
+                return null;    // don't pass down the stack
526 535
         }
527 536
 
528 537
         return down_prot.down(evt);
@@ -548,7 +557,7 @@ public Object up(Event evt) {
548 557
 
549 558
             if(!is_server) { // discard messages while not yet server (i.e., until JOIN has returned)
550 559
                 if(log.isTraceEnabled())
551  
-                    log.trace("message " + msg.getSrc() + "::" + hdr.seqno + " was discarded (not yet server)");
  560
+                    log.trace(local_addr + ": message " + msg.getSrc() + "::" + hdr.seqno + " was discarded (not yet server)");
552 561
                 return null;
553 562
             }
554 563
 
@@ -611,13 +620,13 @@ public Object up(Event evt) {
611 620
      * Made seqno increment and adding to sent_msgs atomic, e.g. seqno won't get incremented if adding to
612 621
      * sent_msgs fails e.g. due to an OOM (see http://jira.jboss.com/jira/browse/JGRP-179). bela Jan 13 2006
613 622
      */
614  
-    private void send(Event evt, Message msg) {
  623
+    protected void send(Event evt, Message msg, boolean pass_down) {
615 624
         if(msg == null)
616 625
             throw new NullPointerException("msg is null; event is " + evt);
617 626
 
618 627
         if(!running) {
619 628
             if(log.isTraceEnabled())
620  
-                log.trace("[" + local_addr + "] discarded message as we're not in the 'running' state, message: " + msg);
  629
+                log.trace(local_addr + ": discarded message as we're not in the 'running' state, message: " + msg);
621 630
             return;
622 631
         }
623 632
 
@@ -648,6 +657,9 @@ private void send(Event evt, Message msg) {
648 657
             seqno_lock.unlock();
649 658
         }
650 659
 
  660
+        if(!pass_down)
  661
+            return;
  662
+        
651 663
         try { // moved down_prot.down() out of synchronized clause (bela Sept 7 2006) http://jira.jboss.com/jira/browse/JGRP-300
652 664
             if(log.isTraceEnabled())
653 665
                 log.trace("sending " + local_addr + "#" + msg_id);
@@ -660,6 +672,9 @@ private void send(Event evt, Message msg) {
660 672
         }
661 673
     }
662 674
 
  675
+    protected void send(Event evt, Message msg) {
  676
+        send(evt, msg, true);
  677
+    }
663 678
 
664 679
 
665 680
     /**
@@ -1086,9 +1101,9 @@ private void setDigest(Digest digest, boolean merge) {
1086 1101
         if(digest == null)
1087 1102
             return;
1088 1103
 
1089  
-        StringBuilder sb=new StringBuilder(merge? "\n[mergeDigest()]\n" : "\n[setDigest()]\n");
  1104
+        StringBuilder sb=new StringBuilder(merge? "\n[" + local_addr + " mergeDigest()]\n" : "\n["+local_addr + " setDigest()]\n");
1090 1105
         sb.append("existing digest:  " + getDigest()).append("\nnew digest:       " + digest);
1091  
-
  1106
+        
1092 1107
         boolean set_own_seqno=false;
1093 1108
         for(Digest.DigestEntry entry: digest) {
1094 1109
             Address member=entry.getMember();

0 notes on commit cb69160

Please sign in to comment.
Something went wrong with that request. Please try again.