Skip to content

Commit

Permalink
PIP-2: Introduce non-persistent topics (#538)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Jul 28, 2017
1 parent d7bb707 commit 380e47e
Show file tree
Hide file tree
Showing 73 changed files with 5,362 additions and 795 deletions.
46 changes: 29 additions & 17 deletions conf/broker.conf
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -84,23 +84,6 @@ statusFilePath=
# use only brokers running the latest software version (to minimize impact to bundles) # use only brokers running the latest software version (to minimize impact to bundles)
preferLaterVersions=false preferLaterVersions=false


### --- Authentication --- ###

# Enable TLS
tlsEnabled=false

# Path for the TLS certificate file
tlsCertificateFilePath=

# Path for the TLS private key file
tlsKeyFilePath=

# Path for the trusted TLS certificate file
tlsTrustCertsFilePath=

# Accept untrusted TLS certificate from client
tlsAllowInsecureConnection=false

# Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending # Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending
# messages to consumer once, this limit reaches until consumer starts acknowledging messages back. # messages to consumer once, this limit reaches until consumer starts acknowledging messages back.
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction # Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
Expand Down Expand Up @@ -129,6 +112,35 @@ maxConcurrentLookupRequest=10000
# Max number of concurrent topic loading request broker allows to control number of zk-operations # Max number of concurrent topic loading request broker allows to control number of zk-operations
maxConcurrentTopicLoadRequest=5000 maxConcurrentTopicLoadRequest=5000


# Max concurrent non-persistent message can be processed per connection
maxConcurrentNonPersistentMessagePerConnection=1000

# Number of worker threads to serve non-persistent topic
numWorkerThreadsForNonPersistentTopic=8

# Enable broker to load persistent topics
enablePersistentTopics=true

# Enable broker to load non-persistent topics
enableNonPersistentTopics=true

### --- Authentication --- ###

# Enable TLS
tlsEnabled=false

# Path for the TLS certificate file
tlsCertificateFilePath=

# Path for the TLS private key file
tlsKeyFilePath=

# Path for the trusted TLS certificate file
tlsTrustCertsFilePath=

# Accept untrusted TLS certificate from client
tlsAllowInsecureConnection=false

### --- Authentication --- ### ### --- Authentication --- ###


# Enable authentication # Enable authentication
Expand Down
18 changes: 18 additions & 0 deletions conf/standalone.conf
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -95,6 +95,24 @@ maxUnackedMessagesPerBroker=0
# limit/2 messages # limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16 maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16


# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=10000

# Max number of concurrent topic loading request broker allows to control number of zk-operations
maxConcurrentTopicLoadRequest=5000

# Max concurrent non-persistent message can be processed per connection
maxConcurrentNonPersistentMessagePerConnection=1000

# Number of worker threads to serve non-persistent topic
numWorkerThreadsForNonPersistentTopic=8

# Enable broker to load persistent topics
enablePersistentTopics=true

# Enable broker to load non-persistent topics
enableNonPersistentTopics=true

### --- Authentication --- ### ### --- Authentication --- ###


# Enable authentication # Enable authentication
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;


import io.netty.buffer.ByteBuf;

public class EntryCacheManager { public class EntryCacheManager {


private final long maxSize; private final long maxSize;
Expand Down Expand Up @@ -236,5 +238,9 @@ public int compareTo(EntryCache other) {


} }


public static Entry create(long ledgerId, long entryId, ByteBuf data) {
return EntryImpl.create(ledgerId, entryId, data);
}

private static final Logger log = LoggerFactory.getLogger(EntryCacheManager.class); private static final Logger log = LoggerFactory.getLogger(EntryCacheManager.class);
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Max number of concurrent topic loading request broker allows to control number of zk-operations // Max number of concurrent topic loading request broker allows to control number of zk-operations
@FieldContext(dynamic = true) @FieldContext(dynamic = true)
private int maxConcurrentTopicLoadRequest = 5000; private int maxConcurrentTopicLoadRequest = 5000;
// Max concurrent non-persistent message can be processed per connection
private int maxConcurrentNonPersistentMessagePerConnection = 1000;
// Number of worker threads to serve non-persistent topic
private int numWorkerThreadsForNonPersistentTopic = 8;
// Enable broker to load persistent topics
private boolean enablePersistentTopics = true;
// Enable broker to load non-persistent topics
private boolean enableNonPersistentTopics = true;


/***** --- TLS --- ****/ /***** --- TLS --- ****/
// Enable TLS // Enable TLS
Expand Down Expand Up @@ -507,6 +515,38 @@ public void setMaxConcurrentTopicLoadRequest(int maxConcurrentTopicLoadRequest)
this.maxConcurrentTopicLoadRequest = maxConcurrentTopicLoadRequest; this.maxConcurrentTopicLoadRequest = maxConcurrentTopicLoadRequest;
} }


public int getMaxConcurrentNonPersistentMessagePerConnection() {
return maxConcurrentNonPersistentMessagePerConnection;
}

public void setMaxConcurrentNonPersistentMessagePerConnection(int maxConcurrentNonPersistentMessagePerConnection) {
this.maxConcurrentNonPersistentMessagePerConnection = maxConcurrentNonPersistentMessagePerConnection;
}

public int getNumWorkerThreadsForNonPersistentTopic() {
return numWorkerThreadsForNonPersistentTopic;
}

public void setNumWorkerThreadsForNonPersistentTopic(int numWorkerThreadsForNonPersistentTopic) {
this.numWorkerThreadsForNonPersistentTopic = numWorkerThreadsForNonPersistentTopic;
}

public boolean isEnablePersistentTopics() {
return enablePersistentTopics;
}

public void setEnablePersistentTopics(boolean enablePersistentTopics) {
this.enablePersistentTopics = enablePersistentTopics;
}

public boolean isEnableNonPersistentTopics() {
return enableNonPersistentTopics;
}

public void setEnableNonPersistentTopics(boolean enableNonPersistentTopics) {
this.enableNonPersistentTopics = enableNonPersistentTopics;
}

public boolean isTlsEnabled() { public boolean isTlsEnabled() {
return tlsEnabled; return tlsEnabled;
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class LocalBrokerData extends JSONWritable implements ServiceLookupData {
private final String webServiceUrlTls; private final String webServiceUrlTls;
private final String pulsarServiceUrl; private final String pulsarServiceUrl;
private final String pulsarServiceUrlTls; private final String pulsarServiceUrlTls;
private boolean persistentTopicsEnabled=true;
private boolean nonPersistentTopicsEnabled=true;


// Most recently available system resource usage. // Most recently available system resource usage.
private ResourceUsage cpu; private ResourceUsage cpu;
Expand Down Expand Up @@ -370,5 +372,21 @@ public String getPulsarServiceUrl() {
public String getPulsarServiceUrlTls() { public String getPulsarServiceUrlTls() {
return pulsarServiceUrlTls; return pulsarServiceUrlTls;
} }

public boolean isPersistentTopicsEnabled() {
return persistentTopicsEnabled;
}

public void setPersistentTopicsEnabled(boolean persistentTopicsEnabled) {
this.persistentTopicsEnabled = persistentTopicsEnabled;
}

public boolean isNonPersistentTopicsEnabled() {
return nonPersistentTopicsEnabled;
}

public void setNonPersistentTopicsEnabled(boolean nonPersistentTopicsEnabled) {
this.nonPersistentTopicsEnabled = nonPersistentTopicsEnabled;
}


} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.net.URI; import java.net.URI;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;


import javax.servlet.ServletContext; import javax.servlet.ServletContext;
import javax.ws.rs.WebApplicationException; import javax.ws.rs.WebApplicationException;
Expand All @@ -36,17 +37,21 @@
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.DestinationDomain;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PropertyAdmin; import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache; import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache; import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
Expand All @@ -64,6 +69,7 @@ public abstract class AdminResource extends PulsarWebResource {
private static final Logger log = LoggerFactory.getLogger(AdminResource.class); private static final Logger log = LoggerFactory.getLogger(AdminResource.class);
private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly"; private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
public static final String LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH = "/admin/flags/load-shedding-unload-disabled"; public static final String LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH = "/admin/flags/load-shedding-unload-disabled";
public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";


protected ZooKeeper globalZk() { protected ZooKeeper globalZk() {
return pulsar().getGlobalZkCache().getZooKeeper(); return pulsar().getGlobalZkCache().getZooKeeper();
Expand Down Expand Up @@ -97,12 +103,10 @@ protected void zkCreateOptimistic(String path, byte[] content) throws Exception
* Get the domain of the destination (whether it's queue or topic) * Get the domain of the destination (whether it's queue or topic)
*/ */
protected String domain() { protected String domain() {
if (uri.getPath().startsWith("queues/")) { if (uri.getPath().startsWith("persistent/")) {
return "queue";
} else if (uri.getPath().startsWith("topics/")) {
return "topic";
} else if (uri.getPath().startsWith("persistent/")) {
return "persistent"; return "persistent";
} else if (uri.getPath().startsWith("non-persistent/")) {
return "non-persistent";
} else { } else {
throw new RestException(Status.INTERNAL_SERVER_ERROR, "domain() invoked from wrong resource"); throw new RestException(Status.INTERNAL_SERVER_ERROR, "domain() invoked from wrong resource");
} }
Expand Down Expand Up @@ -281,4 +285,69 @@ protected ZooKeeperDataCache<NamespaceIsolationPolicies> namespaceIsolationPolic
return pulsar().getConfigurationCache().namespaceIsolationPoliciesCache(); return pulsar().getConfigurationCache().namespaceIsolationPoliciesCache();
} }


protected PartitionedTopicMetadata getPartitionedTopicMetadata(String property, String cluster, String namespace,
String destination, boolean authoritative) {
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
validateClusterOwnership(dn.getCluster());

try {
checkConnect(dn);
} catch (WebApplicationException e) {
validateAdminAccessOnProperty(dn.getProperty());
} catch (Exception e) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing lookup. destination={}, role={}. Error: {}", destination,
clientAppId(), e.getMessage(), e);
throw new RestException(e);
}

String path = path(PARTITIONED_TOPIC_PATH_ZNODE, property, cluster, namespace, domain(),
dn.getEncodedLocalName());
PartitionedTopicMetadata partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path);

if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId(), dn,
partitionMetadata.partitions);
}
return partitionMetadata;
}

protected static PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarService pulsar, String path) {
try {
return fetchPartitionedTopicMetadataAsync(pulsar, path).get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e;
}
throw new RestException(e);
}
}

protected static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(PulsarService pulsar,
String path) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
// gets the number of partitions from the zk cache
pulsar.getGlobalZkCache().getDataAsync(path, new Deserializer<PartitionedTopicMetadata>() {
@Override
public PartitionedTopicMetadata deserialize(String key, byte[] content) throws Exception {
return jsonMapper().readValue(content, PartitionedTopicMetadata.class);
}
}).thenAccept(metadata -> {
// if the partitioned topic is not found in zk, then the topic is not partitioned
if (metadata.isPresent()) {
metadataFuture.complete(metadata.get());
} else {
metadataFuture.complete(new PartitionedTopicMetadata());
}
}).exceptionally(ex -> {
metadataFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
metadataFuture.completeExceptionally(e);
}
return metadataFuture;
}

} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@


import java.net.URI; import java.net.URI;
import java.net.URL; import java.net.URL;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
Expand All @@ -50,8 +49,9 @@


import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException;
Expand All @@ -64,12 +64,12 @@
import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
Expand Down Expand Up @@ -1273,20 +1273,24 @@ public void unsubscribeNamespaceBundle(@PathParam("property") String property, @


private void clearBacklog(NamespaceName nsName, String bundleRange, String subscription) { private void clearBacklog(NamespaceName nsName, String bundleRange, String subscription) {
try { try {
List<PersistentTopic> topicList = pulsar().getBrokerService() List<Topic> topicList = pulsar().getBrokerService()
.getAllTopicsFromNamespaceBundle(nsName.toString(), nsName.toString() + "/" + bundleRange); .getAllTopicsFromNamespaceBundle(nsName.toString(), nsName.toString() + "/" + bundleRange);


List<CompletableFuture<Void>> futures = Lists.newArrayList(); List<CompletableFuture<Void>> futures = Lists.newArrayList();
if (subscription != null) { if (subscription != null) {
if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) { if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
subscription = PersistentReplicator.getRemoteCluster(subscription); subscription = PersistentReplicator.getRemoteCluster(subscription);
} }
for (PersistentTopic topic : topicList) { for (Topic topic : topicList) {
futures.add(topic.clearBacklog(subscription)); if(topic instanceof PersistentTopic) {
futures.add(((PersistentTopic)topic).clearBacklog(subscription));
}
} }
} else { } else {
for (PersistentTopic topic : topicList) { for (Topic topic : topicList) {
futures.add(topic.clearBacklog()); if(topic instanceof PersistentTopic) {
futures.add(((PersistentTopic)topic).clearBacklog());
}
} }
} }


Expand All @@ -1300,14 +1304,14 @@ private void clearBacklog(NamespaceName nsName, String bundleRange, String subsc


private void unsubscribe(NamespaceName nsName, String bundleRange, String subscription) { private void unsubscribe(NamespaceName nsName, String bundleRange, String subscription) {
try { try {
List<PersistentTopic> topicList = pulsar().getBrokerService() List<Topic> topicList = pulsar().getBrokerService()
.getAllTopicsFromNamespaceBundle(nsName.toString(), nsName.toString() + "/" + bundleRange); .getAllTopicsFromNamespaceBundle(nsName.toString(), nsName.toString() + "/" + bundleRange);
List<CompletableFuture<Void>> futures = Lists.newArrayList(); List<CompletableFuture<Void>> futures = Lists.newArrayList();
if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) { if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
throw new RestException(Status.PRECONDITION_FAILED, "Cannot unsubscribe a replication cursor"); throw new RestException(Status.PRECONDITION_FAILED, "Cannot unsubscribe a replication cursor");
} else { } else {
for (PersistentTopic topic : topicList) { for (Topic topic : topicList) {
PersistentSubscription sub = topic.getPersistentSubscription(subscription); Subscription sub = topic.getSubscription(subscription);
if (sub != null) { if (sub != null) {
futures.add(sub.delete()); futures.add(sub.delete());
} }
Expand Down
Loading

0 comments on commit 380e47e

Please sign in to comment.