From 77c4ea4e53cc76ed77a3ef8feeff0650f32fbeb9 Mon Sep 17 00:00:00 2001 From: Fangmin Lyu Date: Thu, 19 Dec 2019 16:18:06 -0800 Subject: [PATCH] ZOOKEEPER-3657: Implementing snapshot schedule to avoid high latency issue due to disk contention --- .../main/resources/markdown/zookeeperAdmin.md | 74 ++- .../java/org/apache/zookeeper/ZooDefs.java | 8 + .../apache/zookeeper/server/PurgeTxnLog.java | 6 +- .../zookeeper/server/ServerMetrics.java | 5 + .../zookeeper/server/SnapshotGenerator.java | 105 +++++ .../server/SyncRequestProcessor.java | 117 +++-- .../apache/zookeeper/server/ZKDatabase.java | 24 + .../zookeeper/server/ZooKeeperServer.java | 20 + .../server/persistence/FileTxnLog.java | 14 +- .../server/persistence/FileTxnSnapLog.java | 15 + .../zookeeper/server/persistence/TxnLog.java | 4 + .../zookeeper/server/quorum/Follower.java | 89 ++++ .../zookeeper/server/quorum/Leader.java | 113 ++++- .../zookeeper/server/quorum/LeaderBean.java | 9 + .../zookeeper/server/quorum/LeaderMXBean.java | 9 + .../server/quorum/LeaderZooKeeperServer.java | 4 +- .../server/quorum/LearnerHandler.java | 58 ++- .../server/quorum/QuorumPeerMain.java | 6 +- .../server/quorum/SnapPingListener.java | 35 ++ .../server/quorum/SnapPingManager.java | 399 ++++++++++++++++ .../zookeeper/server/quorum/SnapPingTest.java | 441 ++++++++++++++++++ 21 files changed, 1512 insertions(+), 43 deletions(-) create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/server/SnapshotGenerator.java create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapPingListener.java create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapPingManager.java create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SnapPingTest.java diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index 6dbfa277cc7..397b09b7ce7 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -1128,7 +1128,7 @@ property, when available, is noted below. The size threshold after which a request is considered a large request. If it is -1, then all requests are considered small, effectively turning off large request throttling. The default is -1. * *outstandingHandshake.limit* - (Jave system property only: **zookeeper.netty.server.outstandingHandshake.limit**) + (Java system property only: **zookeeper.netty.server.outstandingHandshake.limit**) The maximum in-flight TLS handshake connections could have in ZooKeeper, the connections exceed this limit will be rejected before starting handshake. This setting doesn't limit the max TLS concurrency, but helps avoid herd @@ -1145,7 +1145,7 @@ property, when available, is noted below. When set to 0, no requests will be throttled. The default is 0. * *learner.closeSocketAsync* - (Jave system property only: **learner.closeSocketAsync**) + (Java system property only: **learner.closeSocketAsync**) When enabled, a learner will close the quorum socket asynchronously. This is useful for TLS connections where closing a socket might take a long time, block the shutdown process, potentially delay a new leader election, and leave the quorum unavailabe. Closing the socket asynchronously avoids blocking the shutdown process despite the long socket closing time and a new leader election can be started while the socket being closed. The default is false. * *forward_learner_requests_to_commit_processor_disabled* @@ -1156,6 +1156,76 @@ property, when available, is noted below. The default value is false. +* *leader.snapPingIntervalInSeconds* + (Java system property only: **zookeeper.leader.snapPingIntervalInSeconds**) + Set the interval of snapshot scheduler, this is also the switch for + enabling/disabling snapshot scheduler. + + Snapshot scheduler is the feature used to coordinate the time of snapshot + happens in the quorum, which avoid high latency issue due to majority of + servers taking snapshot at the same time when running on a single disk + driver. + + A new quorum packet is added: SNAPPING, but it's backwards compatible and can be + rolled out safely with rolling restart. Leader will check and start the snapshot + scheduler if it's enabled, and send SNAPPING to the quorum. If the follower is + running old code, it will ignore that packet. When follower with new code received + SNAPPING packet, it will turn off the periodically snapshot locally, and only + taking safety snapshot if the txns since last snapshot is much larger than + the threshold defined in SyncRequestProcessor. This is used to avoid issues like + the follower accumulated too many txns before it is scheduled to take snapshot. + + The default value is -1, which disables the central snapshot scheduler in + quorum. The suggest value would be 20s, which means it checks and schedule + the next round of snapshot every 20s. Note that each round will only schedule + at most one server to take snapshot. + + Also there is a JMX setting on leader to turn it on and off in flight. + +* *leader.snapTxnsThreshold* + (Java system property only: **zookeeper.leader.snapTxnsThreshold**) + The minimal number of txns to schedule snapshot since last snapshot. The + default value is 100,000 which is the suggested value. + +* *leader.snapTxnsSizeThresholdKB* + (Java system property only: **zookeeper.leader.snapTxnsSizeThresholdKB**) + The minimal size of txns to scheduler snapshot since last snapshot. The + default value is 4GB, which is the suggested value. + +* *flushLatencyDrainThreshold* + (Java system property only: **zookeeper.flushLatencyDrainThreshold**) + The threshold used to decide if the learner is having high fsync time, + which might due to draining the previous snapshot data. The default value + is 200ms, and when a server exceeds this threshold, it will be considered + as draining, being excluded from the idle snapshot server set. + +* *learner.queueSizeDrainThreshold* + (Java system property only: **zookeeper.queueSizeDrainThreshold**) + The threshold used to decide if the learner is having long queue in + SyncRequestProcessor, the default value is 10,000. The server will be + considered as draining if it exceeds this threshold, and being excluded + from the idle snapshot server set. + +* *snapsync.unscheduledSnapshotThreshold* + (Java system property only: **zookeeper.snapsync.unscheduledSnapshotThreshold**) + If no snapshot is scheduled in the last N runs due to things like slow disk + on majority for a while, leader will go ahead and schedule the snapshot to + avoid out of disk issue. + + By default this value is 3, which means if no snapshot scheduled in 3 rounds, + leader will ignore the majority rule and schedule anyway. + +* *purgeAfterSnapshot.enabled* + (Java system property: **zookeeper.purgeAfterSnapshot.enabled**) + Purge or not after snapshot, from our test it's better to enable this to + include purge in the snapshot schedule to avoid high latency due to purge. + + The default value is false. + +* *fsyncSnapshotFromScheduler* + (Java system property: **zookeeper.fsyncSnapshotFromScheduler**) + Fsync the snapshot or not. The default value is true, which seems better + from our testing. diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java index a12e5803c27..5d5b9fe01b7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java @@ -32,6 +32,14 @@ public class ZooDefs { public static final String ZOOKEEPER_NODE_SUBTREE = "/zookeeper/"; + /** + * WARN: please don't retain the order, which is used to check + * the op during snapshot schedule. + */ + public enum SnapPingCode { + CHECK, SNAP, SKIP, CANCEL; + } + @InterfaceAudience.Public public interface OpCode { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PurgeTxnLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PurgeTxnLog.java index d152fabd8dd..b73fe286ef4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PurgeTxnLog.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PurgeTxnLog.java @@ -73,12 +73,14 @@ static void printUsage() { * @throws IOException */ public static void purge(File dataDir, File snapDir, int num) throws IOException { + purge(new FileTxnSnapLog(dataDir, snapDir), num); + } + + public static void purge(FileTxnSnapLog txnLog, int num) throws IOException { if (num < 3) { throw new IllegalArgumentException(COUNT_ERR_MSG); } - FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir); - List snaps = txnLog.findNValidSnapshots(num); int numSnaps = snaps.size(); if (numSnaps > 0) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java index 3962bb974f1..6a177a13df2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java @@ -243,6 +243,8 @@ private ServerMetrics(MetricsProvider metricsProvider) { REQUESTS_NOT_FORWARDED_TO_COMMIT_PROCESSOR = metricsContext.getCounter( "requests_not_forwarded_to_commit_processor"); + TAKING_SAFE_SNAPSHOT = metricsContext.getCounter("taking_safe_snapshot"); + MANAGER_INITIATED_SAFE_SNAPSHOT = metricsContext.getCounter("manager_initiated_safe_snapshot"); } /** @@ -471,6 +473,9 @@ private ServerMetrics(MetricsProvider metricsProvider) { private final MetricsProvider metricsProvider; + public final Counter TAKING_SAFE_SNAPSHOT; + public final Counter MANAGER_INITIATED_SAFE_SNAPSHOT; + public void resetAll() { metricsProvider.resetAllValues(); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SnapshotGenerator.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SnapshotGenerator.java new file mode 100644 index 00000000000..37d2db001a5 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SnapshotGenerator.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Util class Used to control the behavior abouthow we take snapshot. + */ +public class SnapshotGenerator { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotGenerator.class); + + public static final String PURGE_AFTER_SNAPSHOT = "zookeeper.purgeAfterSnapshot.enabled"; + private static boolean purgeAfterSnapshot; + + public static final String FSYNC_SNAPSHOT_FROM_SCHEDULER = "zookeeper.fsyncSnapshotFromScheduler"; + private static boolean fsyncSnapshotFromScheduler; + + static { + purgeAfterSnapshot = Boolean.getBoolean(PURGE_AFTER_SNAPSHOT); + LOG.info("{} = {}", PURGE_AFTER_SNAPSHOT, purgeAfterSnapshot); + + fsyncSnapshotFromScheduler = Boolean.parseBoolean( + System.getProperty(FSYNC_SNAPSHOT_FROM_SCHEDULER, "true")); + LOG.info("{} = {}", FSYNC_SNAPSHOT_FROM_SCHEDULER, fsyncSnapshotFromScheduler); + } + + public static boolean getPurgeAfterSnapshot() { + return purgeAfterSnapshot; + } + + public static void setPurgeAfterSnapshot(boolean enabled) { + purgeAfterSnapshot = enabled; + LOG.info("{} = {}", PURGE_AFTER_SNAPSHOT, purgeAfterSnapshot); + } + + public static void setFsyncSnapshotFromScheduler(boolean fsync) { + fsyncSnapshotFromScheduler = fsync; + LOG.info("{} = {}", FSYNC_SNAPSHOT_FROM_SCHEDULER, fsyncSnapshotFromScheduler); + } + + public static boolean getFsyncSnapshotFromScheduler() { + return fsyncSnapshotFromScheduler; + } + + private final ZooKeeperServer zks; + private final ExecutorService worker; + private final AtomicBoolean isTakingSnapshot; + + public SnapshotGenerator(final ZooKeeperServer zks) { + this.zks = zks; + this.worker = Executors.newFixedThreadPool(1); + this.isTakingSnapshot = new AtomicBoolean(false); + } + + public boolean takeSnapshot(boolean syncSnap) { + // Only allow a single snapshot in progress. + if (isTakingSnapshot.compareAndSet(false, true)) { + this.worker.execute(new Runnable() { + @Override + public void run() { + try { + zks.takeSnapshot(syncSnap); + if (purgeAfterSnapshot) { + zks.purge(); + } + } catch (Exception e) { + LOG.warn("Unexpected exception", e); + } finally { + isTakingSnapshot.compareAndSet(true, false); + } + } + }); + return true; + } else { + LOG.warn("Previous snapshot is still in-flight, too busy to snap, skipping"); + return false; + } + } + + public boolean isSnapInProgress() { + return isTakingSnapshot.get(); + } + +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java index 4df319f86a4..45bd67f15db 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java @@ -25,10 +25,10 @@ import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.server.metric.AvgMinMaxCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,9 +69,14 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req private int randRoll; private long randSize; + private AvgMinMaxCounter lastRequestFlushLatency = new AvgMinMaxCounter("flushDelay"); + + private static final double DEFAULT_SNAPSHOT_FACTOR = 0.5; + private static final double SAFETY_SNAPSHOT_FACTOR = 10; + private final BlockingQueue queuedRequests = new LinkedBlockingQueue(); - private final Semaphore snapThreadMutex = new Semaphore(1); + private final SnapshotGenerator snapGenerator; private final ZooKeeperServer zks; @@ -85,11 +90,31 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req private final Queue toFlush; private long lastFlushTime; + private boolean onlySnapWhenSafetyIsThreatened = false; + + private static volatile Threshold selfSnapTxnRollThreshold = new Threshold( + (int) (snapCount * DEFAULT_SNAPSHOT_FACTOR), (long) (snapSizeInBytes * DEFAULT_SNAPSHOT_FACTOR)); + private static volatile Threshold safetySnapThreshold = new Threshold( + (int) (snapCount * SAFETY_SNAPSHOT_FACTOR), (long) (snapSizeInBytes * SAFETY_SNAPSHOT_FACTOR)); + + public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener()); this.zks = zks; this.nextProcessor = nextProcessor; this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize()); + this.snapGenerator = zks.getSnapshotGenerator(); + } + + public void setOnlySnapWhenSafetyIsThreatened(boolean mode) { + if (mode != this.onlySnapWhenSafetyIsThreatened) { + LOG.info("Set onlySnapWhenSafetyIsThreatened to {}", mode); + } + this.onlySnapWhenSafetyIsThreatened = mode; + } + + public boolean isOnlySnapWhenSafetyIsThreatened() { + return this.onlySnapWhenSafetyIsThreatened; } /** @@ -99,6 +124,10 @@ public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) */ public static void setSnapCount(int count) { snapCount = count; + selfSnapTxnRollThreshold = new Threshold( + (int) (snapCount * DEFAULT_SNAPSHOT_FACTOR), (long) (snapSizeInBytes * DEFAULT_SNAPSHOT_FACTOR)); + safetySnapThreshold = new Threshold( + (int) (snapCount * SAFETY_SNAPSHOT_FACTOR), (long) (snapSizeInBytes * SAFETY_SNAPSHOT_FACTOR)); } /** @@ -140,25 +169,44 @@ public static void setSnapSizeInBytes(long size) { snapSizeInBytes = size; } - private boolean shouldSnapshot() { - int logCount = zks.getZKDatabase().getTxnCount(); - long logSize = zks.getZKDatabase().getTxnSize(); - return (logCount > (snapCount / 2 + randRoll)) - || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize)); - } + public static class Threshold { + + private final int countLimit; + private final long sizeLimit; - private void resetSnapshotStats() { - randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2); - randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2)); + private int randCount; + private long randSize; + + public Threshold(int countLimit, long sizeLimit) { + this.countLimit = countLimit; + this.sizeLimit = sizeLimit; + resetRandomNum(); + } + + public boolean meet(int count, long size) { + boolean result = (count > (countLimit + randCount)) + || (sizeLimit > 0 && size > (sizeLimit + randSize)); + if (result) { + resetRandomNum(); + } + return result; + } + + /** + * we do this in an attempt to ensure that not all of the servers + * in the ensemble take a snapshot at the same time + */ + private void resetRandomNum() { + randCount = ThreadLocalRandom.current().nextInt(countLimit); + randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (sizeLimit)); + } } @Override public void run() { try { - // we do this in an attempt to ensure that not all of the servers - // in the ensemble take a snapshot at the same time - resetSnapshotStats(); lastFlushTime = Time.currentElapsedTime(); + final ZKDatabase zkDB = zks.getZKDatabase(); while (true) { ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size()); @@ -178,27 +226,22 @@ public void run() { ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime); // track the number of records written to the log - if (!si.isThrottled() && zks.getZKDatabase().append(si)) { - if (shouldSnapshot()) { - resetSnapshotStats(); + if (!si.isThrottled() && zkDB.append(si)) { + if (selfSnapTxnRollThreshold.meet(zkDB.getTxnCount(), zkDB.getTxnSize())) { // roll the log - zks.getZKDatabase().rollLog(); - // take a snapshot - if (!snapThreadMutex.tryAcquire()) { - LOG.warn("Too busy to snap, skipping"); + zkDB.rollLog(); + + if (onlySnapWhenSafetyIsThreatened) { + if (safetySnapThreshold.meet(zkDB.getTxnsSinceLastSnap(), zkDB.getTxnsSizeSinceLastSnap())) { + snapGenerator.takeSnapshot(false); + ServerMetrics.getMetrics().TAKING_SAFE_SNAPSHOT.add(1); + LOG.info("The txns size since last snapshot is larger than 10x, " + + "going to take a snapshot for safe."); + } } else { - new ZooKeeperThread("Snapshot Thread") { - public void run() { - try { - zks.takeSnapshot(); - } catch (Exception e) { - LOG.warn("Unexpected exception", e); - } finally { - snapThreadMutex.release(); - } - } - }.start(); + snapGenerator.takeSnapshot(false); } + } } else if (toFlush.isEmpty()) { // optimization for read heavy workloads @@ -241,6 +284,7 @@ private void flush() throws IOException, RequestProcessorException { while (!this.toFlush.isEmpty()) { final Request i = this.toFlush.remove(); long latency = Time.currentElapsedTime() - i.syncQueueStartTime; + lastRequestFlushLatency.addDataPoint(latency); ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency); this.nextProcessor.processRequest(i); } @@ -251,6 +295,12 @@ private void flush() throws IOException, RequestProcessorException { lastFlushTime = Time.currentElapsedTime(); } + public long getLastRequestFlushLatency() { + long result = (long) lastRequestFlushLatency.getAvg(); + lastRequestFlushLatency.reset(); + return result; + } + public void shutdown() { LOG.info("Shutting down"); queuedRequests.add(REQUEST_OF_DEATH); @@ -278,4 +328,7 @@ public void processRequest(final Request request) { ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1); } + public int getQueuedRequestsSize() { + return queuedRequests.size(); + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java index f758f5de5ae..46fdae39287 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java @@ -79,6 +79,16 @@ public class ZKDatabase { protected FileTxnSnapLog snapLog; protected long minCommittedLog, maxCommittedLog; + /** + * Used to track the approximate number of txns since the beginning of + * last snapshot happened. Leader is relying on this to schedule the + * snapshot on servers who have largest txns since last snapshot. + * + * It's different than txnCount defined in this class, which is used to + * track the snapshot locally. + */ + protected AtomicInteger txnsSinceLastSnap = new AtomicInteger(0); + /** * Default value is to use snapshot if txnlog size exceeds 1/3 the size of snapshot */ @@ -271,6 +281,7 @@ public ConcurrentHashMap getSessionWithTimeOuts() { private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() { public void onTxnLoaded(TxnHeader hdr, Record txn, TxnDigest digest) { addCommittedProposal(hdr, txn, digest); + txnsSinceLastSnap.incrementAndGet(); } }; @@ -631,6 +642,18 @@ public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedE SerializeUtils.serializeSnapshot(getDataTree(), oa, getSessionWithTimeOuts()); } + public int getTxnsSinceLastSnap() { + return txnsSinceLastSnap.get(); + } + + public void resetTxnSinceLastSnap() { + txnsSinceLastSnap.set(0); + } + + public long getTxnsSizeSinceLastSnap() { + return snapLog.getTxnsSizeSinceLastSnap(); + } + /** * append to the underlying transaction log * @param si the request to append @@ -638,6 +661,7 @@ public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedE */ public boolean append(Request si) throws IOException { txnCount.incrementAndGet(); + txnsSinceLastSnap.incrementAndGet(); return this.snapLog.append(si); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index f37225edbf0..5b746250d26 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -180,6 +180,7 @@ public static void setCloseSessionTxnEnabled(boolean enabled) { private final RequestPathMetricsCollector requestPathMetricsCollector; private boolean localSessionEnabled = false; + private SnapshotGenerator snapGenerator; protected enum State { INITIAL, RUNNING, @@ -323,6 +324,7 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessio GET_CHILDREN_RESPONSE_CACHE_SIZE, ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE)); + this.snapGenerator = new SnapshotGenerator(this); this.initialConfig = initialConfig; this.requestPathMetricsCollector = new RequestPathMetricsCollector(); @@ -345,6 +347,19 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessio txnLogFactory.getSnapDir()); } + public SnapshotGenerator getSnapshotGenerator() { + return this.snapGenerator; + } + + // Visible for testing. + public void setSnapshotGenerator(SnapshotGenerator generator) { + this.snapGenerator = generator; + } + + public void purge() throws IOException { + PurgeTxnLog.purge(txnLogFactory, txnLogFactory.getSnapRetainCount()); + } + public String getInitialConfig() { return initialConfig; } @@ -511,6 +526,7 @@ public void takeSnapshot() { public void takeSnapshot(boolean syncSnap) { long start = Time.currentElapsedTime(); try { + zkDb.resetTxnSinceLastSnap(); txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); } catch (IOException e) { LOG.error("Severe unrecoverable error, exiting", e); @@ -523,6 +539,10 @@ public void takeSnapshot(boolean syncSnap) { ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed); } + public void resetTxnSinceLastSnap() { + zkDb.resetTxnSinceLastSnap(); + } + @Override public long getDataDirSize() { if (zkDb == null) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java index 62969bac8a6..6957c500f2e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java @@ -167,6 +167,16 @@ public class FileTxnLog implements TxnLog, Closeable { */ private long prevLogsRunningTotal; + private long txnsSizeSinceLastSnapshot; + + public synchronized void resetTxnsSizeSinceLastSnap() { + txnsSizeSinceLastSnapshot = 0; + } + + public synchronized long getTxnsSizeSinceLastSnap() { + return txnsSizeSinceLastSnapshot + getCurrentLogSize(); + } + /** * constructor for FileTxnLog. Take the directory * where the txnlogs are stored @@ -235,7 +245,9 @@ protected Checksum makeChecksumAlgorithm() { public synchronized void rollLog() throws IOException { if (logStream != null) { this.logStream.flush(); - prevLogsRunningTotal += getCurrentLogSize(); + long currentLogSize = getCurrentLogSize(); + prevLogsRunningTotal += currentLogSize; + txnsSizeSinceLastSnapshot += currentLogSize; this.logStream = null; oa = null; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java index eddeae8d93e..a7232d642c4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java @@ -78,6 +78,16 @@ public class FileTxnSnapLog { private static final String EMPTY_SNAPSHOT_WARNING = "No snapshot found, but there are log entries. "; + private int snapRetainCount = -1; + + public void setSnapRetainCount(int snapRetainCount) { + this.snapRetainCount = snapRetainCount; + } + + public int getSnapRetainCount() { + return this.snapRetainCount; + } + /** * This listener helps * the external apis calling @@ -469,6 +479,7 @@ public void save( File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid)); LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile); try { + txnLog.resetTxnsSizeSinceLastSnap(); snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap); } catch (IOException e) { if (snapshotFile.length() == 0) { @@ -658,4 +669,8 @@ public void setTotalLogSize(long size) { public long getTotalLogSize() { return txnLog.getTotalLogSize(); } + + public long getTxnsSizeSinceLastSnap() { + return txnLog.getTxnsSizeSinceLastSnap(); + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLog.java index b5572087fb0..5a6df8429ec 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLog.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLog.java @@ -122,6 +122,10 @@ public interface TxnLog extends Closeable { */ long getTotalLogSize(); + void resetTxnsSizeSinceLastSnap(); + + long getTxnsSizeSinceLastSnap(); + /** * an iterating interface for reading * transaction logs. diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java index eb6742f5c6d..c05818383a5 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java @@ -18,15 +18,21 @@ package org.apache.zookeeper.server.quorum; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Map; import org.apache.jute.Record; import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.ZooDefs.SnapPingCode; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ServerMetrics; +import org.apache.zookeeper.server.SnapshotGenerator; import org.apache.zookeeper.server.TxnLogEntry; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; @@ -51,6 +57,7 @@ public class Follower extends Learner { this.self = self; this.zk = zk; this.fzk = zk; + this.snapGenerator = zk.getSnapshotGenerator(); } @Override @@ -150,6 +157,85 @@ void followLeader() throws InterruptedException { } } + // Visible for testing + protected void snapPing(QuorumPacket qp) throws IOException { + int peerSnapPingVersion = -1; + long snapPingId = -1; + int snapCode = -1; + try { + ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData()); + DataInputStream dis = new DataInputStream(bis); + peerSnapPingVersion = dis.readInt(); + snapPingId = dis.readLong(); + snapCode = dis.readInt(); + } catch (IOException e) { + LOG.error("Error while parsing SnapPing packet", e); + return; + } + + if (peerSnapPingVersion != SnapPingManager.SNAP_PING_VERSION) { + LOG.warn("The SnapPing version on leader is {} which is not " + + "compatible with ours {}, will skip", peerSnapPingVersion, + SnapPingManager.SNAP_PING_VERSION); + if (fzk.syncProcessor.isOnlySnapWhenSafetyIsThreatened()) { + LOG.info("SnapPing version incompatible, start self snapshot"); + fzk.syncProcessor.setOnlySnapWhenSafetyIsThreatened(false); + } + return; + } + + if (snapCode == SnapPingCode.CANCEL.ordinal()) { + if (fzk.syncProcessor.isOnlySnapWhenSafetyIsThreatened()) { + LOG.info("Snapshot schedule cancelled by leader, start self snapshot"); + fzk.syncProcessor.setOnlySnapWhenSafetyIsThreatened(false); + } + return; + } + + // If SNAP schedule is enabled, then disabled the periodically SNAP + // in SyncRequestProcessor, and only take snapshot if there is no + // snapshot scheduled with too many txns since last SNAP. + // + // This is useful to deal with the following cases: + // + // 1. The follower accumulated too many txns before it is scheduled + // to take snapshot. + // 2. There are 2/5 servers down for a long time, and the leader is + // not issuing snap for a long time. + if (!fzk.syncProcessor.isOnlySnapWhenSafetyIsThreatened()) { + LOG.info("Snapshot schedule enabled on leader, stop self " + + "snapshot unless safety is threatened"); + fzk.syncProcessor.setOnlySnapWhenSafetyIsThreatened(true); + } + + // Check and take a snapshot if needed, do not consider the snapshot + // the syncProcessor is doing. + if (snapCode == SnapPingCode.SNAP.ordinal()) { + // Sync the snapshot when snapshot schedule is enabled, to recover + // faster and reduce the impact on txns fsync. + if (snapGenerator.takeSnapshot( + SnapshotGenerator.getFsyncSnapshotFromScheduler())) { + LOG.info("Taking a snapshot with SNAPPING"); + } + } else if (snapCode == SnapPingCode.CHECK.ordinal()) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + dos.writeLong(snapPingId); + dos.writeBoolean(snapGenerator.isSnapInProgress()); + // Note: the txnsSinceLastSnap will only updated after we finished SNAP + dos.writeInt(zk.getZKDatabase().getTxnsSinceLastSnap()); + dos.writeLong(fzk.syncProcessor == null ? 0 + : fzk.syncProcessor.getLastRequestFlushLatency()); + dos.writeInt(fzk.syncProcessor == null ? 0 + : fzk.syncProcessor.getQueuedRequestsSize()); + dos.writeLong(zk.getZKDatabase().getTxnsSizeSinceLastSnap()); + qp.setData(bos.toByteArray()); + writePacket(qp, true); + } + } + + private SnapshotGenerator snapGenerator; + /** * Examine the packet received in qp and dispatch based on its contents. * @param qp @@ -157,6 +243,9 @@ void followLeader() throws InterruptedException { */ protected void processPacket(QuorumPacket qp) throws Exception { switch (qp.getType()) { + case Leader.SNAPPING: + snapPing(qp); + break; case Leader.PING: ping(qp); break; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java index 0eb3722a47c..3611e905b4e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java @@ -47,6 +47,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -54,6 +55,7 @@ import javax.security.sasl.SaslException; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.ZooDefs.SnapPingCode; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.ExitCode; @@ -61,10 +63,13 @@ import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; import org.apache.zookeeper.server.ServerMetrics; +import org.apache.zookeeper.server.SnapshotGenerator; +import org.apache.zookeeper.server.SyncRequestProcessor; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperCriticalThread; import org.apache.zookeeper.server.ZooTrace; import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; +import org.apache.zookeeper.server.quorum.SnapPingManager.SnapPingData; import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.server.util.SerializeUtils; @@ -76,7 +81,7 @@ /** * This class has the control logic for the Leader. */ -public class Leader extends LearnerMaster { +public class Leader extends LearnerMaster implements SnapPingListener { private static final Logger LOG = LoggerFactory.getLogger(Leader.class); @@ -115,10 +120,22 @@ public static int getAckLoggingFrequency() { return ackLoggingFrequency; } + SnapPingManager snapPingManager; + + public void setSnapPingIntervalInSeconds(int seconds) { + snapPingManager.setSnapPingIntervalInSeconds(seconds); + } + + public int getSnapPingIntervalInSeconds() { + return snapPingManager.getSnapPingIntervalInSeconds(); + } + final LeaderZooKeeperServer zk; final QuorumPeer self; + private SnapshotGenerator leaderSnapGenerator; + // VisibleForTesting protected boolean quorumFormed = false; @@ -158,6 +175,13 @@ public List getForwardingFollowers() { } } + public List getSnapPingListeners() { + List listeners = new ArrayList(); + listeners.addAll(getForwardingFollowers()); + listeners.add(this); + return listeners; + } + public List getNonVotingFollowers() { List nonVotingFollowers = new ArrayList(); synchronized (forwardingFollowers) { @@ -170,7 +194,8 @@ public List getNonVotingFollowers() { return nonVotingFollowers; } - void addForwardingFollower(LearnerHandler lh) { + // VisibleForTesting + protected void addForwardingFollower(LearnerHandler lh) { synchronized (forwardingFollowers) { forwardingFollowers.add(lh); } @@ -303,6 +328,9 @@ public Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException { } this.zk = zk; + + this.leaderSnapGenerator = zk.getSnapshotGenerator(); + this.snapPingManager = new SnapPingManager(this); } Optional createServerSocket(InetSocketAddress address, boolean portUnification, boolean sslQuorum) { @@ -425,6 +453,12 @@ Optional createServerSocket(InetSocketAddress address, boolean por */ static final int INFORMANDACTIVATE = 19; + /** + * Get the information from follower to decide the next server which is + * going to take snapshot. + */ + static final int SNAPPING = 100; + final ConcurrentMap outstandingProposals = new ConcurrentHashMap(); private final ConcurrentLinkedQueue toBeApplied = new ConcurrentLinkedQueue(); @@ -432,6 +466,8 @@ Optional createServerSocket(InetSocketAddress address, boolean por // VisibleForTesting protected final Proposal newLeaderProposal = new Proposal(); + private ScheduledExecutorService snapPinger; + class LearnerCnxAcceptor extends ZooKeeperCriticalThread { private final AtomicBoolean stop = new AtomicBoolean(false); @@ -805,6 +841,10 @@ void shutdown(String reason) { closeSockets(); } + if (snapPingManager != null) { + snapPingManager.shutdown(); + } + // NIO should not accept conenctions self.setZooKeeperServer(null); self.adminServer.setZooKeeperServer(null); @@ -1750,4 +1790,73 @@ public void unregisterLearnerHandlerBean(final LearnerHandler learnerHandler) { } } + private SyncRequestProcessor getSyncRequestProcessor() { + if (zk.proposalProcessor == null) { + return null; + } + return zk.proposalProcessor.syncProcessor; + } + + public void processSnapPing(SnapPingData spData) { + snapPingManager.processSnapPing(spData); + } + + @Override + public void snapPing(long snapPingId, SnapPingCode code) { + SyncRequestProcessor syncProcessor = getSyncRequestProcessor(); + + if (syncProcessor != null && code != SnapPingCode.CANCEL) { + if (!syncProcessor.isOnlySnapWhenSafetyIsThreatened()) { + LOG.info("Snapshot schedule enabled on leader, stop self " + + "snapshot unless safety is threatened"); + syncProcessor.setOnlySnapWhenSafetyIsThreatened(true); + } + } + + switch (code) { + case CANCEL: + if (syncProcessor != null) { + LOG.info("Snapshot schedule cancelled by leader, start self snapshot"); + syncProcessor.setOnlySnapWhenSafetyIsThreatened(false); + } + break; + case SNAP: + if (leaderSnapGenerator.takeSnapshot( + SnapshotGenerator.getFsyncSnapshotFromScheduler())) { + LOG.info("Taking a snapshot with SNAPPING"); + } + break; + case CHECK: + long lastRequestFlushLatency = 0; + int syncProcessorQueuedRequests = 0; + + if (syncProcessor != null) { + lastRequestFlushLatency = syncProcessor.getLastRequestFlushLatency(); + syncProcessorQueuedRequests = syncProcessor.getQueuedRequestsSize(); + } + + ZKDatabase zkDB = zk.getZKDatabase(); + SnapPingData spData = new SnapPingData(getSid(), snapPingId, + leaderSnapGenerator.isSnapInProgress(), + zkDB.getTxnsSinceLastSnap(), + lastRequestFlushLatency, syncProcessorQueuedRequests, + zkDB.getTxnsSizeSinceLastSnap(), 0); + + processSnapPing(spData); + break; + case SKIP: + break; + default: + break; + } + } + + @Override + public long getSid() { + return self.getId(); + } + + QuorumVerifier getQuorumVerifier() { + return self.getQuorumVerifier(); + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderBean.java index 8d5b48ff589..cd26ebcfd5e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderBean.java @@ -106,4 +106,13 @@ public void setMaxConcurrentDiffSyncs(int maxConcurrentDiffSyncs) { leader.setMaxConcurrentDiffSyncs(maxConcurrentDiffSyncs); } + @Override + public int getSnapPingIntervalInSeconds() { + return leader.getSnapPingIntervalInSeconds(); + } + + @Override + public void setSnapPingIntervalInSeconds(int seconds) { + leader.setSnapPingIntervalInSeconds(seconds); + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderMXBean.java index e64d2f3820d..03a9ace896b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderMXBean.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderMXBean.java @@ -85,4 +85,13 @@ public interface LeaderMXBean extends ZooKeeperServerMXBean { */ void setMaxConcurrentDiffSyncs(int maxConcurrentDiffSyncs); + /** + * @return the frequency of sending SNAPPING internal + */ + int getSnapPingIntervalInSeconds(); + + /** + * @param seconds update the frequency of sending SNAPPING + */ + void setSnapPingIntervalInSeconds(int seconds); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java index 8834b5fbdb1..c5b5768fce7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java @@ -50,6 +50,8 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer { PrepRequestProcessor prepRequestProcessor; + ProposalRequestProcessor proposalProcessor; + /** * @throws IOException */ @@ -67,7 +69,7 @@ protected void setupRequestProcessors() { RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); commitProcessor.start(); - ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); + proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); proposalProcessor.initialize(); prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor); prepRequestProcessor.start(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java index 3bab398b41e..8d972f57230 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java @@ -21,7 +21,9 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.net.Socket; import java.nio.ByteBuffer; @@ -40,6 +42,7 @@ import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.ZooDefs.SnapPingCode; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.TxnLogProposalIterator; @@ -48,6 +51,7 @@ import org.apache.zookeeper.server.ZooTrace; import org.apache.zookeeper.server.quorum.Leader.Proposal; import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; +import org.apache.zookeeper.server.quorum.SnapPingManager.SnapPingData; import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer; import org.apache.zookeeper.server.util.MessageTracker; import org.apache.zookeeper.server.util.ZxidUtils; @@ -59,7 +63,7 @@ * learner. All communication with a learner is handled by this * class. */ -public class LearnerHandler extends ZooKeeperThread { +public class LearnerHandler extends ZooKeeperThread implements SnapPingListener { private static final Logger LOG = LoggerFactory.getLogger(LearnerHandler.class); @@ -82,10 +86,27 @@ public Socket getSocket() { */ protected long sid = 0; - long getSid() { + protected volatile boolean snapInProgress; + protected volatile int txnsSinceLastSnap; + protected volatile long lastRequestFlushLatency; + + @Override + public long getSid() { return sid; } + boolean isSnapInProgress() { + return snapInProgress; + } + + int getTxnsSinceLastSnap() { + return txnsSinceLastSnap; + } + + long getLastRequestFlushLatency() { + return lastRequestFlushLatency; + } + String getRemoteAddress() { return sock == null ? "" : sock.getRemoteSocketAddress().toString(); } @@ -677,6 +698,21 @@ public void run() { learnerMaster.touch(sess, to); } break; + case Leader.SNAPPING: + // Only deal with SNAPPING on leader + if (learnerMaster instanceof Leader) { + try { + ByteArrayInputStream bais = new ByteArrayInputStream(qp.getData()); + DataInputStream dataIS = new DataInputStream(bais); + SnapPingData spData = new SnapPingData( + sid, dataIS, getQueuedPacketsSize()); + ((Leader) learnerMaster).processSnapPing(spData); + } catch (IOException e) { + LOG.info("Error while processing snapPing data " + + "from {}", sid, e); + } + } + break; case Leader.REVALIDATE: ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1); learnerMaster.revalidateSession(qp, this); @@ -1079,6 +1115,20 @@ public void ping() { } } + @Override + public void snapPing(long snapPingId, SnapPingCode code) throws IOException { + if (!sendingThreadStarted) { + return; + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream oa = new DataOutputStream(baos); + oa.writeInt(SnapPingManager.SNAP_PING_VERSION); + oa.writeLong(snapPingId); + oa.writeInt(code.ordinal()); + oa.close(); + queuePacket(new QuorumPacket(Leader.SNAPPING, -1, baos.toByteArray(), null)); + } + /** * Queue leader packet of a given type * @param type @@ -1143,6 +1193,10 @@ public Queue getQueuedPackets() { return queuedPackets; } + public int getQueuedPacketsSize() { + return queuedPackets.size(); + } + /** * For testing, we need to reset this value */ diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java index a6f94ec3bd1..713e15d9814 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java @@ -173,7 +173,11 @@ public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServ } quorumPeer = getQuorumPeer(); - quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir())); + + FileTxnSnapLog txnSnapLog = new FileTxnSnapLog( + config.getDataLogDir(), config.getDataDir()); + txnSnapLog.setSnapRetainCount(config.getSnapRetainCount()); + quorumPeer.setTxnFactory(txnSnapLog); quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled()); quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled()); //quorumPeer.setQuorumPeers(config.getAllMembers()); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapPingListener.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapPingListener.java new file mode 100644 index 00000000000..6134986e23b --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapPingListener.java @@ -0,0 +1,35 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.IOException; +import org.apache.zookeeper.ZooDefs.SnapPingCode; + +/** + * The interface of SnapPing event distributer and handler. + */ +public interface SnapPingListener { + + long SNAP_PING_ID_DONT_CARE = -1; + + long getSid(); + + void snapPing(long snapPingId, SnapPingCode code) throws IOException; +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapPingManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapPingManager.java new file mode 100644 index 00000000000..e75a5170bde --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapPingManager.java @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.DataInputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.zookeeper.ZooDefs.SnapPingCode; +import org.apache.zookeeper.server.ServerMetrics; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages the snap ping check, collect information, and based on the + * status and strategy to decide who should take snapshot. + */ +public class SnapPingManager { + private static final Logger LOG = LoggerFactory.getLogger(SnapPingManager.class); + + public static final int SNAP_PING_VERSION = 2; + public static final String SNAP_PING_INTERVAL_IN_SECONDS = "zookeeper.leader.snapPingIntervalInSeconds"; + + private int snapPingIntervalInSeconds; + + private ScheduledExecutorService snapPinger; + private final SnapPingHandler spHandler; + private final Leader leader; + + public SnapPingManager(Leader leader) { + this.leader = leader; + this.spHandler = new SnapPingHandler(leader); + this.snapPingIntervalInSeconds = Integer.getInteger(SNAP_PING_INTERVAL_IN_SECONDS, -1); + LOG.info("{} = {}", SNAP_PING_INTERVAL_IN_SECONDS, snapPingIntervalInSeconds); + + if (snapPingIntervalInSeconds > 0) { + restartSnapPinger(); + } + } + + public void processSnapPing(SnapPingData spData) { + spHandler.processSnapPing(spData); + } + + public void setSnapPingIntervalInSeconds(int seconds) { + if (seconds != snapPingIntervalInSeconds) { + snapPingIntervalInSeconds = seconds; + LOG.info("Updated snapPingIntervalInSeconds to {}", snapPingIntervalInSeconds); + } + + if (snapPingIntervalInSeconds < 0) { + cancelSnapSchedule(); + LOG.info("snapPingIntervalInSeconds is less than 0, going to " + + "disable snapshot schedule"); + } else { + restartSnapPinger(); + } + } + + public int getSnapPingIntervalInSeconds() { + return snapPingIntervalInSeconds; + } + + final Runnable snapPingRunnable = new Runnable() { + long snapPingId = 1; + + @Override + public void run() { + + // Don't need to have a strong guarantee of the current view of + // voting members when doing snap schedule. + Map currentVotingMembers = + leader.getQuorumVerifier().getVotingMembers(); + List listeners = leader.getSnapPingListeners(); + + // Trigger a SnapPing to check the status + for (SnapPingListener listener : listeners) { + long sid = listener.getSid(); + if (currentVotingMembers.containsKey(sid)) { + try { + listener.snapPing(snapPingId, SnapPingCode.CHECK); + } catch (IOException e) { + LOG.error("Exception when sending SNAPPING " + + "to {}, {}", sid, e); + } + } + } + + if (++snapPingId < 0) { + snapPingId = 1; + } + } + }; + + public void shutdown() { + if (snapPinger != null) { + snapPinger.shutdownNow(); + snapPinger = null; + } + } + + private void restartSnapPinger() { + shutdown(); + snapPinger = Executors.newSingleThreadScheduledExecutor(); + snapPinger.scheduleAtFixedRate(snapPingRunnable, 0, + snapPingIntervalInSeconds, TimeUnit.SECONDS); + LOG.info("Started snapPinger with interval {}s", snapPingIntervalInSeconds); + } + + private void cancelSnapSchedule() { + boolean snapPinerTerminated = true; + if (snapPinger != null) { + snapPinger.shutdownNow(); + try { + snapPinerTerminated = snapPinger.awaitTermination( + 60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("Got InterruptedException while waiting for " + + "snapPinger to terminate"); + } finally { + snapPinger = null; + } + } + + // This is unlikely to happen + if (!snapPinerTerminated) { + LOG.warn("SnapPinger is not terminated yet, learner may " + + "not enable self snapshot."); + } + + Map currentVotingMembers = + leader.getQuorumVerifier().getVotingMembers(); + // Resume self snapshot behavior on learner + List listeners = leader.getSnapPingListeners(); + for (SnapPingListener listener : listeners) { + long sid = listener.getSid(); + if (currentVotingMembers.containsKey(sid)) { + try { + listener.snapPing(SnapPingListener.SNAP_PING_ID_DONT_CARE, + SnapPingCode.CANCEL); + } catch (IOException e) { + LOG.error("Exception when sending SNAPPING " + + "to {}, {}", sid, e); + } + } + } + } + + public static class SnapPingHandler { + + public static final String SNAP_TXNS_THRESHOLD = "zookeeper.leader.snapTxnsThreshold"; + public static final String SNAP_TXNS_SIZE_THRESHOLD_KB = "zookeeper.leader.snapTxnsSizeThresholdKB"; + public static final String FLUSH_LATENCY_DRAIN_THRESHOLD = "zookeeper.flushLatencyDrainThreshold"; + public static final String LEARNER_QUEUE_SIZE_DRAIN_THRESHOLD = "zookeeper.learner.queueSizeDrainThreshold"; + public static final String UNSCHEDULED_SNAP_THRESHOLD = "zookeeper.snapsync.unscheduledSnapshotThreshold"; + + private int snapTxnsThreshold; + private long snapTxnsSizeThresholdBytes; + private int flushLatencyDrainThreshold; + private int learnerQueueSizeDrainThreshold; + private int unscheduledSnapThreshold; + private int idleSnapPings = 0; + + private final Leader leader; + private long currentSnapPingId = -1; + private Set data = new HashSet(); + + private int votingMemberSize = 0; + private Map currentVotingMembers; + + public SnapPingHandler(Leader leader) { + this.leader = leader; + + this.snapTxnsThreshold = Integer.getInteger(SNAP_TXNS_THRESHOLD, 100000); + // by default using 4GB + this.snapTxnsSizeThresholdBytes = Long.getLong(SNAP_TXNS_SIZE_THRESHOLD_KB, 4 * 1024 * 1024L) * 1024; + this.flushLatencyDrainThreshold = Integer.getInteger( + FLUSH_LATENCY_DRAIN_THRESHOLD, 200); + this.learnerQueueSizeDrainThreshold = Integer.getInteger( + LEARNER_QUEUE_SIZE_DRAIN_THRESHOLD, 10000); + this.unscheduledSnapThreshold = Integer.getInteger(UNSCHEDULED_SNAP_THRESHOLD, 3); + + LOG.info("{} = {}, {} = {} KB, {} = {}, {} = {}, {} = {}", + SNAP_TXNS_THRESHOLD, snapTxnsThreshold, + SNAP_TXNS_SIZE_THRESHOLD_KB, snapTxnsSizeThresholdBytes, + FLUSH_LATENCY_DRAIN_THRESHOLD, flushLatencyDrainThreshold, + LEARNER_QUEUE_SIZE_DRAIN_THRESHOLD, learnerQueueSizeDrainThreshold, + UNSCHEDULED_SNAP_THRESHOLD, unscheduledSnapThreshold); + } + + private boolean snapsThresholdExceeded(int maxLearnerTxnsSinceLastSnap, long maxLearnerTxnsSizeSinceLastSnap) { + return maxLearnerTxnsSinceLastSnap > snapTxnsThreshold || ((snapTxnsSizeThresholdBytes > 0) + && maxLearnerTxnsSizeSinceLastSnap > snapTxnsSizeThresholdBytes); + } + + public void processData() { + // Find out the total number of learners not taking snapshot. + int numOfMembersNotInSnap = 0; + int numOfMembersDraining = 0; + int maxLearnerTxnsSinceLastSnap = 0; + int numSnapsInProgress = 0; + long maxLearnerTxnsSizeSinceLastSnap = 0; + long learnerWithMaxTxnsNotInSnap = -1; + for (SnapPingData f : data) { + // Given the order of snapInProgress and txnsSinceLastSnap + // being assigned in LearnerHandler with received SNAPPING, + // and given how we're using it here, there is no need to + // do the atomic update and check for these. + long sid = f.getSid(); + if (!currentVotingMembers.containsKey(sid)) { + continue; + } + if (f.isSnapInProgress()) { + numSnapsInProgress++; + continue; + } + numOfMembersNotInSnap++; + int txns = f.getTxnsSinceLastSnap(); + long txnsSize = f.getTxnsSizeSinceLastSnap(); + if (txns > maxLearnerTxnsSinceLastSnap + && txnsSize > maxLearnerTxnsSizeSinceLastSnap) { + maxLearnerTxnsSinceLastSnap = txns; + maxLearnerTxnsSizeSinceLastSnap = txnsSize; + learnerWithMaxTxnsNotInSnap = sid; + } + long lastRequestFlushLatency = f.getLastRequestFlushLatency(); + if (lastRequestFlushLatency > flushLatencyDrainThreshold) { + numOfMembersDraining++; + LOG.info("{} has long flush delay {}, mark it as draining", + sid, lastRequestFlushLatency); + } else if (f.getQueuedRequestsSize() > learnerQueueSizeDrainThreshold) { + numOfMembersDraining++; + LOG.info("{} has long queue {}, mark it as draining", + sid, f.getQueuedRequestsSize()); + } else if (f.getLearnerHandlerQueueSize() > learnerQueueSizeDrainThreshold) { + numOfMembersDraining++; + LOG.info("{} has long learner queue {}, mark it as draining", + sid, f.getLearnerHandlerQueueSize()); + } + } + + // Doesn't distinguish the server weight in the quorum hierarchical model + // for now. + int majority = currentVotingMembers.size() / 2 + 1; + int numOfCandidates = numOfMembersNotInSnap - majority - numOfMembersDraining; + + long learnerSnapCandidate = -1; + if (snapsThresholdExceeded(maxLearnerTxnsSinceLastSnap, maxLearnerTxnsSizeSinceLastSnap)) { + // Schedule a snapshot when there is an available candidate OR when we have a majority running, but + // haven't been able to schedule a snapshot in a while due to a lack of candidates. This second + // condition should protect against all workers scheduling safety snapshots when running in a degraded + // state. + if (numOfCandidates > 0 || (numOfMembersNotInSnap >= majority && numSnapsInProgress == 0 + && unscheduledSnapThreshold > -1 && ++idleSnapPings >= unscheduledSnapThreshold)) { + learnerSnapCandidate = learnerWithMaxTxnsNotInSnap; + idleSnapPings = 0; + LOG.info("Going to tell {} to take snapshot, txns since " + "last snapshot is {}, {}", + learnerSnapCandidate, maxLearnerTxnsSinceLastSnap, maxLearnerTxnsSizeSinceLastSnap); + if (numOfCandidates <= 0) { + ServerMetrics.getMetrics().MANAGER_INITIATED_SAFE_SNAPSHOT.add(1); + } + } + } + + // Send SNAPPING to all current participants, non-voting + // members are not considered here, those non-voting ones + // will take snapshot periodically as before. + List listeners = leader.getSnapPingListeners(); + for (SnapPingListener listener : listeners) { + long sid = listener.getSid(); + if (currentVotingMembers.containsKey(sid)) { + try { + listener.snapPing(SnapPingListener.SNAP_PING_ID_DONT_CARE, + sid == learnerSnapCandidate + ? SnapPingCode.SNAP : SnapPingCode.SKIP); + } catch (IOException e) { + LOG.error("Exception when sending SNAPPING " + + "to {}, {}", sid, e); + } + } + } + + // clear the processed data points + data.clear(); + } + + public synchronized void processSnapPing(SnapPingData spData) { + if (spData.getSnapPingId() > currentSnapPingId) { + if (!data.isEmpty()) { + processData(); + } + currentSnapPingId = spData.snapPingId; + } + data.add(spData); + + currentVotingMembers = leader.getQuorumVerifier().getVotingMembers(); + votingMemberSize = currentVotingMembers.size(); + + if (data.size() == votingMemberSize) { + processData(); + } + } + } + + public static class SnapPingData { + + private long sid; + private long snapPingId; + private boolean snapInProgress; + private int txnsSinceLastSnap; + private long lastRequestFlushLatency; + private int syncProcessorQueuedRequests; + private long txnsSizeSinceLastSnap; + private int learnerHandlerQueueSize; + + public SnapPingData(long sid, DataInputStream dataIS, + int learnerHandlerQueueSize) throws IOException { + this(sid, dataIS.readLong(), dataIS.readBoolean(), + dataIS.readInt(), dataIS.readLong(), dataIS.readInt(), + dataIS.readLong(), learnerHandlerQueueSize); + } + + public SnapPingData(long sid, long snapPingId, boolean snapInProgress, + int txnsSinceLastSnap, long lastRequestFlushLatency, + int syncProcessorQueuedRequests, long txnsSizeSinceLastSnap, + int learnerHandlerQueueSize) { + this.sid = sid; + this.snapPingId = snapPingId; + this.snapInProgress = snapInProgress; + this.txnsSinceLastSnap = txnsSinceLastSnap; + this.lastRequestFlushLatency = lastRequestFlushLatency; + this.syncProcessorQueuedRequests = syncProcessorQueuedRequests; + this.txnsSizeSinceLastSnap = txnsSizeSinceLastSnap; + this.learnerHandlerQueueSize = learnerHandlerQueueSize; + LOG.info("{}: {}, snap in progress: {}, txns since last " + + "snap: {}, last request flush delay: {}, " + + "sync processor queued requests: {}, " + + "txn size in bytes since last snap: {}, " + + "learner handler queue size: {}", snapPingId, sid, + snapInProgress, txnsSinceLastSnap, + lastRequestFlushLatency, syncProcessorQueuedRequests, + txnsSizeSinceLastSnap, learnerHandlerQueueSize); + } + + public long getSid() { + return sid; + } + + public long getSnapPingId() { + return snapPingId; + } + + public boolean isSnapInProgress() { + return snapInProgress; + } + + public int getTxnsSinceLastSnap() { + return txnsSinceLastSnap; + } + + public long getLastRequestFlushLatency() { + return lastRequestFlushLatency; + } + + public int getQueuedRequestsSize() { + return syncProcessorQueuedRequests; + } + + public long getTxnsSizeSinceLastSnap() { + return txnsSizeSinceLastSnap; + } + + public int getLearnerHandlerQueueSize() { + return learnerHandlerQueueSize; + } + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SnapPingTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SnapPingTest.java new file mode 100644 index 00000000000..3ed2f245c49 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SnapPingTest.java @@ -0,0 +1,441 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs.SnapPingCode; +import org.apache.zookeeper.server.SnapshotGenerator; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.SnapPingManager.SnapPingData; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.apache.zookeeper.test.ClientBase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class SnapPingTest extends ZKTestCase { + + private ZKDatabase zkDb; + private Leader leader; + private SnapshotGenerator leaderSnapGenerator; + private static final long TXNS_SIZE_THRESHOLD = 4000 * 1024 * 1024L; + + @Before + public void setup() throws Exception { + System.setProperty( + SnapPingManager.SnapPingHandler.SNAP_TXNS_SIZE_THRESHOLD_KB, + "" + (TXNS_SIZE_THRESHOLD / 1024)); + // create a 5 members voting quorum + Map votingMembers = new HashMap(); + for (long i = 1L; i <= 5L; i++) { + votingMembers.put(i, null); + } + QuorumPeer qp = new QuorumPeer(); + QuorumVerifier quorumVerifierMock = mock(QuorumVerifier.class); + when(quorumVerifierMock.getVotingMembers()).thenReturn(votingMembers); + when(quorumVerifierMock.getAllMembers()).thenReturn(LeaderBeanTest.getMockedPeerViews(qp.getId())); + qp.setQuorumVerifier(quorumVerifierMock, false); + qp.setId(5L); + + zkDb = mock(ZKDatabase.class); + File tmpDir = ClientBase.createEmptyTestDir(); + LeaderZooKeeperServer lzs = new LeaderZooKeeperServer( + new FileTxnSnapLog(tmpDir, tmpDir), qp, zkDb); + leaderSnapGenerator = mock(SnapshotGenerator.class); + lzs.setSnapshotGenerator(leaderSnapGenerator); + leader = new Leader(qp, lzs); + + leader.addForwardingFollower(genLearnerHandler(1L)); + leader.addForwardingFollower(genLearnerHandler(2L)); + leader.addForwardingFollower(genLearnerHandler(3L)); + leader.addForwardingFollower(genLearnerHandler(4L)); + } + + /** + * Verify when leader has the maximum txns since last snapshot, it will + * take snapshot. Non voting followers are being ignored. + */ + @Test + public void testLeaderSnapSchedule() throws Exception { + // add a non-voting follower + leader.addForwardingFollower(genLearnerHandler(8L)); + + leader.processSnapPing(new SnapPingData(1L, 1L, false, 1000000, 0, 0, 2000000, 0)); + leader.processSnapPing(new SnapPingData(2L, 1L, false, 2000000, 0, 0, 4000000, 0)); + leader.processSnapPing(new SnapPingData(3L, 1L, false, 3000000, 0, 0, 6000000, 0)); + leader.processSnapPing(new SnapPingData(4L, 1L, false, 4000000, 0, 0, 8000000, 0)); + leader.processSnapPing(new SnapPingData(5L, 1L, false, 5000000, 0, 0, 10000000, 0)); + + // take snapshot on leader since it has the most txns since last snap + verify(leaderSnapGenerator, times(1)).takeSnapshot(true); + + for (SnapPingListener listener : leader.getForwardingFollowers()) { + long sid = listener.getSid(); + if (sid == 8L) { + verify(listener, never()).snapPing(anyLong(), any()); + } else { + verify(listener, times(1)).snapPing( + SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP); + } + } + } + + /** + * Verify on a 5 members ensemble, there could be 2 concurrent snapshot. + */ + @Test + public void testSnapScheduleWithMultipleSnap() throws Exception { + leader.processSnapPing(new SnapPingData(1L, 1L, false, 1000000, 0, 0, 2000000, 0)); + leader.processSnapPing(new SnapPingData(2L, 1L, false, 2000000, 0, 0, 4000000, 0)); + leader.processSnapPing(new SnapPingData(3L, 1L, false, 3000000, 0, 0, 6000000, 0)); + leader.processSnapPing(new SnapPingData(4L, 1L, false, 4000000, 0, 0, 8000000, 0)); + leader.processSnapPing(new SnapPingData(5L, 1L, true, 5000000, 0, 0, 10000000, 0)); + + // won't take snapshot on leader since it's already in snapshot + verify(leaderSnapGenerator, never()).takeSnapshot(anyBoolean()); + + for (LearnerHandler lh : leader.getForwardingFollowers()) { + if (lh.getSid() != 4L) { + verify(lh, times(1)).snapPing( + SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP); + } else { + verify(lh, times(1)).snapPing( + SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SNAP); + } + } + } + + /** + * When there are already 2 concurrent snapshot in 5 members ensemble, + * no more snapshot could be scheduled. + */ + @Test + public void testMaxConcurrentSnapSchedule() throws Exception { + leader.processSnapPing(new SnapPingData(1L, 1L, false, 1000000, 0, 0, 2000000, 0)); + leader.processSnapPing(new SnapPingData(2L, 1L, false, 2000000, 0, 0, 4000000, 0)); + leader.processSnapPing(new SnapPingData(3L, 1L, false, 3000000, 0, 0, 6000000, 0)); + leader.processSnapPing(new SnapPingData(4L, 1L, true, 4000000, 0, 0, 8000000, 0)); + leader.processSnapPing(new SnapPingData(5L, 1L, true, 5000000, 0, 0, 10000000, 0)); + + verify(leaderSnapGenerator, never()).takeSnapshot(anyBoolean()); + + // skip snap on all servers + for (LearnerHandler lh : leader.getForwardingFollowers()) { + verify(lh, times(1)).snapPing( + SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP); + } + } + + /** + * With 2 out of 5 servers have high sync latency, we'll skip SNAP on + * all servers. + */ + @Test + public void testSnapScheduleWithHighFlushLatency() throws Exception { + leader.processSnapPing(new SnapPingData(1L, 1L, false, 1000000, 1000, 0, 2000000, 0)); + leader.processSnapPing(new SnapPingData(2L, 1L, false, 2000000, 1000, 0, 4000000, 0)); + leader.processSnapPing(new SnapPingData(3L, 1L, false, 3000000, 0, 0, 6000000, 0)); + leader.processSnapPing(new SnapPingData(4L, 1L, false, 4000000, 0, 0, 8000000, 0)); + leader.processSnapPing(new SnapPingData(5L, 1L, false, 5000000, 0, 0, 10000000, 0)); + + // won't take snapshot on leader since it's already in snapshot + verify(leaderSnapGenerator, never()).takeSnapshot(anyBoolean()); + + for (LearnerHandler lh : leader.getForwardingFollowers()) { + verify(lh, times(1)).snapPing( + SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP); + } + } + + /** + * With 2 out of 5 servers have large sync request queue size, we'll skip + * SNAP on all servers. + */ + @Test + public void testSnapScheduleWithLargeLearnerQueueSize() throws Exception { + leader.processSnapPing(new SnapPingData(1L, 1L, false, 1000000, 0, 100000, 2000000, 0)); + leader.processSnapPing(new SnapPingData(2L, 1L, false, 2000000, 0, 100000, 4000000, 0)); + leader.processSnapPing(new SnapPingData(3L, 1L, false, 3000000, 0, 0, 6000000, 0)); + leader.processSnapPing(new SnapPingData(4L, 1L, false, 4000000, 0, 0, 8000000, 0)); + leader.processSnapPing(new SnapPingData(5L, 1L, false, 5000000, 0, 0, 10000000, 0)); + + // won't take snapshot on leader since it's already in snapshot + verify(leaderSnapGenerator, never()).takeSnapshot(anyBoolean()); + + for (LearnerHandler lh : leader.getForwardingFollowers()) { + verify(lh, times(1)).snapPing( + SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP); + } + } + + /** + * With 2 out of 5 servers have large learner queue size, we'll skip SNAP on + * all servers. + */ + @Test + public void testSnapScheduleWithLargeSyncRequestQueueSize() throws Exception { + leader.processSnapPing(new SnapPingData(1L, 1L, false, 1000000, 0, 0, 2000000, 1000000)); + leader.processSnapPing(new SnapPingData(2L, 1L, false, 2000000, 0, 0, 4000000, 1000000)); + leader.processSnapPing(new SnapPingData(3L, 1L, false, 3000000, 0, 0, 6000000, 0)); + leader.processSnapPing(new SnapPingData(4L, 1L, false, 4000000, 0, 0, 8000000, 0)); + leader.processSnapPing(new SnapPingData(5L, 1L, false, 5000000, 0, 0, 10000000, 0)); + + // won't take snapshot on leader since it's already in snapshot + verify(leaderSnapGenerator, never()).takeSnapshot(anyBoolean()); + + for (LearnerHandler lh : leader.getForwardingFollowers()) { + verify(lh, times(1)).snapPing( + SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP); + } + } + + /** + * Test the logic of processing snapPing on follower. + */ + @Test + public void testSnapPingPacketOnFollower() throws Exception { + QuorumPeer qp = new QuorumPeer(); + QuorumVerifier quorumVerifierMock = mock(QuorumVerifier.class); + qp.setQuorumVerifier(quorumVerifierMock, false); + File tmpDir = ClientBase.createEmptyTestDir(); + FileTxnSnapLog fileTxnSnapLog = new FileTxnSnapLog(tmpDir, tmpDir); + ZKDatabase zkDb = new ZKDatabase(fileTxnSnapLog); + + final AtomicInteger snapshotDurationInSeconds = new AtomicInteger(0); + FollowerZooKeeperServer fzk = new FollowerZooKeeperServer( + fileTxnSnapLog, qp, zkDb) { + @Override + public void takeSnapshot(boolean syncSnap) { + try { + Thread.sleep(snapshotDurationInSeconds.get() * 1000); + } catch (InterruptedException e) {} + } + }; + fzk.setupRequestProcessors(); + Assert.assertFalse(fzk.syncProcessor.isOnlySnapWhenSafetyIsThreatened()); + + LinkedBlockingQueue snapPacketsSent = + new LinkedBlockingQueue(); + + Follower follower = new Follower(qp, fzk) { + @Override + public void writePacket(QuorumPacket pp, boolean flush) { + if (pp.getType() == Leader.SNAPPING) { + try { + snapPacketsSent.put(pp); + } catch (InterruptedException e) { /* ignore */ } + } + } + }; + + snapshotDurationInSeconds.set(1); + + // Send a SNAP message + follower.snapPing(buildSnapPingPacket(SnapPingCode.SNAP)); + Assert.assertTrue(fzk.syncProcessor.isOnlySnapWhenSafetyIsThreatened()); + + // Send a CHECK message + follower.snapPing(buildSnapPingPacket(SnapPingCode.CHECK)); + QuorumPacket packetSent = snapPacketsSent.poll(1, TimeUnit.SECONDS); + Assert.assertNotNull(packetSent); + + ByteArrayInputStream bais = new ByteArrayInputStream(packetSent.getData()); + DataInputStream dataIS = new DataInputStream(bais); + long snapPingId = dataIS.readLong(); + boolean snapInProgress = dataIS.readBoolean(); + int txnsSinceLastSnap = dataIS.readInt(); + long lastRequestFlushLatency = dataIS.readLong(); + long syncProcessorQueueSize = dataIS.readInt(); + Assert.assertEquals(snapPingId, 1); + Assert.assertTrue(snapInProgress); + Assert.assertEquals(0, txnsSinceLastSnap); + Assert.assertEquals(0, lastRequestFlushLatency); + Assert.assertEquals(0, syncProcessorQueueSize); + + // Send a CANCEL message and make sure we enabled self SNAP again + follower.snapPing(buildSnapPingPacket(SnapPingCode.CANCEL)); + Assert.assertFalse(fzk.syncProcessor.isOnlySnapWhenSafetyIsThreatened()); + } + + /** + * Make sure the packet will be ignored if the version is different. + */ + @Test + public void testDifferentVersion() throws Exception { + QuorumPeer qp = new QuorumPeer(); + QuorumVerifier quorumVerifierMock = mock(QuorumVerifier.class); + qp.setQuorumVerifier(quorumVerifierMock, false); + File tmpDir = ClientBase.createEmptyTestDir(); + FileTxnSnapLog fileTxnSnapLog = new FileTxnSnapLog(tmpDir, tmpDir); + ZKDatabase zkDb = new ZKDatabase(fileTxnSnapLog); + FollowerZooKeeperServer fzk = new FollowerZooKeeperServer( + fileTxnSnapLog, qp, zkDb); + fzk.setupRequestProcessors(); + Follower follower = new Follower(qp, fzk); + + Assert.assertFalse(fzk.syncProcessor.isOnlySnapWhenSafetyIsThreatened()); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream oa = new DataOutputStream(baos); + oa.writeInt(SnapPingManager.SNAP_PING_VERSION + 1); + oa.writeLong(1L); + oa.writeInt(SnapPingCode.CHECK.ordinal()); + oa.close(); + + follower.snapPing(new QuorumPacket(Leader.SNAPPING, -1, baos.toByteArray(), null)); + // Make sure we don't enable safety snapshot when snap ping version + // is different + Assert.assertFalse(fzk.syncProcessor.isOnlySnapWhenSafetyIsThreatened()); + } + + @Test + public void testCancelSnapPingSchedule() throws Exception { + // Disable the snapPing manager via set the interval to -1 + leader.snapPingManager.setSnapPingIntervalInSeconds(-1); + + for (LearnerHandler lh : leader.getForwardingFollowers()) { + verify(lh, times(1)).snapPing( + SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.CANCEL); + } + } + + /** + * Expect to trigger snap schedule even we didn't receive enough responses + * if we received a higher SnapPing id message. + */ + @Test + public void testNotReceiveAllSnapPingResponse() throws Exception { + leader.processSnapPing(new SnapPingData(1L, 1L, false, 1000000, 0, 0, 2000000, 0)); + leader.processSnapPing(new SnapPingData(2L, 1L, false, 2000000, 0, 0, 4000000, 0)); + leader.processSnapPing(new SnapPingData(3L, 1L, false, 3000000, 0, 0, 6000000, 0)); + leader.processSnapPing(new SnapPingData(5L, 1L, false, 5000000, 0, 0, 10000000, 0)); + + // Received a new SnapPing with higher SnapPing Id before we received + // the all responses for the previous PING. + leader.processSnapPing(new SnapPingData(1L, 2L, false, 3000000, 0, 0, 6000000, 0)); + + // won't take snapshot on leader since it's already in snapshot + verify(leaderSnapGenerator, times(1)).takeSnapshot(true); + + for (LearnerHandler lh : leader.getForwardingFollowers()) { + verify(lh, times(1)).snapPing( + SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP); + } + } + + @Test + public void testScheduleBasedOnTxnSize() throws Exception { + leader.processSnapPing(new SnapPingData(1L, 1L, false, 1000, 0, 0, TXNS_SIZE_THRESHOLD / 4, 0)); + leader.processSnapPing(new SnapPingData(2L, 1L, false, 2000, 0, 0, TXNS_SIZE_THRESHOLD / 2, 0)); + leader.processSnapPing(new SnapPingData(3L, 1L, false, 3000, 0, 0, TXNS_SIZE_THRESHOLD * 3 / 4, 0)); + leader.processSnapPing(new SnapPingData(4L, 1L, false, 4000, 0, 0, TXNS_SIZE_THRESHOLD, 0)); + leader.processSnapPing(new SnapPingData(5L, 1L, false, 5000, 0, 0, TXNS_SIZE_THRESHOLD * 5 / 4, 0)); + + // take snapshot on leader since it has the most txns size since last snap + verify(leaderSnapGenerator, times(1)).takeSnapshot(true); + + for (SnapPingListener listener : leader.getForwardingFollowers()) { + long sid = listener.getSid(); + verify(listener, times(1)).snapPing( + SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP); + } + } + + @Test + public void testScheduleBasedOnTxnSizeSmallerThanThreshold() throws Exception { + leader.processSnapPing(new SnapPingData(1L, 1L, false, 100, 0, 0, TXNS_SIZE_THRESHOLD / 40, 0)); + leader.processSnapPing(new SnapPingData(2L, 1L, false, 200, 0, 0, TXNS_SIZE_THRESHOLD / 20, 0)); + leader.processSnapPing(new SnapPingData(3L, 1L, false, 300, 0, 0, TXNS_SIZE_THRESHOLD * 3 / 40, 0)); + leader.processSnapPing(new SnapPingData(4L, 1L, false, 400, 0, 0, TXNS_SIZE_THRESHOLD / 10, 0)); + leader.processSnapPing(new SnapPingData(5L, 1L, false, 500, 0, 0, TXNS_SIZE_THRESHOLD * 5 / 40, 0)); + + verify(leaderSnapGenerator, never()).takeSnapshot(anyBoolean()); + + for (SnapPingListener listener : leader.getForwardingFollowers()) { + long sid = listener.getSid(); + verify(listener, times(1)).snapPing( + SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP); + } + } + + /** + * Schedule snapshot after several idle snap pings even if two followers are down. + */ + @Test + public void testScheduleBasedOnIdle() throws Exception { + leader.processSnapPing(new SnapPingData(1L, 1L, false, 200000, 0, 0, TXNS_SIZE_THRESHOLD / 40, 0)); + leader.processSnapPing(new SnapPingData(2L, 1L, false, 200000, 0, 0, TXNS_SIZE_THRESHOLD / 20, 0)); + leader.processSnapPing(new SnapPingData(5L, 1L, false, 500000, 0, 0, TXNS_SIZE_THRESHOLD * 3, 0)); + leader.processSnapPing(new SnapPingData(1L, 2L, false, 200000, 0, 0, TXNS_SIZE_THRESHOLD / 40, 0)); + leader.processSnapPing(new SnapPingData(2L, 2L, false, 200000, 0, 0, TXNS_SIZE_THRESHOLD / 20, 0)); + leader.processSnapPing(new SnapPingData(5L, 2L, false, 500000, 0, 0, TXNS_SIZE_THRESHOLD * 3, 0)); + leader.processSnapPing(new SnapPingData(1L, 3L, false, 200000, 0, 0, TXNS_SIZE_THRESHOLD / 40, 0)); + leader.processSnapPing(new SnapPingData(2L, 3L, false, 200000, 0, 0, TXNS_SIZE_THRESHOLD / 20, 0)); + leader.processSnapPing(new SnapPingData(5L, 3L, false, 500000, 0, 0, TXNS_SIZE_THRESHOLD * 3, 0)); + + verify(leaderSnapGenerator, never()).takeSnapshot(anyBoolean()); + + // At this point, the number of idle pings should be equal to the threshold of 3, so the next ping should + // result in a snapshot being scheduled. + leader.processSnapPing(new SnapPingData(1L, 4L, false, 200000, 0, 0, TXNS_SIZE_THRESHOLD / 40, 0)); + + verify(leaderSnapGenerator, times(1)).takeSnapshot(true); + for (SnapPingListener listener : leader.getForwardingFollowers()) { + long sid = listener.getSid(); + verify(listener, times(3)).snapPing( + SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP); + } + } + + private QuorumPacket buildSnapPingPacket(SnapPingCode code) throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream oa = new DataOutputStream(baos); + oa.writeInt(SnapPingManager.SNAP_PING_VERSION); + oa.writeLong(1L); + oa.writeInt(code.ordinal()); + oa.close(); + return new QuorumPacket(Leader.SNAPPING, -1, baos.toByteArray(), null); + } + + private LearnerHandler genLearnerHandler(long sid) { + LearnerHandler lh = mock(LearnerHandler.class); + when(lh.getSid()).thenReturn(sid); + return lh; + } +}