From 489cec9b78b21a2a241eeab18ddfb968758b2e67 Mon Sep 17 00:00:00 2001 From: JiangJiafu Date: Mon, 13 Feb 2017 11:36:44 +0000 Subject: [PATCH 01/18] ZOOKEEPER-2691: recreateSocketAddresses may recreate the unreachable IP address --- .../zookeeper/server/quorum/QuorumPeer.java | 29 ++++++++++++++++++- .../server/quorum/CnxManagerTest.java | 5 ++-- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java index 889ee62a851..60860dd673f 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -159,7 +159,7 @@ public QuorumServer(long id, String hostname, public void recreateSocketAddresses() { InetAddress address = null; try { - address = InetAddress.getByName(this.hostname); + address = getReachableAddress(this.hostname, 2000); LOG.info("Resolved hostname: {} to address: {}", this.hostname, address); this.addr = new InetSocketAddress(address, this.port); if (this.electionPort > 0){ @@ -181,6 +181,33 @@ public void recreateSocketAddresses() { } } + /** + * Resolve the hostname to IP addresses, and find one reachable address. + * + * @param hostname the name of the host + * @param timeout the time, in millseconds, before {@link InetAddress#isReachable} + * aborts + * @return a reachable IP address. If no such IP address can be found, + * just return the first IP address of the hostname. + * + * @exception UnknownHostException + */ + public InetAddress getReachableAddress(String hostname, int timeout) + throws UnknownHostException { + InetAddress[] addresses = InetAddress.getAllByName(hostname); + for (InetAddress a : addresses) { + try { + if (a.isReachable(timeout)) { + return a; + } + } catch (IOException e) { + LOG.warn("IP address {} is unreachable", a); + } + } + // All the IP address is unreachable, just return the first one. + return addresses[0]; + } + public InetSocketAddress addr; public InetSocketAddress electionAddr; diff --git a/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java b/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java index a82a728e26e..d4e968d1126 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java @@ -186,8 +186,9 @@ public void testCnxManagerTimeout() throws Exception { long begin = System.currentTimeMillis(); cnxManager.toSend(new Long(2), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1)); long end = System.currentTimeMillis(); - - if((end - begin) > 6000) Assert.fail("Waited more than necessary"); + // 5 seconds for socket timeout, 2 seconds for QuorumServer.recreateSocketAddresses + // delay, 2 seconds for some other delay. + if((end - begin) > 9000) Assert.fail("Waited more than necessary"); } From 31700c45030cca2d702fe0279443cd3f3b46a2b0 Mon Sep 17 00:00:00 2001 From: JiangJiafu Date: Tue, 14 Feb 2017 02:00:13 +0000 Subject: [PATCH 02/18] ZOOKEEPER-2691: recreateSocketAddresses may recreate the unreachable IP address --- .../zookeeper/server/quorum/QuorumPeer.java | 42 ++++++++++++++----- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java index 60860dd673f..c4c17552b1c 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -113,6 +113,10 @@ private QuorumServer(long id, InetSocketAddress addr, this.id = id; this.addr = addr; this.electionAddr = electionAddr; + String checkIPReachableValue = System.getProperty("zookeeper.checkIPTimeout"); + if(checkIPReachableValue != null){ + this.checkIPReachableTO = Integer.parseInt(checkIPReachableValue); + } } // VisibleForTesting @@ -120,6 +124,10 @@ public QuorumServer(long id, InetSocketAddress addr) { this.id = id; this.addr = addr; this.electionAddr = null; + String checkIPReachableValue = System.getProperty("zookeeper.checkIPTimeout"); + if(checkIPReachableValue != null){ + this.checkIPReachableTO = Integer.parseInt(checkIPReachableValue); + } } private QuorumServer(long id, InetSocketAddress addr, @@ -128,24 +136,32 @@ private QuorumServer(long id, InetSocketAddress addr, this.addr = addr; this.electionAddr = electionAddr; this.type = type; + String checkIPReachableValue = System.getProperty("zookeeper.checkIPTimeout"); + if(checkIPReachableValue != null){ + this.checkIPReachableTO = Integer.parseInt(checkIPReachableValue); + } } public QuorumServer(long id, String hostname, Integer port, Integer electionPort, LearnerType type) { - this.id = id; - this.hostname=hostname; - if (port!=null){ + this.id = id; + this.hostname=hostname; + if (port!=null){ this.port=port; - } - if (electionPort!=null){ + } + if (electionPort!=null){ this.electionPort=electionPort; - } - if (type!=null){ + } + if (type!=null){ this.type = type; - } - this.recreateSocketAddresses(); } + String checkIPReachableValue = System.getProperty("zookeeper.checkIPTimeout"); + if(checkIPReachableValue != null){ + this.checkIPReachableTO = Integer.parseInt(checkIPReachableValue); + } + this.recreateSocketAddresses(); + } /** * Performs a DNS lookup of hostname and (re)creates the this.addr and @@ -159,7 +175,7 @@ public QuorumServer(long id, String hostname, public void recreateSocketAddresses() { InetAddress address = null; try { - address = getReachableAddress(this.hostname, 2000); + address = getReachableAddress(this.hostname, checkIPReachableTO); LOG.info("Resolved hostname: {} to address: {}", this.hostname, address); this.addr = new InetSocketAddress(address, this.port); if (this.electionPort > 0){ @@ -221,6 +237,12 @@ public InetAddress getReachableAddress(String hostname, int timeout) public long id; public LearnerType type = LearnerType.PARTICIPANT; + + /** + * the time, in millseconds, before {@link InetAddress#isReachable} aborts + * in {@link #getReachableAddress}. + */ + private int checkIPReachableTO = 2000; } public enum ServerState { From 5c1bf6bd452e8237cb0bb9d871f3d0b3d08e0de2 Mon Sep 17 00:00:00 2001 From: JiangJiafu Date: Sat, 18 Feb 2017 01:46:10 +0000 Subject: [PATCH 03/18] ZOOKEEPER-2691: recreateSocketAddresses may recreate the unreachable IP address --- .../zookeeper/server/quorum/QuorumPeer.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java index c4c17552b1c..425086f80cf 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -113,9 +113,9 @@ private QuorumServer(long id, InetSocketAddress addr, this.id = id; this.addr = addr; this.electionAddr = electionAddr; - String checkIPReachableValue = System.getProperty("zookeeper.checkIPTimeout"); - if(checkIPReachableValue != null){ - this.checkIPReachableTO = Integer.parseInt(checkIPReachableValue); + String ipReachableValue = System.getProperty("zookeeper.ipReachableTimeout"); + if (ipReachableValue != null) { + this.ipReachableTimeout = Integer.parseInt(ipReachableValue); } } @@ -124,9 +124,9 @@ public QuorumServer(long id, InetSocketAddress addr) { this.id = id; this.addr = addr; this.electionAddr = null; - String checkIPReachableValue = System.getProperty("zookeeper.checkIPTimeout"); - if(checkIPReachableValue != null){ - this.checkIPReachableTO = Integer.parseInt(checkIPReachableValue); + String ipReachableValue = System.getProperty("zookeeper.ipReachableTimeout"); + if (ipReachableValue != null) { + this.ipReachableTimeout = Integer.parseInt(ipReachableValue); } } @@ -136,9 +136,9 @@ private QuorumServer(long id, InetSocketAddress addr, this.addr = addr; this.electionAddr = electionAddr; this.type = type; - String checkIPReachableValue = System.getProperty("zookeeper.checkIPTimeout"); - if(checkIPReachableValue != null){ - this.checkIPReachableTO = Integer.parseInt(checkIPReachableValue); + String ipReachableValue = System.getProperty("zookeeper.ipReachableTimeout"); + if (ipReachableValue != null) { + this.ipReachableTimeout = Integer.parseInt(ipReachableValue); } } @@ -156,9 +156,9 @@ public QuorumServer(long id, String hostname, if (type!=null){ this.type = type; } - String checkIPReachableValue = System.getProperty("zookeeper.checkIPTimeout"); - if(checkIPReachableValue != null){ - this.checkIPReachableTO = Integer.parseInt(checkIPReachableValue); + String ipReachableValue = System.getProperty("zookeeper.ipReachableTimeout"); + if (ipReachableValue != null) { + this.ipReachableTimeout = Integer.parseInt(ipReachableValue); } this.recreateSocketAddresses(); } @@ -175,7 +175,7 @@ public QuorumServer(long id, String hostname, public void recreateSocketAddresses() { InetAddress address = null; try { - address = getReachableAddress(this.hostname, checkIPReachableTO); + address = getReachableAddress(this.hostname, ipReachableTimeout); LOG.info("Resolved hostname: {} to address: {}", this.hostname, address); this.addr = new InetSocketAddress(address, this.port); if (this.electionPort > 0){ @@ -242,7 +242,7 @@ public InetAddress getReachableAddress(String hostname, int timeout) * the time, in millseconds, before {@link InetAddress#isReachable} aborts * in {@link #getReachableAddress}. */ - private int checkIPReachableTO = 2000; + private int ipReachableTimeout = 2000; } public enum ServerState { From aa7b63d047450d6d1189860d08a3bc16d3ca4243 Mon Sep 17 00:00:00 2001 From: Jiang Jiafu Date: Fri, 12 May 2017 03:22:17 +0000 Subject: [PATCH 04/18] ZOOKEEPER-2691 --- .../zookeeper/server/quorum/QuorumPeer.java | 15 ++++++++++----- .../zookeeper/server/quorum/CnxManagerTest.java | 5 ++--- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java index 25aa03da9a4..004e0b3e3c3 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -179,7 +179,12 @@ public QuorumServer(long id, String hostname, public void recreateSocketAddresses() { InetAddress address = null; try { - address = getReachableAddress(this.hostname, ipReachableTimeout); + // zookeeper.ipReachableTimeout is not defined + if (ipReachableTimeout <= 0) { + address = InetAddress.getByName(this.hostname); + } else { + address = getReachableAddress(this.hostname, ipReachableTimeout); + } LOG.info("Resolved hostname: {} to address: {}", this.hostname, address); this.addr = new InetSocketAddress(address, this.port); if (this.electionPort > 0){ @@ -205,7 +210,7 @@ public void recreateSocketAddresses() { * Resolve the hostname to IP addresses, and find one reachable address. * * @param hostname the name of the host - * @param timeout the time, in millseconds, before {@link InetAddress#isReachable} + * @param timeout the time, in milliseconds, before {@link InetAddress#isReachable} * aborts * @return a reachable IP address. If no such IP address can be found, * just return the first IP address of the hostname. @@ -224,7 +229,7 @@ public InetAddress getReachableAddress(String hostname, int timeout) LOG.warn("IP address {} is unreachable", a); } } - // All the IP address is unreachable, just return the first one. + // All the IP addresses are unreachable, just return the first one. return addresses[0]; } @@ -243,10 +248,10 @@ public InetAddress getReachableAddress(String hostname, int timeout) public LearnerType type = LearnerType.PARTICIPANT; /** - * the time, in millseconds, before {@link InetAddress#isReachable} aborts + * the time, in milliseconds, before {@link InetAddress#isReachable} aborts * in {@link #getReachableAddress}. */ - private int ipReachableTimeout = 2000; + private int ipReachableTimeout = 0; } public enum ServerState { diff --git a/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java b/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java index d4e968d1126..4bcce544256 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java @@ -186,9 +186,8 @@ public void testCnxManagerTimeout() throws Exception { long begin = System.currentTimeMillis(); cnxManager.toSend(new Long(2), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1)); long end = System.currentTimeMillis(); - // 5 seconds for socket timeout, 2 seconds for QuorumServer.recreateSocketAddresses - // delay, 2 seconds for some other delay. - if((end - begin) > 9000) Assert.fail("Waited more than necessary"); + + if((end - begin) > 6000) Assert.fail("Waited more than necessary"); } From e2589df9630fd0310c5a39275a25632b27c50a1a Mon Sep 17 00:00:00 2001 From: Jiang Jiafu Date: Fri, 12 May 2017 03:35:03 +0000 Subject: [PATCH 05/18] ZOOKEEPER-2691 --- .../test/org/apache/zookeeper/server/quorum/CnxManagerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java b/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java index 4bcce544256..a82a728e26e 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java @@ -186,7 +186,7 @@ public void testCnxManagerTimeout() throws Exception { long begin = System.currentTimeMillis(); cnxManager.toSend(new Long(2), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1)); long end = System.currentTimeMillis(); - + if((end - begin) > 6000) Assert.fail("Waited more than necessary"); } From eeb07f9b385d5e0161919f874466f837aaed3f99 Mon Sep 17 00:00:00 2001 From: Jiang Jiafu Date: Fri, 12 May 2017 06:02:51 +0000 Subject: [PATCH 06/18] ZOOKEEPER-2691 --- .../test/org/apache/zookeeper/server/quorum/CnxManagerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java b/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java index a82a728e26e..4bcce544256 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java @@ -186,7 +186,7 @@ public void testCnxManagerTimeout() throws Exception { long begin = System.currentTimeMillis(); cnxManager.toSend(new Long(2), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1)); long end = System.currentTimeMillis(); - + if((end - begin) > 6000) Assert.fail("Waited more than necessary"); } From c366949d6325cdc61aed59be30e37fe743186575 Mon Sep 17 00:00:00 2001 From: Jiang Jiafu Date: Fri, 12 May 2017 09:29:51 +0000 Subject: [PATCH 07/18] ZOOKEEPER-2691 --- .../content/xdocs/zookeeperAdmin.xml | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml b/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml index 27060e09a57..1faa77365f9 100644 --- a/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml +++ b/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml @@ -1076,6 +1076,26 @@ server.3=zoo3:2888:3888 + + zookeeper.ipReachableTimeout + + + (Java system property: zookeeper.ipReachableTimeout) + + New in 3.4.11: + Set this timeout value for IP addresses reachable checking when hostname is resolved, as mesured in + milliseconds. + By default, ZooKeeper will use the first IP address of the hostname(without any reachable checking). + When zookeeper.ipReachableTimeout is set(larger than 0), ZooKeeper will will try to pick up the first + IP address which is reachable. This is done by calling Java API InetAddress.isReachable(long timeout) + function, in which this timeout value is used. If none of such reachable IP address can be found, the + first IP address of the hostname will be used anyway. + + + + + From 6139f533af4f3b513bd713746449f147503168e0 Mon Sep 17 00:00:00 2001 From: Jiang Jiafu Date: Fri, 12 May 2017 09:47:31 +0000 Subject: [PATCH 08/18] ZOOKEEPER-2691 --- .../test/org/apache/zookeeper/server/quorum/CnxManagerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java b/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java index 4bcce544256..a82a728e26e 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/CnxManagerTest.java @@ -186,7 +186,7 @@ public void testCnxManagerTimeout() throws Exception { long begin = System.currentTimeMillis(); cnxManager.toSend(new Long(2), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1)); long end = System.currentTimeMillis(); - + if((end - begin) > 6000) Assert.fail("Waited more than necessary"); } From 3ac65ead39fad4f8d9f26365e1bc73f83889f11e Mon Sep 17 00:00:00 2001 From: Jiang Jiafu Date: Sat, 13 May 2017 03:41:52 +0000 Subject: [PATCH 09/18] ZOOKEEPER-2774 --- .../main/org/apache/zookeeper/ClientCnxn.java | 5 +- .../apache/zookeeper/ClientCnxnSocket.java | 3 +- src/java/main/org/apache/zookeeper/Login.java | 10 ++-- src/java/main/org/apache/zookeeper/Shell.java | 7 +-- .../main/org/apache/zookeeper/ZKUtil.java | 3 +- .../org/apache/zookeeper/common/Time.java | 52 +++++++++++++++++++ .../zookeeper/server/ConnectionBean.java | 4 +- .../server/FinalRequestProcessor.java | 7 +-- .../server/PrepRequestProcessor.java | 12 +++-- .../org/apache/zookeeper/server/Request.java | 3 +- .../apache/zookeeper/server/ServerStats.java | 5 +- .../zookeeper/server/SessionTrackerImpl.java | 9 ++-- .../zookeeper/server/ZooKeeperServer.java | 4 -- .../server/quorum/AuthFastLeaderElection.java | 3 +- .../server/quorum/FastLeaderElection.java | 3 +- .../zookeeper/server/quorum/Follower.java | 3 +- .../zookeeper/server/quorum/Leader.java | 15 +++--- .../zookeeper/test/system/GenerateLoad.java | 21 ++++---- .../zookeeper/test/ClientHammerTest.java | 7 +-- .../zookeeper/test/LoadFromLogTest.java | 19 +++---- .../zookeeper/test/ReadOnlyModeTest.java | 5 +- .../test/StaticHostProviderTest.java | 9 ++-- .../org/apache/zookeeper/test/TestHammer.java | 5 +- .../zookeeper/test/ZooKeeperTestClient.java | 3 +- 24 files changed, 145 insertions(+), 72 deletions(-) create mode 100644 src/java/main/org/apache/zookeeper/common/Time.java diff --git a/src/java/main/org/apache/zookeeper/ClientCnxn.java b/src/java/main/org/apache/zookeeper/ClientCnxn.java index 08934b0f856..3cf2f755044 100644 --- a/src/java/main/org/apache/zookeeper/ClientCnxn.java +++ b/src/java/main/org/apache/zookeeper/ClientCnxn.java @@ -60,6 +60,7 @@ import org.apache.zookeeper.ZooKeeper.WatchRegistration; import org.apache.zookeeper.client.HostProvider; import org.apache.zookeeper.client.ZooKeeperSaslClient; +import org.apache.zookeeper.common.Time; import org.apache.zookeeper.proto.AuthPacket; import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.proto.CreateResponse; @@ -1041,7 +1042,7 @@ public void run() { clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); int to; - long lastPingRwServer = System.currentTimeMillis(); + long lastPingRwServer = Time.currentElapsedTime(); final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds while (state.isAlive()) { try { @@ -1126,7 +1127,7 @@ public void run() { // If we are in read-only mode, seek for read/write server if (state == States.CONNECTEDREADONLY) { - long now = System.currentTimeMillis(); + long now = Time.currentElapsedTime(); int idlePingRwServer = (int) (now - lastPingRwServer); if (idlePingRwServer >= pingRwTimeout) { lastPingRwServer = now; diff --git a/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java b/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java index 5ca0ba77bcc..b67653141ba 100644 --- a/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java +++ b/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java @@ -27,6 +27,7 @@ import org.apache.jute.BinaryInputArchive; import org.apache.zookeeper.ClientCnxn.Packet; +import org.apache.zookeeper.common.Time; import org.apache.zookeeper.proto.ConnectResponse; import org.apache.zookeeper.server.ByteBufferInputStream; import org.slf4j.Logger; @@ -74,7 +75,7 @@ void introduce(ClientCnxn.SendThread sendThread, long sessionId) { } void updateNow() { - now = System.currentTimeMillis(); + now = Time.currentElapsedTime(); } int getIdleRecv() { diff --git a/src/java/main/org/apache/zookeeper/Login.java b/src/java/main/org/apache/zookeeper/Login.java index 3e21aae7c8c..c4975be1997 100644 --- a/src/java/main/org/apache/zookeeper/Login.java +++ b/src/java/main/org/apache/zookeeper/Login.java @@ -33,6 +33,7 @@ import javax.security.auth.callback.CallbackHandler; import org.apache.zookeeper.client.ZooKeeperSaslClient; +import org.apache.zookeeper.common.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.security.auth.kerberos.KerberosTicket; @@ -74,7 +75,8 @@ public class Login { private String keytabFile = null; private String principal = null; - private long lastLogin = 0; + // Initialize 'lastLogin' to do a login at first time + private long lastLogin = Time.currentElapsedTime() - MIN_TIME_BEFORE_RELOGIN; /** * LoginThread constructor. The constructor starts the thread used @@ -128,7 +130,7 @@ public void run() { LOG.info("TGT refresh thread started."); while (true) { // renewal thread's main loop. if it exits from here, thread will exit. KerberosTicket tgt = getTGT(); - long now = System.currentTimeMillis(); + long now = Time.currentWallTime(); long nextRefresh; Date nextRefreshDate; if (tgt == null) { @@ -306,7 +308,7 @@ private long getRefreshTime(KerberosTicket tgt) { (TICKET_RENEW_WINDOW + (TICKET_RENEW_JITTER * rng.nextDouble()))); if (proposedRefresh > expires) { // proposedRefresh is too far in the future: it's after ticket expires: simply return now. - return System.currentTimeMillis(); + return Time.currentWallTime(); } else { return proposedRefresh; @@ -327,7 +329,7 @@ private synchronized KerberosTicket getTGT() { } private boolean hasSufficientTimeElapsed() { - long now = System.currentTimeMillis(); + long now = Time.currentElapsedTime(); if (now - getLastLogin() < MIN_TIME_BEFORE_RELOGIN ) { LOG.warn("Not attempting to re-login since the last re-login was " + "attempted less than " + (MIN_TIME_BEFORE_RELOGIN/1000) + " seconds"+ diff --git a/src/java/main/org/apache/zookeeper/Shell.java b/src/java/main/org/apache/zookeeper/Shell.java index 789c481a634..97efad34d4e 100644 --- a/src/java/main/org/apache/zookeeper/Shell.java +++ b/src/java/main/org/apache/zookeeper/Shell.java @@ -39,8 +39,9 @@ import java.util.TimerTask; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; +import org.apache.zookeeper.common.Time; -/** +/** * A base class for running a Unix command. * * Shell can be used to run unix commands like du or @@ -146,7 +147,7 @@ protected void setWorkingDirectory(File dir) { /** check to see if a command needs to be executed and execute if needed */ protected void run() throws IOException { - if (lastTime + interval > System.currentTimeMillis()) + if (lastTime + interval > Time.currentElapsedTime()) return; exitCode = 0; // reset for next run runCommand(); @@ -245,7 +246,7 @@ public void run() { LOG.warn("Error while closing the error stream", ioe); } process.destroy(); - lastTime = System.currentTimeMillis(); + lastTime = Time.currentElapsedTime(); } } diff --git a/src/java/main/org/apache/zookeeper/ZKUtil.java b/src/java/main/org/apache/zookeeper/ZKUtil.java index 4713a08a934..f0f1a0b8677 100644 --- a/src/java/main/org/apache/zookeeper/ZKUtil.java +++ b/src/java/main/org/apache/zookeeper/ZKUtil.java @@ -120,5 +120,4 @@ public static List listSubTreeBFS(ZooKeeper zk, final String pathRoot) t } return tree; } - -} \ No newline at end of file +} diff --git a/src/java/main/org/apache/zookeeper/common/Time.java b/src/java/main/org/apache/zookeeper/common/Time.java new file mode 100644 index 00000000000..83e53f056b9 --- /dev/null +++ b/src/java/main/org/apache/zookeeper/common/Time.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.common; + +import java.util.Date; + +public class Time { + /** + * Returns time in milliseconds as does System.currentTimeMillis(), + * but uses elapsed time from an arbitrary epoch more like System.nanoTime(). + * The difference is that if somebody changes the system clock, + * Time.currentElapsedTime will change but nanoTime won't. On the other hand, + * all of ZK assumes that time is measured in milliseconds. + * @return The time in milliseconds from some arbitrary point in time. + */ + public static long currentElapsedTime() { + return System.nanoTime() / 1000000; + } + + /** + * Explicitly returns system dependent current wall time. + * @return Current time in msec. + */ + public static long currentWallTime() { + return System.currentTimeMillis(); + } + + /** + * This is to convert the elapsedTime to a Date. + * @return A date object indicated by the elapsedTime. + */ + public static Date elapsedTimeToDate(long elapsedTime) { + long wallTime = currentWallTime() + elapsedTime - currentElapsedTime(); + return new Date(wallTime); + } +} \ No newline at end of file diff --git a/src/java/main/org/apache/zookeeper/server/ConnectionBean.java b/src/java/main/org/apache/zookeeper/server/ConnectionBean.java index 917aacfdcdc..58917e05f2b 100644 --- a/src/java/main/org/apache/zookeeper/server/ConnectionBean.java +++ b/src/java/main/org/apache/zookeeper/server/ConnectionBean.java @@ -22,10 +22,10 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Arrays; -import java.util.Date; import javax.management.ObjectName; +import org.apache.zookeeper.common.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.zookeeper.jmx.MBeanRegistry; @@ -164,7 +164,7 @@ public String getLastZxid() { } public String getLastResponseTime() { - return new Date(stats.getLastResponseTime()).toString(); + return Time.elapsedTimeToDate(stats.getLastResponseTime()).toString(); } public long getLastLatency() { diff --git a/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java b/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java index 7278064382a..65f7ac03472 100644 --- a/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java +++ b/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.jute.Record; +import org.apache.zookeeper.common.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.zookeeper.KeeperException; @@ -165,7 +166,7 @@ public void processRequest(Request request) { lastOp = "PING"; cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp, - request.createTime, System.currentTimeMillis()); + request.createTime, Time.currentElapsedTime()); cnxn.sendResponse(new ReplyHeader(-2, zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response"); @@ -176,7 +177,7 @@ public void processRequest(Request request) { lastOp = "SESS"; cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp, - request.createTime, System.currentTimeMillis()); + request.createTime, Time.currentElapsedTime()); zks.finishSessionInit(request.cnxn, true); return; @@ -385,7 +386,7 @@ public void processRequest(Request request) { zks.serverStats().updateLatency(request.createTime); cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp, - request.createTime, System.currentTimeMillis()); + request.createTime, Time.currentElapsedTime()); try { cnxn.sendResponse(hdr, rsp, "response"); diff --git a/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java b/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java index fe02b8f32b2..a4fe9a9f24f 100644 --- a/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java +++ b/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -35,6 +35,7 @@ import org.apache.jute.Record; import org.apache.jute.BinaryOutputArchive; +import org.apache.zookeeper.common.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.zookeeper.CreateMode; @@ -319,7 +320,7 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, throws KeeperException, IOException, RequestProcessorException { request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, - zks.getTime(), type); + Time.currentWallTime(), type); switch (type) { case OpCode.create: @@ -558,9 +559,9 @@ protected void pRequest(Request request) throws RequestProcessorException { try { ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest); } catch(IOException e) { - request.hdr = new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), - zks.getTime(), OpCode.multi); - throw e; + request.hdr = new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), + Time.currentWallTime(), OpCode.multi); + throw e; } List txns = new ArrayList(); //Each op in a multi-op must have the same zxid! @@ -617,7 +618,8 @@ protected void pRequest(Request request) throws RequestProcessorException { index++; } - request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), request.type); + request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, + Time.currentWallTime(), request.type); request.txn = new MultiTxn(txns); break; diff --git a/src/java/main/org/apache/zookeeper/server/Request.java b/src/java/main/org/apache/zookeeper/server/Request.java index 80d2b99d569..bb8b1ca3fbc 100644 --- a/src/java/main/org/apache/zookeeper/server/Request.java +++ b/src/java/main/org/apache/zookeeper/server/Request.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.common.Time; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.txn.TxnHeader; @@ -75,7 +76,7 @@ public Request(ServerCnxn cnxn, long sessionId, int xid, int type, public final List authInfo; - public final long createTime = System.currentTimeMillis(); + public final long createTime = Time.currentElapsedTime(); private Object owner; diff --git a/src/java/main/org/apache/zookeeper/server/ServerStats.java b/src/java/main/org/apache/zookeeper/server/ServerStats.java index dbee6d54094..788599522f2 100644 --- a/src/java/main/org/apache/zookeeper/server/ServerStats.java +++ b/src/java/main/org/apache/zookeeper/server/ServerStats.java @@ -19,6 +19,9 @@ package org.apache.zookeeper.server; + +import org.apache.zookeeper.common.Time; + /** * Basic Server Statistics */ @@ -102,7 +105,7 @@ public String toString(){ } // mutators synchronized void updateLatency(long requestCreateTime) { - long latency = System.currentTimeMillis() - requestCreateTime; + long latency = Time.currentElapsedTime() - requestCreateTime; totalLatency += latency; count++; if (latency < minLatency) { diff --git a/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java b/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java index 938e9dac289..cdaaf2beb03 100644 --- a/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java +++ b/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.apache.zookeeper.common.Time; /** * This is a full featured SessionTracker. It tracks session in grouped by tick @@ -74,7 +75,7 @@ public static class SessionImpl implements Session { public static long initializeNextSession(long id) { long nextSid = 0; - nextSid = (System.currentTimeMillis() << 24) >>> 8; + nextSid = (Time.currentElapsedTime() << 24) >>> 8; nextSid = nextSid | (id <<56); return nextSid; } @@ -98,7 +99,7 @@ public SessionTrackerImpl(SessionExpirer expirer, this.expirer = expirer; this.expirationInterval = tickTime; this.sessionsWithTimeout = sessionsWithTimeout; - nextExpirationTime = roundToInterval(System.currentTimeMillis()); + nextExpirationTime = roundToInterval(Time.currentElapsedTime()); this.nextSessionId = initializeNextSession(sid); for (Entry e : sessionsWithTimeout.entrySet()) { addSession(e.getKey(), e.getValue()); @@ -141,7 +142,7 @@ synchronized public String toString() { synchronized public void run() { try { while (running) { - currentTime = System.currentTimeMillis(); + currentTime = Time.currentElapsedTime(); if (nextExpirationTime > currentTime) { this.wait(nextExpirationTime - currentTime); continue; @@ -174,7 +175,7 @@ synchronized public boolean touchSession(long sessionId, int timeout) { if (s == null || s.isClosing()) { return false; } - long expireTime = roundToInterval(System.currentTimeMillis() + timeout); + long expireTime = roundToInterval(Time.currentElapsedTime() + timeout); if (s.tickTime >= expireTime) { // Nothing needs to be done return true; diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index 62ac466bb1a..c1d3a90c40e 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -325,10 +325,6 @@ public void setZxid(long zxid) { hzxid.set(zxid); } - long getTime() { - return System.currentTimeMillis(); - } - private void close(long sessionId) { submitRequest(null, sessionId, OpCode.closeSession, 0, null, null); } diff --git a/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java b/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java index 5bf54f81498..d32a725abf4 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.Random; +import org.apache.zookeeper.common.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -412,7 +413,7 @@ class WorkerSender implements Runnable { WorkerSender(int attempts) { maxAttempts = attempts; rand = new Random(java.lang.Thread.currentThread().getId() - + System.currentTimeMillis()); + + Time.currentElapsedTime()); } long genChallenge() { diff --git a/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java b/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java index 2a3d4fd4da1..dc5f099bbe7 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java @@ -27,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.zookeeper.common.Time; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.ZooKeeperThread; import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message; @@ -801,7 +802,7 @@ public Vote lookForLeader() throws InterruptedException { self.jmxLeaderElectionBean = null; } if (self.start_fle == 0) { - self.start_fle = System.currentTimeMillis(); + self.start_fle = Time.currentElapsedTime(); } try { HashMap recvset = new HashMap(); diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Follower.java b/src/java/main/org/apache/zookeeper/server/quorum/Follower.java index a17af496e94..e439aaa60cd 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Follower.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Follower.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import org.apache.jute.Record; +import org.apache.zookeeper.common.Time; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.server.util.ZxidUtils; @@ -58,7 +59,7 @@ public String toString() { * @throws InterruptedException */ void followLeader() throws InterruptedException { - self.end_fle = System.currentTimeMillis(); + self.end_fle = Time.currentElapsedTime(); long electionTimeTaken = self.end_fle - self.start_fle; self.setElectionTimeTaken(electionTimeTaken); LOG.info("FOLLOWING - LEADER ELECTION TOOK - {}", electionTimeTaken); diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java index 44e6b4f4e56..bd7bf35bf00 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Leader.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Leader.java @@ -41,6 +41,7 @@ import javax.security.sasl.SaslException; import org.apache.jute.BinaryOutputArchive; +import org.apache.zookeeper.common.Time; import org.apache.zookeeper.server.FinalRequestProcessor; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; @@ -365,7 +366,7 @@ public void halt() { * @throws InterruptedException */ void lead() throws IOException, InterruptedException { - self.end_fle = System.currentTimeMillis(); + self.end_fle = Time.currentElapsedTime(); long electionTimeTaken = self.end_fle - self.start_fle; self.setElectionTimeTaken(electionTimeTaken); LOG.info("LEADING - LEADER ELECTION TOOK - {}", electionTimeTaken); @@ -885,12 +886,12 @@ public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws Interrupt self.setAcceptedEpoch(epoch); connectingFollowers.notifyAll(); } else { - long start = System.currentTimeMillis(); + long start = Time.currentElapsedTime(); long cur = start; long end = start + self.getInitLimit()*self.getTickTime(); while(waitingForNewEpoch && cur < end) { connectingFollowers.wait(end - cur); - cur = System.currentTimeMillis(); + cur = Time.currentElapsedTime(); } if (waitingForNewEpoch) { throw new InterruptedException("Timeout while waiting for epoch from quorum"); @@ -922,12 +923,12 @@ public void waitForEpochAck(long id, StateSummary ss) throws IOException, Interr electionFinished = true; electingFollowers.notifyAll(); } else { - long start = System.currentTimeMillis(); + long start = Time.currentElapsedTime(); long cur = start; long end = start + self.getInitLimit()*self.getTickTime(); while(!electionFinished && cur < end) { electingFollowers.wait(end - cur); - cur = System.currentTimeMillis(); + cur = Time.currentElapsedTime(); } if (!electionFinished) { throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum"); @@ -1010,12 +1011,12 @@ public void waitForNewLeaderAck(long sid, long zxid, LearnerType learnerType) quorumFormed = true; newLeaderProposal.ackSet.notifyAll(); } else { - long start = System.currentTimeMillis(); + long start = Time.currentElapsedTime(); long cur = start; long end = start + self.getInitLimit() * self.getTickTime(); while (!quorumFormed && cur < end) { newLeaderProposal.ackSet.wait(end - cur); - cur = System.currentTimeMillis(); + cur = Time.currentElapsedTime(); } if (!quorumFormed) { throw new InterruptedException( diff --git a/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java b/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java index cfd4e7b5245..b6ac04aa323 100644 --- a/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java +++ b/src/java/systest/org/apache/zookeeper/test/system/GenerateLoad.java @@ -53,6 +53,7 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.common.Time; public class GenerateLoad { protected static final Logger LOG = LoggerFactory.getLogger(GenerateLoad.class); @@ -194,7 +195,7 @@ static class ReporterThread extends Thread { public void run() { try { - currentInterval = System.currentTimeMillis() / INTERVAL; + currentInterval = Time.currentElapsedTime() / INTERVAL; // Give things time to report; Thread.sleep(INTERVAL * 2); long min = 99999; @@ -202,7 +203,7 @@ public void run() { long total = 0; int number = 0; while (true) { - long now = System.currentTimeMillis(); + long now = Time.currentElapsedTime(); long lastInterval = currentInterval; currentInterval += 1; long count = remove(lastInterval); @@ -249,13 +250,13 @@ public void run() { } synchronized static void sendChange(int percentage) { - long now = System.currentTimeMillis(); + long now = Time.currentElapsedTime(); long start = now; ReporterThread.percentage = percentage; for (SlaveThread st : slaves.toArray(new SlaveThread[0])) { st.send(percentage); } - now = System.currentTimeMillis(); + now = Time.currentElapsedTime(); long delay = now - start; if (delay > 1000) { System.out.println("Delay of " + delay + " to send new percentage"); @@ -387,7 +388,7 @@ public void processResult(int rc, String path, Object ctx, byte[] data, errors++; } else { finished++; - rlatency += System.currentTimeMillis() - (Long) ctx; + rlatency += Time.currentElapsedTime() - (Long) ctx; reads++; } } @@ -401,7 +402,7 @@ public void processResult(int rc, String path, Object ctx, Stat stat) { errors++; } else { finished++; - wlatency += System.currentTimeMillis() - (Long) ctx; + wlatency += Time.currentElapsedTime() - (Long) ctx; writes++; } } @@ -427,7 +428,7 @@ public void run() { if (percentage == -1 || (finished == 0 && errors == 0)) { continue; } - String report = System.currentTimeMillis() + " " + String report = Time.currentElapsedTime() + " " + percentage + " " + finished + " " + errors + " " + outstanding + "\n"; /* String subreport = reads + " " @@ -547,9 +548,9 @@ public boolean isConnected() { synchronized public boolean waitConnected(long timeout) throws InterruptedException { - long endTime = System.currentTimeMillis() + timeout; - while (!connected && System.currentTimeMillis() < endTime) { - wait(endTime - System.currentTimeMillis()); + long endTime = Time.currentElapsedTime() + timeout; + while (!connected && Time.currentElapsedTime() < endTime) { + wait(endTime - Time.currentElapsedTime()); } return connected; } diff --git a/src/java/test/org/apache/zookeeper/test/ClientHammerTest.java b/src/java/test/org/apache/zookeeper/test/ClientHammerTest.java index 581402cfa89..025ccbe4cd7 100644 --- a/src/java/test/org/apache/zookeeper/test/ClientHammerTest.java +++ b/src/java/test/org/apache/zookeeper/test/ClientHammerTest.java @@ -22,6 +22,7 @@ import java.util.Date; import java.util.List; +import org.apache.zookeeper.common.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.zookeeper.CreateMode; @@ -124,7 +125,7 @@ public void runHammer(final int threadCount, final int childCount) { try { HammerThread[] threads = new HammerThread[threadCount]; - long start = System.currentTimeMillis(); + long start = Time.currentElapsedTime(); for (int i = 0; i < threads.length; i++) { ZooKeeper zk = createClient(); String prefix = "/test-" + i; @@ -157,7 +158,7 @@ public void testHammerSuper() throws Throwable { final int childCount = 10; HammerThread[] threads = new HammerThread[threadCount]; - long start = System.currentTimeMillis(); + long start = Time.currentElapsedTime(); for (int i = 0; i < threads.length; i++) { String prefix = "/test-" + i; { @@ -218,7 +219,7 @@ public void verifyHammer(long start, HammerThread[] threads, int childCount) * HAMMERTHREAD_LATENCY * safetyFactor); } LOG.info(new Date() + " Total time " - + (System.currentTimeMillis() - start)); + + (Time.currentElapsedTime() - start)); ZooKeeper zk = createClient(); try { diff --git a/src/java/test/org/apache/zookeeper/test/LoadFromLogTest.java b/src/java/test/org/apache/zookeeper/test/LoadFromLogTest.java index 419683f0faf..94b0f97ec02 100644 --- a/src/java/test/org/apache/zookeeper/test/LoadFromLogTest.java +++ b/src/java/test/org/apache/zookeeper/test/LoadFromLogTest.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.zookeeper.common.Time; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; @@ -155,7 +156,7 @@ public void testTxnFailure() throws Exception { dt.createNode("/test", new byte[0], null, 0, -1, 1, 1); for (count = 1; count <= 3; count++) { dt.createNode("/test/" + count, new byte[0], null, 0, -1, count, - System.currentTimeMillis()); + Time.currentElapsedTime()); } DataNode zk = dt.getNode("/test"); @@ -204,15 +205,15 @@ private void doOp(FileTxnSnapLog logFile, int type, String path, if (type == OpCode.delete) { txn = new DeleteTxn(path); txnHeader = new TxnHeader(0xabcd, 0x123, prevPzxid + 1, - System.currentTimeMillis(), OpCode.delete); + Time.currentElapsedTime(), OpCode.delete); } else if (type == OpCode.create) { txnHeader = new TxnHeader(0xabcd, 0x123, prevPzxid + 1, - System.currentTimeMillis(), OpCode.create); + Time.currentElapsedTime(), OpCode.create); txn = new CreateTxn(path, new byte[0], null, false, cversion); } else if (type == OpCode.multi) { txnHeader = new TxnHeader(0xabcd, 0x123, prevPzxid + 1, - System.currentTimeMillis(), OpCode.create); + Time.currentElapsedTime(), OpCode.create); txn = new CreateTxn(path, new byte[0], null, false, cversion); ArrayList txnList = new ArrayList(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -223,7 +224,7 @@ else if (type == OpCode.multi) { txnList.add(txact); txn = new MultiTxn(txnList); txnHeader = new TxnHeader(0xabcd, 0x123, prevPzxid + 1, - System.currentTimeMillis(), OpCode.multi); + Time.currentElapsedTime(), OpCode.multi); } logFile.processTransaction(txnHeader, dt, null, txn); @@ -250,7 +251,7 @@ public void testPad() throws Exception { File tmpDir = ClientBase.createTmpDir(); FileTxnLog txnLog = new FileTxnLog(tmpDir); TxnHeader txnHeader = new TxnHeader(0xabcd, 0x123, 0x123, - System.currentTimeMillis(), OpCode.create); + Time.currentElapsedTime(), OpCode.create); Record txn = new CreateTxn("/Test", new byte[0], null, false, 1); txnLog.append(txnHeader, txn); FileInputStream in = new FileInputStream(tmpDir.getPath() + "/log." + @@ -444,9 +445,9 @@ public void testReloadSnapshotWithMissingParent() throws Exception { private ZooKeeper getConnectedZkClient() throws IOException { ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); - long start = System.currentTimeMillis(); + long start = Time.currentElapsedTime(); while (!connected) { - long end = System.currentTimeMillis(); + long end = Time.currentElapsedTime(); if (end - start > 5000) { Assert.assertTrue("Could not connect with server in 5 seconds", false); @@ -459,4 +460,4 @@ private ZooKeeper getConnectedZkClient() throws IOException { } return zk; } -} \ No newline at end of file +} diff --git a/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java b/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java index 3b7a149ab53..b04e5bc492b 100644 --- a/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java +++ b/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java @@ -42,6 +42,7 @@ import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.common.Time; import org.apache.zookeeper.test.ClientBase.CountdownWatcher; import org.junit.After; import org.junit.Before; @@ -175,13 +176,15 @@ public void process(WatchedEvent event) { // kill peer and wait no more than 5 seconds for read-only server // to be started (which should take one tickTime (2 seconds)) qu.shutdown(2); - long start = System.currentTimeMillis(); + long start = Time.currentElapsedTime(); while (!(zk.getState() == States.CONNECTEDREADONLY)) { Thread.sleep(200); // FIXME this was originally 5 seconds, but realistically, on random/slow/virt hosts, there is no way to guarantee this Assert.assertTrue("Can't connect to the server", System .currentTimeMillis() - start < 30000); + Assert.assertTrue("Can't connect to the server", + Time.currentElapsedTime() - start < 30000); } // At this point states list should contain, in the given order, diff --git a/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java b/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java index 75d3c5901d2..aa78a4b20ed 100644 --- a/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java +++ b/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java @@ -26,6 +26,7 @@ import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.client.HostProvider; import org.apache.zookeeper.client.StaticHostProvider; +import org.apache.zookeeper.common.Time; import org.junit.Test; import org.slf4j.Logger; @@ -58,9 +59,9 @@ public void testNextGoesRoundAndSleeps() throws UnknownHostException { hostProvider.next(0); --size; } - long start = System.currentTimeMillis(); + long start = Time.currentElapsedTime(); hostProvider.next(1000); - long stop = System.currentTimeMillis(); + long stop = Time.currentElapsedTime(); assertTrue(900 <= stop - start); } @@ -72,9 +73,9 @@ public void testNextDoesNotSleepForZero() throws UnknownHostException { hostProvider.next(0); --size; } - long start = System.currentTimeMillis(); + long start = Time.currentElapsedTime(); hostProvider.next(0); - long stop = System.currentTimeMillis(); + long stop = Time.currentElapsedTime(); assertTrue(5 > stop - start); } diff --git a/src/java/test/org/apache/zookeeper/test/TestHammer.java b/src/java/test/org/apache/zookeeper/test/TestHammer.java index 09a678b28c1..a73d6df35df 100644 --- a/src/java/test/org/apache/zookeeper/test/TestHammer.java +++ b/src/java/test/org/apache/zookeeper/test/TestHammer.java @@ -24,6 +24,7 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.AsyncCallback.VoidCallback; import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.common.Time; public class TestHammer implements VoidCallback { @@ -32,7 +33,7 @@ public class TestHammer implements VoidCallback { */ static int REPS = 50000; public static void main(String[] args) { - long startTime = System.currentTimeMillis(); + long startTime = Time.currentElapsedTime(); ZooKeeper zk = null; try { zk = new ZooKeeper(args[0], 10000, null); @@ -51,7 +52,7 @@ public static void main(String[] args) { e.printStackTrace(); } } - System.out.println("creates/sec=" + (REPS*1000/(System.currentTimeMillis()-startTime))); + System.out.println("creates/sec=" + (REPS*1000/(Time.currentElapsedTime()-startTime))); } public void processResult(int rc, String path, Object ctx) { diff --git a/src/java/test/org/apache/zookeeper/test/ZooKeeperTestClient.java b/src/java/test/org/apache/zookeeper/test/ZooKeeperTestClient.java index 67ca52faed2..0bbba61d5d9 100644 --- a/src/java/test/org/apache/zookeeper/test/ZooKeeperTestClient.java +++ b/src/java/test/org/apache/zookeeper/test/ZooKeeperTestClient.java @@ -32,6 +32,7 @@ import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.common.Time; import org.apache.zookeeper.data.Stat; import org.junit.Assert; @@ -40,7 +41,7 @@ public class ZooKeeperTestClient extends ZKTestCase implements Watcher { protected static final String dirOnZK = "/test_dir"; - protected String testDirOnZK = dirOnZK + "/" + System.currentTimeMillis(); + protected String testDirOnZK = dirOnZK + "/" + Time.currentElapsedTime(); LinkedBlockingQueue events = new LinkedBlockingQueue(); From dc2924977fb2d5527810581012c3144b4dc11632 Mon Sep 17 00:00:00 2001 From: JiangJiafu Date: Tue, 16 May 2017 09:14:11 +0800 Subject: [PATCH 10/18] Update ZKUtil.java --- src/java/main/org/apache/zookeeper/ZKUtil.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/java/main/org/apache/zookeeper/ZKUtil.java b/src/java/main/org/apache/zookeeper/ZKUtil.java index f0f1a0b8677..d745dc4b69c 100644 --- a/src/java/main/org/apache/zookeeper/ZKUtil.java +++ b/src/java/main/org/apache/zookeeper/ZKUtil.java @@ -120,4 +120,5 @@ public static List listSubTreeBFS(ZooKeeper zk, final String pathRoot) t } return tree; } + } From 172e35153e7fa226fdffa41bd0f353ee6377098d Mon Sep 17 00:00:00 2001 From: JiangJiafu Date: Tue, 16 May 2017 09:15:21 +0800 Subject: [PATCH 11/18] Update ZKUtil.java --- src/java/main/org/apache/zookeeper/ZKUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/main/org/apache/zookeeper/ZKUtil.java b/src/java/main/org/apache/zookeeper/ZKUtil.java index d745dc4b69c..0571f1cc63f 100644 --- a/src/java/main/org/apache/zookeeper/ZKUtil.java +++ b/src/java/main/org/apache/zookeeper/ZKUtil.java @@ -120,5 +120,5 @@ public static List listSubTreeBFS(ZooKeeper zk, final String pathRoot) t } return tree; } - + } From 5afbd4eb6a0d97c20df89d3c307f5092d19db5e1 Mon Sep 17 00:00:00 2001 From: JiangJiafu Date: Tue, 16 May 2017 09:16:34 +0800 Subject: [PATCH 12/18] Update ZKUtil.java --- src/java/main/org/apache/zookeeper/ZKUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/main/org/apache/zookeeper/ZKUtil.java b/src/java/main/org/apache/zookeeper/ZKUtil.java index 0571f1cc63f..e901832cfb2 100644 --- a/src/java/main/org/apache/zookeeper/ZKUtil.java +++ b/src/java/main/org/apache/zookeeper/ZKUtil.java @@ -121,4 +121,4 @@ public static List listSubTreeBFS(ZooKeeper zk, final String pathRoot) t return tree; } -} +} From 61b5644f680ebc8ddcc5af1c9aee932a038a390b Mon Sep 17 00:00:00 2001 From: Jiang Jiafu Date: Wed, 17 May 2017 07:51:18 +0000 Subject: [PATCH 13/18] ZOOKEEPER-2691 --- .../zookeeper/server/quorum/QuorumPeer.java | 32 +++++++------------ 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java index 004e0b3e3c3..573bab74e35 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -117,10 +117,6 @@ private QuorumServer(long id, InetSocketAddress addr, this.id = id; this.addr = addr; this.electionAddr = electionAddr; - String ipReachableValue = System.getProperty("zookeeper.ipReachableTimeout"); - if (ipReachableValue != null) { - this.ipReachableTimeout = Integer.parseInt(ipReachableValue); - } } // VisibleForTesting @@ -128,10 +124,6 @@ public QuorumServer(long id, InetSocketAddress addr) { this.id = id; this.addr = addr; this.electionAddr = null; - String ipReachableValue = System.getProperty("zookeeper.ipReachableTimeout"); - if (ipReachableValue != null) { - this.ipReachableTimeout = Integer.parseInt(ipReachableValue); - } } private QuorumServer(long id, InetSocketAddress addr, @@ -140,10 +132,6 @@ private QuorumServer(long id, InetSocketAddress addr, this.addr = addr; this.electionAddr = electionAddr; this.type = type; - String ipReachableValue = System.getProperty("zookeeper.ipReachableTimeout"); - if (ipReachableValue != null) { - this.ipReachableTimeout = Integer.parseInt(ipReachableValue); - } } public QuorumServer(long id, String hostname, @@ -160,10 +148,6 @@ public QuorumServer(long id, String hostname, if (type!=null){ this.type = type; } - String ipReachableValue = System.getProperty("zookeeper.ipReachableTimeout"); - if (ipReachableValue != null) { - this.ipReachableTimeout = Integer.parseInt(ipReachableValue); - } this.recreateSocketAddresses(); } @@ -179,6 +163,17 @@ public QuorumServer(long id, String hostname, public void recreateSocketAddresses() { InetAddress address = null; try { + // the time, in milliseconds, before {@link InetAddress#isReachable} aborts + // in {@link #getReachableAddress}. + int ipReachableTimeout = 0; + String ipReachableValue = System.getProperty("zookeeper.ipReachableTimeout"); + if (ipReachableValue != null) { + try { + ipReachableTimeout = Integer.parseInt(ipReachableValue); + } catch (NumberFormatException e) { + LOG.error("{} is not a valid number", ipReachableValue); + } + } // zookeeper.ipReachableTimeout is not defined if (ipReachableTimeout <= 0) { address = InetAddress.getByName(this.hostname); @@ -247,11 +242,6 @@ public InetAddress getReachableAddress(String hostname, int timeout) public LearnerType type = LearnerType.PARTICIPANT; - /** - * the time, in milliseconds, before {@link InetAddress#isReachable} aborts - * in {@link #getReachableAddress}. - */ - private int ipReachableTimeout = 0; } public enum ServerState { From 61f5b20b5b86fd68901016b7c9715033fc88ef5c Mon Sep 17 00:00:00 2001 From: Jiang Jiafu Date: Wed, 17 May 2017 07:59:26 +0000 Subject: [PATCH 14/18] Remove unnecessary change. --- src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java index 573bab74e35..b54b827f256 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -241,7 +241,6 @@ public InetAddress getReachableAddress(String hostname, int timeout) public long id; public LearnerType type = LearnerType.PARTICIPANT; - } public enum ServerState { From 6f8e4120f72f9999b04d6148a9501d55c0db788a Mon Sep 17 00:00:00 2001 From: Jiang Jiafu Date: Fri, 9 Jun 2017 10:45:33 +0000 Subject: [PATCH 15/18] Fix the bug ZOOKEEPER-2355. --- .../main/org/apache/zookeeper/server/quorum/Learner.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java index c54f6e6b797..36c7e19cb50 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java @@ -333,7 +333,7 @@ protected void syncWithLeader(long newLeaderZxid) throws IOException, Interrupte snapshotNeeded = false; } else if (qp.getType() == Leader.SNAP) { - LOG.info("Getting a snapshot from leader"); + LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid())); // The leader is going to dump the database // clear our own database and read zk.getZKDatabase().clear(); @@ -343,6 +343,7 @@ else if (qp.getType() == Leader.SNAP) { LOG.error("Missing signature. Got " + signature); throw new IOException("Missing signature"); } + zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); } else if (qp.getType() == Leader.TRUNC) { //we need to truncate the log to the lastzxid of the leader LOG.warn("Truncating log to get in sync with the leader 0x" @@ -354,6 +355,7 @@ else if (qp.getType() == Leader.SNAP) { + Long.toHexString(qp.getZxid())); System.exit(13); } + zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); } else { @@ -362,7 +364,6 @@ else if (qp.getType() == Leader.SNAP) { System.exit(13); } - zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); zk.createSessionTracker(); long lastQueued = 0; From 6ba291663bea666422292f50fb201d61a6280ebf Mon Sep 17 00:00:00 2001 From: Jiang Jiafu Date: Fri, 7 Jul 2017 09:09:17 +0000 Subject: [PATCH 16/18] Fix the PR that sock.setSoTimeout(0) is not reasonable. --- .../apache/zookeeper/server/quorum/QuorumCnxManager.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java index 878008b64cc..559dc0b2ddf 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java @@ -967,14 +967,6 @@ class RecvWorker extends ZooKeeperThread { this.sock = sock; this.sw = sw; this.din = din; - try { - // OK to wait until socket disconnects while reading. - sock.setSoTimeout(0); - } catch (IOException e) { - LOG.error("Error while accessing socket for " + sid, e); - closeSocket(sock); - running = false; - } } /** From 7e56762b48b0e075f02729e4a4f5a6f1ac7d2e2b Mon Sep 17 00:00:00 2001 From: Jiafu Jiang Date: Mon, 23 Oct 2017 02:50:27 +0000 Subject: [PATCH 17/18] ZOOKEEPER-2923 --- .../org/apache/zookeeper/server/quorum/CommitProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java b/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java index 220d6588cd1..3419f08836e 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java @@ -54,7 +54,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements RequestP /** * This flag indicates whether we need to wait for a response to come back from the * leader or we just let the sync operation flow through like a read. The flag will - * be true if the CommitProcessor is in a Leader pipeline. + * be false if the CommitProcessor is in a Leader pipeline. */ boolean matchSyncs; From 17f038ea2661d4fb92e926aca960998ef1aa03ee Mon Sep 17 00:00:00 2001 From: Jiafu Jiang Date: Mon, 23 Oct 2017 02:59:32 +0000 Subject: [PATCH 18/18] QuorumCnxManager.java --- .../apache/zookeeper/server/quorum/QuorumCnxManager.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java index d002de4059a..ec6be4a2a88 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java @@ -969,6 +969,14 @@ class RecvWorker extends ZooKeeperThread { this.sock = sock; this.sw = sw; this.din = din; + try { + // OK to wait until socket disconnects while reading. + sock.setSoTimeout(0); + } catch (IOException e) { + LOG.error("Error while accessing socket for " + sid, e); + closeSocket(sock); + running = false; + } } /**