Skip to content

Commit

Permalink
linkis-entranc - fix JobQueueLabel and jobRunningLabel (#2237)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexkun committed Jun 7, 2022
1 parent ab6f29f commit adfc902
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ trait SchedulerEvent extends Logging {

def getEndTime = endTime
def getStartTime = startTime

/*
* To be compatible with old versions.
* It's not recommonded to use scheduledTime, which was only several mills at most time.
*/
@Deprecated
def getScheduledTime = scheduledTime

def getId = id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,29 +75,32 @@ class JobTimeoutManager extends Logging {
private def timeoutDetective(): Unit = {
if (timeoutCheck) {
def checkAndSwitch(job: EntranceJob): Unit = {
info(s"Checking whether the job timed out: ${job.getId()}")
val currentTime = System.currentTimeMillis() / 1000
val queuingTime = currentTime - job.getScheduledTime / 1000
val runningTime = currentTime - job.getStartTime / 1000
info(s"Checking whether the job id ${job.getJobRequest.getId()} timed out. ")
val currentTimeSeconds = System.currentTimeMillis() / 1000
// job.isWaiting == job in queue
val jobScheduleStartTimeSeconds = if (job.isWaiting) job.createTime / 1000 else currentTimeSeconds
val queuingTimeSeconds = currentTimeSeconds - jobScheduleStartTimeSeconds
val jobRunningStartTimeSeconds = if (job.getStartTime > 0) job.getStartTime / 1000 else currentTimeSeconds
val runningTimeSeconds = currentTimeSeconds - jobRunningStartTimeSeconds
if (!job.isCompleted) {
job.jobRequest.getLabels foreach {
case queueTimeOutLabel: JobQueuingTimeoutLabel =>
if (queueTimeOutLabel.getQueuingTimeout > 0 && queuingTime >= queueTimeOutLabel.getQueuingTimeout) {
warn(s"Job queuing timeout, cancel it now: ${job.getId()}")
job.cancel()
if (job.isWaiting && queueTimeOutLabel.getQueuingTimeout > 0 && queuingTimeSeconds >= queueTimeOutLabel.getQueuingTimeout) {
logger.warn(s"Job ${job.getJobRequest.getId()} queued time : ${queuingTimeSeconds} seconds, which was over queueTimeOut : ${queueTimeOutLabel.getQueuingTimeout} seconds, cancel it now! ")
job.onFailure(s"Job queued ${queuingTimeSeconds} seconds over max queue time : ${queueTimeOutLabel.getQueuingTimeout} seconds.", null)
}
case jobRunningTimeoutLabel: JobRunningTimeoutLabel =>
if (jobRunningTimeoutLabel.getRunningTimeout > 0 && runningTime >= jobRunningTimeoutLabel.getRunningTimeout) {
warn(s"Job running timeout, cancel it now: ${job.getId()}")
job.cancel()
if (job.isRunning && jobRunningTimeoutLabel.getRunningTimeout > 0 && runningTimeSeconds >= jobRunningTimeoutLabel.getRunningTimeout) {
logger.warn(s"Job ${job.getJobRequest.getId()} run timeout ${runningTimeSeconds} seconds, which was over runTimeOut : ${jobRunningTimeoutLabel.getRunningTimeout} seconds, cancel it now! ")
job.onFailure(s"Job run ${runningTimeSeconds} seconds over max run time : ${jobRunningTimeoutLabel.getRunningTimeout} seconds.", null)
}
case _ =>
}
}
}

timeoutJobByName.foreach(item => {
info(s"Running timeout detection!")
logger.info(s"Running timeout detection!")
synchronized {
jobCompleteDelete(item._1)
if (jobExist(item._1)) checkAndSwitch(item._2)
Expand Down

0 comments on commit adfc902

Please sign in to comment.