Skip to content

Commit

Permalink
fix partitioned topic list bug
Browse files Browse the repository at this point in the history
Signed-off-by: ZhangJian He <shoothzj@gmail.com>
  • Loading branch information
shoothzj committed Mar 6, 2024
1 parent 03386ad commit 7f660f0
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 16 deletions.
2 changes: 1 addition & 1 deletion embedded-pulsar-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>embedded-pulsar-parent</artifactId>
<groupId>io.github.embedded-middleware</groupId>
<version>0.0.6</version>
<version>0.0.7</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private void handleGetPartitionedTopicInfo(RoutingContext context) {
public void handleGetAllPartitionedTopics(RoutingContext context) {
String tenant = context.pathParam("tenant");
String namespace = context.pathParam("namespace");
List<PartitionedTopicInfo> partitionedTopics = pulsarEngine.getAllPartitionedTopics(tenant, namespace);
List<String> partitionedTopics = pulsarEngine.getPartitionedTopics(tenant, namespace);
if (partitionedTopics == null) {
partitionedTopics = new ArrayList<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.github.embedded.pulsar.core.module.NamespaceInfo;
import io.github.embedded.pulsar.core.module.PartitionedTopicInfo;
import io.github.embedded.pulsar.core.module.TenantInfo;
import io.github.embedded.pulsar.core.module.TopicInfo;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -16,6 +17,10 @@ public class PulsarEngine {
private final ConcurrentHashMap<String, ConcurrentHashMap<String, NamespaceInfo>> namespaces =
new ConcurrentHashMap<>();

private final ConcurrentHashMap<String,
ConcurrentHashMap<String, ConcurrentHashMap<String, TopicInfo>>> topics =
new ConcurrentHashMap<>();

private final ConcurrentHashMap<String,
ConcurrentHashMap<String, ConcurrentHashMap<String, PartitionedTopicInfo>>> partitionedTopics =
new ConcurrentHashMap<>();
Expand Down Expand Up @@ -56,6 +61,16 @@ public List<String> getTenantNamespaces(String tenant) {
return tenantNamespaces == null ? Collections.emptyList() : new ArrayList<>(tenantNamespaces.keySet());
}

public List<String> getTopics(String tenant, String namespace) {
ConcurrentHashMap<String, ConcurrentHashMap<String, TopicInfo>> tenantNamespaces =
topics.get(tenant);
if (tenantNamespaces != null) {
ConcurrentHashMap<String, TopicInfo> namespaceTopics = tenantNamespaces.get(namespace);
return namespaceTopics == null ? Collections.emptyList() : new ArrayList<>(namespaceTopics.keySet());
}
return Collections.emptyList();
}

public void createPartitionedTopic(String tenant, String namespace, String topic, int numPartitions) {
partitionedTopics.computeIfAbsent(tenant, k -> new ConcurrentHashMap<>())
.computeIfAbsent(namespace, k -> new ConcurrentHashMap<>())
Expand Down Expand Up @@ -94,17 +109,4 @@ public PartitionedTopicInfo getPartitionedTopicInfo(String tenant, String namesp
}
return null;
}

public List<PartitionedTopicInfo> getAllPartitionedTopics(String tenant, String namespace) {
ConcurrentHashMap<String, ConcurrentHashMap<String, PartitionedTopicInfo>> tenantNamespaces =
partitionedTopics.get(tenant);
if (tenantNamespaces != null) {
ConcurrentHashMap<String, PartitionedTopicInfo> namespaceTopics = tenantNamespaces.get(namespace);
if (namespaceTopics != null) {
return new ArrayList<>(namespaceTopics.values());
}
}
return Collections.emptyList();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package io.github.embedded.pulsar.core.module;

public class TopicInfo {
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>io.github.embedded-middleware</groupId>
<artifactId>embedded-pulsar-parent</artifactId>
<version>0.0.6</version>
<version>0.0.7</version>
<modules>
<module>embedded-pulsar-core</module>
</modules>
Expand Down

0 comments on commit 7f660f0

Please sign in to comment.