Skip to content

Commit

Permalink
[pulsar-broker] PIP-100 Support pluggable topic factory
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Sep 29, 2021
1 parent 3ec3665 commit 649af8f
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 2 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ brokerShutdownTimeoutMs=60000
# Flag to skip broker shutdown when broker handles Out of memory error
skipBrokerShutdownOnOOM=false

# Factory class-name to create topic with custom workflow
topicFactoryClassName=

# Enable backlog quota check. Enforces action on topic when the quota is reached
backlogQuotaCheckEnabled=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private long topicLoadTimeoutSeconds = 60;

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Factory class-name to create topic with custom workflow"
)
private String topicFactoryClassName;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Enable backlog quota check. Enforces actions on topic when the quota is reached"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ public class BrokerService implements Closeable {
private boolean preciseTopicPublishRateLimitingEnable;
private final LongAdder pausedConnections = new LongAdder();
private BrokerInterceptor interceptor;
private TopicFactory topicFactory;

private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;

Expand Down Expand Up @@ -305,6 +306,7 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
this.authenticationService = new AuthenticationService(pulsar.getConfiguration());
this.blockedDispatchers = new ConcurrentOpenHashSet<>();
this.topicFactory = createPersistentTopicFactory();
// update dynamic configuration and register-listener
updateConfigurationAndRegisterListeners();
this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
Expand Down Expand Up @@ -987,7 +989,13 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
new NotAllowedException("Broker is not unable to load non-persistent topic"));
}
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
NonPersistentTopic nonPersistentTopic;
try {
nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
} catch (Exception e) {
log.warn("Failed to create topic {}", topic, e);
return FutureUtil.failedFuture(e);
}

CompletableFuture<Optional<Topic>> future = nonPersistentTopic.initialize()
.thenCompose(__ -> nonPersistentTopic.checkReplication())
Expand Down Expand Up @@ -1226,7 +1234,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
try {
PersistentTopic persistentTopic = isSystemTopic(topic)
? new SystemTopic(topic, ledger, BrokerService.this)
: new PersistentTopic(topic, ledger, BrokerService.this);
: newTopic(topic, ledger, BrokerService.this, PersistentTopic.class);
CompletableFuture<Void> preCreateSubForCompaction =
persistentTopic.preCreateSubscriptionForCompactionIfNeeded();
CompletableFuture<Void> replicationFuture = persistentTopic
Expand Down Expand Up @@ -2651,6 +2659,35 @@ public long getPausedConnections() {
return pausedConnections.longValue();
}

@SuppressWarnings("unchecked")
private <T extends Topic> T newTopic(String topic, ManagedLedger ledger, BrokerService brokerService,
Class<T> topicClazz) throws NamingException {
if (topicFactory != null) {
try {
Topic newTopic = topicFactory.create(topic, ledger, brokerService, topicClazz);
if (newTopic != null) {
return (T) newTopic;
}
} catch (Throwable e) {
log.warn("Failed to create persistent topic using factory {}, {}", topic, e.getMessage());
}
}
return topicClazz == NonPersistentTopic.class ? (T) new NonPersistentTopic(topic, BrokerService.this)
: (T) new PersistentTopic(topic, ledger, brokerService);
}

private TopicFactory createPersistentTopicFactory() {
String topicFactoryClassName = pulsar.getConfig().getTopicFactoryClassName();
if (StringUtils.isNotBlank(topicFactoryClassName)) {
try {
return (TopicFactory) Class.forName(topicFactoryClassName).newInstance();
} catch (Exception e) {
log.warn("Failed to initialize topic factory class {}", topicFactoryClassName, e);
}
}
return null;
}

@VisibleForTesting
public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
this.pulsarChannelInitFactory = factory;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import org.apache.bookkeeper.mledger.ManagedLedger;

/**
* Pluggable TopicFactory to create topic with specific behavior in broker.
* Note: This API and feature is in experimental phase.
*/
public interface TopicFactory {

<T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService, Class<T> topicClazz);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,18 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.Cleanup;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
Expand Down Expand Up @@ -113,6 +117,11 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider(name = "topic")
public Object[][] isPersistent() {
return new Object[][] { { true }, { false } };
}

@Test
public void testSimpleProducerEvents() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic0";
Expand Down Expand Up @@ -1859,4 +1868,42 @@ public void testProducerBusy() throws Exception {

assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
}

@Test(dataProvider = "topic")
public void testPersistentTopicFactory(boolean isPersistent) throws Exception {
conf.setTopicFactoryClassName(MyTopicFactory.class.getName());
restartBroker();

final String topicName = (isPersistent ? "persistent" : "non-persistent") + "://prop/ns-abc/factoryTopic"
+ isPersistent;
MyTopicFactory.count.set(0);

// 1. producer connect
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();

assertTrue(MyTopicFactory.count.get() > 0);
producer.close();
consumer.close();
}

public static class MyTopicFactory implements TopicFactory {
private static AtomicInteger count = new AtomicInteger(0);

@Override
public <T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService,
Class<T> topicClazz) {
try {
count.incrementAndGet();
if(topicClazz == NonPersistentTopic.class) {
return (T) new NonPersistentTopic(topic, brokerService);
}else {
return (T) new PersistentTopic(topic, ledger, brokerService);
}
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}
}
2 changes: 2 additions & 0 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|zooKeeperSessionTimeoutMillis| Zookeeper session timeout in milliseconds |30000|
|brokerShutdownTimeoutMs| Time to wait for broker graceful shutdown. After this time elapses, the process will be killed |60000|
|skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false|
|topicFactoryClassName| Factory class-name to create topic with custom workflow. ||
|backlogQuotaCheckEnabled| Enable backlog quota check. Enforces action on topic when the quota is reached |true|
|backlogQuotaCheckIntervalInSeconds| How often to check for topics that have reached the quota |60|
|backlogQuotaDefaultLimitBytes| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. | -1 |
Expand Down Expand Up @@ -462,6 +463,7 @@ You can set the log level and configuration in the [log4j2.yaml](https://github
|zooKeeperOperationTimeoutSeconds|ZooKeeper operation timeout in seconds.|30|
|brokerShutdownTimeoutMs| The time to wait for graceful broker shutdown. After this time elapses, the process will be killed. |60000|
|skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false|
|topicFactoryClassName| Factory class-name to create topic with custom workflow. ||
|backlogQuotaCheckEnabled| Enable the backlog quota check, which enforces a specified action when the quota is reached. |true|
|backlogQuotaCheckIntervalInSeconds| How often to check for topics that have reached the backlog quota. |60|
|backlogQuotaDefaultLimitBytes| The default per-topic backlog quota limit. Being less than 0 means no limitation. By default, it is -1. |-1|
Expand Down

0 comments on commit 649af8f

Please sign in to comment.