[Feature-14251][Task] Support yarn queue definition in yarn task (#14310)

(cherry picked from commit 0b69236)
Radeity authored and zhongjiajie committed Jul 20, 2023
1 parent 5ce72b3 commit a99596a
Showing 37 changed files with 115 additions and 73 deletions.
1 change: 1 addition & 0 deletions docs/docs/en/guide/task/flink.md
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@ Flink task type, used to execute Flink programs. For Flink nodes:
| TaskManager memory size | Used to set the size of taskManager memories, which can be set according to the actual production environment. |
| Number of TaskManager | Used to set the number of taskManagers, which can be set according to the actual production environment. |
| Parallelism | Used to set the degree of parallelism for executing Flink tasks. |
| Yarn queue | Used to set the Yarn queue; uses the `default` queue by default. |
| Main program parameters | Set the input parameters for the Flink program and support the substitution of custom parameter variables. |
| Optional parameters | Support `--jar`, `--files`, `--archives`, `--conf` format. |
| Custom parameter | It is a local user-defined parameter for Flink, and will replace the content with `${variable}` in the script. |
1 change: 1 addition & 0 deletions docs/docs/en/guide/task/map-reduce.md
@@ -30,6 +30,7 @@ MapReduce(MR) task type used for executing MapReduce programs. For MapReduce nod
| The class of the main function | The **full path** of Main Class, the entry point of the MapReduce program. |
| Main jar package | The jar package of the MapReduce program. |
| Task name | MapReduce task name. |
| Yarn queue | Used to set the Yarn queue; uses the `default` queue by default. |
| Command line parameters | Set the input parameters of the MapReduce program and support the substitution of custom parameter variables. |
| Other parameters | Support `-D`, `-files`, `-libjars`, `-archives` format. |
| User-defined parameter | It is a local user-defined parameter for MapReduce, and will replace the content with `${variable}` in the script. |
1 change: 1 addition & 0 deletions docs/docs/en/guide/task/spark.md
@@ -33,6 +33,7 @@ Spark task type for executing Spark application. When executing the Spark task,
| Driver memory size | Set the size of Driver memories, which can be set according to the actual production environment. |
| Number of Executor | Set the number of Executor, which can be set according to the actual production environment. |
| Executor memory size | Set the size of Executor memories, which can be set according to the actual production environment. |
| Yarn queue | Set the Yarn queue; uses the `default` queue by default. |
| Main program parameters | Set the input parameters of the Spark program and support the substitution of custom parameter variables. |
| Optional parameters | Support `--jars`, `--files`, `--archives`, `--conf` format. |
| Resource | Appoint resource files in the `Resource` if parameters refer to them. |
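The doc tables above all note that the `default` queue is used when no Yarn queue is set. A minimal hypothetical sketch of that fallback (the class and method names are illustrative, not DolphinScheduler API):

```java
// Hypothetical helper, not part of the project: resolves the Yarn queue value
// described in the task doc tables, falling back to the "default" queue.
public class YarnQueueDefault {

    public static String resolveQueue(String yarnQueue) {
        // An empty or missing value falls back to the default Yarn queue.
        return (yarnQueue == null || yarnQueue.isEmpty()) ? "default" : yarnQueue;
    }
}
```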
1 change: 1 addition & 0 deletions docs/docs/zh/guide/task/flink.md
@@ -35,6 +35,7 @@ Flink task type, used to execute Flink programs. For Flink nodes:
| taskManager memory size | Used to set the taskManager memory size, which can be set according to the actual production environment |
| Number of taskManager | Used to set the number of taskManagers, which can be set according to the actual production environment |
| Parallelism | Used to set the degree of parallelism for executing Flink tasks |
| Yarn queue | Used to set the Yarn queue; uses the default queue by default |
| Main program parameters | Set the input parameters of the Flink program and support the substitution of custom parameter variables |
| Optional parameters | Support `--jar`, `--files`, `--archives`, `--conf` format |
| Custom parameter | It is a local user-defined parameter for Flink, and will replace the content with `${variable}` in the script |
1 change: 1 addition & 0 deletions docs/docs/zh/guide/task/map-reduce.md
@@ -24,6 +24,7 @@ MapReduce(MR) task type, used to execute MapReduce programs. For MapReduce
| The class of the main function | The **full path** of Main Class, the entry point of the MapReduce program |
| Main jar package | The jar package for running the MapReduce program |
| Task name (optional) | MapReduce task name |
| Yarn queue | Set the Yarn queue; uses `default` by default |
| Command line parameters | Set the input parameters of the MapReduce program and support the substitution of custom parameter variables |
| Other parameters | Support `-D`, `-files`, `-libjars`, `-archives` format |
| Custom parameter | It is a local user-defined parameter for MapReduce, and will replace the content with `${variable}` in the script |
1 change: 1 addition & 0 deletions docs/docs/zh/guide/task/spark.md
@@ -32,6 +32,7 @@ Spark task type, used to execute Spark applications. For Spark nodes, the worker supports
- Driver memory size: used to set the Driver memory size, which can be set according to the actual production environment.
- Number of Executor: used to set the number of Executors, which can be set according to the actual production environment.
- Executor memory size: used to set the Executor memory size, which can be set according to the actual production environment.
- Yarn queue: used to set the Yarn queue; uses the default queue by default.
- Main program parameters: set the input parameters of the Spark program and support the substitution of custom parameter variables.
- Optional parameters: support `--jars`, `--files`, `--archives`, `--conf` format.
- Resource: if other parameters refer to resource files, you need to select and specify them in the Resource field.
@@ -106,7 +106,6 @@ public TaskExecutionContextBuilder buildProcessInstanceRelatedInfo(ProcessInstan
taskExecutionContext.setExecutorId(processInstance.getExecutorId());
taskExecutionContext.setCmdTypeIfComplement(processInstance.getCmdTypeIfComplement().getCode());
taskExecutionContext.setTenantCode(processInstance.getTenantCode());
taskExecutionContext.setQueue(processInstance.getQueue());
return this;
}

@@ -146,11 +146,6 @@ public class TaskExecutionContext implements Serializable {
*/
private String tenantCode;

/**
* task queue
*/
private String queue;

/**
* process define id
*/
@@ -82,7 +82,7 @@ public class SparkParameters extends AbstractParameters {
/**
* The YARN queue to submit to
*/
private String queue;
private String yarnQueue;

/**
* other arguments
@@ -180,12 +180,12 @@ public void setAppName(String appName) {
this.appName = appName;
}

public String getQueue() {
return queue;
public String getYarnQueue() {
return yarnQueue;
}

public void setQueue(String queue) {
this.queue = queue;
public void setYarnQueue(String yarnQueue) {
this.yarnQueue = yarnQueue;
}

public String getOthers() {
@@ -121,10 +121,6 @@ public void init() {
StringEscapeUtils.escapeJava(JSONUtils.toJsonString(dataQualityConfiguration)))
+ "\"");

dataQualityParameters
.getSparkParameters()
.setQueue(dqTaskExecutionContext.getQueue());

setMainJarName();
}

@@ -106,7 +106,7 @@ public static List<String> buildArgs(SparkParameters param) {
String others = param.getOthers();
if (!SPARK_LOCAL.equals(deployMode)
&& (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_QUEUE))) {
String queue = param.getQueue();
String queue = param.getYarnQueue();
if (StringUtils.isNotEmpty(queue)) {
args.add(SparkConstants.SPARK_QUEUE);
args.add(queue);
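A minimal sketch of the guard in the hunk above: `--queue` is appended only when the task is not in local mode, the free-form "others" string does not already contain `--queue`, and a queue value is actually set. The class and method names here are illustrative, not the project's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of SparkArgsUtils' queue guard, not the real class.
public class SparkQueueArgs {

    static final String SPARK_QUEUE = "--queue";

    public static List<String> queueArgs(boolean localMode, String others, String yarnQueue) {
        List<String> args = new ArrayList<>();
        // Skip when the user already set --queue in the free-form "others" args.
        boolean alreadySet = others != null && others.contains(SPARK_QUEUE);
        if (!localMode && !alreadySet && yarnQueue != null && !yarnQueue.isEmpty()) {
            args.add(SPARK_QUEUE);
            args.add(yarnQueue);
        }
        return args;
    }
}
```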
@@ -56,7 +56,6 @@ public void init() {
if (flinkParameters == null || !flinkParameters.checkParameters()) {
throw new RuntimeException("flink task params is not valid");
}
flinkParameters.setQueue(taskExecutionContext.getQueue());

FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
}
@@ -163,9 +163,9 @@ public static List<String> buildInitOptionsForSql(FlinkParameters flinkParameter
}

// yarn.application.queue
String queue = flinkParameters.getQueue();
if (StringUtils.isNotEmpty(queue)) {
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue));
String yarnQueue = flinkParameters.getYarnQueue();
if (StringUtils.isNotEmpty(yarnQueue)) {
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, yarnQueue));
}
}

@@ -306,26 +306,26 @@ private static void determinedYarnQueue(List<String> args, FlinkParameters flink
case CLUSTER:
if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion)
|| FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) {
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS);
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS);
} else {
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_MODE);
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE);
}
case APPLICATION:
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS);
doAddQueue(args, flinkParameters, FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS);
}
}

private static void doAddQueue(List<String> args, FlinkParameters flinkParameters, String option) {
String others = flinkParameters.getOthers();
if (StringUtils.isEmpty(others) || !others.contains(option)) {
String queue = flinkParameters.getQueue();
if (StringUtils.isNotEmpty(queue)) {
String yarnQueue = flinkParameters.getYarnQueue();
if (StringUtils.isNotEmpty(yarnQueue)) {
switch (option) {
case FlinkConstants.FLINK_QUEUE_FOR_TARGETS:
args.add(String.format(FlinkConstants.FLINK_QUEUE_FOR_TARGETS + "=%s", queue));
case FlinkConstants.FLINK_QUEUE_FOR_MODE:
args.add(FlinkConstants.FLINK_QUEUE_FOR_MODE);
args.add(queue);
case FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS:
args.add(String.format(FlinkConstants.FLINK_YARN_QUEUE_FOR_TARGETS + "=%s", yarnQueue));
case FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE:
args.add(FlinkConstants.FLINK_YARN_QUEUE_FOR_MODE);
args.add(yarnQueue);
}
}
}
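The hunk above switches between two Flink flag shapes. An illustrative sketch, not the project's API: newer execution targets take the queue as a `-D` property, while the legacy yarn-cluster mode takes `-yqu <queue>` (the real constants live in `FlinkConstants`).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the two Flink queue flag shapes; names are illustrative.
public class FlinkQueueArgs {

    public static List<String> queueArgs(boolean useExecutionTarget, String yarnQueue) {
        List<String> args = new ArrayList<>();
        if (yarnQueue == null || yarnQueue.isEmpty()) {
            return args; // nothing to add when no queue is configured
        }
        if (useExecutionTarget) {
            // Flink 1.12+ execution targets: pass the queue as a -D property.
            args.add("-Dyarn.application.queue=" + yarnQueue);
        } else {
            // Legacy yarn-cluster mode: pass the queue via -yqu.
            args.add("-yqu");
            args.add(yarnQueue);
        }
        return args;
    }
}
```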
@@ -48,8 +48,8 @@ private FlinkConstants() {
public static final String FLINK_EXECUTION_TARGET = "-t";
public static final String FLINK_YARN_SLOT = "-ys";
public static final String FLINK_APP_NAME = "-ynm";
public static final String FLINK_QUEUE_FOR_MODE = "-yqu";
public static final String FLINK_QUEUE_FOR_TARGETS = "-Dyarn.application.queue";
public static final String FLINK_YARN_QUEUE_FOR_MODE = "-yqu";
public static final String FLINK_YARN_QUEUE_FOR_TARGETS = "-Dyarn.application.queue";
public static final String FLINK_TASK_MANAGE = "-yn";
public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
public static final String FLINK_TASK_MANAGE_MEM = "-ytm";
@@ -86,7 +86,7 @@ public class FlinkParameters extends AbstractParameters {
/**
* The YARN queue to submit to
*/
private String queue;
private String yarnQueue;

/**
* other arguments
@@ -194,12 +194,12 @@ public void setTaskManagerMemory(String taskManagerMemory) {
this.taskManagerMemory = taskManagerMemory;
}

public String getQueue() {
return queue;
public String getYarnQueue() {
return yarnQueue;
}

public void setQueue(String queue) {
this.queue = queue;
public void setYarnQueue(String yarnQueue) {
this.yarnQueue = yarnQueue;
}

public List<ResourceInfo> getResourceList() {
@@ -59,7 +59,6 @@ public void init() {
if (flinkParameters == null || !flinkParameters.checkParameters()) {
throw new RuntimeException("flink task params is not valid");
}
flinkParameters.setQueue(taskExecutionContext.getQueue());

FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
}
@@ -20,7 +20,7 @@
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.D;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAR;
import static org.apache.dolphinscheduler.plugin.task.mr.MapReduceTaskConstants.MR_NAME;
import static org.apache.dolphinscheduler.plugin.task.mr.MapReduceTaskConstants.MR_QUEUE;
import static org.apache.dolphinscheduler.plugin.task.mr.MapReduceTaskConstants.MR_YARN_QUEUE;

import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
@@ -67,10 +67,10 @@ public static List<String> buildArgs(MapReduceParameters param, TaskExecutionCon
}

String others = param.getOthers();
if (StringUtils.isEmpty(others) || !others.contains(MR_QUEUE)) {
String queue = param.getQueue();
if (StringUtils.isNotEmpty(queue)) {
args.add(String.format("%s%s=%s", D, MR_QUEUE, queue));
if (StringUtils.isEmpty(others) || !others.contains(MR_YARN_QUEUE)) {
String yarnQueue = param.getYarnQueue();
if (StringUtils.isNotEmpty(yarnQueue)) {
args.add(String.format("%s%s=%s", D, MR_YARN_QUEUE, yarnQueue));
}
}
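The hunk above builds the MapReduce queue argument as a generic Hadoop property, `-Dmapreduce.job.queuename=<queue>`, rather than a dedicated flag. A tiny sketch of that shape (the helper name is illustrative only):

```java
// Hypothetical helper mirroring the args.add(String.format(...)) call above.
public class MrQueueArg {

    public static String queueArg(String yarnQueue) {
        // MapReduce receives the queue as a -D property on the command line.
        return String.format("%s%s=%s", "-D", "mapreduce.job.queuename", yarnQueue);
    }
}
```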

@@ -54,9 +54,9 @@ public class MapReduceParameters extends AbstractParameters {
private String appName;

/**
* queue
* The YARN queue to submit to
*/
private String queue;
private String yarnQueue;

/**
* resource list
@@ -101,12 +101,12 @@ public void setAppName(String appName) {
this.appName = appName;
}

public String getQueue() {
return queue;
public String getYarnQueue() {
return yarnQueue;
}

public void setQueue(String queue) {
this.queue = queue;
public void setYarnQueue(String yarnQueue) {
this.yarnQueue = yarnQueue;
}

public List<ResourceInfo> getResourceList() {
@@ -152,7 +152,7 @@ public String toString() {
return "mainJar= " + mainJar
+ "mainClass=" + mainClass
+ "mainArgs=" + mainArgs
+ "queue=" + queue
+ "yarnQueue=" + yarnQueue
+ "other mainArgs=" + others;
}
}
@@ -70,8 +70,6 @@ public void init() {
throw new RuntimeException("mapreduce task params is not valid");
}

mapreduceParameters.setQueue(taskExecutionContext.getQueue());

// replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();

@@ -31,6 +31,6 @@ private MapReduceTaskConstants() {
/**
* -D mapreduce.job.queuename=queuename
*/
public static final String MR_QUEUE = "mapreduce.job.queuename";
public static final String MR_YARN_QUEUE = "mapreduce.job.queuename";

}
@@ -36,7 +36,7 @@ private SparkConstants() {
/**
* --queue QUEUE
*/
public static final String SPARK_QUEUE = "--queue";
public static final String SPARK_YARN_QUEUE = "--queue";

public static final String DEPLOY_MODE = "--deploy-mode";

@@ -82,7 +82,7 @@ public class SparkParameters extends AbstractParameters {
/**
* The YARN queue to submit to
*/
private String queue;
private String yarnQueue;

/**
* other arguments
@@ -198,12 +198,12 @@ public void setAppName(String appName) {
this.appName = appName;
}

public String getQueue() {
return queue;
public String getYarnQueue() {
return yarnQueue;
}

public void setQueue(String queue) {
this.queue = queue;
public void setYarnQueue(String yarnQueue) {
this.yarnQueue = yarnQueue;
}

public String getOthers() {
