Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix create consumer on partitioned topic while disable topic auto creation. #5572

Merged
merged 6 commits into from Jan 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -425,10 +425,13 @@ public List<String> getChildren(String path, boolean watch) throws KeeperExcepti
if (path.length() >= item.length()) {
continue;
}

String child = item.substring(path.length() + 1);
if (!child.contains("/")) {
children.add(child);
String child = item.substring(path.length());
if (child.indexOf("/") == 0) {
child = child.substring(1);
log.debug("child: '{}'", child);
if (!child.contains("/")) {
children.add(child);
}
}
}
}
Expand Down Expand Up @@ -465,10 +468,13 @@ public void getChildren(final String path, boolean watcher, final Children2Callb
} else if (item.equals(path)) {
continue;
} else {
String child = item.substring(path.length() + 1);
log.debug("child: '{}'", child);
if (!child.contains("/")) {
children.add(child);
String child = item.substring(path.length());
if (child.indexOf("/") == 0) {
child = child.substring(1);
log.debug("child: '{}'", child);
if (!child.contains("/")) {
children.add(child);
}
}
}
}
Expand Down
Expand Up @@ -30,6 +30,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import javax.servlet.ServletContext;
Expand Down Expand Up @@ -66,9 +68,9 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
Expand Down Expand Up @@ -111,6 +113,11 @@ protected void zkCreateOptimistic(String path, byte[] content) throws Exception
ZkUtils.createFullPathOptimistic(globalZk(), path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

protected void zkCreateOptimisticAsync(String path, byte[] content, AsyncCallback.StringCallback callback) {
ZkUtils.asyncCreateFullPathOptimistic(globalZk(), path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT, callback, null);
}

protected boolean zkPathExists(String path) throws KeeperException, InterruptedException {
Stat stat = globalZk().exists(path, false);
if (null != stat) {
Expand All @@ -119,6 +126,21 @@ protected boolean zkPathExists(String path) throws KeeperException, InterruptedE
return false;
}

protected void zkSync(String path) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger rc = new AtomicInteger(KeeperException.Code.OK.intValue());
globalZk().sync(path, (rc2, s, ctx) -> {
if (KeeperException.Code.OK.intValue() != rc2) {
rc.set(rc2);
}
latch.countDown();
}, null);
latch.await();
if (KeeperException.Code.OK.intValue() != rc.get()) {
throw KeeperException.create(KeeperException.Code.get(rc.get()));
}
}

/**
* Get the domain of the topic (whether it's persistent or non-persistent)
*/
Expand Down Expand Up @@ -233,6 +255,37 @@ protected List<String> getListOfNamespaces(String property) throws Exception {
return namespaces;
}

protected void tryCreatePartitionsAsync(int numPartitions) {
if (!topicName.isPersistent()) {
return;
}
for (int i = 0; i < numPartitions; i++) {
tryCreatePartitionAsync(i);
}
}

private void tryCreatePartitionAsync(final int partition) {
zkCreateOptimisticAsync(ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
(rc, s, o, s1) -> {
if (KeeperException.Code.OK.intValue() == rc) {
if (log.isDebugEnabled()) {
log.debug("[{}] Topic partition {} created.", clientAppId(),
topicName.getPartition(partition));
}
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(),
topicName.getPartition(partition));
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.",
clientAppId(), topicName.getPartition(partition));
tryCreatePartitionAsync(partition);
} else {
log.error("[{}] Fail to create topic partition {}", clientAppId(),
topicName.getPartition(partition), KeeperException.create(KeeperException.Code.get(rc)));
}
});
}

protected NamespaceName namespaceName;

protected void validateNamespaceName(String property, String namespace) {
Expand Down
Expand Up @@ -31,6 +31,10 @@ public static String partitionedTopicPath(TopicName name) {
name.getNamespace(), name.getDomain().value(), name.getEncodedLocalName());
}

public static String managedLedgerPath(TopicName name) {
return "/managed-ledgers/" + name.getPersistenceNamingEncoding();
}

public static String namespacePoliciesPath(NamespaceName name) {
return adminPath(POLICIES, name.toString());
}
Expand Down
Expand Up @@ -116,7 +116,6 @@
public class PersistentTopicsBase extends AdminResource {
private static final Logger log = LoggerFactory.getLogger(PersistentTopicsBase.class);

public static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10;
private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v";
private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1, 21);
Expand Down Expand Up @@ -394,8 +393,9 @@ protected void internalCreatePartitionedTopic(int numPartitions) {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimistic(path, data);
// we wait for the data to be synced in all quorums and the observers
Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
tryCreatePartitionsAsync(numPartitions);
// Sync data to all quorums and the observers
zkSync(path);
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
} catch (KeeperException.NodeExistsException e) {
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
Expand Down Expand Up @@ -513,6 +513,13 @@ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateL
}
}

protected void internalCreateMissedPartitions() {
PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, false, false);
if (metadata != null) {
tryCreatePartitionsAsync(metadata.partitions);
}
}

private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
List<CompletableFuture<Void>> results = new ArrayList<>(clusters.size() -1);
clusters.forEach(cluster -> {
Expand Down Expand Up @@ -600,8 +607,8 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole
try {
globalZk().delete(path, -1);
globalZkCache().invalidate(path);
// we wait for the data to be synced in all quorums and the observers
Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
// Sync data to all quorums and the observers
zkSync(path);
log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
return;
Expand Down Expand Up @@ -1819,24 +1826,28 @@ private CompletableFuture<Void> createSubscriptions(TopicName topicName, int num
}

admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats -> {
stats.subscriptions.keySet().forEach(subscription -> {
List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>();
for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
final String topicNamePartition = topicName.getPartition(i).toString();
if (stats.subscriptions.size() == 0) {
result.complete(null);
} else {
stats.subscriptions.keySet().forEach(subscription -> {
List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>();
for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
final String topicNamePartition = topicName.getPartition(i).toString();

subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
subscription, MessageId.latest));
}
subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
subscription, MessageId.latest));
}

FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName);
result.complete(null);
}).exceptionally(ex -> {
log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), topicName, ex);
result.completeExceptionally(ex);
return null;
FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName);
result.complete(null);
}).exceptionally(ex -> {
log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), topicName, ex);
result.completeExceptionally(ex);
return null;
});
});
});
}
}).exceptionally(ex -> {
if (ex.getCause() instanceof PulsarAdminException.NotFoundException) {
// The first partition doesn't exist, so there are currently to subscriptions to recreate
Expand Down
Expand Up @@ -144,8 +144,8 @@ public void createPartitionedTopic(@PathParam("property") String property, @Path
topicName.getEncodedLocalName());
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimistic(path, data);
// we wait for the data to be synced in all quorums and the observers
Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
// Sync data to all quorums and the observers
zkSync(path);
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
} catch (KeeperException.NodeExistsException e) {
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
Expand Down
Expand Up @@ -190,8 +190,8 @@ public void createPartitionedTopic(
topicName.getEncodedLocalName());
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
zkCreateOptimistic(path, data);
// we wait for the data to be synced in all quorums and the observers
Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
// Sync data to all quorums and the observers
zkSync(path);
log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
} catch (KeeperException.NodeExistsException e) {
log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
Expand Down
Expand Up @@ -246,7 +246,7 @@ public void createNonPartitionedTopic(
*/
@POST
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Increment partitons of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic")
@ApiOperation(value = "Increment partitions of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
Expand All @@ -270,6 +270,30 @@ public void updatePartitionedTopic(
internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly);
}


@POST
@Path("/{tenant}/{namespace}/{topic}/createMissedPartitions")
@ApiOperation(value = "Create missed partitions of an existing partitioned topic.", notes = "This is a best-effort operation for create missed partitions of existing non-global partitioned-topic and does't throw any exceptions when create failed")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 409, message = "Partitioned topic does not exist"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
})
public void createMissedPartitions(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic) {

validatePartitionedTopicName(tenant, namespace, encodedTopic);
internalCreateMissedPartitions();
}

@GET
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Get partitioned topic metadata.")
Expand Down
Expand Up @@ -79,15 +79,13 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
Expand Down Expand Up @@ -1717,10 +1715,15 @@ private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopi
partitionedTopicPath(topicName), content,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path1, ctx, name) -> {
if (rc == KeeperException.Code.OK.intValue()) {
// we wait for the data to be synced in all quorums and the observers
executor().schedule(
SafeRunnable.safeRun(() -> partitionedTopicFuture.complete(configMetadata)),
PersistentTopicsBase.PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS, TimeUnit.MILLISECONDS);
// Sync data to all quorums and the observers
pulsar.getGlobalZkCache().getZooKeeper().sync(partitionedTopicPath(topicName),
(rc2, path2, ctx2) -> {
if (rc2 == KeeperException.Code.OK.intValue()) {
partitionedTopicFuture.complete(configMetadata);
} else {
partitionedTopicFuture.completeExceptionally(KeeperException.create(rc2));
}
}, null);
} else {
partitionedTopicFuture.completeExceptionally(KeeperException.create(rc));
}
Expand Down
Expand Up @@ -812,9 +812,8 @@ public void partitionedTopics(String topicName) throws Exception {

assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 4);

// check if the virtual topic doesn't get created
List<String> topics = admin.topics().getList("prop-xyz/ns1");
assertEquals(topics.size(), 0);
assertEquals(topics.size(), 4);

assertEquals(admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/ns1/ds2").partitions,
0);
Expand All @@ -826,15 +825,8 @@ public void partitionedTopics(String topicName) throws Exception {
assertEquals(admin.topics().getPartitionedStats(partitionedTopicName, false).partitions.size(),
0);

try {
admin.topics().getSubscriptions(partitionedTopicName);
fail("should have failed");
} catch (PulsarAdminException e) {
// ok
assertEquals(e.getStatusCode(), Status.NOT_FOUND.getStatusCode());
} catch (Exception e) {
fail(e.getMessage());
}
List<String> subscriptions = admin.topics().getSubscriptions(partitionedTopicName);
assertEquals(subscriptions.size(), 0);

// create consumer and subscription
URL pulsarUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT);
Expand Down