Skip to content

Commit

Permalink
BP-29 (task 4): use metadata service uri for constructing registratio…
Browse files Browse the repository at this point in the history
…n manager and ledger manager factory

Descriptions of the changes in this PR:

The main changes are at two places:

- Deprecating using ServerConfiguration#getRegistrationManagerClass(). Changed Bookie.java to use MetadataBookieDriver to getRegistrationManager().
- Use MetadataBookieDriver/MetadataClientDriver to retrieve ledger manager factory in Bookie.java / BookKeeper.java

The other changes are in-place changes for replacing using registration manager to using metadata drivers.

- provides a few util functions in `MetadataDrivers` to run `functions` with registration manager / ledger manager factory

Master Issue: #1123

Author: Sijie Guo <sijie@apache.org>

Reviewers: Jia Zhai <None>

This closes #1251 from sijie/use_metadata_service_uri_in_registration_manager
  • Loading branch information
sijie committed Mar 16, 2018
1 parent fa4e9d7 commit b53c0c3
Show file tree
Hide file tree
Showing 49 changed files with 1,206 additions and 1,138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.file.FileStore;
Expand Down Expand Up @@ -70,9 +71,11 @@
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNS;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
Expand All @@ -84,7 +87,6 @@
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
Expand Down Expand Up @@ -125,8 +127,7 @@ public class Bookie extends BookieCriticalThread {
LedgerDirsMonitor idxMonitor;

// Registration Manager for managing registration
RegistrationManager registrationManager;

protected final MetadataBookieDriver metadataDriver;

private int exitCode = ExitCode.OK;

Expand Down Expand Up @@ -227,30 +228,20 @@ public boolean accept(File dir, String name) {
}
}

@VisibleForTesting
public synchronized void setRegistrationManager(RegistrationManager rm) {
this.registrationManager = rm;
this.getStateManager().setRegistrationManager(rm);
}

@VisibleForTesting
public synchronized RegistrationManager getRegistrationManager() {
return this.registrationManager;
}

/**
* Check that the environment for the bookie is correct.
* This means that the configuration has stayed the same as the
* first run and the filesystem structure is up to date.
*/
private void checkEnvironment(RegistrationManager rm) throws BookieException, IOException {
private void checkEnvironment(MetadataBookieDriver metadataDriver)
throws BookieException, IOException {
List<File> allLedgerDirs = new ArrayList<File>(ledgerDirsManager.getAllLedgerDirs().size()
+ indexDirsManager.getAllLedgerDirs().size());
allLedgerDirs.addAll(ledgerDirsManager.getAllLedgerDirs());
if (indexDirsManager != ledgerDirsManager) {
allLedgerDirs.addAll(indexDirsManager.getAllLedgerDirs());
}
if (rm == null) { // exists only for testing, just make sure directories are correct
if (metadataDriver == null) { // exists only for testing, just make sure directories are correct

for (File journalDirectory : journalDirectories) {
checkDirectoryStructure(journalDirectory);
Expand All @@ -262,7 +253,7 @@ private void checkEnvironment(RegistrationManager rm) throws BookieException, IO
return;
}

checkEnvironmentWithStorageExpansion(conf, rm, journalDirectories, allLedgerDirs);
checkEnvironmentWithStorageExpansion(conf, metadataDriver, journalDirectories, allLedgerDirs);

checkIfDirsOnSameDiskPartition(allLedgerDirs);
checkIfDirsOnSameDiskPartition(journalDirectories);
Expand Down Expand Up @@ -402,9 +393,10 @@ private static void stampNewCookie(ServerConfiguration conf,

public static void checkEnvironmentWithStorageExpansion(
ServerConfiguration conf,
RegistrationManager rm,
MetadataBookieDriver metadataDriver,
List<File> journalDirectories,
List<File> allLedgerDirs) throws BookieException {
RegistrationManager rm = metadataDriver.getRegistrationManager();
try {
// 1. retrieve the instance id
String instanceId = rm.getClusterInstanceId();
Expand Down Expand Up @@ -631,24 +623,22 @@ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
}

// instantiate zookeeper client to initialize ledger manager
this.registrationManager = instantiateRegistrationManager(conf);
checkEnvironment(this.registrationManager);
this.metadataDriver = instantiateMetadataDriver(conf);
checkEnvironment(this.metadataDriver);
try {
if (registrationManager != null) {
if (this.metadataDriver != null) {
// current the registration manager is zookeeper only
ledgerManagerFactory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
conf,
registrationManager.getLayoutManager());
ledgerManagerFactory = metadataDriver.getLedgerManagerFactory();
LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
ledgerManager = ledgerManagerFactory.newLedgerManager();
} else {
ledgerManagerFactory = null;
ledgerManager = null;
}
} catch (IOException | InterruptedException e) {
} catch (MetadataException e) {
throw new MetadataStoreException("Failed to initialize ledger manager", e);
}
stateManager = new BookieStateManager(conf, statsLogger, registrationManager, ledgerDirsManager);
stateManager = new BookieStateManager(conf, statsLogger, metadataDriver, ledgerDirsManager);
// register shutdown handler using trigger mode
stateManager.setShutdownHandler(exitCode -> triggerBookieShutdown(exitCode));
// Initialise ledgerDirMonitor. This would look through all the
Expand Down Expand Up @@ -938,23 +928,31 @@ public void diskJustWritable(File disk) {
}

/**
* Instantiate the registration manager for the Bookie.
* Instantiate the metadata driver for the Bookie.
*/
private RegistrationManager instantiateRegistrationManager(ServerConfiguration conf) throws BookieException {
// Create the registration manager instance
Class<? extends RegistrationManager> managerCls;
private MetadataBookieDriver instantiateMetadataDriver(ServerConfiguration conf) throws BookieException {
try {
managerCls = conf.getRegistrationManagerClass();
String metadataServiceUriStr = conf.getMetadataServiceUri();
if (null == metadataServiceUriStr) {
return null;
}

MetadataBookieDriver driver = MetadataDrivers.getBookieDriver(
URI.create(metadataServiceUriStr));
driver.initialize(
conf,
() -> {
stateManager.forceToUnregistered();
// schedule a re-register operation
stateManager.registerBookie(false);
},
statsLogger);
return driver;
} catch (MetadataException me) {
throw new MetadataStoreException("Failed to initialize metadata bookie driver", me);
} catch (ConfigurationException e) {
throw new BookieIllegalOpException(e);
}

RegistrationManager manager = ReflectionUtils.newInstance(managerCls);
return manager.initialize(conf, () -> {
stateManager.forceToUnregistered();
// schedule a re-register operation
stateManager.registerBookie(false);
}, statsLogger);
}

/*
Expand Down Expand Up @@ -1072,8 +1070,8 @@ synchronized int shutdown(int exitCode) {

}
// Shutdown the ZK client
if (registrationManager != null) {
registrationManager.close();
if (metadataDriver != null) {
metadataDriver.close();
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
Expand Down

0 comments on commit b53c0c3

Please sign in to comment.