Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Merge branch 'JGRP-1581'

  • Loading branch information...
commit b6250a47ae61fbe40fba033d5b9754f983b7ee93 2 parents 81591ff + 1254c88
Bela Ban authored February 27, 2013

Showing 65 changed files with 1,448 additions and 891 deletions. Show diff stats Hide diff stats

  1. 2  conf/fast-local.xml
  2. 9  doc/manual/en/modules/advanced.xml
  3. 14  src/org/jgroups/Message.java
  4. 30  src/org/jgroups/demos/Draw.java
  5. 2  src/org/jgroups/demos/ProgrammaticChat.java
  6. 277  src/org/jgroups/protocols/ENCRYPT.java
  7. 84  src/org/jgroups/protocols/FC.java
  8. 95  src/org/jgroups/protocols/FRAG.java
  9. 120  src/org/jgroups/protocols/FRAG2.java
  10. 89  src/org/jgroups/protocols/FlowControl.java
  11. 6  src/org/jgroups/protocols/FragHeader.java
  12. 82  src/org/jgroups/protocols/RELAY.java
  13. 27  src/org/jgroups/protocols/RSVP.java
  14. 8  src/org/jgroups/protocols/SCOPE.java
  15. 28  src/org/jgroups/protocols/SEQUENCER.java
  16. 90  src/org/jgroups/protocols/STOMP.java
  17. 199  src/org/jgroups/protocols/UNICAST.java
  18. 48  src/org/jgroups/protocols/UNICAST2.java
  19. 51  src/org/jgroups/protocols/UNICAST3.java
  20. 76  src/org/jgroups/protocols/pbcast/FLUSH.java
  21. 40  src/org/jgroups/protocols/pbcast/NAKACK2.java
  22. 185  src/org/jgroups/protocols/pbcast/STABLE.java
  23. 20  src/org/jgroups/protocols/pbcast/STATE.java
  24. 29  src/org/jgroups/protocols/relay/RELAY2.java
  25. 24  src/org/jgroups/protocols/rules/SUPERVISOR.java
  26. 12  src/org/jgroups/util/BlockingInputStream.java
  27. 46  src/org/jgroups/util/MessageBatch.java
  28. 10  src/org/jgroups/util/Util.java
  29. 4  tests/byteman/org/jgroups/tests/byteman/BecomeServerTest.java
  30. 4  tests/byteman/org/jgroups/tests/byteman/ForwardToCoordFailoverTest.java
  31. 4  tests/byteman/org/jgroups/tests/byteman/MessageBeforeConnectedTest.java
  32. 6  tests/byteman/org/jgroups/tests/helpers/MessageBeforeConnectedTestHelper.java
  33. 6  tests/junit-functional/org/jgroups/protocols/FORWARD_TO_COORD_Test.java
  34. 12  tests/junit-functional/org/jgroups/protocols/NAKACK2_RetransmitTest.java
  35. 75  tests/junit-functional/org/jgroups/protocols/NAKACK_Delivery_Test.java
  36. 60  tests/junit-functional/org/jgroups/protocols/NAKACK_StressTest.java
  37. 2  tests/junit-functional/org/jgroups/protocols/SUPERVISOR_Test.java
  38. 2  tests/junit-functional/org/jgroups/protocols/UNICAST_ConnectionTests.java
  39. 2  tests/junit-functional/org/jgroups/protocols/UNICAST_ContentionTest.java
  40. 91  tests/junit-functional/org/jgroups/tests/BlockingInputStreamTest.java
  41. 45  tests/junit-functional/org/jgroups/tests/FCTest.java
  42. 4  tests/junit-functional/org/jgroups/tests/FdMonitorTest.java
  43. 2  tests/junit-functional/org/jgroups/tests/FragTest.java
  44. 8  tests/junit-functional/org/jgroups/tests/LargeMergeTest.java
  45. 4  tests/junit-functional/org/jgroups/tests/MergeTest2.java
  46. 2  tests/junit-functional/org/jgroups/tests/MergeTest3.java
  47. 55  tests/junit-functional/org/jgroups/tests/MessageBatchTest.java
  48. 2  tests/junit-functional/org/jgroups/tests/MessageDispatcherRSVPTest.java
  49. 2  tests/junit-functional/org/jgroups/tests/NakackTest.java
  50. 2  tests/junit-functional/org/jgroups/tests/OrderingTest.java
  51. 2  tests/junit-functional/org/jgroups/tests/ProgrammaticApiTest.java
  52. 9  tests/junit-functional/org/jgroups/tests/RSVPTest.java
  53. 6  tests/junit-functional/org/jgroups/tests/Relay2Test.java
  54. 4  tests/junit-functional/org/jgroups/tests/RpcDispatcherAsyncInvocationTest.java
  55. 127  tests/junit/org/jgroups/tests/FlushCloseOpenTest.java
  56. 2  tests/junit/org/jgroups/tests/GossipRouterTest.java
  57. 13  tests/junit/org/jgroups/tests/LargeStateTransferTest.java
  58. 11  tests/junit/org/jgroups/tests/ReconciliationTest.java
  59. 50  tests/junit/org/jgroups/tests/SCOPE_Test.java
  60. 2  tests/junit/org/jgroups/tests/SequencerMergeTest.java
  61. 6  tests/junit/org/jgroups/tests/StateTransferTest.java
  62. 2  tests/junit/org/jgroups/tests/TCPGOSSIP_Test.java
  63. 2  tests/junit/org/jgroups/tests/TUNNELDeadLockTest.java
  64. 2  tests/junit/org/jgroups/tests/TUNNEL_Test.java
  65. 4  tests/perf/org/jgroups/tests/perf/MPerf.java
2  conf/fast-local.xml
@@ -18,7 +18,7 @@
18 18
          mcast_recv_buf_size="25M"
19 19
          mcast_send_buf_size="640K"
20 20
          loopback="false"
21  
-         max_bundle_size="64000"
  21
+         max_bundle_size="64K"
22 22
          max_bundle_timeout="30"
23 23
          ip_ttl="${jgroups.udp.ip_ttl:0}"
24 24
          enable_diagnostics="true"
9  doc/manual/en/modules/advanced.xml
@@ -1083,6 +1083,15 @@ for(Future<RspList> future: futures) {
1083 1083
 
1084 1084
         <section id="Scopes">
1085 1085
             <title>Scopes: concurrent message delivery for messages from the same sender</title>
  1086
+
  1087
+            <note>
  1088
+                <title>Deprecated in 3.3</title>
  1089
+                <para>
  1090
+                    In 3.3, SCOPE is replaced with the <xref linkend="AsyncInvocation">Asynchronous Invocation API</xref>.
  1091
+                    SCOPE will probably be removed in 4.x.
  1092
+                </para>
  1093
+            </note>
  1094
+
1086 1095
             <para>
1087 1096
                 In the previous paragraph, we showed how the concurrent stack delivers messages from different senders
1088 1097
                 concurrently. But all (non-OOB) messages from the same sender P are delivered in the order in which
14  src/org/jgroups/Message.java
@@ -244,6 +244,13 @@ public Message(boolean create_headers) {
244 244
         }
245 245
     }
246 246
 
  247
+    /**
  248
+     * <em>
  249
+     * Note that the byte[] buffer passed as argument must not be modified. Reason: if we retransmit the
  250
+     * message, it would still have a ref to the original byte[] buffer passed in as argument, and so we would
  251
+     * retransmit a changed byte[] buffer !
  252
+     * </em>
  253
+     */
247 254
     final public Message setBuffer(byte[] b) {
248 255
         buf=b;
249 256
         if(buf != null) {
@@ -256,7 +263,12 @@ final public Message setBuffer(byte[] b) {
256 263
     }
257 264
 
258 265
     /**
259  
-     * Set the internal buffer to point to a subset of a given buffer
  266
+     * Sets the internal buffer to point to a subset of a given buffer.<p/>
  267
+     * <em>
  268
+     * Note that the byte[] buffer passed as argument must not be modified. Reason: if we retransmit the
  269
+     * message, it would still have a ref to the original byte[] buffer passed in as argument, and so we would
  270
+     * retransmit a changed byte[] buffer !
  271
+     * </em>
260 272
      *
261 273
      * @param b The reference to a given buffer. If null, we'll reset the buffer to null
262 274
      * @param offset The initial position
30  src/org/jgroups/demos/Draw.java
@@ -422,7 +422,7 @@ public void writeState(OutputStream outstream) throws IOException {
422 422
             if(state == null)
423 423
                 return;
424 424
             synchronized(state) {
425  
-                DataOutputStream dos=new DataOutputStream(new BufferedOutputStream(outstream, 4096));
  425
+                DataOutputStream dos=new DataOutputStream(new BufferedOutputStream(outstream));
426 426
                 // DataOutputStream dos=new DataOutputStream(outstream);
427 427
                 dos.writeInt(state.size());
428 428
                 for(Map.Entry<Point,Color> entry: state.entrySet()) {
@@ -433,13 +433,14 @@ public void writeState(OutputStream outstream) throws IOException {
433 433
                     dos.writeInt(col.getRGB());
434 434
                 }
435 435
                 dos.flush();
  436
+                System.out.println("wrote " + state.size() + " elements");
436 437
             }
437 438
         }
438 439
 
439 440
 
440 441
         public void readState(InputStream instream) throws IOException {
441  
-            DataInputStream in=new DataInputStream(instream);
442  
-            Map<Point,Color> new_state=new HashMap<Point,Color>();
  442
+            DataInputStream in=new DataInputStream(new BufferedInputStream(instream));
  443
+            Map<Point,Color> new_state=new LinkedHashMap<Point,Color>();
443 444
             int num=in.readInt();
444 445
             for(int i=0; i < num; i++) {
445 446
                 Point point=new Point(in.readInt(), in.readInt());
@@ -450,7 +451,7 @@ public void readState(InputStream instream) throws IOException {
450 451
             synchronized(state) {
451 452
                 state.clear();
452 453
                 state.putAll(new_state);
453  
-                System.out.println("read state: " + state.size() + " entries");
  454
+                System.out.println("read " + state.size() + " elements");
454 455
                 createOffscreenImage(true);
455 456
             }
456 457
         }
@@ -575,5 +576,26 @@ public void paintComponent(Graphics g) {
575 576
 
576 577
     }
577 578
 
  579
+
  580
+   /* protected class MyPoint extends Point implements Comparable<Point> {
  581
+        private static final long serialVersionUID=4171855995316340839L;
  582
+
  583
+        public MyPoint() {
  584
+        }
  585
+
  586
+        public MyPoint(Point p) {
  587
+            super(p);
  588
+        }
  589
+
  590
+        public MyPoint(int x, int y) {
  591
+            super(x,y);
  592
+        }
  593
+
  594
+
  595
+        public int compareTo(Point o) {
  596
+            return x > o.x? 1 : x < o.x? -1 : y > o.y? 1 : y < o.y ? -1 :0;
  597
+        }
  598
+    }*/
  599
+
578 600
 }
579 601
 
2  src/org/jgroups/demos/ProgrammaticChat.java
@@ -30,7 +30,7 @@ public static void main(String[] args) throws Exception {
30 30
                 .addProtocol(new VERIFY_SUSPECT())
31 31
                 .addProtocol(new BARRIER())
32 32
                 .addProtocol(new NAKACK2())
33  
-                .addProtocol(new UNICAST2())
  33
+                .addProtocol(new UNICAST3())
34 34
                 .addProtocol(new STABLE())
35 35
                 .addProtocol(new GMS())
36 36
                 .addProtocol(new UFC())
277  src/org/jgroups/protocols/ENCRYPT.java
@@ -6,6 +6,7 @@
6 6
 import org.jgroups.annotations.MBean;
7 7
 import org.jgroups.annotations.Property;
8 8
 import org.jgroups.stack.Protocol;
  9
+import org.jgroups.util.MessageBatch;
9 10
 import org.jgroups.util.QueueClosedException;
10 11
 import org.jgroups.util.Util;
11 12
 
@@ -447,6 +448,74 @@ public Object up(Event evt) {
447 448
         return passItUp(evt);
448 449
     }
449 450
 
  451
+
  452
+    public void up(MessageBatch batch) {
  453
+        for(Message msg: batch) {
  454
+            if(msg.getLength() == 0 && !encrypt_entire_message)
  455
+                continue;
  456
+
  457
+            EncryptHeader hdr=(EncryptHeader)msg.getHeader(this.id);
  458
+            if(hdr == null) {
  459
+                if(log.isTraceEnabled())
  460
+                    log.trace("dropping message as ENCRYPT header is null or has not been recognized, msg will not be passed up, " +
  461
+                                "headers are " + msg.printHeaders());
  462
+                batch.remove(msg);
  463
+                continue;
  464
+            }
  465
+
  466
+            switch(hdr.getType()) {
  467
+                case EncryptHeader.ENCRYPT:
  468
+                    // if msg buffer is empty, and we didn't encrypt the entire message, just pass up
  469
+                    if(!hdr.encrypt_entire_msg && msg.getLength() == 0)
  470
+                        break;
  471
+
  472
+                    // if queueing then pass into queue to be dealt with later
  473
+                    if(queue_up) {
  474
+                        if(log.isTraceEnabled())
  475
+                            log.trace("queueing up message as no session key established: " + msg);
  476
+                        try {
  477
+                            upMessageQueue.put(msg);
  478
+                        }
  479
+                        catch(InterruptedException e) {
  480
+                        }
  481
+                    }
  482
+                    else {
  483
+                        // make sure we pass up any queued messages first
  484
+                        // could be more optimised but this can wait we only need this if not using supplied key
  485
+                        if(!suppliedKey) {
  486
+                            try {
  487
+                                drainUpQueue();
  488
+                            }
  489
+                            catch(Exception e) {
  490
+                                log.error("failed draining up queue", e);
  491
+                            }
  492
+                        }
  493
+
  494
+                        // try and decrypt the message - we need to copy msg as we modify its
  495
+                        // buffer (http://jira.jboss.com/jira/browse/JGRP-538)
  496
+                        try {
  497
+                            Message tmpMsg=decryptMessage(symDecodingCipher, msg.copy());
  498
+                            if(tmpMsg != null)
  499
+                                batch.replace(msg, tmpMsg);
  500
+                            else
  501
+                                log.warn("Unrecognised cipher discarding message");
  502
+                        }
  503
+                        catch(Exception e) {
  504
+                            log.error("failed decrypting message", e);
  505
+                        }
  506
+                    }
  507
+                    break;
  508
+                default:
  509
+                    batch.remove(msg); // a control message will get handled by ENCRYPT and should not be passed up
  510
+                    handleUpEvent(msg, hdr);
  511
+                    break;
  512
+            }
  513
+        }
  514
+
  515
+        if(!batch.isEmpty())
  516
+            up_prot.up(batch);
  517
+    }
  518
+
450 519
     public Object passItUp(Event evt) {
451 520
         if(observer != null)
452 521
             observer.passUp(evt);
@@ -516,20 +585,10 @@ private void handleNewKeyServer(Address newKeyServer) {
516 585
         sendKeyRequest();
517 586
     }
518 587
 
519  
-    /**
520  
-     * @param evt
521  
-     */
  588
+
522 589
     private void handleUpMessage(Event evt) throws Exception {
523 590
         Message msg=(Message)evt.getArg();
524  
-
525  
-        if(msg == null) {
526  
-            if(log.isTraceEnabled())
527  
-                log.trace("null message - passing straight up");
528  
-            passItUp(evt);
529  
-            return;
530  
-        }
531  
-
532  
-        if(msg.getLength() == 0 && !encrypt_entire_message) {
  591
+        if(msg == null || (msg.getLength() == 0 && !encrypt_entire_message)) {
533 592
             passItUp(evt);
534 593
             return;
535 594
         }
@@ -537,7 +596,7 @@ private void handleUpMessage(Event evt) throws Exception {
537 596
         EncryptHeader hdr=(EncryptHeader)msg.getHeader(this.id);
538 597
         if(hdr == null) {
539 598
             if(log.isTraceEnabled())
540  
-                log.trace("dropping message as ENCRYPT header is null  or has not been recognized, msg will not be passed up, " +
  599
+                log.trace("dropping message as ENCRYPT header is null or has not been recognized, msg will not be passed up, " +
541 600
                             "headers are " + msg.printHeaders());
542 601
             return;
543 602
         }
@@ -545,100 +604,104 @@ private void handleUpMessage(Event evt) throws Exception {
545 604
         if(log.isTraceEnabled())
546 605
             log.trace("header received " + hdr);
547 606
 
548  
-        // if a normal message try and decrypt it
549  
-        if(hdr.getType() == EncryptHeader.ENCRYPT) {
550  
-            // if msg buffer is empty, and we didn't encrypt the entire message, just pass up
551  
-            if(!hdr.encrypt_entire_msg && msg.getLength() == 0) {
552  
-                if(log.isTraceEnabled())
553  
-                    log.trace("passing up message as it has an empty buffer ");
554  
-                passItUp(evt);
555  
-                return;
556  
-            }
  607
+        switch(hdr.getType()) {
  608
+            case EncryptHeader.ENCRYPT:
  609
+                handleEncryptedMessage(msg, evt, hdr);
  610
+                break;
  611
+            default:
  612
+                handleUpEvent(msg, hdr);
  613
+                break;
  614
+        }
  615
+    }
557 616
 
558  
-            // if queueing then pass into queue to be dealt with later
559  
-            if(queue_up) {
560  
-                if(log.isTraceEnabled())
561  
-                    log.trace("queueing up message as no session key established: " + msg);
562  
-                upMessageQueue.put(msg);
563  
-            }
564  
-            else {
565  
-                // make sure we pass up any queued messages first
566  
-                // could be more optimised but this can wait we only need this if not using supplied key
567  
-                if(!suppliedKey) {
568  
-                    drainUpQueue();
569  
-                }
570  
-                // try and decrypt the message - we need to copy msg as we modify its
571  
-                // buffer (http://jira.jboss.com/jira/browse/JGRP-538)
572  
-                Message tmpMsg=decryptMessage(symDecodingCipher, msg.copy());
573  
-                if(tmpMsg != null) {
574  
-                    if(log.isTraceEnabled())
575  
-                        log.trace("decrypted message " + tmpMsg);
576  
-                    passItUp(new Event(Event.MSG, tmpMsg));
577  
-                }
578  
-                else {
579  
-                    log.warn("Unrecognised cipher discarding message");
580  
-                }
581  
-            }
  617
+
  618
+
  619
+    protected void handleEncryptedMessage(Message msg, Event evt, EncryptHeader hdr) throws Exception {
  620
+        // if msg buffer is empty, and we didn't encrypt the entire message, just pass up
  621
+        if(!hdr.encrypt_entire_msg && msg.getLength() == 0) {
  622
+            if(log.isTraceEnabled())
  623
+                log.trace("passing up message as it has an empty buffer ");
  624
+            passItUp(evt);
  625
+            return;
  626
+        }
  627
+
  628
+        // if queueing then pass into queue to be dealt with later
  629
+        if(queue_up) {
  630
+            if(log.isTraceEnabled())
  631
+                log.trace("queueing up message as no session key established: " + msg);
  632
+            upMessageQueue.put(msg);
582 633
         }
583 634
         else {
584  
-            // check if we had some sort of encrypt control header if using supplied key we should not process it
585  
-            if(suppliedKey) {
586  
-                if(log.isWarnEnabled())
587  
-                    log.warn("We received an encrypt header of " + hdr.getType() + " while in configured mode");
  635
+            // make sure we pass up any queued messages first
  636
+            // could be more optimised but this can wait we only need this if not using supplied key
  637
+            if(!suppliedKey)
  638
+                drainUpQueue();
  639
+
  640
+            // try and decrypt the message - we need to copy msg as we modify its
  641
+            // buffer (http://jira.jboss.com/jira/browse/JGRP-538)
  642
+            Message tmpMsg=decryptMessage(symDecodingCipher, msg.copy());
  643
+            if(tmpMsg != null) {
  644
+                if(log.isTraceEnabled())
  645
+                    log.trace("decrypted message " + tmpMsg);
  646
+                passItUp(new Event(Event.MSG, tmpMsg));
588 647
             }
589  
-            else {
590  
-                // see what sort of encrypt control message we have received
591  
-                switch(hdr.getType()) {
592  
-                    // if a key request
593  
-                    case EncryptHeader.KEY_REQUEST:
594  
-                        if(log.isDebugEnabled()) {
595  
-                            log.debug("received a key request from peer");
596  
-                        }
  648
+            else
  649
+                log.warn("Unrecognised cipher discarding message");
  650
+        }
  651
+    }
597 652
 
598  
-                        //if a key request send response key back
599  
-                        try {
600  
-                            // extract peer's public key
601  
-                            PublicKey tmpKey=generatePubKey(msg.getBuffer());
602  
-                            // send back the secret key we have
603  
-                            sendSecretKey(getSecretKey(), tmpKey, msg.getSrc());
604  
-                        }
605  
-                        catch(Exception e) {
606  
-                            log.warn("unable to reconstitute peer's public key");
607  
-                        }
608  
-                        break;
609  
-                    case EncryptHeader.SECRETKEY:
610  
-                        if(log.isDebugEnabled()) {
611  
-                            log.debug("received a secretkey response from keyserver");
612  
-                        }
  653
+    protected void handleUpEvent(Message msg, EncryptHeader hdr) {
  654
+        // check if we had some sort of encrypt control header if using supplied key we should not process it
  655
+        if(suppliedKey) {
  656
+            if(log.isWarnEnabled())
  657
+                log.warn("We received an encrypt header of " + hdr.getType() + " while in configured mode");
  658
+            return;
  659
+        }
613 660
 
614  
-                        try {
615  
-                            SecretKey tmp=decodeKey(msg.getBuffer());
616  
-                            if(tmp == null) {
617  
-                                // unable to understand response
618  
-                                // lets try again
619  
-                                sendKeyRequest();
620  
-                            }
621  
-                            else {
622  
-                                // otherwise lets set the reurned key
623  
-                                // as the shared key
624  
-                                setKeys(tmp, hdr.getVersion());
625  
-                                if(log.isDebugEnabled()) {
626  
-                                    log.debug("Decoded secretkey response");
627  
-                                }
628  
-                            }
629  
-                        }
630  
-                        catch(Exception e) {
631  
-                            log.warn("unable to process received public key");
632  
-                        }
633  
-                        break;
634  
-                    default:
635  
-                        log.warn("Received ignored encrypt header of " + hdr.getType());
636  
-                        break;
  661
+        // see what sort of encrypt control message we have received
  662
+        switch(hdr.getType()) {
  663
+            // if a key request
  664
+            case EncryptHeader.KEY_REQUEST:
  665
+                if(log.isDebugEnabled())
  666
+                    log.debug("received a key request from peer");
  667
+
  668
+                // if a key request send response key back
  669
+                try {
  670
+                    // extract peer's public key
  671
+                    PublicKey tmpKey=generatePubKey(msg.getBuffer());
  672
+                    // send back the secret key we have
  673
+                    sendSecretKey(getSecretKey(), tmpKey, msg.getSrc());
637 674
                 }
638  
-            }
  675
+                catch(Exception e) {
  676
+                    log.warn("unable to reconstitute peer's public key");
  677
+                }
  678
+                break;
  679
+            case EncryptHeader.SECRETKEY:
  680
+                if(log.isDebugEnabled())
  681
+                    log.debug("received a secretkey response from keyserver");
  682
+
  683
+                try {
  684
+                    SecretKey tmp=decodeKey(msg.getBuffer());
  685
+                    if(tmp == null)
  686
+                        sendKeyRequest(); // unable to understand response, let's try again
  687
+                    else {
  688
+                        // otherwise lets set the returned key as the shared key
  689
+                        setKeys(tmp, hdr.getVersion());
  690
+                        if(log.isDebugEnabled())
  691
+                            log.debug("Decoded secretkey response");
  692
+                    }
  693
+                }
  694
+                catch(Exception e) {
  695
+                    log.warn("unable to process received public key");
  696
+                }
  697
+                break;
  698
+            default:
  699
+                log.warn("Received ignored encrypt header of " + hdr.getType());
  700
+                break;
639 701
         }
640 702
     }
641 703
 
  704
+
642 705
     /**
643 706
      * used to drain the up queue - synchronized so we can call it safely
644 707
      * despite access from potentially two threads at once
@@ -799,8 +862,8 @@ private Message sendKeyRequest() {
799 862
         // server's public key
800 863
         Message newMsg=new Message(keyServerAddr, local_addr, Kpair.getPublic().getEncoded());
801 864
 
802  
-        newMsg.putHeader(this.id, new EncryptHeader(EncryptHeader.KEY_REQUEST, getSymVersion()));
803  
-        passItDown(new Event(Event.MSG, newMsg));
  865
+        newMsg.putHeader(this.id,new EncryptHeader(EncryptHeader.KEY_REQUEST,getSymVersion()));
  866
+        passItDown(new Event(Event.MSG,newMsg));
804 867
         return newMsg;
805 868
     }
806 869
 
@@ -938,7 +1001,7 @@ private void sendDown(Message msg) throws Exception {
938 1001
      * @throws Exception
939 1002
      */
940 1003
     private synchronized byte[] encryptMessage(Cipher cipher, byte[] plain, int offset, int length) throws Exception {
941  
-        return cipher.doFinal(plain, offset, length);
  1004
+        return cipher.doFinal(plain,offset,length);
942 1005
     }
943 1006
 
944 1007
     private SecretKeySpec decodeKey(byte[] encodedKey) throws Exception {
@@ -1141,11 +1204,9 @@ protected void setKeyServerAddr(Address keyServerAddr) {
1141 1204
 
1142 1205
     public static class EncryptHeader extends org.jgroups.Header {
1143 1206
         short type;
1144  
-        public static final short ENCRYPT=0;
1145  
-        public static final short KEY_REQUEST=1;
1146  
-        public static final short SERVER_PUBKEY=2;
1147  
-        public static final short SECRETKEY=3;
1148  
-        public static final short SECRETKEY_READY=4;
  1207
+        public static final short ENCRYPT     = 0;
  1208
+        public static final short KEY_REQUEST = 1;
  1209
+        public static final short SECRETKEY   = 2;
1149 1210
 
1150 1211
         String version;
1151 1212
         boolean encrypt_entire_msg=false;
@@ -1153,7 +1214,6 @@ protected void setKeyServerAddr(Address keyServerAddr) {
1153 1214
         public EncryptHeader() {}
1154 1215
 
1155 1216
         public EncryptHeader(short type) {
1156  
-            //this(type, 0l);
1157 1217
             this.type=type;
1158 1218
             this.version="";
1159 1219
         }
@@ -1177,10 +1237,7 @@ public void readFrom(DataInput in) throws Exception {
1177 1237
         }
1178 1238
 
1179 1239
         public String toString() {
1180  
-            return "ENCRYPT [type=" + type
1181  
-                   + " version=\""
1182  
-                   + (version != null? version.length() + " bytes" : "n/a")
1183  
-                   + "\"]";
  1240
+            return "ENCRYPT [type=" + type + " version=\"" + (version != null? version.length() + " bytes" : "n/a") + "\"]";
1184 1241
         }
1185 1242
 
1186 1243
         public int size() {
84  src/org/jgroups/protocols/FC.java
@@ -7,6 +7,7 @@
7 7
 import org.jgroups.annotations.*;
8 8
 import org.jgroups.stack.Protocol;
9 9
 import org.jgroups.util.BoundedList;
  10
+import org.jgroups.util.MessageBatch;
10 11
 import org.jgroups.util.Util;
11 12
 
12 13
 import java.util.*;
@@ -35,8 +36,10 @@
35 36
  * <li>Receivers don't send the full credits (max_credits), but rather tha actual number of bytes received
36 37
  * <ol/>
37 38
  * @author Bela Ban
  39
+ * @deprecated Succeeded by MFC and UFC
38 40
  */
39 41
 @MBean(description="Simple flow control protocol based on a credit system")
  42
+@Deprecated
40 43
 public class FC extends Protocol {
41 44
 
42 45
     private final static FcHeader REPLENISH_HDR=new FcHeader(FcHeader.REPLENISH);
@@ -449,22 +452,7 @@ public Object up(Event evt) {
449 452
                     break;
450 453
                 FcHeader hdr=(FcHeader)msg.getHeader(this.id);
451 454
                 if(hdr != null) {
452  
-                    switch(hdr.type) {
453  
-                        case FcHeader.REPLENISH:
454  
-                            num_credit_responses_received++;
455  
-                            handleCredit(msg.getSrc(), (Number)msg.getObject());
456  
-                            break;
457  
-                        case FcHeader.CREDIT_REQUEST:
458  
-                            num_credit_requests_received++;
459  
-                            Address sender=msg.getSrc();
460  
-                            Long sent_credits=(Long)msg.getObject();
461  
-                            if(sent_credits != null)
462  
-                                handleCreditRequest(received, sender, sent_credits.longValue());
463  
-                            break;
464  
-                        default:
465  
-                            log.error("header type " + hdr.type + " not known");
466  
-                            break;
467  
-                    }
  455
+                    handleUpEvent(hdr, msg);
468 456
                     return null; // don't pass message up
469 457
                 }
470 458
 
@@ -500,6 +488,63 @@ public Object up(Event evt) {
500 488
     }
501 489
 
502 490
 
  491
+    public void up(MessageBatch batch) {
  492
+        int length=0;
  493
+        for(Message msg: batch) {
  494
+            if(msg.isFlagSet(Message.NO_FC))
  495
+                continue;
  496
+            FcHeader hdr=(FcHeader)msg.getHeader(this.id);
  497
+            if(hdr != null) {
  498
+                batch.remove(msg); // don't pass message up as part of the batch
  499
+                handleUpEvent(hdr, msg);
  500
+                continue;
  501
+            }
  502
+            length+=msg.getLength();
  503
+        }
  504
+
  505
+        Address sender=batch.sender();
  506
+        long new_credits=0;
  507
+        if(length > 0)
  508
+            new_credits=adjustCredit(received, sender, length);
  509
+
  510
+
  511
+        if(!batch.isEmpty()) {
  512
+            // JGRP-928: changed ignore_thread to a ThreadLocal: multiple threads can access it with the
  513
+            // introduction of the concurrent stack
  514
+            if(ignore_synchronous_response)
  515
+                ignore_thread.set(true);
  516
+            try {
  517
+                up_prot.up(batch);
  518
+            }
  519
+            finally {
  520
+                if(ignore_synchronous_response)
  521
+                    ignore_thread.remove(); // need to revert because the thread is placed back into the pool
  522
+                if(new_credits > 0)
  523
+                    sendCredit(sender, new_credits);
  524
+            }
  525
+        }
  526
+    }
  527
+
  528
+    protected void handleUpEvent(FcHeader hdr, Message msg) {
  529
+        switch(hdr.type) {
  530
+            case FcHeader.REPLENISH:
  531
+                num_credit_responses_received++;
  532
+                handleCredit(msg.getSrc(), (Number)msg.getObject());
  533
+                break;
  534
+            case FcHeader.CREDIT_REQUEST:
  535
+                num_credit_requests_received++;
  536
+                Address sender=msg.getSrc();
  537
+                Long sent_credits=(Long)msg.getObject();
  538
+                if(sent_credits != null)
  539
+                    handleCreditRequest(received, sender,sent_credits);
  540
+                break;
  541
+            default:
  542
+                log.error("header type " + hdr.type + " not known");
  543
+                break;
  544
+        }
  545
+    }
  546
+
  547
+
503 548
     private void handleConfigEvent(Map<String,Object> info) {
504 549
         if(info != null) {
505 550
             Integer frag_size=(Integer)info.get("frag_size");
@@ -736,9 +781,7 @@ private void sendCredit(Address dest, long credit) {
736 781
             number=(int)credit;
737 782
         else
738 783
             number=credit;
739  
-        Message msg=new Message(dest, null, number);
740  
-        msg.setFlag(Message.OOB);
741  
-        msg.putHeader(this.id, REPLENISH_HDR);
  784
+        Message msg=new Message(dest, number).setFlag(Message.OOB, Message.Flag.DONT_BUNDLE).putHeader(this.id,REPLENISH_HDR);
742 785
         down_prot.down(new Event(Event.MSG, msg));
743 786
         num_credit_responses_sent++;
744 787
     }
@@ -752,8 +795,7 @@ private void sendCredit(Address dest, long credit) {
752 795
     private void sendCreditRequest(final Address dest, Long credits_left) {
753 796
         if(log.isTraceEnabled())
754 797
             log.trace("sending credit request to " + dest);
755  
-        Message msg=new Message(dest, null, credits_left);
756  
-        msg.putHeader(this.id, CREDIT_REQUEST_HDR);
  798
+        Message msg=new Message(dest, credits_left).setFlag(Message.Flag.DONT_BUNDLE).putHeader(this.id,CREDIT_REQUEST_HDR);
757 799
         down_prot.down(new Event(Event.MSG, msg));
758 800
         num_credit_requests_sent++;
759 801
     }
95  src/org/jgroups/protocols/FRAG.java
@@ -9,10 +9,7 @@
9 9
 import org.jgroups.annotations.ManagedAttribute;
10 10
 import org.jgroups.annotations.Property;
11 11
 import org.jgroups.stack.Protocol;
12  
-import org.jgroups.util.ExposedByteArrayOutputStream;
13  
-import org.jgroups.util.ExposedDataOutputStream;
14  
-import org.jgroups.util.Util;
15  
-import org.jgroups.util.ExposedByteArrayInputStream;
  12
+import org.jgroups.util.*;
16 13
 
17 14
 import java.io.ByteArrayInputStream;
18 15
 import java.io.DataInputStream;
@@ -138,12 +135,13 @@ public Object down(Event evt) {
138 135
      */
139 136
     public Object up(Event evt) {
140 137
         switch(evt.getType()) {
141  
-
142 138
             case Event.MSG:
143 139
                 Message msg=(Message)evt.getArg();
144 140
                 FragHeader hdr=(FragHeader)msg.getHeader(this.id);
145 141
                 if(hdr != null) { // needs to be defragmented
146  
-                    unfragment(msg, hdr); // Unfragment and possibly pass up
  142
+                    Message assembled_msg=unfragment(msg, hdr);
  143
+                    if(assembled_msg != null)
  144
+                        up_prot.up(new Event(Event.MSG, assembled_msg));
147 145
                     return null;
148 146
                 }
149 147
                 else {
@@ -165,6 +163,20 @@ public Object up(Event evt) {
165 163
         return up_prot.up(evt); // Pass up to the layer above us by default
166 164
     }
167 165
 
  166
+    public void up(MessageBatch batch) {
  167
+        for(Message msg: batch) {
  168
+            FragHeader hdr=(FragHeader)msg.getHeader(this.id);
  169
+            if(hdr != null) { // needs to be defragmented
  170
+                batch.remove(msg);
  171
+                Message assembled_msg=unfragment(msg,hdr);
  172
+                if(assembled_msg != null)
  173
+                    batch.add(assembled_msg); // the newly added message will not get iterated over by the current iterator !
  174
+            }
  175
+        }
  176
+        if(!batch.isEmpty())
  177
+            up_prot.up(batch);
  178
+    }
  179
+
168 180
 
169 181
     private void handleViewChange(View view) {
170 182
         List<Address> new_mbrs=view.getMembers();
@@ -237,7 +249,7 @@ private void fragment(Message msg, long size) {
237 249
      * 4. Set headers and buffer in msg
238 250
      * 5. Pass msg up the stack
239 251
      */
240  
-    private void unfragment(Message msg, FragHeader hdr) {
  252
+    private Message unfragment(Message msg, FragHeader hdr) {
241 253
         Address            sender=msg.getSrc();
242 254
         FragmentationTable frag_table=fragment_list.get(sender);
243 255
         if(frag_table == null) {
@@ -251,24 +263,26 @@ private void unfragment(Message msg, FragHeader hdr) {
251 263
         }
252 264
         num_received_frags++;
253 265
         byte[] buf=frag_table.add(hdr.id, hdr.frag_id, hdr.num_frags, msg.getBuffer());
254  
-        if(buf != null) {
255  
-            DataInputStream in=null;
256  
-            try {
257  
-                ByteArrayInputStream bis=new ExposedByteArrayInputStream(buf);
258  
-                in=new DataInputStream(bis);
259  
-                Message assembled_msg=new Message(false);
260  
-                assembled_msg.readFrom(in);
261  
-                assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !!
262  
-                if(log.isTraceEnabled()) log.trace("assembled_msg is " + assembled_msg);
263  
-                num_received_msgs++;
264  
-                up_prot.up(new Event(Event.MSG, assembled_msg));
265  
-            }
266  
-            catch(Exception e) {
267  
-                log.error("failed unfragmenting a message", e);
268  
-            }
269  
-            finally {
270  
-                Util.close(in);
271  
-            }
  266
+        if(buf == null)
  267
+            return null;
  268
+
  269
+        DataInputStream in=null;
  270
+        try {
  271
+            ByteArrayInputStream bis=new ExposedByteArrayInputStream(buf);
  272
+            in=new DataInputStream(bis);
  273
+            Message assembled_msg=new Message(false);
  274
+            assembled_msg.readFrom(in);
  275
+            assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !!
  276
+            if(log.isTraceEnabled()) log.trace("assembled_msg is " + assembled_msg);
  277
+            num_received_msgs++;
  278
+            return assembled_msg;
  279
+        }
  280
+        catch(Exception e) {
  281
+            log.error("failed unfragmenting a message", e);
  282
+            return null;
  283
+        }
  284
+        finally {
  285
+            Util.close(in);
272 286
         }
273 287
     }
274 288
 
@@ -276,7 +290,7 @@ private void unfragment(Message msg, FragHeader hdr) {
276 290
     void handleConfigEvent(Map<String,Object> map) {
277 291
         if(map == null) return;
278 292
         if(map.containsKey("frag_size")) {
279  
-            frag_size=((Integer)map.get("frag_size")).intValue();
  293
+            frag_size=(Integer)map.get("frag_size");
280 294
             if(log.isDebugEnabled()) log.debug("setting frag_size=" + frag_size);
281 295
         }
282 296
     }
@@ -400,7 +414,7 @@ public String toString() {
400 414
     static class FragmentationTable {
401 415
         private final Address sender;
402 416
         /* the hashtable that holds the fragmentation entries for this sender*/
403  
-        private final Hashtable<Long,FragEntry> h=new Hashtable<Long,FragEntry>(11);  // keys: frag_ids, vals: Entrys
  417
+        private final Map<Long,FragEntry> table=new HashMap<Long,FragEntry>(11);  // keys: frag_ids, vals: Entrys
404 418
 
405 419
 
406 420
         FragmentationTable(Address sender) {
@@ -493,10 +507,8 @@ public int hashCode() {
493 507
 
494 508
 
495 509
         /**
496  
-         * Creates a new entry if not yet present. Adds the fragment.
497  
-         * If all fragements for a given message have been received,
498  
-         * an entire message is reassembled and returned.
499  
-         * Otherwise null is returned.
  510
+         * Creates a new entry if not yet present. Adds the fragment. If all fragements for a given message have been
  511
+         * received, an entire message is reassembled and returned. Otherwise null is returned.
500 512
          *
501 513
          * @param id        - the message ID, unique for a sender
502 514
          * @param frag_id   the index of this fragmentation (0..tot_frags-1)
@@ -504,39 +516,30 @@ public int hashCode() {
504 516
          * @param fragment  - the byte buffer for this fragment
505 517
          */
506 518
         public synchronized byte[] add(long id, int frag_id, int tot_frags, byte[] fragment) {
  519
+            byte[] retval=null; // initialize the return value to default not complete
507 520
 
508  
-            /*initialize the return value to default not complete */
509  
-            byte[] retval=null;
510  
-
511  
-            FragEntry e=h.get(new Long(id));
512  
-
  521
+            FragEntry e=table.get(id);
513 522
             if(e == null) {   // Create new entry if not yet present
514 523
                 e=new FragEntry(id, tot_frags);
515  
-                h.put(new Long(id), e);
  524
+                table.put(id,e);
516 525
             }
517 526
 
518 527
             e.set(frag_id, fragment);
519 528
             if(e.isComplete()) {
520 529
                 retval=e.assembleBuffer();
521  
-                h.remove(new Long(id));
  530
+                table.remove(id);
522 531
             }
523  
-
524 532
             return retval;
525 533
         }
526 534
 
527  
-        public void reset() {
528  
-        }
529 535
 
530 536
         public String toString() {
531 537
             StringBuilder buf=new StringBuilder("Fragmentation Table Sender:").append(sender).append("\n\t");
532  
-            Enumeration<FragEntry> e=this.h.elements();
533  
-            while(e.hasMoreElements()) {
534  
-                FragEntry entry=e.nextElement();
  538
+            for(FragEntry entry: table.values()) {
535 539
                 int count=0;
536 540
                 for(int i=0; i < entry.fragments.length; i++) {
537  
-                    if(entry.fragments[i] != null) {
  541
+                    if(entry.fragments[i] != null)
538 542
                         count++;
539  
-                    }
540 543
                 }
541 544
                 buf.append("Message ID:").append(entry.msg_id).append("\n\t");
542 545
                 buf.append("Total Frags:").append(entry.tot_frags).append("\n\t");
120  src/org/jgroups/protocols/FRAG2.java
@@ -6,6 +6,7 @@
6 6
 import org.jgroups.View;
7 7
 import org.jgroups.annotations.*;
8 8
 import org.jgroups.stack.Protocol;
  9
+import org.jgroups.util.MessageBatch;
9 10
 import org.jgroups.util.Range;
10 11
 import org.jgroups.util.Util;
11 12
 
@@ -44,7 +45,7 @@
44 45
     /* -----------------------------------------    Properties     -------------------------------------------------- */
45 46
     
46 47
     @Property(description="The max number of bytes in a message. Larger messages will be fragmented")
47  
-    int frag_size=60000;
  48
+    protected int                 frag_size=60000;
48 49
   
49 50
     /* --------------------------------------------- Fields ------------------------------------------------------ */
50 51
     
@@ -52,28 +53,24 @@
52 53
     /*the fragmentation list contains a fragmentation table per sender
53 54
      *this way it becomes easier to clean up if a sender (member) leaves or crashes
54 55
      */
55  
-    private final ConcurrentMap<Address,ConcurrentMap<Long,FragEntry>> fragment_list=Util.createConcurrentMap(11);
  56
+    protected final ConcurrentMap<Address,ConcurrentMap<Long,FragEntry>> fragment_list=Util.createConcurrentMap(11);
56 57
 
57 58
     /** Used to assign fragmentation-specific sequence IDs (monotonically increasing) */
58  
-    private int curr_id=1;
  59
+    protected int                 curr_id=1;
59 60
 
60  
-    private final List<Address> members=new ArrayList<Address>(11);
  61
+    protected final List<Address> members=new ArrayList<Address>(11);
  62
+
  63
+    protected Address             local_addr;
61 64
 
62  
-    @ManagedAttribute(description="Number of sent messages")
63  
-    AtomicLong num_sent_msgs=new AtomicLong(0);
64  
-    @ManagedAttribute(description="Number of received messages")
65  
-    AtomicLong num_received_msgs=new AtomicLong(0);
66 65
     @ManagedAttribute(description="Number of sent fragments")
67  
-    AtomicLong num_sent_frags=new AtomicLong(0);
  66
+    AtomicLong                    num_frags_sent=new AtomicLong(0);
68 67
     @ManagedAttribute(description="Number of received fragments")
69  
-    AtomicLong num_received_frags=new AtomicLong(0);
  68
+    AtomicLong                    num_frags_received=new AtomicLong(0);
70 69
 
71 70
     public int   getFragSize()                  {return frag_size;}
72 71
     public void  setFragSize(int s)             {frag_size=s;}
73  
-    public long  getNumberOfSentMessages()      {return num_sent_msgs.get();}
74  
-    public long  getNumberOfSentFragments()     {return num_sent_frags.get();}
75  
-    public long  getNumberOfReceivedMessages()  {return num_received_msgs.get();}
76  
-    public long  getNumberOfReceivedFragments() {return num_received_frags.get();}
  72
+    public long  getNumberOfSentFragments()     {return num_frags_sent.get();}
  73
+    public long  getNumberOfReceivedFragments() {return num_frags_received.get();}
77 74
     public int   fragSize()                     {return frag_size;}
78 75
     public FRAG2 fragSize(int size)             {frag_size=size; return this;}
79 76
 
@@ -90,11 +87,11 @@ public void init() throws Exception {
90 87
             throw new Exception("frag_size=" + old_frag_size + ", new frag_size=" + frag_size + ": new frag_size is invalid");
91 88
 
92 89
         TP transport=getTransport();
93  
-        if(transport != null && transport.isEnableBundling()) {
  90
+        if(transport != null) {
94 91
             int max_bundle_size=transport.getMaxBundleSize();
95 92
             if(frag_size >= max_bundle_size)
96 93
                 throw new IllegalArgumentException("frag_size (" + frag_size + ") has to be < TP.max_bundle_size (" +
97  
-                        max_bundle_size + ")");
  94
+                                                     max_bundle_size + ")");
98 95
         }
99 96
 
100 97
         Map<String,Object> info=new HashMap<String,Object>(1);
@@ -106,10 +103,8 @@ public void init() throws Exception {
106 103
 
107 104
     public void resetStats() {
108 105
         super.resetStats();
109  
-        num_sent_msgs.set(0);
110  
-        num_sent_frags.set(0);
111  
-        num_received_frags.set(0);
112  
-        num_received_msgs.set(0);
  106
+        num_frags_sent.set(0);
  107
+        num_frags_received.set(0);
113 108
     }
114 109
 
115 110
 
@@ -124,12 +119,7 @@ public Object down(Event evt) {
124 119
             case Event.MSG:
125 120
                 Message msg=(Message)evt.getArg();
126 121
                 long size=msg.getLength();
127  
-                num_sent_msgs.incrementAndGet();
128 122
                 if(size > frag_size) {
129  
-                    if(log.isTraceEnabled()) {
130  
-                        log.trace(new StringBuilder("message's buffer size is ").append(size)
131  
-                                .append(", will fragment ").append("(frag_size=").append(frag_size).append(')'));
132  
-                    }
133 123
                     fragment(msg);  // Fragment and pass down
134 124
                     return null;
135 125
                 }
@@ -139,6 +129,10 @@ public Object down(Event evt) {
139 129
                 handleViewChange((View)evt.getArg());
140 130
                 break;
141 131
 
  132
+            case Event.SET_LOCAL_ADDRESS:
  133
+                local_addr=(Address)evt.getArg();
  134
+                break;
  135
+
142 136
             case Event.CONFIG:
143 137
                 Object ret=down_prot.down(evt);
144 138
                 if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg());
@@ -151,8 +145,7 @@ public Object down(Event evt) {
151 145
 
152 146
 
153 147
     /**
154  
-     * If event is a message, if it is fragmented, re-assemble fragments into big message and pass up
155  
-     * the stack.
  148
+     * If event is a message, if it is fragmented, re-assemble fragments into big message and pass up the stack.
156 149
      */
157 150
     public Object up(Event evt) {
158 151
         switch(evt.getType()) {
@@ -161,12 +154,14 @@ public Object up(Event evt) {
161 154
                 Message msg=(Message)evt.getArg();
162 155
                 FragHeader hdr=(FragHeader)msg.getHeader(this.id);
163 156
                 if(hdr != null) { // needs to be defragmented
164  
-                    unfragment(msg, hdr); // Unfragment and possibly pass up
  157
+                    Message assembled_msg=unfragment(msg, hdr);
  158
+                    if(assembled_msg != null) {
  159
+                        if(log.isTraceEnabled()) log.trace("assembled_msg is " + assembled_msg);
  160
+                        assembled_msg.setSrc(msg.getSrc()); // needed ? YES, because fragments have a null src !!
  161
+                        up_prot.up(new Event(Event.MSG, assembled_msg));
  162
+                    }
165 163
                     return null;
166 164
                 }
167  
-                else {
168  
-                    num_received_msgs.incrementAndGet();
169  
-                }
170 165
                 break;
171 166
 
172 167
             case Event.VIEW_CHANGE:
@@ -183,8 +178,21 @@ public Object up(Event evt) {
183 178
         return up_prot.up(evt); // Pass up to the layer above us by default
184 179
     }
185 180
 
  181
+    public void up(MessageBatch batch) {
  182
+        for(Message msg: batch) {
  183
+            FragHeader hdr=(FragHeader)msg.getHeader(this.id);
  184
+            if(hdr != null) { // needs to be defragmented
  185
+                batch.remove(msg);
  186
+                Message assembled_msg=unfragment(msg,hdr);
  187
+                if(assembled_msg != null)
  188
+                    batch.add(assembled_msg); // the newly added message will not get iterated over by the current iterator !
  189
+            }
  190
+        }
  191
+        if(!batch.isEmpty())
  192
+            up_prot.up(batch);
  193
+    }
186 194
 
187  
-    private void handleViewChange(View view) {
  195
+    protected void handleViewChange(View view) {
188 196
         List<Address> new_mbrs=view.getMembers();
189 197
         List<Address> left_mbrs=Util.determineLeftMembers(members, new_mbrs);
190 198
         members.clear();
@@ -223,23 +231,23 @@ public void clearAllFragments() {
223 231
      [2344,3,2]{dst,src,buf3}
224 232
      </pre>
225 233
      */
226  
-    private void fragment(Message msg) {
  234
+    protected void fragment(Message msg) {
227 235
         try {
228 236
             byte[] buffer=msg.getRawBuffer();
229  
-            List<Range> fragments=Util.computeFragOffsets(msg.getOffset(), msg.getLength(), frag_size);
  237
+            final List<Range> fragments=Util.computeFragOffsets(msg.getOffset(), msg.getLength(), frag_size);
230 238
             int num_frags=fragments.size();
231  
-            num_sent_frags.addAndGet(num_frags);
  239
+            num_frags_sent.addAndGet(num_frags);
232 240
 
233 241
             if(log.isTraceEnabled()) {
234 242
                 Address dest=msg.getDest();
235  
-                StringBuilder sb=new StringBuilder("fragmenting packet to ");
  243
+                StringBuilder sb=new StringBuilder(local_addr +  ": fragmenting message to ");
236 244
                 sb.append((dest != null ? dest.toString() : "<all members>")).append(" (size=").append(buffer.length);
237 245
                 sb.append(") into ").append(num_frags).append(" fragment(s) [frag_size=").append(frag_size).append(']');
238 246
                 log.trace(sb.toString());
239 247
             }
240 248
 
241 249
             long frag_id=getNextId(); // used as a seqno
242  
-            for(int i=0; i < fragments.size(); i++) {
  250
+            for(int i=0; i < num_frags; i++) {
243 251
                 Range r=fragments.get(i);
244 252
                 // don't copy the buffer, only src, dest and headers. Only copy the headers one time !
245 253
                 Message frag_msg=msg.copy(false, i == 0);
@@ -260,11 +268,11 @@ private void fragment(Message msg) {
260 268
      2. When all are received -> Assemble them into one big buffer
261 269
      3. Read headers and byte buffer from big buffer
262 270
      4. Set headers and buffer in msg
263  
-     5. Pass msg up the stack
  271
+     5. Return the message
264 272
      */
265  
-    private void unfragment(Message msg, FragHeader hdr) {
266  
-        Address            sender=msg.getSrc();
267  
-        Message            assembled_msg=null;
  273
+    protected Message unfragment(Message msg, FragHeader hdr) {
  274
+        Address   sender=msg.getSrc();
  275
+        Message   assembled_msg=null;
268 276
 
269 277
         ConcurrentMap<Long,FragEntry> frag_table=fragment_list.get(sender);
270 278
         if(frag_table == null) {
@@ -273,7 +281,7 @@ private void unfragment(Message msg, FragHeader hdr) {
273 281
             if(tmp != null) // value was already present
274 282
                 frag_table=tmp;
275 283
         }
276  
-        num_received_frags.incrementAndGet();
  284
+        num_frags_received.incrementAndGet();
277 285
 
278 286
         FragEntry entry=frag_table.get(hdr.id);
279 287
         if(entry == null) {
@@ -289,31 +297,23 @@ private void unfragment(Message msg, FragHeader hdr) {
289 297
             if(entry.isComplete()) {
290 298
                 assembled_msg=entry.assembleMessage();
291 299
                 frag_table.remove(hdr.id);
  300
+                if(log.isTraceEnabled())
  301
+                    log.trace(local_addr + ": unfragmented message to " + sender + " (size=" + assembled_msg.getLength() +
  302
+                                ") from " + entry.number_of_frags_recvd + " fragments");
292 303
             }
293 304
         }
294 305
         finally {
295 306
             entry.unlock();
296 307
         }
297 308
 
298  
-        // assembled_msg=frag_table.add(hdr.id, hdr.frag_id, hdr.num_frags, msg);
299  
-        if(assembled_msg != null) {
300  
-            try {
301  
-                if(log.isTraceEnabled()) log.trace("assembled_msg is " + assembled_msg);
302  
-                assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !!
303  
-                num_received_msgs.incrementAndGet();
304  
-                up_prot.up(new Event(Event.MSG, assembled_msg));
305  
-            }
306  
-            catch(Exception e) {
307  
-                if(log.isErrorEnabled()) log.error("unfragmentation failed", e);
308  
-            }
309  
-        }
  309
+        return assembled_msg;
310 310
     }
311 311
 
312 312
 
313 313
     void handleConfigEvent(Map<String,Object> map) {
314 314
         if(map == null) return;
315 315
         if(map.containsKey("frag_size")) {
316  
-            frag_size=((Integer)map.get("frag_size")).intValue();
  316
+            frag_size=(Integer)map.get("frag_size");
317 317
             if(log.isDebugEnabled()) log.debug("setting frag_size=" + frag_size);
318 318
         }
319 319
     }
@@ -327,23 +327,21 @@ void handleConfigEvent(Map<String,Object> map) {
327 327
      * once all the byte buffer entries have been filled the fragmentation is considered complete.<br/>
328 328
      * All methods are unsynchronized, use getLock() to obtain a lock for concurrent access.
329 329
      */
330  
-    private static class FragEntry {
  330
+    protected static class FragEntry {
331 331
         // each fragment is a byte buffer
332 332
         final Message fragments[];
333 333
         //the number of fragments we have received
334 334
         int number_of_frags_recvd=0;
335 335
 
336  
-        private final Lock lock=new ReentrantLock();
  336
+        protected final Lock lock=new ReentrantLock();
337 337
 
338 338
 
339 339
         /**
340 340
          * Creates a new entry
341 341
          * @param tot_frags the number of fragments to expect for this message
342 342
          */
343  
-        private FragEntry(int tot_frags) {
  343
+        protected FragEntry(int tot_frags) {
344 344
             fragments=new Message[tot_frags];
345