Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BOOKKEEPER-855: handle session expire event in bookie #1

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 106 additions & 32 deletions bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,18 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
Expand All @@ -61,12 +65,10 @@
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -119,6 +121,7 @@ public class Bookie extends BookieCriticalThread {

// ZK registration path for this bookie
protected final String bookieRegistrationPath;
protected final String bookieReadonlyRegistrationPath;

private final LedgerDirsManager ledgerDirsManager;
private LedgerDirsManager indexDirsManager;
Expand All @@ -142,7 +145,11 @@ public class Bookie extends BookieCriticalThread {
final protected String zkBookieRegPath;
final protected String zkBookieReadOnlyPath;

final private AtomicBoolean zkRegistered = new AtomicBoolean(false);
final protected AtomicBoolean readOnly = new AtomicBoolean(false);
// executor to manage the state changes for a bookie.
final ExecutorService stateService = Executors.newSingleThreadExecutor(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stateService needs to be shut down at bookie stop

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@merlimat yup, that's a good catch. will fix it.

new ThreadFactoryBuilder().setNameFormat("BookieStateService-%d").build());

// Expose Stats
private final Counter writeBytes;
Expand Down Expand Up @@ -468,6 +475,8 @@ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
throws IOException, KeeperException, InterruptedException, BookieException {
super("Bookie-" + conf.getBookiePort());
this.bookieRegistrationPath = conf.getZkAvailableBookiesPath() + "/";
this.bookieReadonlyRegistrationPath =
this.bookieRegistrationPath + BookKeeperConstants.READONLY;
this.conf = conf;
this.journalDirectory = getCurrentDirectory(conf.getJournalDir());
this.ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
Expand Down Expand Up @@ -512,7 +521,7 @@ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
// ZK ephemeral node for this Bookie.
String myID = getMyId();
zkBookieRegPath = this.bookieRegistrationPath + myID;
zkBookieReadOnlyPath = this.bookieRegistrationPath + BookKeeperConstants.READONLY + "/" + myID;
zkBookieReadOnlyPath = this.bookieReadonlyRegistrationPath + "/" + myID;

// Expose Stats
writeBytes = statsLogger.getCounter(WRITE_BYTES);
Expand All @@ -522,7 +531,7 @@ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
readEntryStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY);
addBytesStats = statsLogger.getOpStatsLogger(BOOKIE_ADD_ENTRY_BYTES);
readBytesStats = statsLogger.getOpStatsLogger(BOOKIE_READ_ENTRY_BYTES);
// 1 : up, 0 : readonly
// 1 : up, 0 : readonly, -1 : unregistered
statsLogger.registerGauge(SERVER_STATUS, new Gauge<Number>() {
@Override
public Number getDefaultValue() {
Expand All @@ -531,7 +540,7 @@ public Number getDefaultValue() {

@Override
public Number getSample() {
return readOnly.get() ? 0 : 1;
return zkRegistered.get() ? (readOnly.get() ? 0 : 1) : -1;
}
});
}
Expand All @@ -541,6 +550,7 @@ private String getMyId() throws UnknownHostException {
}

void readJournal() throws IOException, BookieException {
long startTs = MathUtils.now();
journal.replay(new JournalScanner() {
@Override
public void process(int journalVersion, long offset, ByteBuffer recBuff) throws IOException {
Expand Down Expand Up @@ -590,6 +600,8 @@ public void process(int journalVersion, long offset, ByteBuffer recBuff) throws
}
}
});
long elapsedTs = MathUtils.now() - startTs;
LOG.info("Finished replaying journal in {} ms.", elapsedTs);
}

@Override
Expand Down Expand Up @@ -632,9 +644,9 @@ synchronized public void start() {
// if setting it in bookie thread, the watcher might run before bookie thread.
running = true;
try {
registerBookie(conf);
} catch (IOException e) {
LOG.error("Couldn't register bookie with zookeeper, shutting down", e);
registerBookie(true).get();
} catch (Exception e) {
LOG.error("Couldn't register bookie with zookeeper, shutting down : ", e);
shutdown(ExitCode.ZK_REG_FAIL);
}
}
Expand Down Expand Up @@ -800,19 +812,46 @@ public void process(WatchedEvent event) {
/**
* Register as an available bookie
*/
protected void registerBookie(ServerConfiguration conf) throws IOException {
protected Future<Void> registerBookie(final boolean throwException) {
return stateService.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
try {
doRegisterBookie();
} catch (IOException ioe) {
if (throwException) {
throw ioe;
} else {
LOG.error("Couldn't register bookie with zookeeper, shutting down : ", ioe);
triggerBookieShutdown(ExitCode.ZK_REG_FAIL);
}
}
return (Void)null;
}
});
}

protected void doRegisterBookie() throws IOException {
doRegisterBookie(readOnly.get() ? zkBookieReadOnlyPath : zkBookieRegPath);
}

private void doRegisterBookie(final String regPath) throws IOException {
if (null == zk) {
// zookeeper instance is null, means not register itself to zk
return;
}

zkRegistered.set(false);

// ZK ephemeral node for this Bookie.
try{
if (!checkRegNodeAndWaitExpired(zkBookieRegPath)) {
if (!checkRegNodeAndWaitExpired(regPath)) {
// Create the ZK ephemeral node for this Bookie.
zk.create(zkBookieRegPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
zk.create(regPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
LOG.info("Registered myself in ZooKeeper at {}.", regPath);
}
zkRegistered.set(true);
} catch (KeeperException ke) {
LOG.error("ZK exception registering ephemeral Znode for Bookie!", ke);
// Throw an IOException back up. This will cause the Bookie
Expand All @@ -832,14 +871,31 @@ protected void registerBookie(ServerConfiguration conf) throws IOException {
/**
* Transition the bookie from readOnly mode to writable
*/
private Future<Void> transitionToWritableMode() {
return stateService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
doTransitionToWritableMode();
return null;
}
});
}

@VisibleForTesting
public void transitionToWritableMode() {
public void doTransitionToWritableMode() {
if (shuttingdown) {
return;
}
if (!readOnly.compareAndSet(true, false)) {
return;
}
LOG.info("Transitioning Bookie to Writable mode and will serve read/write requests.");
// change zookeeper state only when using zookeeper
if (null == zk) {
return;
}
try {
this.registerBookie(conf);
doRegisterBookie(zkBookieRegPath);
} catch (IOException e) {
LOG.warn("Error in transitioning back to writable mode : ", e);
transitionToReadOnlyMode();
Expand All @@ -863,12 +919,21 @@ public void transitionToWritableMode() {
/**
* Transition the bookie to readOnly mode
*/
private Future<Void> transitionToReadOnlyMode() {
return stateService.submit(new Callable<Void>() {
@Override
public Void call() {
doTransitionToReadOnlyMode();
return (Void)null;
}
});
}

@VisibleForTesting
public void transitionToReadOnlyMode() {
public void doTransitionToReadOnlyMode() {
if (shuttingdown) {
return;
}

if (!readOnly.compareAndSet(false, true)) {
return;
}
Expand All @@ -882,22 +947,20 @@ public void transitionToReadOnlyMode() {
}
LOG.info("Transitioning Bookie to ReadOnly mode,"
+ " and will serve only read requests from clients!");
// change zookeeper state only when using zookeeper
if (null == zk) {
return;
}
try {
if (null == zk.exists(this.bookieRegistrationPath
+ BookKeeperConstants.READONLY, false)) {
if (null == zk.exists(this.bookieReadonlyRegistrationPath, false)) {
try {
zk.create(this.bookieRegistrationPath
+ BookKeeperConstants.READONLY, new byte[0],
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(this.bookieReadonlyRegistrationPath, new byte[0],
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// this node is just now created by someone.
}
}
if (!checkRegNodeAndWaitExpired(zkBookieReadOnlyPath)) {
// Create the readonly node
zk.create(zkBookieReadOnlyPath,
new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
doRegisterBookie(zkBookieReadOnlyPath);
try {
// Clear the current registered node
zk.delete(zkBookieRegPath, -1);
Expand Down Expand Up @@ -948,25 +1011,31 @@ public boolean isReadOnly() {
*
* @return zk client instance
*/
private ZooKeeper newZookeeper(ServerConfiguration conf) throws IOException, InterruptedException,
KeeperException {
private ZooKeeper newZookeeper(final ServerConfiguration conf)
throws IOException, InterruptedException, KeeperException {
Set<Watcher> watchers = new HashSet<Watcher>();
watchers.add(new Watcher() {
@Override
public void process(WatchedEvent event) {
if (!running) {
// do nothing until first registration
return;
}
// Check for expired connection.
if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
LOG.error("ZK client connection to the ZK server has expired!");
shutdown(ExitCode.ZK_EXPIRED);
if (event.getType().equals(EventType.None) &&
event.getState().equals(KeeperState.Expired)) {
zkRegistered.set(false);
// schedule a re-register operation
registerBookie(false);
}
}
});
return ZooKeeperClient.newBuilder()
.connectString(conf.getZkServers())
.sessionTimeoutMs(conf.getZkTimeout())
.watchers(watchers)
.operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkTimeout(),
conf.getZkTimeout(), Integer.MAX_VALUE))
.operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkRetryBackoffStartMs(),
conf.getZkRetryBackoffMaxMs(), Integer.MAX_VALUE))
.build();
}

Expand All @@ -982,7 +1051,9 @@ public void run() {
journal.start();
// wait until journal quits
journal.join();
LOG.info("Journal thread quits.");
} catch (InterruptedException ie) {
LOG.warn("Interrupted on running journal thread : ", ie);
}
// if the journal thread quits due to shutting down, it is ok
if (!shuttingdown) {
Expand Down Expand Up @@ -1033,6 +1104,9 @@ synchronized int shutdown(int exitCode) {
// mark bookie as in shutting down progress
shuttingdown = true;

// Shutdown the state service
stateService.shutdown();

// Shutdown journal
journal.shutdown();
this.join();
Expand Down
Loading