diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java index 86b1a91f7a..1cf9a6b4b1 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java @@ -44,9 +44,6 @@ import org.apache.linkis.entrance.scheduler.EntranceGroupFactory; import org.apache.linkis.entrance.scheduler.EntranceParallelConsumerManager; import org.apache.linkis.entrance.scheduler.EntranceSchedulerContext; -import org.apache.linkis.orchestrator.ecm.EngineConnManagerBuilder; -import org.apache.linkis.orchestrator.ecm.EngineConnManagerBuilder$; -import org.apache.linkis.orchestrator.ecm.entity.Policy; import org.apache.linkis.scheduler.Scheduler; import org.apache.linkis.scheduler.SchedulerContext; import org.apache.linkis.scheduler.executer.ExecutorManager; @@ -204,9 +201,7 @@ public SchedulerContext schedulerContext( @Bean @ConditionalOnMissingBean public ExecutorManager executorManager(GroupFactory groupFactory) { - EngineConnManagerBuilder engineConnManagerBuilder = EngineConnManagerBuilder$.MODULE$.builder(); - engineConnManagerBuilder.setPolicy(Policy.Process); - return new EntranceExecutorManagerImpl(groupFactory, engineConnManagerBuilder.build()); + return new EntranceExecutorManagerImpl(groupFactory); } @Bean diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala index 6f634bd76b..8e667588fb 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala @@ -285,4 +285,7 @@ object EntranceConfiguration { val ENTRANCE_FAILOVER_RUNNING_KILL_ENABLED = CommonVars("linkis.entrance.failover.running.kill.enable", false) + val LINKIS_ENTRANCE_SKIP_ORCHESTRATOR = + CommonVars("linkis.entrance.skip.orchestrator", false).getValue + } diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala index 266de6eb5b..24c697c4ce 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala @@ -261,6 +261,15 @@ class DefaultEntranceExecutor(id: Long) true } + def getRunningOrchestrationFuture: Option[OrchestrationFuture] = { + val asyncReturn = getEngineExecuteAsyncReturn + if (asyncReturn.isDefined) { + asyncReturn.get.getOrchestrationFuture() + } else { + None + } + } + override protected def callExecute(request: ExecuteRequest): ExecuteResponse = { val entranceExecuteRequest: EntranceExecuteRequest = request match { diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala index be7fb13871..d20b5ea8fb 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutor.scala @@ -118,14 +118,6 @@ abstract class EntranceExecutor(val id: Long) extends Executor with Logging { super.hashCode() } - def getRunningOrchestrationFuture: Option[OrchestrationFuture] = { - if (null != engineReturn) { - engineReturn.getOrchestrationFuture() - } else { - None - } - } - } class EngineExecuteAsyncReturn( diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala index 05bc5311b0..4e7ca79367 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceExecutorManager.scala @@ -20,8 +20,10 @@ package org.apache.linkis.entrance.execute import org.apache.linkis.common.exception.WarnException import org.apache.linkis.common.log.LogUtils import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.entrance.conf.EntranceConfiguration import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary._ import org.apache.linkis.entrance.exception.EntranceErrorException +import org.apache.linkis.entrance.execute.simple.{SimpleEntranceExecutor, SimpleExecuteBusContext} import org.apache.linkis.governance.common.entity.job.JobRequest import org.apache.linkis.scheduler.executer.{Executor, ExecutorManager} import org.apache.linkis.scheduler.queue.{GroupFactory, Job, SchedulerEvent} @@ -91,6 +93,14 @@ abstract class EntranceExecutorManager(groupFactory: GroupFactory) case jobReq: JobRequest => val entranceEntranceExecutor = new DefaultEntranceExecutor(jobReq.getId) + if (EntranceConfiguration.LINKIS_ENTRANCE_SKIP_ORCHESTRATOR) { + new SimpleEntranceExecutor( + jobReq.getId, + SimpleExecuteBusContext.getOrchestratorListenerBusContext() + ) + } else { + new DefaultEntranceExecutor(jobReq.getId) + } // getEngineConn Executor job.getLogListener.foreach( _.onLogUpdate( diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala index 3efcf41c89..a251c56de7 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/impl/EntranceExecutorManagerImpl.scala @@ -22,7 +22,7 @@ import org.apache.linkis.orchestrator.ecm.EngineConnManager import org.apache.linkis.scheduler.listener.ExecutorListener import org.apache.linkis.scheduler.queue.GroupFactory -class EntranceExecutorManagerImpl(groupFactory: GroupFactory, engineConnManager: EngineConnManager) +class EntranceExecutorManagerImpl(groupFactory: GroupFactory) extends EntranceExecutorManager(groupFactory) { override def getOrCreateInterceptors(): Array[ExecuteRequestInterceptor] = Array( diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/EngineLockListener.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/EngineLockListener.scala new file mode 100644 index 0000000000..6f2798a52c --- /dev/null +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/EngineLockListener.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.entrance.execute.simple +import org.apache.linkis.orchestrator.listener.OrchestratorListenerBusContext + +object SimpleExecuteBusContext { + + private lazy val orchestratorListenerBusContext = OrchestratorListenerBusContext.createBusContext + + def getOrchestratorListenerBusContext(): OrchestratorListenerBusContext = + orchestratorListenerBusContext + +} diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleASyncListener.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleASyncListener.scala new file mode 100644 index 0000000000..bc52fbd800 --- /dev/null +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleASyncListener.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.entrance.execute.simple + +import org.apache.linkis.common.listener.Event +import org.apache.linkis.common.utils.Logging +import org.apache.linkis.entrance.EntranceServer +import org.apache.linkis.entrance.conf.EntranceConfiguration +import org.apache.linkis.orchestrator.listener.OrchestratorAsyncEvent +import org.apache.linkis.orchestrator.listener.task.{ + TaskLogEvent, + TaskLogListener, + TaskProgressListener, + TaskRunningInfoEvent +} + +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.stereotype.Component + +import javax.annotation.PostConstruct + +@Component +class SimpleASyncListener extends TaskLogListener with TaskProgressListener with Logging { + + @Autowired private var entranceServer: EntranceServer = _ + + @PostConstruct + def init(): Unit = { + if (EntranceConfiguration.LINKIS_ENTRANCE_SKIP_ORCHESTRATOR) { + SimpleExecuteBusContext + .getOrchestratorListenerBusContext() + .getOrchestratorAsyncListenerBus + .addListener(this) + } + } + + override def onLogUpdate(taskLogEvent: TaskLogEvent): Unit = {} + + override def onProgressOn(taskProgressEvent: TaskRunningInfoEvent): Unit = {} + + override def onEvent(event: OrchestratorAsyncEvent): Unit = {} + + override def onEventError(event: Event, t: Throwable): Unit = {} +} diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleEntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleEntranceExecutor.scala new file mode 100644 index 0000000000..d9e18081d2 --- /dev/null +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleEntranceExecutor.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.entrance.execute.simple + +import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.entrance.exception.{EntranceErrorCode, EntranceErrorException} +import org.apache.linkis.entrance.execute.{EngineExecuteAsyncReturn, EntranceExecutor} +import org.apache.linkis.entrance.job.EntranceExecuteRequest +import org.apache.linkis.governance.common.utils.LoggerUtils +import org.apache.linkis.manager.label.entity.Label +import org.apache.linkis.manager.label.entity.engine.CodeLanguageLabel +import org.apache.linkis.manager.label.utils.LabelUtil +import org.apache.linkis.orchestrator.code.plans.ast.CodeJob +import org.apache.linkis.orchestrator.code.plans.logical.CodeLogicalUnitTaskDesc +import org.apache.linkis.orchestrator.computation.entity.ComputationJobReq +import org.apache.linkis.orchestrator.computation.physical.CodeLogicalUnitExecTask +import org.apache.linkis.orchestrator.converter.ASTContextImpl +import org.apache.linkis.orchestrator.execution.{ + AsyncTaskResponse, + FailedTaskResponse, + SucceedTaskResponse +} +import org.apache.linkis.orchestrator.listener.OrchestratorListenerBusContext +import org.apache.linkis.orchestrator.plans.physical.{ExecTask, PhysicalContextImpl} +import org.apache.linkis.orchestrator.plans.unit.CodeLogicalUnit +import org.apache.linkis.scheduler.executer._ + +import java.util + +class SimpleEntranceExecutor( + id: Long, + orchestratorListenerBusContext: OrchestratorListenerBusContext +) extends EntranceExecutor(id) + with SingleTaskOperateSupport + with Logging { + + private var codeUnitExecTask: CodeLogicalUnitExecTask = null + + override protected def callExecute(request: ExecuteRequest): ExecuteResponse = { + val entranceExecuteRequest: EntranceExecuteRequest = request match { + case request: EntranceExecuteRequest => + request + case _ => + throw new EntranceErrorException( + EntranceErrorCode.EXECUTE_REQUEST_INVALID.getErrCode, + s"Invalid entranceExecuteRequest : ${request.code}" + ) + } + // 1. create JobReq + val computationJobReq = requestToComputationJobReq(entranceExecuteRequest) + // 2. create code job + val codeJob = new CodeJob(null, null) + val astContext = ASTContextImpl.newBuilder().setJobReq(computationJobReq).build() + codeJob.setAstContext(astContext) + codeJob.setCodeLogicalUnit(computationJobReq.getCodeLogicalUnit) + codeJob.setParams(computationJobReq.getParams) + codeJob.setName(computationJobReq.getName + "_Job") + codeJob.setSubmitUser(computationJobReq.getSubmitUser) + codeJob.setExecuteUser(computationJobReq.getExecuteUser) + codeJob.setLabels(computationJobReq.getLabels) + codeJob.setPriority(computationJobReq.getPriority) + codeUnitExecTask = new CodeLogicalUnitExecTask(Array[ExecTask](), Array[ExecTask]()) + // set job id, can find by getEntranceContext.getOrCreateScheduler().get(execId).map(_.asInstanceOf[Job]) + codeUnitExecTask.setId(entranceExecuteRequest.getJob.getId) + // 3.set code unit + codeUnitExecTask.setCodeLogicalUnit(computationJobReq.getCodeLogicalUnit) + codeUnitExecTask.setTaskDesc(CodeLogicalUnitTaskDesc(codeJob)) + // 4. set context + val context = new PhysicalContextImpl(codeUnitExecTask, Array.empty) + context.setSyncBus(orchestratorListenerBusContext.getOrchestratorSyncListenerBus) + context.setAsyncBus(orchestratorListenerBusContext.getOrchestratorAsyncListenerBus) + // 5. execute + val response = codeUnitExecTask.execute() + response match { + case async: AsyncTaskResponse => + new EngineExecuteAsyncReturn(request, null) + case succeed: SucceedTaskResponse => + logger.info(s"Succeed to execute ExecTask(${getId})") + SuccessExecuteResponse() + case failedTaskResponse: FailedTaskResponse => + logger.info(s"Failed to execute ExecTask(${getId})") + ErrorExecuteResponse(failedTaskResponse.getErrorMsg, failedTaskResponse.getCause) + case _ => + logger.warn(s"ExecTask(${getId}) need to retry") + ErrorExecuteResponse("unknown response: " + response, null) + } + } + + def requestToComputationJobReq( + entranceExecuteRequest: EntranceExecuteRequest + ): ComputationJobReq = { + val jobReqBuilder = ComputationJobReq.newBuilder() + jobReqBuilder.setId(entranceExecuteRequest.jobId()) + jobReqBuilder.setSubmitUser(entranceExecuteRequest.submitUser()) + jobReqBuilder.setExecuteUser(entranceExecuteRequest.executeUser()) + val codeTypeLabel: Label[_] = LabelUtil.getCodeTypeLabel(entranceExecuteRequest.getLabels) + if (null == codeTypeLabel) { + throw new EntranceErrorException( + EntranceErrorCode.EXECUTE_REQUEST_INVALID.getErrCode, + s"code Type Label is needed" + ) + } + val codes = new util.ArrayList[String]() + codes.add(entranceExecuteRequest.code()) + val codeLogicalUnit = + new CodeLogicalUnit(codes, codeTypeLabel.asInstanceOf[CodeLanguageLabel]) + jobReqBuilder.setCodeLogicalUnit(codeLogicalUnit) + jobReqBuilder.setLabels(entranceExecuteRequest.getLabels) + jobReqBuilder.setExecuteUser(entranceExecuteRequest.executeUser()) + jobReqBuilder.setParams(entranceExecuteRequest.properties()) + jobReqBuilder.build().asInstanceOf[ComputationJobReq] + } + + override def kill(): Boolean = { + LoggerUtils.setJobIdMDC(getId.toString) + logger.info("Entrance start to kill job {} invoke Orchestrator ", this.getId) + Utils.tryAndWarn { + if (null != codeUnitExecTask) { + codeUnitExecTask.kill() + } + } + LoggerUtils.removeJobIdMDC() + true + } + + override def pause(): Boolean = { + true + } + + override def resume(): Boolean = { + true + } + + override def close(): Unit = { + getEngineExecuteAsyncReturn.foreach { e => + e.notifyError(s"$toString has already been completed with state $state.") + } + } + +} diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleSyncListener.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleSyncListener.scala new file mode 100644 index 0000000000..46107ff701 --- /dev/null +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/simple/SimpleSyncListener.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.entrance.execute.simple + +import org.apache.linkis.common.listener.Event +import org.apache.linkis.common.utils.Logging +import org.apache.linkis.entrance.EntranceServer +import org.apache.linkis.entrance.conf.EntranceConfiguration +import org.apache.linkis.orchestrator.listener.OrchestratorSyncEvent +import org.apache.linkis.orchestrator.listener.task.{ + TaskErrorResponseEvent, + TaskResultSetEvent, + TaskResultSetListener, + TaskResultSetSizeEvent, + TaskStatusEvent, + TaskStatusListener +} + +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.stereotype.Component + +import javax.annotation.PostConstruct + +/** + * 1.TaskLogListener: job.getLogListener.foreach(_.onLogUpdate(job, logEvent.log)) + * + * 2.TaskProgressListener: entranceJob.getProgressListener.foreach( _.onProgressUpdate(entranceJob, + * progressInfoEvent.progress, entranceJob.getProgressInfo) + * + * 3.TaskResultSetListener entranceContext.getOrCreatePersistenceManager().onResultSizeCreated(j, + * taskResultSize.resultSize) .getOrCreatePersistenceManager() .onResultSetCreated( + * entranceExecuteRequest.getJob, AliasOutputExecuteResponse(firstResultSet.alias, + * firstResultSet.result) ) + * + * 4. TaskStatusListener getEngineExecuteAsyncReturn.foreach { jobReturn => jobReturn.notifyStatus( + * ResponseTaskStatus(entranceExecuteRequest.getJob.getId, ExecutionNodeStatus.Succeed) ) } val msg + * = failedResponse.getErrorCode + ", " + failedResponse.getErrorMsg + * getEngineExecuteAsyncReturn.foreach { jobReturn => jobReturn.notifyError(msg, + * failedResponse.getCause) jobReturn.notifyStatus( + * ResponseTaskStatus(entranceExecuteRequest.getJob.getId, ExecutionNodeStatus.Failed) ) } + */ +@Component +class SimpleSyncListener extends TaskStatusListener with TaskResultSetListener with Logging { + + @Autowired private var entranceServer: EntranceServer = _ + + @PostConstruct + def init(): Unit = { + if (EntranceConfiguration.LINKIS_ENTRANCE_SKIP_ORCHESTRATOR) { + SimpleExecuteBusContext + .getOrchestratorListenerBusContext() + .getOrchestratorSyncListenerBus + .addListener(this) + } + } + + override def onStatusUpdate(taskStatusEvent: TaskStatusEvent): Unit = {} + + override def onTaskErrorResponseEvent(taskErrorResponseEvent: TaskErrorResponseEvent): Unit = {} + + override def onResultSetCreate(taskResultSetEvent: TaskResultSetEvent): Unit = {} + + override def onResultSizeCreated(taskResultSetSizeEvent: TaskResultSetSizeEvent): Unit = {} + + override def onSyncEvent(event: OrchestratorSyncEvent): Unit = {} + + override def onEventError(event: Event, t: Throwable): Unit = {} +} diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala index 82e75c0bd7..0b915ab768 100644 --- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala +++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala @@ -226,6 +226,10 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask], children: Array[ExecTask id } + def setId(id: String): Unit = { + this.id = id + } + override def getPhysicalContext: PhysicalContext = physicalContext def getCodeLogicalUnit: CodeLogicalUnit = this.codeLogicalUnit