diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index 3d78c334a92..ded7d142959 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -107,6 +107,7 @@ import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException; import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToDBStorageCommand; import org.apache.bookkeeper.tools.cli.commands.bookie.FormatCommand; import org.apache.bookkeeper.tools.cli.commands.bookie.InitCommand; import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand; @@ -2548,42 +2549,10 @@ String getUsage() { @Override int runCmd(CommandLine cmdLine) throws Exception { - LOG.info("=== Converting to DbLedgerStorage ==="); - ServerConfiguration conf = new ServerConfiguration(bkConf); - - InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage(); - Bookie.mountLedgerStorageOffline(conf, interleavedStorage); - - DbLedgerStorage dbStorage = new DbLedgerStorage(); - Bookie.mountLedgerStorageOffline(conf, dbStorage); - - int convertedLedgers = 0; - for (long ledgerId : interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Converting ledger {}", ledgerIdFormatter.formatLedgerId(ledgerId)); - } - - LedgerCache.LedgerIndexMetadata fi = interleavedStorage.readLedgerIndexMetadata(ledgerId); - - LedgerCache.PageEntriesIterable pages = interleavedStorage.getIndexEntries(ledgerId); - - long numberOfEntries = dbStorage.addLedgerToIndex(ledgerId, fi.fenced, fi.masterKey, pages); - if (LOG.isDebugEnabled()) { - LOG.debug(" -- done. fenced={} entries={}", fi.fenced, numberOfEntries); - } - - // Remove index from old storage - interleavedStorage.deleteLedger(ledgerId); - - if (++convertedLedgers % 1000 == 0) { - LOG.info("Converted {} ledgers", convertedLedgers); - } - } - - dbStorage.shutdown(); - interleavedStorage.shutdown(); - - LOG.info("---- Done Converting ----"); + ConvertToDBStorageCommand cmd = new ConvertToDBStorageCommand(); + ConvertToDBStorageCommand.CTDBFlags flags = new ConvertToDBStorageCommand.CTDBFlags(); + cmd.setLedgerIdFormatter(ledgerIdFormatter); + cmd.apply(bkConf, flags); return 0; } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java index 3b5bf0114df..59ea9ec0374 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java @@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import lombok.Cleanup; +import lombok.Getter; import org.apache.bookkeeper.bookie.Bookie.NoLedgerException; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener; @@ -84,6 +85,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry private static final Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class); EntryLogger entryLogger; + @Getter LedgerCache ledgerCache; protected CheckpointSource checkpointSource; protected Checkpointer checkpointer; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommand.java new file mode 100644 index 00000000000..dc48b6c277f --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommand.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.tools.cli.commands.bookie; + +import com.beust.jcommander.Parameter; +import com.google.common.util.concurrent.UncheckedExecutionException; +import lombok.Setter; +import lombok.experimental.Accessors; +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.InterleavedLedgerStorage; +import org.apache.bookkeeper.bookie.LedgerCache; +import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.tools.cli.helpers.BookieCommand; +import org.apache.bookkeeper.tools.framework.CliFlags; +import org.apache.bookkeeper.tools.framework.CliSpec; +import org.apache.bookkeeper.util.LedgerIdFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A command to convert bookie indexes from InterleavedStorage to DbLedgerStorage format. + */ +public class ConvertToDBStorageCommand extends BookieCommand { + + private static final Logger LOG = LoggerFactory.getLogger(ConvertToDBStorageCommand.class); + private static final String NAME = "converttodbstorage"; + private static final String DESC = "Convert bookie indexes from InterleavedStorage to DbLedgerStorage format"; + private static final String NOT_INIT = "default formatter"; + + @Setter + private LedgerIdFormatter ledgerIdFormatter; + + public ConvertToDBStorageCommand() { + this(new CTDBFlags()); + } + public ConvertToDBStorageCommand(CTDBFlags flags) { + super(CliSpec.newBuilder().withFlags(flags).withName(NAME).withDescription(DESC).build()); + } + + /** + * Flags for this command. + */ + @Accessors(fluent = true) + @Setter + public static class CTDBFlags extends CliFlags { + @Parameter(names = { "-l", "--ledgeridformatter" }, description = "Set ledger id formatter") + private String ledgerIdFormatter = NOT_INIT; + } + + @Override + public boolean apply(ServerConfiguration conf, CTDBFlags cmdFlags) { + initLedgerIdFormatter(conf, cmdFlags); + try { + return handle(conf); + } catch (Exception e) { + throw new UncheckedExecutionException(e.getMessage(), e); + } + } + + private boolean handle(ServerConfiguration conf) throws Exception { + LOG.info("=== Converting to DbLedgerStorage ==="); + ServerConfiguration bkConf = new ServerConfiguration(conf); + + InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage(); + Bookie.mountLedgerStorageOffline(bkConf, interleavedStorage); + + DbLedgerStorage dbStorage = new DbLedgerStorage(); + Bookie.mountLedgerStorageOffline(bkConf, dbStorage); + + int convertedLedgers = 0; + for (long ledgerId : interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Converting ledger {}", ledgerIdFormatter.formatLedgerId(ledgerId)); + } + + LedgerCache.LedgerIndexMetadata fi = interleavedStorage.readLedgerIndexMetadata(ledgerId); + + LedgerCache.PageEntriesIterable pages = interleavedStorage.getIndexEntries(ledgerId); + + long numberOfEntries = dbStorage.addLedgerToIndex(ledgerId, fi.fenced, fi.masterKey, pages); + if (LOG.isDebugEnabled()) { + LOG.debug(" -- done. fenced={} entries={}", fi.fenced, numberOfEntries); + } + + // Remove index from old storage + interleavedStorage.deleteLedger(ledgerId); + + if (++convertedLedgers % 1000 == 0) { + LOG.info("Converted {} ledgers", convertedLedgers); + } + } + + dbStorage.shutdown(); + interleavedStorage.shutdown(); + + LOG.info("---- Done Converting ----"); + return true; + } + + private void initLedgerIdFormatter(ServerConfiguration conf, CTDBFlags flags) { + if (this.ledgerIdFormatter != null) { + return; + } + if (flags.ledgerIdFormatter.equals(NOT_INIT)) { + this.ledgerIdFormatter = LedgerIdFormatter.newLedgerIdFormatter(conf); + } else { + this.ledgerIdFormatter = LedgerIdFormatter.newLedgerIdFormatter(flags.ledgerIdFormatter, conf); + } + } + +} diff --git a/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java b/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java index 940d482f2d1..86129a4f7cf 100644 --- a/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java +++ b/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java @@ -21,6 +21,7 @@ import static org.apache.bookkeeper.tools.common.BKCommandCategories.CATEGORY_INFRA_SERVICE; import org.apache.bookkeeper.tools.cli.BKCtl; +import org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToDBStorageCommand; import org.apache.bookkeeper.tools.cli.commands.bookie.FormatCommand; import org.apache.bookkeeper.tools.cli.commands.bookie.InitCommand; import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand; @@ -48,6 +49,7 @@ public class BookieCommandGroup extends CliCommandGroup { .addCommand(new FormatCommand()) .addCommand(new SanityTestCommand()) .addCommand(new LedgerCommand()) + .addCommand(new ConvertToDBStorageCommand()) .build(); public BookieCommandGroup() { diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommandTest.java new file mode 100644 index 00000000000..cc370873af6 --- /dev/null +++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToDBStorageCommandTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.tools.cli.commands.bookie; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.doNothing; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.verifyNew; +import static org.powermock.api.mockito.PowerMockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +import com.google.common.util.concurrent.UncheckedExecutionException; +import java.util.Iterator; +import java.util.Vector; +import java.util.stream.LongStream; +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.InterleavedLedgerStorage; +import org.apache.bookkeeper.bookie.LedgerCache; +import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage; +import org.apache.bookkeeper.conf.AbstractConfiguration; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +/** + * Unit test for {@link ConvertToDBStorageCommand}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({ Bookie.class, ConvertToDBStorageCommand.class }) +public class ConvertToDBStorageCommandTest extends BookieCommandTestBase { + + private InterleavedLedgerStorage interleavedLedgerStorage; + private DbLedgerStorage dbStorage; + private LedgerCache.LedgerIndexMetadata metadata; + private LedgerCache.PageEntriesIterable entries; + + public ConvertToDBStorageCommandTest() { + super(3, 0); + } + + @Override + public void setup() throws Exception { + super.setup(); + + whenNew(ServerConfiguration.class).withNoArguments().thenReturn(conf); + whenNew(ServerConfiguration.class).withParameterTypes(AbstractConfiguration.class).withArguments(conf) + .thenReturn(conf); + + interleavedLedgerStorage = mock(InterleavedLedgerStorage.class); + whenNew(InterleavedLedgerStorage.class).withNoArguments().thenReturn(interleavedLedgerStorage); + doNothing().when(interleavedLedgerStorage).shutdown(); + when(interleavedLedgerStorage.getActiveLedgersInRange(anyLong(), anyLong())).thenReturn(this::getLedgerId); + metadata = mock(LedgerCache.LedgerIndexMetadata.class); + when(interleavedLedgerStorage.readLedgerIndexMetadata(anyLong())).thenReturn(metadata); + entries = mock(LedgerCache.PageEntriesIterable.class); + when(interleavedLedgerStorage.getIndexEntries(anyLong())).thenReturn(entries); + + dbStorage = mock(DbLedgerStorage.class); + whenNew(DbLedgerStorage.class).withNoArguments().thenReturn(dbStorage); + doNothing().when(dbStorage).shutdown(); + when(dbStorage.addLedgerToIndex(anyLong(), anyBoolean(), eq(new byte[0]), + any(LedgerCache.PageEntriesIterable.class))).thenReturn(1L); + + PowerMockito.mockStatic(Bookie.class); + PowerMockito.when(Bookie.mountLedgerStorageOffline(eq(conf), eq(interleavedLedgerStorage))) + .thenReturn(PowerMockito.mock(InterleavedLedgerStorage.class)); + PowerMockito.when(Bookie.mountLedgerStorageOffline(eq(conf), eq(dbStorage))).thenReturn(dbStorage); + } + + private Iterator getLedgerId() { + Vector longs = new Vector<>(); + LongStream.range(0L, 10L).forEach(longs::add); + return longs.iterator(); + } + + @Test + public void testCTDB() { + ConvertToDBStorageCommand cmd = new ConvertToDBStorageCommand(); + Assert.assertTrue(cmd.apply(bkFlags, new String[] { "" })); + + try { + verifyNew(ServerConfiguration.class, times(1)).withArguments(conf); + verifyNew(InterleavedLedgerStorage.class, times(1)).withNoArguments(); + verifyNew(DbLedgerStorage.class, times(1)).withNoArguments(); + + verify(interleavedLedgerStorage, times(10)).readLedgerIndexMetadata(anyLong()); + verify(interleavedLedgerStorage, times(10)).getIndexEntries(anyLong()); + verify(dbStorage, times(10)) + .addLedgerToIndex(anyLong(), anyBoolean(), any(), any(LedgerCache.PageEntriesIterable.class)); + verify(interleavedLedgerStorage, times(10)).deleteLedger(anyLong()); + + verify(dbStorage, times(1)).shutdown(); + verify(interleavedLedgerStorage, times(1)).shutdown(); + } catch (Exception e) { + throw new UncheckedExecutionException(e.getMessage(), e); + } + } +}