Skip to content

[Bug][Manager] Problems arising from the MQ cluster during deletion process #12106

@flowers-59f

Description

@flowers-59f

What happened

When creating an access in the data access module, you can select the corresponding MQ type to create a data flow group.
Image
Even without a corresponding MQ cluster, new connections can still be established and submitted for approval. However, issues may arise when deleting because there is no corresponding MQ cluster.
Image
Furthermore, I believe that if there is no corresponding MQ cluster, the creation should not be successful.

What you expected to happen

Deletion process
InlongGroupController.delete(...)
-> InlongGroupProcessService.deleteProcess(String groupId, String operator)
-> InlongGroupProcessService.invokeDeleteProcess(...)
-> workflowService.start(ProcessName.DELETE_GROUP_PROCESS, operator, form)
The workflow will call DeleteGroupWorkflowDefinition.defineProcess(), which includes the DeleteMQ task in the deletion process.
This task will execute QueueResourceListener.listen(...)
In the DELETE case, it calls queueOperator.deleteQueueForGroup(groupInfo, operator);
This method has different implementations for different MQs.
In Kafka

ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);

@Override  
public ClusterInfo getOne(String clusterTag, String name, String type) {  
    List<InlongClusterEntity> entityList = clusterMapper.selectByKey(clusterTag, name, type);  
    if (CollectionUtils.isEmpty(entityList)) {  
        throw new BusinessException(String.format("cluster not found by tag=%s, name=%s, type=%s",  
                clusterTag, name, type));  
    }  
  
    InlongClusterEntity entity = entityList.get(0);  
    InlongClusterOperator instance = clusterOperatorFactory.getInstance(entity.getType());  
    ClusterInfo result = instance.getFromEntity(entity);  
    LOGGER.debug("success to get inlong cluster by tag={}, name={}, type={}", clusterTag, name, type);  
    return result;  
}

An exception will be thrown if no cluster of the corresponding type is found.
In the log

[ ] 2026-03-30 05:53:58.572 -ERROR [http-nio-8083-exec-10] a.i.m.w.e.LogableEventListener:88 - execute listener WorkflowEventLogEntity(id=null, processId=48, processName=DELETE_GROUP_PROCESS, processDisplayName=Delete Group, inlongGroupId=test_group_id, taskId=130, elementName=DeleteMQ, elementDisplayName=Group-DeleteMQ, eventType=TaskEvent, event=COMPLETE, listener=QueueResourceListener, startTime=Mon Mar 30 05:53:58 UTC 2026, endTime=null, status=-1, async=0, ip=172.18.0.7, remark=null, exception=cluster not found by tag=null, name=null, type=KAFKA) error:
org.apache.inlong.manager.common.exceptions.BusinessException: cluster not found by tag=null, name=null, type=KAFKA
at org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl.getOne(InlongClusterServiceImpl.java:545) ~[manager-service-2.3.0.jar:2.3.0]

In Pulsar

List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);

@Override  
public List<ClusterInfo> listByTagAndType(String clusterTag, String clusterType) {  
    List<InlongClusterEntity> clusterEntities = clusterMapper.selectByKey(clusterTag, null, clusterType);  
    if (CollectionUtils.isEmpty(clusterEntities)) {  
        throw new BusinessException(String.format("cannot find any cluster by tag %s and type %s",  
                clusterTag, clusterType));  
    }  
  
    List<ClusterInfo> clusterInfos = clusterEntities.stream()  
            .map(entity -> {  
                InlongClusterOperator operator = clusterOperatorFactory.getInstance(entity.getType());  
                return operator.getFromEntity(entity);  
            })  
            .collect(Collectors.toList());  
  
    LOGGER.debug("success to list inlong cluster by tag={}", clusterTag);  
    return clusterInfos;  
}

The TubeMQ deletion process has not been implemented yet, so no exception will be triggered.

How to reproduce

  1. Create a new connection and use an MQ without a corresponding MQ cluster.
  2. Delete it

Environment

No response

InLong version

2.3.0

InLong Component

InLong Manager

Are you willing to submit PR?

  • Yes, I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    type/bugSomething is wrong

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions