Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix call sync method in an async callback when enabling geo replicator. #12590

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ protected boolean exists(String path) throws MetadataStoreException {
}
}

protected CompletableFuture<Boolean> existsAsync(String path) {
return cache.exists(path);
}

public int getOperationTimeoutSec() {
return operationTimeoutSec;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@
*/
package org.apache.pulsar.broker.resources;

import static org.apache.pulsar.common.policies.path.PolicyPath.path;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Getter;
Expand All @@ -39,7 +36,6 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
Expand Down Expand Up @@ -122,7 +118,7 @@ public CompletableFuture<Void> setPoliciesAsync(NamespaceName ns, Function<Polic
}

public static boolean pathIsFromNamespace(String path) {
return path.startsWith(BASE_POLICIES_PATH);
return path.startsWith(BASE_POLICIES_PATH) && path.substring(BASE_POLICIES_PATH.length() + 1).contains("/");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain better this line ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may receive a notification from the metastore /admin/policies/public, not a complete namespace path

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.
works for me, probably adding a comment will help future readers of this code

}

public static NamespaceName namespaceFromPath(String path) {
Expand Down Expand Up @@ -208,6 +204,11 @@ public boolean partitionedTopicExists(TopicName tn) throws MetadataStoreExceptio
tn.getEncodedLocalName()));
}

public CompletableFuture<Boolean> partitionedTopicExistsAsync(TopicName tn) {
return existsAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
}

public CompletableFuture<Void> deletePartitionedTopicAsync(TopicName tn) {
return deleteAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -64,14 +65,14 @@ protected enum State {
}

public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException, PulsarServerException {
validatePartitionedTopic(topicName, brokerService);
BrokerService brokerService, PulsarClientImpl replicationClient)
throws PulsarServerException {
this.brokerService = brokerService;
this.topicName = topicName;
this.replicatorPrefix = replicatorPrefix;
this.localCluster = localCluster.intern();
this.remoteCluster = remoteCluster.intern();
this.replicationClient = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
this.replicationClient = replicationClient;
this.client = (PulsarClientImpl) brokerService.pulsar().getClient();
this.producer = null;
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
Expand Down Expand Up @@ -242,20 +243,18 @@ public static String getReplicatorName(String replicatorPrefix, String cluster)
* @param topic
* @param brokerService
*/
private void validatePartitionedTopic(String topic, BrokerService brokerService) throws NamingException {
public static CompletableFuture<Void> validatePartitionedTopicAsync(String topic, BrokerService brokerService) {
TopicName topicName = TopicName.get(topic);
boolean isPartitionedTopic = false;
try {
isPartitionedTopic =
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.partitionedTopicExists(topicName);
} catch (Exception e) {
log.warn("Failed to verify partitioned topic {}-{}", topicName, e.getMessage());
}
if (isPartitionedTopic) {
throw new NamingException(
topicName + " is a partitioned-topic and replication can't be started for partitioned-producer ");
}
return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.partitionedTopicExistsAsync(topicName).thenCompose(isPartitionedTopic -> {
if (isPartitionedTopic) {
String s = topicName
+ " is a partitioned-topic and replication can't be started for partitioned-producer ";
log.error(s);
return FutureUtil.failedFuture(new NamingException(s));
}
return CompletableFuture.completedFuture(null);
});
}

private static final Logger log = LoggerFactory.getLogger(AbstractReplicator.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,7 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
});

future.exceptionally((ex) -> {
log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex);
log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause());
nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
pulsar.getExecutor().execute(() -> topics.remove(topic, future));
});
Expand All @@ -1089,15 +1089,15 @@ private <T> CompletableFuture<T> futureWithDeadline() {
() -> FUTURE_DEADLINE_TIMEOUT_EXCEPTION);
}

public PulsarClient getReplicationClient(String cluster) {
public PulsarClient getReplicationClient(String cluster, Optional<ClusterData> clusterDataOp) {
PulsarClient client = replicationClients.get(cluster);
if (client != null) {
return client;
}

return replicationClients.computeIfAbsent(cluster, key -> {
try {
ClusterData data = pulsar.getPulsarResources().getClusterResources().getCluster(cluster)
ClusterData data = clusterDataOp
.orElseThrow(() -> new MetadataStoreException.NotFoundException(cluster));
ClientBuilder clientBuilder = PulsarClient.builder()
.enableTcpNoDelay(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
import org.apache.pulsar.common.stats.Rate;
Expand All @@ -49,8 +49,9 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
private final NonPersistentReplicatorStatsImpl stats = new NonPersistentReplicatorStatsImpl();

public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException, PulsarServerException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService,
replicationClient);

producerBuilder.blockIfQueueFull(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand All @@ -61,6 +61,7 @@
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
Expand Down Expand Up @@ -528,14 +529,7 @@ public CompletableFuture<Void> checkReplication() {
}

if (!replicators.containsKey(cluster)) {
if (!startReplicator(cluster)) {
// it happens when global topic is a partitioned topic and replicator can't start on
// original
// non partitioned-topic (topic without partition prefix)
return FutureUtil
.failedFuture(new NamingException(
topic + " failed to start replicator for " + cluster));
}
futures.add(startReplicator(cluster));
}
}

Expand All @@ -553,29 +547,35 @@ public CompletableFuture<Void> checkReplication() {

}

boolean startReplicator(String remoteCluster) {
CompletableFuture<Void> startReplicator(String remoteCluster) {
log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
return addReplicationCluster(remoteCluster, NonPersistentTopic.this, localCluster);
}

protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic,
protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic,
String localCluster) {
AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService);
} catch (NamingException | PulsarServerException e) {
isReplicatorStarted.set(false);
log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
}
return null;
});
// clean up replicator if startup is failed
if (!isReplicatorStarted.get()) {
replicators.remove(remoteCluster);
}
return isReplicatorStarted.get();
return AbstractReplicator.validatePartitionedTopicAsync(nonPersistentTopic.getName(), brokerService)
.thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(remoteCluster)
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData)))
.thenAccept(replicationClient -> {
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new NonPersistentReplicator(NonPersistentTopic.this, localCluster,
remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
} catch (PulsarServerException e) {
log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
}
return null;
});

// clean up replicator if startup is failed
if (replicators.containsKey(remoteCluster) && replicators.get(remoteCluster) == null) {
replicators.remove(remoteCluster);
}
});
}

CompletableFuture<Void> removeReplicator(String remoteCluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
Expand All @@ -54,6 +53,7 @@
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.policies.data.Policies;
Expand Down Expand Up @@ -105,8 +105,10 @@ public class PersistentReplicator extends AbstractReplicator
private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();

public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException, PulsarServerException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
BrokerService brokerService, PulsarClientImpl replicationClient)
throws PulsarServerException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService,
replicationClient);
this.topic = topic;
this.cursor = cursor;
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName,
Expand Down