In [1]:
import heapq
from collections import defaultdict, deque

# Example task definitions:
# Each task is a key with a tuple: (duration, [dependencies])
tasks = {
    'A': (3, []),
    'B': (2, ['A']),
    'C': (4, ['A']),
    'D': (2, ['B', 'C']),
    'E': (3, ['C']),
    'F': (1, ['D', 'E'])
}

# Number of processors available
NUM_PROCESSORS = 2

# Build a graph for successors (reverse dependencies)
successors = defaultdict(list)
for task, (duration, deps) in tasks.items():
    for dep in deps:
        successors[dep].append(task)

# Precompute the critical path length (longest duration from each task to an exit)
# For tasks with no successors, the critical path is just their own duration.
critical_path = {}

def compute_critical_path(task):
    if task in critical_path:
        return critical_path[task]
    # duration of this task
    dur = tasks[task][0]
    if task not in successors or not successors[task]:
        critical_path[task] = dur
    else:
        cp = max(compute_critical_path(succ) for succ in successors[task])
        critical_path[task] = dur + cp
    return critical_path[task]

# Compute for every task
for t in tasks:
    compute_critical_path(t)

# A* Search Implementation

# A state is defined as:
# (current_time, completed, in_progress)
#   - completed: frozenset of tasks that have finished.
#   - in_progress: tuple of (finish_time, task) for tasks currently running.
# The total cost is the current time. Our heuristic is the maximum critical path of not-yet-started tasks.
def heuristic(state):
    current_time, completed, in_progress = state
    # tasks already scheduled (either in progress or completed)
    scheduled = set(completed) | {task for (_, task) in in_progress}
    remaining = set(tasks.keys()) - scheduled
    if not remaining:
        return 0
    # Lower bound: the longest chain among the remaining tasks
    return max(critical_path[t] for t in remaining)

def is_goal(state):
    _, completed, _ = state
    return len(completed) == len(tasks)

def get_available_tasks(completed, in_progress):
    # A task is available if it is not scheduled (not in completed or in_progress)
    scheduled = set(completed) | {task for (_, task) in in_progress}
    available = []
    for task, (dur, deps) in tasks.items():
        if task in scheduled:
            continue
        # A task is ready if all its prerequisites are in completed
        if all(dep in completed for dep in deps):
            available.append(task)
    return available

def a_star_schedule():
    # Priority queue: elements are (f, state, path)
    # For debugging, we also record the actions leading to the state.
    initial_state = (0, frozenset(), tuple())
    initial_path = []
    open_list = []
    heapq.heappush(open_list, (heuristic(initial_state), initial_state, initial_path))
    visited = {}

    while open_list:
        f, state, path = heapq.heappop(open_list)
        current_time, completed, in_progress = state

        if is_goal(state):
            return current_time, path

        # Check if state was visited with a lower cost
        key = (completed, tuple(sorted(in_progress)))
        if key in visited and visited[key] <= current_time:
            continue
        visited[key] = current_time

        # Determine available processors
        processors_in_use = len(in_progress)
        free_processors = NUM_PROCESSORS - processors_in_use

        # Generate successors: try scheduling available tasks if any processors are free.
        available = get_available_tasks(completed, in_progress)
        if free_processors > 0 and available:
            for task in available:
                # Schedule this task on a free processor
                dur = tasks[task][0]
                finish_time = current_time + dur
                new_in_progress = list(in_progress)
                new_in_progress.append((finish_time, task))
                new_in_progress.sort()  # sort by finish time
                new_state = (current_time, completed, tuple(new_in_progress))
                new_path = path + [f"Time {current_time}: Start task {task} (finishes at {finish_time})"]
                h = heuristic(new_state)
                heapq.heappush(open_list, (current_time + h, new_state, new_path))
        else:
            # Otherwise, advance time to the next finishing task
            if in_progress:
                next_time = min(ft for (ft, _) in in_progress)
                # Advance time to next_time, and mark tasks finished
                new_completed = set(completed)
                new_in_progress = []
                for (ft, task) in in_progress:
                    if ft <= next_time:
                        new_completed.add(task)
                    else:
                        new_in_progress.append((ft, task))
                new_state = (next_time, frozenset(new_completed), tuple(new_in_progress))
                new_path = path + [f"Advance time to {next_time}, finished tasks: {set(new_completed) - completed}"]
                h = heuristic(new_state)
                heapq.heappush(open_list, (next_time + h, new_state, new_path))
            # If no tasks are in progress and no available tasks, then dead end (should not occur in a well-formed DAG)
    return None

# Greedy Algorithm for Comparison
def greedy_schedule():
    current_time = 0
    completed = set()
    in_progress = []
    schedule_log = []
    while len(completed) < len(tasks):
        # Schedule all available tasks up to free processors.
        available = get_available_tasks(completed, in_progress)
        free_processors = NUM_PROCESSORS - len(in_progress)
        if available and free_processors > 0:
            # Greedy choice: choose tasks with shortest duration first
            available.sort(key=lambda t: tasks[t][0])
            for task in available[:free_processors]:
                finish_time = current_time + tasks[task][0]
                in_progress.append((finish_time, task))
                schedule_log.append(f"Time {current_time}: Start task {task} (finishes at {finish_time})")
            in_progress.sort()
        else:
            if in_progress:
                next_time = min(ft for (ft, _) in in_progress)
                # Advance time and mark finished tasks.
                finished = [task for (ft, task) in in_progress if ft <= next_time]
                completed.update(finished)
                in_progress = [(ft, task) for (ft, task) in in_progress if ft > next_time]
                schedule_log.append(f"Advance time to {next_time}, finished tasks: {finished}")
                current_time = next_time
            else:
                # No available tasks and nothing in progress (should not happen if graph is connected)
                break
    return current_time, schedule_log

# Run A* search
a_star_result = a_star_schedule()
if a_star_result:
    a_star_time, a_star_path = a_star_result
    print("A* Search Schedule:")
    for action in a_star_path:
        print(action)
    print(f"Total time (A*): {a_star_time}")
else:
    print("A* Search did not find a solution.")

print("\n" + "="*40 + "\n")

# Run Greedy algorithm
greedy_time, greedy_log = greedy_schedule()
print("Greedy Schedule:")
for action in greedy_log:
    print(action)
print(f"Total time (Greedy): {greedy_time}")


A* Search Schedule:
Time 0: Start task A (finishes at 3)
Advance time to 3, finished tasks: {'A'}
Time 3: Start task C (finishes at 7)
Time 3: Start task B (finishes at 5)
Advance time to 5, finished tasks: {'B'}
Advance time to 7, finished tasks: {'C'}
Time 7: Start task E (finishes at 10)
Time 7: Start task D (finishes at 9)
Advance time to 9, finished tasks: {'D'}
Advance time to 10, finished tasks: {'E'}
Time 10: Start task F (finishes at 11)
Advance time to 11, finished tasks: {'F'}
Total time (A*): 11


Greedy Schedule:
Time 0: Start task A (finishes at 3)
Advance time to 3, finished tasks: ['A']
Time 3: Start task B (finishes at 5)
Time 3: Start task C (finishes at 7)
Advance time to 5, finished tasks: ['B']
Advance time to 7, finished tasks: ['C']
Time 7: Start task D (finishes at 9)
Time 7: Start task E (finishes at 10)
Advance time to 9, finished tasks: ['D']
Advance time to 10, finished tasks: ['E']
Time 10: Start task F (finishes at 11)
Advance time to 11, finished tasks: [