Skip to content

Commit

Permalink
fixed a bug that one thread per concurrent-task rather than per concu…
Browse files Browse the repository at this point in the history
…rrent-task-queue is actually generated for Concurrent Executor

fixes #952
  • Loading branch information
Fuu3214 committed Aug 10, 2021
1 parent e26a62f commit 5816180
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re
private var lastTaskDaemonFuture: Future[_] = _

// for concurrent executor
private var concurrentTaskQueueFifoConsumerFuture: Future[_] = _
private var consumerThread: Thread = _
private var concurrentTaskQueue: BlockingQueue[EngineConnTask] = _

@Autowired
Expand All @@ -84,15 +84,15 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re

private def sendToEntrance(task: EngineConnTask, msg: RequestProtocol): Unit = synchronized {
Utils.tryCatch {
var sender : Sender = null
var sender: Sender = null
if (null != task && null != task.getCallbackServiceInstance() && null != msg) {
sender = Sender.getSender(task.getCallbackServiceInstance())
sender.send(msg)
} else {
// todo
debug("SendtoEntrance error, cannot find entrance instance.")
}
}{
} {
t =>
val errorMsg = s"SendToEntrance error. $msg" + t.getCause
error(errorMsg, t)
Expand Down Expand Up @@ -179,6 +179,7 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re
case _ =>
}
clearCache(task.getTaskId)
executeTask(task, computationExecutor)
}
}
lastTask = task
Expand All @@ -194,71 +195,73 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re
}
}
concurrentTaskQueue.put(task)
if (null == concurrentTaskQueueFifoConsumerFuture) synchronized {
val consumerRunnable = new Runnable {
override def run(): Unit = {
var errCount = 0
val ERR_COUNT_MAX = 20
while (true) {
Utils.tryCatch {
if (! executor.isBusy && ! executor.isClosed) {
val task = concurrentTaskQueue.take()
val concurrentJob = new Runnable {
override def run(): Unit = {
lastTask = task
Utils.tryCatch {
info(s"Start to run task ${task.getTaskId}")
val response = executor.execute(task)
response match {
case ErrorExecuteResponse(message, throwable) =>
sendToEntrance(task, ResponseTaskError(task.getTaskId, message))
error(message, throwable)
LogHelper.pushAllRemainLogs()
executor.transformTaskStatus(task, ExecutionNodeStatus.Failed)
case _ => //TODO response maybe lose
}
clearCache(task.getTaskId)
} {
case t: Throwable => {
errCount += 1
error(s"Execute task ${task.getTaskId} failed :", t)
if (errCount > ERR_COUNT_MAX) {
error(s"Executor run failed for ${errCount} times over ERROR_COUNT_MAX : ${ERR_COUNT_MAX}, will shutdown.")
executor.transition(NodeStatus.ShuttingDown)
}
if (null == consumerThread) synchronized {
if (null == consumerThread) {
consumerThread = new Thread(createConsumerRunnable(executor))
consumerThread.setDaemon(true)
consumerThread.setName("ConcurrentTaskQueueFifoConsumerThread")
consumerThread.start()
}
}
SubmitResponse(task.getTaskId)
}

private def createConsumerRunnable(executor: ComputationExecutor): Thread = {
val consumerRunnable = new Runnable {
override def run(): Unit = {
var errCount = 0
val ERR_COUNT_MAX = 20
while (true) {
Utils.tryCatch {
if (!executor.isBusy && !executor.isClosed) {
val task = concurrentTaskQueue.take()
val concurrentJob = new Runnable {
override def run(): Unit = {
lastTask = task
Utils.tryCatch {
logger.info(s"Start to run task ${task.getTaskId}")
executeTask(task, executor)
} {
case t: Throwable => {
errCount += 1
logger.error(s"Execute task ${task.getTaskId} failed :", t)
if (errCount > ERR_COUNT_MAX) {
logger.error(s"Executor run failed for ${errCount} times over ERROR_COUNT_MAX : ${ERR_COUNT_MAX}, will shutdown.")
executor.transition(NodeStatus.ShuttingDown)
}
}
}
}
cachedThreadPool.submit(concurrentJob)
}
Thread.sleep(20)
} {
case t: Throwable =>
errCount += 1
val lastTaskId = if (null == lastTask) "none" else lastTask.getTaskId
error(s"Execute task ${lastTaskId} failed :", t)
if (errCount > ERR_COUNT_MAX) {
error(s"Executor run failed for ${errCount} times over ERROR_COUNT_MAX : ${ERR_COUNT_MAX}, will shutdown.")
executor.transition(NodeStatus.ShuttingDown)
}
cachedThreadPool.submit(concurrentJob)
}
Thread.sleep(20)
} { case t: Throwable =>
logger.error(s"consumerThread failed :", t)
}
}
}
if (null == concurrentTaskQueueFifoConsumerFuture) {
val consumerThread = new Thread(consumerRunnable)
consumerThread.setDaemon(true)
consumerThread.setName("ConcurrentTaskQueueFifoConsumerThread")
consumerThread.start()
}
}
SubmitResponse(task.getTaskId)
new Thread(consumerRunnable)
}

private def executeTask(task: EngineConnTask, executor: ComputationExecutor): Unit = {
val response = executor.execute(task)
response match {
case ErrorExecuteResponse(message, throwable) =>
sendToEntrance(task, ResponseTaskError(task.getTaskId, message))
error(message, throwable)
LogHelper.pushAllRemainLogs()
executor.transformTaskStatus(task, ExecutionNodeStatus.Failed)
case _ => logger.warn(s"task get response is $response")
}
clearCache(task.getTaskId)
}

/**
* Open daemon thread
* @param task engine conn task
*
* @param task engine conn task
* @param scheduler scheduler
* @return
*/
Expand All @@ -275,6 +278,7 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re
}
})
}

override def taskProgress(taskID: String): ResponseTaskProgress = {
var response = ResponseTaskProgress(taskID, 0, null)
if (StringUtils.isBlank(taskID)) return response
Expand Down Expand Up @@ -313,7 +317,7 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re
} else {
error(s"Executor of taskId : $taskID is not cached.")
}
Utils.tryAndWarn (Thread.sleep(50))
Utils.tryAndWarn(Thread.sleep(50))
if (null != lastTask && lastTask.getTaskId.equalsIgnoreCase(taskID)) {
if (null != lastTaskFuture && !lastTaskFuture.isDone) {
Utils.tryAndWarn {
Expand Down Expand Up @@ -343,7 +347,7 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re

@Receiver
override def dealRequestTaskPause(requestTaskPause: RequestTaskPause): Unit = {
info(s"Pause is Not supported for task : " + requestTaskPause.execId )
info(s"Pause is Not supported for task : " + requestTaskPause.execId)
}

@Receiver
Expand All @@ -360,7 +364,7 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re

@Receiver
override def dealRequestTaskResume(requestTaskResume: RequestTaskResume): Unit = {
info(s"Resume is Not support for task : " + requestTaskResume.execId )
info(s"Resume is Not support for task : " + requestTaskResume.execId)
}

override def onEvent(event: EngineConnSyncEvent): Unit = event match {
Expand All @@ -386,7 +390,7 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re
val executor = executorManager.getReportExecutor
executor match {
case computationExecutor: ComputationExecutor =>
if (computationExecutor.isBusy) {
if (computationExecutor.isBusy) {
sendToEntrance(lastTask, ResponseTaskLog(lastTask.getTaskId, logUpdateEvent.log))
}
case _ =>
Expand All @@ -406,6 +410,8 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re
lastTask = task
LogHelper.pushAllRemainLogs()
}
val toStatus = taskStatusChangedEvent.toStatus
logger.info(s"send task ${task.getTaskId} status $toStatus to entrance")
sendToEntrance(task, ResponseTaskStatus(taskStatusChangedEvent.taskId, taskStatusChangedEvent.toStatus))
} else {
error("Task cannot null! taskStatusChangedEvent: " + ComputationEngineUtils.GSON.toJson(taskStatusChangedEvent))
Expand Down Expand Up @@ -433,7 +439,7 @@ class TaskExecutionServiceImpl extends TaskExecutionService with Logging with Re
taskResultCreateEvent.alias
))
} else {
error(s"Task cannot null! taskResultCreateEvent: ${taskResultCreateEvent.taskId}" )
error(s"Task cannot null! taskResultCreateEvent: ${taskResultCreateEvent.taskId}")
}
info(s"Finished to deal result event ${taskResultCreateEvent.taskId}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ import com.webank.wedatasphere.linkis.common.conf.CommonVars
object HiveEngineConfiguration {

val HIVE_LIB_HOME = CommonVars[String]("hive.lib", CommonVars[String]("HIVE_LIB", "/appcom/Install/hive/lib").getValue)
val ENABLE_FETCH_BASE64 = CommonVars[Boolean]("wds.linkis.hive.enable.fetch.base64",true).getValue
val ENABLE_FETCH_BASE64 = CommonVars[Boolean]("wds.linkis.hive.enable.fetch.base64",false).getValue
val BASE64_SERDE_CLASS = CommonVars[String]("wds.linkis.hive.base64.serde.class","com.webank.wedatasphere.linkis.engineplugin.hive.serde.CustomerDelimitedJSONSerDe").getValue
}

0 comments on commit 5816180

Please sign in to comment.