Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

We’re showing branches in this repository, but you can also compare across forks.

base fork: belaban/JGroups
base: 2ffffae
...
head fork: belaban/JGroups
compare: 696434b
  • 2 commits
  • 12 files changed
  • 0 commit comments
  • 1 contributor
1  build.xml
View
@@ -33,7 +33,6 @@
<property name="bin.dir" value="${root.dir}/bin"/>
<property name="keystore.dir" value="${root.dir}/keystore"/>
<property name="javadoc.packages" value="org.jgroups.*"/>
- <!--<property name="javadoc.packages" value="org.jgroups,org.jgroups.blocks"/>-->
<property name="timestamp" value=".timestamp"/>
<property name="protocols.xml" value="${manual.dir}/en/modules/protocols.xml"/>
<property name="maven.executable" value="mvn"/>
46 src/org/jgroups/blocks/Request.java
View
@@ -34,7 +34,7 @@
/** Is set as soon as the request has received all required responses */
protected final Condition completed=lock.newCondition();
- protected final Message request_msg;
+ protected final Message request_msg;
protected final RequestCorrelator corr; // either use RequestCorrelator or ...
protected final RequestOptions options;
@@ -170,7 +170,7 @@ protected static long getRequestId() {
/** This method runs with lock locked (called by <code>execute()</code>). */
@GuardedBy("lock")
- protected boolean responsesComplete(long timeout) throws InterruptedException {
+ protected boolean responsesComplete(final long timeout) throws InterruptedException {
if(timeout <= 0) {
while(!done) { /* Wait for responses: */
if(responsesComplete()) {
@@ -180,30 +180,29 @@ protected boolean responsesComplete(long timeout) throws InterruptedException {
}
completed.await();
}
- return responsesComplete();
}
else {
- long start_time=System.currentTimeMillis();
- long timeout_time=start_time + timeout;
- while(timeout > 0 && !done) { /* Wait for responses: */
+ long wait_time=TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
+ long target_time=System.nanoTime() + wait_time;
+ while(wait_time > 0 && !done) { /* Wait for responses: */
if(responsesComplete()) {
if(corr != null)
corr.done(req_id);
return true;
}
- timeout=timeout_time - System.currentTimeMillis();
- if(timeout > 0) {
- completed.await(timeout, TimeUnit.MILLISECONDS);
+ wait_time=target_time - System.nanoTime();
+ if(wait_time > 0) {
+ completed.await(wait_time, TimeUnit.NANOSECONDS);
}
}
if(corr != null)
corr.done(req_id);
- return responsesComplete();
}
+ return responsesComplete();
}
@GuardedBy("lock")
- protected boolean waitForResults(long timeout) {
+ protected boolean waitForResults(final long timeout) {
if(timeout <= 0) {
while(true) { /* Wait for responses: */
if(responsesComplete())
@@ -212,14 +211,14 @@ protected boolean waitForResults(long timeout) {
}
}
else {
- long start_time=System.currentTimeMillis();
- long timeout_time=start_time + timeout;
- while(timeout > 0) { /* Wait for responses: */
+ long wait_time=TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
+ long target_time=System.nanoTime() + wait_time;
+ while(wait_time > 0) { /* Wait for responses: */
if(responsesComplete())
return true;
- timeout=timeout_time - System.currentTimeMillis();
- if(timeout > 0) {
- try {completed.await(timeout, TimeUnit.MILLISECONDS);} catch(Exception e) {}
+ wait_time=target_time - System.nanoTime();
+ if(wait_time > 0) {
+ try {completed.await(wait_time, TimeUnit.NANOSECONDS);} catch(Exception e) {}
}
}
return false;
@@ -227,17 +226,4 @@ protected boolean waitForResults(long timeout) {
}
-
- /*public static String modeToString(int m) {
- switch(m) {
- case GET_FIRST: return "GET_FIRST";
- case GET_ALL: return "GET_ALL";
- case GET_MAJORITY: return "GET_MAJORITY";
- case GET_ABS_MAJORITY: return "GET_ABS_MAJORITY";
- case GET_NONE: return "GET_NONE";
- default: return "<unknown> (" + m + ")";
- }
- }*/
-
-
}
8 src/org/jgroups/protocols/TP.java
View
@@ -1769,7 +1769,7 @@ private void handleMyMessage(Message msg, boolean multicast) {
int num_msgs=0;
@GuardedBy("lock")
int num_bundling_tasks=0;
- long last_bundle_time;
+ long last_bundle_time; // in nanoseconds
final ReentrantLock lock=new ReentrantLock();
final Log log=LogFactory.getLog(getClass());
@@ -1819,7 +1819,7 @@ private void addMessage(Message msg) {
SingletonAddress dest=new SingletonAddress(cluster_name, dst);
if(msgs.isEmpty())
- last_bundle_time=System.currentTimeMillis();
+ last_bundle_time=System.nanoTime();
List<Message> tmp=msgs.get(dest);
if(tmp == null) {
tmp=new LinkedList<Message>();
@@ -1837,13 +1837,13 @@ private void addMessage(Message msg) {
*/
private void sendBundledMessages(final Map<SingletonAddress,List<Message>> msgs) {
if(log.isTraceEnabled()) {
- long stop=System.currentTimeMillis();
double percentage=100.0 / max_bundle_size * count;
StringBuilder sb=new StringBuilder("sending ").append(num_msgs).append(" msgs (");
num_msgs=0;
sb.append(count).append(" bytes (" + f.format(percentage) + "% of max_bundle_size)");
if(last_bundle_time > 0) {
- sb.append(", collected in ").append(stop-last_bundle_time).append("ms) ");
+ long diff=(System.nanoTime() - last_bundle_time) / 1000000;
+ sb.append(", collected in ").append(diff).append("ms) ");
}
sb.append(" to ").append(msgs.size()).append(" destination(s)");
if(msgs.size() > 1) sb.append(" (dests=").append(msgs.keySet()).append(")");
96 src/org/jgroups/util/Promise.java
View
@@ -23,17 +23,14 @@
private T result=null;
private volatile boolean hasResult=false;
- public Lock getLock() {
- return lock;
- }
-
- public Condition getCond() {
- return cond;
- }
+
+ public Lock getLock() {return lock;}
+ public Condition getCond() {return cond;}
+
/**
* Blocks until a result is available, or timeout milliseconds have elapsed
- * @param timeout
+ * @param timeout in ms
* @return An object
* @throws TimeoutException If a timeout occurred (implies that timeout > 0)
*/
@@ -48,39 +45,6 @@ public T getResultWithTimeout(long timeout) throws TimeoutException {
}
}
-
- /**
- * Blocks until a result is available, or timeout milliseconds have elapsed. Needs to be called with lock held
- * @param timeout
- * @return An object
- * @throws TimeoutException If a timeout occurred (implies that timeout > 0)
- */
- private T _getResultWithTimeout(long timeout) throws TimeoutException {
- long time_to_wait=timeout, start;
- boolean timeout_occurred=false;
-
- start=System.currentTimeMillis();
- while(hasResult == false) {
- if(timeout <= 0) {
- doWait();
- }
- else {
- if(time_to_wait <= 0) {
- timeout_occurred=true;
- break; // terminate the while loop
- }
- else {
- doWait(time_to_wait);
- time_to_wait=timeout - (System.currentTimeMillis() - start);
- }
- }
- }
-
- if(timeout_occurred)
- throw new TimeoutException();
- return result;
- }
-
public T getResult() {
try {
return getResultWithTimeout(0);
@@ -92,8 +56,8 @@ public T getResult() {
/**
* Returns the result, but never throws a TimeoutException; returns null instead.
- * @param timeout
- * @return Object
+ * @param timeout in ms
+ * @return T
*/
public T getResult(long timeout) {
try {
@@ -104,18 +68,6 @@ public T getResult(long timeout) {
}
}
-
- private void doWait() {
- try {cond.await();} catch(InterruptedException e) {}
- }
-
- private void doWait(long timeout) {
- try {cond.await(timeout, TimeUnit.MILLISECONDS);} catch(InterruptedException e) {}
- }
-
-
-
-
/**
* Checks whether result is available. Does not block.
*/
@@ -130,8 +82,7 @@ public boolean hasResult() {
}
/**
- * Sets the result and notifies any threads
- * waiting for it
+ * Sets the result and notifies any threads waiting for it
*/
public void setResult(T obj) {
lock.lock();
@@ -167,4 +118,35 @@ public String toString() {
}
+
+
+ /**
+ * Blocks until a result is available, or timeout milliseconds have elapsed. Needs to be called with lock held
+ * @param timeout in ms
+ * @return An object
+ * @throws TimeoutException If a timeout occurred (implies that timeout > 0)
+ */
+ protected T _getResultWithTimeout(final long timeout) throws TimeoutException {
+ if(timeout <= 0) {
+ while(!hasResult) { /* Wait for responses: */
+ try {cond.await();} catch(Exception e) {}
+ }
+ }
+ else {
+ long wait_time=TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
+ final long target_time=System.nanoTime() + wait_time;
+ while(wait_time > 0 && !hasResult) { /* Wait for responses: */
+ wait_time=target_time - System.nanoTime();
+ if(wait_time > 0) {
+ try {cond.await(wait_time, TimeUnit.NANOSECONDS);} catch(Exception e) {}
+ }
+ }
+ if(!hasResult && wait_time <= 0)
+ throw new TimeoutException();
+ }
+ return result;
+ }
+
+
+
}
7 src/org/jgroups/util/ResponseCollector.java
View
@@ -180,17 +180,16 @@ public int size() {
public boolean waitForAllResponses(long timeout) {
if(timeout <= 0)
timeout=2000L;
- long end_time=System.currentTimeMillis() + timeout;
- long wait_time;
lock.lock();
try {
+ final long end_time=System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
while(!hasAllResponses()) {
- wait_time=end_time - System.currentTimeMillis();
+ long wait_time=end_time - System.nanoTime();
if(wait_time <= 0)
return false;
try {
- cond.await(wait_time, TimeUnit.MILLISECONDS);
+ cond.await(wait_time, TimeUnit.NANOSECONDS);
}
catch(InterruptedException e) {
Thread.currentThread().interrupt(); // set interrupt flag again
27 src/org/jgroups/util/Table.java
View
@@ -4,6 +4,7 @@
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -46,11 +47,11 @@
/** The highest delivered (= removed) seqno */
protected long hd;
- /** Time (in ms) after which a compaction should take place. 0 disables compaction */
- protected long max_compaction_time=DEFAULT_MAX_COMPACTION_TIME;
+ /** Time (in nanoseconds) after which a compaction should take place. 0 disables compaction */
+ protected long max_compaction_time=TimeUnit.NANOSECONDS.convert(DEFAULT_MAX_COMPACTION_TIME, TimeUnit.MILLISECONDS);
/** The time when the last compaction took place. If a {@link #compact()} takes place and sees that the
- * last compaction is more than max_compaction_time ms ago, a compaction will take place */
+ * last compaction is more than max_compaction_time nanoseconds ago, a compaction will take place */
protected long last_compaction_timestamp=0;
protected final Lock lock=new ReentrantLock();
@@ -59,7 +60,7 @@
protected int num_compactions=0, num_resizes=0, num_moves=0, num_purges=0;
- protected static final long DEFAULT_MAX_COMPACTION_TIME=10000;
+ protected static final long DEFAULT_MAX_COMPACTION_TIME=10000; // in milliseconds
protected static final double DEFAULT_RESIZE_FACTOR=1.2;
@@ -100,12 +101,20 @@ public Table(int num_rows, int elements_per_row, long offset, double resize_fact
this(num_rows,elements_per_row, offset, resize_factor, DEFAULT_MAX_COMPACTION_TIME);
}
+ /**
+ * Creates a new table
+ * @param num_rows the number of rows in the matrix
+ * @param elements_per_row the number of messages per row.
+ * @param offset the seqno before the first seqno to be inserted. E.g. if 0 then the first seqno will be 1
+ * @param resize_factor teh factor with which to increase the number of rows
+ * @param max_compaction_time the max time in milliseconds after we attempt a compaction
+ */
@SuppressWarnings("unchecked")
public Table(int num_rows, int elements_per_row, long offset, double resize_factor, long max_compaction_time) {
this.num_rows=num_rows;
this.elements_per_row=elements_per_row;
this.resize_factor=resize_factor;
- this.max_compaction_time=max_compaction_time;
+ this.max_compaction_time=TimeUnit.NANOSECONDS.convert(max_compaction_time, TimeUnit.MILLISECONDS);
this.offset=this.low=this.hr=this.hd=offset;
matrix=(T[][])new Object[num_rows][];
if(resize_factor <= 1)
@@ -132,7 +141,9 @@ public Table(int num_rows, int elements_per_row, long offset, double resize_fact
public long getHighestDelivered() {return hd;}
public long getHighestReceived() {return hr;}
public long getMaxCompactionTime() {return max_compaction_time;}
- public void setMaxCompactionTime(long max_compaction_time) {this.max_compaction_time=max_compaction_time;}
+ public void setMaxCompactionTime(long max_compaction_time) {
+ this.max_compaction_time=TimeUnit.NANOSECONDS.convert(max_compaction_time, TimeUnit.MILLISECONDS);
+ }
public int getNumRows() {return matrix.length;}
public void resetStats() {num_compactions=num_moves=num_resizes=num_purges=0;}
@@ -357,14 +368,14 @@ public void purge(long seqno, boolean force) {
if(max_compaction_time <= 0) // see if compaction should be triggered
return;
- long current_time=System.currentTimeMillis();
+ long current_time=System.nanoTime();
if(last_compaction_timestamp > 0) {
if(current_time - last_compaction_timestamp >= max_compaction_time) {
_compact();
last_compaction_timestamp=current_time;
}
}
- else
+ else // the first time we don't do a compaction
last_compaction_timestamp=current_time;
}
finally {
1  src/org/jgroups/util/Util.java
View
@@ -7,7 +7,6 @@
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.jmx.JmxConfigurator;
import org.jgroups.logging.Log;
-import org.jgroups.logging.LogFactory;
import org.jgroups.protocols.*;
import org.jgroups.protocols.pbcast.FLUSH;
import org.jgroups.protocols.pbcast.GMS;
2  tests/junit-functional/org/jgroups/tests/PromiseTest.java
View
@@ -81,7 +81,7 @@ public void run() {
};
t.start();
long start=System.currentTimeMillis(), stop;
- Object result=p.getResult(100000);
+ Object result=p.getResult(30000);
stop=System.currentTimeMillis();
System.out.println("-- waited for " + (stop-start) + "ms, result is " + result);
assert result != null;
3  tests/junit-functional/org/jgroups/tests/ResponseCollectorTest.java
View
@@ -16,7 +16,8 @@
*/
@Test(groups=Global.FUNCTIONAL,sequential=false)
public class ResponseCollectorTest {
- static final Address a=Util.createRandomAddress(), b=Util.createRandomAddress(), c=Util.createRandomAddress();
+ static final Address a=Util.createRandomAddress("A"),
+ b=Util.createRandomAddress("B"), c=Util.createRandomAddress("C");
public static void testAdd() {
4 tests/junit/org/jgroups/blocks/ExecutingServiceTest.java
View
@@ -68,7 +68,7 @@ protected void init() throws Exception {
// Add the exposed executing protocol
ProtocolStack stack=c1.getProtocolStack();
exposedProtocol = new ExposedExecutingProtocol();
- exposedProtocol.setLevel("trace");
+ exposedProtocol.setLevel("debug");
stack.insertProtocolAtTop(exposedProtocol);
er1=new ExecutionRunner(c1);
@@ -82,7 +82,7 @@ protected void init() throws Exception {
er3=new ExecutionRunner(c3);
c3.connect("ExecutionServiceTest");
- LogFactory.getLog(ExecutionRunner.class).setLevel("trace");
+ LogFactory.getLog(ExecutionRunner.class).setLevel("debug");
}
@AfterClass
149 tests/junit/org/jgroups/blocks/MuxRpcDispatcherTest.java
View
@@ -7,25 +7,25 @@
import org.jgroups.util.Rsp;
import org.jgroups.util.Util;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
+import org.testng.annotations.*;
import java.util.Map;
/**
* @author Paul Ferraro
*/
-@Test(groups=Global.STACK_DEPENDENT)
+@Test(groups={Global.STACK_DEPENDENT},sequential=true)
public class MuxRpcDispatcherTest extends ChannelTestBase {
- private JChannel[] channels = new JChannel[2];
+ private JChannel[] channels = null;
+ private RpcDispatcher[] dispatchers = null;
+ private RpcDispatcher[][] muxDispatchers = null;
- private RpcDispatcher[] dispatchers = new RpcDispatcher[2];
- private RpcDispatcher[][] muxDispatchers = new RpcDispatcher[2][2];
-
- @BeforeClass
+ @BeforeMethod
void setUp() throws Exception {
+ channels = new JChannel[2];
+ dispatchers = new RpcDispatcher[2];
+ muxDispatchers = new RpcDispatcher[2][2];
channels[0] = createChannel(true, 2, "A");
channels[1] = createChannel(channels[0], "B");
@@ -48,27 +48,76 @@ void setUp() throws Exception {
Util.waitUntilAllChannelsHaveSameSize(10000, 1000, channels);
}
- @AfterClass
+ @AfterMethod
void tearDown() throws Exception {
for (int i = 0; i < dispatchers.length; ++i) {
- channels[i].disconnect();
- channels[i].close();
dispatchers[i].stop();
-
- for (int j = 0; j < muxDispatchers[i].length; ++j) {
+ for (int j = 0; j < muxDispatchers[i].length; ++j)
muxDispatchers[i][j].stop();
- }
}
+ for(JChannel ch: channels)
+ Util.close(ch);
+ }
+
+
+
+ public void testUnicastRPCs() throws Exception {
+ RequestOptions options=RequestOptions.SYNC().setTimeout(30000);
+ MethodCall method = new MethodCall("getName", new Object[0], new Class[0]);
+
+ final Address address = channels[1].getAddress();
+
+ // Validate normal dispatchers
+ Object response = dispatchers[0].callRemoteMethod(address, method, options);
+ Assert.assertEquals(response, "dispatcher[1]");
+
+ // Validate muxed dispatchers
+ for (int j = 0; j < muxDispatchers[0].length; ++j) {
+ response = muxDispatchers[0][j].callRemoteMethod(address, method, options);
+ Assert.assertEquals(response, "muxDispatcher[1][" + j + "]");
+ }
+
+ // Filter testing is disabled for now pending filter improvements in JGroups 3
+
+ // Validate muxed rpc dispatchers w/filter
+
+ RspFilter filter = new RspFilter() {
+ @Override public boolean isAcceptable(Object response, Address sender) {return !sender.equals(address);}
+ @Override public boolean needMoreResponses() {return true;}
+ };
+
+ response = muxDispatchers[0][0].callRemoteMethod(address, method, RequestOptions.SYNC().setRspFilter(filter));
+ assert response == null;
+
+ // Validate stopped mux dispatcher response is auto-filtered
+ muxDispatchers[1][0].stop();
+
+ try {
+ response = muxDispatchers[0][0].callRemoteMethod(address, method, RequestOptions.SYNC().setTimeout(2000).setRspFilter(null));
+ assert false : "should have run into a TimeoutException";
+ }
+ catch(TimeoutException timeout) {
+ System.out.println("received " + timeout + " - as expected");
+ }
+
+ Assert.assertNull(response);
+
+ // Validate restarted mux dispatcher functions normally
+ muxDispatchers[1][0].start();
+
+ response = muxDispatchers[0][0].callRemoteMethod(address, method, RequestOptions.SYNC().setRspFilter(null));
+
+ Assert.assertEquals(response, "muxDispatcher[1][0]");
}
public void testMulticastRPCs() throws Exception {
MethodCall method = new MethodCall("getName", new Object[0], new Class[0]);
RequestOptions options=RequestOptions.SYNC().setTimeout(30000);
-
+
// Validate normal dispatchers
Map<Address, Rsp<String>> responses = dispatchers[0].callRemoteMethods(null, method, options);
Assert.assertEquals(responses.size(), 2);
-
+
for (int i = 0; i < dispatchers.length; ++i)
verifyResponse(responses, channels[i], "dispatcher[" + i + "]");
@@ -79,10 +128,10 @@ public void testMulticastRPCs() throws Exception {
for (int i = 0; i < dispatchers.length; ++i)
verifyResponse(responses, channels[i], "muxDispatcher[" + i + "][" + j + "]");
}
-
+
// Validate muxed rpc dispatchers w/filter
final Address address = channels[0].getAddress();
-
+
RspFilter filter = new RspFilter() {
public boolean isAcceptable(Object response, Address sender) {
return !sender.equals(address);
@@ -91,31 +140,31 @@ public boolean needMoreResponses() {
return true;
}
};
-
+
responses = muxDispatchers[0][0].callRemoteMethods(null,method,RequestOptions.SYNC().setTimeout(30000).setRspFilter(filter));
Assert.assertEquals(responses.size(), 2);
verifyResponse(responses, channels[0], null);
verifyResponse(responses, channels[1], "muxDispatcher[1][0]");
-
+
// Validate stopped mux dispatcher response is auto-filtered
muxDispatchers[1][0].stop();
-
+
responses = muxDispatchers[0][0].callRemoteMethods(null, method, options.setTimeout(2000));
Assert.assertEquals(responses.size(), 2);
verifyResponse(responses, channels[0], "muxDispatcher[0][0]");
verifyResponse(responses, channels[1], null);
-
+
// Validate stopped mux dispatcher response is auto-filtered and custom filter is applied
responses = muxDispatchers[0][0].callRemoteMethods(null,method,RequestOptions.SYNC().setTimeout(2000).setRspFilter(filter));
Assert.assertEquals(responses.size(), 2);
verifyResponse(responses, channels[0], null);
verifyResponse(responses, channels[1], null);
-
+
// Validate restarted mux dispatcher functions normally
muxDispatchers[1][0].start();
-
+
responses = muxDispatchers[0][0].callRemoteMethods(null, method, options.setTimeout(30000));
Assert.assertEquals(responses.size(), 2);
@@ -124,56 +173,6 @@ public boolean needMoreResponses() {
}
-
- public void testUnicastRPCs() throws Exception {
- RequestOptions options=RequestOptions.SYNC().setTimeout(30000);
- MethodCall method = new MethodCall("getName", new Object[0], new Class[0]);
-
- final Address address = channels[1].getAddress();
-
- // Validate normal dispatchers
- Object response = dispatchers[0].callRemoteMethod(address, method, options);
- Assert.assertEquals(response, "dispatcher[1]");
-
- // Validate muxed dispatchers
- for (int j = 0; j < muxDispatchers[0].length; ++j) {
- response = muxDispatchers[0][j].callRemoteMethod(address, method, options);
- Assert.assertEquals(response, "muxDispatcher[1][" + j + "]");
- }
-
- // Filter testing is disabled for now pending filter improvements in JGroups 3
-
- // Validate muxed rpc dispatchers w/filter
-
- RspFilter filter = new RspFilter() {
- @Override public boolean isAcceptable(Object response, Address sender) {return !sender.equals(address);}
- @Override public boolean needMoreResponses() {return true;}
- };
-
- response = muxDispatchers[0][0].callRemoteMethod(address, method, RequestOptions.SYNC().setRspFilter(filter));
- assert response == null;
-
- // Validate stopped mux dispatcher response is auto-filtered
- muxDispatchers[1][0].stop();
-
- try {
- response = muxDispatchers[0][0].callRemoteMethod(address, method, RequestOptions.SYNC().setTimeout(2000).setRspFilter(null));
- assert false : "should have run into a TimeoutException";
- }
- catch(TimeoutException timeout) {
- System.out.println("received " + timeout + " - as expected");
- }
-
- Assert.assertNull(response);
-
- // Validate restarted mux dispatcher functions normally
- muxDispatchers[1][0].start();
-
- response = muxDispatchers[0][0].callRemoteMethod(address, method, RequestOptions.SYNC().setRspFilter(null));
-
- Assert.assertEquals(response, "muxDispatcher[1][0]");
- }
-
private static void verifyResponse(Map<Address,Rsp<String>> responses, Channel channel, Object expected) {
Rsp<?> response = responses.get(channel.getAddress());
String address = channel.getAddress().toString();
96 tests/junit/org/jgroups/tests/TCPGOSSIP_Test.java
View
@@ -3,8 +3,11 @@
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.View;
+import org.jgroups.protocols.MERGE2;
+import org.jgroups.protocols.MERGE3;
import org.jgroups.protocols.TCPGOSSIP;
import org.jgroups.stack.GossipRouter;
+import org.jgroups.stack.Protocol;
import org.jgroups.util.StackType;
import org.jgroups.util.Util;
import org.testng.annotations.AfterClass;
@@ -12,14 +15,17 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import java.net.InetSocketAddress;
+import java.util.List;
+
/**
- * Tests TCPGOSSIP protocol
+ * Tests the TCPGOSSIP protocol
*
* @author Vladimir Blagojevic
*
**/
@Test(groups = { Global.STACK_INDEPENDENT, Global.GOSSIP_ROUTER }, sequential = true)
-public class TCPGOSSIP_Test extends ChannelTestBase {
+public class TCPGOSSIP_Test {
private JChannel channel, coordinator;
private final static String GROUP = "TCPGOSSIP_Test";
private GossipRouter gossipRouter;
@@ -28,20 +34,19 @@
@BeforeClass
void startRouter() throws Exception {
String bind_addr = getRouterBindAddress();
- gossipRouter = new GossipRouter(12001, null); // binds the GR to 0.0.0.0
+
+ System.setProperty("jgroups.bind_addr", bind_addr);
+
+ gossipRouter = new GossipRouter(12001, bind_addr); // binds the GR to 127.0.0.1:12001
gossipRouter.start();
}
private static String getRouterBindAddress() {
- String bind_addr = Util.getProperty(Global.BIND_ADDR);
- if (bind_addr == null) {
- StackType type = Util.getIpStackType();
- if (type == StackType.IPv6)
- bind_addr = "::1";
- else
- bind_addr = "127.0.0.1";
- }
- return bind_addr;
+ StackType type = Util.getIpStackType();
+ if (type == StackType.IPv6)
+ return "::1";
+ else
+ return "127.0.0.1";
}
@AfterClass(alwaysRun = true)
@@ -55,12 +60,11 @@ void tearDown() throws Exception {
}
/**
- * Tests connect-disconnect-connect sequence for a group with two members (using default
- * configuration).
+ * Tests connect-disconnect-connect sequence for a group with two members (using the default configuration).
**/
public void testDisconnectConnectTwo() throws Exception {
- coordinator = new JChannel(props);
- channel = new JChannel(props);
+ coordinator=createChannel("A");
+ channel=createChannel("B");
coordinator.connect(GROUP);
channel.connect("DisconnectTest.testgroup-1");
channel.disconnect();
@@ -72,15 +76,14 @@ public void testDisconnectConnectTwo() throws Exception {
}
public void testAddInitialHosts() throws Exception {
- coordinator = new JChannel(props);
- channel = new JChannel(props);
+ coordinator=createChannel("A");
+ channel=createChannel("B");
coordinator.connect(GROUP);
channel.connect(GROUP);
TCPGOSSIP p = (TCPGOSSIP) channel.getProtocolStack().findProtocol(TCPGOSSIP.class);
String bind_addr = getRouterBindAddress();
assert p.removeInitialHost(bind_addr, 12001);
p.addInitialHost(bind_addr, 12001);
-
View view = channel.getView();
assert view.size() == 2;
@@ -91,11 +94,11 @@ public void testAddInitialHosts() throws Exception {
public void testConnectThree() throws Exception {
JChannel third = null;
try {
- coordinator = new JChannel(props);
- channel = new JChannel(props);
+ coordinator=createChannel("A");
+ channel=createChannel("B");
coordinator.connect(GROUP);
channel.connect(GROUP);
- third = new JChannel(props);
+ third=createChannel("C");
third.connect(GROUP);
View view = channel.getView();
assert channel.getView().size() == 3;
@@ -110,8 +113,9 @@ public void testConnectThree() throws Exception {
public void testConnectThreeChannelsWithGRDown() throws Exception {
JChannel third = null;
try {
- coordinator = new JChannel(props);
- channel = new JChannel(props);
+ coordinator=createChannel("A");
+ channel=createChannel("B");
+ changeMergeInterval(coordinator, channel);
coordinator.connect("testConnectThreeChannelsWithGRDown");
channel.connect("testConnectThreeChannelsWithGRDown");
@@ -120,13 +124,13 @@ public void testConnectThreeChannelsWithGRDown() throws Exception {
// cannot discover others since GR is down
- third = new JChannel(props);
+ third=createChannel("C");
+ changeMergeInterval(third);
third.connect("testConnectThreeChannelsWithGRDown");
-
// restart and....
gossipRouter.start();
- Util.waitUntilAllChannelsHaveSameSize(60000, 500, coordinator, channel, third);
+ Util.waitUntilAllChannelsHaveSameSize(60000, 1000, coordinator, channel, third);
// confirm they found each other
View view = channel.getView();
@@ -142,8 +146,9 @@ public void testConnectThreeChannelsWithGRDown() throws Exception {
public void testConnectThreeChannelsWithGRAlreadyDown() throws Exception {
JChannel third = null;
try {
- coordinator = new JChannel(props);
- channel = new JChannel(props);
+ coordinator=createChannel("A");
+ channel=createChannel("B");
+ changeMergeInterval(coordinator, channel);
// kill router
gossipRouter.stop();
@@ -152,12 +157,13 @@ public void testConnectThreeChannelsWithGRAlreadyDown() throws Exception {
coordinator.connect("testConnectThreeChannelsWithGRAlreadyDown");
channel.connect("testConnectThreeChannelsWithGRAlreadyDown");
- third = new JChannel(props);
+ third=createChannel("C");
+ changeMergeInterval(third);
third.connect("testConnectThreeChannelsWithGRAlreadyDown");
// restart and....
gossipRouter.start();
- Util.waitUntilAllChannelsHaveSameSize(60000, 500, coordinator, channel, third);
+ Util.waitUntilAllChannelsHaveSameSize(60000, 1000, coordinator, channel, third);
// confirm they found each other
View view = channel.getView();
@@ -169,4 +175,32 @@ public void testConnectThreeChannelsWithGRAlreadyDown() throws Exception {
Util.close(third);
}
}
+
+ protected JChannel createChannel(String name) throws Exception {
+ JChannel retval=new JChannel(props);
+ retval.setName(name);
+ changeGossipRouter(retval,getRouterBindAddress(),12001);
+ return retval;
+ }
+
+ protected void changeGossipRouter(JChannel channel, String host, int port) {
+ TCPGOSSIP tcp_gossip_prot=(TCPGOSSIP)channel.getProtocolStack().findProtocol(TCPGOSSIP.class);
+ List<InetSocketAddress> initial_hosts=tcp_gossip_prot.getInitialHosts();
+ initial_hosts.clear();
+ initial_hosts.add(new InetSocketAddress(host, port));
+ }
+
+ protected void changeMergeInterval(JChannel ... channels) {
+ for(JChannel ch: channels) {
+ Protocol p=ch.getProtocolStack().findProtocol(MERGE2.class,MERGE3.class);
+ if(p instanceof MERGE2) {
+ ((MERGE2)p).setMinInterval(1000);
+ ((MERGE2)p).setMaxInterval(3000);
+ }
+ else if(p instanceof MERGE3) {
+ ((MERGE3)p).setMaxInterval(1000);
+ ((MERGE3)p).setMaxInterval(3000);
+ }
+ }
+ }
}

No commit comments for this range

Something went wrong with that request. Please try again.