Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USE
@ResponseStatus(HttpStatus.OK)
@ApiException(CHECK_PROCESS_DEFINITION_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result startCheckProcessDefinition(@RequestParam(value = "processDefinitionCode") int processDefinitionCode) {
public Result startCheckProcessDefinition(@RequestParam(value = "processDefinitionCode") long processDefinitionCode) {
Map<String, Object> result = execService.startCheckByProcessDefinedCode(processDefinitionCode);
return returnDataList(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,18 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;

@Autowired
TaskDefinitionLogMapper taskDefinitionLogMapper;

@Autowired
private TaskDefinitionMapper taskDefinitionMapper;

@Autowired
private SchedulerService schedulerService;

@Autowired
private TenantMapper tenantMapper;

/**
* create process definition
*
Expand Down Expand Up @@ -193,10 +205,14 @@ public Map<String, Object> createProcessDefinition(User loginUser,
return checkRelationJson;
}

Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
int tenantId = -1;
if (!Constants.DEFAULT.equals(tenantCode)) {
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
}
tenantId = tenant.getId();
}

long processDefinitionCode;
Expand All @@ -207,23 +223,11 @@ public Map<String, Object> createProcessDefinition(User loginUser,
return result;
}
ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description,
globalParams, locations, timeout, loginUser.getId(), tenant.getId());
globalParams, locations, timeout, loginUser.getId(), tenantId);

return createProcessDefine(loginUser, result, taskRelationList, processDefinition, taskDefinitionLogs);
}

@Autowired
TaskDefinitionLogMapper taskDefinitionLogMapper;

@Autowired
private TaskDefinitionMapper taskDefinitionMapper;

@Autowired
private SchedulerService schedulerService;

@Autowired
private TenantMapper tenantMapper;

private void createTaskDefinition(Map<String, Object> result,
User loginUser,
long projectCode,
Expand All @@ -240,12 +244,6 @@ private void createTaskDefinition(Map<String, Object> result,
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
return;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, taskDefinitionLog.getName());
if (taskDefinition != null) {
logger.error("task definition name {} already exists", taskDefinitionLog.getName());
putMsg(result, Status.TASK_DEFINITION_NAME_EXISTED, taskDefinitionLog.getName());
return;
}
}
if (processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs)) {
putMsg(result, Status.SUCCESS);
Expand Down Expand Up @@ -401,6 +399,10 @@ public Map<String, Object> queryProcessDefinitionByCode(User loginUser, long pro
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
} else {
Tenant tenant = tenantMapper.queryById(processDefinition.getTenantId());
if (tenant != null) {
processDefinition.setTenantCode(tenant.getTenantCode());
}
DagData dagData = processService.genDagData(processDefinition);
result.put(Constants.DATA_LIST, dagData);
putMsg(result, Status.SUCCESS);
Expand Down Expand Up @@ -475,10 +477,14 @@ public Map<String, Object> updateProcessDefinition(User loginUser,
return checkRelationJson;
}

Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
int tenantId = -1;
if (!Constants.DEFAULT.equals(tenantCode)) {
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
}
tenantId = tenant.getId();
}

ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
Expand All @@ -500,21 +506,28 @@ public Map<String, Object> updateProcessDefinition(User loginUser,
return result;
}
}
processDefinition.set(projectCode, name, description, globalParams, locations, timeout, tenant.getId());
return updateProcessDefine(loginUser, result, taskRelationList, processDefinition, taskDefinitionLogs);
ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class);
processDefinition.set(projectCode, name, description, globalParams, locations, timeout, tenantId);
return updateProcessDefine(loginUser, result, taskRelationList, processDefinition, processDefinitionDeepCopy, taskDefinitionLogs);
}

private Map<String, Object> updateProcessDefine(User loginUser,
Map<String, Object> result,
List<ProcessTaskRelationLog> taskRelationList,
ProcessDefinition processDefinition,
ProcessDefinition processDefinitionDeepCopy,
List<TaskDefinitionLog> taskDefinitionLogs) {
processDefinition.setUpdateTime(new Date());
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true);
int insertVersion;
if (processDefinition.equals(processDefinitionDeepCopy)) {
insertVersion = processDefinitionDeepCopy.getVersion();
} else {
processDefinition.setUpdateTime(new Date());
insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true);
}
if (insertVersion > 0) {
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
if (insertResult > 0) {
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
Expand Down Expand Up @@ -1286,7 +1299,7 @@ private void doBatchOperateProcessDefinition(User loginUser,
processDefinition.setName(processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp());
createProcessDefine(loginUser, result, taskRelationList, processDefinition, Lists.newArrayList());
} else {
updateProcessDefine(loginUser, result, taskRelationList, processDefinition, Lists.newArrayList());
updateProcessDefine(loginUser, result, taskRelationList, processDefinition, null, Lists.newArrayList());
}
if (result.get(Constants.STATUS) != Status.SUCCESS) {
failedProcessList.add(processDefinition.getCode() + "[" + processDefinition.getName() + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,13 +459,17 @@ public Map<String, Object> updateProcessInstance(User loginUser, long projectCod
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
int tenantId = -1;
if (!Constants.DEFAULT.equals(tenantCode)) {
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
}
tenantId = tenant.getId();
}

processDefinition.set(projectCode, processDefinition.getName(), processDefinition.getDescription(), globalParams, locations, timeout, tenant.getId());
processDefinition.set(projectCode, processDefinition.getName(), processDefinition.getDescription(), globalParams, locations, timeout, tenantId);
processDefinition.setUpdateTime(new Date());
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, false);
if (insertVersion > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;

import java.text.MessageFormat;
Expand Down Expand Up @@ -96,6 +98,8 @@ public class ProcessDefinitionServiceTest {
private ProcessInstanceService processInstanceService;
@Mock
private TaskInstanceMapper taskInstanceMapper;
@Mock
private TenantMapper tenantMapper;

@Test
public void testQueryProcessDefinitionList() {
Expand Down Expand Up @@ -173,7 +177,9 @@ public void testQueryProcessDefinitionByCode() {
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);

Tenant tenant = new Tenant();
tenant.setId(1);
tenant.setTenantCode("root");
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode);

Expand All @@ -195,6 +201,7 @@ public void testQueryProcessDefinitionByCode() {
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(getProcessDefinition());
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Mockito.when(tenantMapper.queryById(1)).thenReturn(tenant);
Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ private Constants() {
*/
public static final String DATASOURCE_PROPERTIES = "/datasource.properties";

public static final String DEFAULT = "Default";
public static final String DEFAULT = "default";
public static final String USER = "user";
public static final String PASSWORD = "password";
public static final String XXXXXX = "******";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ public enum TaskTimeoutStrategy {
* 2 warn+failed
*/
WARN(0, "warn"),
FAILED(1,"failed"),
WARNFAILED(2,"warnfailed");
FAILED(1, "failed"),
WARNFAILED(2, "warnfailed");


TaskTimeoutStrategy(int code, String descp){
TaskTimeoutStrategy(int code, String descp) {
this.code = code;
this.descp = descp;
}
Expand All @@ -49,9 +48,9 @@ public String getDescp() {
return descp;
}

public static TaskTimeoutStrategy of(int status){
for(TaskTimeoutStrategy es : values()){
if(es.getCode() == status){
public static TaskTimeoutStrategy of(int status) {
for (TaskTimeoutStrategy es : values()) {
if (es.getCode() == status) {
return es;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import com.baomidou.mybatisplus.annotation.IdType;
Expand Down Expand Up @@ -149,6 +150,12 @@ public class ProcessDefinition {
*/
private int tenantId;

/**
* tenant code
*/
@TableField(exist = false)
private String tenantCode;

/**
* modify user name
*/
Expand All @@ -167,7 +174,8 @@ public class ProcessDefinition {
@TableField(exist = false)
private int warningGroupId;

public ProcessDefinition() {}
public ProcessDefinition() {
}

public ProcessDefinition(long projectCode,
String name,
Expand Down Expand Up @@ -356,6 +364,14 @@ public void setTenantId(int tenantId) {
this.tenantId = tenantId;
}

public String getTenantCode() {
return tenantCode;
}

public void setTenantCode(String tenantCode) {
this.tenantCode = tenantCode;
}

public String getDescription() {
return description;
}
Expand Down Expand Up @@ -396,12 +412,33 @@ public void setWarningGroupId(int warningGroupId) {
this.warningGroupId = warningGroupId;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ProcessDefinition that = (ProcessDefinition) o;
return projectCode == that.projectCode
&& userId == that.userId
&& timeout == that.timeout
&& tenantId == that.tenantId
&& Objects.equals(name, that.name)
&& releaseState == that.releaseState
&& Objects.equals(description, that.description)
&& Objects.equals(globalParams, that.globalParams)
&& flag == that.flag
&& Objects.equals(locations, that.locations);
}

@Override
public String toString() {
return "ProcessDefinition{"
+ "id=" + id
+ ", name='" + name + '\''
+ ", code=" + code
+ ", name='" + name + '\''
+ ", version=" + version
+ ", releaseState=" + releaseState
+ ", projectCode=" + projectCode
Expand All @@ -418,10 +455,11 @@ public String toString() {
+ ", locations='" + locations + '\''
+ ", scheduleReleaseState=" + scheduleReleaseState
+ ", timeout=" + timeout
+ ", warningGroupId=" + warningGroupId
+ ", tenantId=" + tenantId
+ ", tenantCode='" + tenantCode + '\''
+ ", modifyBy='" + modifyBy + '\''
+ ", resourceIds='" + resourceIds + '\''
+ ", warningGroupId=" + warningGroupId
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,8 @@ public void setOperateTime(Date operateTime) {
this.operateTime = operateTime;
}

@Override
public boolean equals(Object o) {
return super.equals(o);
}
}
Loading