Skip to content

Commit

Permalink
[Feature][1.5.0] Linkis orchestrator module supports plugin configura…
Browse files Browse the repository at this point in the history
…tion (apache#5082)
  • Loading branch information
CharlieYan24 committed May 20, 2024
1 parent 43cb5b2 commit b1c1f67
Show file tree
Hide file tree
Showing 11 changed files with 353 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@
import org.apache.linkis.entrance.scheduler.EntranceGroupFactory;
import org.apache.linkis.entrance.scheduler.EntranceParallelConsumerManager;
import org.apache.linkis.entrance.scheduler.EntranceSchedulerContext;
import org.apache.linkis.orchestrator.ecm.EngineConnManagerBuilder;
import org.apache.linkis.orchestrator.ecm.EngineConnManagerBuilder$;
import org.apache.linkis.orchestrator.ecm.entity.Policy;
import org.apache.linkis.scheduler.Scheduler;
import org.apache.linkis.scheduler.SchedulerContext;
import org.apache.linkis.scheduler.executer.ExecutorManager;
Expand Down Expand Up @@ -204,9 +201,7 @@ public SchedulerContext schedulerContext(
@Bean
@ConditionalOnMissingBean
public ExecutorManager executorManager(GroupFactory groupFactory) {
EngineConnManagerBuilder engineConnManagerBuilder = EngineConnManagerBuilder$.MODULE$.builder();
engineConnManagerBuilder.setPolicy(Policy.Process);
return new EntranceExecutorManagerImpl(groupFactory, engineConnManagerBuilder.build());
return new EntranceExecutorManagerImpl(groupFactory);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,4 +285,7 @@ object EntranceConfiguration {
val ENTRANCE_FAILOVER_RUNNING_KILL_ENABLED =
CommonVars("linkis.entrance.failover.running.kill.enable", false)

val LINKIS_ENTRANCE_SKIP_ORCHESTRATOR =
CommonVars("linkis.entrance.skip.orchestrator", false).getValue

}
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,15 @@ class DefaultEntranceExecutor(id: Long)
true
}

def getRunningOrchestrationFuture: Option[OrchestrationFuture] = {
val asyncReturn = getEngineExecuteAsyncReturn
if (asyncReturn.isDefined) {
asyncReturn.get.getOrchestrationFuture()
} else {
None
}
}

override protected def callExecute(request: ExecuteRequest): ExecuteResponse = {

val entranceExecuteRequest: EntranceExecuteRequest = request match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,6 @@ abstract class EntranceExecutor(val id: Long) extends Executor with Logging {
super.hashCode()
}

def getRunningOrchestrationFuture: Option[OrchestrationFuture] = {
if (null != engineReturn) {
engineReturn.getOrchestrationFuture()
} else {
None
}
}

}

class EngineExecuteAsyncReturn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ package org.apache.linkis.entrance.execute
import org.apache.linkis.common.exception.WarnException
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary._
import org.apache.linkis.entrance.exception.EntranceErrorException
import org.apache.linkis.entrance.execute.simple.{SimpleEntranceExecutor, SimpleExecuteBusContext}
import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.scheduler.executer.{Executor, ExecutorManager}
import org.apache.linkis.scheduler.queue.{GroupFactory, Job, SchedulerEvent}
Expand Down Expand Up @@ -91,6 +93,14 @@ abstract class EntranceExecutorManager(groupFactory: GroupFactory)
case jobReq: JobRequest =>
val entranceEntranceExecutor =
new DefaultEntranceExecutor(jobReq.getId)
if (EntranceConfiguration.LINKIS_ENTRANCE_SKIP_ORCHESTRATOR) {
new SimpleEntranceExecutor(
jobReq.getId,
SimpleExecuteBusContext.getOrchestratorListenerBusContext()
)
} else {
new DefaultEntranceExecutor(jobReq.getId)
}
// getEngineConn Executor
job.getLogListener.foreach(
_.onLogUpdate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.linkis.orchestrator.ecm.EngineConnManager
import org.apache.linkis.scheduler.listener.ExecutorListener
import org.apache.linkis.scheduler.queue.GroupFactory

class EntranceExecutorManagerImpl(groupFactory: GroupFactory, engineConnManager: EngineConnManager)
class EntranceExecutorManagerImpl(groupFactory: GroupFactory)
extends EntranceExecutorManager(groupFactory) {

override def getOrCreateInterceptors(): Array[ExecuteRequestInterceptor] = Array(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.entrance.execute.simple
import org.apache.linkis.orchestrator.listener.OrchestratorListenerBusContext

object SimpleExecuteBusContext {

private lazy val orchestratorListenerBusContext = OrchestratorListenerBusContext.createBusContext

def getOrchestratorListenerBusContext(): OrchestratorListenerBusContext =
orchestratorListenerBusContext

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.entrance.execute.simple

import org.apache.linkis.common.listener.Event
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.entrance.EntranceServer
import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.orchestrator.listener.OrchestratorAsyncEvent
import org.apache.linkis.orchestrator.listener.task.{
TaskLogEvent,
TaskLogListener,
TaskProgressListener,
TaskRunningInfoEvent
}

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component

import javax.annotation.PostConstruct

@Component
class SimpleASyncListener extends TaskLogListener with TaskProgressListener with Logging {

@Autowired private var entranceServer: EntranceServer = _

@PostConstruct
def init(): Unit = {
if (EntranceConfiguration.LINKIS_ENTRANCE_SKIP_ORCHESTRATOR) {
SimpleExecuteBusContext
.getOrchestratorListenerBusContext()
.getOrchestratorAsyncListenerBus
.addListener(this)
}
}

override def onLogUpdate(taskLogEvent: TaskLogEvent): Unit = {}

override def onProgressOn(taskProgressEvent: TaskRunningInfoEvent): Unit = {}

override def onEvent(event: OrchestratorAsyncEvent): Unit = {}

override def onEventError(event: Event, t: Throwable): Unit = {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.entrance.execute.simple

import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.entrance.exception.{EntranceErrorCode, EntranceErrorException}
import org.apache.linkis.entrance.execute.{EngineExecuteAsyncReturn, EntranceExecutor}
import org.apache.linkis.entrance.job.EntranceExecuteRequest
import org.apache.linkis.governance.common.utils.LoggerUtils
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.CodeLanguageLabel
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.orchestrator.code.plans.ast.CodeJob
import org.apache.linkis.orchestrator.code.plans.logical.CodeLogicalUnitTaskDesc
import org.apache.linkis.orchestrator.computation.entity.ComputationJobReq
import org.apache.linkis.orchestrator.computation.physical.CodeLogicalUnitExecTask
import org.apache.linkis.orchestrator.converter.ASTContextImpl
import org.apache.linkis.orchestrator.execution.{
AsyncTaskResponse,
FailedTaskResponse,
SucceedTaskResponse
}
import org.apache.linkis.orchestrator.listener.OrchestratorListenerBusContext
import org.apache.linkis.orchestrator.plans.physical.{ExecTask, PhysicalContextImpl}
import org.apache.linkis.orchestrator.plans.unit.CodeLogicalUnit
import org.apache.linkis.scheduler.executer._

import java.util

class SimpleEntranceExecutor(
id: Long,
orchestratorListenerBusContext: OrchestratorListenerBusContext
) extends EntranceExecutor(id)
with SingleTaskOperateSupport
with Logging {

private var codeUnitExecTask: CodeLogicalUnitExecTask = null

override protected def callExecute(request: ExecuteRequest): ExecuteResponse = {
val entranceExecuteRequest: EntranceExecuteRequest = request match {
case request: EntranceExecuteRequest =>
request
case _ =>
throw new EntranceErrorException(
EntranceErrorCode.EXECUTE_REQUEST_INVALID.getErrCode,
s"Invalid entranceExecuteRequest : ${request.code}"
)
}
// 1. create JobReq
val computationJobReq = requestToComputationJobReq(entranceExecuteRequest)
// 2. create code job
val codeJob = new CodeJob(null, null)
val astContext = ASTContextImpl.newBuilder().setJobReq(computationJobReq).build()
codeJob.setAstContext(astContext)
codeJob.setCodeLogicalUnit(computationJobReq.getCodeLogicalUnit)
codeJob.setParams(computationJobReq.getParams)
codeJob.setName(computationJobReq.getName + "_Job")
codeJob.setSubmitUser(computationJobReq.getSubmitUser)
codeJob.setExecuteUser(computationJobReq.getExecuteUser)
codeJob.setLabels(computationJobReq.getLabels)
codeJob.setPriority(computationJobReq.getPriority)
codeUnitExecTask = new CodeLogicalUnitExecTask(Array[ExecTask](), Array[ExecTask]())
// set job id, can find by getEntranceContext.getOrCreateScheduler().get(execId).map(_.asInstanceOf[Job])
codeUnitExecTask.setId(entranceExecuteRequest.getJob.getId)
// 3.set code unit
codeUnitExecTask.setCodeLogicalUnit(computationJobReq.getCodeLogicalUnit)
codeUnitExecTask.setTaskDesc(CodeLogicalUnitTaskDesc(codeJob))
// 4. set context
val context = new PhysicalContextImpl(codeUnitExecTask, Array.empty)
context.setSyncBus(orchestratorListenerBusContext.getOrchestratorSyncListenerBus)
context.setAsyncBus(orchestratorListenerBusContext.getOrchestratorAsyncListenerBus)
// 5. execute
val response = codeUnitExecTask.execute()
response match {
case async: AsyncTaskResponse =>
new EngineExecuteAsyncReturn(request, null)
case succeed: SucceedTaskResponse =>
logger.info(s"Succeed to execute ExecTask(${getId})")
SuccessExecuteResponse()
case failedTaskResponse: FailedTaskResponse =>
logger.info(s"Failed to execute ExecTask(${getId})")
ErrorExecuteResponse(failedTaskResponse.getErrorMsg, failedTaskResponse.getCause)
case _ =>
logger.warn(s"ExecTask(${getId}) need to retry")
ErrorExecuteResponse("unknown response: " + response, null)
}
}

def requestToComputationJobReq(
entranceExecuteRequest: EntranceExecuteRequest
): ComputationJobReq = {
val jobReqBuilder = ComputationJobReq.newBuilder()
jobReqBuilder.setId(entranceExecuteRequest.jobId())
jobReqBuilder.setSubmitUser(entranceExecuteRequest.submitUser())
jobReqBuilder.setExecuteUser(entranceExecuteRequest.executeUser())
val codeTypeLabel: Label[_] = LabelUtil.getCodeTypeLabel(entranceExecuteRequest.getLabels)
if (null == codeTypeLabel) {
throw new EntranceErrorException(
EntranceErrorCode.EXECUTE_REQUEST_INVALID.getErrCode,
s"code Type Label is needed"
)
}
val codes = new util.ArrayList[String]()
codes.add(entranceExecuteRequest.code())
val codeLogicalUnit =
new CodeLogicalUnit(codes, codeTypeLabel.asInstanceOf[CodeLanguageLabel])
jobReqBuilder.setCodeLogicalUnit(codeLogicalUnit)
jobReqBuilder.setLabels(entranceExecuteRequest.getLabels)
jobReqBuilder.setExecuteUser(entranceExecuteRequest.executeUser())
jobReqBuilder.setParams(entranceExecuteRequest.properties())
jobReqBuilder.build().asInstanceOf[ComputationJobReq]
}

override def kill(): Boolean = {
LoggerUtils.setJobIdMDC(getId.toString)
logger.info("Entrance start to kill job {} invoke Orchestrator ", this.getId)
Utils.tryAndWarn {
if (null != codeUnitExecTask) {
codeUnitExecTask.kill()
}
}
LoggerUtils.removeJobIdMDC()
true
}

override def pause(): Boolean = {
true
}

override def resume(): Boolean = {
true
}

override def close(): Unit = {
getEngineExecuteAsyncReturn.foreach { e =>
e.notifyError(s"$toString has already been completed with state $state.")
}
}

}

0 comments on commit b1c1f67

Please sign in to comment.