From e9961f2730495974b8cd322665b242fe2c90075f Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Thu, 4 Feb 2016 11:12:57 -0800 Subject: [PATCH] BOOKKEEPER-883: Test timeout in bookkeeper-benchmark Problem: The BenchReadThroughputLatency is tight with FlatLedgerManager. so lots of assumptions are made based on how the znodes are changed when ledgers are created. There was a change introduced LedgerIdGenerator, which broke the assumptions that made by BenchReadThroughputLatency. Fix: - Use a hashset to cache processed ledgers on reacting on children changes - Remove unpredictable test on next ledger - Fix an error logging on FlatLedgerManager processing ledgers --- .../benchmark/BenchReadThroughputLatency.java | 68 +++++++++---------- .../bookkeeper/benchmark/TestBenchmark.java | 54 +-------------- .../bookkeeper/meta/FlatLedgerManager.java | 5 ++ .../bookkeeper/meta/ZkLedgerIdGenerator.java | 4 +- 4 files changed, 45 insertions(+), 86 deletions(-) diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java index d5baaa400fa..1cdd5640b45 100644 --- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java +++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java @@ -19,42 +19,34 @@ */ package org.apache.bookkeeper.benchmark; -import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.AsyncCallback.AddCallback; - +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.PosixParser; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Enumeration; -import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.List; import java.util.ArrayList; +import java.util.Set; import java.util.regex.Pattern; import java.util.regex.Matcher; - import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.PosixParser; -import org.apache.commons.cli.ParseException; - import static com.google.common.base.Charsets.UTF_8; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class BenchReadThroughputLatency { static final Logger LOG = LoggerFactory.getLogger(BenchReadThroughputLatency.class); @@ -90,7 +82,7 @@ private static void readLedger(ClientConfiguration conf, long ledgerId, byte[] p try { bk = new BookKeeper(conf); while (true) { - lh = bk.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32, + lh = bk.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.CRC32, passwd); long lastConfirmed = Math.min(lh.getLastAddConfirmed(), absoluteLimit); if (lastConfirmed == lastRead) { @@ -154,7 +146,7 @@ private static void usage(Options options) { @SuppressWarnings("deprecation") public static void main(String[] args) throws Exception { Options options = new Options(); - options.addOption("ledger", true, "Ledger to read. If empty, read all ledgers which come available. " + options.addOption("ledger", true, "Ledger to read. If empty, read all ledgers which come available. " + " Cannot be used with -listen"); options.addOption("listen", true, "Listen for creation of ledgers, and read each one fully"); options.addOption("password", true, "Password used to access ledgers (default 'benchPasswd')"); @@ -207,11 +199,12 @@ public void process(WatchedEvent event) { } } }); + final Set processedLedgers = new HashSet(); try { zk.register(new Watcher() { public void process(WatchedEvent event) { try { - if (event.getState() == Event.KeeperState.SyncConnected + if (event.getState() == Event.KeeperState.SyncConnected && event.getType() == Event.EventType.None) { connectedLatch.countDown(); } else if (event.getType() == Event.EventType.NodeCreated @@ -229,22 +222,29 @@ public void process(WatchedEvent event) { ledgers.add(child); } } - Collections.sort(ledgers, ZK_LEDGER_COMPARE); - String last = ledgers.get(ledgers.size() - 1); - final Matcher m = LEDGER_PATTERN.matcher(last); - if (m.find()) { - int ledgersLeft = numLedgers.decrementAndGet(); - Thread t = new Thread() { - public void run() { - readLedger(conf, Long.valueOf(m.group(1)), passwd); + for (String ledger : ledgers) { + synchronized (processedLedgers) { + if (processedLedgers.contains(ledger)) { + continue; + } + final Matcher m = LEDGER_PATTERN.matcher(ledger); + if (m.find()) { + int ledgersLeft = numLedgers.decrementAndGet(); + final Long ledgerId = Long.valueOf(m.group(1)); + processedLedgers.add(ledger); + Thread t = new Thread() { + public void run() { + readLedger(conf, ledgerId, passwd); + } + }; + t.start(); + if (ledgersLeft <= 0) { + shutdownLatch.countDown(); } - }; - t.start(); - if (ledgersLeft <= 0) { - shutdownLatch.countDown(); + } else { + LOG.error("Cant file ledger id in {}", ledger); + } } - } else { - LOG.error("Cant file ledger id in {}", last); } } else { LOG.warn("Unknown event {}", event); diff --git a/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java b/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java index ec3cd610f53..f5108ec715b 100644 --- a/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java +++ b/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java @@ -19,29 +19,16 @@ */ package org.apache.bookkeeper.benchmark; -import org.junit.BeforeClass; -import org.junit.AfterClass; -import org.junit.Test; -import org.junit.Assert; - import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; -import org.apache.bookkeeper.util.LocalBookKeeper; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.ZooKeeper; +import org.junit.Test; +import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class TestBenchmark extends BookKeeperClusterTestCase { @@ -114,7 +101,7 @@ public void run() { if (!t.isAlive()) { break; } - Thread.sleep(1000); // wait for 10 seconds for reading to finish + Thread.sleep(100); } Assert.assertFalse("Thread should be finished", t.isAlive()); @@ -122,40 +109,5 @@ public void run() { BenchReadThroughputLatency.main(new String[] { "--zookeeper", zkUtil.getZooKeeperConnectString(), "--ledger", String.valueOf(lastLedgerId)}); - - final long nextLedgerId = lastLedgerId+1; - t = new Thread() { - public void run() { - try { - BenchReadThroughputLatency.main(new String[] { - "--zookeeper", zkUtil.getZooKeeperConnectString(), - "--ledger", String.valueOf(nextLedgerId)}); - } catch (Throwable t) { - LOG.error("Error reading", t); - threwException.set(true); - } - } - }; - t.start(); - - Assert.assertTrue("Thread should be running", t.isAlive()); - BookKeeper bk = new BookKeeper(zkUtil.getZooKeeperConnectString()); - LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.CRC32, "benchPasswd".getBytes()); - try { - for (int j = 0; j < 100; j++) { - lh.addEntry(data); - } - } finally { - lh.close(); - bk.close(); - } - for (int i = 0; i < 60; i++) { - if (!t.isAlive()) { - break; - } - Thread.sleep(1000); // wait for 10 seconds for reading to finish - } - Assert.assertFalse("Thread should be finished", t.isAlive()); - Assert.assertFalse("A thread has thrown an exception, check logs", threwException.get()); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java index 6bd32160693..3172247e60b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java @@ -87,6 +87,11 @@ public void asyncProcessLedgers(final Processor processor, asyncProcessLedgersInSingleNode(ledgerRootPath, processor, finalCb, ctx, successRc, failureRc); } + @Override + protected boolean isSpecialZnode(String znode) { + return znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX) || super.isSpecialZnode(znode); + } + @Override public LedgerRangeIterator getLedgerRanges() { return new LedgerRangeIterator() { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java index a6c5b7bacb4..b54c8918150 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java @@ -42,6 +42,8 @@ public class ZkLedgerIdGenerator implements LedgerIdGenerator { static final Logger LOG = LoggerFactory.getLogger(ZkLedgerIdGenerator.class); + static final String LEDGER_ID_GEN_PREFIX = "ID-"; + final ZooKeeper zk; final String ledgerIdGenPath; final String ledgerPrefix; @@ -55,7 +57,7 @@ public ZkLedgerIdGenerator(ZooKeeper zk, } else { this.ledgerIdGenPath = ledgersPath + "/" + idGenZnodeName; } - this.ledgerPrefix = this.ledgerIdGenPath + "/ID-"; + this.ledgerPrefix = this.ledgerIdGenPath + "/" + LEDGER_ID_GEN_PREFIX; } @Override