From 5fa63c5522acd50ef5af15e46085a41e630c1605 Mon Sep 17 00:00:00 2001 From: aiceflower Date: Tue, 11 Mar 2025 15:09:52 +0800 Subject: [PATCH 1/4] code optimization --- .../persistence/QueryPersistenceManager.java | 47 ++++++++++++++++--- .../entrance/conf/EntranceConfiguration.scala | 9 ++++ .../impl/AISQLTransformInterceptor.scala | 4 +- 3 files changed, 52 insertions(+), 8 deletions(-) diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java index 265d4c3a21..b4290c3cd9 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java @@ -130,32 +130,67 @@ public void onProgressUpdate(Job job, float progress, JobProgressInfo[] progress @Override public boolean onJobFailed( Job job, String code, Map props, int errorCode, String errorDesc) { + if (!(job instanceof EntranceJob)) { + return false; + } + + boolean containsAny = false; + String errorDescArray = EntranceConfiguration.SUPPORTED_RETRY_ERROR_CODES(); + for (String keyword : errorDescArray.split(",")) { + if (errorDesc.contains(keyword.trim())) { + containsAny = true; + break; + } + } + + if (!containsAny) { + return false; + } + AtomicBoolean canRetry = new AtomicBoolean(false); String aiSqlKey = EntranceConfiguration.AI_SQL_KEY().key(); String retryNumKey = EntranceConfiguration.RETRY_NUM_KEY().key(); - String errorCodeArray = EntranceConfiguration.SUPPORTED_RETRY_ERROR_CODES(); - Map startupMap = TaskUtils.getStartupMap(props); + final EntranceJob entranceJob = (EntranceJob) job; + + // 处理广播表 + String dataFrameKey = "dataFrame to local exception"; + if (errorDesc.contains(dataFrameKey)) { + entranceJob + .getJobRequest() + .setExecutionCode("set spark.sql.autoBroadcastJoinThreshold=-1; " + code); + } + Map startupMap = TaskUtils.getStartupMap(props); // 只对 aiSql 做重试 if ("true".equals(startupMap.get(aiSqlKey))) { LinkisUtils.tryAndWarn( () -> { int retryNum = (int) startupMap.getOrDefault(retryNumKey, 1); boolean canRetryCode = canRetryCode(code); - if (retryNum > 0 - && errorCodeArray.contains(String.valueOf(errorCode)) - && canRetryCode) { + if (retryNum > 0 && canRetryCode) { logger.info( "mark task: {} status to WaitForRetry, current retryNum: {}, for errorCode: {}, errorDesc: {}", - job.getId(), + entranceJob.getJobInfo().getId(), retryNum, errorCode, errorDesc); + // 重试 job.transitionWaitForRetry(); + + // 修改错误码和错误描述 + entranceJob.getJobRequest().setErrorCode(20503); + entranceJob.getJobRequest().setErrorDesc("资源紧张,当前任务正在进行智能重试"); canRetry.set(true); startupMap.put(retryNumKey, retryNum - 1); + // once 引擎 + if ((boolean) EntranceConfiguration.AI_SQL_RETRY_ONCE().getValue()) { + startupMap.put("executeOnce", true); + } TaskUtils.addStartupMap(props, startupMap); + logger.info("task {} set retry status success.", entranceJob.getJobInfo().getId()); + } else { + logger.info("task {} not support retry.", entranceJob.getJobInfo().getId()); } }, logger); diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala index 35caa4ba0e..123b307db8 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala @@ -295,6 +295,12 @@ object EntranceConfiguration { "01002,01003,13005,13006,13012" ).getValue + val SUPPORTED_RETRY_ERROR_DESC = + CommonVars( + "linkis.entrance.supported.retry.error.desc", + "Spark application has already stopped,Spark application sc has already stopped,Failed to allocate a page,dataFrame to local exception" + ).getValue + val AI_SQL_TEST_MODE: Boolean = CommonVars[Boolean]("linkis.entrance.aisql.test.mode", true).getValue @@ -313,6 +319,9 @@ object EntranceConfiguration { val RETRY_NUM_KEY: CommonVars[Int] = CommonVars[Int]("linkis.ai.retry.num", 1) + val AI_SQL_RETRY_ONCE: CommonVars[Boolean] = + CommonVars[Boolean]("linkis.ai.sql.once.enable", true) + val SPARK_SHUFFLE_SERVICE_ENABLED: Boolean = CommonVars[Boolean]("linkis.spark.shuffle.service.enabled", true).getValue diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/AISQLTransformInterceptor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/AISQLTransformInterceptor.scala index c504635065..966b8daf1a 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/AISQLTransformInterceptor.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/AISQLTransformInterceptor.scala @@ -58,7 +58,7 @@ class AISQLTransformInterceptor extends EntranceInterceptor with Logging { val engineTypeLabel: EngineTypeLabel = engineTypeLabelOpt.get.asInstanceOf[EngineTypeLabel] // aiSql change to spark - var currentEngineType: String = engineTypeLabel.toString + var currentEngineType: String = engineTypeLabel.getStringValue if ( aiSqlEnable && sqlLanguage .equals(codeType) && supportAISQLCreator.contains(creator.toLowerCase()) @@ -73,7 +73,7 @@ class AISQLTransformInterceptor extends EntranceInterceptor with Logging { LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(classOf[EngineTypeLabel]) newEngineTypeLabel.setEngineType(sparkEngineType.split("-")(0)) newEngineTypeLabel.setVersion(sparkEngineType.split("-")(1)) - newEngineTypeLabel.setStringValue(sparkEngineType) + // newEngineTypeLabel.setStringValue(sparkEngineType) labels.add(newEngineTypeLabel) startMap.put(AI_SQL_KEY.key, AI_SQL_KEY.getValue.asInstanceOf[AnyRef]) startMap.put(RETRY_NUM_KEY.key, RETRY_NUM_KEY.getValue.asInstanceOf[AnyRef]) From 3f5b8f4f90529f4ba53c6a6d0934580f3e38288e Mon Sep 17 00:00:00 2001 From: aiceflower Date: Tue, 11 Mar 2025 15:27:18 +0800 Subject: [PATCH 2/4] code optimization --- .../linkis/entrance/persistence/QueryPersistenceManager.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java index b4290c3cd9..6bdc359f69 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java @@ -135,9 +135,10 @@ public boolean onJobFailed( } boolean containsAny = false; - String errorDescArray = EntranceConfiguration.SUPPORTED_RETRY_ERROR_CODES(); + String errorDescArray = EntranceConfiguration.SUPPORTED_RETRY_ERROR_DESC(); + String errorCodeArray = EntranceConfiguration.SUPPORTED_RETRY_ERROR_CODES(); for (String keyword : errorDescArray.split(",")) { - if (errorDesc.contains(keyword.trim())) { + if (errorDesc.contains(keyword.trim()) || errorCodeArray.contains(errorCode + "")) { containsAny = true; break; } From 7d85b4366d6e2fb721f85436b3d9e20eec04b9be Mon Sep 17 00:00:00 2001 From: aiceflower Date: Tue, 11 Mar 2025 16:28:03 +0800 Subject: [PATCH 3/4] add aisql type --- .../org/apache/linkis/storage/conf/LinkisStorageConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala index 4c66907318..a670c8394d 100644 --- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala +++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/conf/LinkisStorageConf.scala @@ -48,7 +48,7 @@ object LinkisStorageConf { val FILE_TYPE = CommonVars( "wds.linkis.storage.file.type", - "dolphin,sql,scala,py,hql,python,out,log,text,sh,jdbc,ngql,psql,fql,tsql,txt" + "dolphin,sql,scala,py,hql,python,out,log,text,sh,jdbc,ngql,psql,fql,tsql,txt,aisql" ).getValue private var fileTypeArr: Array[String] = null From 5c20c1d0e561252a7b3d536a6051a771fd2aa541 Mon Sep 17 00:00:00 2001 From: aiceflower Date: Tue, 11 Mar 2025 20:27:40 +0800 Subject: [PATCH 4/4] code optimization --- .../entrance/persistence/QueryPersistenceManager.java | 4 ++++ .../linkis/entrance/conf/EntranceConfiguration.scala | 11 +++++++---- .../interceptor/impl/AISQLTransformInterceptor.scala | 6 +++++- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java index 6bdc359f69..b2e1255d71 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java @@ -130,6 +130,10 @@ public void onProgressUpdate(Job job, float progress, JobProgressInfo[] progress @Override public boolean onJobFailed( Job job, String code, Map props, int errorCode, String errorDesc) { + if (!EntranceConfiguration.TASK_RETRY_ENABLED()) { + return false; + } + if (!(job instanceof EntranceJob)) { return false; } diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala index 123b307db8..c9612b7b8f 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala @@ -301,8 +301,8 @@ object EntranceConfiguration { "Spark application has already stopped,Spark application sc has already stopped,Failed to allocate a page,dataFrame to local exception" ).getValue - val AI_SQL_TEST_MODE: Boolean = - CommonVars[Boolean]("linkis.entrance.aisql.test.mode", true).getValue + val TASK_RETRY_ENABLED: Boolean = + CommonVars[Boolean]("linkis.task.retry.enabled", true).getValue val AI_SQL_ENABLED: Boolean = CommonVars[Boolean]("linkis.ai.sql.enabled", true).getValue @@ -326,14 +326,17 @@ object EntranceConfiguration { CommonVars[Boolean]("linkis.spark.shuffle.service.enabled", true).getValue val SPARK_EXECUTOR_CORES: Int = - CommonVars[Int]("spark.executor.cores", 5).getValue + CommonVars[Int]("spark.executor.cores", 4).getValue val SPARK_EXECUTOR_MEMORY: String = - CommonVars[String]("spark.executor.memory", "20G").getValue + CommonVars[String]("spark.executor.memory", "17G").getValue val SPARK_EXECUTOR_INSTANCES: Int = CommonVars[Int]("spark.executor.instances", 1).getValue + val SPARK_EXECUTOR_MEMORY_OVERHEAD: String = + CommonVars[String]("spark.yarn.executor.memoryOverhead", "3G").getValue + val SPARK_DYNAMIC_ALLOCATION_ENABLED: Boolean = CommonVars[Boolean]("spark.dynamicAllocation.enabled", true).getValue diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/AISQLTransformInterceptor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/AISQLTransformInterceptor.scala index 966b8daf1a..9ed0e036f8 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/AISQLTransformInterceptor.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/AISQLTransformInterceptor.scala @@ -84,7 +84,7 @@ class AISQLTransformInterceptor extends EntranceInterceptor with Logging { } // 开启 spark 动态资源规划, spark3.4.4 - if (sparkEngineType.equals(currentEngineType)) { + if (sparkEngineType.equals(currentEngineType) && SPARK_DYNAMIC_ALLOCATION_ENABLED) { logger.info("spark3 add dynamic resource.") // add spark dynamic resource planning @@ -107,6 +107,10 @@ class AISQLTransformInterceptor extends EntranceInterceptor with Logging { startMap.put("spark.executor.cores", SPARK_EXECUTOR_CORES.asInstanceOf[AnyRef]) startMap.put("spark.executor.memory", SPARK_EXECUTOR_MEMORY.asInstanceOf[AnyRef]) startMap.put("spark.executor.instances", SPARK_EXECUTOR_INSTANCES.asInstanceOf[AnyRef]) + startMap.put( + "spark.yarn.executor.memoryOverhead", + SPARK_EXECUTOR_MEMORY_OVERHEAD.asInstanceOf[AnyRef] + ) Utils.tryAndWarn { val extraConfs: String = SPARK_DYNAMIC_ALLOCATION_ADDITIONAL_CONFS