Skip to content

Commit

Permalink
[INLONG-3089][Manager] Create group resource failed after approving o…
Browse files Browse the repository at this point in the history
…ne inlong group
  • Loading branch information
healchow committed Mar 12, 2022
1 parent baa8802 commit eeebb8d
Show file tree
Hide file tree
Showing 16 changed files with 113 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import java.util.Map;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class PulsarClusterInfo {

private String type;
private String adminUrl;
private String token;
private String brokerServiceUrl;
private Map<String, String> ext;

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@
@ApiModel("Extended consumption information of different MQs")
@JsonTypeInfo(use = Id.NAME, visible = true, property = "middlewareType", defaultImpl = ConsumptionMqExtBase.class)
@JsonSubTypes({
@JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = Constant.MIDDLEWARE_PULSAR)
@JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = Constant.MIDDLEWARE_PULSAR),
@JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = Constant.MIDDLEWARE_TDMQ_PULSAR)
})
public class ConsumptionMqExtBase {

@ApiModelProperty(value = "Self-incrementing primary key")
@ApiModelProperty(value = "Primary key")
private Integer id;

@ApiModelProperty(value = "Consumer information ID")
@ApiModelProperty(value = "Consumption ID")
private Integer consumptionId;

@ApiModelProperty(value = "Consumer group")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public interface ThirdPartyClusterEntityMapper {

List<ThirdPartyClusterEntity> selectByType(@Param("type") String type);

List<ThirdPartyClusterEntity> selectMqCluster(@Param("mqSetName") String mqSetName,
List<ThirdPartyClusterEntity> selectMQCluster(@Param("mqSetName") String mqSetName,
@Param("typeList") List<String> typeList);

ThirdPartyClusterEntity selectByName(@Param("name") String name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@
where is_deleted = 0
and name = #{name, jdbcType=VARCHAR}
</select>
<select id="selectMqCluster" resultType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
<select id="selectMQCluster" resultType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
select
<include refid="Base_Column_List"/>
from third_party_cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.Constant;
Expand All @@ -28,19 +29,15 @@
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.DataProxyClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity;
import org.apache.inlong.manager.dao.mapper.DataProxyClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.ThirdPartyClusterEntityMapper;
import org.apache.inlong.manager.service.core.InlongStreamService;
Expand All @@ -61,6 +58,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -82,8 +80,6 @@ public class CommonOperateService {
@Autowired
private InlongGroupEntityMapper groupMapper;
@Autowired
private DataProxyClusterEntityMapper dataProxyClusterMapper;
@Autowired
private ThirdPartyClusterEntityMapper thirdPartyClusterMapper;

/**
Expand All @@ -100,14 +96,14 @@ public String getSpecifiedParam(String key) {

switch (key) {
case Constant.PULSAR_SERVICEURL: {
clusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_PULSAR);
clusterEntity = getMQCluster(Constant.MIDDLEWARE_PULSAR);
if (clusterEntity != null) {
result = clusterEntity.getUrl();
}
break;
}
case Constant.PULSAR_ADMINURL: {
clusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_PULSAR);
clusterEntity = getMQCluster(Constant.MIDDLEWARE_PULSAR);
if (clusterEntity != null) {
params = gson.fromJson(clusterEntity.getExtParams(), Map.class);
result = params.get(key);
Expand All @@ -117,7 +113,7 @@ public String getSpecifiedParam(String key) {
case Constant.CLUSTER_TUBE_MANAGER:
case Constant.CLUSTER_TUBE_CLUSTER_ID:
case Constant.TUBE_MASTER_URL: {
clusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_TUBE);
clusterEntity = getMQCluster(Constant.MIDDLEWARE_TUBE);
if (clusterEntity != null) {
if (key.equals(Constant.TUBE_MASTER_URL)) {
result = clusterEntity.getUrl();
Expand All @@ -129,61 +125,50 @@ public String getSpecifiedParam(String key) {
break;
}
}

return result;
}

/**
* Get third party cluster by type.
*
* TODO Add more condition for query.
* TODO Add data_proxy_cluster_name for query.
*
* @param type Cluster type, such as TUBE, PULSAR, etc.
*/
private ThirdPartyClusterEntity getThirdPartyCluster(String type) {
InlongGroupPageRequest groupPageRequest = new InlongGroupPageRequest();
groupPageRequest.setMiddlewareType(type);
List<InlongGroupEntity> groupEntities = groupMapper.selectByCondition(groupPageRequest);
if (groupEntities.isEmpty()) {
LOGGER.warn("no inlong group found by type={}", type);
private ThirdPartyClusterEntity getMQCluster(String type) {
List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectByType(Constant.CLUSTER_DATA_PROXY);
if (CollectionUtils.isEmpty(clusterList)) {
LOGGER.warn("no data proxy cluster found");
return null;
}

Integer clusterId = groupEntities.get(0).getProxyClusterId();
DataProxyClusterEntity dataProxyCluster = dataProxyClusterMapper.selectByPrimaryKey(clusterId);
if (dataProxyCluster == null) {
LOGGER.warn("no data proxy cluster found with id={}", clusterId);
String mqSetName = clusterList.get(0).getMqSetName();
List<ThirdPartyClusterEntity> mqClusterList = thirdPartyClusterMapper.selectMQCluster(mqSetName,
Collections.singletonList(type));
if (CollectionUtils.isEmpty(mqClusterList)) {
LOGGER.warn("no mq cluster found by type={} and mq set name={}", type, mqSetName);
return null;
}

String mqSetName = dataProxyCluster.getMqSetName();
ClusterPageRequest clusterRequest = new ClusterPageRequest();
clusterRequest.setMqSetName(mqSetName);
List<ThirdPartyClusterEntity> thirdPartyClusters = thirdPartyClusterMapper.selectByCondition(clusterRequest);
if (CollectionUtils.isEmpty(thirdPartyClusters)) {
LOGGER.warn("no related third-party-cluster by type={} and mq set name={}", type, mqSetName);
return null;
}

return thirdPartyClusters.get(0);
return mqClusterList.get(0);
}

/**
* Get Pulsar cluster info.
* Get Pulsar cluster by the given type.
*
* @return Pulsar cluster info.
*/
public PulsarClusterInfo getPulsarClusterInfo() {
ThirdPartyClusterEntity thirdPartyClusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_PULSAR);
Preconditions.checkNotNull(thirdPartyClusterEntity.getExtParams(), "pulsar extParam is empty, check"
+ "third party cluster table");
Map<String, String> configParams = JsonUtils.parse(thirdPartyClusterEntity.getExtParams(), Map.class);
public PulsarClusterInfo getPulsarClusterInfo(String type) {
ThirdPartyClusterEntity clusterEntity = getMQCluster(type);
if (clusterEntity == null || StringUtils.isBlank(clusterEntity.getExtParams())) {
throw new BusinessException("pulsar cluster or pulsar ext params is empty");
}
Map<String, String> configParams = JsonUtils.parse(clusterEntity.getExtParams(), Map.class);
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder().brokerServiceUrl(
thirdPartyClusterEntity.getUrl()).token(thirdPartyClusterEntity.getToken()).build();
clusterEntity.getUrl()).token(clusterEntity.getToken()).build();
String adminUrl = configParams.get(Constant.PULSAR_ADMINURL);
Preconditions.checkNotNull(adminUrl, "adminUrl is empty, check third party cluster table");
pulsarClusterInfo.setAdminUrl(adminUrl);
pulsarClusterInfo.setType(thirdPartyClusterEntity.getType());
pulsarClusterInfo.setType(clusterEntity.getType());
return pulsarClusterInfo;
}

Expand Down Expand Up @@ -241,7 +226,7 @@ public DataFlowInfo createDataFlow(InlongGroupInfo groupInfo, SinkResponse sinkR

// Get source info
String masterAddress = getSpecifiedParam(Constant.TUBE_MASTER_URL);
PulsarClusterInfo pulsarCluster = getPulsarClusterInfo();
PulsarClusterInfo pulsarCluster = getPulsarClusterInfo(groupInfo.getMiddlewareType());
InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster, masterAddress, clusterBean,
groupInfo, streamInfo, sourceResponse, sourceFields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,6 @@
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.Constant;
Expand Down Expand Up @@ -66,6 +57,16 @@
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;

import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Data consumption service
*/
Expand Down Expand Up @@ -119,7 +120,8 @@ public ConsumptionInfo get(Integer id) {

ConsumptionInfo info = CommonBeanUtils.copyProperties(entity, ConsumptionInfo::new);

if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(info.getMiddlewareType())) {
String mqType = info.getMiddlewareType();
if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(info.getId());
Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be empty, as the middleware is Pulsar");
ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, ConsumptionPulsarInfo::new);
Expand Down Expand Up @@ -147,11 +149,10 @@ public boolean isConsumerGroupIdExists(String consumerGroup, Integer excludeSelf
@Transactional(rollbackFor = Throwable.class)
public Integer save(ConsumptionInfo info, String operator) {
fullConsumptionInfo(info);

Date now = new Date();
ConsumptionEntity entity = this.saveConsumption(info, operator, now);

if (Constant.MIDDLEWARE_PULSAR.equals(entity.getMiddlewareType())) {
String mqType = entity.getMiddlewareType();
if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
savePulsarInfo(info.getMqExtInfo(), entity);
}

Expand Down Expand Up @@ -232,7 +233,8 @@ public Boolean update(ConsumptionInfo info, String operator) {
entity.setModifyTime(now);

// Modify Pulsar consumption info
if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(info.getMiddlewareType())) {
String mqType = info.getMiddlewareType();
if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(consumptionId);
Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be null");
pulsarEntity.setConsumerGroupId(info.getConsumerGroupId());
Expand Down Expand Up @@ -338,10 +340,10 @@ public void saveSortConsumption(InlongGroupInfo bizInfo, String topic, String co
}

log.debug("begin to save consumption, groupId={}, topic={}, consumer group={}", groupId, topic, consumerGroup);
String middlewareType = bizInfo.getMiddlewareType();
String mqType = bizInfo.getMiddlewareType();
ConsumptionEntity entity = new ConsumptionEntity();
entity.setInlongGroupId(groupId);
entity.setMiddlewareType(middlewareType);
entity.setMiddlewareType(mqType);
entity.setTopic(topic);
entity.setConsumerGroupId(consumerGroup);
entity.setConsumerGroupName(consumerGroup);
Expand All @@ -355,7 +357,7 @@ public void saveSortConsumption(InlongGroupInfo bizInfo, String topic, String co

consumptionMapper.insert(entity);

if (Constant.MIDDLEWARE_PULSAR.equals(middlewareType)) {
if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
ConsumptionPulsarEntity pulsarEntity = new ConsumptionPulsarEntity();
pulsarEntity.setConsumptionId(entity.getId());
pulsarEntity.setConsumerGroupId(consumerGroup);
Expand All @@ -370,7 +372,8 @@ public void saveSortConsumption(InlongGroupInfo bizInfo, String topic, String co
private NewConsumptionProcessForm genNewConsumptionProcessForm(ConsumptionInfo consumptionInfo) {
NewConsumptionProcessForm form = new NewConsumptionProcessForm();
Integer id = consumptionInfo.getId();
if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(consumptionInfo.getMiddlewareType())) {
String mqType = consumptionInfo.getMiddlewareType();
if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
ConsumptionPulsarEntity consumptionPulsarEntity = consumptionPulsarMapper.selectByConsumptionId(id);
ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(consumptionPulsarEntity,
ConsumptionPulsarInfo::new);
Expand Down Expand Up @@ -426,11 +429,12 @@ private void fullConsumptionInfo(ConsumptionInfo info) {
Preconditions.checkNotNull(topicVO, "inlong group not exist: " + groupId);

// Tube’s topic is the inlong group level, one inlong group, one Tube topic
if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(topicVO.getMiddlewareType())) {
String mqType = topicVO.getMiddlewareType();
if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
String bizTopic = topicVO.getMqResourceObj();
Preconditions.checkTrue(bizTopic == null || bizTopic.equals(info.getTopic()),
"topic [" + info.getTopic() + "] not belong to inlong group " + groupId);
} else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(topicVO.getMiddlewareType())) {
} else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
// Pulsar's topic is the inlong stream level.
// There will be multiple inlong streams under one inlong group, and there will be multiple topics
List<InlongStreamTopicResponse> dsTopicList = topicVO.getDsTopicList();
Expand Down

0 comments on commit eeebb8d

Please sign in to comment.