Skip to content

Commit

Permalink
- STABLE.down(): send STABILITY message when flag DONT_LOOPBACK is se…
Browse files Browse the repository at this point in the history
…t and max_bytes is exceeded

- Added STABLE_Test to test https://issues.redhat.com/browse/JGRP-2605
  • Loading branch information
belaban committed Feb 7, 2022
1 parent 71601d6 commit c79ee74
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 53 deletions.
105 changes: 52 additions & 53 deletions src/org/jgroups/protocols/pbcast/STABLE.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public List<Integer> requiredDownServices() {
protected void suspend(long timeout) {
if(!suspended) {
suspended=true;
log.debug("suspending message garbage collection");
log.debug("%s: suspending message garbage collection", local_addr);
}
startResumeTask(timeout); // will not start task if already running
}
Expand All @@ -216,7 +216,7 @@ protected void resume() {
lock.unlock();
}

log.debug("resuming message garbage collection");
log.debug("%s: resuming message garbage collection", local_addr);
stopResumeTask();
}

Expand Down Expand Up @@ -284,57 +284,23 @@ public void up(MessageBatch batch) {
}

// only if message counting is on, and only for multicast messages (http://jira.jboss.com/jira/browse/JGRP-233)
if(max_bytes > 0 && batch.dest() == null && !batch.isEmpty()) {
boolean send_stable_msg=false;
received.lock();
try {
num_bytes_received+=batch.length();
if(num_bytes_received >= max_bytes) {
log.trace("max_bytes has been reached (%s, bytes received=%s): triggers stable msg",
max_bytes, num_bytes_received);
num_bytes_received=0;
send_stable_msg=true;
}
}
finally {
received.unlock();
}

if(send_stable_msg)
sendStableMessage(true);
}
if(max_bytes > 0 && batch.dest() == null && !batch.isEmpty() && maxBytesExceeded(batch.length()))
sendStableMessage(true);

if(!batch.isEmpty())
up_prot.up(batch);
}


protected void handleRegularMessage(Message msg) {
// only if bytes counting is enabled, and only for multicast messages (http://jira.jboss.com/jira/browse/JGRP-233)
if(max_bytes <= 0)
return;
if(msg.getDest() == null) {
boolean send_stable_msg=false;
received.lock();
try {
num_bytes_received+=msg.getLength();
if(num_bytes_received >= max_bytes) {
log.trace("max_bytes has been reached (%s, bytes received=%s): triggers stable msg",
max_bytes, num_bytes_received);
num_bytes_received=0;
send_stable_msg=true;
}
}
finally {
received.unlock();
}

if(send_stable_msg)
sendStableMessage(true);
}
public Object down(Message msg) {
boolean send_stable_msg=max_bytes > 0 && msg.getDest() == null
&& msg.isTransientFlagSet(DONT_LOOPBACK) && maxBytesExceeded(msg.getLength());
Object retval=down_prot.down(msg);
if(send_stable_msg)
sendStableMessage(true);
return retval;
}


public Object down(Event evt) {
switch(evt.getType()) {
case Event.VIEW_CHANGE:
Expand Down Expand Up @@ -362,7 +328,42 @@ public Object down(Event evt) {
}


/* --------------------------------------- Private Methods ---------------------------------------- */
protected Object handle(StableHeader hdr, Address sender, Digest digest) {
switch(hdr.type) {
case StableHeader.STABLE_GOSSIP:
handleStableMessage(digest, sender, hdr.view_id);
break;
case StableHeader.STABILITY:
handleStabilityMessage(digest, sender, hdr.view_id);
break;
default:
log.error("%s: StableHeader type %s not known", local_addr, hdr.type);
}
return null;
}

protected void handleRegularMessage(Message msg) {
// only if bytes counting is enabled, and only for multicast messages (http://jira.jboss.com/jira/browse/JGRP-233)
if(max_bytes > 0 && msg.getDest() == null && maxBytesExceeded(msg.getLength()))
sendStableMessage(true);
}

protected boolean maxBytesExceeded(int len) {
received.lock();
try {
num_bytes_received+=len;
if(num_bytes_received >= max_bytes) {
log.trace("%s: max_bytes (%d) has been exceeded; bytes received=%d: triggers stable msg",
local_addr, max_bytes, num_bytes_received);
num_bytes_received=0;
return true;
}
return false;
}
finally {
received.unlock();
}
}


protected void handleViewChange(View v) {
Expand All @@ -381,7 +382,6 @@ protected void handleViewChange(View v) {




/** Update my own digest from a digest received by somebody else. Returns whether the update was successful.
* Needs to be called with a lock on digest */
@GuardedBy("lock")
Expand Down Expand Up @@ -493,7 +493,6 @@ protected void startResumeTask(long max_suspend_time) {
log.debug("%s: resume task started, max_suspend_time=%d", local_addr, max_suspend_time);
}
}

}


Expand Down Expand Up @@ -627,7 +626,7 @@ protected void sendStableMessage(boolean send_in_background) {
}
// don't send a STABLE message to self when coord, but instead update the digest directly
if(is_coord) {
log.trace("%s: updating the local figest with a stable message (coordinator): %s", local_addr, d);
log.trace("%s: updating the local digest with a stable message (coordinator): %s", local_addr, d);
num_stable_msgs_sent++;
handleStableMessage(d, local_addr, current_view.getViewId());
return;
Expand Down Expand Up @@ -683,7 +682,7 @@ protected Digest readDigest(byte[] buffer, int offset, int length) {
*/
protected void sendStabilityMessage(Digest d, final ViewId view_id) {
if(suspended) {
log.debug("STABILITY message will not be sent as suspended=%b", suspended);
log.debug("%s: STABILITY message will not be sent as suspended=%b", local_addr, suspended);
return;
}

Expand All @@ -699,7 +698,7 @@ protected void sendStabilityMessage(Digest d, final ViewId view_id) {
down_prot.down(msg);
}
catch(Exception e) {
log.warn("failed sending STABILITY message", e);
log.warn("%s: failed sending STABILITY message: %s", local_addr, e);
}
}

Expand Down Expand Up @@ -805,8 +804,8 @@ long computeSleepTime() {
protected class ResumeTask implements Runnable {
public void run() {
if(suspended)
log.warn("ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; " +
"check why this event was not received (or increase max_suspend_time for large state transfers)");
log.warn("%s: ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; " +
"check why this event was not received (or increase max_suspend_time for large state transfers)", local_addr);
resume();
}
public String toString() {return STABLE.class.getSimpleName() + ": ResumeTask";}
Expand Down
73 changes: 73 additions & 0 deletions tests/junit-functional/org/jgroups/protocols/STABLE_Test.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.jgroups.protocols;

import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.pbcast.STABLE;
import org.jgroups.util.MyReceiver;
import org.jgroups.util.Table;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.stream.Stream;

/**
* Tests sending of STABLE messages when DONT_LOOPBACK is set (https://issues.redhat.com/browse/JGRP-2605)
* @author Bela Ban
* @since 5.2.1, 4.2.20
*/
@Test(groups=Global.FUNCTIONAL)
public class STABLE_Test {
protected JChannel a,b,c;
protected MyReceiver<byte[]> r1, r2, r3;
protected final String GRP=STABLE_Test.class.getSimpleName();

@BeforeMethod protected void setup() throws Exception {
a=create("A").connect(GRP);
b=create("B").connect(GRP);
c=create("C").connect(GRP);
Util.waitUntilAllChannelsHaveSameView(10000, 500, a,b,c);
r1=new MyReceiver<byte[]>().rawMsgs(true);
r2=new MyReceiver<byte[]>().rawMsgs(true);
r3=new MyReceiver<byte[]>().rawMsgs(true);
a.setReceiver(r1);
b.setReceiver(r2);
c.setReceiver(r3);
}

@AfterMethod protected void destroy() {Util.closeReverse(a,b,c);}

public void testStableWithDontLoopback() throws Exception {
byte[] payload=new byte[5000];
for(int i=0; i < 10; i++) {
Message msg=new Message(null, payload).setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
b.send(msg);
Util.sleep(200); // prevents batches, which trigger STABLE msgs in non-10000 increments
}
Util.waitUntil(5000, 500, () -> Stream.of(r1,r3).allMatch(r -> r.size() == 10));
assert r2.size() == 0;

Util.waitUntilTrue(5000, 500, () -> Stream.of(a, b, c)
.map(c -> ((NAKACK2)c.getProtocolStack().findProtocol(NAKACK2.class)).getWindow(b.getAddress()))
.allMatch(t -> t.getHighestReceived() == 10 && t.getHighestDelivered() == 10 && t.getLow() == 10));

for(JChannel ch: Arrays.asList(a, b, c)) {
NAKACK2 n=ch.getProtocolStack().findProtocol(NAKACK2.class);
Table<Message> t=n.getWindow(b.getAddress());
assert t.getHighestReceived() == 10 && t.getHighestDelivered() == 10 && t.getLow() == 10
: String.format("table for %s is %s (low is probably 0)", ch.getName(), t);
}
}

protected static JChannel create(String name) throws Exception {
JChannel ch=new JChannel(Util.getTestStack()).name(name);
STABLE stable=ch.getProtocolStack().findProtocol(STABLE.class);
stable.setDesiredAverageGossip(0); // disabled periodical stable
stable.setMaxBytes(10000);
return ch;
}
}

0 comments on commit c79ee74

Please sign in to comment.