Skip to content

Commit

Permalink
Merge pull request #41 from DataLinkDC/dev
Browse files Browse the repository at this point in the history
0.4.0-rc1
  • Loading branch information
aiwenmo committed Nov 29, 2021
2 parents 9cbaeda + 692d55f commit 524ffda
Show file tree
Hide file tree
Showing 41 changed files with 564 additions and 72 deletions.
12 changes: 4 additions & 8 deletions dlink-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.4.0-SNAPSHOT</version>
<version>0.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down Expand Up @@ -198,7 +198,7 @@
</configuration>
</plugin>-->

<plugin>
<!--<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>${maven.resource.version}</version>
Expand All @@ -214,18 +214,14 @@
<overwrite>true</overwrite>
<resources>
<resource>
<!-- 因为dlink-web打包目录在项目跟目录,所以从这里复制 -->
&lt;!&ndash; 因为dlink-web打包目录在项目跟目录,所以从这里复制 &ndash;&gt;
<directory>../dlink-web/dist</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>versions-maven-plugin</artifactId>
</plugin>
</plugin>-->
</plugins>
<finalName>${project.artifactId}-${project.version}</finalName>
</build>
Expand Down
29 changes: 27 additions & 2 deletions dlink-admin/src/main/java/com/dlink/model/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@
import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.dlink.assertion.Asserts;
import com.dlink.db.model.SuperEntity;
import com.dlink.job.JobConfig;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* 任务
Expand Down Expand Up @@ -46,7 +52,7 @@ public class Task extends SuperEntity{

private Integer jarId;

private String config;
private String configJson;

private String note;

Expand All @@ -59,6 +65,21 @@ public class Task extends SuperEntity{
@TableField(exist = false)
private List<Savepoints> savepoints;

@TableField(exist = false)
private List<Map<String,String>> config = new ArrayList<>();


public List<Map<String,String>> parseConfig(){
ObjectMapper objectMapper = new ObjectMapper();
try {
if(Asserts.isNotNullString(configJson)) {
config = objectMapper.readValue(configJson, ArrayList.class);
}
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return config;
}
/*public ExecutorSetting buildExecutorSetting(){
HashMap configMap = new HashMap();
if(config!=null&&!"".equals(clusterName)) {
Expand All @@ -72,7 +93,11 @@ public JobConfig buildSubmitConfig(){
if(clusterId==null||clusterId==0){
useRemote = false;
}
return new JobConfig(type,false,false,useRemote,clusterId,clusterConfigurationId,jarId,getId(),alias,fragment,statementSet,checkPoint,parallelism,savePointStrategy,savePointPath);
Map<String,String> map = new HashMap<>();
for(Map<String,String> item : config){
map.put(item.get("key"),item.get("value"));
}
return new JobConfig(type,false,false,useRemote,clusterId,clusterConfigurationId,jarId,getId(),alias,fragment,statementSet,checkPoint,parallelism,savePointStrategy,savePointPath,map);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ private String buildParas(Integer id) {

@Override
public JobResult submitByTaskId(Integer id) {
Task task = this.getById(id);
Task task = this.getTaskInfoById(id);
Assert.check(task);
boolean isJarTask = isJarTask(task);
Statement statement = null;
/*Statement statement = null;
if(!isJarTask){
statement = statementService.getById(id);
Assert.check(statement);
}
}*/
JobConfig config = task.buildSubmitConfig();
if (!JobManager.useGateway(config.getType())) {
config.setAddress(clusterService.buildEnvironmentAddress(config.isUseRemote(), task.getClusterId()));
Expand Down Expand Up @@ -100,7 +100,7 @@ public JobResult submitByTaskId(Integer id) {
}
JobManager jobManager = JobManager.build(config);
if(!isJarTask) {
return jobManager.executeSql(statement.getStatement());
return jobManager.executeSql(task.getStatement());
}else{
return jobManager.executeJar();
}
Expand All @@ -114,6 +114,7 @@ private boolean isJarTask(Task task){
public Task getTaskInfoById(Integer id) {
Task task = this.getById(id);
if (task != null) {
task.parseConfig();
Statement statement = statementService.getById(id);
if (task.getClusterId() != null) {
Cluster cluster = clusterService.getById(task.getClusterId());
Expand Down
4 changes: 2 additions & 2 deletions dlink-admin/src/main/resources/mapper/TaskMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<result column="cluster_id" property="clusterId" />
<result column="cluster_configuration_id" property="clusterConfigurationId" />
<result column="jar_id" property="jarId" />
<result column="config" property="config" />
<result column="config_json" property="configJson" />
<result column="note" property="note" />
<result column="enabled" property="enabled" />
<result column="create_time" property="createTime" />
Expand All @@ -26,7 +26,7 @@

<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id, name, alias, type,check_point,save_point_strategy,save_point_path, parallelism,fragment,statement_set,cluster_id,cluster_configuration_id,jar_id,config,note, enabled, create_time, update_time
id, name, alias, type,check_point,save_point_strategy,save_point_path, parallelism,fragment,statement_set,cluster_id,cluster_configuration_id,jar_id,config_json,note, enabled, create_time, update_time
</sql>


Expand Down
2 changes: 1 addition & 1 deletion dlink-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.4.0-SNAPSHOT</version>
<version>0.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.4.0-SNAPSHOT</version>
<version>0.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
6 changes: 3 additions & 3 deletions dlink-assembly/src/main/assembly/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@
</fileSet>

<!-- 将模块dlink-extends的常用jar文件放到打包目录/plugins下 -->
<!--<fileSet>
<fileSet>
<directory>${project.parent.basedir}/dlink-extends/target
</directory>
<outputDirectory>plugins</outputDirectory>
Expand All @@ -146,8 +146,8 @@
</includes>
</fileSet>

&lt;!&ndash; 将模块dlink-extends的jar文件放到打包目录/extends下 &ndash;&gt;
<fileSet>
<!-- 将模块dlink-extends的jar文件放到打包目录/extends下 -->
<!--<fileSet>
<directory>${project.parent.basedir}/dlink-extends/target
</directory>
<outputDirectory>extends</outputDirectory>
Expand Down
2 changes: 1 addition & 1 deletion dlink-client/dlink-client-1.11/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId>
<version>0.4.0-SNAPSHOT</version>
<version>0.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-client/dlink-client-1.12/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId>
<version>0.4.0-SNAPSHOT</version>
<version>0.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-client/dlink-client-1.13/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId>
<version>0.4.0-SNAPSHOT</version>
<version>0.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-client/dlink-client-1.14/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-client</artifactId>
<groupId>com.dlink</groupId>
<version>0.4.0-SNAPSHOT</version>
<version>0.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.4.0-SNAPSHOT</version>
<version>0.4.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion dlink-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.4.0-SNAPSHOT</version>
<version>0.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-connectors/dlink-connector-jdbc-1.12/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-connectors</artifactId>
<groupId>com.dlink</groupId>
<version>0.4.0-SNAPSHOT</version>
<version>0.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-connectors/dlink-connector-jdbc-1.13/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink-connectors</artifactId>
<groupId>com.dlink</groupId>
<version>0.4.0-SNAPSHOT</version>
<version>0.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion dlink-connectors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.4.0-SNAPSHOT</version>
<version>0.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
Expand Down
2 changes: 1 addition & 1 deletion dlink-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>dlink</artifactId>
<groupId>com.dlink</groupId>
<version>0.4.0-SNAPSHOT</version>
<version>0.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
Expand Down
20 changes: 13 additions & 7 deletions dlink-core/src/main/java/com/dlink/job/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public JobConfig(String type,boolean useResult, boolean useSession, String sessi

public JobConfig(String type,boolean useResult, boolean useSession, boolean useRemote, Integer clusterId,
Integer clusterConfigurationId, Integer jarId, Integer taskId, String jobName, boolean useSqlFragment,
boolean useStatementSet,Integer checkpoint, Integer parallelism, Integer savePointStrategyValue, String savePointPath) {
boolean useStatementSet,Integer checkpoint, Integer parallelism, Integer savePointStrategyValue,
String savePointPath,Map<String,String> config) {
this.type = type;
this.useResult = useResult;
this.useSession = useSession;
Expand All @@ -99,10 +100,11 @@ public JobConfig(String type,boolean useResult, boolean useSession, boolean useR
this.parallelism = parallelism;
this.savePointStrategy = SavePointStrategy.get(savePointStrategyValue);
this.savePointPath = savePointPath;
this.config = config;
}

public ExecutorSetting getExecutorSetting(){
return new ExecutorSetting(checkpoint,parallelism,useSqlFragment,savePointPath,jobName);
return new ExecutorSetting(checkpoint,parallelism,useSqlFragment,savePointPath,jobName,config);
}

public void setSessionConfig(SessionConfig sessionConfig){
Expand All @@ -118,12 +120,16 @@ public void buildGatewayConfig(Map<String,Object> config){
gatewayConfig.setClusterConfig(ClusterConfig.build(config.get("flinkConfigPath").toString(),
config.get("flinkLibPath").toString(),
config.get("hadoopConfigPath").toString()));
AppConfig appConfig = new AppConfig();
if(config.containsKey("userJarPath")){
gatewayConfig.setAppConfig(AppConfig.build(
config.get("userJarPath").toString(),
config.get("userJarParas").toString(),
config.get("userJarMainAppClass").toString()
));
appConfig.setUserJarPath(config.get("userJarPath").toString());
if(config.containsKey("userJarMainAppClass")){
appConfig.setUserJarMainAppClass(config.get("userJarMainAppClass").toString());
}
if(config.containsKey("userJarParas")){
appConfig.setUserJarParas(config.get("userJarParas").toString().split(" "));
}
gatewayConfig.setAppConfig(appConfig);
}
if(config.containsKey("flinkConfig")){
gatewayConfig.setFlinkConfig(FlinkConfig.build((Map<String, String>)config.get("flinkConfig")));
Expand Down
Loading

0 comments on commit 524ffda

Please sign in to comment.