Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: belaban/JGroups
base: c03a94f
...
head fork: belaban/JGroups
compare: bf85af7
Checking mergeability… Don't worry, you can still create the pull request.
  • 2 commits
  • 3 files changed
  • 0 commit comments
  • 1 contributor
View
2  src/org/jgroups/blocks/ReplicatedHashMap.java
@@ -51,7 +51,7 @@
void entryRemoved(K key);
- void viewChange(View view, Vector<Address> new_mbrs, Vector<Address> old_mbrs);
+ void viewChange(View view, Vector<Address> mbrs_joined, Vector<Address> mbrs_left);
void contentsSet(Map<K,V> new_entries);
View
59 src/org/jgroups/protocols/UNICAST.java
@@ -75,6 +75,8 @@
private final ConcurrentMap<Address, SenderEntry> send_table=Util.createConcurrentMap();
private final ConcurrentMap<Address, ReceiverEntry> recv_table=Util.createConcurrentMap();
+ protected final ReentrantLock recv_table_lock=new ReentrantLock();
+
private final Vector<Address> members=new Vector<Address>(11);
private Address local_addr=null;
@@ -509,7 +511,7 @@ public void expired(Address key) {
* e.received_msgs is null and <code>first</code> is true: create a new AckReceiverWindow(seqno) and
* add message. Set e.received_msgs to the new window. Else just add the message.
*/
- private void handleDataReceived(Address sender, long seqno, long conn_id, boolean first, Message msg, Event evt) {
+ protected void handleDataReceived(Address sender, long seqno, long conn_id, boolean first, Message msg, Event evt) {
if(log.isTraceEnabled()) {
StringBuilder sb=new StringBuilder();
sb.append(local_addr).append(" <-- DATA(").append(sender).append(": #").append(seqno);
@@ -519,36 +521,45 @@ private void handleDataReceived(Address sender, long seqno, long conn_id, boole
log.trace(sb);
}
- ReceiverEntry entry=recv_table.get(sender);
- AckReceiverWindow win=entry != null? entry.received_msgs : null;
+ AckReceiverWindow win;
- if(first) {
- if(entry == null) {
- entry=getOrCreateReceiverEntry(sender, seqno, conn_id);
- win=entry.received_msgs;
- }
- else { // entry != null && win != null
- if(conn_id != entry.recv_conn_id) {
- if(log.isTraceEnabled())
- log.trace(local_addr + ": conn_id=" + conn_id + " != " + entry.recv_conn_id + "; resetting receiver window");
-
- ReceiverEntry entry2=recv_table.remove(sender);
- if(entry2 != null)
- entry2.received_msgs.reset();
-
+ recv_table_lock.lock();
+ try {
+ ReceiverEntry entry=recv_table.get(sender);
+ win=entry != null? entry.received_msgs : null;
+ if(first) {
+ if(entry == null) {
entry=getOrCreateReceiverEntry(sender, seqno, conn_id);
win=entry.received_msgs;
}
- else {
- ;
+ else { // entry != null && win != null
+ if(conn_id != entry.recv_conn_id) {
+ if(log.isTraceEnabled())
+ log.trace(local_addr + ": conn_id=" + conn_id + " != " + entry.recv_conn_id + "; resetting receiver window");
+
+ ReceiverEntry entry2=recv_table.remove(sender);
+ if(entry2 != null)
+ entry2.received_msgs.reset();
+
+ entry=getOrCreateReceiverEntry(sender, seqno, conn_id);
+ win=entry.received_msgs;
+ }
+ else {
+ ;
+ }
+ }
+ }
+ else { // entry == null && win == null OR entry != null && win == null OR entry != null && win != null
+ if(win == null || entry.recv_conn_id != conn_id) {
+ recv_table_lock.unlock();
+ sendRequestForFirstSeqno(sender); // drops the message and returns (see below)
+ return;
}
}
}
- else { // entry == null && win == null OR entry != null && win == null OR entry != null && win != null
- if(win == null || entry.recv_conn_id != conn_id) {
- sendRequestForFirstSeqno(sender); // drops the message and returns (see below)
- return;
- }
+ finally {
+ if(recv_table_lock.isHeldByCurrentThread())
+ recv_table_lock.unlock();
}
byte result=win.add2(seqno, msg); // win is guaranteed to be non-null if we get here
View
60 src/org/jgroups/protocols/UNICAST2.java
@@ -91,6 +91,8 @@
private final ConcurrentMap<Address, SenderEntry> send_table=Util.createConcurrentMap();
private final ConcurrentMap<Address, ReceiverEntry> recv_table=Util.createConcurrentMap();
+ protected final ReentrantLock recv_table_lock=new ReentrantLock();
+
private final Vector<Address> members=new Vector<Address>(11);
private Address local_addr=null;
@@ -603,7 +605,7 @@ public void expired(Address key) {
* e.received_msgs is null and <code>first</code> is true: create a new AckReceiverWindow(seqno) and
* add message. Set e.received_msgs to the new window. Else just add the message.
*/
- private void handleDataReceived(Address sender, long seqno, long conn_id, boolean first, Message msg, Event evt) {
+ protected void handleDataReceived(Address sender, long seqno, long conn_id, boolean first, Message msg, Event evt) {
if(log.isTraceEnabled()) {
StringBuilder sb=new StringBuilder();
sb.append(local_addr).append(" <-- DATA(").append(sender).append(": #").append(seqno);
@@ -613,37 +615,47 @@ private void handleDataReceived(Address sender, long seqno, long conn_id, boolea
log.trace(sb);
}
- ReceiverEntry entry=recv_table.get(sender);
- NakReceiverWindow win=entry != null? entry.received_msgs : null;
-
- if(first) {
- if(entry == null) {
- entry=getOrCreateReceiverEntry(sender, seqno, conn_id);
- win=entry.received_msgs;
- }
- else { // entry != null && win != null
- if(conn_id != entry.recv_conn_id) {
- if(log.isTraceEnabled())
- log.trace(local_addr + ": conn_id=" + conn_id + " != " + entry.recv_conn_id + "; resetting receiver window");
-
- ReceiverEntry entry2=recv_table.remove(sender);
- if(entry2 != null)
- entry2.received_msgs.destroy();
+ ReceiverEntry entry;
+ NakReceiverWindow win;
+ recv_table_lock.lock();
+ try {
+ entry=recv_table.get(sender);
+ win=entry != null? entry.received_msgs : null;
+ if(first) {
+ if(entry == null) {
entry=getOrCreateReceiverEntry(sender, seqno, conn_id);
win=entry.received_msgs;
}
- else {
- ;
+ else { // entry != null && win != null
+ if(conn_id != entry.recv_conn_id) {
+ if(log.isTraceEnabled())
+ log.trace(local_addr + ": conn_id=" + conn_id + " != " + entry.recv_conn_id + "; resetting receiver window");
+
+ ReceiverEntry entry2=recv_table.remove(sender);
+ if(entry2 != null)
+ entry2.received_msgs.destroy();
+
+ entry=getOrCreateReceiverEntry(sender, seqno, conn_id);
+ win=entry.received_msgs;
+ }
+ else {
+ ;
+ }
}
}
- }
- else { // entry == null && win == null OR entry != null && win == null OR entry != null && win != null
- if(win == null || entry.recv_conn_id != conn_id) {
- sendRequestForFirstSeqno(sender, seqno); // drops the message and returns (see below)
- return;
+ else { // entry == null && win == null OR entry != null && win == null OR entry != null && win != null
+ if(win == null || entry.recv_conn_id != conn_id) {
+ recv_table_lock.unlock();
+ sendRequestForFirstSeqno(sender, seqno); // drops the message and returns (see below)
+ return;
+ }
}
}
+ finally {
+ if(recv_table_lock.isHeldByCurrentThread())
+ recv_table_lock.unlock();
+ }
boolean added=win.add(seqno, msg); // win is guaranteed to be non-null if we get here
num_msgs_received++;

No commit comments for this range

Something went wrong with that request. Please try again.