Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.io.IOException;
import java.io.Serializable;
import java.math.RoundingMode;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.file.FileSystems;
import java.nio.file.Files;
Expand Down Expand Up @@ -96,18 +95,19 @@
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.meta.AuditorSelector;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallbackFuture;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.replication.AuditorElector;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
Expand All @@ -125,7 +125,6 @@
import org.apache.bookkeeper.util.Tool;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
Expand All @@ -143,7 +142,6 @@
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1874,27 +1872,30 @@ String getUsage() {

@Override
int runCmd(CommandLine cmdLine) throws Exception {
ZooKeeper zk = null;
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better give a name to the thread?

try {
String metadataServiceUri = bkConf.getMetadataServiceUri();
String zkServers = ZKMetadataDriverBase.getZKServersFromServiceUri(URI.create(metadataServiceUri));
zk = ZooKeeperClient.newBuilder()
.connectString(zkServers)
.sessionTimeoutMs(bkConf.getZkTimeout())
.build();
BookieSocketAddress bookieId = AuditorElector.getCurrentAuditor(bkConf, zk);
if (bookieId == null) {
LOG.info("No auditor elected");
return -1;
}
LOG.info("Auditor: " + getBookieSocketAddrStringRepresentation(bookieId));
ClientConfiguration bkClientConf = new ClientConfiguration(bkConf);
return MetadataDrivers.runFunctionWithMetadataClientDriver(bkClientConf, metadataClientDriver -> {
try (AuditorSelector selector = metadataClientDriver.getAuditorSelector("")) {
BookieSocketAddress bookieId = null;
try {
bookieId = selector.getCurrentAuditor();
} catch (MetadataException e) {
// it is in a closure, we can't throw checked exception, so rethrow it
// as runtime exception.
throw e.asRuntimeException();
}
if (bookieId == null) {
LOG.info("No auditor elected");
return -1;
}
LOG.info("Auditor: " + getBookieSocketAddrStringRepresentation(bookieId));
return 0;
}
}, executor);
} finally {
if (zk != null) {
zk.close();
}
executor.shutdown();
}

return 0;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
Expand All @@ -64,17 +63,18 @@
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
import org.apache.bookkeeper.meta.AuditorSelector;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.replication.AuditorElector;
import org.apache.bookkeeper.replication.BookieLedgerIndexer;
import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
Expand All @@ -96,19 +96,13 @@ public class BookKeeperAdmin implements AutoCloseable {
private static final Logger VERBOSE = LoggerFactory.getLogger("verbose");

// BookKeeper client instance
private BookKeeper bkc;
private final BookKeeper bkc;
private final boolean ownsBK;

// LedgerFragmentReplicator instance
private LedgerFragmentReplicator lfr;
private final LedgerFragmentReplicator lfr;

/*
* Random number generator used to choose an available bookie server to
* replicate data from a dead bookie.
*/
private Random rand = new Random();

private LedgerManagerFactory mFactory;
private final LedgerManagerFactory mFactory;

/*
* underreplicationManager is not initialized as part of constructor use its
Expand Down Expand Up @@ -189,6 +183,10 @@ public ClientConfiguration getConf() {
return bkc.getConf();
}

public BookKeeper getBkc() {
return bkc;
}

/**
* Gracefully release resources that this client uses.
*
Expand Down Expand Up @@ -1424,11 +1422,15 @@ public void triggerAudit()
throw new UnavailableException("Autorecovery is disabled. So giving up!");
}

BookieSocketAddress auditorId =
AuditorElector.getCurrentAuditor(new ServerConfiguration(bkc.getConf()), bkc.getZkHandle());
if (auditorId == null) {
LOG.error("No auditor elected, though Autorecovery is enabled. So giving up.");
throw new UnavailableException("No auditor elected, though Autorecovery is enabled. So giving up.");
// we are using auditor selector to query the leader, so the `bookieId` here is not used anyway
try (AuditorSelector selector = bkc.getMetadataClientDriver().getAuditorSelector("")) {
BookieSocketAddress auditorId = selector.getCurrentAuditor();
if (auditorId == null) {
LOG.error("No auditor elected, though Autorecovery is enabled. So giving up.");
throw new UnavailableException("No auditor elected, though Autorecovery is enabled. So giving up.");
}
} catch (MetadataException e) {
throw new UnavailableException("Failed to get current auditor from metadata store", e);
}

int previousLostBookieRecoveryDelayValue = urlManager.getLostBookieRecoveryDelay();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.bookkeeper.meta;

import java.io.IOException;
import java.util.concurrent.Future;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.net.BookieSocketAddress;

/**
* Interface for selecting an auditor.
*/
public interface AuditorSelector extends AutoCloseable {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reviewing the whole patch I suggest calling this interface 'AuditorElectionManager'


/**
* Selector listener to listen on auditor changes.
*/
interface SelectorListener {

/**
* Trigger when it is selected as a leader.
*/
void onLeaderSelected() throws IOException;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better 'election' over 'selection'

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is already AuditorElector. The selector interface is used by AuditorElector for selecting who is the leader. That's why we called it AuditorSelector rather than AuditorElector.

and selector is pretty standard in curator : https://curator.apache.org/curator-recipes/leader-election.html


/**
* Trigger on each selection attempt.
*/
void onSelectionAttempt();

/**
* Trigger when it lost its leadership.
*/
void onLeaderExpired();

}

/**
* Whether the selector is running or not.
*
* @return true if the selector is running, otherwise false.
*/
boolean isRunning();

/**
* Trigger the selecting process with the provided <tt>listener</tt>.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'Election'

*
* @param listener the listener to listen on events triggered by selection.
* @return a future when the selector is running
*/
Future<?> select(SelectorListener listener)
throws MetadataException;

/**
* Get the current auditor.
*
* @return current auditor.
*/
BookieSocketAddress getCurrentAuditor() throws MetadataException;

@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ LedgerManagerFactory getLedgerManagerFactory()
*/
LayoutManager getLayoutManager();

/**
* Return the auditor selector.
*
* @param bookieId bookie id
* @return the auditor selector.
*/
AuditorSelector getAuditorSelector(String bookieId);

@Override
void close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,10 @@ public MetadataException(Code code, Throwable cause) {
super(cause);
this.code = code;
}

public MetadataRuntimeException asRuntimeException() {
return new MetadataRuntimeException(
code, getMessage(), getCause()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.bookkeeper.meta.exceptions;

/**
* Metadata Runtime Exception.
*/
public class MetadataRuntimeException extends RuntimeException {

private static final long serialVersionUID = -7833758146152388503L;

private final Code code;

public MetadataRuntimeException(Code code, String message) {
super(message);
this.code = code;
}

public MetadataRuntimeException(Code code, String message, Throwable cause) {
super(message, cause);
this.code = code;
}

public MetadataRuntimeException(Code code, Throwable cause) {
super(cause);
this.code = code;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public void close() {
if (null != rmToClose) {
rmToClose.close();
}

super.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
import org.apache.bookkeeper.meta.AuditorSelector;
import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.meta.LayoutManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
Expand Down Expand Up @@ -250,16 +252,26 @@ public synchronized LedgerManagerFactory getLedgerManagerFactory()
return lmFactory;
}

@Override
public void close() {
if (null != lmFactory) {
public AuditorSelector getAuditorSelector(String bookieId) {
return new ZkAuditorSelector(bookieId, zk, new ServerConfiguration(conf));
}

protected void closeLedgerManagerFactory() {
LedgerManagerFactory lmToClose;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: lmFactoryToClose ?

synchronized (this) {
lmToClose = lmFactory;
lmFactory = null;
}
if (null != lmToClose) {
try {
lmFactory.close();
lmToClose.close();
} catch (IOException e) {
log.warn("Failed to close zookeeper based ledger manager", e);
}
lmFactory = null;
}
}

protected void closeZooKeeper() {
if (ownZKHandle && null != zk) {
try {
zk.close();
Expand All @@ -270,4 +282,10 @@ public void close() {
zk = null;
}
}

@Override
public void close() {
closeLedgerManagerFactory();
closeZooKeeper();
}
}
Loading