Skip to content

Commit

Permalink
Enable bookkeeper table service in pulsar standalone (#1922)
Browse files Browse the repository at this point in the history

This change is enabling bookkeeper table service component in pulsar standalone. So people can try out stateful functions in pulsar.

Signed-off-by: Sijie Guo <sijie@apache.org>
  • Loading branch information
sijie committed Jun 20, 2018
1 parent c69003d commit da48f42
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 28 deletions.
14 changes: 7 additions & 7 deletions pom.xml
Expand Up @@ -121,7 +121,7 @@ flexible messaging model and an intuitive client API.</description>
<testRealAWS>false</testRealAWS>
<testRetryCount>1</testRetryCount>

<bookkeeper.version>4.7.0</bookkeeper.version>
<bookkeeper.version>4.7.1</bookkeeper.version>
<zookeeper.version>3.5.4-beta</zookeeper.version>
<netty.version>4.1.22.Final</netty.version>
<storm.version>1.0.5</storm.version>
Expand Down Expand Up @@ -280,12 +280,12 @@ flexible messaging model and an intuitive client API.</description>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>stream-storage-java-client</artifactId>
<version>${bookkeeper.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>stream-storage-server</artifactId>
<version>${bookkeeper.version}</version>
</dependency>

<dependency>
Expand Down
Expand Up @@ -87,6 +87,12 @@ public class PulsarStandaloneStarter {
@Parameter(names = {"-fwc", "--functions-worker-conf"}, description = "Configuration file for Functions Worker")
private String fnWorkerConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/functions_worker.yml";

@Parameter(names = {"-nss", "--no-stream-storage"}, description = "Disable stream storage")
private boolean noStreamStorage = false;

@Parameter(names = { "--stream-storage-port" }, description = "Local bookies stream storage port")
private int streamStoragePort = 4181;

@Parameter(names = { "-a", "--advertised-address" }, description = "Standalone broker advertised address")
private String advertisedAddress = null;

Expand Down Expand Up @@ -170,8 +176,9 @@ void start() throws Exception {

if (!onlyBroker) {
// Start LocalBookKeeper
bkEnsemble = new LocalBookkeeperEnsemble(numOfBk, zkPort, bkPort, zkDir, bkDir, wipeData, config.getAdvertisedAddress());
bkEnsemble.startStandalone();
bkEnsemble = new LocalBookkeeperEnsemble(
numOfBk, zkPort, bkPort, streamStoragePort, zkDir, bkDir, wipeData, config.getAdvertisedAddress());
bkEnsemble.startStandalone(!noStreamStorage);
}

if (noBroker) {
Expand All @@ -189,6 +196,7 @@ void start() throws Exception {
// worker talks to local broker
workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort());
workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort());
workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:" + streamStoragePort);
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
config.getAdvertisedAddress());
workerConfig.setWorkerHostname(hostname);
Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/run-unit-tests.sh
Expand Up @@ -25,11 +25,12 @@ rm -rf ./pulsar-dist
mkdir pulsar-dist
tar xfz ../all/target/apache-pulsar*bin.tar.gz -C pulsar-dist --strip-components 1

PULSAR_STANDALONE_CONF=$PWD/test-conf/standalone.conf pulsar-dist/bin/pulsar standalone --no-functions-worker > broker.log &
PULSAR_STANDALONE_CONF=$PWD/test-conf/standalone.conf pulsar-dist/bin/pulsar standalone --no-functions-worker --no-stream-storage > broker.log &
standalone_pid=$!;

PULSAR_STANDALONE_CONF=$PWD/test-conf/standalone-ssl.conf pulsar-dist/bin/pulsar standalone \
--no-functions-worker \
--no-stream-storage \
--zookeeper-port 2191 --bookkeeper-port 3191 \
--zookeeper-dir data2/standalone/zookeeper --bookkeeper-dir \
data2/standalone/bookkeeper > broker-tls.log &
Expand Down
Expand Up @@ -572,7 +572,7 @@ public void testStateGetter() throws Exception {
"--namespace", namespace,
"--name", fnName,
"--key", "test-key",
"--storage-service-url", "127.0.0.1:4181"
"--storage-service-url", "bk://127.0.0.1:4181"
});

assertEquals(
Expand Down
6 changes: 0 additions & 6 deletions pulsar-client-tools/pom.xml
Expand Up @@ -88,12 +88,6 @@
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>stream-storage-java-client</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- functions related dependencies (end) -->
Expand Down
Expand Up @@ -46,7 +46,6 @@
import org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.utils.NetUtils;
import org.apache.commons.lang.StringUtils;
import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.isBlank;
Expand Down Expand Up @@ -798,7 +797,7 @@ void runCmd() throws Exception {

try (StorageClient client = StorageClientBuilder.newBuilder()
.withSettings(StorageClientSettings.newBuilder()
.addEndpoints(NetUtils.parseEndpoint(stateStorageServiceUrl))
.serviceUri(stateStorageServiceUrl)
.clientName("functions-admin")
.build())
.withNamespace(tableNs)
Expand Down
6 changes: 0 additions & 6 deletions pulsar-functions/instance/pom.xml
Expand Up @@ -74,12 +74,6 @@
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>stream-storage-java-client</artifactId>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down
Expand Up @@ -43,7 +43,6 @@
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
import org.apache.bookkeeper.clients.utils.NetUtils;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.LoggerContext;
Expand Down Expand Up @@ -262,7 +261,7 @@ private void setupStateTable() throws Exception {

// TODO (sijie): use endpoint for now
StorageClientSettings settings = StorageClientSettings.newBuilder()
.addEndpoints(NetUtils.parseEndpoint(stateStorageServiceUrl))
.serviceUri(stateStorageServiceUrl)
.clientName("function-" + tableNs + "/" + tableName)
.build();

Expand Down
5 changes: 5 additions & 0 deletions pulsar-zookeeper-utils/pom.xml
Expand Up @@ -44,6 +44,11 @@
<artifactId>bookkeeper-server</artifactId>
</dependency>

<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>stream-storage-server</artifactId>
</dependency>

<dependency>
<groupId>org.apache.bookkeeper.stats</groupId>
<artifactId>prometheus-metrics-provider</artifactId>
Expand Down
Expand Up @@ -23,6 +23,7 @@

package org.apache.pulsar.zookeeper;

import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
import static org.apache.commons.io.FileUtils.cleanDirectory;
import static org.apache.commons.lang3.StringUtils.isNotBlank;

Expand All @@ -33,17 +34,35 @@
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.exceptions.ClientException;
import org.apache.bookkeeper.clients.exceptions.NamespaceExistsException;
import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.server.conf.BookieConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.NamespaceProperties;
import org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent;
import org.apache.bookkeeper.stream.storage.api.cluster.ClusterInitializer;
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
Expand All @@ -69,15 +88,27 @@ public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort)

public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName,
String bkDataDirName, boolean clearOldData) {
this(numberOfBookies, zkPort, bkBasePort, zkDataDirName, bkDataDirName, clearOldData, null);
this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName, bkDataDirName, clearOldData, null);
}

public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName,
String bkDataDirName, boolean clearOldData, String advertisedAddress) {
this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress);
}

public LocalBookkeeperEnsemble(int numberOfBookies,
int zkPort,
int bkBasePort,
int streamStoragePort,
String zkDataDirName,
String bkDataDirName,
boolean clearOldData,
String advertisedAddress) {
this.numberOfBookies = numberOfBookies;
this.HOSTPORT = "127.0.0.1:" + zkPort;
this.ZooKeeperDefaultPort = zkPort;
this.initialPort = bkBasePort;
this.streamStoragePort = streamStoragePort;
this.zkDataDirName = zkDataDirName;
this.bkDataDirName = bkDataDirName;
this.clearOldData = clearOldData;
Expand All @@ -100,6 +131,10 @@ public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort,
ServerConfiguration bsConfs[];
Integer initialPort = 5000;

// Stream/Table Storage
StreamStorageLifecycleComponent streamStorage;
Integer streamStoragePort = 4181;

/**
* @param args
*/
Expand Down Expand Up @@ -213,6 +248,65 @@ private void runBookies(ServerConfiguration baseConf) throws Exception {
}
}

private void runStreamStorage(CompositeConfiguration conf) throws Exception {
String zkServers = "127.0.0.1:" + ZooKeeperDefaultPort;
String metadataServiceUriStr = "zk://" + zkServers + "/ledgers";
URI metadataServiceUri = URI.create(metadataServiceUriStr);

// zookeeper servers
conf.setProperty("metadataServiceUri", metadataServiceUriStr);
// dlog settings
conf.setProperty("dlog.bkcEnsembleSize", 1);
conf.setProperty("dlog.bkcWriteQuorumSize", 1);
conf.setProperty("dlog.bkcAckQuorumSize", 1);
// stream storage port
conf.setProperty("storageserver.grpc.port", streamStoragePort);

// initialize the stream storage metadata
ClusterInitializer initializer = new ZkClusterInitializer(zkServers);
initializer.initializeCluster(metadataServiceUri, 2);

// load the stream storage component
ServerConfiguration serverConf = new ServerConfiguration();
serverConf.loadConf(conf);
BookieConfiguration bkConf = new BookieConfiguration(serverConf);

this.streamStorage = new StreamStorageLifecycleComponent(bkConf, NullStatsLogger.INSTANCE);
this.streamStorage.start();
LOG.debug("Local BK stream storage started (port: {})", streamStoragePort);

// create a default namespace
try (StorageAdminClient admin = StorageClientBuilder.newBuilder()
.withSettings(StorageClientSettings.newBuilder()
.serviceUri("bk://localhost:4181")
.backoffPolicy(Backoff.Jitter.of(
Type.EXPONENTIAL,
1000,
10000,
30
))
.build())
.buildAdmin()) {

try {
NamespaceProperties ns = FutureUtils.result(admin.getNamespace("default"));
LOG.info("'default' namespace for table service : {}", ns);
} catch (NamespaceNotFoundException nnfe) {
LOG.info("Creating default namespace");
try {
NamespaceProperties ns =
FutureUtils.result(admin.createNamespace("default", NamespaceConfiguration.newBuilder()
.setDefaultStreamConf(DEFAULT_STREAM_CONF)
.build()));
LOG.info("Successfully created 'default' namespace :\n{}", ns);
} catch (NamespaceExistsException nee) {
// namespace already exists
LOG.warn("Namespace 'default' already existed.");
}
}
}
}

public void start() throws Exception {
LOG.debug("Local ZK/BK starting ...");
ServerConfiguration conf = new ServerConfiguration();
Expand All @@ -232,6 +326,10 @@ public void start() throws Exception {
}

public void startStandalone() throws Exception {
startStandalone(false);
}

public void startStandalone(boolean enableStreamStorage) throws Exception {
LOG.debug("Local ZK/BK starting ...");
ServerConfiguration conf = new ServerConfiguration();
conf.setLedgerManagerFactoryClassName("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
Expand All @@ -248,9 +346,17 @@ public void startStandalone() throws Exception {
runZookeeper(1000);
initializeZookeper();
runBookies(conf);
if (enableStreamStorage) {
runStreamStorage(new CompositeConfiguration());
}
}

public void stop() throws Exception {
if (null != streamStorage) {
LOG.debug("Local bk stream storage stopping ...");
streamStorage.close();
}

LOG.debug("Local ZK/BK stopping ...");
for (BookieServer bookie : bs) {
bookie.shutdown();
Expand Down

0 comments on commit da48f42

Please sign in to comment.