Skip to content

Commit

Permalink
ZOOKEEPER-4511: Fix flaky test FileTxnSnapLogMetricsTest.testFileTxnS…
Browse files Browse the repository at this point in the history
…napLogMetrics (#37)

This test writes txns to trigger snapshot and expects some txns remain
in txn log. But snapshot taking is asynchronous, thus all txns could be
written to snapshot. So in restarting, it is possible that no txns to
load after snapshot restored. This will fail assertion.

This commit solves this by disable automic snapshot taking by
`SyncRequestProcessor.setSnapCount(Integer.MAX_VALUE)`.

Author: Kezhu Wang <kezhuw@gmail.com>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Mate Szalay-Beko <symat@apache.org>

Closes apache#1852 from kezhuw/ZOOKEEPER-4511-FileTxnSnapLogMetricsTest-testFileTxnSnapLogMetrics

Co-authored-by: Kezhu Wang <kezhuw@gmail.com>
  • Loading branch information
anurag-harness and kezhuw committed Jan 13, 2023
1 parent c7638cd commit fe05160
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
import java.time.LocalDateTime;
import java.time.Instant;
import org.apache.zookeeper.util.ServiceUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -100,8 +100,8 @@ public interface WaitForCondition {
* @throws InterruptedException
*/
public void waitFor(String msg, WaitForCondition condition, int timeout) throws InterruptedException {
final LocalDateTime deadline = LocalDateTime.now().plusSeconds(timeout);
while (LocalDateTime.now().isBefore(deadline)) {
final Instant deadline = Instant.now().plusSeconds(timeout);
while (Instant.now().isBefore(deadline)) {
if (condition.evaluate()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,64 +22,75 @@
import static org.hamcrest.number.OrderingComparison.greaterThan;
import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.File;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.QuorumUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileTxnSnapLogMetricsTest extends ZKTestCase {

private static final Logger LOG = LoggerFactory.getLogger(FileTxnSnapLogMetricsTest.class);

CountDownLatch allCreatedLatch;
@TempDir
File logDir;

private class MockWatcher implements Watcher {
@TempDir
File snapDir;

@Override
public void process(WatchedEvent e) {
LOG.info("all nodes created");
allCreatedLatch.countDown();
}
private ServerCnxnFactory startServer() throws Exception {
ZooKeeperServer zkServer = new ZooKeeperServer(snapDir, logDir, 3000);
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(0, -1);
cnxnFactory.startup(zkServer);
return cnxnFactory;
}

@AfterEach
public void cleanup() throws Exception {
SyncRequestProcessor.setSnapCount(ZooKeeperServer.getSnapCount());
}

@Test
public void testFileTxnSnapLogMetrics() throws Exception {
SyncRequestProcessor.setSnapCount(100);

QuorumUtil util = new QuorumUtil(1);
util.startAll();
ServerCnxnFactory cnxnFactory = startServer();
String connectString = "127.0.0.1:" + cnxnFactory.getLocalPort();

allCreatedLatch = new CountDownLatch(1);
// Snapshot in load data.
assertEquals(1L, MetricsUtils.currentServerMetrics().get("cnt_snapshottime"));

byte[] data = new byte[500];
// make sure a snapshot is taken and some txns are not in a snapshot
ZooKeeper zk = ClientBase.createZKClient(util.getConnString());
ZooKeeper zk = ClientBase.createZKClient(connectString);
for (int i = 0; i < 150; i++) {
zk.create("/path" + i, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

if (null == zk.exists("/path149", new MockWatcher())) {
allCreatedLatch.await();
}
// It is possible that above writes will trigger more than one snapshot due to randomization.
WaitForCondition newSnapshot = () -> (long) MetricsUtils.currentServerMetrics().get("cnt_snapshottime") >= 2L;
waitFor("no snapshot in 10s", newSnapshot, 10);

// Pauses snapshot and logs more txns.
cnxnFactory.getZooKeeperServer().getTxnLogFactory().snapLog.close();
zk.create("/" + 1000, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create("/" + 1001, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

// Restart server to count startup metrics.
cnxnFactory.shutdown();
ServerMetrics.getMetrics().resetAll();
int leader = util.getLeaderServer();
// restart a server so it will read the snapshot and the txn logs
util.shutdown(leader);
util.start(leader);
cnxnFactory = startServer();

Map<String, Object> values = MetricsUtils.currentServerMetrics();
LOG.info("txn loaded during start up {}", values.get("max_startup_txns_loaded"));
Expand All @@ -90,7 +101,7 @@ public void testFileTxnSnapLogMetrics() throws Exception {
assertEquals(1L, values.get("cnt_startup_snap_load_time"));
assertThat((long) values.get("max_startup_snap_load_time"), greaterThan(0L));

util.shutdownAll();
cnxnFactory.shutdown();
}

}

0 comments on commit fe05160

Please sign in to comment.