From eeebb8dcabebda82c6a5e0d0fcb33525bd9325fc Mon Sep 17 00:00:00 2001 From: healchow Date: Sat, 12 Mar 2022 22:11:24 +0800 Subject: [PATCH] [INLONG-3089][Manager] Create group resource failed after approving one inlong group --- .../pojo/dataproxy/PulsarClusterInfo.java | 4 +- .../consumption/ConsumptionMqExtBase.java | 7 +- .../mapper/ThirdPartyClusterEntityMapper.java | 2 +- .../mappers/ThirdPartyClusterEntityMapper.xml | 2 +- .../manager/service/CommonOperateService.java | 67 +++++++------------ .../core/impl/ConsumptionServiceImpl.java | 44 ++++++------ .../core/impl/InlongGroupServiceImpl.java | 34 +++++----- .../impl/ThirdPartyClusterServiceImpl.java | 15 +++-- ...reatePulsarGroupForStreamTaskListener.java | 2 +- .../mq/CreatePulsarGroupTaskListener.java | 10 +-- .../mq/CreatePulsarResourceTaskListener.java | 2 +- ...reatePulsarTopicForStreamTaskListener.java | 2 +- .../thirdparty/mq/PulsarEventSelector.java | 8 +-- .../thirdparty/sort/util/SourceInfoUtils.java | 9 +-- .../ConsumptionCompleteProcessListener.java | 10 +-- .../CreateStreamWorkflowDefinition.java | 12 ++-- 16 files changed, 113 insertions(+), 117 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java index a0733e24b3a..f03204ed0a3 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java @@ -25,13 +25,15 @@ import java.util.Map; @Data +@Builder @AllArgsConstructor @NoArgsConstructor -@Builder public class PulsarClusterInfo { + private String type; private String adminUrl; private String token; private String brokerServiceUrl; private Map ext; + } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java index 6bff8a7b321..0f0784ffcf7 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java @@ -32,14 +32,15 @@ @ApiModel("Extended consumption information of different MQs") @JsonTypeInfo(use = Id.NAME, visible = true, property = "middlewareType", defaultImpl = ConsumptionMqExtBase.class) @JsonSubTypes({ - @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = Constant.MIDDLEWARE_PULSAR) + @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = Constant.MIDDLEWARE_PULSAR), + @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = Constant.MIDDLEWARE_TDMQ_PULSAR) }) public class ConsumptionMqExtBase { - @ApiModelProperty(value = "Self-incrementing primary key") + @ApiModelProperty(value = "Primary key") private Integer id; - @ApiModelProperty(value = "Consumer information ID") + @ApiModelProperty(value = "Consumption ID") private Integer consumptionId; @ApiModelProperty(value = "Consumer group") diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java index fe13d803581..962782abfe0 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java @@ -39,7 +39,7 @@ public interface ThirdPartyClusterEntityMapper { List selectByType(@Param("type") String type); - List selectMqCluster(@Param("mqSetName") String mqSetName, + List selectMQCluster(@Param("mqSetName") String mqSetName, @Param("typeList") List typeList); ThirdPartyClusterEntity selectByName(@Param("name") String name); diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml index f67ae628da9..a74ecb80752 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml @@ -225,7 +225,7 @@ where is_deleted = 0 and name = #{name, jdbcType=VARCHAR} - select from third_party_cluster diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java index b171f90ef68..baec6e72a28 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java @@ -20,6 +20,7 @@ import com.google.gson.Gson; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo; import org.apache.inlong.manager.common.beans.ClusterBean; import org.apache.inlong.manager.common.enums.Constant; @@ -28,19 +29,15 @@ import org.apache.inlong.manager.common.enums.SinkType; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; -import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest; import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo; -import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest; import org.apache.inlong.manager.common.pojo.sink.SinkResponse; import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse; import org.apache.inlong.manager.common.pojo.source.SourceResponse; import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.common.util.Preconditions; -import org.apache.inlong.manager.dao.entity.DataProxyClusterEntity; import org.apache.inlong.manager.dao.entity.InlongGroupEntity; import org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity; -import org.apache.inlong.manager.dao.mapper.DataProxyClusterEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; import org.apache.inlong.manager.dao.mapper.ThirdPartyClusterEntityMapper; import org.apache.inlong.manager.service.core.InlongStreamService; @@ -61,6 +58,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -82,8 +80,6 @@ public class CommonOperateService { @Autowired private InlongGroupEntityMapper groupMapper; @Autowired - private DataProxyClusterEntityMapper dataProxyClusterMapper; - @Autowired private ThirdPartyClusterEntityMapper thirdPartyClusterMapper; /** @@ -100,14 +96,14 @@ public String getSpecifiedParam(String key) { switch (key) { case Constant.PULSAR_SERVICEURL: { - clusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_PULSAR); + clusterEntity = getMQCluster(Constant.MIDDLEWARE_PULSAR); if (clusterEntity != null) { result = clusterEntity.getUrl(); } break; } case Constant.PULSAR_ADMINURL: { - clusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_PULSAR); + clusterEntity = getMQCluster(Constant.MIDDLEWARE_PULSAR); if (clusterEntity != null) { params = gson.fromJson(clusterEntity.getExtParams(), Map.class); result = params.get(key); @@ -117,7 +113,7 @@ public String getSpecifiedParam(String key) { case Constant.CLUSTER_TUBE_MANAGER: case Constant.CLUSTER_TUBE_CLUSTER_ID: case Constant.TUBE_MASTER_URL: { - clusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_TUBE); + clusterEntity = getMQCluster(Constant.MIDDLEWARE_TUBE); if (clusterEntity != null) { if (key.equals(Constant.TUBE_MASTER_URL)) { result = clusterEntity.getUrl(); @@ -129,61 +125,50 @@ public String getSpecifiedParam(String key) { break; } } - return result; } /** * Get third party cluster by type. * - * TODO Add more condition for query. + * TODO Add data_proxy_cluster_name for query. * * @param type Cluster type, such as TUBE, PULSAR, etc. */ - private ThirdPartyClusterEntity getThirdPartyCluster(String type) { - InlongGroupPageRequest groupPageRequest = new InlongGroupPageRequest(); - groupPageRequest.setMiddlewareType(type); - List groupEntities = groupMapper.selectByCondition(groupPageRequest); - if (groupEntities.isEmpty()) { - LOGGER.warn("no inlong group found by type={}", type); + private ThirdPartyClusterEntity getMQCluster(String type) { + List clusterList = thirdPartyClusterMapper.selectByType(Constant.CLUSTER_DATA_PROXY); + if (CollectionUtils.isEmpty(clusterList)) { + LOGGER.warn("no data proxy cluster found"); return null; } - - Integer clusterId = groupEntities.get(0).getProxyClusterId(); - DataProxyClusterEntity dataProxyCluster = dataProxyClusterMapper.selectByPrimaryKey(clusterId); - if (dataProxyCluster == null) { - LOGGER.warn("no data proxy cluster found with id={}", clusterId); + String mqSetName = clusterList.get(0).getMqSetName(); + List mqClusterList = thirdPartyClusterMapper.selectMQCluster(mqSetName, + Collections.singletonList(type)); + if (CollectionUtils.isEmpty(mqClusterList)) { + LOGGER.warn("no mq cluster found by type={} and mq set name={}", type, mqSetName); return null; } - String mqSetName = dataProxyCluster.getMqSetName(); - ClusterPageRequest clusterRequest = new ClusterPageRequest(); - clusterRequest.setMqSetName(mqSetName); - List thirdPartyClusters = thirdPartyClusterMapper.selectByCondition(clusterRequest); - if (CollectionUtils.isEmpty(thirdPartyClusters)) { - LOGGER.warn("no related third-party-cluster by type={} and mq set name={}", type, mqSetName); - return null; - } - - return thirdPartyClusters.get(0); + return mqClusterList.get(0); } /** - * Get Pulsar cluster info. + * Get Pulsar cluster by the given type. * * @return Pulsar cluster info. */ - public PulsarClusterInfo getPulsarClusterInfo() { - ThirdPartyClusterEntity thirdPartyClusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_PULSAR); - Preconditions.checkNotNull(thirdPartyClusterEntity.getExtParams(), "pulsar extParam is empty, check" - + "third party cluster table"); - Map configParams = JsonUtils.parse(thirdPartyClusterEntity.getExtParams(), Map.class); + public PulsarClusterInfo getPulsarClusterInfo(String type) { + ThirdPartyClusterEntity clusterEntity = getMQCluster(type); + if (clusterEntity == null || StringUtils.isBlank(clusterEntity.getExtParams())) { + throw new BusinessException("pulsar cluster or pulsar ext params is empty"); + } + Map configParams = JsonUtils.parse(clusterEntity.getExtParams(), Map.class); PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder().brokerServiceUrl( - thirdPartyClusterEntity.getUrl()).token(thirdPartyClusterEntity.getToken()).build(); + clusterEntity.getUrl()).token(clusterEntity.getToken()).build(); String adminUrl = configParams.get(Constant.PULSAR_ADMINURL); Preconditions.checkNotNull(adminUrl, "adminUrl is empty, check third party cluster table"); pulsarClusterInfo.setAdminUrl(adminUrl); - pulsarClusterInfo.setType(thirdPartyClusterEntity.getType()); + pulsarClusterInfo.setType(clusterEntity.getType()); return pulsarClusterInfo; } @@ -241,7 +226,7 @@ public DataFlowInfo createDataFlow(InlongGroupInfo groupInfo, SinkResponse sinkR // Get source info String masterAddress = getSpecifiedParam(Constant.TUBE_MASTER_URL); - PulsarClusterInfo pulsarCluster = getPulsarClusterInfo(); + PulsarClusterInfo pulsarCluster = getPulsarClusterInfo(groupInfo.getMiddlewareType()); InlongStreamInfo streamInfo = streamService.get(groupId, streamId); SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster, masterAddress, clusterBean, groupInfo, streamInfo, sourceResponse, sourceFields); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java index 72cdf7d5967..fc8b1d8d84a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java @@ -20,15 +20,6 @@ import com.github.pagehelper.Page; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; -import java.util.Arrays; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.inlong.manager.common.beans.ClusterBean; import org.apache.inlong.manager.common.enums.Constant; @@ -66,6 +57,16 @@ import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; +import java.util.Arrays; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + /** * Data consumption service */ @@ -119,7 +120,8 @@ public ConsumptionInfo get(Integer id) { ConsumptionInfo info = CommonBeanUtils.copyProperties(entity, ConsumptionInfo::new); - if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(info.getMiddlewareType())) { + String mqType = info.getMiddlewareType(); + if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(info.getId()); Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be empty, as the middleware is Pulsar"); ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, ConsumptionPulsarInfo::new); @@ -147,11 +149,10 @@ public boolean isConsumerGroupIdExists(String consumerGroup, Integer excludeSelf @Transactional(rollbackFor = Throwable.class) public Integer save(ConsumptionInfo info, String operator) { fullConsumptionInfo(info); - Date now = new Date(); ConsumptionEntity entity = this.saveConsumption(info, operator, now); - - if (Constant.MIDDLEWARE_PULSAR.equals(entity.getMiddlewareType())) { + String mqType = entity.getMiddlewareType(); + if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { savePulsarInfo(info.getMqExtInfo(), entity); } @@ -232,7 +233,8 @@ public Boolean update(ConsumptionInfo info, String operator) { entity.setModifyTime(now); // Modify Pulsar consumption info - if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(info.getMiddlewareType())) { + String mqType = info.getMiddlewareType(); + if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(consumptionId); Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be null"); pulsarEntity.setConsumerGroupId(info.getConsumerGroupId()); @@ -338,10 +340,10 @@ public void saveSortConsumption(InlongGroupInfo bizInfo, String topic, String co } log.debug("begin to save consumption, groupId={}, topic={}, consumer group={}", groupId, topic, consumerGroup); - String middlewareType = bizInfo.getMiddlewareType(); + String mqType = bizInfo.getMiddlewareType(); ConsumptionEntity entity = new ConsumptionEntity(); entity.setInlongGroupId(groupId); - entity.setMiddlewareType(middlewareType); + entity.setMiddlewareType(mqType); entity.setTopic(topic); entity.setConsumerGroupId(consumerGroup); entity.setConsumerGroupName(consumerGroup); @@ -355,7 +357,7 @@ public void saveSortConsumption(InlongGroupInfo bizInfo, String topic, String co consumptionMapper.insert(entity); - if (Constant.MIDDLEWARE_PULSAR.equals(middlewareType)) { + if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { ConsumptionPulsarEntity pulsarEntity = new ConsumptionPulsarEntity(); pulsarEntity.setConsumptionId(entity.getId()); pulsarEntity.setConsumerGroupId(consumerGroup); @@ -370,7 +372,8 @@ public void saveSortConsumption(InlongGroupInfo bizInfo, String topic, String co private NewConsumptionProcessForm genNewConsumptionProcessForm(ConsumptionInfo consumptionInfo) { NewConsumptionProcessForm form = new NewConsumptionProcessForm(); Integer id = consumptionInfo.getId(); - if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(consumptionInfo.getMiddlewareType())) { + String mqType = consumptionInfo.getMiddlewareType(); + if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { ConsumptionPulsarEntity consumptionPulsarEntity = consumptionPulsarMapper.selectByConsumptionId(id); ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(consumptionPulsarEntity, ConsumptionPulsarInfo::new); @@ -426,11 +429,12 @@ private void fullConsumptionInfo(ConsumptionInfo info) { Preconditions.checkNotNull(topicVO, "inlong group not exist: " + groupId); // Tube’s topic is the inlong group level, one inlong group, one Tube topic - if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(topicVO.getMiddlewareType())) { + String mqType = topicVO.getMiddlewareType(); + if (Constant.MIDDLEWARE_TUBE.equals(mqType)) { String bizTopic = topicVO.getMqResourceObj(); Preconditions.checkTrue(bizTopic == null || bizTopic.equals(info.getTopic()), "topic [" + info.getTopic() + "] not belong to inlong group " + groupId); - } else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(topicVO.getMiddlewareType())) { + } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { // Pulsar's topic is the inlong stream level. // There will be multiple inlong streams under one inlong group, and there will be multiple topics List dsTopicList = topicVO.getDsTopicList(); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java index d55a3c4c013..cff6760fd29 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java @@ -117,7 +117,8 @@ public String save(InlongGroupRequest groupInfo, String operator) { groupMapper.insertSelective(entity); this.saveOrUpdateExt(groupId, groupInfo.getExtList()); - if (Constant.MIDDLEWARE_PULSAR.equals(groupInfo.getMiddlewareType())) { + String mqType = groupInfo.getMiddlewareType(); + if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupInfo.getMqExtInfo(); Preconditions.checkNotNull(pulsarInfo, "Pulsar info cannot be empty, as the middleware is Pulsar"); @@ -172,23 +173,23 @@ public InlongGroupInfo get(String groupId) { groupInfo.setExtList(extInfoList); // If the middleware is Pulsar, we need to encapsulate Pulsar related data - String middlewareType = entity.getMiddlewareType(); - if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) { + String mqType = entity.getMiddlewareType(); + if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { InlongGroupPulsarEntity pulsarEntity = groupPulsarMapper.selectByGroupId(groupId); - Preconditions.checkNotNull(pulsarEntity, "Pulsar info not found under the inlong group"); + Preconditions.checkNotNull(pulsarEntity, "Pulsar info not found by the groupId=" + groupId); InlongGroupPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, InlongGroupPulsarInfo::new); - pulsarInfo.setMiddlewareType(Constant.MIDDLEWARE_PULSAR); + pulsarInfo.setMiddlewareType(mqType); groupInfo.setMqExtInfo(pulsarInfo); } // For approved inlong group, encapsulate the cluster address of the middleware if (GroupState.CONFIG_SUCCESSFUL == GroupState.forCode(groupInfo.getStatus())) { - if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) { + if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(mqType)) { groupInfo.setTubeMaster(commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL)); - } else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) { - PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(); - groupInfo.setPulsarAdminUrl(pulsarClusterInfo.getAdminUrl()); - groupInfo.setPulsarServiceUrl(pulsarClusterInfo.getBrokerServiceUrl()); + } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { + PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(mqType); + groupInfo.setPulsarAdminUrl(pulsarCluster.getAdminUrl()); + groupInfo.setPulsarServiceUrl(pulsarCluster.getBrokerServiceUrl()); } } @@ -240,7 +241,8 @@ public String update(InlongGroupRequest groupRequest, String operator) { this.saveOrUpdateExt(groupId, groupRequest.getExtList()); // Update the Pulsar info - if (Constant.MIDDLEWARE_PULSAR.equals(groupRequest.getMiddlewareType())) { + String mqType = groupRequest.getMiddlewareType(); + if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupRequest.getMqExtInfo(); Preconditions.checkNotNull(pulsarInfo, "Pulsar info cannot be empty, as the middleware is Pulsar"); Integer writeQuorum = pulsarInfo.getWriteQuorum(); @@ -401,25 +403,25 @@ public InlongGroupTopicResponse getTopic(String groupId) { LOGGER.debug("begin to get topic by groupId={}", groupId); InlongGroupInfo groupInfo = this.get(groupId); - String middlewareType = groupInfo.getMiddlewareType(); + String mqType = groupInfo.getMiddlewareType(); InlongGroupTopicResponse topicVO = new InlongGroupTopicResponse(); - if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) { + if (Constant.MIDDLEWARE_TUBE.equals(mqType)) { // Tube Topic corresponds to inlong group one-to-one topicVO.setMqResourceObj(groupInfo.getMqResourceObj()); topicVO.setTubeMasterUrl(commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL)); - } else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) { + } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { // Pulsar's topic corresponds to the inlong stream one-to-one topicVO.setDsTopicList(streamService.getTopicList(groupId)); topicVO.setPulsarAdminUrl(commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL)); topicVO.setPulsarServiceUrl(commonOperateService.getSpecifiedParam(Constant.PULSAR_SERVICEURL)); } else { - LOGGER.error("middleware type={} not supported", middlewareType); + LOGGER.error("middleware type={} not supported", mqType); throw new BusinessException(ErrorCodeEnum.MIDDLEWARE_TYPE_NOT_SUPPORTED); } topicVO.setInlongGroupId(groupId); - topicVO.setMiddlewareType(middlewareType); + topicVO.setMiddlewareType(mqType); return topicVO; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java index b7e5cb79d40..2247b6843c6 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java @@ -211,10 +211,11 @@ public List getConfig() { DataProxyConfig config = new DataProxyConfig(); config.setM(groupEntity.getSchemaName()); - if (Constant.MIDDLEWARE_TUBE.equals(groupEntity.getMiddlewareType())) { + String mqType = groupEntity.getMiddlewareType(); + if (Constant.MIDDLEWARE_TUBE.equals(mqType)) { config.setInlongGroupId(groupId); config.setTopic(bizResource); - } else if (Constant.MIDDLEWARE_PULSAR.equals(groupEntity.getMiddlewareType())) { + } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { List streamList = streamMapper.selectByGroupId(groupId); for (InlongStreamEntity stream : streamList) { String topic = stream.getMqResourceObj(); @@ -248,9 +249,9 @@ public ThirdPartyClusterDTO getConfigV2(String clusterName) { } // third-party-cluster type - String middlewareType = ""; + String mqType = ""; if (!groupEntityList.isEmpty()) { - middlewareType = groupEntityList.get(0).getMiddlewareType(); + mqType = groupEntityList.get(0).getMiddlewareType(); } // Get topic list by group id @@ -258,7 +259,7 @@ public ThirdPartyClusterDTO getConfigV2(String clusterName) { for (InlongGroupEntity groupEntity : groupEntityList) { final String groupId = groupEntity.getInlongGroupId(); final String mqResource = groupEntity.getMqResourceObj(); - if (Constant.MIDDLEWARE_PULSAR.equals(middlewareType)) { + if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { List streamList = streamMapper.selectByGroupId(groupId); for (InlongStreamEntity stream : streamList) { DataProxyConfig topicConfig = new DataProxyConfig(); @@ -273,7 +274,7 @@ public ThirdPartyClusterDTO getConfigV2(String clusterName) { topicConfig.setTopic("persistent://" + tenant + "/" + mqResource + "/" + topic); topicList.add(topicConfig); } - } else if (Constant.MIDDLEWARE_TUBE.equals(middlewareType)) { + } else if (Constant.MIDDLEWARE_TUBE.equals(mqType)) { DataProxyConfig topicConfig = new DataProxyConfig(); topicConfig.setInlongGroupId(groupId); topicConfig.setTopic(mqResource); @@ -285,7 +286,7 @@ public ThirdPartyClusterDTO getConfigV2(String clusterName) { List mqSet = new ArrayList<>(); List clusterType = Arrays.asList(Constant.CLUSTER_TUBE, Constant.CLUSTER_PULSAR, Constant.CLUSTER_TDMQ_PULSAR); - List clusterList = thirdPartyClusterMapper.selectMqCluster( + List clusterList = thirdPartyClusterMapper.selectMQCluster( clusterEntity.getMqSetName(), clusterType); for (ThirdPartyClusterEntity cluster : clusterList) { ThirdPartyClusterInfo clusterInfo = new ThirdPartyClusterInfo(); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java index a814379ff76..d4ec8fbc986 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java @@ -86,7 +86,7 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc log.warn("inlong stream is empty for group={}, stream={}, skip to create pulsar group", groupId, streamId); return ListenerResult.success(); } - PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(); + PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMiddlewareType()); try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) { // Query data sink info based on groupId and streamId List sinkTypeList = sinkService.getSinkTypeList(groupId, streamId); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java index 304dc787418..c22342d0f14 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java @@ -70,8 +70,8 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm(); String groupId = form.getInlongGroupId(); - InlongGroupInfo bizInfo = groupService.get(groupId); - if (bizInfo == null) { + InlongGroupInfo groupInfo = groupService.get(groupId); + if (groupInfo == null) { log.error("inlong group not found with groupId={}", groupId); throw new WorkflowListenerException("inlong group not found with groupId=" + groupId); } @@ -82,10 +82,10 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc log.warn("inlong stream is empty for groupId={}, skip to create pulsar subscription", groupId); return ListenerResult.success(); } - PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(); + PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMiddlewareType()); try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) { String tenant = clusterBean.getDefaultTenant(); - String namespace = bizInfo.getMqResourceObj(); + String namespace = groupInfo.getMqResourceObj(); for (InlongStreamEntity streamEntity : streamEntities) { PulsarTopicBean topicBean = new PulsarTopicBean(); @@ -114,7 +114,7 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription); // Insert the consumption data into the consumption table - consumptionService.saveSortConsumption(bizInfo, topic, subscription); + consumptionService.saveSortConsumption(groupInfo, topic, subscription); } } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java index 9cb3d889a7f..b0b882714f2 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java @@ -77,7 +77,7 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc if (groupInfo == null) { throw new WorkflowListenerException("inlong group or pulsar cluster not found for groupId=" + groupId); } - PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(); + PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMiddlewareType()); try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) { List pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin); for (String cluster : pulsarClusters) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java index 3864ac25e55..2811b5feb79 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java @@ -76,7 +76,7 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc } log.info("begin to create pulsar topic for groupId={}, streamId={}", groupId, streamId); - PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(); + PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMiddlewareType()); try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) { List pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin); for (String cluster : pulsarClusters) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java index 5da60b875ea..0bf990c36dc 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java @@ -21,8 +21,8 @@ import org.apache.inlong.manager.common.enums.Constant; import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo; import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm; -import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm; +import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.workflow.event.EventSelector; @Slf4j @@ -35,13 +35,13 @@ public boolean accept(WorkflowContext context) { return false; } GroupResourceProcessForm form = (GroupResourceProcessForm) processForm; - String middlewareType = form.getGroupInfo().getMiddlewareType(); - if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) { + String mqType = form.getGroupInfo().getMiddlewareType(); + if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) form.getGroupInfo().getMqExtInfo(); return pulsarInfo.getEnableCreateResource() == 1; } log.warn("no need to create pulsar subscription group for groupId={}, as the middlewareType={}", - form.getInlongGroupId(), middlewareType); + form.getInlongGroupId(), mqType); return false; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java index 2df1936c225..13a74c846d0 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java @@ -64,17 +64,17 @@ public static SourceInfo createSourceInfo(PulsarClusterInfo pulsarCluster, Strin ClusterBean clusterBean, InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, SourceResponse sourceResponse, List sourceFields) { - String middleWareType = groupInfo.getMiddlewareType(); + String mqType = groupInfo.getMiddlewareType(); DeserializationInfo deserializationInfo = SerializationUtils.createDeserialInfo(sourceResponse, streamInfo); SourceInfo sourceInfo; - if (Constant.MIDDLEWARE_PULSAR.equals(middleWareType)) { + if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { sourceInfo = createPulsarSourceInfo(pulsarCluster, clusterBean, groupInfo, streamInfo, deserializationInfo, sourceFields); - } else if (Constant.MIDDLEWARE_TUBE.equals(middleWareType)) { + } else if (Constant.MIDDLEWARE_TUBE.equals(mqType)) { // InlongGroupInfo groupInfo, String masterAddress, sourceInfo = createTubeSourceInfo(groupInfo, masterAddress, clusterBean, deserializationInfo, sourceFields); } else { - throw new WorkflowListenerException(String.format("Unsupported middleware {%s}", middleWareType)); + throw new WorkflowListenerException(String.format("Unsupported middleware {%s}", mqType)); } return sourceInfo; @@ -98,6 +98,7 @@ private static SourceInfo createPulsarSourceInfo(PulsarClusterInfo pulsarCluster final String fullTopicName = "persistent://" + tenant + "/" + namespace + "/" + topicName; final String consumerGroup = clusterBean.getAppName() + "_" + topicName + "_consumer_group"; FieldInfo[] fieldInfosArr = fieldInfos.toArray(new FieldInfo[0]); + String type = pulsarCluster.getType(); if (StringUtils.isNotEmpty(type) && Constant.MIDDLEWARE_TDMQ_PULSAR.equals(type)) { return new TDMQPulsarSourceInfo(pulsarCluster.getBrokerServiceUrl(), diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java index 1e0d757661f..7f0b49d7fbd 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java @@ -84,14 +84,14 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc throw new WorkflowListenerException("consumption not exits for id=" + consumptionId); } - String middlewareType = entity.getMiddlewareType(); - if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) { + String mqType = entity.getMiddlewareType(); + if (Constant.MIDDLEWARE_TUBE.equals(mqType)) { this.createTubeConsumerGroup(entity); return ListenerResult.success("Create Tube consumer group successful"); - } else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) { + } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { this.createPulsarTopicMessage(entity); } else { - throw new WorkflowListenerException("middleware type [" + middlewareType + "] not supported"); + throw new WorkflowListenerException("middleware type [" + mqType + "] not supported"); } this.updateConsumerInfo(consumptionId, entity.getConsumerGroupId()); @@ -119,7 +119,7 @@ private void createPulsarTopicMessage(ConsumptionEntity entity) { Preconditions.checkNotNull(groupInfo, "inlong group not found for groupId=" + groupId); String mqResourceObj = groupInfo.getMqResourceObj(); Preconditions.checkNotNull(mqResourceObj, "mq resource cannot empty for groupId=" + groupId); - PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(); + PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(entity.getMiddlewareType()); try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) { PulsarTopicBean topicMessage = new PulsarTopicBean(); String tenant = clusterBean.getDefaultTenant(); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java index c10c80d06aa..8e879eca917 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java @@ -89,12 +89,12 @@ public WorkflowProcess defineProcess() { ServiceTask createPulsarTopicTask = new ServiceTask(); createPulsarTopicTask.setSkipResolver(c -> { GroupResourceProcessForm form = (GroupResourceProcessForm) c.getProcessForm(); - String middlewareType = form.getGroupInfo().getMiddlewareType(); - if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) { + String mqType = form.getGroupInfo().getMiddlewareType(); + if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { return false; } log.warn("no need to create pulsar topic for groupId={}, streamId={}, as the middlewareType={}", - form.getInlongGroupId(), form.getInlongStreamId(), middlewareType); + form.getInlongGroupId(), form.getInlongStreamId(), mqType); return true; }); createPulsarTopicTask.setName("createPulsarTopic"); @@ -105,12 +105,12 @@ public WorkflowProcess defineProcess() { ServiceTask createPulsarSubscriptionGroupTask = new ServiceTask(); createPulsarSubscriptionGroupTask.setSkipResolver(c -> { GroupResourceProcessForm form = (GroupResourceProcessForm) c.getProcessForm(); - String middlewareType = form.getGroupInfo().getMiddlewareType(); - if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) { + String mqType = form.getGroupInfo().getMiddlewareType(); + if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) { return false; } log.warn("no need to create pulsar subscription for groupId={}, streamId={}, as the middlewareType={}", - form.getInlongGroupId(), form.getInlongStreamId(), middlewareType); + form.getInlongGroupId(), form.getInlongStreamId(), mqType); return true; }); createPulsarSubscriptionGroupTask.setName("createPulsarSubscription");