Skip to content

Commit

Permalink
client enhancement
Browse files Browse the repository at this point in the history
  • Loading branch information
karsonto committed Jan 10, 2024
1 parent 4a4d8a5 commit 2976b25
Showing 1 changed file with 6 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,30 +114,19 @@ public void updateSubscription(ClientInfo clientInfo, String consumerGroup,
if (CollectionUtils.isEmpty(groupTopicClients)) {
log.error("group {} topic {} clients is empty", consumerGroup, subscription);
}
ConsumerGroupConf consumerGroupConf = localConsumerGroupMapping.computeIfAbsent(consumerGroup, (consumerGroupInner) ->
new ConsumerGroupConf(consumerGroup)
);

ConsumerGroupConf consumerGroupConf = localConsumerGroupMapping.get(consumerGroup);
if (consumerGroupConf == null) {
// new subscription
ConsumerGroupConf prev = localConsumerGroupMapping.putIfAbsent(consumerGroup, new ConsumerGroupConf(consumerGroup));
if (prev == null) {
log.info("add new subscription, consumer group: {}", consumerGroup);
}
consumerGroupConf = localConsumerGroupMapping.get(consumerGroup);
}

ConsumerGroupTopicConf consumerGroupTopicConf = consumerGroupConf.getConsumerGroupTopicConf()
.get(subscription.getTopic());
if (consumerGroupTopicConf == null) {
consumerGroupConf.getConsumerGroupTopicConf().computeIfAbsent(subscription.getTopic(), (topic) -> {
ConsumerGroupTopicConf consumerGroupTopicConf =
consumerGroupConf.getConsumerGroupTopicConf().computeIfAbsent(subscription.getTopic(), (topicInner) -> {
ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf();
newTopicConf.setConsumerGroup(consumerGroup);
newTopicConf.setTopic(topic);
newTopicConf.setTopic(topicInner);
newTopicConf.setSubscriptionItem(subscription);
log.info("add new {}", newTopicConf);
return newTopicConf;
});
consumerGroupTopicConf = consumerGroupConf.getConsumerGroupTopicConf().get(subscription.getTopic());
}

consumerGroupTopicConf.getUrls().add(url);
if (!consumerGroupTopicConf.getIdcUrls().containsKey(clientInfo.getIdc())) {
Expand Down

0 comments on commit 2976b25

Please sign in to comment.