Skip to content

Commit

Permalink
BOOKKEEPER-500: Fencing doesn't work when restarting bookies. (sijie …
Browse files Browse the repository at this point in the history
…via ivank)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/bookkeeper/trunk@1423419 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
ivankelly committed Dec 18, 2012
1 parent e5b0dd0 commit d69986c
Show file tree
Hide file tree
Showing 13 changed files with 306 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ Trunk (unreleased changes)

BOOKKEEPER-496: Ensure that the auditor and replication worker will shutdown if they lose their ZK session (ivank)

BOOKKEEPER-500: Fencing doesn't work when restarting bookies. (sijie via ivank)

hedwig-protocol:

BOOKKEEPER-394: CompositeException message is not useful (Stu Hood via sijie)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -89,6 +90,7 @@ public class Bookie extends Thread {
final HandleFactory handles;

static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
static final long METAENTRY_ID_FENCE_KEY = -0x2000;

// ZK registration path for this bookie
private final String bookieRegistrationPath;
Expand Down Expand Up @@ -155,6 +157,75 @@ public void writeComplete(int rc, long ledgerId, long entryId,
}
}

/**
 * Shared, already-completed future whose result is always {@code true}.
 * It never blocks and cannot be cancelled; fenceLedger returns it when the
 * ledger was already fenced and nothing has to be written to the journal.
 */
static final Future<Boolean> SUCCESS_FUTURE = new Future<Boolean>() {
    @Override
    public boolean isDone() {
        return true;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // Nothing is in flight, so there is nothing to cancel.
        return false;
    }

    @Override
    public Boolean get() {
        return Boolean.TRUE;
    }

    @Override
    public Boolean get(long timeout, TimeUnit unit) {
        // The result is available immediately; the timeout is irrelevant.
        return Boolean.TRUE;
    }
};

/**
 * Minimal {@link Future} backed by a one-shot {@link CountDownLatch}.
 * <p>
 * The producer publishes the result exactly once through
 * {@link #setDone(Object)}; consumers block in {@code get()} until then.
 * Cancellation is not supported.
 *
 * @param <T> type of the published result
 */
static class CountDownLatchFuture<T> implements Future<T> {

    // Written before latch.countDown() and read only after latch.await()
    // returns, so the latch provides the required visibility.
    T value = null;
    volatile boolean done = false;
    CountDownLatch latch = new CountDownLatch(1);

    /** Cancellation is not supported; always returns {@code false}. */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) { return false; }

    /**
     * Blocks until the result has been published.
     *
     * @return the value passed to {@link #setDone(Object)}
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public T get() throws InterruptedException {
        latch.await();
        return value;
    }

    /**
     * Blocks for at most the given time until the result has been published.
     *
     * @return the value passed to {@link #setDone(Object)}
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws java.util.concurrent.TimeoutException if the result was not
     *         published before the timeout elapsed
     */
    @Override
    public T get(long timeout, TimeUnit unit)
            throws InterruptedException, java.util.concurrent.TimeoutException {
        // await() returns false on timeout. Previously that was ignored and
        // the still-unset (null) value was returned as if it were a result.
        if (!latch.await(timeout, unit)) {
            throw new java.util.concurrent.TimeoutException(
                    "Result not available after " + timeout + " " + unit);
        }
        return value;
    }

    /** Cancellation is not supported; always returns {@code false}. */
    @Override
    public boolean isCancelled() { return false; }

    /** @return {@code true} once {@link #setDone(Object)} has been called */
    @Override
    public boolean isDone() {
        return done;
    }

    /**
     * Publishes the result and releases every thread waiting in {@code get()}.
     *
     * @param value the result to hand to waiters
     */
    void setDone(T value) {
        this.value = value;
        done = true;
        latch.countDown();
    }
}

/**
 * {@link WriteCallback} that exposes the outcome of a single write as a
 * {@link Future}: the future completes with {@code true} when the write
 * finished with return code 0 and with {@code false} otherwise.
 */
static class FutureWriteCallback implements WriteCallback {

    CountDownLatchFuture<Boolean> result = new CountDownLatchFuture<Boolean>();

    /**
     * Records the completion of the write and wakes up anyone waiting on
     * {@link #getResult()}.
     */
    @Override
    public void writeComplete(int rc, long ledgerId, long entryId,
                              InetSocketAddress addr, Object ctx) {
        if (LOG.isDebugEnabled()) {
            final Object[] logArgs = { entryId, ledgerId, addr, rc };
            LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
                      logArgs);
        }
        final boolean succeeded = (rc == 0);
        result.setDone(succeeded);
    }

    /** @return the future that completes when {@code writeComplete} is invoked */
    public Future<Boolean> getResult() {
        return result;
    }
}

/**
* SyncThread is a background thread which flushes ledger index pages periodically.
* Also it takes responsibility of garbage collecting journal files.
Expand Down Expand Up @@ -451,6 +522,19 @@ public void process(int journalVersion, long offset, ByteBuffer recBuff) throws
+ " but layout version (" + journalVersion
+ ") is too old to hold this");
}
} else if (entryId == METAENTRY_ID_FENCE_KEY) {
if (journalVersion >= 4) {
byte[] key = masterKeyCache.get(ledgerId);
if (key == null) {
key = ledgerStorage.readMasterKey(ledgerId);
}
LedgerDescriptor handle = handles.getHandle(ledgerId, key);
handle.setFenced();
} else {
throw new IOException("Invalid journal. Contains fenceKey "
+ " but layout version (" + journalVersion
+ ") is too old to hold this");
}
} else {
byte[] key = masterKeyCache.get(ledgerId);
if (key == null) {
Expand Down Expand Up @@ -934,10 +1018,26 @@ public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] mast
* This method is idempotent. Once a ledger is fenced, it can
* never be unfenced. Fencing a fenced ledger has no effect.
*/
public void fenceLedger(long ledgerId, byte[] masterKey) throws IOException, BookieException {
public Future<Boolean> fenceLedger(long ledgerId, byte[] masterKey) throws IOException, BookieException {
LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
boolean success;
synchronized (handle) {
handle.setFenced();
success = handle.setFenced();
}
if (success) {
// fenced first time, we should add the key to journal ensure we can rebuild
ByteBuffer bb = ByteBuffer.allocate(8 + 8);
bb.putLong(ledgerId);
bb.putLong(METAENTRY_ID_FENCE_KEY);
bb.flip();

FutureWriteCallback fwc = new FutureWriteCallback();
LOG.debug("record fenced state for ledger {} in journal.", ledgerId);
journal.logAddEntry(bb, fwc, null);
return fwc.getResult();
} else {
// already fenced
return SUCCESS_FUTURE;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
* <li>magic bytes: 4 bytes, 'BKLE', version: 4 bytes
* <li>len of master key: indicates length of master key. -1 means no master key stored in header.
* <li>master key: master key
* <li>state: bit map to indicate the state, 32 bits.
* </ul>
* <b>Index page</b> is a fixed-length page, which contains several entries which point to the offsets of data stored in entry loggers.
* </p>
Expand All @@ -52,6 +53,7 @@ class FileInfo {
static Logger LOG = LoggerFactory.getLogger(FileInfo.class);

static final int NO_MASTER_KEY = -1;
static final int STATE_FENCED_BIT = 0x1;

private FileChannel fc;
private File lf;
Expand All @@ -69,6 +71,10 @@ class FileInfo {
private boolean isClosed;
private long sizeSinceLastwrite;

// bit map for states of the ledger.
private int stateBits;
private boolean needFlushHeader = false;

// file access mode
protected String mode;

Expand Down Expand Up @@ -118,6 +124,8 @@ synchronized public void readHeader() throws IOException {
}
masterKey = new byte[length];
bb.get(masterKey);
stateBits = bb.getInt();
needFlushHeader = false;
} else {
throw new IOException("Ledger index file does not exist");
}
Expand Down Expand Up @@ -163,11 +171,43 @@ private void writeHeader() throws IOException {
bb.putInt(headerVersion);
bb.putInt(masterKey.length);
bb.put(masterKey);
bb.putInt(stateBits);
bb.rewind();
fc.position(0);
fc.write(bb);
}

/**
 * @return true if the fenced bit is set in this file's state bits
 */
public synchronized boolean isFenced() throws IOException {
    checkOpen(false);
    final int fencedBit = stateBits & STATE_FENCED_BIT;
    return fencedBit == STATE_FENCED_BIT;
}

/**
 * Marks the ledger as fenced in the in-memory state bits and flags the
 * header as needing a flush. The header itself is not written here.
 *
 * @return true if this call set the fenced bit, false if the bit was
 *         already set
 */
public synchronized boolean setFenced() throws IOException {
    checkOpen(false);
    LOG.debug("Try to set fenced state in file info {} : state bits {}.", lf, stateBits);

    final boolean alreadyFenced = (stateBits & STATE_FENCED_BIT) == STATE_FENCED_BIT;
    if (alreadyFenced) {
        return false;
    }

    // first time fenced: remember the bit and that the header is now dirty
    stateBits |= STATE_FENCED_BIT;
    needFlushHeader = true;
    return true;
}

/**
 * Rewrites the file header if it has been modified since it was last
 * read or written; does nothing when the header is clean.
 */
public synchronized void flushHeader() throws IOException {
    if (!needFlushHeader) {
        // header unchanged, nothing to write
        return;
    }
    checkOpen(true);
    writeHeader();
    needFlushHeader = false;
}

synchronized public long size() throws IOException {
checkOpen(false);
long rc = size-START_OF_DATA;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ public void shutdown() throws InterruptedException {
}
}

/**
 * Fences the given ledger by delegating to the ledger cache.
 *
 * @return whatever the ledger cache reports for this fencing attempt
 */
@Override
public boolean setFenced(long ledgerId) throws IOException {
    final boolean fencedNow = ledgerCache.setFenced(ledgerId);
    return fencedNow;
}

/**
 * Reports the fenced state of the given ledger by delegating to the
 * ledger cache.
 */
@Override
public boolean isFenced(long ledgerId) throws IOException {
    final boolean fenced = ledgerCache.isFenced(ledgerId);
    return fenced;
}

@Override
public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
ledgerCache.setMasterKey(ledgerId, masterKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class JournalChannel implements Closeable {

int HEADER_SIZE = 8; // 4byte magic word, 4 byte version
int MIN_COMPAT_JOURNAL_FORMAT_VERSION = 1;
int CURRENT_JOURNAL_FORMAT_VERSION = 3;
int CURRENT_JOURNAL_FORMAT_VERSION = 4;

public final static long preAllocSize = 4*1024*1024;
public final static ByteBuffer zeros = ByteBuffer.allocate(512);
Expand Down Expand Up @@ -155,4 +155,4 @@ int read(ByteBuffer dst)
public void close() throws IOException {
fc.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
* head scheduling.
*/
interface LedgerCache extends Closeable {

boolean setFenced(long ledgerId) throws IOException;
boolean isFenced(long ledgerId) throws IOException;

void setMasterKey(long ledgerId, byte[] masterKey) throws IOException;
byte[] readMasterKey(long ledgerId) throws IOException, BookieException;
boolean ledgerExists(long ledgerId) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,15 @@ private void flushLedger(long l) throws IOException {
synchronized(this) {
HashMap<Long, LedgerEntryPage> pageMap = pages.get(l);
if (pageMap == null || pageMap.isEmpty()) {
FileInfo fi = null;
try {
fi = getFileInfo(l, null);
fi.flushHeader();
} finally {
if (null != fi) {
fi.release();
}
}
return;
}
firstEntryList = new LinkedList<Long>();
Expand Down Expand Up @@ -451,6 +460,8 @@ public int compare(LedgerEntryPage o1, LedgerEntryPage o2) {
});
ArrayList<Integer> versions = new ArrayList<Integer>(entries.size());
fi = getFileInfo(l, null);
// flush the header if necessary
fi.flushHeader();
int start = 0;
long lastOffset = -1;
for(int i = 0; i < entries.size(); i++) {
Expand Down Expand Up @@ -789,6 +800,38 @@ private void evictFileInfoIfNecessary() throws IOException {
}
}

/**
 * Sets the fenced state on the file info of the given ledger.
 *
 * @return the result of {@code FileInfo.setFenced()}, or false when no
 *         file info could be obtained for the ledger
 */
@Override
public boolean setFenced(long ledgerId) throws IOException {
    final FileInfo fileInfo = getFileInfo(ledgerId, null);
    if (fileInfo == null) {
        return false;
    }
    try {
        return fileInfo.setFenced();
    } finally {
        // always hand the file info back, even if setFenced() throws
        fileInfo.release();
    }
}

/**
 * Reads the fenced state from the file info of the given ledger.
 *
 * @return the result of {@code FileInfo.isFenced()}, or false when no
 *         file info could be obtained for the ledger
 */
@Override
public boolean isFenced(long ledgerId) throws IOException {
    final FileInfo fileInfo = getFileInfo(ledgerId, null);
    if (fileInfo == null) {
        return false;
    }
    try {
        return fileInfo.isFenced();
    } finally {
        // always hand the file info back, even if isFenced() throws
        fileInfo.release();
    }
}

@Override
public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
FileInfo fi = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ static LedgerDescriptor createReadOnly(long ledgerId,

abstract long getLedgerId();

abstract void setFenced() throws IOException;
abstract boolean isFenced();
abstract boolean setFenced() throws IOException;
abstract boolean isFenced() throws IOException;

abstract long addEntry(ByteBuffer entry) throws IOException;
abstract ByteBuffer readEntry(long entryId) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
final LedgerStorage ledgerStorage;
private long ledgerId;

volatile private boolean fenced = false;
final byte[] masterKey;

LedgerDescriptorImpl(byte[] masterKey, long ledgerId, LedgerStorage ledgerStorage) {
Expand All @@ -60,13 +59,13 @@ public long getLedgerId() {
}

@Override
void setFenced() throws IOException {
fenced = true;
boolean setFenced() throws IOException {
return ledgerStorage.setFenced(ledgerId);
}

@Override
boolean isFenced() {
return fenced;
boolean isFenced() throws IOException {
return ledgerStorage.isFenced(ledgerId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class LedgerDescriptorReadOnlyImpl extends LedgerDescriptorImpl {
}

@Override
void setFenced() throws IOException {
boolean setFenced() throws IOException {
assert false;
throw new IOException("Invalid action on read only descriptor");
}
Expand Down
Loading

0 comments on commit d69986c

Please sign in to comment.