Fix [Bug] process definition json worker group convert #2794 (#2795)

Merged 23 commits on May 22, 2020.

Commits (23):
20bfd6d  add LoggerServerTest UT (May 14, 2020)
14376bf  add LoggerServerTest UT (May 14, 2020)
0627947  add LoggerServerTest UT (May 14, 2020)
c391bfd  test (May 14, 2020)
0c496cb  master select worker filter high load worker #2704 (May 14, 2020)
41e9309  master select worker filter high load worker #2704 (May 14, 2020)
eb51718  master select worker filter high load worker #2704 (May 14, 2020)
9641fd6  Merge remote-tracking branch 'remotes/upstream/dev-1.3.0' into dev-1.… (May 18, 2020)
72c0a36  master select worker filter high load worker #2704 (May 18, 2020)
f318e2e  Merge remote-tracking branch 'remotes/upstream/dev-1.3.0' into dev-1.… (May 18, 2020)
49f1227  master select worker filter high load worker #2704 (May 18, 2020)
eb0c266  Merge remote-tracking branch 'remotes/upstream/dev-1.3.0' into dev-1.… (May 19, 2020)
c724e78  master select worker filter high load worker #2704 (May 19, 2020)
fe80dc6  Merge remote-tracking branch 'remotes/upstream/dev-1.3.0' into dev-1.… (May 21, 2020)
a84c1c3  add not worker log and remove worker invalid property (May 21, 2020)
db78117  Merge remote-tracking branch 'remotes/upstream/dev-1.3.0' into dev-1.… (May 21, 2020)
49dfa67  Merge remote-tracking branch 'remotes/upstream/dev-1.3.0' into dev-1.… (May 22, 2020)
bf7e13f  process definition json worker group convert #2794 (May 22, 2020)
af53c00  process definition json worker group convert #2794 (May 22, 2020)
dff7939  process definition json worker group convert #2794 (May 22, 2020)
679964f  process definition json worker group convert #2794 (May 22, 2020)
2cdfab4  process definition json worker group convert #2794 (May 22, 2020)
164e986  process definition json worker group convert #2794 (May 22, 2020)
TaskNode.java
@@ -124,6 +124,11 @@ public class TaskNode {
*/
private String workerGroup;

/**
* worker group id
*/
private Integer workerGroupId;


/**
* task time out
@@ -341,4 +346,12 @@ public String getConditionResult() {
public void setConditionResult(String conditionResult) {
this.conditionResult = conditionResult;
}

public Integer getWorkerGroupId() {
return workerGroupId;
}

public void setWorkerGroupId(Integer workerGroupId) {
this.workerGroupId = workerGroupId;
}
}
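
For context, a hedged sketch of the binding this new field restores (the legacy JSON key is inferred from the field name; a real 1.2.x export may differ): pre-1.3.0 process definitions stored an integer workerGroupId per task, and without a matching field the value was dropped during deserialization before the upgrade code could translate it to a group name.

// Self-contained sketch; assumes JSONUtils.parseObject is the thin
// Jackson-style wrapper used later in this PR.
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.JSONUtils;

public class WorkerGroupIdBindingSketch {
    public static void main(String[] args) {
        String legacyTaskJson = "{\"workerGroupId\":-1}";
        TaskNode taskNode = JSONUtils.parseObject(legacyTaskJson, TaskNode.class);
        // the legacy id is now retained and can be mapped to a group name
        System.out.println(taskNode.getWorkerGroupId()); // -1, the old catch-all id
    }
}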
ShellExecutorTest.java
@@ -33,47 +33,12 @@ public class ShellExecutorTest {
@Test
public void execCommand() throws InterruptedException {

ThreadPoolExecutors executors = ThreadPoolExecutors.getInstance();
CountDownLatch latch = new CountDownLatch(200);

executors.execute(new Runnable() {
@Override
public void run() {

try {
int i =0;
while(i++ <= 100){
String res = ShellExecutor.execCommand("groups");
logger.info("time:" + i + ",thread id:" + Thread.currentThread().getId() + ", result:" + res.substring(0,5));
Thread.sleep(100l);
latch.countDown();
}

} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
});

executors.execute(new Runnable() {
@Override
public void run() {

try {
int i =0;
while(i++ <= 100){
String res = ShellExecutor.execCommand("whoami");
logger.info("time:" + i + ",thread id:" + Thread.currentThread().getId() + ", result2:" + res);
Thread.sleep(100l);
latch.countDown();
}

} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
});

latch.await();
try {
String res = ShellExecutor.execCommand("groups");
logger.info("thread id:" + Thread.currentThread().getId() + ", result:" + res.substring(0, 5));
} catch (Exception e) {
e.printStackTrace();
}
}

}
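
A small variant sketch of the simplified test above (JUnit 4's Assert, same ShellExecutor API): asserting on the output, rather than catching and printing the exception, would make a regression fail the build instead of only logging it.

// Hypothetical extra test method for the same class; Assert is org.junit.Assert.
@Test
public void execCommandReturnsOutput() throws Exception {
    // execCommand declares IOException/InterruptedException, covered by throws Exception
    String res = ShellExecutor.execCommand("whoami");
    Assert.assertNotNull(res);
    Assert.assertFalse(res.trim().isEmpty());
}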
ProcessData.java
@@ -102,4 +102,14 @@ public int getTenantId() {
public void setTenantId(int tenantId) {
this.tenantId = tenantId;
}

@Override
public String toString() {
return "ProcessData{" +
"tasks=" + tasks +
", globalParams=" + globalParams +
", timeout=" + timeout +
", tenantId=" + tenantId +
'}';
}
}
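
The override mainly pays off in upgrade logging: a parsed ProcessData now prints its fields instead of an object hash. A minimal hedged sketch (illustrative payload, not a real export; assumes the slf4j logger and JSONUtils helper used elsewhere in this PR):

// fields absent from the JSON stay null / default
ProcessData processData = JSONUtils.parseObject("{\"tenantId\":1,\"timeout\":0}", ProcessData.class);
logger.info("parsed process data: {}", processData);
// -> parsed process data: ProcessData{tasks=null, globalParams=null, timeout=0, tenantId=1}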
@@ -16,9 +16,6 @@
*/
package org.apache.dolphinscheduler.dao.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;

import java.util.Date;
import java.util.List;
ProcessDefinitionDao.java (new file)
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.upgrade;

import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;

public class ProcessDefinitionDao {


public static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionDao.class);

/**
* query all process definitions
* @param conn jdbc connection
* @return map of process definition id -> process_definition_json
*/
public Map<Integer,String> queryAllProcessDefinition(Connection conn){

Map<Integer,String> processDefinitionJsonMap = new HashMap<>();

String sql = "SELECT id,process_definition_json FROM t_ds_process_definition";
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();

while (rs.next()){
Integer id = rs.getInt(1);
String processDefinitionJson = rs.getString(2);
processDefinitionJsonMap.put(id,processDefinitionJson);
}

} catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}

return processDefinitionJsonMap;
}


/**
* update process definition json
* @param conn jdbc connection
* @param processDefinitionJsonMap map of process definition id -> rewritten JSON
*/
public void updateProcessDefinitionJson(Connection conn, Map<Integer,String> processDefinitionJsonMap){
String sql = "UPDATE t_ds_process_definition SET process_definition_json=? where id=?";
// prepare the statement once, execute it per row
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()){
pstmt.setString(1, entry.getValue());
pstmt.setInt(2, entry.getKey());
pstmt.executeUpdate();
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
}
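
One usage detail: both methods release the Connection they are handed (via ConnectionUtils.releaseResource in their finally blocks), so each call needs its own fresh connection, which is exactly how UpgradeDao drives this class below. A minimal caller sketch, assuming a configured javax.sql.DataSource named dataSource:

// hedged sketch; dataSource wiring is assumed (see UpgradeDao below)
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer, String> jsonById = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
// ... rewrite each JSON string as needed ...
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(), jsonById);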
UpgradeDao.java
@@ -16,14 +16,12 @@
*/
package org.apache.dolphinscheduler.dao.upgrade;

import com.alibaba.druid.pool.DruidDataSource;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.SchemaUtils;
import org.apache.dolphinscheduler.common.utils.ScriptRunner;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.AbstractBaseDao;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,6 +32,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.*;

public abstract class UpgradeDao extends AbstractBaseDao {

@@ -44,6 +43,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
protected static final DataSource dataSource = getDataSource();
private static final DbType dbType = getCurrentDbType();


@Override
protected void init() {

@@ -119,6 +119,7 @@ public void initSchema(String initSqlPath) {
// Execute the dolphinscheduler DML, it can be rolled back
runInitDML(initSqlPath);


}

/**
@@ -256,6 +257,43 @@ public void upgradeDolphinScheduler(String schemaDir) {

upgradeDolphinSchedulerDML(schemaDir);

updateProcessDefinitionJsonWorkerGroup();


}

/**
* convert the legacy workerGroupId in each task of process_definition_json
* to a worker group name
*/
protected void updateProcessDefinitionJsonWorkerGroup(){
WorkerGroupDao workerGroupDao = new WorkerGroupDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer,String> replaceProcessDefinitionMap = new HashMap<>();
try {
Map<Integer, String> oldWorkerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection());
Map<Integer,String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());

for (Map.Entry<Integer,String> entry : processDefinitionJsonMap.entrySet()){
ProcessData processData = JSONUtils.parseObject(entry.getValue(), ProcessData.class);

List<TaskNode> tasks = processData.getTasks();
for (TaskNode taskNode : tasks){
Integer workerGroupId = taskNode.getWorkerGroupId();
// guard against tasks whose JSON never carried the legacy field: unboxing a
// null Integer in the comparison would throw NPE and abort the whole upgrade
if (workerGroupId == null || workerGroupId == -1){
taskNode.setWorkerGroup("default");
} else {
taskNode.setWorkerGroup(oldWorkerGroupMap.get(workerGroupId));
}
}
replaceProcessDefinitionMap.put(entry.getKey(),JSONUtils.toJson(processData));
}
if (replaceProcessDefinitionMap.size() > 0){
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap);
}
} catch (Exception e){
logger.error("update process definition json worker group error", e);
}

}

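In short, the conversion rule applied above: the legacy catch-all id -1 maps to the named group "default", and any other id is replaced by its name from t_ds_worker_group. A self-contained sketch with hypothetical data:

import java.util.HashMap;
import java.util.Map;

public class WorkerGroupMappingSketch {

    // mirrors the branch in updateProcessDefinitionJsonWorkerGroup above
    static String convert(Integer workerGroupId, Map<Integer, String> oldWorkerGroupMap) {
        if (workerGroupId == null || workerGroupId == -1) {
            return "default";
        }
        return oldWorkerGroupMap.get(workerGroupId);
    }

    public static void main(String[] args) {
        // hypothetical pre-upgrade contents of t_ds_worker_group
        Map<Integer, String> oldWorkerGroupMap = new HashMap<>();
        oldWorkerGroupMap.put(1, "hadoop-cluster");

        System.out.println(convert(-1, oldWorkerGroupMap)); // default
        System.out.println(convert(1, oldWorkerGroupMap));  // hadoop-cluster
    }
}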
WorkerGroupDao.java (new file)
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.upgrade;

import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

public class WorkerGroupDao {

public static final Logger logger = LoggerFactory.getLogger(WorkerGroupDao.class);

/**
* query all old worker groups
* @param conn jdbc connection
* @return map of old worker group id -> name
*/
public Map<Integer,String> queryAllOldWorkerGroup(Connection conn){
Map<Integer,String> workerGroupMap = new HashMap<>();

String sql = "select id,name from t_ds_worker_group";
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();

while (rs.next()){
int id = rs.getInt(1);
String name = rs.getString(2);
workerGroupMap.put(id,name);
}

} catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}

return workerGroupMap;
}
}

This file was deleted.
