Skip to content

Commit

Permalink
ISSUE #1540: Bookie/BookieServer components shutdown will fail to end…
Browse files Browse the repository at this point in the history
… exit the BookieProcess

Descriptions of the changes in this PR:

 ### Motivation

Fixes the issue at #1540.

If Bookie/BookieServer components are shut down internally because of a fatal error
(ExitCode - INVALID_CONF, SERVER_EXCEPTION, ZK_EXPIRED, ZK_REG_FAIL, BOOKIE_EXCEPTION), the
shutdown logic runs and shuts down the components internal to Bookie/BookieServer,
but it does not succeed in bringing down the bookie process.

This is because BookieServer.main / server.Main.doMain waits for the startComponent
future to complete
http://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java#L227 .
The startComponent future is marked complete only in the runtime shutdown hook -
https://github.com/apache/bookkeeper/blob/master/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/component/ComponentStarter.java#L66.

But the problem is that nowhere in the Bookie/BookieProcess shutdown path do we call System.exit(), and hence
the runtime shutdown hook is never executed to mark the startComponent future complete. Hence
Main.doMain will wait forever on this future even though the Bookie/BookieServer components were shut down
because of known fatal errors.

 ### Regression

Issue #508 introduced this regression. Before that change, the main thread blocked on `BookieServer#join()`.
When the bookie died for any reason, the DeathWatchThread would kill the bookie and bookie server, so the main thread would quit.
However, after #508 was introduced, the lifecycle management became disconnected from the bookie and bookie server, so when they die,
lifecycle management is unaware of the situation and the main thread doesn't quit.

 ### Changes

- Add `UncaughtExceptionHandler` to lifecycle components
- When a lifecycle component hits an error, it can use `UncaughtExceptionHandler` to notify lifecycle component stack to shutdown the whole stack

Master Issue: #1540

Author: Sijie Guo <sijie@apache.org>

Reviewers: Andrey Yegorov <None>, Charan Reddy Guttapalem <reddycharan18@gmail.com>, Enrico Olivelli <eolivelli@gmail.com>

This closes #1543 from sijie/fix_lifcycle_components, closes #1540
  • Loading branch information
sijie committed Jul 23, 2018
1 parent 424204e commit 50f29ed
Show file tree
Hide file tree
Showing 10 changed files with 195 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.bookkeeper.common.component;

import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -35,6 +36,7 @@ public abstract class AbstractLifecycleComponent<ConfT extends ComponentConfigur
protected final Lifecycle lifecycle = new Lifecycle();
private final Set<LifecycleListener> listeners = new CopyOnWriteArraySet<>();
protected final StatsLogger statsLogger;
protected volatile UncaughtExceptionHandler uncaughtExceptionHandler;

protected AbstractLifecycleComponent(String componentName,
ConfT conf,
Expand All @@ -43,6 +45,11 @@ protected AbstractLifecycleComponent(String componentName,
this.statsLogger = statsLogger;
}

/**
* Records the handler to use for uncaught exceptions of this component.
*
* <p>The handler is only stored in the {@code uncaughtExceptionHandler} field;
* this method does not invoke it.
*
* @param handler handler to store for this component
*/
@Override
public void setExceptionHandler(UncaughtExceptionHandler handler) {
this.uncaughtExceptionHandler = handler;
}

/**
* Returns the stats logger held by this component.
*
* @return the {@code statsLogger} field
*/
protected StatsLogger getStatsLogger() {
return statsLogger;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,19 @@ public void run() {
*/
public static CompletableFuture<Void> startComponent(LifecycleComponent component) {
CompletableFuture<Void> future = new CompletableFuture<>();
Runtime.getRuntime().addShutdownHook(new Thread(
new ComponentShutdownHook(component, future), "component-shutdown-thread"));
final Thread shutdownHookThread = new Thread(
new ComponentShutdownHook(component, future),
"component-shutdown-thread"
);

// register a shutdown hook
Runtime.getRuntime().addShutdownHook(shutdownHookThread);

// register a component exception handler
component.setExceptionHandler((t, e) -> {
// start the shutdown hook when an uncaught exception happen in the lifecycle component.
shutdownHookThread.start();
});

log.info("Starting component {}.", component.getName());
component.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.bookkeeper.common.component;

import java.lang.Thread.UncaughtExceptionHandler;

/**
* A component based on lifecycle management.
*/
Expand All @@ -36,4 +38,13 @@ public interface LifecycleComponent extends AutoCloseable {
void stop();

void close();

/**
* Set the default handler invoked when a lifecycle component
* abruptly terminates due to an uncaught exception.
*
* @param handler handler invoked when an uncaught exception happens
* in the lifecycle component.
*/
void setExceptionHandler(UncaughtExceptionHandler handler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.List;

/**
Expand Down Expand Up @@ -121,4 +122,9 @@ public void stop() {
public void close() {
    // Walk the component list back to front and close each component in turn.
    for (LifecycleComponent component : components.reverse()) {
        component.close();
    }
}

/**
 * Passes the given handler to every component in this stack, in list order.
 *
 * @param handler handler to set on each component
 */
@Override
public void setExceptionHandler(UncaughtExceptionHandler handler) {
    for (LifecycleComponent component : components) {
        component.setExceptionHandler(handler);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@

package org.apache.bookkeeper.common.component;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.common.component.ComponentStarter.ComponentShutdownHook;
import org.junit.Test;

Expand Down Expand Up @@ -50,4 +56,30 @@ public void testComponentShutdownHook() throws Exception {
future.get();
}

/**
 * Verifies that signalling the handler installed by {@code ComponentStarter.startComponent}
 * completes the returned future and closes the component.
 */
@Test
public void testExceptionHandler() throws Exception {
    // mock component that remembers whichever handler gets installed on it
    final AtomicReference<UncaughtExceptionHandler> installedHandler = new AtomicReference<>();
    final LifecycleComponent mockComponent = mock(LifecycleComponent.class);
    when(mockComponent.getName()).thenReturn("test-exception-handler");
    doAnswer(invocation -> {
        installedHandler.set(invocation.getArgument(0));
        return null;
    }).when(mockComponent).setExceptionHandler(any(UncaughtExceptionHandler.class));

    // starting the component must start it once and install exactly one handler
    final CompletableFuture<Void> started = ComponentStarter.startComponent(mockComponent);
    final UncaughtExceptionHandler handler = installedHandler.get();
    verify(mockComponent, times(1)).start();
    verify(mockComponent, times(1)).setExceptionHandler(eq(handler));

    // signalling an exception through the handler completes the future
    // and shuts the component down
    handler.uncaughtException(
        Thread.currentThread(), new Exception("test-exception-handler"));

    started.get();
    verify(mockComponent, times(1)).close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@
package org.apache.bookkeeper.common.component;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;

/**
Expand Down Expand Up @@ -113,4 +120,54 @@ public void testStartStopClose() {
verify(component2).close();
}

/**
 * Verifies that a handler set on the stack is forwarded exactly once to each component.
 */
@Test
public void testSetExceptionHandler() {
    final LifecycleComponent first = mock(LifecycleComponent.class);
    final LifecycleComponent second = mock(LifecycleComponent.class);
    final UncaughtExceptionHandler expectedHandler = mock(UncaughtExceptionHandler.class);

    final LifecycleComponentStack componentStack = LifecycleComponentStack.newBuilder()
        .withName("set-exception-handler-stack")
        .addComponent(first)
        .addComponent(second)
        .build();
    componentStack.setExceptionHandler(expectedHandler);

    verify(first, times(1)).setExceptionHandler(eq(expectedHandler));
    verify(second, times(1)).setExceptionHandler(eq(expectedHandler));
}

/**
 * Verifies that an exception signalled through the handler installed on one component
 * completes the start future of the stack and closes every component in it.
 */
@Test
public void testExceptionHandlerShutdownLifecycleComponentStack() throws Exception {
    final LifecycleComponent first = mock(LifecycleComponent.class);
    final LifecycleComponent second = mock(LifecycleComponent.class);

    // remember the handler that gets installed on the first component
    final AtomicReference<UncaughtExceptionHandler> capturedHandler = new AtomicReference<>();
    doAnswer(invocation -> {
        UncaughtExceptionHandler installed = invocation.getArgument(0);
        capturedHandler.set(installed);
        return null;
    }).when(first).setExceptionHandler(any(UncaughtExceptionHandler.class));

    final LifecycleComponentStack componentStack = LifecycleComponentStack.newBuilder()
        .withName("exception-handler-shutdown-lifecycle-component-stack")
        .addComponent(first)
        .addComponent(second)
        .build();

    final CompletableFuture<Void> started = ComponentStarter.startComponent(componentStack);
    final UncaughtExceptionHandler stackHandler = capturedHandler.get();
    verify(first, times(1)).start();
    verify(first, times(1)).setExceptionHandler(eq(stackHandler));
    verify(second, times(1)).start();
    verify(second, times(1)).setExceptionHandler(eq(stackHandler));

    // signalling an exception through any component completes the future
    // and shuts down all the components
    stackHandler.uncaughtException(
        Thread.currentThread(), new Exception("exception-handler-shutdown-lifecycle-component-stack"));

    started.get();
    verify(first, times(1)).close();
    verify(second, times(1)).close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,26 @@
@SuppressWarnings("serial")
public abstract class BookieException extends Exception {

private int code;
private final int code;

/**
* Creates an exception carrying the given code, with no message and no cause.
*
* @param code value stored in this exception and returned by {@code getCode()}
*/
public BookieException(int code) {
super();
this.code = code;
}

/**
* Creates an exception carrying the given code and wrapping a cause.
*
* @param code value stored in this exception and returned by {@code getCode()}
* @param t cause passed to the {@code Exception} constructor
*/
public BookieException(int code, Throwable t) {
super(t);
this.code = code;
}

/**
* Creates an exception carrying the given code and a message.
*
* @param code value stored in this exception and returned by {@code getCode()}
* @param reason message passed to the {@code Exception} constructor
*/
public BookieException(int code, String reason) {
super(reason);
this.code = code;
}

/**
* Creates an exception carrying the given code, a message and a cause.
*
* @param code value stored in this exception and returned by {@code getCode()}
* @param reason message passed to the {@code Exception} constructor
* @param t cause passed to the {@code Exception} constructor
*/
public BookieException(int code, String reason, Throwable t) {
super(reason, t);
this.code = code;
}

public static BookieException create(int code) {
Expand Down Expand Up @@ -85,10 +90,6 @@ public interface Code {
int OperationRejectedException = -108;
}

public void setCode(int code) {
this.code = code;
}

/**
* Returns the code carried by this exception.
*
* @return the value of the {@code code} field
*/
public int getCode() {
return this.code;
}
Expand Down Expand Up @@ -299,4 +300,5 @@ public UnknownBookieIdException(Throwable cause) {
super(Code.UnknownBookieIdException, cause);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.net.UnknownHostException;
import java.security.AccessControlException;
import java.util.Arrays;
Expand Down Expand Up @@ -73,6 +74,9 @@ public class BookieServer {
// Expose Stats
private final StatsLogger statsLogger;

// Exception handler
private volatile UncaughtExceptionHandler uncaughtExceptionHandler = null;

public BookieServer(ServerConfiguration conf) throws IOException,
KeeperException, InterruptedException, BookieException,
UnavailableException, CompatibilityException, SecurityException {
Expand Down Expand Up @@ -110,6 +114,18 @@ public BookieServer(ServerConfiguration conf, StatsLogger statsLogger)
this.nettyServer.setRequestProcessor(this.requestProcessor);
}

/**
* Sets the uncaught exception handler for this bookie server.
*
* <p>Currently the uncaught exception handler is used by the DeathWatcher to notify
* lifecycle management that a bookie is dead for some reason. This method only records
* the handler; {@code start()} installs it on the death watcher when it is non-null.
*
* <p>In the future, we can register this {@code exceptionHandler} on critical threads
* so that when those threads die, it will automatically trigger lifecycle management
* to shut down the process.
*
* @param exceptionHandler handler to record for this bookie server
*/
public void setExceptionHandler(UncaughtExceptionHandler exceptionHandler) {
this.uncaughtExceptionHandler = exceptionHandler;
}

protected Bookie newBookie(ServerConfiguration conf)
throws IOException, KeeperException, InterruptedException, BookieException {
return conf.isForceReadOnlyBookie()
Expand All @@ -128,6 +144,9 @@ public void start() throws IOException, UnavailableException, InterruptedExcepti

running = true;
deathWatcher = new DeathWatcher(conf);
if (null != uncaughtExceptionHandler) {
deathWatcher.setUncaughtExceptionHandler(uncaughtExceptionHandler);
}
deathWatcher.start();

// fixes test flappers at random places until ISSUE#1400 is resolved
Expand Down Expand Up @@ -236,6 +255,13 @@ private class DeathWatcher extends BookieCriticalThread {
DeathWatcher(ServerConfiguration conf) {
super("BookieDeathWatcher-" + conf.getBookiePort());
watchInterval = conf.getDeathWatchInterval();
// set a default uncaught exception handler to shutdown the bookie server
// when it notices the bookie is not running any more.
setUncaughtExceptionHandler((thread, cause) -> {
LOG.info("BookieDeathWatcher exited loop due to uncaught exception from thread {}",
thread.getName(), cause);
shutdown();
});
}

@Override
Expand All @@ -248,11 +274,13 @@ public void run() {
Thread.currentThread().interrupt();
}
if (!isBookieRunning()) {
shutdown();
break;
LOG.info("BookieDeathWatcher noticed the bookie is not running any more, exiting the watch loop!");
// death watcher has noticed that bookie is not running any more
// throw an exception to fail the death watcher thread and it will
// trigger the uncaught exception handler to handle this "bookie not running" situation.
throw new RuntimeException("Bookie is not running any more");
}
}
LOG.info("BookieDeathWatcher exited loop!");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.bookkeeper.server.service;

import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
Expand All @@ -42,6 +43,11 @@ public BookieService(BookieConfiguration conf,
this.server = new BookieServer(conf.getServerConf(), statsLogger);
}

/**
* Forwards the handler to the {@code BookieServer} held by this service.
*
* @param handler handler passed on to {@code server.setExceptionHandler}
*/
@Override
public void setExceptionHandler(UncaughtExceptionHandler handler) {
server.setExceptionHandler(handler);
}

/**
* Returns the {@code BookieServer} held by this service.
*
* @return the {@code server} field
*/
public BookieServer getServer() {
return server;
}
Expand Down
Loading

0 comments on commit 50f29ed

Please sign in to comment.