Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix calling a sync method in an async callback when enabling the geo replicator. #12590

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ protected boolean exists(String path) throws MetadataStoreException {
}
}

/**
 * Asynchronous existence check for a path; the lookup is delegated to {@code cache}.
 *
 * @param path the path to look up
 * @return the future returned by {@code cache.exists(path)}
 */
protected CompletableFuture<Boolean> existsAsync(String path) {
    final CompletableFuture<Boolean> existsFuture = cache.exists(path);
    return existsFuture;
}

/**
 * Returns the value of the {@code operationTimeoutSec} field.
 *
 * @return the configured operation timeout (presumably in seconds, judging by the name)
 */
public int getOperationTimeoutSec() {
    return this.operationTimeoutSec;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@
*/
package org.apache.pulsar.broker.resources;

import static org.apache.pulsar.common.policies.path.PolicyPath.path;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Getter;
Expand All @@ -39,7 +36,6 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
Expand Down Expand Up @@ -122,7 +118,7 @@ public CompletableFuture<Void> setPoliciesAsync(NamespaceName ns, Function<Polic
}

public static boolean pathIsFromNamespace(String path) {
return path.startsWith(BASE_POLICIES_PATH);
return path.startsWith(BASE_POLICIES_PATH) && path.substring(BASE_POLICIES_PATH.length() + 1).contains("/");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain this line in more detail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may receive a notification from the metadata store for a path such as /admin/policies/public, which is not a complete namespace path.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.
Works for me; adding a comment would probably help future readers of this code.

}

public static NamespaceName namespaceFromPath(String path) {
Expand Down Expand Up @@ -208,6 +204,11 @@ public boolean partitionedTopicExists(TopicName tn) throws MetadataStoreExceptio
tn.getEncodedLocalName()));
}

/**
 * Asynchronous check for whether an entry exists for the given topic under
 * {@code PARTITIONED_TOPIC_PATH}.
 *
 * @param tn the topic name; its namespace, domain value and encoded local name form the path
 * @return the future produced by {@code existsAsync} for the joined path
 */
public CompletableFuture<Boolean> partitionedTopicExistsAsync(TopicName tn) {
    final String partitionedTopicPath = joinPath(
            PARTITIONED_TOPIC_PATH,
            tn.getNamespace(),
            tn.getDomain().value(),
            tn.getEncodedLocalName());
    return existsAsync(partitionedTopicPath);
}

public CompletableFuture<Void> deletePartitionedTopicAsync(TopicName tn) {
return deleteAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
tn.getEncodedLocalName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,10 +632,7 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
return;
}

List<CompletableFuture<Void>> createFutureList = new ArrayList<>();

CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
createFutureList.add(createLocalFuture);
checkTopicExistsAsync(topicName).thenAccept(exists -> {
if (exists) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
Expand All @@ -658,7 +655,13 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
return null;
});

FutureUtil.waitForAll(createFutureList).whenComplete((ignored, ex) -> {
List<String> replicatedClusters = new ArrayList<>();
if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) {
getNamespaceReplicatedClusters(namespaceName)
.stream().filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName()))
.forEach(replicatedClusters::add);
}
createLocalFuture.whenComplete((ignored, ex) -> {
if (ex != null) {
log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName, ex.getCause());
if (ex.getCause() instanceof RestException) {
Expand All @@ -669,14 +672,20 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
return;
}

if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) {
getNamespaceReplicatedClusters(namespaceName)
.stream()
.filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName()))
.forEach(cluster -> createFutureList.add(
((TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics())
if (!replicatedClusters.isEmpty()) {
replicatedClusters.forEach(cluster -> {
pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster)
.thenAccept(clusterDataOp -> {
((TopicsImpl) pulsar().getBrokerService()
.getClusterPulsarAdmin(cluster, clusterDataOp).topics())
.createPartitionedTopicAsync(
topicName.getPartitionedTopicName(), numPartitions, true)));
topicName.getPartitionedTopicName(), numPartitions, true);
})
.exceptionally(throwable -> {
log.error("Failed to create partition topic in cluster {}.", cluster, throwable);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we fail the operation?

return null;
});
});
}

log.info("[{}] Successfully created partitions for topic {} in cluster {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,14 @@ private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions,
if (cluster.equals(pulsar().getConfig().getClusterName())) {
return;
}
results.add(pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics()
.updatePartitionedTopicAsync(topicName.toString(),
numPartitions, true, false));
CompletableFuture<Void> updatePartitionTopicFuture =
pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster)
.thenApply(clusterDataOp ->
pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterDataOp))
.thenCompose(pulsarAdmin ->
pulsarAdmin.topics().updatePartitionedTopicAsync(
topicName.toString(), numPartitions, true, false));
results.add(updatePartitionTopicFuture);
});
return FutureUtil.waitForAll(results);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -64,14 +65,14 @@ protected enum State {
}

public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException, PulsarServerException {
validatePartitionedTopic(topicName, brokerService);
BrokerService brokerService, PulsarClientImpl replicationClient)
throws PulsarServerException {
this.brokerService = brokerService;
this.topicName = topicName;
this.replicatorPrefix = replicatorPrefix;
this.localCluster = localCluster.intern();
this.remoteCluster = remoteCluster.intern();
this.replicationClient = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
this.replicationClient = replicationClient;
this.client = (PulsarClientImpl) brokerService.pulsar().getClient();
this.producer = null;
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
Expand Down Expand Up @@ -242,20 +243,18 @@ public static String getReplicatorName(String replicatorPrefix, String cluster)
* @param topic
* @param brokerService
*/
private void validatePartitionedTopic(String topic, BrokerService brokerService) throws NamingException {
public static CompletableFuture<Void> validatePartitionedTopicAsync(String topic, BrokerService brokerService) {
TopicName topicName = TopicName.get(topic);
boolean isPartitionedTopic = false;
try {
isPartitionedTopic =
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.partitionedTopicExists(topicName);
} catch (Exception e) {
log.warn("Failed to verify partitioned topic {}-{}", topicName, e.getMessage());
}
if (isPartitionedTopic) {
throw new NamingException(
topicName + " is a partitioned-topic and replication can't be started for partitioned-producer ");
}
return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.partitionedTopicExistsAsync(topicName).thenCompose(isPartitionedTopic -> {
if (isPartitionedTopic) {
String s = topicName
+ " is a partitioned-topic and replication can't be started for partitioned-producer ";
log.error(s);
return FutureUtil.failedFuture(new NamingException(s));
}
return CompletableFuture.completedFuture(null);
});
}

private static final Logger log = LoggerFactory.getLogger(AbstractReplicator.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,7 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<V
}

private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load non-persistent topic {}", topic);
Expand All @@ -1061,43 +1062,56 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
}
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);

CompletableFuture<Optional<Topic>> future = nonPersistentTopic.initialize()
.thenCompose(__ -> nonPersistentTopic.checkReplication())
.thenApply(__ -> {
log.info("Created topic {}", nonPersistentTopic);
long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
return Optional.of(nonPersistentTopic);
});

future.exceptionally((ex) -> {
log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex);
nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
pulsar.getExecutor().execute(() -> topics.remove(topic, future));
CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
isOwner.thenRun(() -> {
nonPersistentTopic.initialize()
.thenCompose(__ -> nonPersistentTopic.checkReplication())
.thenRun(() -> {
log.info("Created topic {}", nonPersistentTopic);
long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
topicFuture.complete(Optional.of(nonPersistentTopic));
}).exceptionally(ex -> {
log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause());
nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(ex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we run this after "topics.remove" has executed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes no difference whether it runs before or after.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good, just checking.

});
return null;
});

}).exceptionally(e -> {
log.warn("CheckTopicNsOwnership fail when createNonPersistentTopic! {}", topic, e.getCause());
// If checkTopicNsOwnership fails, do not initialize the non-persistent topic here; a later topic
// lookup will find the correct broker. When a client fetches the metadata of a
// non-persistent partitioned topic, the non-persistent topic gets created,
// so the checkTopicNsOwnership check is needed; otherwise the topic would be created
// even if it is not owned by this broker. In that case we still return success,
// otherwise the client would keep retrying getPartitionedTopicMetadata.
topicFuture.complete(Optional.of(nonPersistentTopic));
// After the metadata request has returned successfully, remove this topic from this broker, because the
// topic is not owned by this broker and it was never initialized or checked for replication.
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
return null;
});

return future;
return topicFuture;
}

/**
 * Creates a future via {@code FutureUtil.createFutureWithTimeout}, passing
 * {@code FUTURE_DEADLINE_TIMEOUT_DURATION}, the result of {@code executor()} and a supplier
 * of {@code FUTURE_DEADLINE_TIMEOUT_EXCEPTION}.
 *
 * @param <T> the result type of the returned future
 * @return the future produced by {@code FutureUtil.createFutureWithTimeout}
 */
private <T> CompletableFuture<T> futureWithDeadline() {
    final CompletableFuture<T> deadlineFuture = FutureUtil.createFutureWithTimeout(
            FUTURE_DEADLINE_TIMEOUT_DURATION,
            executor(),
            () -> FUTURE_DEADLINE_TIMEOUT_EXCEPTION);
    return deadlineFuture;
}

public PulsarClient getReplicationClient(String cluster) {
public PulsarClient getReplicationClient(String cluster, Optional<ClusterData> clusterDataOp) {
PulsarClient client = replicationClients.get(cluster);
if (client != null) {
return client;
}

return replicationClients.computeIfAbsent(cluster, key -> {
try {
ClusterData data = pulsar.getPulsarResources().getClusterResources().getCluster(cluster)
ClusterData data = clusterDataOp
.orElseThrow(() -> new MetadataStoreException.NotFoundException(cluster));
ClientBuilder clientBuilder = PulsarClient.builder()
.enableTcpNoDelay(false)
Expand Down Expand Up @@ -1164,14 +1178,14 @@ private void configTlsSettings(ClientBuilder clientBuilder, String serviceUrl,
}
}

public PulsarAdmin getClusterPulsarAdmin(String cluster) {
public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional<ClusterData> clusterDataOp) {
PulsarAdmin admin = clusterAdmins.get(cluster);
if (admin != null) {
return admin;
}
return clusterAdmins.computeIfAbsent(cluster, key -> {
try {
ClusterData data = pulsar.getPulsarResources().getClusterResources().getCluster(cluster)
ClusterData data = clusterDataOp
.orElseThrow(() -> new MetadataStoreException.NotFoundException(cluster));

ServiceConfiguration conf = pulsar.getConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
import org.apache.pulsar.common.stats.Rate;
Expand All @@ -49,8 +49,9 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
private final NonPersistentReplicatorStatsImpl stats = new NonPersistentReplicatorStatsImpl();

public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
BrokerService brokerService) throws NamingException, PulsarServerException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException {
super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService,
replicationClient);

producerBuilder.blockIfQueueFull(false);

Expand Down
Loading