diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index 5ad8d4feba8..6f8f1c30c73 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -63,7 +63,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -87,7 +86,6 @@ import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.meta.LedgerMetadataSerDe; import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; -import org.apache.bookkeeper.meta.UnderreplicatedLedger; import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieClient; @@ -95,9 +93,8 @@ import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.replication.AuditorElector; import org.apache.bookkeeper.replication.ReplicationException; -import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException; -import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.tools.cli.commands.autorecovery.ListUnderReplicatedCommand; import org.apache.bookkeeper.tools.cli.commands.autorecovery.LostBookieRecoveryDelayCommand; import org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToDBStorageCommand; import org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToInterleavedStorageCommand; @@ -912,58 +909,13 @@ int runCmd(CommandLine cmdLine) throws Exception { final boolean printMissingReplica = cmdLine.hasOption("printmissingreplica"); final boolean printReplicationWorkerId = cmdLine.hasOption("printreplicationworkerid"); - final Predicate> predicate; - if (!StringUtils.isBlank(includingBookieId) && !StringUtils.isBlank(excludingBookieId)) { - predicate = replicasList -> (replicasList.contains(includingBookieId) - && !replicasList.contains(excludingBookieId)); - } else if (!StringUtils.isBlank(includingBookieId)) { - predicate = replicasList -> replicasList.contains(includingBookieId); - } else if (!StringUtils.isBlank(excludingBookieId)) { - predicate = replicasList -> !replicasList.contains(excludingBookieId); - } else { - predicate = null; - } - - runFunctionWithLedgerManagerFactory(bkConf, mFactory -> { - LedgerUnderreplicationManager underreplicationManager; - try { - underreplicationManager = mFactory.newLedgerUnderreplicationManager(); - } catch (KeeperException | CompatibilityException e) { - throw new UncheckedExecutionException("Failed to new ledger underreplicated manager", e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new UncheckedExecutionException("Interrupted on newing ledger underreplicated manager", e); - } - Iterator iter = underreplicationManager.listLedgersToRereplicate(predicate); - while (iter.hasNext()) { - UnderreplicatedLedger underreplicatedLedger = iter.next(); - long urLedgerId = underreplicatedLedger.getLedgerId(); - System.out.println(ledgerIdFormatter.formatLedgerId(urLedgerId)); - long ctime = underreplicatedLedger.getCtime(); - if (ctime != UnderreplicatedLedger.UNASSIGNED_CTIME) { - System.out.println("\tCtime : " + ctime); - } - if (printMissingReplica) { - underreplicatedLedger.getReplicaList().forEach((missingReplica) -> { - System.out.println("\tMissingReplica : " + missingReplica); - }); - } - if (printReplicationWorkerId) { - try { - String replicationWorkerId = underreplicationManager - .getReplicationWorkerIdRereplicatingLedger(urLedgerId); - if (replicationWorkerId != null) { - System.out.println("\tReplicationWorkerId : " + replicationWorkerId); - } - } catch (UnavailableException e) { - LOG.error("Failed to get ReplicationWorkerId rereplicating ledger {} -- {}", urLedgerId, - e.getMessage()); - } - } - } - return null; - }); - + ListUnderReplicatedCommand.LURFlags flags = new ListUnderReplicatedCommand.LURFlags() + .missingReplica(includingBookieId) + .excludingMissingReplica(excludingBookieId) + .printMissingReplica(printMissingReplica) + .printReplicationWorkerId(printReplicationWorkerId); + ListUnderReplicatedCommand cmd = new ListUnderReplicatedCommand(ledgerIdFormatter); + cmd.apply(bkConf, flags); return 0; } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommand.java new file mode 100644 index 00000000000..1142d31c66e --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommand.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.tools.cli.commands.autorecovery; + +import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory; + +import com.beust.jcommander.Parameter; +import com.google.common.util.concurrent.UncheckedExecutionException; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.function.Predicate; +import lombok.Setter; +import lombok.experimental.Accessors; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; +import org.apache.bookkeeper.meta.UnderreplicatedLedger; +import org.apache.bookkeeper.meta.exceptions.MetadataException; +import org.apache.bookkeeper.replication.ReplicationException; +import org.apache.bookkeeper.tools.cli.helpers.BookieCommand; +import org.apache.bookkeeper.tools.framework.CliFlags; +import org.apache.bookkeeper.tools.framework.CliSpec; +import org.apache.bookkeeper.util.LedgerIdFormatter; +import org.apache.commons.lang.StringUtils; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Command to listing under replicated ledgers. + */ +public class ListUnderReplicatedCommand extends BookieCommand { + + static final Logger LOG = LoggerFactory.getLogger(ListUnderReplicatedCommand.class); + + private static final String NAME = "listunderreplicated"; + private static final String DESC = "List ledgers marked as underreplicated, with oprional options to specify " + + "missingreplica (BookieId) and to exclude missingreplica."; + private static final String DEFAULT = ""; + + private LedgerIdFormatter ledgerIdFormatter; + + public ListUnderReplicatedCommand() { + this(new LURFlags()); + } + + public ListUnderReplicatedCommand(LedgerIdFormatter ledgerIdFormatter) { + this(); + this.ledgerIdFormatter = ledgerIdFormatter; + } + + private ListUnderReplicatedCommand(LURFlags flags) { + super(CliSpec.newBuilder() + .withName(NAME) + .withDescription(DESC) + .withFlags(flags) + .build()); + } + + /** + * Flags for list under replicated command. + */ + @Accessors(fluent = true) + @Setter + public static class LURFlags extends CliFlags{ + + @Parameter(names = { "-pmr", "--printmissingreplica" }, description = "Whether to print missingreplicas list?") + private boolean printMissingReplica; + + @Parameter(names = { "-prw", + "--printreplicationworkerid" }, description = "Whether wo print replicationworkerid?") + private boolean printReplicationWorkerId; + + @Parameter(names = { "-mr", "--missingreplica" }, description = "Bookie Id of missing replica") + private String missingReplica = DEFAULT; + + @Parameter(names = { "-emr", "--excludingmissingreplica" }, description = "Bookie Id of missing replica to " + + "ignore") + private String excludingMissingReplica = DEFAULT; + + @Parameter(names = {"-l", "--ledgeridformatter"}, description = "Set ledger id formatter") + private String ledgerIdFormatter = DEFAULT; + } + + @Override + public boolean apply(ServerConfiguration conf, LURFlags cmdFlags) { + if (!cmdFlags.ledgerIdFormatter.equals(DEFAULT) && ledgerIdFormatter == null) { + ledgerIdFormatter = LedgerIdFormatter.newLedgerIdFormatter(cmdFlags.ledgerIdFormatter, conf); + } else if (ledgerIdFormatter == null) { + ledgerIdFormatter = LedgerIdFormatter.newLedgerIdFormatter(conf); + } + try { + return handler(conf, cmdFlags); + } catch (Exception e) { + throw new UncheckedExecutionException(e.getMessage(), e); + } + } + + public boolean handler(ServerConfiguration bkConf, LURFlags flags) throws MetadataException, ExecutionException { + final String includingBookieId = flags.missingReplica; + final String excludingBookieId = flags.excludingMissingReplica; + final boolean printMissingReplica = flags.printMissingReplica; + final boolean printReplicationWorkerId = flags.printReplicationWorkerId; + + final Predicate> predicate; + if (!StringUtils.isBlank(includingBookieId) && !StringUtils.isBlank(excludingBookieId)) { + predicate = replicasList -> (replicasList.contains(includingBookieId) + && !replicasList.contains(excludingBookieId)); + } else if (!StringUtils.isBlank(includingBookieId)) { + predicate = replicasList -> replicasList.contains(includingBookieId); + } else if (!StringUtils.isBlank(excludingBookieId)) { + predicate = replicasList -> !replicasList.contains(excludingBookieId); + } else { + predicate = null; + } + + runFunctionWithLedgerManagerFactory(bkConf, mFactory -> { + LedgerUnderreplicationManager underreplicationManager; + try { + underreplicationManager = mFactory.newLedgerUnderreplicationManager(); + } catch (KeeperException | ReplicationException.CompatibilityException e) { + throw new UncheckedExecutionException("Failed to new ledger underreplicated manager", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new UncheckedExecutionException("Interrupted on newing ledger underreplicated manager", e); + } + Iterator iter = underreplicationManager.listLedgersToRereplicate(predicate); + while (iter.hasNext()) { + UnderreplicatedLedger underreplicatedLedger = iter.next(); + long urLedgerId = underreplicatedLedger.getLedgerId(); + System.out.println(ledgerIdFormatter.formatLedgerId(urLedgerId)); + long ctime = underreplicatedLedger.getCtime(); + if (ctime != UnderreplicatedLedger.UNASSIGNED_CTIME) { + System.out.println("\tCtime : " + ctime); + } + if (printMissingReplica) { + underreplicatedLedger.getReplicaList().forEach((missingReplica) -> { + System.out.println("\tMissingReplica : " + missingReplica); + }); + } + if (printReplicationWorkerId) { + try { + String replicationWorkerId = underreplicationManager + .getReplicationWorkerIdRereplicatingLedger(urLedgerId); + if (replicationWorkerId != null) { + System.out.println("\tReplicationWorkerId : " + replicationWorkerId); + } + } catch (ReplicationException.UnavailableException e) { + LOG.error("Failed to get ReplicationWorkerId rereplicating ledger {} -- {}", urLedgerId, + e.getMessage()); + } + } + } + return null; + }); + return true; + } + +} + diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/package-info.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/package-info.java index fa1bbbff0a1..9294bf9ba3a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/package-info.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/package-info.java @@ -18,6 +18,6 @@ */ /** - * This package provides all toggle commands. + * This package provides all autorecovery commands. */ package org.apache.bookkeeper.tools.cli.commands.autorecovery; \ No newline at end of file diff --git a/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/AutoRecoveryCommandGroup.java b/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/AutoRecoveryCommandGroup.java index d59412b3aa7..9cee7ff40a0 100644 --- a/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/AutoRecoveryCommandGroup.java +++ b/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/AutoRecoveryCommandGroup.java @@ -20,6 +20,7 @@ import static org.apache.bookkeeper.tools.common.BKCommandCategories.CATEGORY_INFRA_SERVICE; +import org.apache.bookkeeper.tools.cli.commands.autorecovery.ListUnderReplicatedCommand; import org.apache.bookkeeper.tools.cli.commands.autorecovery.LostBookieRecoveryDelayCommand; import org.apache.bookkeeper.tools.common.BKFlags; import org.apache.bookkeeper.tools.framework.CliCommandGroup; @@ -37,11 +38,11 @@ public class AutoRecoveryCommandGroup extends CliCommandGroup { .withName(NAME) .withDescription(DESC) .withCategory(CATEGORY_INFRA_SERVICE) + .addCommand(new ListUnderReplicatedCommand()) .addCommand(new LostBookieRecoveryDelayCommand()) .build(); public AutoRecoveryCommandGroup() { super(spec); } -} - +} \ No newline at end of file diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommandTest.java new file mode 100644 index 00000000000..5fefe585026 --- /dev/null +++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommandTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.tools.cli.commands.autorecovery; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +import java.util.ArrayList; +import java.util.Vector; +import java.util.function.Function; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.LedgerManagerFactory; +import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; +import org.apache.bookkeeper.meta.MetadataDrivers; +import org.apache.bookkeeper.meta.UnderreplicatedLedger; +import org.apache.bookkeeper.replication.ReplicationException; +import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase; +import org.apache.zookeeper.KeeperException; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +/** + * Unit test for {@link ListUnderReplicatedCommand}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({ListUnderReplicatedCommand.class, MetadataDrivers.class, UnderreplicatedLedger.class }) +public class ListUnderReplicatedCommandTest extends BookieCommandTestBase { + + private UnderreplicatedLedger ledger; + private LedgerManagerFactory factory; + private LedgerUnderreplicationManager underreplicationManager; + + public ListUnderReplicatedCommandTest() { + super(3, 0); + } + + @Override + public void setup() throws Exception { + super.setup(); + + PowerMockito.whenNew(ServerConfiguration.class).withNoArguments().thenReturn(conf); + + PowerMockito.mockStatic(MetadataDrivers.class); + factory = mock(LedgerManagerFactory.class); + PowerMockito.doAnswer(invocationOnMock -> { + Function function = invocationOnMock.getArgument(1); + function.apply(factory); + return true; + }).when(MetadataDrivers.class, "runFunctionWithLedgerManagerFactory", any(ServerConfiguration.class), + any(Function.class)); + + underreplicationManager = mock(LedgerUnderreplicationManager.class); + when(factory.newLedgerUnderreplicationManager()).thenReturn(underreplicationManager); + + ledger = mock(UnderreplicatedLedger.class); + when(ledger.getLedgerId()).thenReturn(1L); + when(ledger.getCtime()).thenReturn(1L); + + Vector ledgers = new Vector<>(); + ledgers.add(ledger); + + when(underreplicationManager.listLedgersToRereplicate(any())).thenReturn(ledgers.iterator()); + + } + + @Test + public void testWithoutArgs() + throws InterruptedException, ReplicationException.CompatibilityException, KeeperException { + testCommand(""); + verify(factory, times(1)).newLedgerUnderreplicationManager(); + verify(underreplicationManager, times(1)).listLedgersToRereplicate(any()); + verify(ledger, times(1)).getLedgerId(); + verify(ledger, times(1)).getCtime(); + } + + @Test + public void testMissingReplica() + throws InterruptedException, ReplicationException.CompatibilityException, KeeperException { + testCommand("-mr", ""); + verify(factory, times(1)).newLedgerUnderreplicationManager(); + verify(underreplicationManager, times(1)).listLedgersToRereplicate(any()); + verify(ledger, times(1)).getLedgerId(); + verify(ledger, times(1)).getCtime(); + } + + @Test + public void testExcludingMissingReplica() + throws InterruptedException, ReplicationException.CompatibilityException, KeeperException { + testCommand("-emr", ""); + verify(factory, times(1)).newLedgerUnderreplicationManager(); + verify(underreplicationManager, times(1)).listLedgersToRereplicate(any()); + verify(ledger, times(1)).getLedgerId(); + verify(ledger, times(1)).getCtime(); + } + + @Test + public void testPrintMissingReplica() + throws InterruptedException, ReplicationException.CompatibilityException, KeeperException { + + ArrayList list = new ArrayList<>(); + list.add("replica"); + + when(ledger.getReplicaList()).thenReturn(list); + testCommand("-pmr"); + verify(factory, times(1)).newLedgerUnderreplicationManager(); + verify(underreplicationManager, times(1)).listLedgersToRereplicate(any()); + verify(ledger, times(1)).getLedgerId(); + verify(ledger, times(1)).getCtime(); + verify(ledger, times(1)).getReplicaList(); + } + + @Test + public void testPrintReplicationWorkerId() throws ReplicationException.UnavailableException, InterruptedException, + ReplicationException.CompatibilityException, KeeperException { + when(underreplicationManager.getReplicationWorkerIdRereplicatingLedger(1L)).thenReturn("test"); + + testCommand("-prw"); + verify(factory, times(1)).newLedgerUnderreplicationManager(); + verify(underreplicationManager, times(1)).listLedgersToRereplicate(any()); + verify(ledger, times(1)).getLedgerId(); + verify(ledger, times(1)).getCtime(); + verify(underreplicationManager, times(1)).getReplicationWorkerIdRereplicatingLedger(1L); + } + + @Test + public void testCommand1() { + ListUnderReplicatedCommand cmd = new ListUnderReplicatedCommand(); + cmd.apply(bkFlags, new String[] { "" }); + } + + private void testCommand(String... args) { + ListUnderReplicatedCommand cmd = new ListUnderReplicatedCommand(); + Assert.assertTrue(cmd.apply(bkFlags, args)); + } + +} +