Skip to content

Commit

Permalink
Correct TopicName#getPartitionIndex implementation. (apache#10902)
Browse files Browse the repository at this point in the history
### Motivation

Current partitioned topic check logic is not rigorous enough, such as the `mytopic-partition--1` this should not partitioned topic. 
For details of the discussion see http://mail-archives.apache.org/mod_mbox/pulsar-dev/202106.mbox/raw/%3CCACcefgerbLU88rKqPNCTgBSCWn_FJ5MB_E7oa1tKfxTsMAfpfg%40mail.gmail.com%3E

### Modifications
1. change the `getPartitionIndex` logic.
  • Loading branch information
yangl authored and ciaocloud committed Oct 16, 2021
1 parent bff84c6 commit 0b94e93
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,12 @@
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Encapsulate the parsing of the completeTopicName name.
*/
public class TopicName implements ServiceUnitId {

private static final Logger log = LoggerFactory.getLogger(TopicName.class);

public static final String PUBLIC_TENANT = "public";
public static final String DEFAULT_NAMESPACE = "default";

Expand Down Expand Up @@ -272,9 +268,17 @@ public static int getPartitionIndex(String topic) {
int partitionIndex = -1;
if (topic.contains(PARTITIONED_TOPIC_SUFFIX)) {
try {
partitionIndex = Integer.parseInt(topic.substring(topic.lastIndexOf('-') + 1));
String idx = StringUtils.substringAfterLast(topic, PARTITIONED_TOPIC_SUFFIX);
partitionIndex = Integer.parseInt(idx);
if (partitionIndex < 0) {
// for the "topic-partition--1"
partitionIndex = -1;
} else if (StringUtils.length(idx) != String.valueOf(partitionIndex).length()) {
// for the "topic-partition-01"
partitionIndex = -1;
}
} catch (NumberFormatException nfe) {
log.warn("Could not get the partition index from the topic {}", topic);
// ignore exception
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
package org.apache.pulsar.common.naming;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import org.apache.pulsar.common.util.Codec;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -200,10 +201,24 @@ public void topic() {

assertEquals(TopicName.getPartitionIndex("persistent://myprop/mycolo/myns/mytopic-partition-4"), 4);

// NOTE: Following behavior is not right actually, but for the backward compatibility, it shouldn't be changed
assertEquals(TopicName.getPartitionIndex("mytopic-partition--1"), 1);
assertEquals(TopicName.getPartitionIndex("mytopic-partition-00"), 0);
assertEquals(TopicName.getPartitionIndex("mytopic-partition-012"), 12);
// Following behavior is not right actually, none partitioned topic, partition index is -1
assertEquals(TopicName.getPartitionIndex("mytopic-partition--1"), -1);
assertEquals(TopicName.getPartitionIndex("mytopic-partition-00"), -1);
assertEquals(TopicName.getPartitionIndex("mytopic-partition-012"), -1);

assertFalse(TopicName.get("mytopic-partition--1").isPartitioned());
assertFalse(TopicName.get("mytopic-partition--2").isPartitioned());
assertFalse(TopicName.get("mytopic-partition-01").isPartitioned());
assertFalse(TopicName.get("mytopic-partition-012").isPartitioned());
assertFalse(TopicName.get("mytopic-partition- 12").isPartitioned());
assertFalse(TopicName.get("mytopic-partition-12 ").isPartitioned());
assertFalse(TopicName.get("mytopic-partition- 12 ").isPartitioned());
assertFalse(TopicName.get("mytopic-partition-1&").isPartitioned());
assertFalse(TopicName.get("mytopic-partition-1!").isPartitioned());

assertTrue(TopicName.get("mytopic-partition-0").isPartitioned());
assertTrue(TopicName.get("mytopic-partition-1").isPartitioned());
assertTrue(TopicName.get("mytopic-partition-12").isPartitioned());
}

@Test
Expand Down

0 comments on commit 0b94e93

Please sign in to comment.