Skip to content

Commit

Permalink
Move initialize dlog namespace metadata to bin/pulsar
Browse files Browse the repository at this point in the history
  • Loading branch information
nodece committed Dec 2, 2020
1 parent ee4cddf commit 6423e6a
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 10 deletions.
2 changes: 2 additions & 0 deletions bin/pulsar
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ elif [ $COMMAND == "initialize-transaction-coordinator-metadata" ]; then
exec $JAVA $OPTS org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup $@
elif [ $COMMAND == "initialize-namespace" ]; then
exec $JAVA $OPTS org.apache.pulsar.PulsarInitialNamespaceSetup $@
elif [ $COMMAND == "initialize-dlog-namespace-metadata" ]; then
exec $JAVA $OPTS org.apache.pulsar.PulsarInitialDlogNamespaceMetadataSetup $@
elif [ $COMMAND == "zookeeper-shell" ]; then
exec $JAVA $OPTS org.apache.zookeeper.ZooKeeperMain $@
elif [ $COMMAND == "broker-tool" ]; then
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.functions.worker.WorkerUtils;

public class PulsarInitialDlogNamespaceMetadataSetup {
private static class Arguments {
@Parameter(names = {"-cs",
"--configuration-store"}, description = "Configuration Store connection string", required = true)
private String configurationStore;

@Parameter(names = {"--bookkeeper-metadata-service-uri"}, description = "Metadata service uri of BookKeeper", required = true)
private String bkMetadataServiceUri;

@Parameter(names = {"-h", "--help"}, description = "Show this help message")
private boolean help = false;
}

public static void main(String[] args) throws Exception {
PulsarInitialDlogNamespaceMetadataSetup.Arguments arguments = new PulsarInitialDlogNamespaceMetadataSetup.Arguments();
JCommander jcommander = new JCommander();
try {
jcommander.addObject(arguments);
jcommander.parse(args);
if (arguments.help) {
jcommander.usage();
return;
}
} catch (Exception e) {
jcommander.usage();
throw e;
}

if (arguments.configurationStore == null) {
System.err.println("Configuration store address argument is required (--configuration-store)");
jcommander.usage();
System.exit(1);
}
if (arguments.bkMetadataServiceUri == null) {
System.err.println("Metadata service uri of BookKeeper argument is required (--bookkeeper-metadata-service-uri)");
jcommander.usage();
System.exit(1);
}

InternalConfigurationData internalConf = new InternalConfigurationData(arguments.configurationStore, arguments.configurationStore, null, arguments.bkMetadataServiceUri, null);
WorkerUtils.initializeDlogNamespace(internalConf);
System.out.println("Initial Dlog namespace metadata setup success");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,8 @@ private static URI initialize(WorkerConfig workerConfig)
admin.close();
}

// initialize the dlog namespace
// TODO: move this as part of pulsar cluster initialization later
try {
return WorkerUtils.initializeDlogNamespace(internalConf);
} catch (IOException ioe) {
log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing function packages",
internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}
// get the dlog namespace
return WorkerUtils.getDlogNamespaceURI(internalConf.getZookeeperServers());
}

private AuthorizationService getAuthorizationService() throws PulsarServerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public static URI initializeDlogNamespace(InternalConfigurationData internalConf
}
BKDLConfig dlConfig = new BKDLConfig(ledgersStoreServers, ledgersRootPath);
DLMetadata dlMetadata = DLMetadata.create(dlConfig);
URI dlogUri = URI.create(String.format("distributedlog://%s/pulsar/functions", zookeeperServers));
URI dlogUri = getDlogNamespaceURI(zookeeperServers);

try {
dlMetadata.create(dlogUri);
Expand All @@ -183,6 +183,10 @@ public static URI initializeDlogNamespace(InternalConfigurationData internalConf
return dlogUri;
}

public static URI getDlogNamespaceURI(String zookeeperServers) {
return URI.create(String.format("distributedlog://%s/pulsar/functions", zookeeperServers));
}

public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl) {
return getPulsarAdminClient(pulsarWebServiceUrl, null, null, null, null, null);
}
Expand Down

0 comments on commit 6423e6a

Please sign in to comment.