Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[issue 8337][Worker] Move initialize dlog namespace metadata to bin/pulsar #8781

Merged
merged 2 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,7 @@ functionsDirectory: ./functions

# Should connector config be validated during during submission
validateConnectorConfig: false

# Whether to initialize distributed log metadata by runtime.
# If it is set to true, you must ensure that it has been initialized by "bin/pulsarinitialize-cluster-metadata" command.
initializedDlogMetadata: false
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
Expand Down Expand Up @@ -141,6 +143,18 @@ private static void createZkNode(ZooKeeper zkc, String path,
}
}

private static void initialDlogNamespaceMetadata(String configurationStore, String bkMetadataServiceUri)
throws IOException {
InternalConfigurationData internalConf = new InternalConfigurationData(
configurationStore,
configurationStore,
null,
bkMetadataServiceUri,
null
);
WorkerUtils.initializeDlogNamespace(internalConf);
}

public static void main(String[] args) throws Exception {
Arguments arguments = new Arguments();
JCommander jcommander = new JCommander();
Expand Down Expand Up @@ -195,15 +209,20 @@ public static void main(String[] args) throws Exception {
}
}


String uriStr = bkConf.getMetadataServiceUri();
if (arguments.existingBkMetadataServiceUri != null) {
uriStr = arguments.existingBkMetadataServiceUri;
} else if (arguments.bookieMetadataServiceUri != null) {
uriStr = arguments.bookieMetadataServiceUri;
}
ServiceURI bkMetadataServiceUri = ServiceURI.create(uriStr);

// initial distributed log metadata
initialDlogNamespaceMetadata(arguments.configurationStore, uriStr);

// Format BookKeeper stream storage metadata
if (arguments.numStreamStorageContainers > 0) {
String uriStr = bkConf.getMetadataServiceUri();
if (arguments.existingBkMetadataServiceUri != null) {
uriStr = arguments.existingBkMetadataServiceUri;
} else if (arguments.bookieMetadataServiceUri != null) {
uriStr = arguments.bookieMetadataServiceUri;
}
ServiceURI bkMetadataServiceUri = ServiceURI.create(uriStr);
ClusterInitializer initializer = new ZkClusterInitializer(arguments.zookeeper);
initializer.initializeCluster(bkMetadataServiceUri.getUri(), arguments.numStreamStorageContainers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,19 @@ public boolean getTlsEnabled() {
return tlsEnabled || workerPortTls != null;
}

@FieldContext(
category = CATEGORY_WORKER,
doc = "Whether to initialize distributed log metadata in runtime"
)
private Boolean initializedDlogMetadata = false;

public Boolean isInitializedDlogMetadata() {
if (this.initializedDlogMetadata == null){
return false;
}
return this.initializedDlogMetadata;
};

/******** security settings for pulsar broker client **********/

@FieldContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,21 @@ private static URI initializeStandaloneWorkerService(PulsarClientCreator clientC
}

// initialize the dlog namespace
// TODO: move this as part of pulsar cluster initialization later
URI dlogURI;
try {
return WorkerUtils.initializeDlogNamespace(internalConf);
if (workerConfig.isInitializedDlogMetadata()) {
dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
} else {
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
}
} catch (IOException ioe) {
log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing function packages",
internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing " +
"function packages", internalConf.getZookeeperServers(),
internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}

return dlogURI;
}

@Override
Expand Down Expand Up @@ -363,9 +370,14 @@ public void initInBroker(ServiceConfiguration brokerConfig,
URI dlogURI;
try {
// initializing dlog namespace for function worker
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
if (workerConfig.isInitializedDlogMetadata()){
dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
} else {
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
}
} catch (IOException ioe) {
LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for storing function packages",
LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for " +
"storing function packages",
internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ public static DistributedLogConfiguration getDlogConf(WorkerConfig workerConfig)
return conf;
}

public static URI newDlogNamespaceURI(InternalConfigurationData internalConf) {
String zookeeperServers = internalConf.getZookeeperServers();
public static URI newDlogNamespaceURI(String zookeeperServers) {
return URI.create(String.format("distributedlog://%s/pulsar/functions", zookeeperServers));
}

Expand All @@ -176,7 +175,7 @@ public static URI initializeDlogNamespace(InternalConfigurationData internalConf
BKDLConfig dlConfig = new BKDLConfig(ledgersStoreServers, ledgersRootPath);
DLMetadata dlMetadata = DLMetadata.create(dlConfig);

URI dlogUri = newDlogNamespaceURI(internalConf);
URI dlogUri = newDlogNamespaceURI(internalConf.getZookeeperServers());
try {
dlMetadata.create(dlogUri);
} catch (ZKException e) {
Expand Down