In [96]:
# Reference code from Session 13 - Heaps and priority queues

import heapq
import random
import math

class Task:
    """
    Represents a single task with its dependencies, duration, and time constraints.

    Attributes:
        id (int): The unique identifier for the task.
        description (str): A brief description of the task.
        duration (int): The time in minutes required to complete the task.
        dependencies (list): A list of task IDs that must be completed before this task can start.
        earliest_start_time (int): The earliest time (in minutes from start) this task can begin.
        latest_start_time (int): The latest time (in minutes from start) this task must begin.
        status (str): The current status of the task (N, I, C).
        priority (float): A random float used for tie-breaking in the priority queue.
    """
    
    def __init__(self, id, description, duration,
                 dependencies, earliest_start_time=None, latest_start_time=None, status="N"):
        """
        Initializes a new Task instance.
        
        Args:
            id (int): Task ID.
            description (str): Task description.
            duration (int): Task duration in minutes.
            dependencies (list): List of prerequisite task IDs.
            earliest_start_time (int, optional): Earliest start time in minutes.
            latest_start_time (int, optional): Latest start time (deadline) in minutes.
            status (str, optional): Initial status, defaults to 'N' (Not Started).
        """
        self.id = id
        self.description = description
        self.duration = duration
        self.dependencies = dependencies
        self.earliest_start_time = earliest_start_time
        self.latest_start_time = latest_start_time
        self.status = status
        
        # Assign a random priority for tie-breaking between otherwise equal tasks
        self.priority = random.random()

    def __lt__(self, other):
        """
        Defines the priority logic for the heap queue.
        
        Tasks are prioritized using a tuple, compared in order:
        1. Earliest Start Time (earlier is higher priority)
        2. Latest Start Time (earlier is higher priority)
        3. Random Priority (serves as a tie-breaker)
        
        Returns:
            bool: True if this task has higher priority than 'other', False otherwise.
        """
        
        # Use math.inf as a default for tasks without a specific time constraint.
        # This gives them the lowest possible priority for that tuple element.
        prio_self_e = self.earliest_start_time if self.earliest_start_time is not None else math.inf
        prio_other_e = other.earliest_start_time if other.earliest_start_time is not None else math.inf
        
        prio_self_l = self.latest_start_time if self.latest_start_time is not None else math.inf
        prio_other_l = other.latest_start_time if other.latest_start_time is not None else math.inf

        # Create the priority tuples for comparison
        self_tuple = (prio_self_e, prio_self_l, self.priority)
        other_tuple = (prio_other_e, prio_other_l, other.priority)
        
        # Python's built-in tuple comparison will handle the prioritized sorting
        return self_tuple < other_tuple
    
class TaskScheduler:
    """
    Manages a list of Task objects and runs a simulation to create a viable schedule.
    
    Uses a priority queue to manage "ready" tasks and a separate list for
    tasks that are "time-blocked" (i.e., waiting for their start time).
    """
    NOT_STARTED = 'N'
    IN_PRIORITY_QUEUE = 'I'
    COMPLETED = 'C'
    
    def __init__(self, tasks):
        """
        Initializes the TaskScheduler.
        
        Args:
            tasks (list): A list of Task objects to be scheduled.
        """
        self.tasks = tasks
        # A dictionary to quickly look up task objects by their ID
        self.task_map = {task.id: task for task in tasks} 
        # The main priority queue for tasks that are ready to run
        self.priority_queue = []
        # A list for tasks that are ready but cannot start yet (e.g., too early)
        self.time_blocked_tasks = []
        
    def print_self(self):
        """Prints summary of all tasks and their constraints."""
        print("Tasks added to the scheduler:")
        print("---------------------------------------")
        for t in self.tasks:
            print(f"‚û°Ô∏è'{t.description}', duration = {t.duration} mins.")   
            
            if t.dependencies:
                try:
                    # Look up dependency descriptions for a friendlier print
                    dep_descriptions = [self.task_map[dep_id].description for dep_id in t.dependencies]
                    print(f"\t ‚ö†Ô∏è Depends on: \"{', '.join(dep_descriptions)}\"")
                except KeyError as e:
                    print(f"\t ‚ö†Ô∏è Error: Dependency ID {e} not found!")
            
            # Print the correct time constraint type
            if t.earliest_start_time and t.latest_start_time:
                if t.earliest_start_time == t.latest_start_time:
                    print(f"\t ‚è∞ FIXED: Must start exactly at {self.format_time(t.earliest_start_time)}")
                else:
                    print(f"\t ‚è∞ WINDOW: Must run between {self.format_time(t.earliest_start_time)} and {self.format_time(t.latest_start_time)}")
            elif t.earliest_start_time:
                print(f"\t ‚è∞ EARLIEST: Cannot start before {self.format_time(t.earliest_start_time)}")
            elif t.latest_start_time:
                print(f"\t ‚è∞ DEADLINE: Must start by {self.format_time(t.latest_start_time)}")
            else:
                print(f"\t ‚è∞ FLEXIBLE: No time constraints")
            
    def remove_dependency(self, id):
        """
        Removes a completed task ID from the dependency lists of all other tasks.
        
        Args:
            id (int): The ID of the task that has just been completed.
        """
        for t in self.tasks:
            if t.id != id and id in t.dependencies:
                t.dependencies.remove(id)           
            
    def get_tasks_ready(self):
        """Finds tasks with no remaining dependencies and adds them to the priority queue."""
        for task in self.tasks:
            if task.status == self.NOT_STARTED and not task.dependencies: 
                task.status = self.IN_PRIORITY_QUEUE 
                heapq.heappush(self.priority_queue, task)
    
    def move_ready_time_blocked_tasks(self, current_time):
        """
        Moves tasks from the time-blocked list back to the main priority queue
        if their 'earliest_start_time' has been reached.
        
        Args:
            current_time (int): The current simulation time.
        """
        tasks_to_move = []
        for task in self.time_blocked_tasks:
            if task and (task.earliest_start_time is None or current_time >= task.earliest_start_time):
                tasks_to_move.append(task)
        
        for task in tasks_to_move:
            self.time_blocked_tasks.remove(task)
            heapq.heappush(self.priority_queue, task)

    def check_unscheduled_tasks(self):
        """
        Checks if any tasks are not yet marked as COMPLETED.
        
        Returns:
            bool: True if unfinished tasks remain, False otherwise.
        """
        for task in self.tasks:
            if task.status != self.COMPLETED:
                return True
        return False   
    
    def format_time(self, time):
        """
        Converts minutes-from-start into a human-readable string.
        
        Args:
            time (int): Time in minutes.
            
        Returns:
            str: Formatted time string (e.g., "9h30" or "Day 2, 2h30").
        """
        if not isinstance(time, (int, float)):
            return "Invalid time"
        days = int(time) // 1440 # 1440 minutes in a day
        minutes_today = int(time) % 1440
        hour = minutes_today // 60
        minute = minutes_today % 60
        
        if days > 0:
            return f"Day {days+1}, {hour}h{minute:02d}"
        return f"{hour}h{minute:02d}"
    
    def run_task_scheduler(self, starting_time):
        """
        Runs the main scheduling simulation loop.
        
        Continues until all tasks are completed or a deadlock is detected.
        
        Args:
            starting_time (int): The time (in minutes) to start the simulation.
        """
        current_time = starting_time
        print("Running the scheduler:\n")
        
        while self.check_unscheduled_tasks():
            
            # Add any newly available tasks to the priority queue
            self.get_tasks_ready()
            
            # Check if any time-blocked tasks can now be moved to the main queue
            self.move_ready_time_blocked_tasks(current_time)
            
            if self.priority_queue:
                # Get the highest-priority task that is ready
                task = heapq.heappop(self.priority_queue)
                
                # Check 1: Is it too early to run this task?
                if task.earliest_start_time is not None and current_time < task.earliest_start_time:
                    # If so, move it to the time-blocked list and check again later
                    self.time_blocked_tasks.append(task)
                    continue
                
                # Check 2: Look ahead to prevent conflicts with future, fixed-time tasks.
                
                # Find the start time of the *next* task in the waiting list
                next_hard_start_time = math.inf
                if self.time_blocked_tasks:
                    valid_times = [t.earliest_start_time for t in self.time_blocked_tasks if t.earliest_start_time is not None]
                    if valid_times:
                        next_hard_start_time = min(valid_times)
                
                task_end_time = current_time + task.duration
                
                # Check if running this task will make us late for the next fixed task
                if task_end_time > next_hard_start_time:
                    print(f"üï∞t={self.format_time(current_time)}... Delaying flexible task '{task.description}' to keep {self.format_time(next_hard_start_time)} free.")
                    
                    # Put this task back on the queue to be run later
                    heapq.heappush(self.priority_queue, task)
                    
                    # Advance time to the start of the fixed task we were waiting for
                    current_time = next_hard_start_time
                    continue # Restart the loop
                
                # If we pass all checks, we can run the task.
                
                # Check for a deadline violation (running LATE)
                if task.latest_start_time is not None and current_time > task.latest_start_time:
                    print(f"üï∞t={self.format_time(current_time)} - ‚ö†Ô∏è WARNING: Running '{task.description}' LATE (Deadline was {self.format_time(task.latest_start_time)})")
                
                # Execute the task
                print(f"üï∞t={self.format_time(current_time)}")
                print(f"\tstarted '{task.description}' for {task.duration} mins...")
                current_time += task.duration            
                print(f"\t‚úÖ t={self.format_time(current_time)}, task completed!") 
                
                # Update dependencies and status
                self.remove_dependency(task.id)
                task.status = self.COMPLETED
            
            elif self.time_blocked_tasks:
                # If the main queue is empty, we must wait for a time-blocked task.
                # Advance time to the start of the next available task.
                try:
                    next_available_time = min(t.earliest_start_time for t in self.time_blocked_tasks if t.earliest_start_time is not None)
                except ValueError:
                     print(f"üï∞t={self.format_time(current_time)} - ERROR: Time-blocked, but no valid start time!")
                     break
                
                if current_time < next_available_time:
                    print(f"üï∞t={self.format_time(current_time)}... no tasks runnable. Waiting until {self.format_time(next_available_time)}...")
                    current_time = next_available_time
            
            elif self.check_unscheduled_tasks():
                 # If both lists are empty but tasks remain, we have a deadlock (e.g., circular dependency)
                 print(f"üï∞t={self.format_time(current_time)} - ERROR: Deadlock detected!")
                 break
            
        total_time = current_time - starting_time             
        print(f"\nüèÅ Completed all planned tasks in {int(total_time)//60}h{int(total_time)%60:02d}min!")

In [97]:
# Reference code from Session 13 - Heaps and priority queues

"""
List of Tasks - 5 Nov 2025:

‚Ä¢ Fixed tasks:
1) Take CS111 class

‚Ä¢ Partly Timed Tasks
1) Wake up at 09:30
2) Have lunch at 14:30
3) Buy Bento at Lincos after gym after 20:00
4) Go to bed before 02:30

‚Ä¢ Non-timely tasks:
1) Make CS113 Pre-Class Work
2) Have Japan's signature drink - Matcha Green Tea
3) Go to the gym
4) Buy 100¬• (~$0.65) Snacks for Kamaishi Trip at Mini Stop after Lincos
5) Take shower after gym
6) Have dinner

"""

def get_pristine_tasks():
    """
    Creates and returns a new, 'pristine' list of all Task objects.
    
    This function is necessary for two reasons:
    1. The scheduler modifies the task list as it runs (e.g., dependencies, status).
    2. We need new random priority numbers assigned (in Task.__init__) for each
       schedule alternative.
       
    Returns:
        list: A new list of Task objects.
    """
    tasks = [
        Task(id=0, description='Wake up at 09:30', 
             duration=10, dependencies=[], earliest_start_time=570), 
        Task(id=1, description='Have lunch at 14:30', 
             duration=60, dependencies=[0], earliest_start_time=870), 
        Task(id=2, description='Go to the gym', 
             duration=120, dependencies=[0, 1]), 
        Task(id=3, description='Buy Bento at Lincos after gym after 20:00', 
             duration=25, dependencies=[0, 2], earliest_start_time=1200), 
        Task(id=4, description='Buy 100¬• (~$0.65) Snacks for Kamaishi Trip at Mini Stop after Lincos', 
             duration=10, dependencies=[0, 1, 3]), 
        Task(id=5, description='Take shower after gym', 
             duration=30, dependencies=[0, 2]), 
        Task(id=6, description='Have dinner', 
             duration=40, dependencies=[0, 5]), 
        Task(id=7, description='Prepare luggage for Kamaishi Trip', 
             duration=100, dependencies=[0, 6]),
         Task(id=8, description='Make CS113 Pre-Class Work', 
             duration=120, dependencies=[0, 7]),
         Task(id=9, description="Have Japan's signature drink - Matcha Green Tea", 
             duration=15, dependencies=[0]),
         Task(id=10, description='Go to bed before 02:30', 
             duration=15, dependencies=[0, 1, 4, 6, 7, 8, 9], earliest_start_time=22*60, latest_start_time=26*60+30),
         Task(id=11, description="Attend CS111 Session from 16:00 to 17:30",
              duration=90, dependencies=[0], earliest_start_time=16*60, latest_start_time=16*60)
        ]
    return tasks

# This part just prints the task definitions once for review
task_scheduler_printer = TaskScheduler(get_pristine_tasks())
print("--- Printing Task Definitions ---")
task_scheduler_printer.print_self()
print("---------------------------------")

--- Printing Task Definitions ---
Tasks added to the scheduler:
---------------------------------------
‚û°Ô∏è'Wake up at 09:30', duration = 10 mins.
	 ‚è∞ EARLIEST: Cannot start before 9h30
‚û°Ô∏è'Have lunch at 14:30', duration = 60 mins.
	 ‚ö†Ô∏è Depends on: "Wake up at 09:30"
	 ‚è∞ EARLIEST: Cannot start before 14h30
‚û°Ô∏è'Go to the gym', duration = 120 mins.
	 ‚ö†Ô∏è Depends on: "Wake up at 09:30, Have lunch at 14:30"
	 ‚è∞ FLEXIBLE: No time constraints
‚û°Ô∏è'Buy Bento at Lincos after gym after 20:00', duration = 25 mins.
	 ‚ö†Ô∏è Depends on: "Wake up at 09:30, Go to the gym"
	 ‚è∞ EARLIEST: Cannot start before 20h00
‚û°Ô∏è'Buy 100¬• (~$0.65) Snacks for Kamaishi Trip at Mini Stop after Lincos', duration = 10 mins.
	 ‚ö†Ô∏è Depends on: "Wake up at 09:30, Have lunch at 14:30, Buy Bento at Lincos after gym after 20:00"
	 ‚è∞ FLEXIBLE: No time constraints
‚û°Ô∏è'Take shower after gym', duration = 30 mins.
	 ‚ö†Ô∏è Depends on: "Wake up at 09:30, Go to the gym"
	 ‚è∞ FLEXIBLE: No time 

In [98]:
# Reference code from Session 13 - Heaps and priority queues

# Running the Scheduler
start_scheduler_at = int((9.5)*60) # 9:30 AM

# Loop 3 times to create 3 different schedule alternatives
for i in range(3):
    print("\n" + "="*30)
    print(f"üóìÔ∏è SCHEDULE ALTERNATIVE {i+1}")
    print("="*30)
    
    # Get a fresh, new list of tasks with new random priorities
    fresh_tasks = get_pristine_tasks()
    
    # Create and run a new scheduler instance for this alternative
    task_scheduler = TaskScheduler(fresh_tasks)
    task_scheduler.run_task_scheduler(start_scheduler_at)


üóìÔ∏è SCHEDULE ALTERNATIVE 1
Running the scheduler:

üï∞t=9h30
	started 'Wake up at 09:30' for 10 mins...
	‚úÖ t=9h40, task completed!
üï∞t=9h40
	started 'Have Japan's signature drink - Matcha Green Tea' for 15 mins...
	‚úÖ t=9h55, task completed!
üï∞t=9h55... no tasks runnable. Waiting until 14h30...
üï∞t=14h30
	started 'Have lunch at 14:30' for 60 mins...
	‚úÖ t=15h30, task completed!
üï∞t=15h30... Delaying flexible task 'Go to the gym' to keep 16h00 free.
üï∞t=16h00
	started 'Attend CS111 Session from 16:00 to 17:30' for 90 mins...
	‚úÖ t=17h30, task completed!
üï∞t=17h30
	started 'Go to the gym' for 120 mins...
	‚úÖ t=19h30, task completed!
üï∞t=19h30
	started 'Take shower after gym' for 30 mins...
	‚úÖ t=20h00, task completed!
üï∞t=20h00
	started 'Buy Bento at Lincos after gym after 20:00' for 25 mins...
	‚úÖ t=20h25, task completed!
üï∞t=20h25
	started 'Have dinner' for 40 mins...
	‚úÖ t=21h05, task completed!
üï∞t=21h05
	started 'Buy 100¬• (~$0.65) Snacks for Kamai