Skip to content

Commit

Permalink
Prevent creation of regular topic with the same name as existing part…
Browse files Browse the repository at this point in the history
…itioned topic (apache#5943)

### Motivation


Currently, it is not possible to create a partitioned topic with the same name as an existing non-partitioned topic, but the reverse is possible.

```
$ ./bin/pulsar-admin topics create persistent://public/default/t1
$ ./bin/pulsar-admin topics create-partitioned-topic -p 2 persistent://public/default/t1

16:12:50.418 [AsyncHttpClient-5-1] WARN  org.apache.pulsar.client.admin.internal.BaseResource - [http://localhost:8080/admin/v2/persistent/public/default/t1/partitions] Failed to perform http put request: javax.ws.rs.ClientErrorException: HTTP 409 Conflict
This topic already exists

Reason: This topic already exists

$ ./bin/pulsar-admin topics create-partitioned-topic -p 2 persistent://public/default/t2
$ ./bin/pulsar-admin topics create persistent://public/default/t2
$ ./bin/pulsar-admin topics list public/default

"persistent://public/default/t2"
"persistent://public/default/t1"

$ ./bin/pulsar-admin topics list-partitioned-topics public/default

"persistent://public/default/t2"
```

These non-partitioned topics are not available and should not be created.

### Modifications

When creating a non-partitioned topic, "409 Conflict" error will be returned if a partitioned topic with the same name already exists.
  • Loading branch information
Masahiro Sakamoto authored and huangdx0726 committed Aug 24, 2020
1 parent 8eea549 commit 173893e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -438,13 +438,20 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative) {
}

validateTopicOwnership(topicName, authoritative);
try {

PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
log.warn("[{}] Partitioned topic with the same name already exists {}", clientAppId(), topicName);
throw new RestException(Status.CONFLICT, "This topic already exists");
}

try {
Topic createdTopic = getOrCreateTopic(topicName);
log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), createdTopic);
} catch (Exception e) {
log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
} catch (Exception e) {
log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1904,6 +1904,29 @@ public void testPersistentTopicExpireMessageOnParitionTopic() throws Exception {

}

@Test
public void testPersistentTopicCreation() throws Exception {
final String nonPartitionedtopic = "persistent://prop-xyz/ns1/non-partitioned-topic";
final String partitionedtopic = "persistent://prop-xyz/ns1/partitioned-topic";

admin.topics().createNonPartitionedTopic(nonPartitionedtopic);
admin.topics().createPartitionedTopic(partitionedtopic, 2);

try {
admin.topics().createPartitionedTopic(nonPartitionedtopic, 2);
fail("should not be able to create a partitioned topic with the same name");
} catch (PulsarAdminException e) {
assertTrue(e instanceof ConflictException);
}

try {
admin.topics().createNonPartitionedTopic(partitionedtopic);
fail("should not be able to create a non-partitioned topic with the same name");
} catch (PulsarAdminException e) {
assertTrue(e instanceof ConflictException);
}
}

/**
* This test-case verifies that broker should support both url/uri encoding for topic-name. It calls below api with
* url-encoded and also uri-encoded topic-name in http request: a. PartitionedMetadataLookup b. TopicLookupBase c.
Expand Down

0 comments on commit 173893e

Please sign in to comment.