Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Replaced seqno with AtomicLong and dropped seqno_lock (https://issues…

  • Loading branch information...
commit 17ec4d2898e43b16df23c85b83ef3cd8cb966006 1 parent 10688c9
@belaban authored
Showing with 14 additions and 31 deletions.
  1. +14 −31 src/org/jgroups/protocols/pbcast/NAKACK.java
View
45 src/org/jgroups/protocols/pbcast/NAKACK.java
@@ -158,9 +158,7 @@
private Address local_addr=null;
private final List<Address> members=new CopyOnWriteArrayList<Address>();
private View view;
- @GuardedBy("seqno_lock")
- private long seqno=0; // current message sequence number (starts with 1)
- private final Lock seqno_lock=new ReentrantLock();
+ private final AtomicLong seqno=new AtomicLong(0); // current message sequence number (starts with 1)
/** Map to store sent and received messages (keyed by sender) */
private final ConcurrentMap<Address,NakReceiverWindow> xmit_table=Util.createConcurrentMap();
@@ -240,7 +238,7 @@ public long getSizeOfAllMessagesInclHeaders() {
@ManagedAttribute
- public long getCurrentSeqno() {return seqno;}
+ public long getCurrentSeqno() {return seqno.get();}
@ManagedOperation
public String printRetransmitStats() {
@@ -641,21 +639,20 @@ protected void send(Event evt, Message msg, boolean pass_down) {
if(msg.getSrc() == null)
msg.setSrc(local_addr); // this needs to be done so we can check whether the message sender is the local_addr
- seqno_lock.lock();
- try {
- try { // incrementing seqno and adding the msg to sent_msgs needs to be atomic
- msg_id=seqno +1;
+ msg_id=seqno.incrementAndGet();
+ long sleep=500;
+ while(running) {
+ try {
msg.putHeader(this.id, NakAckHeader.createMessageHeader(msg_id));
win.add(msg_id, msg);
- seqno=msg_id;
+ break;
}
catch(Throwable t) {
- throw new RuntimeException("failure adding msg " + msg + " to the retransmit table for " + local_addr, t);
+ if(running)
+ Util.sleep(sleep);
+ sleep=Math.min(5000, sleep*2);
}
}
- finally {
- seqno_lock.unlock();
- }
if(!pass_down)
return;
@@ -1125,14 +1122,8 @@ private void setDigest(Digest digest, boolean merge) {
win.destroy(); // stops retransmission
// to get here, merge must be false !
if(member.equals(local_addr)) { // Adjust the seqno: https://jira.jboss.org/browse/JGRP-1251
- seqno_lock.lock();
- try {
- seqno=highest_delivered_seqno;
- set_own_seqno=true;
- }
- finally {
- seqno_lock.unlock();
- }
+ seqno.set(highest_delivered_seqno);
+ set_own_seqno=true;
}
}
win=createNakReceiverWindow(member, highest_delivered_seqno);
@@ -1277,17 +1268,9 @@ protected void retransmit(long first_seqno, long last_seqno, final Address sende
private void reset() {
- seqno_lock.lock();
- try {
- seqno=0;
- }
- finally {
- seqno_lock.unlock();
- }
-
- for(NakReceiverWindow win: xmit_table.values()) {
+ seqno.set(0);
+ for(NakReceiverWindow win: xmit_table.values())
win.destroy();
- }
xmit_table.clear();
}
Please sign in to comment.
Something went wrong with that request. Please try again.