From 39bd1a3eb9171a014845fff97648341cbfb40a11 Mon Sep 17 00:00:00 2001 From: Brian Nixon Date: Tue, 1 Aug 2017 13:25:51 -0700 Subject: [PATCH 1/4] ZOOKEEPER-2872: Interrupted snapshot sync causes data loss --- .../main/org/apache/zookeeper/server/ZooKeeperServer.java | 8 ++++++-- .../org/apache/zookeeper/server/persistence/FileSnap.java | 7 +++++-- .../zookeeper/server/persistence/FileTxnSnapLog.java | 8 +++++--- .../org/apache/zookeeper/server/persistence/SnapShot.java | 8 +++++--- .../main/org/apache/zookeeper/server/quorum/Learner.java | 8 ++++++-- .../org/apache/zookeeper/server/quorum/Zab1_0Test.java | 4 ++-- src/java/test/org/apache/zookeeper/test/TruncateTest.java | 2 +- 7 files changed, 30 insertions(+), 15 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index f45ac09069b..1704826b1cf 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -300,9 +300,13 @@ public void loadData() throws IOException, InterruptedException { takeSnapshot(); } - public void takeSnapshot(){ + public void takeSnapshot() { + takeSnapshot(false); + } + + public void takeSnapshot(boolean syncSnap){ try { - txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts()); + txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); } catch (IOException e) { LOG.error("Severe unrecoverable error, exiting", e); // This is a severe error that we cannot recover from, diff --git a/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java b/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java index 30ce7c7e856..15ab77ec6e4 100644 --- a/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java +++ b/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java @@ -37,6 +37,7 @@ import org.apache.jute.BinaryOutputArchive; import org.apache.jute.InputArchive; import org.apache.jute.OutputArchive; +import org.apache.zookeeper.common.AtomicFileOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.zookeeper.server.DataTree; @@ -213,11 +214,13 @@ protected void serialize(DataTree dt,Map sessions, * @param dt the datatree to be serialized * @param sessions the sessions to be serialized * @param snapShot the file to store snapshot into + * @param fsync sync the file immediately after write */ - public synchronized void serialize(DataTree dt, Map sessions, File snapShot) + public synchronized void serialize(DataTree dt, Map sessions, File snapShot, boolean fsync) throws IOException { if (!close) { - try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot)); + try (OutputStream sessOS = new BufferedOutputStream(fsync ? new AtomicFileOutputStream(snapShot) : + new FileOutputStream(snapShot)); CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) { //CheckedOutputStream cout = new CheckedOutputStream() OutputArchive oa = BinaryOutputArchive.getArchive(crcOut); diff --git a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java index 8c2919f9d89..3a03c81e3a7 100644 --- a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java +++ b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java @@ -196,7 +196,7 @@ public long restore(DataTree dt, Map sessions, if (trustEmptyDB) { /* TODO: (br33d) we should either put a ConcurrentHashMap on restore() * or use Map on save() */ - save(dt, (ConcurrentHashMap)sessions); + save(dt, (ConcurrentHashMap)sessions, false); /* return a zxid of 0, since we know the database is empty */ return 0L; @@ -335,16 +335,18 @@ public long getLastLoggedZxid() { * @param dataTree the datatree to be serialized onto disk * @param sessionsWithTimeouts the session timeouts to be * serialized onto disk + * @param syncSnap sync the snapshot immediately after write * @throws IOException */ public void save(DataTree dataTree, - ConcurrentHashMap sessionsWithTimeouts) + ConcurrentHashMap sessionsWithTimeouts, + boolean syncSnap) throws IOException { long lastZxid = dataTree.lastProcessedZxid; File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid)); LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile); - snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile); + snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap); } diff --git a/src/java/main/org/apache/zookeeper/server/persistence/SnapShot.java b/src/java/main/org/apache/zookeeper/server/persistence/SnapShot.java index c964afc6d83..257c12d95d0 100644 --- a/src/java/main/org/apache/zookeeper/server/persistence/SnapShot.java +++ b/src/java/main/org/apache/zookeeper/server/persistence/SnapShot.java @@ -44,11 +44,13 @@ long deserialize(DataTree dt, Map sessions) /** * persist the datatree and the sessions into a persistence storage * @param dt the datatree to be serialized - * @param sessions + * @param sessions the session timeouts to be serialized + * @param name the object name to store snapshot into + * @param fsync sync the snapshot immediately after write * @throws IOException */ - void serialize(DataTree dt, Map sessions, - File name) + void serialize(DataTree dt, Map sessions, + File name, boolean fsync) throws IOException; /** diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java index f2cccd2a958..b01f836874c 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java @@ -364,6 +364,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception{ readPacket(qp); LinkedList packetsCommitted = new LinkedList(); LinkedList packetsNotCommitted = new LinkedList(); + boolean syncSnapshot = false; synchronized (zk) { if (qp.getType() == Leader.DIFF) { LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); @@ -387,6 +388,9 @@ else if (qp.getType() == Leader.SNAP) { throw new IOException("Missing signature"); } zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); + + // immediately persist the latest snapshot when there is txn log gap + syncSnapshot = true; } else if (qp.getType() == Leader.TRUNC) { //we need to truncate the log to the lastzxid of the leader LOG.warn("Truncating log to get in sync with the leader 0x" @@ -513,7 +517,7 @@ else if (qp.getType() == Leader.SNAP) { } } if (isPreZAB1_0) { - zk.takeSnapshot(); + zk.takeSnapshot(syncSnapshot); self.setCurrentEpoch(newEpoch); } self.setZooKeeperServer(zk); @@ -533,7 +537,7 @@ else if (qp.getType() == Leader.SNAP) { } if (snapshotNeeded) { - zk.takeSnapshot(); + zk.takeSnapshot(syncSnapshot); } self.setCurrentEpoch(newEpoch); diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java index e7dc90bd9d3..df5c48dc8a4 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -435,7 +435,7 @@ public void testPopulatedLeaderConversation(PopulatedLeaderConversation conversa Assert.assertTrue(zxid > ZxidUtils.makeZxid(1, 0)); // Generate snapshot and close files. - snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts()); + snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), false); snapLog.close(); QuorumPeer peer = createQuorumPeer(tmpDir); @@ -1367,7 +1367,7 @@ public void testInitialAcceptedCurrent() throws Exception { FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir); File version2 = new File(tmpDir, "version-2"); version2.mkdir(); - logFactory.save(new DataTree(), new ConcurrentHashMap()); + logFactory.save(new DataTree(), new ConcurrentHashMap(), false); long zxid = ZxidUtils.makeZxid(3, 3); logFactory.append(new Request(1, 1, ZooDefs.OpCode.error, new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error), diff --git a/src/java/test/org/apache/zookeeper/test/TruncateTest.java b/src/java/test/org/apache/zookeeper/test/TruncateTest.java index 955eb1e7ad8..ede38d0b9ff 100644 --- a/src/java/test/org/apache/zookeeper/test/TruncateTest.java +++ b/src/java/test/org/apache/zookeeper/test/TruncateTest.java @@ -75,7 +75,7 @@ public void testTruncationStreamReset() throws Exception { ZKDatabase zkdb = new ZKDatabase(snaplog); // make sure to snapshot, so that we have something there when // truncateLog reloads the db - snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts()); + snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts(), false); for (int i = 1; i <= 100; i++) { append(zkdb, i); From 8f5eb92a0d688a8c3c857135bb66027da8824899 Mon Sep 17 00:00:00 2001 From: Brian Nixon Date: Thu, 10 Aug 2017 18:39:07 -0700 Subject: [PATCH 2/4] only close OutputStream once in FileSnap --- .../apache/zookeeper/server/persistence/FileSnap.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java b/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java index 15ab77ec6e4..2ea714e47f3 100644 --- a/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java +++ b/src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java @@ -219,9 +219,10 @@ protected void serialize(DataTree dt,Map sessions, public synchronized void serialize(DataTree dt, Map sessions, File snapShot, boolean fsync) throws IOException { if (!close) { - try (OutputStream sessOS = new BufferedOutputStream(fsync ? new AtomicFileOutputStream(snapShot) : - new FileOutputStream(snapShot)); - CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) { + try (CheckedOutputStream crcOut = + new CheckedOutputStream(new BufferedOutputStream(fsync ? new AtomicFileOutputStream(snapShot) : + new FileOutputStream(snapShot)), + new Adler32())) { //CheckedOutputStream cout = new CheckedOutputStream() OutputArchive oa = BinaryOutputArchive.getArchive(crcOut); FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId); @@ -229,7 +230,7 @@ public synchronized void serialize(DataTree dt, Map sessions, Fil long val = crcOut.getChecksum().getValue(); oa.writeLong(val, "val"); oa.writeString("/", "path"); - sessOS.flush(); + crcOut.flush(); } } } From 311288af8282f52ba575c66108a5352ededbee57 Mon Sep 17 00:00:00 2001 From: Brian Nixon Date: Fri, 11 Aug 2017 10:39:20 -0700 Subject: [PATCH 3/4] fix verification of snapshot in mock testing --- .../test/org/apache/zookeeper/server/quorum/Zab1_0Test.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java index df5c48dc8a4..acac87edfab 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -714,7 +714,7 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Assert.assertEquals(1, f.self.getAcceptedEpoch()); Assert.assertEquals(1, f.self.getCurrentEpoch()); //Make sure that we did take the snapshot now - verify(f.zk).takeSnapshot(); + verify(f.zk).takeSnapshot(true); Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid()); // Make sure the data was recorded in the filesystem ok From 17a7c40c62bb0a6683ff67fae8fa72bf7c69539c Mon Sep 17 00:00:00 2001 From: Brian Nixon Date: Tue, 15 Aug 2017 10:50:25 -0700 Subject: [PATCH 4/4] rearrange declaration order --- src/java/main/org/apache/zookeeper/server/quorum/Learner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java index b01f836874c..726e688fd28 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java @@ -361,10 +361,10 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception{ // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot // For SNAP and TRUNC the snapshot is needed to save that history boolean snapshotNeeded = true; + boolean syncSnapshot = false; readPacket(qp); LinkedList packetsCommitted = new LinkedList(); LinkedList packetsNotCommitted = new LinkedList(); - boolean syncSnapshot = false; synchronized (zk) { if (qp.getType() == Leader.DIFF) { LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));