Skip to content

Commit

Permalink
Broker should not start replicator for root partitioned-topic (#1262)
Browse files Browse the repository at this point in the history
* Broker should not start replicator for root partitioned-topic

* address comment
  • Loading branch information
rdhabalia committed Feb 23, 2018
1 parent 68cd115 commit f38a003
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 39 deletions.
Expand Up @@ -18,17 +18,21 @@
*/ */
package org.apache.pulsar.broker.service; package org.apache.pulsar.broker.service;


import static org.apache.pulsar.broker.web.PulsarWebResource.path;

import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;


import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.AbstractReplicator.State; import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.DestinationName;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -57,8 +61,9 @@ protected enum State {
Stopped, Starting, Started, Stopping Stopped, Starting, Started, Stopping
} }


public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster,
String remoteCluster, BrokerService brokerService) { BrokerService brokerService) throws NamingException {
validatePartitionedTopic(topicName, brokerService);
this.brokerService = brokerService; this.brokerService = brokerService;
this.topicName = topicName; this.topicName = topicName;
this.replicatorPrefix = replicatorPrefix; this.replicatorPrefix = replicatorPrefix;
Expand All @@ -69,8 +74,7 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();


this.producerBuilder = client.newProducer() // this.producerBuilder = client.newProducer() //
.topic(topicName) .topic(topicName).sendTimeout(0, TimeUnit.SECONDS) //
.sendTimeout(0, TimeUnit.SECONDS) //
.maxPendingMessages(producerQueueSize) // .maxPendingMessages(producerQueueSize) //
.producerName(getReplicatorName(replicatorPrefix, localCluster)); .producerName(getReplicatorName(replicatorPrefix, localCluster));
STATE_UPDATER.set(this, State.Stopped); STATE_UPDATER.set(this, State.Stopped);
Expand Down Expand Up @@ -211,5 +215,42 @@ public static String getReplicatorName(String replicatorPrefix, String cluster)
return (replicatorPrefix + "." + cluster).intern(); return (replicatorPrefix + "." + cluster).intern();
} }


/**
* Replication can't be started on root-partitioned-topic to avoid producer startup conflict.
*
* <pre>
* eg:
* if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic with 2 partitions then
* broker explicitly creates replicator producer for: "my-topic-partition-1" and "my-topic-partition-2".
*
* However, if broker tries to start producer with root topic "my-topic" then client-lib internally creates individual
* producers for "my-topic-partition-1" and "my-topic-partition-2" which creates conflict with existing
* replicator producers.
* </pre>
*
* Therefore, replicator can't be started on root-partition topic which can internally create multiple partitioned
* producers.
*
* @param topicName
* @param brokerService
*/
private void validatePartitionedTopic(String topicName, BrokerService brokerService) throws NamingException {
DestinationName destination = DestinationName.get(topicName);
String partitionedTopicPath = path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE,
destination.getNamespace().toString(), destination.getDomain().toString(),
destination.getEncodedLocalName());
boolean isPartitionedTopic = false;
try {
isPartitionedTopic = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(partitionedTopicPath).isPresent();
} catch (Exception e) {
log.warn("Failed to verify partitioned topic {}-{}", topicName, e.getMessage());
}
if (isPartitionedTopic) {
throw new NamingException(
topicName + " is a partitioned-topic and replication can't be started for partitioned-producer ");
}
}

private static final Logger log = LoggerFactory.getLogger(AbstractReplicator.class); private static final Logger log = LoggerFactory.getLogger(AbstractReplicator.class);
} }
Expand Up @@ -65,6 +65,7 @@
import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
Expand Down Expand Up @@ -583,29 +584,37 @@ private void createPersistentTopic(final String topic, CompletableFuture<Topic>
new OpenLedgerCallback() { new OpenLedgerCallback() {
@Override @Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) { public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
PersistentTopic persistentTopic = new PersistentTopic(topic, ledger, BrokerService.this); try {

PersistentTopic persistentTopic = new PersistentTopic(topic, ledger,
CompletableFuture<Void> replicationFuture = persistentTopic.checkReplication(); BrokerService.this);
replicationFuture.thenCompose(v -> { CompletableFuture<Void> replicationFuture = persistentTopic.checkReplication();
// Also check dedup status replicationFuture.thenCompose(v -> {
return persistentTopic.checkDeduplicationStatus(); // Also check dedup status
}).thenRun(() -> { return persistentTopic.checkDeduplicationStatus();
log.info("Created topic {} - dedup is {}", topic, }).thenRun(() -> {
persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled"); log.info("Created topic {} - dedup is {}", topic,
long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
- topicCreateTimeMs; long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); - topicCreateTimeMs;
addTopicToStatsMaps(destinationName, persistentTopic); pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
topicFuture.complete(persistentTopic); addTopicToStatsMaps(destinationName, persistentTopic);
}).exceptionally((ex) -> { topicFuture.complete(persistentTopic);
log.warn("Replication or dedup check failed. Removing topic from topics list {}, {}", topic, ex); }).exceptionally((ex) -> {
persistentTopic.stopReplProducers().whenComplete((v, exception) -> { log.warn(
topics.remove(topic, topicFuture); "Replication or dedup check failed. Removing topic from topics list {}, {}",
topicFuture.completeExceptionally(ex); topic, ex);
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
topics.remove(topic, topicFuture);
topicFuture.completeExceptionally(ex);
});

return null;
}); });

} catch (NamingException e) {
return null; log.warn("Failed to create topic {}-{}", topic, e.getMessage());
}); pulsar.getExecutor().submit(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(e);
}
} }


@Override @Override
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.bookkeeper.mledger.util.Rate; import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageId;
Expand All @@ -49,7 +50,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
private final NonPersistentReplicatorStats stats = new NonPersistentReplicatorStats(); private final NonPersistentReplicatorStats stats = new NonPersistentReplicatorStats();


public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster, public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
BrokerService brokerService) { BrokerService brokerService) throws NamingException {
super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService); super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);


producerBuilder.blockIfQueueFull(false); producerBuilder.blockIfQueueFull(false);
Expand Down
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;


import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.bookkeeper.util.OrderedSafeExecutor;
Expand All @@ -49,6 +50,8 @@
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException; import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.Replicator;
Expand Down Expand Up @@ -535,7 +538,12 @@ public CompletableFuture<Void> checkReplication() {
} }


if (!replicators.containsKey(cluster)) { if (!replicators.containsKey(cluster)) {
startReplicator(cluster); if (!startReplicator(cluster)) {
// it happens when global topic is a partitioned topic and replicator can't start on original
// non partitioned-topic (topic without partition prefix)
return FutureUtil
.failedFuture(new NamingException(topic + " failed to start replicator for " + cluster));
}
} }
} }


Expand All @@ -550,13 +558,30 @@ public CompletableFuture<Void> checkReplication() {
return FutureUtil.waitForAll(futures); return FutureUtil.waitForAll(futures);
} }


void startReplicator(String remoteCluster) { boolean startReplicator(String remoteCluster) {
log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster); log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
replicators.computeIfAbsent(remoteCluster, return addReplicationCluster(remoteCluster,NonPersistentTopic.this, localCluster);
r -> new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService));
} }


protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic, String localCluster) {
AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService);
} catch (NamingException e) {
isReplicatorStarted.set(false);
log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
}
return null;
});
// clean up replicator if startup is failed
if (!isReplicatorStarted.get()) {
replicators.remove(remoteCluster);
}
return isReplicatorStarted.get();
}

CompletableFuture<Void> removeReplicator(String remoteCluster) { CompletableFuture<Void> removeReplicator(String remoteCluster) {
log.info("[{}] Removing replicator to {}", topic, remoteCluster); log.info("[{}] Removing replicator to {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>(); final CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down Expand Up @@ -941,5 +966,7 @@ public void markBatchMessagePublished() {
this.hasBatchMessagePublished = true; this.hasBatchMessagePublished = true;
} }




private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class); private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
} }
Expand Up @@ -38,6 +38,7 @@
import org.apache.bookkeeper.mledger.util.Rate; import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.Backoff;
Expand Down Expand Up @@ -89,7 +90,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
private final ReplicatorStats stats = new ReplicatorStats(); private final ReplicatorStats stats = new ReplicatorStats();


public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster, public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
BrokerService brokerService) { BrokerService brokerService) throws NamingException {
super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService); super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);
this.topic = topic; this.topic = topic;
this.cursor = cursor; this.cursor = cursor;
Expand Down
Expand Up @@ -194,7 +194,7 @@ public void reset() {
} }
} }


public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) { public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) throws NamingException {
this.topic = topic; this.topic = topic;
this.ledger = ledger; this.ledger = ledger;
this.brokerService = brokerService; this.brokerService = brokerService;
Expand All @@ -213,8 +213,11 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
if (cursor.getName().startsWith(replicatorPrefix)) { if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName()); String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
replicators.put(remoteCluster, boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor, localCluster);
new PersistentReplicator(this, cursor, localCluster, remoteCluster, brokerService)); if (!isReplicatorStarted) {
throw new NamingException(
PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster);
}
} else if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)) { } else if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)) {
// This is not a regular subscription, we are going to ignore it for now and let the message dedup logic // This is not a regular subscription, we are going to ignore it for now and let the message dedup logic
// to take care of it // to take care of it
Expand Down Expand Up @@ -896,9 +899,13 @@ CompletableFuture<Void> startReplicator(String remoteCluster) {
@Override @Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) { public void openCursorComplete(ManagedCursor cursor, Object ctx) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
replicators.computeIfAbsent(remoteCluster, r -> new PersistentReplicator(PersistentTopic.this, cursor, localCluster, boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, cursor, localCluster);
remoteCluster, brokerService)); if (isReplicatorStarted) {
future.complete(null); future.complete(null);
} else {
future.completeExceptionally(new NamingException(
PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
}
} }


@Override @Override
Expand All @@ -911,6 +918,26 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
return future; return future;
} }


protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, ManagedCursor cursor,
String localCluster) {
AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
brokerService);
} catch (NamingException e) {
isReplicatorStarted.set(false);
log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
}
return null;
});
// clean up replicator if startup is failed
if (!isReplicatorStarted.get()) {
replicators.remove(remoteCluster);
}
return isReplicatorStarted.get();
}

CompletableFuture<Void> removeReplicator(String remoteCluster) { CompletableFuture<Void> removeReplicator(String remoteCluster) {
log.info("[{}] Removing replicator to {}", topic, remoteCluster); log.info("[{}] Removing replicator to {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>(); final CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down

0 comments on commit f38a003

Please sign in to comment.