Skip to content

Commit

Permalink
[ISSUE #55]Delete the corresponding DLQ and Retry Topic simultaneousl…
Browse files Browse the repository at this point in the history
…y when deleting the consumerGroup.
  • Loading branch information
zhangjidi2016 committed Jan 4, 2022
1 parent 1caeb4c commit 00c6b20
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
Expand All @@ -46,6 +48,7 @@
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.model.ConsumerGroupRollBackStat;
import org.apache.rocketmq.dashboard.model.GroupConsumeInfo;
import org.apache.rocketmq.dashboard.model.QueueStatInfo;
Expand All @@ -65,6 +68,9 @@
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService {
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);

@Resource
private RMQConfigure configure;

private static final Set<String> SYSTEM_GROUP_SET = new HashSet<>();

static {
Expand Down Expand Up @@ -295,6 +301,9 @@ public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) {
for (String brokerName : deleteSubGroupRequest.getBrokerNameList()) {
logger.info("addr={} groupName={}", clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName());
mqAdminExt.deleteSubscriptionGroup(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(), deleteSubGroupRequest.getGroupName(), true);
// delete %RETRY%+Group and %DLQ%+Group in broker and namesrv
deleteDlqOrRetryTopic(MixAll.RETRY_GROUP_TOPIC_PREFIX + deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo);
deleteDlqOrRetryTopic(MixAll.DLQ_GROUP_TOPIC_PREFIX + deleteSubGroupRequest.getGroupName(), brokerName, clusterInfo);
}
}
catch (Exception e) {
Expand All @@ -303,6 +312,16 @@ public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) {
return true;
}

private void deleteDlqOrRetryTopic(String topic, String brokerName, ClusterInfo clusterInfo) throws Exception {
mqAdminExt.deleteTopicInBroker(Sets.newHashSet(clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr()), topic);
Set<String> nameServerSet = null;
if (StringUtils.isNotBlank(configure.getNamesrvAddr())) {
String[] ns = configure.getNamesrvAddr().split(";");
nameServerSet = new HashSet<>(Arrays.asList(ns));
}
mqAdminExt.deleteTopicInNameServer(nameServerSet, topic);
}

@Override
public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ public void testDelete() throws Exception {
final String url = "/consumer/deleteSubGroup.do";
{
doNothing().when(mqAdminExt).deleteSubscriptionGroup(any(), anyString());
doNothing().when(mqAdminExt).deleteTopicInBroker(any(), anyString());
doNothing().when(mqAdminExt).deleteTopicInNameServer(any(), anyString());
}
DeleteSubGroupRequest request = new DeleteSubGroupRequest();
request.setBrokerNameList(Lists.newArrayList("broker-a"));
Expand Down

0 comments on commit 00c6b20

Please sign in to comment.