Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ object LinkisStorageConf {

val FILE_TYPE = CommonVars(
"wds.linkis.storage.file.type",
"dolphin,sql,scala,py,hql,python,out,log,text,sh,jdbc,ngql,psql,fql,tsql,txt"
"dolphin,sql,scala,py,hql,python,out,log,text,sh,jdbc,ngql,psql,fql,tsql,txt,aisql"
).getValue

private var fileTypeArr: Array[String] = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,32 +130,72 @@ public void onProgressUpdate(Job job, float progress, JobProgressInfo[] progress
@Override
public boolean onJobFailed(
Job job, String code, Map<String, Object> props, int errorCode, String errorDesc) {
if (!EntranceConfiguration.TASK_RETRY_ENABLED()) {
return false;
}

if (!(job instanceof EntranceJob)) {
return false;
}

boolean containsAny = false;
String errorDescArray = EntranceConfiguration.SUPPORTED_RETRY_ERROR_DESC();
String errorCodeArray = EntranceConfiguration.SUPPORTED_RETRY_ERROR_CODES();
for (String keyword : errorDescArray.split(",")) {
if (errorDesc.contains(keyword.trim()) || errorCodeArray.contains(errorCode + "")) {
containsAny = true;
break;
}
}

if (!containsAny) {
return false;
}

AtomicBoolean canRetry = new AtomicBoolean(false);
String aiSqlKey = EntranceConfiguration.AI_SQL_KEY().key();
String retryNumKey = EntranceConfiguration.RETRY_NUM_KEY().key();
String errorCodeArray = EntranceConfiguration.SUPPORTED_RETRY_ERROR_CODES();

Map<String, Object> startupMap = TaskUtils.getStartupMap(props);
final EntranceJob entranceJob = (EntranceJob) job;

// 处理广播表
String dataFrameKey = "dataFrame to local exception";
if (errorDesc.contains(dataFrameKey)) {
entranceJob
.getJobRequest()
.setExecutionCode("set spark.sql.autoBroadcastJoinThreshold=-1; " + code);
}

Map<String, Object> startupMap = TaskUtils.getStartupMap(props);
// 只对 aiSql 做重试
if ("true".equals(startupMap.get(aiSqlKey))) {
LinkisUtils.tryAndWarn(
() -> {
int retryNum = (int) startupMap.getOrDefault(retryNumKey, 1);
boolean canRetryCode = canRetryCode(code);
if (retryNum > 0
&& errorCodeArray.contains(String.valueOf(errorCode))
&& canRetryCode) {
if (retryNum > 0 && canRetryCode) {
logger.info(
"mark task: {} status to WaitForRetry, current retryNum: {}, for errorCode: {}, errorDesc: {}",
job.getId(),
entranceJob.getJobInfo().getId(),
retryNum,
errorCode,
errorDesc);
// 重试
job.transitionWaitForRetry();

// 修改错误码和错误描述
entranceJob.getJobRequest().setErrorCode(20503);
entranceJob.getJobRequest().setErrorDesc("资源紧张,当前任务正在进行智能重试");
canRetry.set(true);
startupMap.put(retryNumKey, retryNum - 1);
// once 引擎
if ((boolean) EntranceConfiguration.AI_SQL_RETRY_ONCE().getValue()) {
startupMap.put("executeOnce", true);
}
TaskUtils.addStartupMap(props, startupMap);
logger.info("task {} set retry status success.", entranceJob.getJobInfo().getId());
} else {
logger.info("task {} not support retry.", entranceJob.getJobInfo().getId());
}
},
logger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,14 @@ object EntranceConfiguration {
"01002,01003,13005,13006,13012"
).getValue

val AI_SQL_TEST_MODE: Boolean =
CommonVars[Boolean]("linkis.entrance.aisql.test.mode", true).getValue
val SUPPORTED_RETRY_ERROR_DESC =
CommonVars(
"linkis.entrance.supported.retry.error.desc",
"Spark application has already stopped,Spark application sc has already stopped,Failed to allocate a page,dataFrame to local exception"
).getValue

val TASK_RETRY_ENABLED: Boolean =
CommonVars[Boolean]("linkis.task.retry.enabled", true).getValue

val AI_SQL_ENABLED: Boolean =
CommonVars[Boolean]("linkis.ai.sql.enabled", true).getValue
Expand All @@ -313,18 +319,24 @@ object EntranceConfiguration {
val RETRY_NUM_KEY: CommonVars[Int] =
CommonVars[Int]("linkis.ai.retry.num", 1)

val AI_SQL_RETRY_ONCE: CommonVars[Boolean] =
CommonVars[Boolean]("linkis.ai.sql.once.enable", true)

val SPARK_SHUFFLE_SERVICE_ENABLED: Boolean =
CommonVars[Boolean]("linkis.spark.shuffle.service.enabled", true).getValue

val SPARK_EXECUTOR_CORES: Int =
CommonVars[Int]("spark.executor.cores", 5).getValue
CommonVars[Int]("spark.executor.cores", 4).getValue

val SPARK_EXECUTOR_MEMORY: String =
CommonVars[String]("spark.executor.memory", "20G").getValue
CommonVars[String]("spark.executor.memory", "17G").getValue

val SPARK_EXECUTOR_INSTANCES: Int =
CommonVars[Int]("spark.executor.instances", 1).getValue

val SPARK_EXECUTOR_MEMORY_OVERHEAD: String =
CommonVars[String]("spark.yarn.executor.memoryOverhead", "3G").getValue

val SPARK_DYNAMIC_ALLOCATION_ENABLED: Boolean =
CommonVars[Boolean]("spark.dynamicAllocation.enabled", true).getValue

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class AISQLTransformInterceptor extends EntranceInterceptor with Logging {

val engineTypeLabel: EngineTypeLabel = engineTypeLabelOpt.get.asInstanceOf[EngineTypeLabel]
// aiSql change to spark
var currentEngineType: String = engineTypeLabel.toString
var currentEngineType: String = engineTypeLabel.getStringValue
if (
aiSqlEnable && sqlLanguage
.equals(codeType) && supportAISQLCreator.contains(creator.toLowerCase())
Expand All @@ -73,7 +73,7 @@ class AISQLTransformInterceptor extends EntranceInterceptor with Logging {
LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(classOf[EngineTypeLabel])
newEngineTypeLabel.setEngineType(sparkEngineType.split("-")(0))
newEngineTypeLabel.setVersion(sparkEngineType.split("-")(1))
newEngineTypeLabel.setStringValue(sparkEngineType)
// newEngineTypeLabel.setStringValue(sparkEngineType)
labels.add(newEngineTypeLabel)
startMap.put(AI_SQL_KEY.key, AI_SQL_KEY.getValue.asInstanceOf[AnyRef])
startMap.put(RETRY_NUM_KEY.key, RETRY_NUM_KEY.getValue.asInstanceOf[AnyRef])
Expand All @@ -84,7 +84,7 @@ class AISQLTransformInterceptor extends EntranceInterceptor with Logging {

}
// 开启 spark 动态资源规划, spark3.4.4
if (sparkEngineType.equals(currentEngineType)) {
if (sparkEngineType.equals(currentEngineType) && SPARK_DYNAMIC_ALLOCATION_ENABLED) {
logger.info("spark3 add dynamic resource.")

// add spark dynamic resource planning
Expand All @@ -107,6 +107,10 @@ class AISQLTransformInterceptor extends EntranceInterceptor with Logging {
startMap.put("spark.executor.cores", SPARK_EXECUTOR_CORES.asInstanceOf[AnyRef])
startMap.put("spark.executor.memory", SPARK_EXECUTOR_MEMORY.asInstanceOf[AnyRef])
startMap.put("spark.executor.instances", SPARK_EXECUTOR_INSTANCES.asInstanceOf[AnyRef])
startMap.put(
"spark.yarn.executor.memoryOverhead",
SPARK_EXECUTOR_MEMORY_OVERHEAD.asInstanceOf[AnyRef]
)

Utils.tryAndWarn {
val extraConfs: String = SPARK_DYNAMIC_ALLOCATION_ADDITIONAL_CONFS
Expand Down
Loading