Skip to content

Commit

Permalink
-Added synchronization around removal and re-creation of a window in…
Browse files Browse the repository at this point in the history
… UNICAST and UNICAST2

 - Added testMultipleConcurrentResets() to UNICAST(2)_ConnectionTests
 [https://issues.jboss.org/browse/JGRP-1347]
  • Loading branch information
belaban committed Jul 25, 2011
1 parent 8b98023 commit 0be52c0
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 62 deletions.
59 changes: 35 additions & 24 deletions src/org/jgroups/protocols/UNICAST.java
Expand Up @@ -67,6 +67,8 @@ public class UNICAST extends Protocol implements AckSenderWindow.RetransmitComma
private final ConcurrentMap<Address, SenderEntry> send_table=Util.createConcurrentMap();
private final ConcurrentMap<Address, ReceiverEntry> recv_table=Util.createConcurrentMap();

protected final ReentrantLock recv_table_lock=new ReentrantLock();

private final List<Address> members=new ArrayList<Address>(11);

private Address local_addr=null;
Expand Down Expand Up @@ -496,7 +498,7 @@ public void expired(Address key) {
* e.received_msgs is null and <code>first</code> is true: create a new AckReceiverWindow(seqno) and
* add message. Set e.received_msgs to the new window. Else just add the message.
*/
private void handleDataReceived(Address sender, long seqno, long conn_id, boolean first, Message msg, Event evt) {
protected void handleDataReceived(Address sender, long seqno, long conn_id, boolean first, Message msg, Event evt) {
if(log.isTraceEnabled()) {
StringBuilder sb=new StringBuilder();
sb.append(local_addr).append(" <-- DATA(").append(sender).append(": #").append(seqno);
Expand All @@ -506,36 +508,45 @@ private void handleDataReceived(Address sender, long seqno, long conn_id, boole
log.trace(sb);
}

ReceiverEntry entry=recv_table.get(sender);
AckReceiverWindow win=entry != null? entry.received_msgs : null;
AckReceiverWindow win;

if(first) {
if(entry == null) {
entry=getOrCreateReceiverEntry(sender, seqno, conn_id);
win=entry.received_msgs;
}
else { // entry != null && win != null
if(conn_id != entry.recv_conn_id) {
if(log.isTraceEnabled())
log.trace(local_addr + ": conn_id=" + conn_id + " != " + entry.recv_conn_id + "; resetting receiver window");

ReceiverEntry entry2=recv_table.remove(sender);
if(entry2 != null)
entry2.received_msgs.reset();

recv_table_lock.lock();
try {
ReceiverEntry entry=recv_table.get(sender);
win=entry != null? entry.received_msgs : null;
if(first) {
if(entry == null) {
entry=getOrCreateReceiverEntry(sender, seqno, conn_id);
win=entry.received_msgs;
}
else {
;
else { // entry != null && win != null
if(conn_id != entry.recv_conn_id) {
if(log.isTraceEnabled())
log.trace(local_addr + ": conn_id=" + conn_id + " != " + entry.recv_conn_id + "; resetting receiver window");

ReceiverEntry entry2=recv_table.remove(sender);
if(entry2 != null)
entry2.received_msgs.reset();

entry=getOrCreateReceiverEntry(sender, seqno, conn_id);
win=entry.received_msgs;
}
else {
;
}
}
}
else { // entry == null && win == null OR entry != null && win == null OR entry != null && win != null
if(win == null || entry.recv_conn_id != conn_id) {
recv_table_lock.unlock();
sendRequestForFirstSeqno(sender); // drops the message and returns (see below)
return;
}
}
}
else { // entry == null && win == null OR entry != null && win == null OR entry != null && win != null
if(win == null || entry.recv_conn_id != conn_id) {
sendRequestForFirstSeqno(sender); // drops the message and returns (see below)
return;
}
finally {
if(recv_table_lock.isHeldByCurrentThread())
recv_table_lock.unlock();
}

byte result=win.add2(seqno, msg); // win is guaranteed to be non-null if we get here
Expand Down
60 changes: 36 additions & 24 deletions src/org/jgroups/protocols/UNICAST2.java
Expand Up @@ -90,6 +90,8 @@ public class UNICAST2 extends Protocol implements Retransmitter.RetransmitComman
private final ConcurrentMap<Address, SenderEntry> send_table=Util.createConcurrentMap();
private final ConcurrentMap<Address, ReceiverEntry> recv_table=Util.createConcurrentMap();

protected final ReentrantLock recv_table_lock=new ReentrantLock();

private final List<Address> members=new ArrayList<Address>(11);

private Address local_addr=null;
Expand Down Expand Up @@ -602,7 +604,7 @@ public void expired(Address key) {
* e.received_msgs is null and <code>first</code> is true: create a new AckReceiverWindow(seqno) and
* add message. Set e.received_msgs to the new window. Else just add the message.
*/
private void handleDataReceived(Address sender, long seqno, long conn_id, boolean first, Message msg, Event evt) {
protected void handleDataReceived(Address sender, long seqno, long conn_id, boolean first, Message msg, Event evt) {
if(log.isTraceEnabled()) {
StringBuilder sb=new StringBuilder();
sb.append(local_addr).append(" <-- DATA(").append(sender).append(": #").append(seqno);
Expand All @@ -612,37 +614,47 @@ private void handleDataReceived(Address sender, long seqno, long conn_id, boolea
log.trace(sb);
}

ReceiverEntry entry=recv_table.get(sender);
NakReceiverWindow win=entry != null? entry.received_msgs : null;

if(first) {
if(entry == null) {
entry=getOrCreateReceiverEntry(sender, seqno, conn_id);
win=entry.received_msgs;
}
else { // entry != null && win != null
if(conn_id != entry.recv_conn_id) {
if(log.isTraceEnabled())
log.trace(local_addr + ": conn_id=" + conn_id + " != " + entry.recv_conn_id + "; resetting receiver window");

ReceiverEntry entry2=recv_table.remove(sender);
if(entry2 != null)
entry2.received_msgs.destroy();
ReceiverEntry entry;
NakReceiverWindow win;

recv_table_lock.lock();
try {
entry=recv_table.get(sender);
win=entry != null? entry.received_msgs : null;
if(first) {
if(entry == null) {
entry=getOrCreateReceiverEntry(sender, seqno, conn_id);
win=entry.received_msgs;
}
else {
;
else { // entry != null && win != null
if(conn_id != entry.recv_conn_id) {
if(log.isTraceEnabled())
log.trace(local_addr + ": conn_id=" + conn_id + " != " + entry.recv_conn_id + "; resetting receiver window");

ReceiverEntry entry2=recv_table.remove(sender);
if(entry2 != null)
entry2.received_msgs.destroy();

entry=getOrCreateReceiverEntry(sender, seqno, conn_id);
win=entry.received_msgs;
}
else {
;
}
}
}
}
else { // entry == null && win == null OR entry != null && win == null OR entry != null && win != null
if(win == null || entry.recv_conn_id != conn_id) {
sendRequestForFirstSeqno(sender, seqno); // drops the message and returns (see below)
return;
else { // entry == null && win == null OR entry != null && win == null OR entry != null && win != null
if(win == null || entry.recv_conn_id != conn_id) {
recv_table_lock.unlock();
sendRequestForFirstSeqno(sender, seqno); // drops the message and returns (see below)
return;
}
}
}
finally {
if(recv_table_lock.isHeldByCurrentThread())
recv_table_lock.unlock();
}

boolean added=win.add(seqno, msg); // win is guaranteed to be non-null if we get here
num_msgs_received++;
Expand Down
Expand Up @@ -11,6 +11,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;

/**
* Tests unilateral closings of UNICAST2 connections. The test scenarios are described in doc/design.UNICAST.new.txt.
Expand Down Expand Up @@ -123,6 +124,58 @@ public void testAClosingUnilaterallyButLosingFirstMessage() throws Exception {
}


/** Tests concurrent reception of multiple messages with a different conn_id (https://issues.jboss.org/browse/JGRP-1347) */
public void testMultipleConcurrentResets() throws Exception {
sendAndCheck(a, b_addr, 1, r2);

// now close connection on A unilaterally
System.out.println("==== Closing the connection on A");
u1.removeConnection(b_addr);

r2.clear();

final UNICAST2 unicast=(UNICAST2)b.getProtocolStack().findProtocol(UNICAST2.class);

int NUM=10;

final List<Message> msgs=new ArrayList<Message>(NUM);

for(int i=1; i <= NUM; i++) {
Message msg=new Message(b_addr, a_addr, "m" + i);
UNICAST2.Unicast2Header hdr=UNICAST2.Unicast2Header.createDataHeader(1, (short)2, true);
msg.putHeader(unicast.getId(), hdr);
msgs.add(msg);
}


Thread[] threads=new Thread[NUM];
final CyclicBarrier barrier=new CyclicBarrier(NUM+1);
for(int i=0; i < NUM; i++) {
final int index=i;
threads[i]=new Thread() {
public void run() {
try {
barrier.await();
unicast.up(new Event(Event.MSG, msgs.get(index)));
}
catch(Exception e) {
e.printStackTrace();
}
}
};
threads[i].start();
}

barrier.await();
for(Thread thread: threads)
thread.join();

List<Message> list=r2.getMessages();
System.out.println("list = " + print(list));

assert list.size() == 1 : "list must have 1 element but has " + list.size() + ": " + print(list);
}


/**
* Send num unicasts on both channels and verify the other end received them
Expand Down

0 comments on commit 0be52c0

Please sign in to comment.