snaps = txnLog.findNValidSnapshots(num);
int numSnaps = snaps.size();
if (numSnaps > 0) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 3962bb974f1..6a177a13df2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -243,6 +243,8 @@ private ServerMetrics(MetricsProvider metricsProvider) {
REQUESTS_NOT_FORWARDED_TO_COMMIT_PROCESSOR = metricsContext.getCounter(
"requests_not_forwarded_to_commit_processor");
+ TAKING_SAFE_SNAPSHOT = metricsContext.getCounter("taking_safe_snapshot");
+ MANAGER_INITIATED_SAFE_SNAPSHOT = metricsContext.getCounter("manager_initiated_safe_snapshot");
}
/**
@@ -471,6 +473,9 @@ private ServerMetrics(MetricsProvider metricsProvider) {
private final MetricsProvider metricsProvider;
+ public final Counter TAKING_SAFE_SNAPSHOT;
+ public final Counter MANAGER_INITIATED_SAFE_SNAPSHOT;
+
public void resetAll() {
metricsProvider.resetAllValues();
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SnapshotGenerator.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SnapshotGenerator.java
new file mode 100644
index 00000000000..37d2db001a5
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SnapshotGenerator.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Util class Used to control the behavior abouthow we take snapshot.
+ */
+public class SnapshotGenerator {
+ private static final Logger LOG = LoggerFactory.getLogger(SnapshotGenerator.class);
+
+ public static final String PURGE_AFTER_SNAPSHOT = "zookeeper.purgeAfterSnapshot.enabled";
+ private static boolean purgeAfterSnapshot;
+
+ public static final String FSYNC_SNAPSHOT_FROM_SCHEDULER = "zookeeper.fsyncSnapshotFromScheduler";
+ private static boolean fsyncSnapshotFromScheduler;
+
+ static {
+ purgeAfterSnapshot = Boolean.getBoolean(PURGE_AFTER_SNAPSHOT);
+ LOG.info("{} = {}", PURGE_AFTER_SNAPSHOT, purgeAfterSnapshot);
+
+ fsyncSnapshotFromScheduler = Boolean.parseBoolean(
+ System.getProperty(FSYNC_SNAPSHOT_FROM_SCHEDULER, "true"));
+ LOG.info("{} = {}", FSYNC_SNAPSHOT_FROM_SCHEDULER, fsyncSnapshotFromScheduler);
+ }
+
+ public static boolean getPurgeAfterSnapshot() {
+ return purgeAfterSnapshot;
+ }
+
+ public static void setPurgeAfterSnapshot(boolean enabled) {
+ purgeAfterSnapshot = enabled;
+ LOG.info("{} = {}", PURGE_AFTER_SNAPSHOT, purgeAfterSnapshot);
+ }
+
+ public static void setFsyncSnapshotFromScheduler(boolean fsync) {
+ fsyncSnapshotFromScheduler = fsync;
+ LOG.info("{} = {}", FSYNC_SNAPSHOT_FROM_SCHEDULER, fsyncSnapshotFromScheduler);
+ }
+
+ public static boolean getFsyncSnapshotFromScheduler() {
+ return fsyncSnapshotFromScheduler;
+ }
+
+ private final ZooKeeperServer zks;
+ private final ExecutorService worker;
+ private final AtomicBoolean isTakingSnapshot;
+
+ public SnapshotGenerator(final ZooKeeperServer zks) {
+ this.zks = zks;
+ this.worker = Executors.newFixedThreadPool(1);
+ this.isTakingSnapshot = new AtomicBoolean(false);
+ }
+
+ public boolean takeSnapshot(boolean syncSnap) {
+ // Only allow a single snapshot in progress.
+ if (isTakingSnapshot.compareAndSet(false, true)) {
+ this.worker.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ zks.takeSnapshot(syncSnap);
+ if (purgeAfterSnapshot) {
+ zks.purge();
+ }
+ } catch (Exception e) {
+ LOG.warn("Unexpected exception", e);
+ } finally {
+ isTakingSnapshot.compareAndSet(true, false);
+ }
+ }
+ });
+ return true;
+ } else {
+ LOG.warn("Previous snapshot is still in-flight, too busy to snap, skipping");
+ return false;
+ }
+ }
+
+ public boolean isSnapInProgress() {
+ return isTakingSnapshot.get();
+ }
+
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
index 4df319f86a4..45bd67f15db 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
@@ -25,10 +25,10 @@
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.server.metric.AvgMinMaxCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,9 +69,14 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
private int randRoll;
private long randSize;
+ private AvgMinMaxCounter lastRequestFlushLatency = new AvgMinMaxCounter("flushDelay");
+
+ private static final double DEFAULT_SNAPSHOT_FACTOR = 0.5;
+ private static final double SAFETY_SNAPSHOT_FACTOR = 10;
+
private final BlockingQueue queuedRequests = new LinkedBlockingQueue();
- private final Semaphore snapThreadMutex = new Semaphore(1);
+ private final SnapshotGenerator snapGenerator;
private final ZooKeeperServer zks;
@@ -85,11 +90,31 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
private final Queue toFlush;
private long lastFlushTime;
+ private boolean onlySnapWhenSafetyIsThreatened = false;
+
+ private static volatile Threshold selfSnapTxnRollThreshold = new Threshold(
+ (int) (snapCount * DEFAULT_SNAPSHOT_FACTOR), (long) (snapSizeInBytes * DEFAULT_SNAPSHOT_FACTOR));
+ private static volatile Threshold safetySnapThreshold = new Threshold(
+ (int) (snapCount * SAFETY_SNAPSHOT_FACTOR), (long) (snapSizeInBytes * SAFETY_SNAPSHOT_FACTOR));
+
+
public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener());
this.zks = zks;
this.nextProcessor = nextProcessor;
this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize());
+ this.snapGenerator = zks.getSnapshotGenerator();
+ }
+
+ public void setOnlySnapWhenSafetyIsThreatened(boolean mode) {
+ if (mode != this.onlySnapWhenSafetyIsThreatened) {
+ LOG.info("Set onlySnapWhenSafetyIsThreatened to {}", mode);
+ }
+ this.onlySnapWhenSafetyIsThreatened = mode;
+ }
+
+ public boolean isOnlySnapWhenSafetyIsThreatened() {
+ return this.onlySnapWhenSafetyIsThreatened;
}
/**
@@ -99,6 +124,10 @@ public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor)
*/
public static void setSnapCount(int count) {
snapCount = count;
+ selfSnapTxnRollThreshold = new Threshold(
+ (int) (snapCount * DEFAULT_SNAPSHOT_FACTOR), (long) (snapSizeInBytes * DEFAULT_SNAPSHOT_FACTOR));
+ safetySnapThreshold = new Threshold(
+ (int) (snapCount * SAFETY_SNAPSHOT_FACTOR), (long) (snapSizeInBytes * SAFETY_SNAPSHOT_FACTOR));
}
/**
@@ -140,25 +169,44 @@ public static void setSnapSizeInBytes(long size) {
snapSizeInBytes = size;
}
- private boolean shouldSnapshot() {
- int logCount = zks.getZKDatabase().getTxnCount();
- long logSize = zks.getZKDatabase().getTxnSize();
- return (logCount > (snapCount / 2 + randRoll))
- || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
- }
+ public static class Threshold {
+
+ private final int countLimit;
+ private final long sizeLimit;
- private void resetSnapshotStats() {
- randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2);
- randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
+ private int randCount;
+ private long randSize;
+
+ public Threshold(int countLimit, long sizeLimit) {
+ this.countLimit = countLimit;
+ this.sizeLimit = sizeLimit;
+ resetRandomNum();
+ }
+
+ public boolean meet(int count, long size) {
+ boolean result = (count > (countLimit + randCount))
+ || (sizeLimit > 0 && size > (sizeLimit + randSize));
+ if (result) {
+ resetRandomNum();
+ }
+ return result;
+ }
+
+ /**
+ * we do this in an attempt to ensure that not all of the servers
+ * in the ensemble take a snapshot at the same time
+ */
+ private void resetRandomNum() {
+ randCount = ThreadLocalRandom.current().nextInt(countLimit);
+ randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (sizeLimit));
+ }
}
@Override
public void run() {
try {
- // we do this in an attempt to ensure that not all of the servers
- // in the ensemble take a snapshot at the same time
- resetSnapshotStats();
lastFlushTime = Time.currentElapsedTime();
+ final ZKDatabase zkDB = zks.getZKDatabase();
while (true) {
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
@@ -178,27 +226,22 @@ public void run() {
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
// track the number of records written to the log
- if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
- if (shouldSnapshot()) {
- resetSnapshotStats();
+ if (!si.isThrottled() && zkDB.append(si)) {
+ if (selfSnapTxnRollThreshold.meet(zkDB.getTxnCount(), zkDB.getTxnSize())) {
// roll the log
- zks.getZKDatabase().rollLog();
- // take a snapshot
- if (!snapThreadMutex.tryAcquire()) {
- LOG.warn("Too busy to snap, skipping");
+ zkDB.rollLog();
+
+ if (onlySnapWhenSafetyIsThreatened) {
+ if (safetySnapThreshold.meet(zkDB.getTxnsSinceLastSnap(), zkDB.getTxnsSizeSinceLastSnap())) {
+ snapGenerator.takeSnapshot(false);
+ ServerMetrics.getMetrics().TAKING_SAFE_SNAPSHOT.add(1);
+ LOG.info("The txns size since last snapshot is larger than 10x, "
+ + "going to take a snapshot for safe.");
+ }
} else {
- new ZooKeeperThread("Snapshot Thread") {
- public void run() {
- try {
- zks.takeSnapshot();
- } catch (Exception e) {
- LOG.warn("Unexpected exception", e);
- } finally {
- snapThreadMutex.release();
- }
- }
- }.start();
+ snapGenerator.takeSnapshot(false);
}
+
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
@@ -241,6 +284,7 @@ private void flush() throws IOException, RequestProcessorException {
while (!this.toFlush.isEmpty()) {
final Request i = this.toFlush.remove();
long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
+ lastRequestFlushLatency.addDataPoint(latency);
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
this.nextProcessor.processRequest(i);
}
@@ -251,6 +295,12 @@ private void flush() throws IOException, RequestProcessorException {
lastFlushTime = Time.currentElapsedTime();
}
+ public long getLastRequestFlushLatency() {
+ long result = (long) lastRequestFlushLatency.getAvg();
+ lastRequestFlushLatency.reset();
+ return result;
+ }
+
public void shutdown() {
LOG.info("Shutting down");
queuedRequests.add(REQUEST_OF_DEATH);
@@ -278,4 +328,7 @@ public void processRequest(final Request request) {
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
}
+ public int getQueuedRequestsSize() {
+ return queuedRequests.size();
+ }
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
index f758f5de5ae..46fdae39287 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
@@ -79,6 +79,16 @@ public class ZKDatabase {
protected FileTxnSnapLog snapLog;
protected long minCommittedLog, maxCommittedLog;
+ /**
+ * Used to track the approximate number of txns since the beginning of
+ * last snapshot happened. Leader is relying on this to schedule the
+ * snapshot on servers who have largest txns since last snapshot.
+ *
+ * It's different than txnCount defined in this class, which is used to
+ * track the snapshot locally.
+ */
+ protected AtomicInteger txnsSinceLastSnap = new AtomicInteger(0);
+
/**
* Default value is to use snapshot if txnlog size exceeds 1/3 the size of snapshot
*/
@@ -271,6 +281,7 @@ public ConcurrentHashMap getSessionWithTimeOuts() {
private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {
public void onTxnLoaded(TxnHeader hdr, Record txn, TxnDigest digest) {
addCommittedProposal(hdr, txn, digest);
+ txnsSinceLastSnap.incrementAndGet();
}
};
@@ -631,6 +642,18 @@ public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedE
SerializeUtils.serializeSnapshot(getDataTree(), oa, getSessionWithTimeOuts());
}
+ public int getTxnsSinceLastSnap() {
+ return txnsSinceLastSnap.get();
+ }
+
+ public void resetTxnSinceLastSnap() {
+ txnsSinceLastSnap.set(0);
+ }
+
+ public long getTxnsSizeSinceLastSnap() {
+ return snapLog.getTxnsSizeSinceLastSnap();
+ }
+
/**
* append to the underlying transaction log
* @param si the request to append
@@ -638,6 +661,7 @@ public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedE
*/
public boolean append(Request si) throws IOException {
txnCount.incrementAndGet();
+ txnsSinceLastSnap.incrementAndGet();
return this.snapLog.append(si);
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index f37225edbf0..5b746250d26 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -180,6 +180,7 @@ public static void setCloseSessionTxnEnabled(boolean enabled) {
private final RequestPathMetricsCollector requestPathMetricsCollector;
private boolean localSessionEnabled = false;
+ private SnapshotGenerator snapGenerator;
protected enum State {
INITIAL,
RUNNING,
@@ -323,6 +324,7 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessio
GET_CHILDREN_RESPONSE_CACHE_SIZE,
ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE));
+ this.snapGenerator = new SnapshotGenerator(this);
this.initialConfig = initialConfig;
this.requestPathMetricsCollector = new RequestPathMetricsCollector();
@@ -345,6 +347,19 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessio
txnLogFactory.getSnapDir());
}
+ public SnapshotGenerator getSnapshotGenerator() {
+ return this.snapGenerator;
+ }
+
+ // Visible for testing.
+ public void setSnapshotGenerator(SnapshotGenerator generator) {
+ this.snapGenerator = generator;
+ }
+
+ public void purge() throws IOException {
+ PurgeTxnLog.purge(txnLogFactory, txnLogFactory.getSnapRetainCount());
+ }
+
public String getInitialConfig() {
return initialConfig;
}
@@ -511,6 +526,7 @@ public void takeSnapshot() {
public void takeSnapshot(boolean syncSnap) {
long start = Time.currentElapsedTime();
try {
+ zkDb.resetTxnSinceLastSnap();
txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
} catch (IOException e) {
LOG.error("Severe unrecoverable error, exiting", e);
@@ -523,6 +539,10 @@ public void takeSnapshot(boolean syncSnap) {
ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed);
}
+ public void resetTxnSinceLastSnap() {
+ zkDb.resetTxnSinceLastSnap();
+ }
+
@Override
public long getDataDirSize() {
if (zkDb == null) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java
index 62969bac8a6..6957c500f2e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java
@@ -167,6 +167,16 @@ public class FileTxnLog implements TxnLog, Closeable {
*/
private long prevLogsRunningTotal;
+ private long txnsSizeSinceLastSnapshot;
+
+ public synchronized void resetTxnsSizeSinceLastSnap() {
+ txnsSizeSinceLastSnapshot = 0;
+ }
+
+ public synchronized long getTxnsSizeSinceLastSnap() {
+ return txnsSizeSinceLastSnapshot + getCurrentLogSize();
+ }
+
/**
* constructor for FileTxnLog. Take the directory
* where the txnlogs are stored
@@ -235,7 +245,9 @@ protected Checksum makeChecksumAlgorithm() {
public synchronized void rollLog() throws IOException {
if (logStream != null) {
this.logStream.flush();
- prevLogsRunningTotal += getCurrentLogSize();
+ long currentLogSize = getCurrentLogSize();
+ prevLogsRunningTotal += currentLogSize;
+ txnsSizeSinceLastSnapshot += currentLogSize;
this.logStream = null;
oa = null;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
index eddeae8d93e..a7232d642c4 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
@@ -78,6 +78,16 @@ public class FileTxnSnapLog {
private static final String EMPTY_SNAPSHOT_WARNING = "No snapshot found, but there are log entries. ";
+ private int snapRetainCount = -1;
+
+ public void setSnapRetainCount(int snapRetainCount) {
+ this.snapRetainCount = snapRetainCount;
+ }
+
+ public int getSnapRetainCount() {
+ return this.snapRetainCount;
+ }
+
/**
* This listener helps
* the external apis calling
@@ -469,6 +479,7 @@ public void save(
File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile);
try {
+ txnLog.resetTxnsSizeSinceLastSnap();
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
} catch (IOException e) {
if (snapshotFile.length() == 0) {
@@ -658,4 +669,8 @@ public void setTotalLogSize(long size) {
public long getTotalLogSize() {
return txnLog.getTotalLogSize();
}
+
+ public long getTxnsSizeSinceLastSnap() {
+ return txnLog.getTxnsSizeSinceLastSnap();
+ }
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLog.java
index b5572087fb0..5a6df8429ec 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLog.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/TxnLog.java
@@ -122,6 +122,10 @@ public interface TxnLog extends Closeable {
*/
long getTotalLogSize();
+ void resetTxnsSizeSinceLastSnap();
+
+ long getTxnsSizeSinceLastSnap();
+
/**
* an iterating interface for reading
* transaction logs.
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
index eb6742f5c6d..c05818383a5 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java
@@ -18,15 +18,21 @@
package org.apache.zookeeper.server.quorum;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import org.apache.jute.Record;
import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.ZooDefs.SnapPingCode;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.SnapshotGenerator;
import org.apache.zookeeper.server.TxnLogEntry;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
@@ -51,6 +57,7 @@ public class Follower extends Learner {
this.self = self;
this.zk = zk;
this.fzk = zk;
+ this.snapGenerator = zk.getSnapshotGenerator();
}
@Override
@@ -150,6 +157,85 @@ void followLeader() throws InterruptedException {
}
}
+ // Visible for testing
+ protected void snapPing(QuorumPacket qp) throws IOException {
+ int peerSnapPingVersion = -1;
+ long snapPingId = -1;
+ int snapCode = -1;
+ try {
+ ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
+ DataInputStream dis = new DataInputStream(bis);
+ peerSnapPingVersion = dis.readInt();
+ snapPingId = dis.readLong();
+ snapCode = dis.readInt();
+ } catch (IOException e) {
+ LOG.error("Error while parsing SnapPing packet", e);
+ return;
+ }
+
+ if (peerSnapPingVersion != SnapPingManager.SNAP_PING_VERSION) {
+ LOG.warn("The SnapPing version on leader is {} which is not "
+ + "compatible with ours {}, will skip", peerSnapPingVersion,
+ SnapPingManager.SNAP_PING_VERSION);
+ if (fzk.syncProcessor.isOnlySnapWhenSafetyIsThreatened()) {
+ LOG.info("SnapPing version incompatible, start self snapshot");
+ fzk.syncProcessor.setOnlySnapWhenSafetyIsThreatened(false);
+ }
+ return;
+ }
+
+ if (snapCode == SnapPingCode.CANCEL.ordinal()) {
+ if (fzk.syncProcessor.isOnlySnapWhenSafetyIsThreatened()) {
+ LOG.info("Snapshot schedule cancelled by leader, start self snapshot");
+ fzk.syncProcessor.setOnlySnapWhenSafetyIsThreatened(false);
+ }
+ return;
+ }
+
+ // If SNAP schedule is enabled, then disabled the periodically SNAP
+ // in SyncRequestProcessor, and only take snapshot if there is no
+ // snapshot scheduled with too many txns since last SNAP.
+ //
+ // This is useful to deal with the following cases:
+ //
+ // 1. The follower accumulated too many txns before it is scheduled
+ // to take snapshot.
+ // 2. There are 2/5 servers down for a long time, and the leader is
+ // not issuing snap for a long time.
+ if (!fzk.syncProcessor.isOnlySnapWhenSafetyIsThreatened()) {
+ LOG.info("Snapshot schedule enabled on leader, stop self "
+ + "snapshot unless safety is threatened");
+ fzk.syncProcessor.setOnlySnapWhenSafetyIsThreatened(true);
+ }
+
+ // Check and take a snapshot if needed, do not consider the snapshot
+ // the syncProcessor is doing.
+ if (snapCode == SnapPingCode.SNAP.ordinal()) {
+ // Sync the snapshot when snapshot schedule is enabled, to recover
+ // faster and reduce the impact on txns fsync.
+ if (snapGenerator.takeSnapshot(
+ SnapshotGenerator.getFsyncSnapshotFromScheduler())) {
+ LOG.info("Taking a snapshot with SNAPPING");
+ }
+ } else if (snapCode == SnapPingCode.CHECK.ordinal()) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ dos.writeLong(snapPingId);
+ dos.writeBoolean(snapGenerator.isSnapInProgress());
+ // Note: the txnsSinceLastSnap will only updated after we finished SNAP
+ dos.writeInt(zk.getZKDatabase().getTxnsSinceLastSnap());
+ dos.writeLong(fzk.syncProcessor == null ? 0
+ : fzk.syncProcessor.getLastRequestFlushLatency());
+ dos.writeInt(fzk.syncProcessor == null ? 0
+ : fzk.syncProcessor.getQueuedRequestsSize());
+ dos.writeLong(zk.getZKDatabase().getTxnsSizeSinceLastSnap());
+ qp.setData(bos.toByteArray());
+ writePacket(qp, true);
+ }
+ }
+
+ private SnapshotGenerator snapGenerator;
+
/**
* Examine the packet received in qp and dispatch based on its contents.
* @param qp
@@ -157,6 +243,9 @@ void followLeader() throws InterruptedException {
*/
protected void processPacket(QuorumPacket qp) throws Exception {
switch (qp.getType()) {
+ case Leader.SNAPPING:
+ snapPing(qp);
+ break;
case Leader.PING:
ping(qp);
break;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index 0eb3722a47c..3611e905b4e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -47,6 +47,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -54,6 +55,7 @@
import javax.security.sasl.SaslException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.ZooDefs.SnapPingCode;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.ExitCode;
@@ -61,10 +63,13 @@
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.SnapshotGenerator;
+import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperCriticalThread;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.SnapPingManager.SnapPingData;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.SerializeUtils;
@@ -76,7 +81,7 @@
/**
* This class has the control logic for the Leader.
*/
-public class Leader extends LearnerMaster {
+public class Leader extends LearnerMaster implements SnapPingListener {
private static final Logger LOG = LoggerFactory.getLogger(Leader.class);
@@ -115,10 +120,22 @@ public static int getAckLoggingFrequency() {
return ackLoggingFrequency;
}
+ SnapPingManager snapPingManager;
+
+ public void setSnapPingIntervalInSeconds(int seconds) {
+ snapPingManager.setSnapPingIntervalInSeconds(seconds);
+ }
+
+ public int getSnapPingIntervalInSeconds() {
+ return snapPingManager.getSnapPingIntervalInSeconds();
+ }
+
final LeaderZooKeeperServer zk;
final QuorumPeer self;
+ private SnapshotGenerator leaderSnapGenerator;
+
// VisibleForTesting
protected boolean quorumFormed = false;
@@ -158,6 +175,13 @@ public List getForwardingFollowers() {
}
}
+ public List getSnapPingListeners() {
+ List listeners = new ArrayList();
+ listeners.addAll(getForwardingFollowers());
+ listeners.add(this);
+ return listeners;
+ }
+
public List getNonVotingFollowers() {
List nonVotingFollowers = new ArrayList();
synchronized (forwardingFollowers) {
@@ -170,7 +194,8 @@ public List getNonVotingFollowers() {
return nonVotingFollowers;
}
- void addForwardingFollower(LearnerHandler lh) {
+ // VisibleForTesting
+ protected void addForwardingFollower(LearnerHandler lh) {
synchronized (forwardingFollowers) {
forwardingFollowers.add(lh);
}
@@ -303,6 +328,9 @@ public Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
}
this.zk = zk;
+
+ this.leaderSnapGenerator = zk.getSnapshotGenerator();
+ this.snapPingManager = new SnapPingManager(this);
}
Optional createServerSocket(InetSocketAddress address, boolean portUnification, boolean sslQuorum) {
@@ -425,6 +453,12 @@ Optional createServerSocket(InetSocketAddress address, boolean por
*/
static final int INFORMANDACTIVATE = 19;
+ /**
+ * Get the information from follower to decide the next server which is
+ * going to take snapshot.
+ */
+ static final int SNAPPING = 100;
+
final ConcurrentMap outstandingProposals = new ConcurrentHashMap();
private final ConcurrentLinkedQueue toBeApplied = new ConcurrentLinkedQueue();
@@ -432,6 +466,8 @@ Optional createServerSocket(InetSocketAddress address, boolean por
// VisibleForTesting
protected final Proposal newLeaderProposal = new Proposal();
+ private ScheduledExecutorService snapPinger;
+
class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
private final AtomicBoolean stop = new AtomicBoolean(false);
@@ -805,6 +841,10 @@ void shutdown(String reason) {
closeSockets();
}
+ if (snapPingManager != null) {
+ snapPingManager.shutdown();
+ }
+
// NIO should not accept conenctions
self.setZooKeeperServer(null);
self.adminServer.setZooKeeperServer(null);
@@ -1750,4 +1790,73 @@ public void unregisterLearnerHandlerBean(final LearnerHandler learnerHandler) {
}
}
+ private SyncRequestProcessor getSyncRequestProcessor() {
+ if (zk.proposalProcessor == null) {
+ return null;
+ }
+ return zk.proposalProcessor.syncProcessor;
+ }
+
+ public void processSnapPing(SnapPingData spData) {
+ snapPingManager.processSnapPing(spData);
+ }
+
+ @Override
+ public void snapPing(long snapPingId, SnapPingCode code) {
+ SyncRequestProcessor syncProcessor = getSyncRequestProcessor();
+
+ if (syncProcessor != null && code != SnapPingCode.CANCEL) {
+ if (!syncProcessor.isOnlySnapWhenSafetyIsThreatened()) {
+ LOG.info("Snapshot schedule enabled on leader, stop self "
+ + "snapshot unless safety is threatened");
+ syncProcessor.setOnlySnapWhenSafetyIsThreatened(true);
+ }
+ }
+
+ switch (code) {
+ case CANCEL:
+ if (syncProcessor != null) {
+ LOG.info("Snapshot schedule cancelled by leader, start self snapshot");
+ syncProcessor.setOnlySnapWhenSafetyIsThreatened(false);
+ }
+ break;
+ case SNAP:
+ if (leaderSnapGenerator.takeSnapshot(
+ SnapshotGenerator.getFsyncSnapshotFromScheduler())) {
+ LOG.info("Taking a snapshot with SNAPPING");
+ }
+ break;
+ case CHECK:
+ long lastRequestFlushLatency = 0;
+ int syncProcessorQueuedRequests = 0;
+
+ if (syncProcessor != null) {
+ lastRequestFlushLatency = syncProcessor.getLastRequestFlushLatency();
+ syncProcessorQueuedRequests = syncProcessor.getQueuedRequestsSize();
+ }
+
+ ZKDatabase zkDB = zk.getZKDatabase();
+ SnapPingData spData = new SnapPingData(getSid(), snapPingId,
+ leaderSnapGenerator.isSnapInProgress(),
+ zkDB.getTxnsSinceLastSnap(),
+ lastRequestFlushLatency, syncProcessorQueuedRequests,
+ zkDB.getTxnsSizeSinceLastSnap(), 0);
+
+ processSnapPing(spData);
+ break;
+ case SKIP:
+ break;
+ default:
+ break;
+ }
+ }
+
+ @Override
+ public long getSid() {
+ return self.getId();
+ }
+
+ QuorumVerifier getQuorumVerifier() {
+ return self.getQuorumVerifier();
+ }
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderBean.java
index 8d5b48ff589..cd26ebcfd5e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderBean.java
@@ -106,4 +106,13 @@ public void setMaxConcurrentDiffSyncs(int maxConcurrentDiffSyncs) {
leader.setMaxConcurrentDiffSyncs(maxConcurrentDiffSyncs);
}
+ @Override
+ public int getSnapPingIntervalInSeconds() {
+ return leader.getSnapPingIntervalInSeconds();
+ }
+
+ @Override
+ public void setSnapPingIntervalInSeconds(int seconds) {
+ leader.setSnapPingIntervalInSeconds(seconds);
+ }
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderMXBean.java
index e64d2f3820d..03a9ace896b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderMXBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderMXBean.java
@@ -85,4 +85,13 @@ public interface LeaderMXBean extends ZooKeeperServerMXBean {
*/
void setMaxConcurrentDiffSyncs(int maxConcurrentDiffSyncs);
+ /**
+ * @return the frequency of sending SNAPPING internal
+ */
+ int getSnapPingIntervalInSeconds();
+
+ /**
+ * @param seconds update the frequency of sending SNAPPING
+ */
+ void setSnapPingIntervalInSeconds(int seconds);
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
index 8834b5fbdb1..c5b5768fce7 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
@@ -50,6 +50,8 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
PrepRequestProcessor prepRequestProcessor;
+ ProposalRequestProcessor proposalProcessor;
+
/**
* @throws IOException
*/
@@ -67,7 +69,7 @@ protected void setupRequestProcessors() {
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
commitProcessor.start();
- ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
+ proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
proposalProcessor.initialize();
prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
prepRequestProcessor.start();
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index 3bab398b41e..8d972f57230 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -21,7 +21,9 @@
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
@@ -40,6 +42,7 @@
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.ZooDefs.SnapPingCode;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.TxnLogProposalIterator;
@@ -48,6 +51,7 @@
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.SnapPingManager.SnapPingData;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import org.apache.zookeeper.server.util.MessageTracker;
import org.apache.zookeeper.server.util.ZxidUtils;
@@ -59,7 +63,7 @@
* learner. All communication with a learner is handled by this
* class.
*/
-public class LearnerHandler extends ZooKeeperThread {
+public class LearnerHandler extends ZooKeeperThread implements SnapPingListener {
private static final Logger LOG = LoggerFactory.getLogger(LearnerHandler.class);
@@ -82,10 +86,27 @@ public Socket getSocket() {
*/
protected long sid = 0;
- long getSid() {
+ protected volatile boolean snapInProgress;
+ protected volatile int txnsSinceLastSnap;
+ protected volatile long lastRequestFlushLatency;
+
+ @Override
+ public long getSid() {
return sid;
}
+ boolean isSnapInProgress() {
+ return snapInProgress;
+ }
+
+ int getTxnsSinceLastSnap() {
+ return txnsSinceLastSnap;
+ }
+
+ long getLastRequestFlushLatency() {
+ return lastRequestFlushLatency;
+ }
+
String getRemoteAddress() {
return sock == null ? "" : sock.getRemoteSocketAddress().toString();
}
@@ -677,6 +698,21 @@ public void run() {
learnerMaster.touch(sess, to);
}
break;
+ case Leader.SNAPPING:
+ // Only deal with SNAPPING on leader
+ if (learnerMaster instanceof Leader) {
+ try {
+ ByteArrayInputStream bais = new ByteArrayInputStream(qp.getData());
+ DataInputStream dataIS = new DataInputStream(bais);
+ SnapPingData spData = new SnapPingData(
+ sid, dataIS, getQueuedPacketsSize());
+ ((Leader) learnerMaster).processSnapPing(spData);
+ } catch (IOException e) {
+ LOG.info("Error while processing snapPing data "
+ + "from {}", sid, e);
+ }
+ }
+ break;
case Leader.REVALIDATE:
ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1);
learnerMaster.revalidateSession(qp, this);
@@ -1079,6 +1115,20 @@ public void ping() {
}
}
+ @Override
+ public void snapPing(long snapPingId, SnapPingCode code) throws IOException {
+ if (!sendingThreadStarted) {
+ return;
+ }
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream oa = new DataOutputStream(baos);
+ oa.writeInt(SnapPingManager.SNAP_PING_VERSION);
+ oa.writeLong(snapPingId);
+ oa.writeInt(code.ordinal());
+ oa.close();
+ queuePacket(new QuorumPacket(Leader.SNAPPING, -1, baos.toByteArray(), null));
+ }
+
/**
* Queue leader packet of a given type
* @param type
@@ -1143,6 +1193,10 @@ public Queue getQueuedPackets() {
return queuedPackets;
}
+ public int getQueuedPacketsSize() {
+ return queuedPackets.size();
+ }
+
/**
* For testing, we need to reset this value
*/
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
index a6f94ec3bd1..713e15d9814 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
@@ -173,7 +173,11 @@ public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServ
}
quorumPeer = getQuorumPeer();
- quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
+
+ FileTxnSnapLog txnSnapLog = new FileTxnSnapLog(
+ config.getDataLogDir(), config.getDataDir());
+ txnSnapLog.setSnapRetainCount(config.getSnapRetainCount());
+ quorumPeer.setTxnFactory(txnSnapLog);
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapPingListener.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapPingListener.java
new file mode 100644
index 00000000000..6134986e23b
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapPingListener.java
@@ -0,0 +1,35 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import org.apache.zookeeper.ZooDefs.SnapPingCode;
+
+/**
+ * The interface of SnapPing event distributer and handler.
+ */
+public interface SnapPingListener {
+
+ long SNAP_PING_ID_DONT_CARE = -1;
+
+ long getSid();
+
+ void snapPing(long snapPingId, SnapPingCode code) throws IOException;
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapPingManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapPingManager.java
new file mode 100644
index 00000000000..e75a5170bde
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SnapPingManager.java
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.ZooDefs.SnapPingCode;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the snap ping check, collect information, and based on the
+ * status and strategy to decide who should take snapshot.
+ */
+public class SnapPingManager {
+ private static final Logger LOG = LoggerFactory.getLogger(SnapPingManager.class);
+
+ public static final int SNAP_PING_VERSION = 2;
+ public static final String SNAP_PING_INTERVAL_IN_SECONDS = "zookeeper.leader.snapPingIntervalInSeconds";
+
+ private int snapPingIntervalInSeconds;
+
+ private ScheduledExecutorService snapPinger;
+ private final SnapPingHandler spHandler;
+ private final Leader leader;
+
+ public SnapPingManager(Leader leader) {
+ this.leader = leader;
+ this.spHandler = new SnapPingHandler(leader);
+ this.snapPingIntervalInSeconds = Integer.getInteger(SNAP_PING_INTERVAL_IN_SECONDS, -1);
+ LOG.info("{} = {}", SNAP_PING_INTERVAL_IN_SECONDS, snapPingIntervalInSeconds);
+
+ if (snapPingIntervalInSeconds > 0) {
+ restartSnapPinger();
+ }
+ }
+
+ public void processSnapPing(SnapPingData spData) {
+ spHandler.processSnapPing(spData);
+ }
+
+ public void setSnapPingIntervalInSeconds(int seconds) {
+ if (seconds != snapPingIntervalInSeconds) {
+ snapPingIntervalInSeconds = seconds;
+ LOG.info("Updated snapPingIntervalInSeconds to {}", snapPingIntervalInSeconds);
+ }
+
+ if (snapPingIntervalInSeconds < 0) {
+ cancelSnapSchedule();
+ LOG.info("snapPingIntervalInSeconds is less than 0, going to "
+ + "disable snapshot schedule");
+ } else {
+ restartSnapPinger();
+ }
+ }
+
+ public int getSnapPingIntervalInSeconds() {
+ return snapPingIntervalInSeconds;
+ }
+
+ final Runnable snapPingRunnable = new Runnable() {
+ long snapPingId = 1;
+
+ @Override
+ public void run() {
+
+ // Don't need to have a strong guarantee of the current view of
+ // voting members when doing snap schedule.
+ Map currentVotingMembers =
+ leader.getQuorumVerifier().getVotingMembers();
+ List listeners = leader.getSnapPingListeners();
+
+ // Trigger a SnapPing to check the status
+ for (SnapPingListener listener : listeners) {
+ long sid = listener.getSid();
+ if (currentVotingMembers.containsKey(sid)) {
+ try {
+ listener.snapPing(snapPingId, SnapPingCode.CHECK);
+ } catch (IOException e) {
+ LOG.error("Exception when sending SNAPPING "
+ + "to {}, {}", sid, e);
+ }
+ }
+ }
+
+ if (++snapPingId < 0) {
+ snapPingId = 1;
+ }
+ }
+ };
+
+ public void shutdown() {
+ if (snapPinger != null) {
+ snapPinger.shutdownNow();
+ snapPinger = null;
+ }
+ }
+
+ private void restartSnapPinger() {
+ shutdown();
+ snapPinger = Executors.newSingleThreadScheduledExecutor();
+ snapPinger.scheduleAtFixedRate(snapPingRunnable, 0,
+ snapPingIntervalInSeconds, TimeUnit.SECONDS);
+ LOG.info("Started snapPinger with interval {}s", snapPingIntervalInSeconds);
+ }
+
+ private void cancelSnapSchedule() {
+ boolean snapPinerTerminated = true;
+ if (snapPinger != null) {
+ snapPinger.shutdownNow();
+ try {
+ snapPinerTerminated = snapPinger.awaitTermination(
+ 60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("Got InterruptedException while waiting for "
+ + "snapPinger to terminate");
+ } finally {
+ snapPinger = null;
+ }
+ }
+
+ // This is unlikely to happen
+ if (!snapPinerTerminated) {
+ LOG.warn("SnapPinger is not terminated yet, learner may "
+ + "not enable self snapshot.");
+ }
+
+ Map currentVotingMembers =
+ leader.getQuorumVerifier().getVotingMembers();
+ // Resume self snapshot behavior on learner
+ List listeners = leader.getSnapPingListeners();
+ for (SnapPingListener listener : listeners) {
+ long sid = listener.getSid();
+ if (currentVotingMembers.containsKey(sid)) {
+ try {
+ listener.snapPing(SnapPingListener.SNAP_PING_ID_DONT_CARE,
+ SnapPingCode.CANCEL);
+ } catch (IOException e) {
+ LOG.error("Exception when sending SNAPPING "
+ + "to {}, {}", sid, e);
+ }
+ }
+ }
+ }
+
+ public static class SnapPingHandler {
+
+ public static final String SNAP_TXNS_THRESHOLD = "zookeeper.leader.snapTxnsThreshold";
+ public static final String SNAP_TXNS_SIZE_THRESHOLD_KB = "zookeeper.leader.snapTxnsSizeThresholdKB";
+ public static final String FLUSH_LATENCY_DRAIN_THRESHOLD = "zookeeper.flushLatencyDrainThreshold";
+ public static final String LEARNER_QUEUE_SIZE_DRAIN_THRESHOLD = "zookeeper.learner.queueSizeDrainThreshold";
+ public static final String UNSCHEDULED_SNAP_THRESHOLD = "zookeeper.snapsync.unscheduledSnapshotThreshold";
+
+ private int snapTxnsThreshold;
+ private long snapTxnsSizeThresholdBytes;
+ private int flushLatencyDrainThreshold;
+ private int learnerQueueSizeDrainThreshold;
+ private int unscheduledSnapThreshold;
+ private int idleSnapPings = 0;
+
+ private final Leader leader;
+ private long currentSnapPingId = -1;
+ private Set data = new HashSet();
+
+ private int votingMemberSize = 0;
+ private Map currentVotingMembers;
+
+ public SnapPingHandler(Leader leader) {
+ this.leader = leader;
+
+ this.snapTxnsThreshold = Integer.getInteger(SNAP_TXNS_THRESHOLD, 100000);
+ // by default using 4GB
+ this.snapTxnsSizeThresholdBytes = Long.getLong(SNAP_TXNS_SIZE_THRESHOLD_KB, 4 * 1024 * 1024L) * 1024;
+ this.flushLatencyDrainThreshold = Integer.getInteger(
+ FLUSH_LATENCY_DRAIN_THRESHOLD, 200);
+ this.learnerQueueSizeDrainThreshold = Integer.getInteger(
+ LEARNER_QUEUE_SIZE_DRAIN_THRESHOLD, 10000);
+ this.unscheduledSnapThreshold = Integer.getInteger(UNSCHEDULED_SNAP_THRESHOLD, 3);
+
+ LOG.info("{} = {}, {} = {} KB, {} = {}, {} = {}, {} = {}",
+ SNAP_TXNS_THRESHOLD, snapTxnsThreshold,
+ SNAP_TXNS_SIZE_THRESHOLD_KB, snapTxnsSizeThresholdBytes,
+ FLUSH_LATENCY_DRAIN_THRESHOLD, flushLatencyDrainThreshold,
+ LEARNER_QUEUE_SIZE_DRAIN_THRESHOLD, learnerQueueSizeDrainThreshold,
+ UNSCHEDULED_SNAP_THRESHOLD, unscheduledSnapThreshold);
+ }
+
+ private boolean snapsThresholdExceeded(int maxLearnerTxnsSinceLastSnap, long maxLearnerTxnsSizeSinceLastSnap) {
+ return maxLearnerTxnsSinceLastSnap > snapTxnsThreshold || ((snapTxnsSizeThresholdBytes > 0)
+ && maxLearnerTxnsSizeSinceLastSnap > snapTxnsSizeThresholdBytes);
+ }
+
+ public void processData() {
+ // Find out the total number of learners not taking snapshot.
+ int numOfMembersNotInSnap = 0;
+ int numOfMembersDraining = 0;
+ int maxLearnerTxnsSinceLastSnap = 0;
+ int numSnapsInProgress = 0;
+ long maxLearnerTxnsSizeSinceLastSnap = 0;
+ long learnerWithMaxTxnsNotInSnap = -1;
+ for (SnapPingData f : data) {
+ // Given the order of snapInProgress and txnsSinceLastSnap
+ // being assigned in LearnerHandler with received SNAPPING,
+ // and given how we're using it here, there is no need to
+ // do the atomic update and check for these.
+ long sid = f.getSid();
+ if (!currentVotingMembers.containsKey(sid)) {
+ continue;
+ }
+ if (f.isSnapInProgress()) {
+ numSnapsInProgress++;
+ continue;
+ }
+ numOfMembersNotInSnap++;
+ int txns = f.getTxnsSinceLastSnap();
+ long txnsSize = f.getTxnsSizeSinceLastSnap();
+ if (txns > maxLearnerTxnsSinceLastSnap
+ && txnsSize > maxLearnerTxnsSizeSinceLastSnap) {
+ maxLearnerTxnsSinceLastSnap = txns;
+ maxLearnerTxnsSizeSinceLastSnap = txnsSize;
+ learnerWithMaxTxnsNotInSnap = sid;
+ }
+ long lastRequestFlushLatency = f.getLastRequestFlushLatency();
+ if (lastRequestFlushLatency > flushLatencyDrainThreshold) {
+ numOfMembersDraining++;
+ LOG.info("{} has long flush delay {}, mark it as draining",
+ sid, lastRequestFlushLatency);
+ } else if (f.getQueuedRequestsSize() > learnerQueueSizeDrainThreshold) {
+ numOfMembersDraining++;
+ LOG.info("{} has long queue {}, mark it as draining",
+ sid, f.getQueuedRequestsSize());
+ } else if (f.getLearnerHandlerQueueSize() > learnerQueueSizeDrainThreshold) {
+ numOfMembersDraining++;
+ LOG.info("{} has long learner queue {}, mark it as draining",
+ sid, f.getLearnerHandlerQueueSize());
+ }
+ }
+
+ // Doesn't distinguish the server weight in the quorum hierarchical model
+ // for now.
+ int majority = currentVotingMembers.size() / 2 + 1;
+ int numOfCandidates = numOfMembersNotInSnap - majority - numOfMembersDraining;
+
+ long learnerSnapCandidate = -1;
+ if (snapsThresholdExceeded(maxLearnerTxnsSinceLastSnap, maxLearnerTxnsSizeSinceLastSnap)) {
+ // Schedule a snapshot when there is an available candidate OR when we have a majority running, but
+ // haven't been able to schedule a snapshot in a while due to a lack of candidates. This second
+ // condition should protect against all workers scheduling safety snapshots when running in a degraded
+ // state.
+ if (numOfCandidates > 0 || (numOfMembersNotInSnap >= majority && numSnapsInProgress == 0
+ && unscheduledSnapThreshold > -1 && ++idleSnapPings >= unscheduledSnapThreshold)) {
+ learnerSnapCandidate = learnerWithMaxTxnsNotInSnap;
+ idleSnapPings = 0;
+ LOG.info("Going to tell {} to take snapshot, txns since " + "last snapshot is {}, {}",
+ learnerSnapCandidate, maxLearnerTxnsSinceLastSnap, maxLearnerTxnsSizeSinceLastSnap);
+ if (numOfCandidates <= 0) {
+ ServerMetrics.getMetrics().MANAGER_INITIATED_SAFE_SNAPSHOT.add(1);
+ }
+ }
+ }
+
+ // Send SNAPPING to all current participants, non-voting
+ // members are not considered here, those non-voting ones
+ // will take snapshot periodically as before.
+ List listeners = leader.getSnapPingListeners();
+ for (SnapPingListener listener : listeners) {
+ long sid = listener.getSid();
+ if (currentVotingMembers.containsKey(sid)) {
+ try {
+ listener.snapPing(SnapPingListener.SNAP_PING_ID_DONT_CARE,
+ sid == learnerSnapCandidate
+ ? SnapPingCode.SNAP : SnapPingCode.SKIP);
+ } catch (IOException e) {
+ LOG.error("Exception when sending SNAPPING "
+ + "to {}, {}", sid, e);
+ }
+ }
+ }
+
+ // clear the processed data points
+ data.clear();
+ }
+
+ public synchronized void processSnapPing(SnapPingData spData) {
+ if (spData.getSnapPingId() > currentSnapPingId) {
+ if (!data.isEmpty()) {
+ processData();
+ }
+ currentSnapPingId = spData.snapPingId;
+ }
+ data.add(spData);
+
+ currentVotingMembers = leader.getQuorumVerifier().getVotingMembers();
+ votingMemberSize = currentVotingMembers.size();
+
+ if (data.size() == votingMemberSize) {
+ processData();
+ }
+ }
+ }
+
+ public static class SnapPingData {
+
+ private long sid;
+ private long snapPingId;
+ private boolean snapInProgress;
+ private int txnsSinceLastSnap;
+ private long lastRequestFlushLatency;
+ private int syncProcessorQueuedRequests;
+ private long txnsSizeSinceLastSnap;
+ private int learnerHandlerQueueSize;
+
+ public SnapPingData(long sid, DataInputStream dataIS,
+ int learnerHandlerQueueSize) throws IOException {
+ this(sid, dataIS.readLong(), dataIS.readBoolean(),
+ dataIS.readInt(), dataIS.readLong(), dataIS.readInt(),
+ dataIS.readLong(), learnerHandlerQueueSize);
+ }
+
+ public SnapPingData(long sid, long snapPingId, boolean snapInProgress,
+ int txnsSinceLastSnap, long lastRequestFlushLatency,
+ int syncProcessorQueuedRequests, long txnsSizeSinceLastSnap,
+ int learnerHandlerQueueSize) {
+ this.sid = sid;
+ this.snapPingId = snapPingId;
+ this.snapInProgress = snapInProgress;
+ this.txnsSinceLastSnap = txnsSinceLastSnap;
+ this.lastRequestFlushLatency = lastRequestFlushLatency;
+ this.syncProcessorQueuedRequests = syncProcessorQueuedRequests;
+ this.txnsSizeSinceLastSnap = txnsSizeSinceLastSnap;
+ this.learnerHandlerQueueSize = learnerHandlerQueueSize;
+ LOG.info("{}: {}, snap in progress: {}, txns since last "
+ + "snap: {}, last request flush delay: {}, "
+ + "sync processor queued requests: {}, "
+ + "txn size in bytes since last snap: {}, "
+ + "learner handler queue size: {}", snapPingId, sid,
+ snapInProgress, txnsSinceLastSnap,
+ lastRequestFlushLatency, syncProcessorQueuedRequests,
+ txnsSizeSinceLastSnap, learnerHandlerQueueSize);
+ }
+
+ public long getSid() {
+ return sid;
+ }
+
+ public long getSnapPingId() {
+ return snapPingId;
+ }
+
+ public boolean isSnapInProgress() {
+ return snapInProgress;
+ }
+
+ public int getTxnsSinceLastSnap() {
+ return txnsSinceLastSnap;
+ }
+
+ public long getLastRequestFlushLatency() {
+ return lastRequestFlushLatency;
+ }
+
+ public int getQueuedRequestsSize() {
+ return syncProcessorQueuedRequests;
+ }
+
+ public long getTxnsSizeSinceLastSnap() {
+ return txnsSizeSinceLastSnap;
+ }
+
+ public int getLearnerHandlerQueueSize() {
+ return learnerHandlerQueueSize;
+ }
+ }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SnapPingTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SnapPingTest.java
new file mode 100644
index 00000000000..3ed2f245c49
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SnapPingTest.java
@@ -0,0 +1,441 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs.SnapPingCode;
+import org.apache.zookeeper.server.SnapshotGenerator;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.SnapPingManager.SnapPingData;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SnapPingTest extends ZKTestCase {
+
+ private ZKDatabase zkDb;
+ private Leader leader;
+ private SnapshotGenerator leaderSnapGenerator;
+ private static final long TXNS_SIZE_THRESHOLD = 4000 * 1024 * 1024L;
+
+ @Before
+ public void setup() throws Exception {
+ System.setProperty(
+ SnapPingManager.SnapPingHandler.SNAP_TXNS_SIZE_THRESHOLD_KB,
+ "" + (TXNS_SIZE_THRESHOLD / 1024));
+ // create a 5 members voting quorum
+ Map votingMembers = new HashMap();
+ for (long i = 1L; i <= 5L; i++) {
+ votingMembers.put(i, null);
+ }
+ QuorumPeer qp = new QuorumPeer();
+ QuorumVerifier quorumVerifierMock = mock(QuorumVerifier.class);
+ when(quorumVerifierMock.getVotingMembers()).thenReturn(votingMembers);
+ when(quorumVerifierMock.getAllMembers()).thenReturn(LeaderBeanTest.getMockedPeerViews(qp.getId()));
+ qp.setQuorumVerifier(quorumVerifierMock, false);
+ qp.setId(5L);
+
+ zkDb = mock(ZKDatabase.class);
+ File tmpDir = ClientBase.createEmptyTestDir();
+ LeaderZooKeeperServer lzs = new LeaderZooKeeperServer(
+ new FileTxnSnapLog(tmpDir, tmpDir), qp, zkDb);
+ leaderSnapGenerator = mock(SnapshotGenerator.class);
+ lzs.setSnapshotGenerator(leaderSnapGenerator);
+ leader = new Leader(qp, lzs);
+
+ leader.addForwardingFollower(genLearnerHandler(1L));
+ leader.addForwardingFollower(genLearnerHandler(2L));
+ leader.addForwardingFollower(genLearnerHandler(3L));
+ leader.addForwardingFollower(genLearnerHandler(4L));
+ }
+
+ /**
+ * Verify when leader has the maximum txns since last snapshot, it will
+ * take snapshot. Non voting followers are being ignored.
+ */
+ @Test
+ public void testLeaderSnapSchedule() throws Exception {
+ // add a non-voting follower
+ leader.addForwardingFollower(genLearnerHandler(8L));
+
+ leader.processSnapPing(new SnapPingData(1L, 1L, false, 1000000, 0, 0, 2000000, 0));
+ leader.processSnapPing(new SnapPingData(2L, 1L, false, 2000000, 0, 0, 4000000, 0));
+ leader.processSnapPing(new SnapPingData(3L, 1L, false, 3000000, 0, 0, 6000000, 0));
+ leader.processSnapPing(new SnapPingData(4L, 1L, false, 4000000, 0, 0, 8000000, 0));
+ leader.processSnapPing(new SnapPingData(5L, 1L, false, 5000000, 0, 0, 10000000, 0));
+
+ // take snapshot on leader since it has the most txns since last snap
+ verify(leaderSnapGenerator, times(1)).takeSnapshot(true);
+
+ for (SnapPingListener listener : leader.getForwardingFollowers()) {
+ long sid = listener.getSid();
+ if (sid == 8L) {
+ verify(listener, never()).snapPing(anyLong(), any());
+ } else {
+ verify(listener, times(1)).snapPing(
+ SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP);
+ }
+ }
+ }
+
+ /**
+ * Verify on a 5 members ensemble, there could be 2 concurrent snapshot.
+ */
+ @Test
+ public void testSnapScheduleWithMultipleSnap() throws Exception {
+ leader.processSnapPing(new SnapPingData(1L, 1L, false, 1000000, 0, 0, 2000000, 0));
+ leader.processSnapPing(new SnapPingData(2L, 1L, false, 2000000, 0, 0, 4000000, 0));
+ leader.processSnapPing(new SnapPingData(3L, 1L, false, 3000000, 0, 0, 6000000, 0));
+ leader.processSnapPing(new SnapPingData(4L, 1L, false, 4000000, 0, 0, 8000000, 0));
+ leader.processSnapPing(new SnapPingData(5L, 1L, true, 5000000, 0, 0, 10000000, 0));
+
+ // won't take snapshot on leader since it's already in snapshot
+ verify(leaderSnapGenerator, never()).takeSnapshot(anyBoolean());
+
+ for (LearnerHandler lh : leader.getForwardingFollowers()) {
+ if (lh.getSid() != 4L) {
+ verify(lh, times(1)).snapPing(
+ SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP);
+ } else {
+ verify(lh, times(1)).snapPing(
+ SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SNAP);
+ }
+ }
+ }
+
+ /**
+ * When there are already 2 concurrent snapshot in 5 members ensemble,
+ * no more snapshot could be scheduled.
+ */
+ @Test
+ public void testMaxConcurrentSnapSchedule() throws Exception {
+ leader.processSnapPing(new SnapPingData(1L, 1L, false, 1000000, 0, 0, 2000000, 0));
+ leader.processSnapPing(new SnapPingData(2L, 1L, false, 2000000, 0, 0, 4000000, 0));
+ leader.processSnapPing(new SnapPingData(3L, 1L, false, 3000000, 0, 0, 6000000, 0));
+ leader.processSnapPing(new SnapPingData(4L, 1L, true, 4000000, 0, 0, 8000000, 0));
+ leader.processSnapPing(new SnapPingData(5L, 1L, true, 5000000, 0, 0, 10000000, 0));
+
+ verify(leaderSnapGenerator, never()).takeSnapshot(anyBoolean());
+
+ // skip snap on all servers
+ for (LearnerHandler lh : leader.getForwardingFollowers()) {
+ verify(lh, times(1)).snapPing(
+ SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP);
+ }
+ }
+
+ /**
+ * With 2 out of 5 servers have high sync latency, we'll skip SNAP on
+ * all servers.
+ */
+ @Test
+ public void testSnapScheduleWithHighFlushLatency() throws Exception {
+ leader.processSnapPing(new SnapPingData(1L, 1L, false, 1000000, 1000, 0, 2000000, 0));
+ leader.processSnapPing(new SnapPingData(2L, 1L, false, 2000000, 1000, 0, 4000000, 0));
+ leader.processSnapPing(new SnapPingData(3L, 1L, false, 3000000, 0, 0, 6000000, 0));
+ leader.processSnapPing(new SnapPingData(4L, 1L, false, 4000000, 0, 0, 8000000, 0));
+ leader.processSnapPing(new SnapPingData(5L, 1L, false, 5000000, 0, 0, 10000000, 0));
+
+ // won't take snapshot on leader since it's already in snapshot
+ verify(leaderSnapGenerator, never()).takeSnapshot(anyBoolean());
+
+ for (LearnerHandler lh : leader.getForwardingFollowers()) {
+ verify(lh, times(1)).snapPing(
+ SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP);
+ }
+ }
+
+ /**
+ * With 2 out of 5 servers have large sync request queue size, we'll skip
+ * SNAP on all servers.
+ */
+ @Test
+ public void testSnapScheduleWithLargeLearnerQueueSize() throws Exception {
+ leader.processSnapPing(new SnapPingData(1L, 1L, false, 1000000, 0, 100000, 2000000, 0));
+ leader.processSnapPing(new SnapPingData(2L, 1L, false, 2000000, 0, 100000, 4000000, 0));
+ leader.processSnapPing(new SnapPingData(3L, 1L, false, 3000000, 0, 0, 6000000, 0));
+ leader.processSnapPing(new SnapPingData(4L, 1L, false, 4000000, 0, 0, 8000000, 0));
+ leader.processSnapPing(new SnapPingData(5L, 1L, false, 5000000, 0, 0, 10000000, 0));
+
+ // won't take snapshot on leader since it's already in snapshot
+ verify(leaderSnapGenerator, never()).takeSnapshot(anyBoolean());
+
+ for (LearnerHandler lh : leader.getForwardingFollowers()) {
+ verify(lh, times(1)).snapPing(
+ SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP);
+ }
+ }
+
+ /**
+ * With 2 out of 5 servers have large learner queue size, we'll skip SNAP on
+ * all servers.
+ */
+ @Test
+ public void testSnapScheduleWithLargeSyncRequestQueueSize() throws Exception {
+ leader.processSnapPing(new SnapPingData(1L, 1L, false, 1000000, 0, 0, 2000000, 1000000));
+ leader.processSnapPing(new SnapPingData(2L, 1L, false, 2000000, 0, 0, 4000000, 1000000));
+ leader.processSnapPing(new SnapPingData(3L, 1L, false, 3000000, 0, 0, 6000000, 0));
+ leader.processSnapPing(new SnapPingData(4L, 1L, false, 4000000, 0, 0, 8000000, 0));
+ leader.processSnapPing(new SnapPingData(5L, 1L, false, 5000000, 0, 0, 10000000, 0));
+
+ // won't take snapshot on leader since it's already in snapshot
+ verify(leaderSnapGenerator, never()).takeSnapshot(anyBoolean());
+
+ for (LearnerHandler lh : leader.getForwardingFollowers()) {
+ verify(lh, times(1)).snapPing(
+ SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP);
+ }
+ }
+
+ /**
+ * Test the logic of processing snapPing on follower.
+ */
+ @Test
+ public void testSnapPingPacketOnFollower() throws Exception {
+ QuorumPeer qp = new QuorumPeer();
+ QuorumVerifier quorumVerifierMock = mock(QuorumVerifier.class);
+ qp.setQuorumVerifier(quorumVerifierMock, false);
+ File tmpDir = ClientBase.createEmptyTestDir();
+ FileTxnSnapLog fileTxnSnapLog = new FileTxnSnapLog(tmpDir, tmpDir);
+ ZKDatabase zkDb = new ZKDatabase(fileTxnSnapLog);
+
+ final AtomicInteger snapshotDurationInSeconds = new AtomicInteger(0);
+ FollowerZooKeeperServer fzk = new FollowerZooKeeperServer(
+ fileTxnSnapLog, qp, zkDb) {
+ @Override
+ public void takeSnapshot(boolean syncSnap) {
+ try {
+ Thread.sleep(snapshotDurationInSeconds.get() * 1000);
+ } catch (InterruptedException e) {}
+ }
+ };
+ fzk.setupRequestProcessors();
+ Assert.assertFalse(fzk.syncProcessor.isOnlySnapWhenSafetyIsThreatened());
+
+ LinkedBlockingQueue snapPacketsSent =
+ new LinkedBlockingQueue();
+
+ Follower follower = new Follower(qp, fzk) {
+ @Override
+ public void writePacket(QuorumPacket pp, boolean flush) {
+ if (pp.getType() == Leader.SNAPPING) {
+ try {
+ snapPacketsSent.put(pp);
+ } catch (InterruptedException e) { /* ignore */ }
+ }
+ }
+ };
+
+ snapshotDurationInSeconds.set(1);
+
+ // Send a SNAP message
+ follower.snapPing(buildSnapPingPacket(SnapPingCode.SNAP));
+ Assert.assertTrue(fzk.syncProcessor.isOnlySnapWhenSafetyIsThreatened());
+
+ // Send a CHECK message
+ follower.snapPing(buildSnapPingPacket(SnapPingCode.CHECK));
+ QuorumPacket packetSent = snapPacketsSent.poll(1, TimeUnit.SECONDS);
+ Assert.assertNotNull(packetSent);
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(packetSent.getData());
+ DataInputStream dataIS = new DataInputStream(bais);
+ long snapPingId = dataIS.readLong();
+ boolean snapInProgress = dataIS.readBoolean();
+ int txnsSinceLastSnap = dataIS.readInt();
+ long lastRequestFlushLatency = dataIS.readLong();
+ long syncProcessorQueueSize = dataIS.readInt();
+ Assert.assertEquals(snapPingId, 1);
+ Assert.assertTrue(snapInProgress);
+ Assert.assertEquals(0, txnsSinceLastSnap);
+ Assert.assertEquals(0, lastRequestFlushLatency);
+ Assert.assertEquals(0, syncProcessorQueueSize);
+
+ // Send a CANCEL message and make sure we enabled self SNAP again
+ follower.snapPing(buildSnapPingPacket(SnapPingCode.CANCEL));
+ Assert.assertFalse(fzk.syncProcessor.isOnlySnapWhenSafetyIsThreatened());
+ }
+
+ /**
+ * Make sure the packet will be ignored if the version is different.
+ */
+ @Test
+ public void testDifferentVersion() throws Exception {
+ QuorumPeer qp = new QuorumPeer();
+ QuorumVerifier quorumVerifierMock = mock(QuorumVerifier.class);
+ qp.setQuorumVerifier(quorumVerifierMock, false);
+ File tmpDir = ClientBase.createEmptyTestDir();
+ FileTxnSnapLog fileTxnSnapLog = new FileTxnSnapLog(tmpDir, tmpDir);
+ ZKDatabase zkDb = new ZKDatabase(fileTxnSnapLog);
+ FollowerZooKeeperServer fzk = new FollowerZooKeeperServer(
+ fileTxnSnapLog, qp, zkDb);
+ fzk.setupRequestProcessors();
+ Follower follower = new Follower(qp, fzk);
+
+ Assert.assertFalse(fzk.syncProcessor.isOnlySnapWhenSafetyIsThreatened());
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream oa = new DataOutputStream(baos);
+ oa.writeInt(SnapPingManager.SNAP_PING_VERSION + 1);
+ oa.writeLong(1L);
+ oa.writeInt(SnapPingCode.CHECK.ordinal());
+ oa.close();
+
+ follower.snapPing(new QuorumPacket(Leader.SNAPPING, -1, baos.toByteArray(), null));
+ // Make sure we don't enable safety snapshot when snap ping version
+ // is different
+ Assert.assertFalse(fzk.syncProcessor.isOnlySnapWhenSafetyIsThreatened());
+ }
+
+ @Test
+ public void testCancelSnapPingSchedule() throws Exception {
+ // Disable the snapPing manager via set the interval to -1
+ leader.snapPingManager.setSnapPingIntervalInSeconds(-1);
+
+ for (LearnerHandler lh : leader.getForwardingFollowers()) {
+ verify(lh, times(1)).snapPing(
+ SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.CANCEL);
+ }
+ }
+
+ /**
+ * Expect to trigger snap schedule even we didn't receive enough responses
+ * if we received a higher SnapPing id message.
+ */
+ @Test
+ public void testNotReceiveAllSnapPingResponse() throws Exception {
+ leader.processSnapPing(new SnapPingData(1L, 1L, false, 1000000, 0, 0, 2000000, 0));
+ leader.processSnapPing(new SnapPingData(2L, 1L, false, 2000000, 0, 0, 4000000, 0));
+ leader.processSnapPing(new SnapPingData(3L, 1L, false, 3000000, 0, 0, 6000000, 0));
+ leader.processSnapPing(new SnapPingData(5L, 1L, false, 5000000, 0, 0, 10000000, 0));
+
+ // Received a new SnapPing with higher SnapPing Id before we received
+ // the all responses for the previous PING.
+ leader.processSnapPing(new SnapPingData(1L, 2L, false, 3000000, 0, 0, 6000000, 0));
+
+ // won't take snapshot on leader since it's already in snapshot
+ verify(leaderSnapGenerator, times(1)).takeSnapshot(true);
+
+ for (LearnerHandler lh : leader.getForwardingFollowers()) {
+ verify(lh, times(1)).snapPing(
+ SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP);
+ }
+ }
+
+ @Test
+ public void testScheduleBasedOnTxnSize() throws Exception {
+ leader.processSnapPing(new SnapPingData(1L, 1L, false, 1000, 0, 0, TXNS_SIZE_THRESHOLD / 4, 0));
+ leader.processSnapPing(new SnapPingData(2L, 1L, false, 2000, 0, 0, TXNS_SIZE_THRESHOLD / 2, 0));
+ leader.processSnapPing(new SnapPingData(3L, 1L, false, 3000, 0, 0, TXNS_SIZE_THRESHOLD * 3 / 4, 0));
+ leader.processSnapPing(new SnapPingData(4L, 1L, false, 4000, 0, 0, TXNS_SIZE_THRESHOLD, 0));
+ leader.processSnapPing(new SnapPingData(5L, 1L, false, 5000, 0, 0, TXNS_SIZE_THRESHOLD * 5 / 4, 0));
+
+ // take snapshot on leader since it has the most txns size since last snap
+ verify(leaderSnapGenerator, times(1)).takeSnapshot(true);
+
+ for (SnapPingListener listener : leader.getForwardingFollowers()) {
+ long sid = listener.getSid();
+ verify(listener, times(1)).snapPing(
+ SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP);
+ }
+ }
+
+ @Test
+ public void testScheduleBasedOnTxnSizeSmallerThanThreshold() throws Exception {
+ leader.processSnapPing(new SnapPingData(1L, 1L, false, 100, 0, 0, TXNS_SIZE_THRESHOLD / 40, 0));
+ leader.processSnapPing(new SnapPingData(2L, 1L, false, 200, 0, 0, TXNS_SIZE_THRESHOLD / 20, 0));
+ leader.processSnapPing(new SnapPingData(3L, 1L, false, 300, 0, 0, TXNS_SIZE_THRESHOLD * 3 / 40, 0));
+ leader.processSnapPing(new SnapPingData(4L, 1L, false, 400, 0, 0, TXNS_SIZE_THRESHOLD / 10, 0));
+ leader.processSnapPing(new SnapPingData(5L, 1L, false, 500, 0, 0, TXNS_SIZE_THRESHOLD * 5 / 40, 0));
+
+ verify(leaderSnapGenerator, never()).takeSnapshot(anyBoolean());
+
+ for (SnapPingListener listener : leader.getForwardingFollowers()) {
+ long sid = listener.getSid();
+ verify(listener, times(1)).snapPing(
+ SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP);
+ }
+ }
+
+ /**
+ * Schedule snapshot after several idle snap pings even if two followers are down.
+ */
+ @Test
+ public void testScheduleBasedOnIdle() throws Exception {
+ leader.processSnapPing(new SnapPingData(1L, 1L, false, 200000, 0, 0, TXNS_SIZE_THRESHOLD / 40, 0));
+ leader.processSnapPing(new SnapPingData(2L, 1L, false, 200000, 0, 0, TXNS_SIZE_THRESHOLD / 20, 0));
+ leader.processSnapPing(new SnapPingData(5L, 1L, false, 500000, 0, 0, TXNS_SIZE_THRESHOLD * 3, 0));
+ leader.processSnapPing(new SnapPingData(1L, 2L, false, 200000, 0, 0, TXNS_SIZE_THRESHOLD / 40, 0));
+ leader.processSnapPing(new SnapPingData(2L, 2L, false, 200000, 0, 0, TXNS_SIZE_THRESHOLD / 20, 0));
+ leader.processSnapPing(new SnapPingData(5L, 2L, false, 500000, 0, 0, TXNS_SIZE_THRESHOLD * 3, 0));
+ leader.processSnapPing(new SnapPingData(1L, 3L, false, 200000, 0, 0, TXNS_SIZE_THRESHOLD / 40, 0));
+ leader.processSnapPing(new SnapPingData(2L, 3L, false, 200000, 0, 0, TXNS_SIZE_THRESHOLD / 20, 0));
+ leader.processSnapPing(new SnapPingData(5L, 3L, false, 500000, 0, 0, TXNS_SIZE_THRESHOLD * 3, 0));
+
+ verify(leaderSnapGenerator, never()).takeSnapshot(anyBoolean());
+
+ // At this point, the number of idle pings should be equal to the threshold of 3, so the next ping should
+ // result in a snapshot being scheduled.
+ leader.processSnapPing(new SnapPingData(1L, 4L, false, 200000, 0, 0, TXNS_SIZE_THRESHOLD / 40, 0));
+
+ verify(leaderSnapGenerator, times(1)).takeSnapshot(true);
+ for (SnapPingListener listener : leader.getForwardingFollowers()) {
+ long sid = listener.getSid();
+ verify(listener, times(3)).snapPing(
+ SnapPingListener.SNAP_PING_ID_DONT_CARE, SnapPingCode.SKIP);
+ }
+ }
+
+ private QuorumPacket buildSnapPingPacket(SnapPingCode code) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream oa = new DataOutputStream(baos);
+ oa.writeInt(SnapPingManager.SNAP_PING_VERSION);
+ oa.writeLong(1L);
+ oa.writeInt(code.ordinal());
+ oa.close();
+ return new QuorumPacket(Leader.SNAPPING, -1, baos.toByteArray(), null);
+ }
+
+ private LearnerHandler genLearnerHandler(long sid) {
+ LearnerHandler lh = mock(LearnerHandler.class);
+ when(lh.getSid()).thenReturn(sid);
+ return lh;
+ }
+}