diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 3ab81e71ca9..b417a8f095d 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -87,15 +87,11 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { protected static final Logger LOG; public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit"; - protected static int globalOutstandingLimit = 1000; static { LOG = LoggerFactory.getLogger(ZooKeeperServer.class); Environment.logEnv("Server environment:", LOG); - - globalOutstandingLimit = Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, 1000); - LOG.info("{} = {}", GLOBAL_OUTSTANDING_LIMIT, globalOutstandingLimit); } protected ZooKeeperServerBean jmxServerBean; @@ -864,6 +860,17 @@ public static int getSnapCount() { } } + public int getGlobalOutstandingLimit() { + String sc = System.getProperty(GLOBAL_OUTSTANDING_LIMIT); + int limit; + try { + limit = Integer.parseInt(sc); + } catch (Exception e) { + limit = 1000; + } + return limit; + } + public void setServerCnxnFactory(ServerCnxnFactory factory) { serverCnxnFactory = factory; } @@ -1090,7 +1097,7 @@ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) th } public boolean shouldThrottle(long outStandingCount) { - if (globalOutstandingLimit < getInProcess()) { + if (getGlobalOutstandingLimit() < getInProcess()) { return outStandingCount > 0; } return false; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java index ec529de1843..78c12db83d8 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java @@ -60,10 +60,6 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer { super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, zkDb, self); this.pendingSyncs = new ConcurrentLinkedQueue(); - - int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1; - globalOutstandingLimit = Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, 1000) / divisor; - LOG.info("Override {} to {}", GLOBAL_OUTSTANDING_LIMIT, globalOutstandingLimit); } public Follower getFollower(){ @@ -126,6 +122,14 @@ synchronized public void sync(){ commitProcessor.commit(r); } + @Override + public int getGlobalOutstandingLimit() { + int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1; + int globalOutstandingLimit = super.getGlobalOutstandingLimit() / divisor; + LOG.info("Override {} to {}", GLOBAL_OUTSTANDING_LIMIT, globalOutstandingLimit); + return globalOutstandingLimit; + } + @Override public String getState() { return "follower"; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java index c6f60e115e1..e1d1bb6fead 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java @@ -55,10 +55,6 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer { */ LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, zkDb, self); - - int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1; - globalOutstandingLimit = Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, 1000) / divisor; - LOG.info("Override {} to {}", GLOBAL_OUTSTANDING_LIMIT, globalOutstandingLimit); } public Leader getLeader(){ @@ -106,6 +102,14 @@ public synchronized void shutdown() { super.shutdown(); } + @Override + public int getGlobalOutstandingLimit() { + int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1; + int globalOutstandingLimit = super.getGlobalOutstandingLimit() / divisor; + LOG.info("Override {} to {}", GLOBAL_OUTSTANDING_LIMIT, globalOutstandingLimit); + return globalOutstandingLimit; + } + @Override public void createSessionTracker() { sessionTracker = new LeaderSessionTracker(