Skip to content
Permalink
Browse files
[INLONG-4220][Manager] Add StreamResourceProcessForm (#4234)
  • Loading branch information
kipshi committed May 18, 2022
1 parent 5db0eb8 commit 0a08141290ebcad4538bdf95d632d8a6ba7fe5f2
Showing 70 changed files with 1,261 additions and 422 deletions.
@@ -23,15 +23,21 @@
public enum StreamStatus {

DRAFT(0, "draft"),
DELETED(10, "deleted"),

// Stream related status
NEW(100, "new"),
CONFIG_ING(110, "in configure"),
CONFIG_FAILED(120, "configuration failed"),
CONFIG_SUCCESSFUL(130, "configuration successful"),

;
SUSPENDING(141, "suspending"),
SUSPENDED(140, "suspended"),

RESTARTING(151, "restarting"),
RESTARTED(150, "restarted"),

DELETING(41, "deleting"),
DELETED(40, "deleted");

private final Integer code;
private final String description;
@@ -113,4 +113,7 @@ public InlongStreamResponse genResponse() {
return CommonBeanUtils.copyProperties(this, InlongStreamResponse::new);
}

public InlongStreamRequest genRequest() {
return CommonBeanUtils.copyProperties(this, InlongStreamRequest::new);
}
}
@@ -32,6 +32,7 @@
@JsonSubTypes.Type(value = NewConsumptionProcessForm.class, name = NewConsumptionProcessForm.FORM_NAME),
@JsonSubTypes.Type(value = GroupResourceProcessForm.class, name = GroupResourceProcessForm.FORM_NAME),
@JsonSubTypes.Type(value = LightGroupResourceProcessForm.class, name = LightGroupResourceProcessForm.FORM_NAME),
@JsonSubTypes.Type(value = StreamResourceProcessForm.class, name = StreamResourceProcessForm.FORM_NAME),
})
public abstract class BaseProcessForm implements ProcessForm {

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.common.pojo.workflow.form;

import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.exceptions.FormValidateException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;

/**
* Form of create inlong stream resource
*/
@Data
@EqualsAndHashCode(callSuper = false)
public class StreamResourceProcessForm extends BaseProcessForm {

public static final String FORM_NAME = "StreamResourceProcessForm";

private InlongGroupInfo groupInfo;

private InlongStreamInfo streamInfo;

private GroupOperateType groupOperateType = GroupOperateType.INIT;

@Override
public void validate() throws FormValidateException {

}

@Override
public String getFormName() {
return FORM_NAME;
}

@Override
public String getInlongGroupId() {
return groupInfo.getInlongGroupId();
}
}
@@ -402,7 +402,7 @@ public boolean updateAfterApprove(InlongGroupApproveRequest approveInfo, String
@Override
@Transactional(rollbackFor = Throwable.class)
public void saveOrUpdateExt(String groupId, List<InlongGroupExtInfo> infoList) {
LOGGER.debug("begin to save or update inlong group ext info, groupId={}, ext={}", groupId, infoList);
LOGGER.info("begin to save or update inlong group ext info, groupId={}, ext={}", groupId, infoList);

if (CollectionUtils.isEmpty(infoList)) {
return;
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.service.core.impl;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.service.core.InlongGroupService;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.workflow.WorkflowService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;

/**
* Operation related to inlong stream process
*/
@Service
@Slf4j
public class InlongStreamProcessOperation {

private final ExecutorService executorService = new ThreadPoolExecutor(
20,
40,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setNameFormat("inlong-stream-process-%s").build(),
new CallerRunsPolicy());

@Autowired
private InlongGroupService groupService;

@Autowired
private InlongStreamService streamService;

@Autowired
private WorkflowService workflowService;


}
@@ -57,6 +57,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
@@ -241,7 +242,6 @@ public Boolean update(InlongStreamRequest request, String operator) {

CommonBeanUtils.copyProperties(request, streamEntity, true);
streamEntity.setModifier(operator);
streamEntity.setStatus(GroupStatus.CONFIG_ING.getCode());
streamMapper.updateByIdentifierSelective(streamEntity);

// Update field information
@@ -518,6 +518,7 @@ public boolean updateAfterApprove(List<InlongStreamApproveRequest> streamApprove
}

@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public boolean updateStatus(String groupId, String streamId, Integer status, String operator) {
LOGGER.debug("begin to update status by groupId={}, streamId={}", groupId, streamId);
streamMapper.updateStatusByIdentifier(groupId, streamId, status, operator);
@@ -29,6 +29,7 @@
import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.service.core.InlongGroupService;
import org.apache.inlong.manager.service.mq.util.PulsarOptService;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -31,6 +31,7 @@
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.InlongGroupService;
import org.apache.inlong.manager.service.mq.util.PulsarOptService;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -46,7 +47,7 @@
* Create Pulsar tenant, namespace and topic
*/
@Slf4j
@Component()
@Component
public class CreatePulsarResourceTaskListener implements QueueOperateListener {

@Autowired
@@ -24,14 +24,13 @@
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.service.core.InlongGroupService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.mq.util.PulsarOptService;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
@@ -47,17 +46,13 @@
*/
@Slf4j
@Component
public class CreatePulsarGroupForStreamTaskListener implements QueueOperateListener {
public class CreatePulsarSubscriptionTaskListener implements QueueOperateListener {

@Autowired
private CommonOperateService commonOperateService;
@Autowired
private ClusterBean clusterBean;
@Autowired
private InlongGroupService groupService;
@Autowired
private InlongStreamEntityMapper streamMapper;
@Autowired
private PulsarOptService pulsarOptService;
@Autowired
private StreamSinkService sinkService;
@@ -71,21 +66,13 @@ public TaskEvent event() {

@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
String groupId = form.getInlongGroupId();
String streamId = form.getInlongStreamId();

InlongGroupInfo groupInfo = groupService.get(groupId);
if (groupInfo == null) {
log.error("inlong group not found with groupId={}", groupId);
throw new WorkflowListenerException("inlong group not found with groupId=" + groupId);
}

InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
if (streamEntity == null) {
log.warn("inlong stream is empty for group={}, stream={}, skip to create pulsar group", groupId, streamId);
return ListenerResult.success();
}
StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
InlongGroupInfo groupInfo = form.getGroupInfo();
InlongStreamInfo streamInfo = form.getStreamInfo();
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
final String namespace = groupInfo.getMqResource();
final String topic = streamInfo.getMqResource();
PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
// Query data sink info based on groupId and streamId
@@ -95,17 +82,14 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
groupId, streamId);
return ListenerResult.success();
}

String tenant = clusterBean.getDefaultTenant();
PulsarTopicBean topicBean = new PulsarTopicBean();
topicBean.setTenant(clusterBean.getDefaultTenant());
topicBean.setNamespace(groupInfo.getMqResource());
String topic = streamEntity.getMqResource();
topicBean.setTenant(tenant);
topicBean.setNamespace(namespace);
topicBean.setTopicName(topic);
List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);

// Create a subscription in the Pulsar cluster (cross-region), you need to ensure that the Topic exists
String tenant = clusterBean.getDefaultTenant();
String namespace = groupInfo.getMqResource();
for (String cluster : pulsarClusters) {
String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
@@ -23,11 +23,10 @@
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.InlongGroupService;
import org.apache.inlong.manager.service.mq.util.PulsarOptService;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -44,18 +43,14 @@
*/
@Slf4j
@Component
public class CreatePulsarTopicForStreamTaskListener implements QueueOperateListener {
public class CreatePulsarTopicTaskListener implements QueueOperateListener {

@Autowired
private CommonOperateService commonOperateService;
@Autowired
private ClusterBean clusterBean;
@Autowired
private PulsarOptService pulsarOptService;
@Autowired
private InlongGroupService groupService;
@Autowired
private InlongStreamEntityMapper streamMapper;

@Override
public TaskEvent event() {
@@ -64,17 +59,11 @@ public TaskEvent event() {

@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
String groupId = form.getInlongGroupId();
String streamId = form.getInlongStreamId();

InlongGroupInfo groupInfo = groupService.get(groupId);
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
if (groupInfo == null || streamEntity == null) {
throw new WorkflowListenerException("inlong group or inlong stream not found with groupId=" + groupId
+ ", streamId=" + streamId);
}

StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
InlongGroupInfo groupInfo = form.getGroupInfo();
InlongStreamInfo streamInfo = form.getStreamInfo();
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
log.info("begin to create pulsar topic for groupId={}, streamId={}", groupId, streamId);
PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
@@ -83,7 +72,7 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
.token(globalCluster.getToken()).adminUrl(serviceUrl).build();
String pulsarTopic = streamEntity.getMqResource();
String pulsarTopic = streamInfo.getMqResource();
this.createTopic(groupInfo, pulsarTopic, pulsarClusterInfo);
}
} catch (Exception e) {
@@ -28,6 +28,7 @@
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.InlongGroupService;
import org.apache.inlong.manager.service.mq.util.TubeMqOptService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
@@ -23,6 +23,7 @@
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeMqTopicRequest;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.service.core.InlongGroupService;
import org.apache.inlong.manager.service.mq.util.TubeMqOptService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;

0 comments on commit 0a08141

Please sign in to comment.