Skip to content

Commit

Permalink
ZOOKEEPER-2845: Apply commit log when restarting server.
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Evans committed Feb 13, 2018
1 parent c9da2e2 commit ac8408a
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 21 deletions.
41 changes: 27 additions & 14 deletions src/java/main/org/apache/zookeeper/server/ZKDatabase.java
Expand Up @@ -201,30 +201,43 @@ public ConcurrentHashMap<Long, Integer> getSessionWithTimeOuts() {
return sessionsWithTimeouts;
}


private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {
public void onTxnLoaded(TxnHeader hdr, Record txn){
addCommittedProposal(hdr, txn);
}
};

/**
* load the database from the disk onto memory and also add
* the transactions to the committedlog in memory.
* @return the last valid zxid on disk
* @throws IOException
*/
public long loadDataBase() throws IOException {
PlayBackListener listener=new PlayBackListener(){
public void onTxnLoaded(TxnHeader hdr,Record txn){
Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
null, null);
r.txn = txn;
r.hdr = hdr;
r.zxid = hdr.getZxid();
addCommittedProposal(r);
}
};

long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
initialized = true;
return zxid;
}


/**
* Fast forward the database adding transactions from the committed log into memory.
* @return the last valid zxid.
* @throws IOException
*/
public long fastForwardDataBase() throws IOException {
long zxid = snapLog.fastForwardFromEdits(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
initialized = true;
return zxid;
}

private void addCommittedProposal(TxnHeader hdr, Record txn) {
Request r = new Request(null, 0, hdr.getCxid(), hdr.getType(), null, null);
r.txn = txn;
r.hdr = hdr;
r.zxid = hdr.getZxid();
addCommittedProposal(r);
}

/**
* maintains a list of last <i>committedLog</i>
* or so committed requests. This is used for
Expand Down
24 changes: 17 additions & 7 deletions src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
Expand Up @@ -512,14 +512,24 @@ public synchronized void shutdown(boolean fullyShutDown) {
firstProcessor.shutdown();
}

if (fullyShutDown && zkDb != null) {
zkDb.clear();
if (zkDb != null) {
if (fullyShutDown) {
zkDb.clear();
} else {
// else there is no need to clear the database
// * When a new quorum is established we can still apply the diff
// on top of the same zkDb data
// * If we fetch a new snapshot from leader, the zkDb will be
// cleared anyway before loading the snapshot
try {
//This will fast forward the database to the latest recorded transactions
zkDb.fastForwardDataBase();
} catch (IOException e) {
LOG.error("Error updating DB", e);
zkDb.clear();
}
}
}
// else there is no need to clear the database
// * When a new quorum is established we can still apply the diff
// on top of the same zkDb data
// * If we fetch a new snapshot from leader, the zkDb will be
// cleared anyway before loading the snapshot

unregisterJMX();
}
Expand Down
Expand Up @@ -136,6 +136,22 @@ public File getSnapDir() {
public long restore(DataTree dt, Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
snapLog.deserialize(dt, sessions);
return fastForwardFromEdits(dt, sessions, listener);
}

/**
* This function will fast forward the server database to have the latest
* transactions in it. This is the same as restore, but only reads from
* the transaction logs and not restores from a snapshot.
* @param dt the datatree to write transactions to.
* @param sessions the sessions to be restored.
* @param listener the playback listener to run on the
* database transactions.
* @return the highest zxid restored.
* @throws IOException
*/
public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
FileTxnLog txnLog = new FileTxnLog(dataDir);
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
long highestZxid = dt.lastProcessedZxid;
Expand Down

0 comments on commit ac8408a

Please sign in to comment.