Skip to content

Commit

Permalink
[Transaction] Standalone support transaction. (#10238)
Browse files Browse the repository at this point in the history
  • Loading branch information
congbobo184 committed Apr 15, 2021
1 parent 9d3cbef commit 64f8140
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
11 changes: 10 additions & 1 deletion conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,16 @@ allowAutoSubscriptionCreation=true
defaultNumPartitions=1

### --- Transaction config variables --- ###
transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider
# Enable transaction coordinator in broker
transactionCoordinatorEnabled=false
transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider

# Transaction buffer take snapshot transaction count
transactionBufferSnapshotMaxTransactionCount=1000

# Transaction buffer take snapshot interval time
# Unit : millisecond
transactionBufferSnapshotMinTimeInMillis=5000

### --- Packages management service configuration variables (begin) --- ###

Expand Down
22 changes: 12 additions & 10 deletions pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar;

import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_ASSIGN;
import com.beust.jcommander.Parameter;
import com.google.common.collect.Sets;
import java.io.File;
Expand All @@ -36,7 +38,6 @@
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -291,10 +292,6 @@ public void start() throws Exception {
});
broker.start();

if (config.isTransactionCoordinatorEnabled()) {
broker.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
}

final String cluster = config.getClusterName();

if (!config.isTlsEnabled()) {
Expand Down Expand Up @@ -338,15 +335,20 @@ public void start() throws Exception {
createSampleNameSpace(clusterData, cluster);
}

createDefaultNameSpace(cluster);
//create default namespace
createNameSpace(cluster, TopicName.PUBLIC_TENANT, TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE);
//create pulsar system namespace
createNameSpace(cluster, SYSTEM_NAMESPACE.getTenant(), SYSTEM_NAMESPACE.toString());
if (config.isTransactionCoordinatorEnabled() && !admin.namespaces()
.getTopics(SYSTEM_NAMESPACE.toString())
.contains(TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString())) {
admin.topics().createPartitionedTopic(TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
}

log.debug("--- setup completed ---");
}

private void createDefaultNameSpace(String cluster) {
// Create a public tenant and default namespace
final String publicTenant = TopicName.PUBLIC_TENANT;
final String defaultNamespace = TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE;
private void createNameSpace(String cluster, String publicTenant, String defaultNamespace) {
try {
if (!admin.tenants().getTenants().contains(publicTenant)) {
admin.tenants().createTenant(publicTenant,
Expand Down

0 comments on commit 64f8140

Please sign in to comment.