Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

fixed discarding of OOB messages when rejection policy was "discard":

- atomically set transient flag OOB_DELIVERED when sending the message up
- do this in remove OOB message and remove reg message, too
(https://jira.jboss.org/jira/browse/JGRP-1079)
  • Loading branch information...
commit cd8f57a8ed23be81239ce0ee29d831251d00b274 1 parent ad31d26
@belaban authored
Showing with 26 additions and 10 deletions.
  1. +26 −10 src/org/jgroups/protocols/pbcast/NAKACK.java
View
36 src/org/jgroups/protocols/pbcast/NAKACK.java
@@ -32,7 +32,7 @@
* instead of the requester by setting use_mcast_xmit to true.
*
* @author Bela Ban
- * @version $Id: NAKACK.java,v 1.238 2009/11/13 15:33:02 belaban Exp $
+ * @version $Id: NAKACK.java,v 1.239 2009/11/17 12:09:08 belaban Exp $
*/
@MBean(description="Reliable transmission multipoint FIFO protocol")
@DeprecatedProperty(names={"max_xmit_size", "eager_lock_release", "stats_list_size"})
@@ -267,6 +267,16 @@ public int getReceivedTableSize() {
return getPendingRetransmissionRequests();
}
+ /**
+ * Please don't use this method; it is only provided for unit testing !
+ * @param mbr
+ * @return
+ */
+ public NakReceiverWindow getWindow(Address mbr) {
+ return xmit_table.get(mbr);
+ }
+
+
public void resetStats() {
xmit_reqs_received=xmit_reqs_sent=xmit_rsps_received=xmit_rsps_sent=missing_msgs_received=0;
sent.clear();
@@ -737,7 +747,7 @@ private void send(Event evt, Message msg) {
* Finds the corresponding NakReceiverWindow and adds the message to it (according to seqno). Then removes as many
* messages as possible from the NRW and passes them up the stack. Discards messages from non-members.
*/
- private void handleMessage(Message msg, NakAckHeader hdr) {
+ private void handleMessage(Message msg, NakAckHeader hdr) {
Address sender=msg.getSrc();
if(sender == null) {
if(log.isErrorEnabled())
@@ -753,7 +763,8 @@ private void handleMessage(Message msg, NakAckHeader hdr) {
if(leaving)
return;
if(log.isWarnEnabled() && log_discard_msgs)
- log.warn(local_addr + ": discarded message from non-member " + sender + ", my view is " + view);
+ log.warn(local_addr + ": dropped message from " + sender +
+ " (not in retransmission table), members are " + xmit_table.keySet() +", view=" + view);
return;
}
@@ -768,9 +779,8 @@ private void handleMessage(Message msg, NakAckHeader hdr) {
// http://jira.jboss.com/jira/browse/JGRP-379
if(added && msg.isFlagSet(Message.OOB)) {
msg=win.get(hdr.seqno);
- if(msg != null) {
- boolean pass_up=msg.setTransientFlagIfAbsent(Message.OOB_DELIVERED);
- if(pass_up)
+ if(msg != null && msg.isFlagSet(Message.OOB)) {
+ if(msg.setTransientFlagIfAbsent(Message.OOB_DELIVERED))
up_prot.up(new Event(Event.MSG, msg));
}
@@ -778,8 +788,7 @@ private void handleMessage(Message msg, NakAckHeader hdr) {
final Message oob_msg=win.removeOOBMessage();
if(oob_msg == null)
break;
- boolean pass_up=oob_msg.setTransientFlagIfAbsent(Message.OOB_DELIVERED);
- if(pass_up) {
+ if(oob_msg.setTransientFlagIfAbsent(Message.OOB_DELIVERED)) {
timer.execute(new Runnable() {
public void run() {
up_prot.up(new Event(Event.MSG, oob_msg));
@@ -822,10 +831,17 @@ public void run() {
return;
}
- for(Message msg_to_deliver: msgs) {
+ for(final Message msg_to_deliver: msgs) {
- // discard OOB msg as it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-379)
+ // discard OOB msg if it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-379)
if(msg_to_deliver.isFlagSet(Message.OOB)) {
+ if(msg_to_deliver.setTransientFlagIfAbsent(Message.OOB_DELIVERED)) {
+ timer.execute(new Runnable() {
+ public void run() {
+ up_prot.up(new Event(Event.MSG, msg_to_deliver));
+ }
+ });
+ }
continue;
}
num_regular_msgs_removed++;

0 comments on commit cd8f57a

Please sign in to comment.
Something went wrong with that request. Please try again.