Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ private class TimedRunnable implements SafeRunnable {

@Override
public void safeRun() {
taskPendingStats.registerSuccessfulEvent(initNanos, TimeUnit.NANOSECONDS);
taskPendingStats.registerSuccessfulEvent(
MathUtils.elapsedNanos(initNanos),
TimeUnit.NANOSECONDS);
long startNanos = MathUtils.nowInNano();
this.runnable.safeRun();
long elapsedMicroSec = MathUtils.elapsedMicroSec(startNanos);
Expand Down Expand Up @@ -325,7 +327,7 @@ public ListenableFuture<?> submitOrdered(Object orderingKey, SafeRunnable r) {
* @param r
*/
public void submitOrdered(long orderingKey, SafeRunnable r) {
chooseThread(orderingKey).execute(r);
chooseThread(orderingKey).execute(timedRunnable(r));
}

/**
Expand All @@ -335,7 +337,7 @@ public void submitOrdered(long orderingKey, SafeRunnable r) {
* @param r
*/
public void submitOrdered(int orderingKey, SafeRunnable r) {
chooseThread(orderingKey).execute(r);
chooseThread(orderingKey).execute(timedRunnable(r));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
// Registration
protected static final String REGISTRATION_MANAGER_CLASS = "registrationManagerClass";

// Stats
protected static final String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats";

/**
* Construct a default configuration object.
*/
Expand Down Expand Up @@ -2412,6 +2415,28 @@ public ServerConfiguration setTLSTrustStorePasswordPath(String arg) {
return this;
}


/**
* Whether to enable recording task execution stats.
*
* @return flag to enable/disable recording task execution stats.
*/
public boolean getEnableTaskExecutionStats() {
return getBoolean(ENABLE_TASK_EXECUTION_STATS, false);
}

/**
* Enable/Disable recording task execution stats.
*
* @param enabled
* flag to enable/disable recording task execution stats.
* @return client configuration.
*/
public ServerConfiguration setEnableTaskExecutionStats(boolean enabled) {
setProperty(ENABLE_TASK_EXECUTION_STATS, enabled);
return this;
}

/**
* Gets the minimum safe Usable size to be available in index directory for Bookie to create Index File while
* replaying journal at the time of Bookie Start in Readonly Mode (in bytes).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,21 @@ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
StatsLogger statsLogger, SecurityHandlerFactory shFactory) throws SecurityException {
this.serverCfg = serverCfg;
this.bookie = bookie;
this.readThreadPool = createExecutor(this.serverCfg.getNumReadWorkerThreads(),
"BookieReadThread-" + serverCfg.getBookiePort(),
serverCfg.getMaxPendingReadRequestPerThread());
this.writeThreadPool = createExecutor(this.serverCfg.getNumAddWorkerThreads(),
"BookieWriteThread-" + serverCfg.getBookiePort(),
serverCfg.getMaxPendingAddRequestPerThread());
this.longPollThreadPool =
createExecutor(
this.readThreadPool = createExecutor(
this.serverCfg.getNumReadWorkerThreads(),
"BookieReadThreadPool",
serverCfg.getMaxPendingReadRequestPerThread(),
statsLogger);
this.writeThreadPool = createExecutor(
this.serverCfg.getNumAddWorkerThreads(),
"BookieWriteThreadPool",
serverCfg.getMaxPendingAddRequestPerThread(),
statsLogger);
this.longPollThreadPool = createExecutor(
this.serverCfg.getNumLongPollWorkerThreads(),
"BookieLongPollThread-" + serverCfg.getBookiePort(), OrderedScheduler.NO_TASK_LIMIT);
"BookieLongPollThread",
OrderedScheduler.NO_TASK_LIMIT,
statsLogger);
this.requestTimer = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("BookieRequestTimer-%d").build(),
this.serverCfg.getRequestTimerTickDurationMs(),
Expand Down Expand Up @@ -191,12 +196,21 @@ public void close() {
shutdownExecutor(readThreadPool);
}

private OrderedSafeExecutor createExecutor(int numThreads, String nameFormat, int maxTasksInQueue) {
private OrderedSafeExecutor createExecutor(
int numThreads,
String nameFormat,
int maxTasksInQueue,
StatsLogger statsLogger) {
if (numThreads <= 0) {
return null;
} else {
return OrderedSafeExecutor.newBuilder().numThreads(numThreads).name(nameFormat)
.maxTasksInQueue(maxTasksInQueue).build();
return OrderedSafeExecutor.newBuilder()
.numThreads(numThreads)
.name(nameFormat)
.traceTaskExecution(serverCfg.getEnableTaskExecutionStats())
.statsLogger(statsLogger)
.maxTasksInQueue(maxTasksInQueue)
.build();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public abstract class BookKeeperClusterTestCase {
protected final List<File> tmpDirs = new LinkedList<File>();
protected final List<BookieServer> bs = new LinkedList<BookieServer>();
protected final List<ServerConfiguration> bsConfs = new LinkedList<ServerConfiguration>();
private final Map<BookieSocketAddress, TestStatsProvider> bsLoggers = new HashMap<>();
protected int numBookies;
protected BookKeeperTestClient bkc;

Expand Down Expand Up @@ -198,6 +199,7 @@ protected void stopBKCluster() throws Exception {
}
}
bs.clear();
bsLoggers.clear();
}

protected void cleanupTempDirs() throws Exception {
Expand Down Expand Up @@ -235,6 +237,7 @@ protected ServerConfiguration newServerConfiguration(int port, String zkServers,
ledgerDirNames[i] = ledgerDirs[i].getPath();
}
conf.setLedgerDirNames(ledgerDirNames);
conf.setEnableTaskExecutionStats(true);
return conf;
}

Expand Down Expand Up @@ -307,6 +310,7 @@ public ServerConfiguration killBookie(BookieSocketAddress addr) throws Exception
if (toRemove != null) {
stopAutoRecoveryService(toRemove);
bs.remove(toRemove);
bsLoggers.remove(addr);
return bsConfs.remove(toRemoveIndex);
}
return null;
Expand Down Expand Up @@ -346,6 +350,7 @@ public ServerConfiguration killBookie(int index) throws Exception {
server.shutdown();
stopAutoRecoveryService(server);
bs.remove(server);
bsLoggers.remove(server.getLocalAddress());
return bsConfs.remove(index);
}

Expand Down Expand Up @@ -480,16 +485,14 @@ public void restartBookie(BookieSocketAddress addr) throws Exception {
int toRemoveIndex = 0;
for (BookieServer server : bs) {
if (server.getLocalAddress().equals(addr)) {
server.shutdown();
toRemove = server;
break;
}
++toRemoveIndex;
}
if (toRemove != null) {
stopAutoRecoveryService(toRemove);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line seems to related to the issue

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for typo, it does not seem to be related to the issue

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stopAutoRecoveryService gets called as part of of killBookie. I refactored it a little to make the bs/bsLoggers state maintenance slightly less error prone.

bs.remove(toRemove);
ServerConfiguration newConfig = bsConfs.remove(toRemoveIndex);
ServerConfiguration newConfig = bsConfs.get(toRemoveIndex);
killBookie(toRemoveIndex);
Thread.sleep(1000);
bs.add(startBookie(newConfig));
bsConfs.add(newConfig);
Expand Down Expand Up @@ -517,6 +520,7 @@ public void restartBookies(ServerConfiguration newConf)
stopAutoRecoveryService(server);
}
bs.clear();
bsLoggers.clear();
Thread.sleep(1000);
// restart them to ensure we can't
for (ServerConfiguration conf : bsConfs) {
Expand Down Expand Up @@ -557,8 +561,11 @@ public int startNewBookie()
*/
protected BookieServer startBookie(ServerConfiguration conf)
throws Exception {
BookieServer server = new BookieServer(conf);
TestStatsProvider provider = new TestStatsProvider();
BookieServer server = new BookieServer(conf, provider.getStatsLogger(""));
BookieSocketAddress address = Bookie.getBookieAddress(conf);
bsLoggers.put(address, provider);

if (bkc == null) {
bkc = new BookKeeperTestClient(baseClientConf);
}
Expand Down Expand Up @@ -588,7 +595,8 @@ protected BookieServer startBookie(ServerConfiguration conf)
*/
protected BookieServer startBookie(ServerConfiguration conf, final Bookie b)
throws Exception {
BookieServer server = new BookieServer(conf) {
TestStatsProvider provider = new TestStatsProvider();
BookieServer server = new BookieServer(conf, provider.getStatsLogger("")) {
@Override
protected Bookie newBookie(ServerConfiguration conf) {
return b;
Expand All @@ -604,6 +612,7 @@ protected Bookie newBookie(ServerConfiguration conf) {
: bkc.waitForWritableBookie(address);

server.start();
bsLoggers.put(server.getLocalAddress(), provider);

waitForBookie.get(30, TimeUnit.SECONDS);
LOG.info("New bookie '{}' has been created.", address);
Expand Down Expand Up @@ -723,4 +732,18 @@ public Auditor getAuditor(int timeout, TimeUnit unit) throws Exception {
public static boolean isCreatedFromIp(BookieSocketAddress addr) {
return addr.getSocketAddress().toString().startsWith("/");
}

public void resetBookieOpLoggers() {
for (TestStatsProvider provider : bsLoggers.values()) {
provider.clear();
}
}

public TestStatsProvider getStatsProvider(BookieSocketAddress addr) {
return bsLoggers.get(addr);
}

public TestStatsProvider getStatsProvider(int index) throws Exception {
return getStatsProvider(bs.get(index).getLocalAddress());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.bookkeeper.test;

import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_SCOPE;
import static org.junit.Assert.assertTrue;

import java.util.function.BiConsumer;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.util.MathUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
* Basic tests to verify that stats are being updated as expected.
*/
public class OpStatTest extends BookKeeperClusterTestCase {
private LedgerHandle lh;

public OpStatTest() {
super(1);
}

@Before
@Override
public void setUp() throws Exception {
super.setUp();
lh = bkc.createLedger(1, 1, BookKeeper.DigestType.CRC32, "".getBytes());
resetBookieOpLoggers();
}

@After
@Override
public void tearDown() throws Exception {
lh.close();
lh = null;
super.tearDown();
}

private void validateOpStat(TestStatsProvider stats, String path, BiConsumer<Long, Double> f) {
assertTrue(stats != null);
TestStatsProvider.TestOpStatsLogger logger = stats.getOpStatsLogger(path);
assertTrue(logger != null);
f.accept(logger.getSuccessCount(), logger.getSuccessAverage());
}

private void validateOpStat(TestStatsProvider stats, String paths[], BiConsumer<Long, Double> f) {
for (String path : paths) {
validateOpStat(stats, path, f);
}
}

@Test
public void testTopLevelBookieWriteCounters() throws Exception {
long startNanos = MathUtils.nowInNano();
lh.addEntry("test".getBytes());
long elapsed = MathUtils.elapsedNanos(startNanos);
TestStatsProvider stats = getStatsProvider(0);
validateOpStat(stats, new String[]{
SERVER_SCOPE + ".ADD_ENTRY",
SERVER_SCOPE + ".ADD_ENTRY_REQUEST",
SERVER_SCOPE + ".BookieWriteThreadPool.task_queued",
SERVER_SCOPE + ".BookieWriteThreadPool.task_execution",
SERVER_SCOPE + ".CHANNEL_WRITE"
}, (count, average) -> {
assertTrue(count == 1);
assertTrue(average > 0);
assertTrue(average <= elapsed);
});
validateOpStat(stats, new String[]{
SERVER_SCOPE + ".CHANNEL_WRITE"
}, (count, average) -> {
assertTrue(count > 0);
assertTrue(average > 0);
assertTrue(average <= elapsed);
});
}

@Test
public void testTopLevelBookieReadCounters() throws Exception {
long startNanos = MathUtils.nowInNano();
lh.addEntry("test".getBytes());
lh.readEntries(0, 0);
long elapsed = MathUtils.elapsedNanos(startNanos);
TestStatsProvider stats = getStatsProvider(0);
validateOpStat(stats, new String[]{
SERVER_SCOPE + ".READ_ENTRY",
SERVER_SCOPE + ".READ_ENTRY_REQUEST",
SERVER_SCOPE + ".BookieReadThreadPool.task_queued",
SERVER_SCOPE + ".BookieReadThreadPool.task_execution",
}, (count, average) -> {
assertTrue(count == 1);
assertTrue(average > 0);
assertTrue(average <= elapsed);
});
validateOpStat(stats, new String[]{
SERVER_SCOPE + ".CHANNEL_WRITE"
}, (count, average) -> {
assertTrue(count > 0);
assertTrue(average > 0);
assertTrue(average <= elapsed);
});
}
}
Loading