Skip to content

Commit

Permalink
Migrate command readjournal
Browse files Browse the repository at this point in the history
Descriptions of the changes in this PR:

- Use `bkctl` execute `readjournal` command

### Motivation

- Migrate from bookieshelll




Reviewers: Jia Zhai <zhaijia@apache.org>, Sijie Guo <sijie@apache.org>

This closes #2000 from zymap/command-readjournal
  • Loading branch information
zymap authored and sijie committed Mar 29, 2019
1 parent 6c3f33f commit 9524a9f
Show file tree
Hide file tree
Showing 7 changed files with 466 additions and 180 deletions.
Expand Up @@ -116,8 +116,8 @@ public class Bookie extends BookieCriticalThread {
final HandleFactory handles;
final boolean entryLogPerLedgerEnabled;

static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
static final long METAENTRY_ID_FENCE_KEY = -0x2000;
public static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
public static final long METAENTRY_ID_FENCE_KEY = -0x2000;
public static final long METAENTRY_ID_FORCE_LEDGER = -0x4000;
static final long METAENTRY_ID_LEDGER_EXPLICITLAC = -0x8000;

Expand Down
Expand Up @@ -29,7 +29,6 @@
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
Expand All @@ -38,7 +37,6 @@
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -51,7 +49,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Formatter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -71,10 +68,10 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
import org.apache.bookkeeper.bookie.storage.ldb.LocationsIndexRebuildOp;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.MetaStoreException;
Expand Down Expand Up @@ -107,10 +104,12 @@
import org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToDBStorageCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToInterleavedStorageCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.FormatCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.FormatUtil;
import org.apache.bookkeeper.tools.cli.commands.bookie.InitCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.LedgerCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.ListFilesOnDiscCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.ReadJournalCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.SanityTestCommand;
import org.apache.bookkeeper.tools.cli.commands.bookies.InfoCommand;
import org.apache.bookkeeper.tools.cli.commands.bookies.ListBookiesCommand;
Expand All @@ -122,7 +121,6 @@
import org.apache.bookkeeper.tools.cli.commands.cookie.UpdateCookieCommand;
import org.apache.bookkeeper.tools.framework.CliFlags;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.EntryFormatter;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.LedgerIdFormatter;
Expand Down Expand Up @@ -1437,55 +1435,25 @@ public int runCmd(CommandLine cmdLine) throws Exception {
return -1;
}

long journalId = -1L;
String filename = "";
try {
journalId = Long.parseLong(leftArgs[0]);
} catch (NumberFormatException nfe) {
filename = leftArgs[0];
}

boolean printMsg = false;
if (cmdLine.hasOption("m")) {
printMsg = true;
}

Journal journal = null;
if (getJournals().size() > 1) {
if (!cmdLine.hasOption("dir")) {
System.err.println("ERROR: invalid or missing journal directory");
printUsage();
return -1;
}

File journalDirectory = new File(cmdLine.getOptionValue("dir"));
for (Journal j : getJournals()) {
if (j.getJournalDirectory().equals(journalDirectory)) {
journal = j;
break;
}
}

if (journal == null) {
System.err.println("ERROR: journal directory not found");
printUsage();
return -1;
}
} else {
journal = getJournals().get(0);
}

long journalId;
try {
journalId = Long.parseLong(leftArgs[0]);
} catch (NumberFormatException nfe) {
// not a journal id
File f = new File(leftArgs[0]);
String name = f.getName();
if (!name.endsWith(".txn")) {
// not a journal file
System.err.println("ERROR: invalid journal file name " + leftArgs[0]);
printUsage();
return -1;
}
String idString = name.split("\\.")[0];
journalId = Long.parseLong(idString, 16);
}
// scan journal
scanJournal(journal, journalId, printMsg);
return 0;
ReadJournalCommand.ReadJournalFlags flags = new ReadJournalCommand.ReadJournalFlags().msg(printMsg)
.fileName(filename).journalId(journalId)
.dir(cmdLine.getOptionValue("dir"));
ReadJournalCommand cmd = new ReadJournalCommand(ledgerIdFormatter, entryFormatter);
boolean result = cmd.apply(bkConf, flags);
return result ? 0 : -1;
}

@Override
Expand Down Expand Up @@ -1626,7 +1594,6 @@ Options getOptions() {
}
}


/**
* Command to print help message.
*/
Expand Down Expand Up @@ -1803,7 +1770,6 @@ int runCmd(CommandLine cmdLine) throws Exception {
}
}


/**
* Print which node has the auditor lock.
*/
Expand Down Expand Up @@ -2493,7 +2459,6 @@ public interface UpdateLedgerNotifier {
void progress(long updated, long issued);
}


/**
* Convert bookie indexes from InterleavedStorage to DbLedgerStorage format.
*/
Expand Down Expand Up @@ -2898,29 +2863,6 @@ protected void scanEntryLog(long logId, EntryLogScanner scanner) throws IOExcept
entryLogger.scanEntryLog(logId, scanner);
}

private synchronized List<Journal> getJournals() throws IOException {
if (null == journals) {
journals = Lists.newArrayListWithCapacity(bkConf.getJournalDirs().length);
int idx = 0;
for (File journalDir : bkConf.getJournalDirs()) {
journals.add(new Journal(idx++, new File(journalDir, BookKeeperConstants.CURRENT_DIR), bkConf,
new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()))));
}
}
return journals;
}

/**
* Scan journal file.
*
* @param journalId Journal File Id
* @param scanner Journal File Scanner
*/
protected void scanJournal(Journal journal, long journalId, JournalScanner scanner) throws IOException {
journal.scanJournal(journalId, 0L, scanner);
}

///
/// Bookie Shell Commands
///
Expand Down Expand Up @@ -2953,7 +2895,7 @@ public boolean accept(long ledgerId) {

@Override
public void process(long ledgerId, long startPos, ByteBuf entry) {
formatEntry(startPos, entry, printMsg);
FormatUtil.formatEntry(startPos, entry, printMsg, ledgerIdFormatter, entryFormatter);
}
});
}
Expand Down Expand Up @@ -2985,7 +2927,7 @@ public void process(long candidateLedgerId, long startPos, ByteBuf entry) {
if ((candidateLedgerId == entrysLedgerId) && (candidateLedgerId == ledgerId)
&& ((entrysEntryId == entryId) || (entryId == -1))) {
entryFound.setValue(true);
formatEntry(startPos, entry, printMsg);
FormatUtil.formatEntry(startPos, entry, printMsg, ledgerIdFormatter, entryFormatter);
}
}
});
Expand Down Expand Up @@ -3034,7 +2976,7 @@ public void process(long ledgerId, long entryStartPos, ByteBuf entry) {
*/
long entryEndPos = entryStartPos + entrySize + 4 - 1;
if (((rangeEndPos == -1) || (entryStartPos <= rangeEndPos)) && (rangeStartPos <= entryEndPos)) {
formatEntry(entryStartPos, entry, printMsg);
FormatUtil.formatEntry(entryStartPos, entry, printMsg, ledgerIdFormatter, entryFormatter);
entryFound.setValue(true);
}
}
Expand All @@ -3049,40 +2991,6 @@ public void process(long ledgerId, long entryStartPos, ByteBuf entry) {
}
}

/**
* Scan a journal file.
*
* @param journalId Journal File Id
* @param printMsg Whether printing the entry data.
*/
protected void scanJournal(Journal journal, long journalId, final boolean printMsg) throws Exception {
System.out.println("Scan journal " + journalId + " (" + Long.toHexString(journalId) + ".txn)");
scanJournal(journal, journalId, new JournalScanner() {
boolean printJournalVersion = false;

@Override
public void process(int journalVersion, long offset, ByteBuffer entry) throws IOException {
if (!printJournalVersion) {
System.out.println("Journal Version : " + journalVersion);
printJournalVersion = true;
}
formatEntry(offset, Unpooled.wrappedBuffer(entry), printMsg);
}
});
}

/**
* Print last log mark.
*/
protected void printLastLogMark() throws IOException {
for (Journal journal : getJournals()) {
LogMark lastLogMark = journal.getLastLogMark().getCurMark();
System.out.println("LastLogMark: Journal Id - " + lastLogMark.getLogFileId() + "("
+ Long.toHexString(lastLogMark.getLogFileId()) + ".txn), Pos - "
+ lastLogMark.getLogFileOffset());
}
}

/**
* Format the entry into a readable format.
*
Expand All @@ -3102,70 +3010,6 @@ private void formatEntry(LedgerEntry entry, boolean printMsg) {
}
}

/**
* Format the message into a readable format.
*
* @param pos
* File offset of the message stored in entry log file
* @param recBuff
* Entry Data
* @param printMsg
* Whether printing the message body
*/
private void formatEntry(long pos, ByteBuf recBuff, boolean printMsg) {
int entrySize = recBuff.readableBytes();
long ledgerId = recBuff.readLong();
long entryId = recBuff.readLong();

System.out.println("--------- Lid=" + ledgerIdFormatter.formatLedgerId(ledgerId) + ", Eid=" + entryId
+ ", ByteOffset=" + pos + ", EntrySize=" + entrySize + " ---------");
if (entryId == Bookie.METAENTRY_ID_LEDGER_KEY) {
int masterKeyLen = recBuff.readInt();
byte[] masterKey = new byte[masterKeyLen];
recBuff.readBytes(masterKey);
System.out.println("Type: META");
System.out.println("MasterKey: " + bytes2Hex(masterKey));
System.out.println();
return;
}
if (entryId == Bookie.METAENTRY_ID_FENCE_KEY) {
System.out.println("Type: META");
System.out.println("Fenced");
System.out.println();
return;
}
// process a data entry
long lastAddConfirmed = recBuff.readLong();
System.out.println("Type: DATA");
System.out.println("LastConfirmed: " + lastAddConfirmed);
if (!printMsg) {
System.out.println();
return;
}
// skip digest checking
recBuff.skipBytes(8);
System.out.println("Data:");
System.out.println();
try {
byte[] ret = new byte[recBuff.readableBytes()];
recBuff.readBytes(ret);
entryFormatter.formatEntry(ret);
} catch (Exception e) {
System.out.println("N/A. Corrupted.");
}
System.out.println();
}

static String bytes2Hex(byte[] data) {
StringBuilder sb = new StringBuilder(data.length * 2);
Formatter formatter = new Formatter(sb);
for (byte b : data) {
formatter.format("%02x", b);
}
formatter.close();
return sb.toString();
}

private static int getOptionIntValue(CommandLine cmdLine, String option, int defaultVal) {
if (cmdLine.hasOption(option)) {
String val = cmdLine.getOptionValue(option);
Expand Down
Expand Up @@ -21,7 +21,7 @@

package org.apache.bookkeeper.bookie;

import static org.apache.bookkeeper.bookie.BookieShell.bytes2Hex;
import static org.apache.bookkeeper.tools.cli.commands.bookie.FormatUtil.bytes2Hex;

import io.netty.buffer.ByteBuf;
import java.io.Closeable;
Expand Down

0 comments on commit 9524a9f

Please sign in to comment.