Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Replaced some occurrences of currentTimeMillis() withnanoTime()

  • Loading branch information...
commit 696434b4f9c4836fae7201927933462a368573ff 1 parent 3e57100
@belaban authored
View
46 src/org/jgroups/blocks/Request.java
@@ -34,7 +34,7 @@
/** Is set as soon as the request has received all required responses */
protected final Condition completed=lock.newCondition();
- protected final Message request_msg;
+ protected final Message request_msg;
protected final RequestCorrelator corr; // either use RequestCorrelator or ...
protected final RequestOptions options;
@@ -170,7 +170,7 @@ protected static long getRequestId() {
/** This method runs with lock locked (called by <code>execute()</code>). */
@GuardedBy("lock")
- protected boolean responsesComplete(long timeout) throws InterruptedException {
+ protected boolean responsesComplete(final long timeout) throws InterruptedException {
if(timeout <= 0) {
while(!done) { /* Wait for responses: */
if(responsesComplete()) {
@@ -180,30 +180,29 @@ protected boolean responsesComplete(long timeout) throws InterruptedException {
}
completed.await();
}
- return responsesComplete();
}
else {
- long start_time=System.currentTimeMillis();
- long timeout_time=start_time + timeout;
- while(timeout > 0 && !done) { /* Wait for responses: */
+ long wait_time=TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
+ long target_time=System.nanoTime() + wait_time;
+ while(wait_time > 0 && !done) { /* Wait for responses: */
if(responsesComplete()) {
if(corr != null)
corr.done(req_id);
return true;
}
- timeout=timeout_time - System.currentTimeMillis();
- if(timeout > 0) {
- completed.await(timeout, TimeUnit.MILLISECONDS);
+ wait_time=target_time - System.nanoTime();
+ if(wait_time > 0) {
+ completed.await(wait_time, TimeUnit.NANOSECONDS);
}
}
if(corr != null)
corr.done(req_id);
- return responsesComplete();
}
+ return responsesComplete();
}
@GuardedBy("lock")
- protected boolean waitForResults(long timeout) {
+ protected boolean waitForResults(final long timeout) {
if(timeout <= 0) {
while(true) { /* Wait for responses: */
if(responsesComplete())
@@ -212,14 +211,14 @@ protected boolean waitForResults(long timeout) {
}
}
else {
- long start_time=System.currentTimeMillis();
- long timeout_time=start_time + timeout;
- while(timeout > 0) { /* Wait for responses: */
+ long wait_time=TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
+ long target_time=System.nanoTime() + wait_time;
+ while(wait_time > 0) { /* Wait for responses: */
if(responsesComplete())
return true;
- timeout=timeout_time - System.currentTimeMillis();
- if(timeout > 0) {
- try {completed.await(timeout, TimeUnit.MILLISECONDS);} catch(Exception e) {}
+ wait_time=target_time - System.nanoTime();
+ if(wait_time > 0) {
+ try {completed.await(wait_time, TimeUnit.NANOSECONDS);} catch(Exception e) {}
}
}
return false;
@@ -227,17 +226,4 @@ protected boolean waitForResults(long timeout) {
}
-
- /*public static String modeToString(int m) {
- switch(m) {
- case GET_FIRST: return "GET_FIRST";
- case GET_ALL: return "GET_ALL";
- case GET_MAJORITY: return "GET_MAJORITY";
- case GET_ABS_MAJORITY: return "GET_ABS_MAJORITY";
- case GET_NONE: return "GET_NONE";
- default: return "<unknown> (" + m + ")";
- }
- }*/
-
-
}
View
8 src/org/jgroups/protocols/TP.java
@@ -1769,7 +1769,7 @@ private void handleMyMessage(Message msg, boolean multicast) {
int num_msgs=0;
@GuardedBy("lock")
int num_bundling_tasks=0;
- long last_bundle_time;
+ long last_bundle_time; // in nanoseconds
final ReentrantLock lock=new ReentrantLock();
final Log log=LogFactory.getLog(getClass());
@@ -1819,7 +1819,7 @@ private void addMessage(Message msg) {
SingletonAddress dest=new SingletonAddress(cluster_name, dst);
if(msgs.isEmpty())
- last_bundle_time=System.currentTimeMillis();
+ last_bundle_time=System.nanoTime();
List<Message> tmp=msgs.get(dest);
if(tmp == null) {
tmp=new LinkedList<Message>();
@@ -1837,13 +1837,13 @@ private void addMessage(Message msg) {
*/
private void sendBundledMessages(final Map<SingletonAddress,List<Message>> msgs) {
if(log.isTraceEnabled()) {
- long stop=System.currentTimeMillis();
double percentage=100.0 / max_bundle_size * count;
StringBuilder sb=new StringBuilder("sending ").append(num_msgs).append(" msgs (");
num_msgs=0;
sb.append(count).append(" bytes (" + f.format(percentage) + "% of max_bundle_size)");
if(last_bundle_time > 0) {
- sb.append(", collected in ").append(stop-last_bundle_time).append("ms) ");
+ long diff=(System.nanoTime() - last_bundle_time) / 1000000;
+ sb.append(", collected in ").append(diff).append("ms) ");
}
sb.append(" to ").append(msgs.size()).append(" destination(s)");
if(msgs.size() > 1) sb.append(" (dests=").append(msgs.keySet()).append(")");
View
96 src/org/jgroups/util/Promise.java
@@ -23,17 +23,14 @@
private T result=null;
private volatile boolean hasResult=false;
- public Lock getLock() {
- return lock;
- }
-
- public Condition getCond() {
- return cond;
- }
+
+ public Lock getLock() {return lock;}
+ public Condition getCond() {return cond;}
+
/**
* Blocks until a result is available, or timeout milliseconds have elapsed
- * @param timeout
+ * @param timeout in ms
* @return An object
* @throws TimeoutException If a timeout occurred (implies that timeout > 0)
*/
@@ -48,39 +45,6 @@ public T getResultWithTimeout(long timeout) throws TimeoutException {
}
}
-
- /**
- * Blocks until a result is available, or timeout milliseconds have elapsed. Needs to be called with lock held
- * @param timeout
- * @return An object
- * @throws TimeoutException If a timeout occurred (implies that timeout > 0)
- */
- private T _getResultWithTimeout(long timeout) throws TimeoutException {
- long time_to_wait=timeout, start;
- boolean timeout_occurred=false;
-
- start=System.currentTimeMillis();
- while(hasResult == false) {
- if(timeout <= 0) {
- doWait();
- }
- else {
- if(time_to_wait <= 0) {
- timeout_occurred=true;
- break; // terminate the while loop
- }
- else {
- doWait(time_to_wait);
- time_to_wait=timeout - (System.currentTimeMillis() - start);
- }
- }
- }
-
- if(timeout_occurred)
- throw new TimeoutException();
- return result;
- }
-
public T getResult() {
try {
return getResultWithTimeout(0);
@@ -92,8 +56,8 @@ public T getResult() {
/**
* Returns the result, but never throws a TimeoutException; returns null instead.
- * @param timeout
- * @return Object
+ * @param timeout in ms
+ * @return T
*/
public T getResult(long timeout) {
try {
@@ -104,18 +68,6 @@ public T getResult(long timeout) {
}
}
-
- private void doWait() {
- try {cond.await();} catch(InterruptedException e) {}
- }
-
- private void doWait(long timeout) {
- try {cond.await(timeout, TimeUnit.MILLISECONDS);} catch(InterruptedException e) {}
- }
-
-
-
-
/**
* Checks whether result is available. Does not block.
*/
@@ -130,8 +82,7 @@ public boolean hasResult() {
}
/**
- * Sets the result and notifies any threads
- * waiting for it
+ * Sets the result and notifies any threads waiting for it
*/
public void setResult(T obj) {
lock.lock();
@@ -167,4 +118,35 @@ public String toString() {
}
+
+
+ /**
+ * Blocks until a result is available, or timeout milliseconds have elapsed. Needs to be called with lock held
+ * @param timeout in ms
+ * @return An object
+ * @throws TimeoutException If a timeout occurred (implies that timeout > 0)
+ */
+ protected T _getResultWithTimeout(final long timeout) throws TimeoutException {
+ if(timeout <= 0) {
+ while(!hasResult) { /* Wait for responses: */
+ try {cond.await();} catch(Exception e) {}
+ }
+ }
+ else {
+ long wait_time=TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
+ final long target_time=System.nanoTime() + wait_time;
+ while(wait_time > 0 && !hasResult) { /* Wait for responses: */
+ wait_time=target_time - System.nanoTime();
+ if(wait_time > 0) {
+ try {cond.await(wait_time, TimeUnit.NANOSECONDS);} catch(Exception e) {}
+ }
+ }
+ if(!hasResult && wait_time <= 0)
+ throw new TimeoutException();
+ }
+ return result;
+ }
+
+
+
}
View
7 src/org/jgroups/util/ResponseCollector.java
@@ -180,17 +180,16 @@ public int size() {
public boolean waitForAllResponses(long timeout) {
if(timeout <= 0)
timeout=2000L;
- long end_time=System.currentTimeMillis() + timeout;
- long wait_time;
lock.lock();
try {
+ final long end_time=System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
while(!hasAllResponses()) {
- wait_time=end_time - System.currentTimeMillis();
+ long wait_time=end_time - System.nanoTime();
if(wait_time <= 0)
return false;
try {
- cond.await(wait_time, TimeUnit.MILLISECONDS);
+ cond.await(wait_time, TimeUnit.NANOSECONDS);
}
catch(InterruptedException e) {
Thread.currentThread().interrupt(); // set interrupt flag again
View
27 src/org/jgroups/util/Table.java
@@ -4,6 +4,7 @@
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -46,11 +47,11 @@
/** The highest delivered (= removed) seqno */
protected long hd;
- /** Time (in ms) after which a compaction should take place. 0 disables compaction */
- protected long max_compaction_time=DEFAULT_MAX_COMPACTION_TIME;
+ /** Time (in nanoseconds) after which a compaction should take place. 0 disables compaction */
+ protected long max_compaction_time=TimeUnit.NANOSECONDS.convert(DEFAULT_MAX_COMPACTION_TIME, TimeUnit.MILLISECONDS);
/** The time when the last compaction took place. If a {@link #compact()} takes place and sees that the
- * last compaction is more than max_compaction_time ms ago, a compaction will take place */
+ * last compaction is more than max_compaction_time nanoseconds ago, a compaction will take place */
protected long last_compaction_timestamp=0;
protected final Lock lock=new ReentrantLock();
@@ -59,7 +60,7 @@
protected int num_compactions=0, num_resizes=0, num_moves=0, num_purges=0;
- protected static final long DEFAULT_MAX_COMPACTION_TIME=10000;
+ protected static final long DEFAULT_MAX_COMPACTION_TIME=10000; // in milliseconds
protected static final double DEFAULT_RESIZE_FACTOR=1.2;
@@ -100,12 +101,20 @@ public Table(int num_rows, int elements_per_row, long offset, double resize_fact
this(num_rows,elements_per_row, offset, resize_factor, DEFAULT_MAX_COMPACTION_TIME);
}
+ /**
+ * Creates a new table
+ * @param num_rows the number of rows in the matrix
+ * @param elements_per_row the number of messages per row.
+ * @param offset the seqno before the first seqno to be inserted. E.g. if 0 then the first seqno will be 1
+ * @param resize_factor teh factor with which to increase the number of rows
+ * @param max_compaction_time the max time in milliseconds after we attempt a compaction
+ */
@SuppressWarnings("unchecked")
public Table(int num_rows, int elements_per_row, long offset, double resize_factor, long max_compaction_time) {
this.num_rows=num_rows;
this.elements_per_row=elements_per_row;
this.resize_factor=resize_factor;
- this.max_compaction_time=max_compaction_time;
+ this.max_compaction_time=TimeUnit.NANOSECONDS.convert(max_compaction_time, TimeUnit.MILLISECONDS);
this.offset=this.low=this.hr=this.hd=offset;
matrix=(T[][])new Object[num_rows][];
if(resize_factor <= 1)
@@ -132,7 +141,9 @@ public Table(int num_rows, int elements_per_row, long offset, double resize_fact
public long getHighestDelivered() {return hd;}
public long getHighestReceived() {return hr;}
public long getMaxCompactionTime() {return max_compaction_time;}
- public void setMaxCompactionTime(long max_compaction_time) {this.max_compaction_time=max_compaction_time;}
+ public void setMaxCompactionTime(long max_compaction_time) {
+ this.max_compaction_time=TimeUnit.NANOSECONDS.convert(max_compaction_time, TimeUnit.MILLISECONDS);
+ }
public int getNumRows() {return matrix.length;}
public void resetStats() {num_compactions=num_moves=num_resizes=num_purges=0;}
@@ -357,14 +368,14 @@ public void purge(long seqno, boolean force) {
if(max_compaction_time <= 0) // see if compaction should be triggered
return;
- long current_time=System.currentTimeMillis();
+ long current_time=System.nanoTime();
if(last_compaction_timestamp > 0) {
if(current_time - last_compaction_timestamp >= max_compaction_time) {
_compact();
last_compaction_timestamp=current_time;
}
}
- else
+ else // the first time we don't do a compaction
last_compaction_timestamp=current_time;
}
finally {
View
1  src/org/jgroups/util/Util.java
@@ -7,7 +7,6 @@
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.jmx.JmxConfigurator;
import org.jgroups.logging.Log;
-import org.jgroups.logging.LogFactory;
import org.jgroups.protocols.*;
import org.jgroups.protocols.pbcast.FLUSH;
import org.jgroups.protocols.pbcast.GMS;
View
2  tests/junit-functional/org/jgroups/tests/PromiseTest.java
@@ -81,7 +81,7 @@ public void run() {
};
t.start();
long start=System.currentTimeMillis(), stop;
- Object result=p.getResult(100000);
+ Object result=p.getResult(30000);
stop=System.currentTimeMillis();
System.out.println("-- waited for " + (stop-start) + "ms, result is " + result);
assert result != null;
View
3  tests/junit-functional/org/jgroups/tests/ResponseCollectorTest.java
@@ -16,7 +16,8 @@
*/
@Test(groups=Global.FUNCTIONAL,sequential=false)
public class ResponseCollectorTest {
- static final Address a=Util.createRandomAddress(), b=Util.createRandomAddress(), c=Util.createRandomAddress();
+ static final Address a=Util.createRandomAddress("A"),
+ b=Util.createRandomAddress("B"), c=Util.createRandomAddress("C");
public static void testAdd() {
View
4 tests/junit/org/jgroups/blocks/ExecutingServiceTest.java
@@ -68,7 +68,7 @@ protected void init() throws Exception {
// Add the exposed executing protocol
ProtocolStack stack=c1.getProtocolStack();
exposedProtocol = new ExposedExecutingProtocol();
- exposedProtocol.setLevel("trace");
+ exposedProtocol.setLevel("debug");
stack.insertProtocolAtTop(exposedProtocol);
er1=new ExecutionRunner(c1);
@@ -82,7 +82,7 @@ protected void init() throws Exception {
er3=new ExecutionRunner(c3);
c3.connect("ExecutionServiceTest");
- LogFactory.getLog(ExecutionRunner.class).setLevel("trace");
+ LogFactory.getLog(ExecutionRunner.class).setLevel("debug");
}
@AfterClass
View
96 tests/junit/org/jgroups/tests/TCPGOSSIP_Test.java
@@ -3,8 +3,11 @@
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.View;
+import org.jgroups.protocols.MERGE2;
+import org.jgroups.protocols.MERGE3;
import org.jgroups.protocols.TCPGOSSIP;
import org.jgroups.stack.GossipRouter;
+import org.jgroups.stack.Protocol;
import org.jgroups.util.StackType;
import org.jgroups.util.Util;
import org.testng.annotations.AfterClass;
@@ -12,14 +15,17 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import java.net.InetSocketAddress;
+import java.util.List;
+
/**
- * Tests TCPGOSSIP protocol
+ * Tests the TCPGOSSIP protocol
*
* @author Vladimir Blagojevic
*
**/
@Test(groups = { Global.STACK_INDEPENDENT, Global.GOSSIP_ROUTER }, sequential = true)
-public class TCPGOSSIP_Test extends ChannelTestBase {
+public class TCPGOSSIP_Test {
private JChannel channel, coordinator;
private final static String GROUP = "TCPGOSSIP_Test";
private GossipRouter gossipRouter;
@@ -28,20 +34,19 @@
@BeforeClass
void startRouter() throws Exception {
String bind_addr = getRouterBindAddress();
- gossipRouter = new GossipRouter(12001, null); // binds the GR to 0.0.0.0
+
+ System.setProperty("jgroups.bind_addr", bind_addr);
+
+ gossipRouter = new GossipRouter(12001, bind_addr); // binds the GR to 127.0.0.1:12001
gossipRouter.start();
}
private static String getRouterBindAddress() {
- String bind_addr = Util.getProperty(Global.BIND_ADDR);
- if (bind_addr == null) {
- StackType type = Util.getIpStackType();
- if (type == StackType.IPv6)
- bind_addr = "::1";
- else
- bind_addr = "127.0.0.1";
- }
- return bind_addr;
+ StackType type = Util.getIpStackType();
+ if (type == StackType.IPv6)
+ return "::1";
+ else
+ return "127.0.0.1";
}
@AfterClass(alwaysRun = true)
@@ -55,12 +60,11 @@ void tearDown() throws Exception {
}
/**
- * Tests connect-disconnect-connect sequence for a group with two members (using default
- * configuration).
+ * Tests connect-disconnect-connect sequence for a group with two members (using the default configuration).
**/
public void testDisconnectConnectTwo() throws Exception {
- coordinator = new JChannel(props);
- channel = new JChannel(props);
+ coordinator=createChannel("A");
+ channel=createChannel("B");
coordinator.connect(GROUP);
channel.connect("DisconnectTest.testgroup-1");
channel.disconnect();
@@ -72,15 +76,14 @@ public void testDisconnectConnectTwo() throws Exception {
}
public void testAddInitialHosts() throws Exception {
- coordinator = new JChannel(props);
- channel = new JChannel(props);
+ coordinator=createChannel("A");
+ channel=createChannel("B");
coordinator.connect(GROUP);
channel.connect(GROUP);
TCPGOSSIP p = (TCPGOSSIP) channel.getProtocolStack().findProtocol(TCPGOSSIP.class);
String bind_addr = getRouterBindAddress();
assert p.removeInitialHost(bind_addr, 12001);
p.addInitialHost(bind_addr, 12001);
-
View view = channel.getView();
assert view.size() == 2;
@@ -91,11 +94,11 @@ public void testAddInitialHosts() throws Exception {
public void testConnectThree() throws Exception {
JChannel third = null;
try {
- coordinator = new JChannel(props);
- channel = new JChannel(props);
+ coordinator=createChannel("A");
+ channel=createChannel("B");
coordinator.connect(GROUP);
channel.connect(GROUP);
- third = new JChannel(props);
+ third=createChannel("C");
third.connect(GROUP);
View view = channel.getView();
assert channel.getView().size() == 3;
@@ -110,8 +113,9 @@ public void testConnectThree() throws Exception {
public void testConnectThreeChannelsWithGRDown() throws Exception {
JChannel third = null;
try {
- coordinator = new JChannel(props);
- channel = new JChannel(props);
+ coordinator=createChannel("A");
+ channel=createChannel("B");
+ changeMergeInterval(coordinator, channel);
coordinator.connect("testConnectThreeChannelsWithGRDown");
channel.connect("testConnectThreeChannelsWithGRDown");
@@ -120,13 +124,13 @@ public void testConnectThreeChannelsWithGRDown() throws Exception {
// cannot discover others since GR is down
- third = new JChannel(props);
+ third=createChannel("C");
+ changeMergeInterval(third);
third.connect("testConnectThreeChannelsWithGRDown");
-
// restart and....
gossipRouter.start();
- Util.waitUntilAllChannelsHaveSameSize(60000, 500, coordinator, channel, third);
+ Util.waitUntilAllChannelsHaveSameSize(60000, 1000, coordinator, channel, third);
// confirm they found each other
View view = channel.getView();
@@ -142,8 +146,9 @@ public void testConnectThreeChannelsWithGRDown() throws Exception {
public void testConnectThreeChannelsWithGRAlreadyDown() throws Exception {
JChannel third = null;
try {
- coordinator = new JChannel(props);
- channel = new JChannel(props);
+ coordinator=createChannel("A");
+ channel=createChannel("B");
+ changeMergeInterval(coordinator, channel);
// kill router
gossipRouter.stop();
@@ -152,12 +157,13 @@ public void testConnectThreeChannelsWithGRAlreadyDown() throws Exception {
coordinator.connect("testConnectThreeChannelsWithGRAlreadyDown");
channel.connect("testConnectThreeChannelsWithGRAlreadyDown");
- third = new JChannel(props);
+ third=createChannel("C");
+ changeMergeInterval(third);
third.connect("testConnectThreeChannelsWithGRAlreadyDown");
// restart and....
gossipRouter.start();
- Util.waitUntilAllChannelsHaveSameSize(60000, 500, coordinator, channel, third);
+ Util.waitUntilAllChannelsHaveSameSize(60000, 1000, coordinator, channel, third);
// confirm they found each other
View view = channel.getView();
@@ -169,4 +175,32 @@ public void testConnectThreeChannelsWithGRAlreadyDown() throws Exception {
Util.close(third);
}
}
+
+ protected JChannel createChannel(String name) throws Exception {
+ JChannel retval=new JChannel(props);
+ retval.setName(name);
+ changeGossipRouter(retval,getRouterBindAddress(),12001);
+ return retval;
+ }
+
+ protected void changeGossipRouter(JChannel channel, String host, int port) {
+ TCPGOSSIP tcp_gossip_prot=(TCPGOSSIP)channel.getProtocolStack().findProtocol(TCPGOSSIP.class);
+ List<InetSocketAddress> initial_hosts=tcp_gossip_prot.getInitialHosts();
+ initial_hosts.clear();
+ initial_hosts.add(new InetSocketAddress(host, port));
+ }
+
+ protected void changeMergeInterval(JChannel ... channels) {
+ for(JChannel ch: channels) {
+ Protocol p=ch.getProtocolStack().findProtocol(MERGE2.class,MERGE3.class);
+ if(p instanceof MERGE2) {
+ ((MERGE2)p).setMinInterval(1000);
+ ((MERGE2)p).setMaxInterval(3000);
+ }
+ else if(p instanceof MERGE3) {
+ ((MERGE3)p).setMaxInterval(1000);
+ ((MERGE3)p).setMaxInterval(3000);
+ }
+ }
+ }
}
Please sign in to comment.
Something went wrong with that request. Please try again.