# In [None]:
import time
import collections
import itertools
import heapq # Used for the priority queue
import copy # Useful for safe iteration if needed

# --- Job lifecycle states ---
# Every job's 'state' field holds exactly one of these string tags.
PENDING, RUNNING, COMPLETED, FAILED = (
    "PENDING",
    "RUNNING",
    "COMPLETED",
    "FAILED",
)

# --- Job Representation (Using a dictionary) ---
# {'id': int, 'command': str, 'required_slots': int, 'priority': int, 'state': str,
#  'submit_time': float, 'start_time': float|None, 'simulated_duration': float,
#  'depends_on': set[int], 'successors': set[int], 'unmet_dependencies': int}

class DependencyScheduler:
    """
    A basic, in-memory, priority-based job scheduler simulating HPC resource
    management with job dependencies (DAG).
    Jobs with lower priority numbers run first, but only after dependencies are met.
    """

    def __init__(self, total_slots: int):
        """
        Initializes the scheduler.

        Args:
            total_slots: The total number of compute slots available.
        """
        if total_slots <= 0:
            raise ValueError("Total slots must be positive.")

        self.total_slots: int = total_slots
        self.free_slots: int = total_slots
        # Min-heap stores (priority, submit_time, job_id) for jobs whose dependencies ARE met
        self.pending_jobs_heap = []
        self.running_jobs = {} # {job_id: job_info}
        self.completed_jobs = {} # {job_id: job_info}
        self.failed_jobs = {} # {job_id: job_info}
        self.job_id_counter = itertools.count(1)
        # Central dictionary stores info for ALL jobs, including those waiting for dependencies
        self.all_jobs = {} # {job_id: job_info}

        print(f"Scheduler initialized with {self.total_slots} slots.")

    def submit_job(self, command: str, required_slots: int, priority: int = 10, duration: float = 5.0, depends_on: list[int] | None = None) -> int:
        """
        Submits a new job to the scheduler with priority and dependencies.

        Args:
            command: The command to be executed (simulated).
            required_slots: The number of slots the job needs.
            priority: Job priority (lower number means higher priority). Defaults to 10.
            duration: The simulated duration of the job in seconds.
            depends_on: A list of job IDs that this job depends on. Defaults to None.

        Returns:
            The unique ID assigned to the job.

        Raises:
            ValueError: If requirements are invalid or a dependency ID doesn't exist or creates a cycle (basic check).
        """
        if required_slots <= 0:
            raise ValueError("Required slots must be positive.")
        if required_slots > self.total_slots:
            raise ValueError(f"Job requires {required_slots} slots, but scheduler only has {self.total_slots} total.")
        if not isinstance(priority, int):
             raise ValueError("Priority must be an integer.")

        depends_on_set = set(depends_on) if depends_on else set()
        unmet_dependency_count = 0

        # Validate dependencies
        for dep_id in depends_on_set:
            if dep_id not in self.all_jobs:
                raise ValueError(f"Dependency job ID {dep_id} not found.")
            # Basic cycle check: cannot depend on self (more complex cycle detection needed for full DAG validation)
            # if dep_id == potential_new_job_id: # Need job_id before creating job_info
            #     raise ValueError("Job cannot depend on itself.")

            # Check if dependency is already finished (COMPLETED or FAILED)
            dep_job_info = self.all_jobs[dep_id]
            if dep_job_info['state'] not in (COMPLETED, FAILED):
                unmet_dependency_count += 1
            # More robust cycle detection would involve graph traversal (e.g., DFS)

        job_id = next(self.job_id_counter)
        submit_time = time.time()
        job_info = {
            'id': job_id,
            'command': command,
            'required_slots': required_slots,
            'priority': priority,
            'state': PENDING,
            'submit_time': submit_time,
            'start_time': None,
            'simulated_duration': duration,
            'depends_on': depends_on_set,
            'successors': set(), # Populated below
            'unmet_dependencies': unmet_dependency_count
        }

        # Store the job centrally FIRST
        self.all_jobs[job_id] = job_info

        # Update successor lists for dependency jobs
        for dep_id in depends_on_set:
             # Check if the dependency job still exists (it should, based on earlier check)
             if dep_id in self.all_jobs:
                 self.all_jobs[dep_id]['successors'].add(job_id)

        # Add job to the pending heap *only if* it has no unmet dependencies initially
        if unmet_dependency_count == 0:
            heapq.heappush(self.pending_jobs_heap, (priority, submit_time, job_id))
            print(f"  Job {job_id} has no unmet dependencies and added to pending queue.")
        else:
             print(f"  Job {job_id} is waiting for {unmet_dependency_count} dependencies.")

        print(f"Job {job_id} ('{command}') submitted with priority {priority}, requires {required_slots} slots, duration {duration:.1f}s, depends on {depends_on_set or '{}'}.")
        return job_id

    def _mark_job_finished(self, job_id: int, final_state: str):
        """ Helper to handle common logic for COMPLETED/FAILED jobs """
        if job_id not in self.running_jobs:
             # Should not happen if called correctly, but good robustness check
             print(f"Warning: Trying to mark non-running job {job_id} as {final_state}.")
             return

        job = self.running_jobs[job_id]
        print(f"  Job {job_id} ('{job['command']}') finished with state {final_state}.")

        # 1. Update state in central tracker
        job['state'] = final_state

        # 2. Release slots
        self.free_slots += job['required_slots']

        # 3. Move from running to final state dict
        if final_state == COMPLETED:
            self.completed_jobs[job_id] = job
        elif final_state == FAILED:
            self.failed_jobs[job_id] = job
        # Remove from running AFTER processing successors
        # del self.running_jobs[job_id] # Done after loop

        # 4. Process dependencies for successors
        print(f"  Processing successors for job {job_id}: {job['successors']}")
        for successor_id in job['successors']:
            if successor_id not in self.all_jobs:
                print(f"Warning: Successor job ID {successor_id} not found for completed job {job_id}.")
                continue

            successor_job_info = self.all_jobs[successor_id]

            # If the parent job FAILED, the dependent job also fails (common policy)
            if final_state == FAILED and successor_job_info['state'] == PENDING:
                 print(f"  Marking dependent job {successor_id} as FAILED due to parent failure.")
                 successor_job_info['state'] = FAILED
                 self.failed_jobs[successor_id] = successor_job_info
                 # Potentially cascade failure further? Depends on desired policy.
                 # For now, just fail this direct successor.
                 continue # Skip decrementing count for failed successors

            # If parent COMPLETED and successor is PENDING, decrement count
            if final_state == COMPLETED and successor_job_info['state'] == PENDING:
                if successor_job_info['unmet_dependencies'] > 0:
                    successor_job_info['unmet_dependencies'] -= 1
                    print(f"  Decremented dependency count for job {successor_id} to {successor_job_info['unmet_dependencies']}.")

                    # If dependencies are now met, add to the ready heap
                    if successor_job_info['unmet_dependencies'] == 0:
                        print(f"    Job {successor_id} dependencies met. Adding to ready heap.")
                        heapq.heappush(self.pending_jobs_heap,
                                     (successor_job_info['priority'],
                                      successor_job_info['submit_time'],
                                      successor_id))
                else:
                    # This case (count already 0) shouldn't happen if logic is correct
                    print(f"Warning: Successor job {successor_id} had unmet_dependencies=0 but was PENDING.")


    def _check_running_jobs(self):
        """
        Internal helper to check running jobs for completion or failure.
        If a job finishes, update successors via _mark_job_finished.
        """
        current_time = time.time()
        finished_ids_this_cycle = []

        for job_id in list(self.running_jobs.keys()):
            job = self.running_jobs[job_id]

            # Simulate potential failure (e.g., 5% chance for demo)
            # import random
            # if random.random() < 0.05:
            #     self._mark_job_finished(job_id, FAILED)
            #     finished_ids_this_cycle.append(job_id)
            #     continue # Move to next job

            # Check for normal completion
            if job['start_time'] is not None and current_time >= job['start_time'] + job['simulated_duration']:
                self._mark_job_finished(job_id, COMPLETED)
                finished_ids_this_cycle.append(job_id)

        # Remove finished jobs from running_jobs *after* iterating and processing successors
        for job_id in finished_ids_this_cycle:
             if job_id in self.running_jobs: # Check if it wasn't already removed (e.g., by failure logic)
                 del self.running_jobs[job_id]


    def _try_start_pending_jobs(self):
        """
        Internal helper to check the pending heap and start the highest-priority,
        dependency-free jobs that fit the available resources.
        """
        jobs_started_this_cycle = 0
        # Use a temporary list to hold jobs popped but maybe not started
        candidates_to_requeue = []

        # Keep trying as long as there are ready jobs and potential slots
        while self.pending_jobs_heap and self.free_slots > 0:
            # Peek at the highest priority ready job
            priority, submit_time, next_job_id = self.pending_jobs_heap[0]

            # Get the full job info
            if next_job_id not in self.all_jobs:
                 print(f"Warning: Job ID {next_job_id} from heap not found in all_jobs. Discarding.")
                 heapq.heappop(self.pending_jobs_heap) # Remove inconsistent entry
                 continue

            job_info = self.all_jobs[next_job_id]

            # Sanity checks (should already be true if logic is correct)
            if job_info['state'] != PENDING:
                print(f"Warning: Job {next_job_id} in heap has state {job_info['state']}. Discarding.")
                heapq.heappop(self.pending_jobs_heap)
                continue
            if job_info['unmet_dependencies'] != 0:
                print(f"Warning: Job {next_job_id} in heap has {job_info['unmet_dependencies']} unmet dependencies. Discarding.")
                heapq.heappop(self.pending_jobs_heap)
                continue

            # Check if resources are available
            if self.free_slots >= job_info['required_slots']:
                # Resources available, start the job
                # Pop *now* that we know we can run it
                heapq.heappop(self.pending_jobs_heap)

                job_info['state'] = RUNNING
                job_info['start_time'] = time.time()
                self.free_slots -= job_info['required_slots']
                self.running_jobs[job_info['id']] = job_info

                print(f"  Starting Job {job_info['id']} (Prio {job_info['priority']}) ('{job_info['command']}'), using {job_info['required_slots']} slots.")
                jobs_started_this_cycle += 1
                # Continue loop: try to schedule the *next* highest priority job
            else:
                # Highest priority ready job cannot fit, so stop for this cycle
                # (no backfilling implemented)
                # print(f"  Job {job_info['id']} (Prio {priority}) needs {job_info['required_slots']}, only {self.free_slots} free. Waiting.")
                break # Stop trying to schedule

        # Note: Requeuing logic removed as popping only happens if runnable.

        if jobs_started_this_cycle == 0 and self.pending_jobs_heap:
             print("  No pending jobs could be started this cycle (highest priority ready job may be waiting for resources).")


    def run_scheduler_cycle(self):
        """
        Executes one cycle of the scheduler logic.
        """
        print(f"\n--- Running Scheduler Cycle at {time.time():.2f} ---")
        # self.show_status() # Optional: Show status before changes

        # Check completions/failures first to free slots and resolve dependencies
        self._check_running_jobs()
        # Then try to start jobs based on priority, dependencies, and resources
        self._try_start_pending_jobs()

        print("--- Cycle Complete ---")
        self.show_status() # Show status after changes

    def get_job_status(self, job_id: int) -> str | None:
        """
        Gets the current state of a specific job.
        """
        if job_id in self.all_jobs:
            return self.all_jobs[job_id]['state']
        return None # Job ID not found

    def show_status(self):
        """
        Prints a summary of the scheduler's current state.
        """
        print(f"  Status: {self.free_slots}/{self.total_slots} slots free.")
        pending_ready_count = len(self.pending_jobs_heap)
        # Example: Get IDs from heap (less efficient, for display only)
        # temp_heap = copy.deepcopy(self.pending_jobs_heap)
        # ready_ids = [heapq.heappop(temp_heap)[2] for _ in range(len(temp_heap))]
        # print(f"  Pending Jobs (Ready in Heap): {pending_ready_count} {ready_ids}")
        print(f"  Pending Jobs (Ready in Heap): {pending_ready_count}")

        waiting_deps_count = 0
        waiting_deps_ids = []
        for job_id, job in self.all_jobs.items():
             if job['state'] == PENDING and job['unmet_dependencies'] > 0:
                 waiting_deps_count += 1
                 waiting_deps_ids.append(job_id)
        print(f"  Pending Jobs (Waiting Deps): {waiting_deps_count} {waiting_deps_ids}")

        running_ids = list(self.running_jobs.keys())
        completed_ids = list(self.completed_jobs.keys())
        failed_ids = list(self.failed_jobs.keys())
        print(f"  Running Jobs: {len(running_ids)} {running_ids}")
        print(f"  Completed Jobs: {len(completed_ids)} {completed_ids}")
        print(f"  Failed Jobs: {len(failed_ids)} {failed_ids}")


# --- Example Usage ---
if __name__ == "__main__":
    print("Starting Dependency Scheduler Simulation")
    scheduler = DependencyScheduler(total_slots=10)

    # Demo dependency graph:
    #   A --> B (high priority) --+
    #   A --> C (low priority)  --+--> D
    #   E has no dependencies
    try:
        jA = scheduler.submit_job("Task A", 4, priority=10, duration=4.0)
        jB = scheduler.submit_job("Task B", 3, priority=5, duration=3.0, depends_on=[jA])
        jC = scheduler.submit_job("Task C", 5, priority=15, duration=5.0, depends_on=[jA])
        jD = scheduler.submit_job("Task D", 2, priority=8, duration=2.0, depends_on=[jB, jC])
        # Lowest priority, but independent of the A/B/C/D chain
        jE = scheduler.submit_job("Task E", 3, priority=20, duration=3.0)

        print("\n--- Initial State ---")
        scheduler.show_status()

        # Drive the scheduler for a bounded number of passes
        for pass_number in range(1, 9):
            print(f"\n>>> Simulating Pass {pass_number} <<<")
            scheduler.run_scheduler_cycle()

            # The simulation is over once nothing is queued, running, or still pending
            any_pending = any(
                job['state'] == PENDING for job in scheduler.all_jobs.values()
            )
            if not (scheduler.pending_jobs_heap or scheduler.running_jobs or any_pending):
                print("\nAll jobs finished or failed. Stopping simulation.")
                break

            # Let wall-clock time advance so running jobs can reach their duration
            print("\n...Simulating 2 seconds passing...\n")
            time.sleep(2.0)

    except ValueError as e:
        print(f"\nError submitting job: {e}")

    print("\n--- Final State ---")
    scheduler.show_status()
    print("\nSimulation Complete.")
