Skip to content

Commit

Permalink
refine broker starter command (#1023)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaijack authored and merlimat committed Jan 4, 2018
1 parent f5d2a4d commit 1fe28e7
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 55 deletions.
2 changes: 1 addition & 1 deletion bin/pulsar
Expand Up @@ -196,7 +196,7 @@ OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
cd "$PULSAR_HOME"
if [ $COMMAND == "broker" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-broker.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarBrokerStarter $PULSAR_BROKER_CONF $@
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarBrokerStarter --broker-conf $PULSAR_BROKER_CONF $@
elif [ $COMMAND == "bookie" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"bookkeeper.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.bookkeeper.proto.BookieServer --conf $PULSAR_BOOKKEEPER_CONF $@
Expand Down
6 changes: 6 additions & 0 deletions conf/broker.conf
Expand Up @@ -159,6 +159,12 @@ enablePersistentTopics=true
# Enable broker to load non-persistent topics
enableNonPersistentTopics=true

# Enable to run bookie along with broker
enableRunBookieTogether=false

// Enable to run bookie autorecovery along with broker
enableRunBookieAutoRecoveryTogether=false

### --- Authentication --- ###

# Enable TLS
Expand Down
Expand Up @@ -160,6 +160,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Enable broker to load non-persistent topics
private boolean enableNonPersistentTopics = true;

// Enable to run bookie along with broker
private boolean enableRunBookieTogether = false;

// Enable to run bookie autorecovery along with broker
private boolean enableRunBookieAutoRecoveryTogether = false;

/***** --- TLS --- ****/
// Enable TLS
private boolean tlsEnabled = false;
Expand Down Expand Up @@ -291,7 +297,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int loadBalancerReportUpdateMaxIntervalMinutes = 15;
// Frequency of report to collect
private int loadBalancerHostUsageCheckIntervalMinutes = 1;
// Enable/disable automatic bundle unloading for load-shedding
// Enable/disable automatic bundle unloading for load-shedding
@FieldContext(dynamic = true)
private boolean loadBalancerSheddingEnabled = true;
// Load shedding interval. Broker periodically checks whether some traffic should be offload from some over-loaded
Expand Down Expand Up @@ -678,6 +684,22 @@ public void setEnableNonPersistentTopics(boolean enableNonPersistentTopics) {
this.enableNonPersistentTopics = enableNonPersistentTopics;
}

public boolean isEnableRunBookieTogether() {
return enableRunBookieTogether;
}

public void setEnableRunBookieTogether(boolean enableRunBookieTogether) {
this.enableRunBookieTogether = enableRunBookieTogether;
}

public boolean isEnableRunBookieAutoRecoveryTogether() {
return enableRunBookieAutoRecoveryTogether;
}

public void setEnableRunBookieAutoRecoveryTogether(boolean enableRunBookieAutoRecoveryTogether) {
this.enableRunBookieAutoRecoveryTogether = enableRunBookieAutoRecoveryTogether;
}

public boolean isTlsEnabled() {
return tlsEnabled;
}
Expand Down
106 changes: 66 additions & 40 deletions pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
Expand Up @@ -28,17 +28,18 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.ea.agentloader.AgentLoader;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.FileInputStream;
import java.net.MalformedURLException;
import java.nio.file.Paths;
import java.util.Arrays;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.AutoRecoveryMain;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.aspectj.weaver.loadtime.Agent;
Expand All @@ -57,16 +58,19 @@ private static ServiceConfiguration loadConfig(String configFile) throws Excepti
return config;
}

private static class BookieArguments {
@VisibleForTesting
private static class StarterArguments {
@Parameter(names = {"-c", "--broker-conf"}, description = "Configuration file for Broker")
private String brokerConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/broker.conf";

@Parameter(names = {"-rb", "--run-bookie"}, description = "Run Bookie together with broker")
@Parameter(names = {"-rb", "--run-bookie"}, description = "Run Bookie together with Broker")
private boolean runBookie = false;

@Parameter(names = {"-ra", "--run-bookie-autorecovery"}, description = "Run Bookie Autorecovery together with broker")
private boolean runBookieAutoRecovery = false;

@Parameter(names = {"-bc", "--bookie-conf"}, description = "Configuration file for Bookie")
private String bookieConfigFile;
private String bookieConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/bookkeeper.conf";

@Parameter(names = {"-h", "--help"}, description = "Show this help message")
private boolean help = false;
Expand All @@ -88,34 +92,62 @@ private static ServerConfiguration readBookieConfFile(String bookieConfigFile) t
return bookieConf;
}

private static class PulsarBookieStarter {
private static boolean argsContains(String[] args, String arg) {
return Arrays.asList(args).contains(arg);
}

private static class BrokerStarter {
private final ServiceConfiguration brokerConfig;
private final PulsarService pulsarService;
private final BookieServer bookieServer;
private final AutoRecoveryMain autoRecoveryMain;
private final StatsProvider bookieStatsProvider;
private final ServerConfiguration bookieConfig;

PulsarBookieStarter(String[] args) throws Exception{
BookieArguments bookieArguments = new BookieArguments();
JCommander jcommander = new JCommander(bookieArguments);
jcommander.setProgramName("PulsarBrokerStarter <broker.conf>");
BrokerStarter(String[] args) throws Exception{
StarterArguments starterArguments = new StarterArguments();
JCommander jcommander = new JCommander(starterArguments);
jcommander.setProgramName("PulsarBrokerStarter");

// parse args by jcommander
// parse args by JCommander
jcommander.parse(args);
if (bookieArguments.help) {
if (starterArguments.help) {
jcommander.usage();
System.exit(-1);
}
if ((bookieArguments.runBookie || bookieArguments.runBookieAutoRecovery)
&& isBlank(bookieArguments.bookieConfigFile)) {

// init broker config and pulsar service
if (isBlank(starterArguments.brokerConfigFile)) {
jcommander.usage();
throw new IllegalArgumentException("Need to specify a configuration file for broker");
} else {
brokerConfig = loadConfig(starterArguments.brokerConfigFile);
pulsarService = new PulsarService(brokerConfig);
}

// if no argument to run bookie in cmd line, read from pulsar config
if (!argsContains(args, "-rb") && !argsContains(args, "--run-bookie")) {
checkState(starterArguments.runBookie == false,
"runBookie should be false if has no argument specified");
starterArguments.runBookie = brokerConfig.isEnableRunBookieTogether();
}
if (!argsContains(args, "-ra") && !argsContains(args, "--run-bookie-autorecovery")) {
checkState(starterArguments.runBookieAutoRecovery == false,
"runBookieAutoRecovery should be false if has no argument specified");
starterArguments.runBookieAutoRecovery = brokerConfig.isEnableRunBookieAutoRecoveryTogether();
}

if ((starterArguments.runBookie || starterArguments.runBookieAutoRecovery)
&& isBlank(starterArguments.bookieConfigFile)) {
jcommander.usage();
throw new IllegalArgumentException("No configuration file for Bookie");
}

// init stats provider
if (bookieArguments.runBookie || bookieArguments.runBookieAutoRecovery) {
checkState(isNotBlank(bookieArguments.bookieConfigFile),
if (starterArguments.runBookie || starterArguments.runBookieAutoRecovery) {
checkState(isNotBlank(starterArguments.bookieConfigFile),
"No configuration file for Bookie");
bookieConfig = readBookieConfFile(bookieArguments.bookieConfigFile);
bookieConfig = readBookieConfFile(starterArguments.bookieConfigFile);
Class<? extends StatsProvider> statsProviderClass = bookieConfig.getStatsProviderClass();
bookieStatsProvider = ReflectionUtils.newInstance(statsProviderClass);
} else {
Expand All @@ -124,7 +156,7 @@ && isBlank(bookieArguments.bookieConfigFile)) {
}

// init bookie server
if (bookieArguments.runBookie) {
if (starterArguments.runBookie) {
checkNotNull(bookieConfig, "No ServerConfiguration for Bookie");
checkNotNull(bookieStatsProvider, "No Stats Provider for Bookie");
bookieServer = new BookieServer(bookieConfig, bookieStatsProvider.getStatsLogger(""));
Expand All @@ -133,7 +165,7 @@ && isBlank(bookieArguments.bookieConfigFile)) {
}

// init bookie AutorecoveryMain
if (bookieArguments.runBookieAutoRecovery) {
if (starterArguments.runBookieAutoRecovery) {
checkNotNull(bookieConfig, "No ServerConfiguration for Bookie Autorecovery");
autoRecoveryMain = new AutoRecoveryMain(bookieConfig);
} else {
Expand All @@ -154,9 +186,14 @@ public void start() throws Exception {
autoRecoveryMain.start();
log.info("started bookie autoRecoveryMain.");
}

pulsarService.start();
log.info("PulsarService started.");
}

public void join() throws InterruptedException {
pulsarService.waitUntilClosed();

if (bookieServer != null) {
bookieServer.join();
}
Expand All @@ -166,59 +203,48 @@ public void join() throws InterruptedException {
}

public void shutdown() {
pulsarService.getShutdownService().run();
log.info("Shut down broker service successfully.");

if (bookieStatsProvider != null) {
bookieStatsProvider.stop();
log.info("Shut down bookieStatsProvider successfully.");
}
if (bookieServer != null) {
bookieServer.shutdown();
log.info("Shut down bookieServer successfully.");
}
if (autoRecoveryMain != null) {
autoRecoveryMain.shutdown();
log.info("Shut down autoRecoveryMain successfully.");
}
}
}


public static void main(String[] args) throws Exception {
if (args.length < 1) {
throw new IllegalArgumentException("Need to specify a configuration file");
}

Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
log.error("Uncaught exception in thread {}: {}", thread.getName(), exception.getMessage(), exception);
});

String configFile = args[0];
ServiceConfiguration config = loadConfig(configFile);

// load aspectj-weaver agent for instrumentation
AgentLoader.loadAgentClass(Agent.class.getName(), null);

PulsarBookieStarter bookieStarter = new PulsarBookieStarter(Arrays.copyOfRange(args, 1, args.length));
bookieStarter.start();

@SuppressWarnings("resource")
final PulsarService service = new PulsarService(config);
BrokerStarter starter = new BrokerStarter(args);
Runtime.getRuntime().addShutdownHook(
new Thread(() -> {
service.getShutdownService().run();
log.info("Shut down broker service successfully.");
bookieStarter.shutdown();
starter.shutdown();
})
);

try {
service.start();
log.info("PulsarService started");
} catch (PulsarServerException e) {
starter.start();
} catch (Exception e) {
log.error("Failed to start pulsar service.", e);

Runtime.getRuntime().halt(1);
}

service.waitUntilClosed();

bookieStarter.join();
starter.join();
}

private static final Logger log = LoggerFactory.getLogger(PulsarBrokerStarter.class);
Expand Down

0 comments on commit 1fe28e7

Please sign in to comment.