Earliest Deadline First (dynamic version)

Three tasks are added to a priority queue during runtime. 
The first two tasks after 5 seconds of listening for incoming tasks and the last task after another 5 seconds.

The full scheduling order can be seen in the execution_order List, after all the Tasks have finished running

In [None]:
from datetime import datetime
import queue
import time
import threading

Tasks are made comparable and can be executed. 

Each task has:
-  an id
- an execution time (time needed to execute the whole task)
- a deadline
- a start time, to track when the task started running/ running again 
- elapsed time, to track a tasks total time running (not counting the time during preemption)
- exists since: time, when the task was created and ready for execution


In [None]:
currentlyExecutedTask = None    # holds the task task is being to compare different deadlines
currentlyExecutedThread = None  # holds the thread of the task, that is being executed
execution_order = []            
stopFlag = False            

class Task:

    def __init__(self, id, execution_time, deadline):
        self.id = id
        self.execution_time = execution_time
        self.deadline = deadline
        self.start_time = None
        self.elapsed_time = 0
        self.exists_since = datetime.now().strftime('%Y-%m-%d %H:%M:%S')


    def print_values(self):
        print(f'Task {self.id}:')
        print(f'execution time - {self.execution_time} seconds')
        print(f'instantiated at {self.exists_since}')
        print(f'deadline - {self.deadline} \n')


    def __lt__(self, other):  #makes tasks comparable
        return self.deadline < other.deadline
    
    def execute_task(self):

        global stopFlag
        global currentlyExecutedTask

        self.start_time = datetime.now()
        print(f"Task {self.id} started at {self.start_time.strftime('%Y-%m-%d %H:%M:%S')} \n")
        
        while not stopFlag and self.elapsed_time < self.execution_time:
            time.sleep(1) 
            self.elapsed_time = (datetime.now() - self.start_time).total_seconds()
        
        self.end_time = datetime.now()
        self.elapsed_time = (self.end_time - self.start_time).total_seconds()
        
        if stopFlag:
            print(f"Task {self.id} was interrupted at {self.end_time.strftime('%Y-%m-%d %H:%M:%S')}")

        else:
            print(f"Task {self.id} finished at {self.end_time.strftime('%Y-%m-%d %H:%M:%S')}")
            currentlyExecutedTask = None
        print(f"Elapsed time: {self.elapsed_time} seconds \n")

Using a priority queue, the tasks can be held in order.


The task with the highest priority is run in a thread, which can be stopped and rerun at a later time, with the progress (elapsed time) saved.

The monitoring queue, run in a separate thread, constantly checks if a task has with a higher priority (closer deadline) has arrived.


In [None]:
task_queue = queue.PriorityQueue()
stopMonitoring = False

def runTaskInThread(task):
    task_thread = threading.Thread(target= task.execute_task)
    task_thread.start()
    return task_thread

def monitor_queue():

    global currentlyExecutedTask
    global stopFlag
    global currentlyExecutedThread

    while not stopMonitoring:

        if task_queue.qsize() == 0:
            print("task queue is empty")
        else: 

            if currentlyExecutedTask == None:       #make sure there is a task being executed

                currentlyExecutedTask = task_queue.get()
                stopFlag = False
                currentlyExecutedThread = runTaskInThread(currentlyExecutedTask)
                execution_order.append(currentlyExecutedTask)

            else:
                #make sure task queue still has values 
                if not task_queue.empty():

                    # currently queued task with highest priority
                    highestPriority = task_queue.get() 
            
                     #check if there is a queued task with a higher priority, if yes, swap the tasks
                    if highestPriority.deadline < currentlyExecutedTask.deadline: 
                  
                        #currentlyExecutedTask.stop_task()
                        stopFlag = True
                        task_queue.put(currentlyExecutedTask)
                        currentlyExecutedTask = None
                        task_queue.put(highestPriority)

                    else: 
                        #put the task back if it doesn't have a higher priority than the one currently being executed
                        task_queue.put(highestPriority)


            
        time.sleep(1)  # Sleep for a second before checking again

The first two tasks are added after 5 seconds of starting the monitoring queue and the last task after another 5 seconds.

In [None]:
monitoring_thread = threading.Thread(target=monitor_queue)
monitoring_thread.start()

time.sleep(5)

print("adding two new tasks to the priority queue \n")
task_queue.put(Task(1, 10, datetime(2024, 4, 17, 15, 15)))
task_queue.put(Task(2, 1, datetime(2024, 4, 17, 15, 30)))

time.sleep(5)
print("adding one new task to the priority queue")
task_queue.put(Task(3, 5, datetime(2024, 4, 17, 15, 10)))
print("adding complete, no more new tasks to add \n" )

The execution order can be printed out here, after uncommenting the last line

In [None]:
def printSchedulingOrder():
    global stopMonitoring
    stopMonitoring = True
    for i in execution_order:
          i.print_values()

#printSchedulingOrder()