Skip to content

Commit

Permalink
Utility to rebuild interleaved storage index files
Browse files Browse the repository at this point in the history
We came across a case where a ledger had been deleted from
ZooKeeper accidentally. It was possible to recover the ledger metadata
from the zookeeper journal and old snapshots, but the bookies had
deleted the indices by this time. However, even if the index is
deleted, the data still exists in the entrylog.

This utility scans the entrylog to rebuild the index, thereby making
the ledger available again.

Author: Ivan Kelly <ivank@apache.org>

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <sijie@apache.org>

This closes #1642 from ivankelly/regen-from-entrylogger
  • Loading branch information
ivankelly authored and sijie committed Sep 7, 2018
1 parent 40185e1 commit 3188933
Show file tree
Hide file tree
Showing 10 changed files with 450 additions and 35 deletions.
Expand Up @@ -629,17 +629,10 @@ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
for (File journalDirectory : conf.getJournalDirs()) {
this.journalDirectories.add(getCurrentDirectory(journalDirectory));
}
DiskChecker diskChecker = new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
this.ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), diskChecker,
statsLogger.scope(LD_LEDGER_SCOPE));

File[] idxDirs = conf.getIndexDirs();
if (null == idxDirs) {
this.indexDirsManager = this.ledgerDirsManager;
} else {
this.indexDirsManager = new LedgerDirsManager(conf, idxDirs, diskChecker,
statsLogger.scope(LD_INDEX_SCOPE));
}
DiskChecker diskChecker = createDiskChecker(conf);
this.ledgerDirsManager = createLedgerDirsManager(conf, diskChecker, statsLogger.scope(LD_LEDGER_SCOPE));
this.indexDirsManager = createIndexDirsManager(conf, diskChecker, statsLogger.scope(LD_INDEX_SCOPE),
this.ledgerDirsManager);

// instantiate zookeeper client to initialize ledger manager
this.metadataDriver = instantiateMetadataDriver(conf);
Expand Down Expand Up @@ -675,7 +668,7 @@ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
}
}

if (null == idxDirs) {
if (ledgerDirsManager == indexDirsManager) {
this.idxMonitor = this.ledgerMonitor;
} else {
this.idxMonitor = new LedgerDirsMonitor(conf, diskChecker, indexDirsManager);
Expand Down Expand Up @@ -1546,4 +1539,22 @@ public int getExitCode() {
return exitCode;
}

/**
 * Build a {@link DiskChecker} from the disk-usage thresholds in the given configuration.
 *
 * @param conf server configuration supplying the usage and warn thresholds
 * @return a new disk checker using those two thresholds
 */
static DiskChecker createDiskChecker(ServerConfiguration conf) {
    final float usageThreshold = conf.getDiskUsageThreshold();
    final float warnThreshold = conf.getDiskUsageWarnThreshold();
    return new DiskChecker(usageThreshold, warnThreshold);
}

/**
 * Build the {@link LedgerDirsManager} for the ledger directories named in the configuration.
 *
 * @param conf server configuration; its ledger directories are the ones managed
 * @param diskChecker disk checker handed to the manager
 * @param statsLogger stats logger handed to the manager
 * @return a new manager over {@code conf.getLedgerDirs()}
 */
static LedgerDirsManager createLedgerDirsManager(ServerConfiguration conf, DiskChecker diskChecker,
                                                 StatsLogger statsLogger) {
    final File[] ledgerDirs = conf.getLedgerDirs();
    return new LedgerDirsManager(conf, ledgerDirs, diskChecker, statsLogger);
}

/**
 * Build the {@link LedgerDirsManager} for the index directories, or reuse {@code fallback}
 * when the configuration declares no index directories.
 *
 * @param conf server configuration; {@code conf.getIndexDirs()} decides which branch is taken
 * @param diskChecker disk checker handed to a newly created manager
 * @param statsLogger stats logger handed to a newly created manager
 * @param fallback manager returned as-is when no index directories are configured
 * @return {@code fallback} itself if index directories are {@code null}, otherwise a new manager
 */
static LedgerDirsManager createIndexDirsManager(ServerConfiguration conf, DiskChecker diskChecker,
                                                StatsLogger statsLogger, LedgerDirsManager fallback) {
    final File[] indexDirs = conf.getIndexDirs();
    if (indexDirs != null) {
        return new LedgerDirsManager(conf, indexDirs, diskChecker, statsLogger);
    }
    // No dedicated index directories: hand back the same instance so callers can
    // detect the sharing by reference comparison.
    return fallback;
}
}
Expand Up @@ -18,7 +18,7 @@

package org.apache.bookkeeper.bookie;

import static com.google.common.base.Charsets.UTF_8;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithMetadataBookieDriver;
import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithRegistrationManager;
Expand Down Expand Up @@ -51,6 +51,7 @@
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -72,6 +73,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
Expand Down Expand Up @@ -187,6 +189,7 @@ public class BookieShell implements Tool {
static final String CMD_CONVERT_TO_DB_STORAGE = "convert-to-db-storage";
static final String CMD_CONVERT_TO_INTERLEAVED_STORAGE = "convert-to-interleaved-storage";
static final String CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX = "rebuild-db-ledger-locations-index";
static final String CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE = "regenerate-interleaved-storage-index-file";
static final String CMD_HELP = "help";

final ServerConfiguration bkConf = new ServerConfiguration();
Expand Down Expand Up @@ -2816,6 +2819,69 @@ int runCmd(CommandLine cmdLine) throws Exception {
}
}

/**
 * Shell command that regenerates interleaved storage index files for a set of ledgers
 * by running an {@link InterleavedStorageRegenerateIndexOp}.
 */
class RegenerateInterleavedStorageIndexFile extends MyCommand {
    Options opts = new Options();

    public RegenerateInterleavedStorageIndexFile() {
        super(CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE);

        // -l / --ledgerIds is mandatory and takes any number of comma separated values.
        Option ledgers = new Option("l", "ledgerIds", true,
                "Ledger(s) whose index needs to be regenerated. Multiple can be specified, comma separated.");
        ledgers.setArgs(Option.UNLIMITED_VALUES);
        ledgers.setValueSeparator(',');
        ledgers.setRequired(true);
        opts.addOption(ledgers);

        opts.addOption("dryRun", false, "Process the entryLogger, but don't write anything.");
        opts.addOption("password", true,
                "The bookie stores the password in the index file, so we need it to regenerate. "
                        + "This must match the value in the ledger metadata.");
        opts.addOption("b64password", true,
                "The password in base64 encoding, for cases where the password is not UTF-8.");
    }

    @Override
    Options getOptions() {
        return opts;
    }

    @Override
    String getDescription() {
        return "Regenerate an interleaved storage index file, from available entrylogger files.";
    }

    @Override
    String getUsage() {
        return CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE;
    }

    /**
     * Resolve the password and ledger ids from the command line and run the regeneration.
     *
     * @param cmdLine parsed command line
     * @return 0 on success, 1 when neither password option was supplied
     * @throws Exception whatever option parsing or the regeneration op throws
     */
    @Override
    int runCmd(CommandLine cmdLine) throws Exception {
        final boolean hasPlainPassword = cmdLine.hasOption("password");
        final boolean hasEncodedPassword = cmdLine.hasOption("b64password");
        if (!hasPlainPassword && !hasEncodedPassword) {
            LOG.error("The password must be specified to regenerate the index file.");
            return 1;
        }

        // The plain-text option takes precedence when both are present.
        final byte[] password = hasPlainPassword
                ? cmdLine.getOptionValue("password").getBytes(UTF_8)
                : Base64.getDecoder().decode(cmdLine.getOptionValue("b64password"));

        final String[] rawLedgerIds = cmdLine.getOptionValues("ledgerIds");
        final Set<Long> ledgerIds = Arrays.stream(rawLedgerIds)
                .map(Long::parseLong)
                .collect(Collectors.toSet());
        final boolean dryRun = cmdLine.hasOption("dryRun");

        LOG.info("=== Rebuilding index file for {} ===", ledgerIds);
        final ServerConfiguration conf = new ServerConfiguration(bkConf);
        final InterleavedStorageRegenerateIndexOp regenerateOp =
                new InterleavedStorageRegenerateIndexOp(conf, ledgerIds, password);
        regenerateOp.initiate(dryRun);
        LOG.info("-- Done rebuilding index file for {} --", ledgerIds);
        return 0;
    }
}

final Map<String, MyCommand> commands = new HashMap<String, MyCommand>();

{
Expand Down Expand Up @@ -2849,6 +2915,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
commands.put(CMD_CONVERT_TO_DB_STORAGE, new ConvertToDbStorageCmd());
commands.put(CMD_CONVERT_TO_INTERLEAVED_STORAGE, new ConvertToInterleavedStorageCmd());
commands.put(CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX, new RebuildDbLedgerLocationsIndexCmd());
commands.put(CMD_REGENERATE_INTERLEAVED_STORAGE_INDEX_FILE, new RegenerateInterleavedStorageIndexFile());
commands.put(CMD_HELP, new HelpCmd());
commands.put(CMD_LOSTBOOKIERECOVERYDELAY, new LostBookieRecoveryDelayCmd());
commands.put(CMD_TRIGGERAUDIT, new TriggerAuditCmd());
Expand Down
@@ -0,0 +1,230 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;

import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.SnapshotMap;
import org.apache.commons.lang.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Scan all entries in the entry log and rebuild the index file for one ledger.
*/
public class InterleavedStorageRegenerateIndexOp {
private static final Logger LOG = LoggerFactory.getLogger(InterleavedStorageRegenerateIndexOp.class);

private final ServerConfiguration conf;
private final Set<Long> ledgerIds;
private final byte[] masterKey;

public InterleavedStorageRegenerateIndexOp(ServerConfiguration conf, Set<Long> ledgerIds, byte[] password)
throws NoSuchAlgorithmException {
this.conf = conf;
this.ledgerIds = ledgerIds;
this.masterKey = DigestManager.generateMasterKey(password);
}

static class RecoveryStats {
long firstEntry = Long.MAX_VALUE;
long lastEntry = Long.MIN_VALUE;
long numEntries = 0;

void registerEntry(long entryId) {
numEntries++;
if (entryId < firstEntry) {
firstEntry = entryId;
}
if (entryId > lastEntry) {
lastEntry = entryId;
}
}

long getNumEntries() {
return numEntries;
}

long getFirstEntry() {
return firstEntry;
}

long getLastEntry() {
return lastEntry;
}
}

public void initiate(boolean dryRun) throws IOException {
LOG.info("Starting index rebuilding");

DiskChecker diskChecker = Bookie.createDiskChecker(conf);
LedgerDirsManager ledgerDirsManager = Bookie.createLedgerDirsManager(
conf, diskChecker, NullStatsLogger.INSTANCE);
LedgerDirsManager indexDirsManager = Bookie.createIndexDirsManager(
conf, diskChecker, NullStatsLogger.INSTANCE, ledgerDirsManager);
EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
final LedgerCache ledgerCache;
if (dryRun) {
ledgerCache = new DryRunLedgerCache();
} else {
ledgerCache = new LedgerCacheImpl(conf, new SnapshotMap<Long, Boolean>(),
indexDirsManager, NullStatsLogger.INSTANCE);
}

Set<Long> entryLogs = entryLogger.getEntryLogsSet();

int totalEntryLogs = entryLogs.size();
int completedEntryLogs = 0;
long startTime = System.nanoTime();

LOG.info("Scanning {} entry logs", totalEntryLogs);

Map<Long, RecoveryStats> stats = new HashMap<>();
for (long entryLogId : entryLogs) {
LOG.info("Scanning {}", entryLogId);
entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
@Override
public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
long entryId = entry.getLong(8);

stats.computeIfAbsent(ledgerId, (ignore) -> new RecoveryStats()).registerEntry(entryId);

// Actual location indexed is pointing past the entry size
long location = (entryLogId << 32L) | (offset + 4);

if (LOG.isDebugEnabled()) {
LOG.debug("Rebuilding {}:{} at location {} / {}", ledgerId, entryId, location >> 32,
location & (Integer.MAX_VALUE - 1));
}

if (!ledgerCache.ledgerExists(ledgerId)) {
ledgerCache.setMasterKey(ledgerId, masterKey);
ledgerCache.setFenced(ledgerId);
}
ledgerCache.putEntryOffset(ledgerId, entryId, location);
}

@Override
public boolean accept(long ledgerId) {
return ledgerIds.contains(ledgerId);
}
});

ledgerCache.flushLedger(true);

++completedEntryLogs;
LOG.info("Completed scanning of log {}.log -- {} / {}", Long.toHexString(entryLogId), completedEntryLogs,
totalEntryLogs);
}

LOG.info("Rebuilding indices done");
for (long ledgerId : ledgerIds) {
RecoveryStats ledgerStats = stats.get(ledgerId);
if (ledgerStats == null || ledgerStats.getNumEntries() == 0) {
LOG.info(" {} - No entries found", ledgerId);
} else {
LOG.info(" {} - Found {} entries, from {} to {}", ledgerId,
ledgerStats.getNumEntries(), ledgerStats.getFirstEntry(), ledgerStats.getLastEntry());
}
}
LOG.info("Total time: {}", DurationFormatUtils.formatDurationHMS(
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)));
}


static class DryRunLedgerCache implements LedgerCache {
@Override
public void close() {
}
@Override
public boolean setFenced(long ledgerId) throws IOException {
return false;
}
@Override
public boolean isFenced(long ledgerId) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
}
@Override
public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
throw new UnsupportedOperationException();
}
@Override
public boolean ledgerExists(long ledgerId) throws IOException {
return false;
}
@Override
public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
}
@Override
public long getEntryOffset(long ledger, long entry) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void flushLedger(boolean doAll) throws IOException {
}
@Override
public long getLastEntry(long ledgerId) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public Long getLastAddConfirmed(long ledgerId) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public long updateLastAddConfirmed(long ledgerId, long lac) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean waitForLastAddConfirmedUpdate(long ledgerId,
long previousLAC,
Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void deleteLedger(long ledgerId) throws IOException {
}
@Override
public void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException {
}
@Override
public ByteBuf getExplicitLac(long ledgerId) {
throw new UnsupportedOperationException();
}
}
}

0 comments on commit 3188933

Please sign in to comment.