# Critical Path Method

The Critical Path Method (CPM) is a project management technique for scheduling the tasks of a complex project. It identifies the longest sequence of dependent activities (the critical path), whose total duration is the shortest possible duration of the project. Identifying the critical path helps project managers prioritize the activities that must finish on time to avoid delaying the entire project.

The steps of the critical path method are:
1. Define tasks with their dependencies and durations.
2. For each task, initialize:
   - Earliest Start (ES) = 0
   - Earliest Finish (EF) = 0
   - Latest Start (LS) = $\infty$
   - Latest Finish (LF) = $\infty$
3. Perform a forward pass to calculate ES and EF for each task:
   - For each task in topological order:
     - ES[task] = max(EF[all predecessors]), or the project start time if the task has no predecessors
     - EF[task] = ES[task] + duration[task]
4. Identify the project duration as the maximum EF of any task.
5. Perform a backward pass to calculate LS and LF for each task:
   - For each task in reverse topological order:
     - LF[task] = min(LS[all successors]), or the project duration if the task has no successors
     - LS[task] = LF[task] - duration[task]
6. Calculate slack for each task:
   - Slack[task] = LS[task] - ES[task] (equivalently, LF[task] - EF[task])
7. Identify the critical path:
   - Tasks with Slack = 0 are critical. They form the critical path, or several critical paths of equal length.
   - The total project duration equals the total duration of the tasks along any single critical path.
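
The steps above can be sketched compactly as a single function. This is a minimal illustration rather than the implementation developed in the subsections below: the name `cpm_sketch` is hypothetical, the project is assumed to start at time 0, and the topological order comes from the standard-library `graphlib` module (Python 3.9+).

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def cpm_sketch(tasks):
    """Return (project_duration, schedule) for a dict of
    task -> {"time": duration, "successors": [task, ...]}.
    Assumes the project starts at time 0."""
    # Step 1: derive each task's predecessors from the successor lists
    preds = {t: [] for t in tasks}
    for t, info in tasks.items():
        for s in info["successors"]:
            preds[s].append(t)

    # Topological order: every task appears after all of its predecessors
    order = list(TopologicalSorter(preds).static_order())

    # Step 3: forward pass (tasks without predecessors start at 0)
    es, ef = {}, {}
    for t in order:
        es[t] = max((ef[p] for p in preds[t]), default=0)
        ef[t] = es[t] + tasks[t]["time"]

    # Step 4: project duration is the largest earliest-finish time
    duration = max(ef.values())

    # Step 5: backward pass (tasks without successors finish at the project end)
    ls, lf = {}, {}
    for t in reversed(order):
        lf[t] = min((ls[s] for s in tasks[t]["successors"]), default=duration)
        ls[t] = lf[t] - tasks[t]["time"]

    # Step 6: slack per task
    schedule = {
        t: {"ES": es[t], "EF": ef[t], "LS": ls[t], "LF": lf[t], "slack": ls[t] - es[t]}
        for t in order
    }
    return duration, schedule


example = {
    "A": {"time": 4, "successors": ["C"]},
    "B": {"time": 3, "successors": ["C", "D"]},
    "C": {"time": 5, "successors": ["E"]},
    "D": {"time": 5, "successors": ["E"]},
    "E": {"time": 4, "successors": []},
}
duration, schedule = cpm_sketch(example)

# Step 7: zero-slack tasks are critical
critical = sorted(t for t, v in schedule.items() if v["slack"] == 0)
print(duration, critical)  # prints: 13 ['A', 'C', 'E']
```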

The following subsections implement each of the described steps of the CPM.

## 1. Define Tasks with Their Dependencies and Durations

In [None]:
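# Define tasks: each entry maps a task name to its duration ("time")
# and to the tasks that depend on it ("successors").
# A smaller alternative data set is kept commented out for reference: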
# project_data = {
#     "A": {"time": 4, "successors": ["C"]},
#     "B": {"time": 3, "successors": ["C", "D"]},
#     "C": {"time": 5, "successors": ["E"]},
#     "D": {"time": 5, "successors": ["E"]},
#     "E": {"time": 4, "successors": []},
# }

project_data = {
    "A1": {"time": 5, "successors": ["B1", "B2"]},
    "A2": {"time": 4, "successors": ["B2", "B3"]},
    "A3": {"time": 3, "successors": ["B3", "B4"]},
    "B1": {"time": 6, "successors": ["C1"]},
    "B2": {"time": 5, "successors": ["C1", "C2"]},
    "B3": {"time": 4, "successors": ["C2", "C3"]},
    "B4": {"time": 3, "successors": ["C3"]},
    "C1": {"time": 4, "successors": []},
    "C2": {"time": 5, "successors": []},
    "C3": {"time": 6, "successors": []}
}

# Derive each task's predecessor list from the successor lists
for task in project_data:
    project_data[task]['predecessors'] = []

for task in project_data:
    successors = project_data[task]['successors']
    for successor in successors:
        project_data[successor]['predecessors'].append(task)

project_data
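
Before running the passes, it can help to check the task dictionary, since a misspelled successor name raises a `KeyError` when predecessors are derived and a cyclic dependency admits no valid schedule at all. The helper below is a hedged sketch that is not part of the original notebook: `find_problems` is a hypothetical name, and cycle detection relies on the standard-library `graphlib` module (Python 3.9+).

```python
from graphlib import CycleError, TopologicalSorter  # standard library, Python 3.9+


def find_problems(tasks):
    """Return a list of human-readable problems found in a task dictionary."""
    problems = []
    for name, info in tasks.items():
        if info["time"] < 0:
            problems.append(f"{name}: negative duration {info['time']}")
        for successor in info["successors"]:
            if successor not in tasks:
                problems.append(f"{name}: unknown successor {successor!r}")

    # Look for cycles only once the basic checks above have passed
    if not problems:
        # Edge direction does not matter for detecting a cycle
        graph = {name: info["successors"] for name, info in tasks.items()}
        try:
            TopologicalSorter(graph).prepare()
        except CycleError as error:
            # The second element of args holds the nodes of the detected cycle
            problems.append(f"cyclic dependency: {error.args[1]}")
    return problems


good = {
    "A": {"time": 1, "successors": ["B"]},
    "B": {"time": 2, "successors": []},
}
cyclic = {
    "A": {"time": 1, "successors": ["B"]},
    "B": {"time": 1, "successors": ["A"]},
}
print(find_problems(good))    # prints: []
print(find_problems(cyclic))  # one entry describing the cycle
```

An empty list means the data passed these checks. The checks themselves are an assumption about what counts as valid input, not a requirement stated by the method.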

## 2. Initialization

In [None]:
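start_time = 0
# Finite stand-in for infinity: no task can finish later than the start time
# plus the sum of all task durations
max_time = start_time + sum(val['time'] for val in project_data.values())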

for task in project_data:
    project_data[task]['ES'] = 0.0
    project_data[task]['EF'] = 0.0
    project_data[task]['LS'] = max_time
    project_data[task]['LF'] = max_time

## 3. Forward Pass

In [None]:
# forward pass (early start and early finish times)

print('Starting forward pass')

unprocessed_tasks = set(project_data.keys())

sequence = 0
while unprocessed_tasks:
    processed_tasks = set()
    for task in unprocessed_tasks:
        predecessors = project_data[task]['predecessors']
        
        if predecessors:
            unprocessed_predecessors = set(project_data[task]['predecessors']).intersection(unprocessed_tasks)
            if not unprocessed_predecessors:
                # ES is the latest EF among the predecessors
                max_predecessor_EF = max(project_data[p]['EF'] for p in predecessors)
                project_data[task]['ES'] = max_predecessor_EF
                project_data[task]['EF'] = max_predecessor_EF + project_data[task]['time']
                processed_tasks.add(task)
                sequence += 1
                print(f'{sequence:>2}. Processed task {task}')
        else:
            project_data[task]['ES'] = start_time
            project_data[task]['EF'] = start_time + project_data[task]['time']
            processed_tasks.add(task)
            sequence += 1
            print(f'{sequence:>2}. Processed task {task}')

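    # Stop instead of looping forever if no task could be processed (e.g. cyclic dependencies)
    if not processed_tasks:
        print(' - Forward pass iteration completed without update. Check the project data.')
        break
    for task in processed_tasks:
        unprocessed_tasks.remove(task)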

## 4. Determine Project Length

In [None]:
# The project length is the largest earliest-finish time over all tasks
project_length = max(project_data[task]['EF'] for task in project_data)

print(f'{project_length = }')

## 5. Backward Pass

In [None]:
print('Starting backward pass')
# backward pass (late start and late finish times)

unprocessed_tasks = set(project_data.keys())

sequence = 0
while unprocessed_tasks:
    processed_tasks = set()
    for task in unprocessed_tasks:
        successors = project_data[task]['successors']
        
        if successors:
            unprocessed_successors = set(project_data[task]['successors']).intersection(unprocessed_tasks)
            if not unprocessed_successors:
                # LF is the earliest LS among the successors
                min_successor_LS = min(project_data[s]['LS'] for s in successors)
                project_data[task]['LF'] = min_successor_LS
                project_data[task]['LS'] = min_successor_LS - project_data[task]['time']
                processed_tasks.add(task)
                sequence += 1
                print(f'{sequence:>2}. Processed task {task}')
        else:
            project_data[task]['LF'] = project_length
            project_data[task]['LS'] = project_length - project_data[task]['time']
            processed_tasks.add(task)
            sequence += 1
            print(f'{sequence:>2}. Processed task {task}')

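    # Stop instead of looping forever if no task could be processed (e.g. cyclic dependencies)
    if not processed_tasks:
        print(' - Backward pass iteration completed without update. Check the project data.')
        break
    for task in processed_tasks:
        unprocessed_tasks.remove(task)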

## 6. Slack Calculations

In [None]:
for task in project_data:
    project_data[task]['slack'] = project_data[task]['LF'] - project_data[task]['EF']
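
Step 6 of the outline defines slack as LS - ES, while the code above computes LF - EF. The two expressions agree, because each pass ties a task's start and finish times together through its duration $d$:

$$
\begin{aligned}
EF &= ES + d, \qquad LF = LS + d \\
LF - EF &= (LS + d) - (ES + d) = LS - ES
\end{aligned}
$$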

## 7. Determine Critical Activities

In [None]:
critical_activities = []
for task in project_data:
    # A task is critical when it has no slack
    if project_data[task]['slack'] == 0:
        project_data[task]['critical'] = True
        critical_activities.append(task)
    else:
        project_data[task]['critical'] = False
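
Zero-slack tasks need not lie on a single chain. In the ten-task example, A1, B1, B2, C1, and C2 all have zero slack, yet A1, B2, C1 is not a critical path, because C1 cannot start until B1 finishes one time unit after B2. The sketch below, which is not part of the original notebook, lists the actual critical paths by following only zero-slack successors that start exactly when the preceding task finishes. The name `critical_paths` is hypothetical, and the hard-coded values are the ES, EF, and slack results worked out by hand for the example data.

```python
def critical_paths(tasks):
    """List every critical path as a list of task names.

    `tasks` maps each task to a dict with 'successors', 'ES', 'EF', and 'slack'.
    """
    def extend(path):
        last = path[-1]
        # Follow only zero-slack successors that start exactly when `last` finishes
        next_tasks = [
            s for s in tasks[last]["successors"]
            if tasks[s]["slack"] == 0 and tasks[s]["ES"] == tasks[last]["EF"]
        ]
        if not next_tasks:
            return [path]
        paths = []
        for s in next_tasks:
            paths.extend(extend(path + [s]))
        return paths

    # Critical paths begin at zero-slack tasks that start at the project start
    project_start = min(info["ES"] for info in tasks.values())
    starts = [
        t for t, info in tasks.items()
        if info["slack"] == 0 and info["ES"] == project_start
    ]
    return [p for t in starts for p in extend([t])]


# (task, successors, ES, EF, slack) for the ten-task example
rows = [
    ("A1", ["B1", "B2"], 0, 5, 0),
    ("A2", ["B2", "B3"], 0, 4, 1),
    ("A3", ["B3", "B4"], 0, 3, 2),
    ("B1", ["C1"], 5, 11, 0),
    ("B2", ["C1", "C2"], 5, 10, 0),
    ("B3", ["C2", "C3"], 4, 8, 1),
    ("B4", ["C3"], 3, 6, 3),
    ("C1", [], 11, 15, 0),
    ("C2", [], 10, 15, 0),
    ("C3", [], 8, 14, 1),
]
results = {
    name: {"successors": succ, "ES": es, "EF": ef, "slack": slack}
    for name, succ, es, ef, slack in rows
}
paths = critical_paths(results)
print(paths)  # prints: [['A1', 'B1', 'C1'], ['A1', 'B2', 'C2']]
```

Both paths have a total duration of 15, which matches the project length for this data set.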

## Visualization

The following code block defines a function to plot a Gantt chart of the project activities, with critical activities indicated in red.

In [None]:
def make_gantt_chart(project_data_dict, figsize=(8, 3)):
    """Plot each task as a bar from ES to EF; critical tasks are drawn in red."""
    import matplotlib.pyplot as plt
    
    fig, ax = plt.subplots(1, 1, figsize=figsize)
    
    for task in project_data_dict:
        start_time = project_data_dict[task]['ES']
        completion_time = project_data_dict[task]['EF']
        length = completion_time - start_time
        if project_data_dict[task]['critical']:
            task_color='red'
        else:
            task_color='lightgray'
        ax.barh(
            task, 
            length,
            left=start_time,
            color=task_color,
            edgecolor='k',
        )
        ax.text(
            start_time + length/2,
            task,
            task,
            va='center',
            ha='center',
            color='k',
        )
    ax.spines[['right', 'top']].set_visible(False)
    
    ax.set_xlabel('Time')
    ax.set_ylabel('Task')
    ax.set_title('Gantt Chart')
    plt.show()

The following code block demonstrates the function's use.

In [None]:
make_gantt_chart(project_data)

## CPM Function

The following code block defines a function that executes the CPM end-to-end.

In [None]:
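def execute_CPM(
    project_data_dict: dict,
    start_time: float = 0.0,
) -> dict:
    """Run the CPM on a task dictionary and return an annotated deep copy.

    Each task maps to a dict with 'time' (duration) and 'successors' (list of
    task names). The returned copy adds 'predecessors', 'ES', 'EF', 'LS', 'LF',
    'slack', and 'critical' to every task; the input is left unchanged.
    """
    import copy

    project_data_dict = copy.deepcopy(project_data_dict)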

    for task in project_data_dict:
        project_data_dict[task]['predecessors'] = []
    
    for task in project_data_dict:
        successors = project_data_dict[task]['successors']
        for successor in successors:
            project_data_dict[successor]['predecessors'].append(task)
    
    max_time = start_time + sum([val['time'] for val in project_data_dict.values()])
    for task in project_data_dict:
        project_data_dict[task]['ES'] = 0.0
        project_data_dict[task]['EF'] = 0.0
        project_data_dict[task]['LS'] = max_time
        project_data_dict[task]['LF'] = max_time

    # forward pass
    unprocessed_tasks = set(project_data_dict.keys())    
    while unprocessed_tasks:
        processed_tasks = set()
        for task in unprocessed_tasks:
            predecessors = project_data_dict[task]['predecessors']
            if predecessors:
                unprocessed_predecessors = set(project_data_dict[task]['predecessors']).intersection(unprocessed_tasks)
                if not unprocessed_predecessors:
                    # ES is the latest EF among the predecessors (also correct for a negative start_time)
                    max_predecessor_EF = max(project_data_dict[p]['EF'] for p in predecessors)
                    project_data_dict[task]['ES'] = max_predecessor_EF
                    project_data_dict[task]['EF'] = max_predecessor_EF + project_data_dict[task]['time']
                    processed_tasks.add(task)
            else:
                project_data_dict[task]['ES'] = start_time
                project_data_dict[task]['EF'] = start_time + project_data_dict[task]['time']
                processed_tasks.add(task)

        if not processed_tasks:
            print(' - Forward pass iteration completed without update. Something is wrong. Check project data')
            break
        for task in processed_tasks:
            unprocessed_tasks.remove(task)
    
    # project length computation
    # largest earliest-finish time; falls back to start_time for an empty project
    project_length = max(
        (project_data_dict[task]['EF'] for task in project_data_dict),
        default=start_time,
    )
    
    # backward pass
    unprocessed_tasks = set(project_data_dict.keys())
    while unprocessed_tasks:
        processed_tasks = set()
        for task in unprocessed_tasks:
            successors = project_data_dict[task]['successors']
            if successors:
                unprocessed_successors = set(project_data_dict[task]['successors']).intersection(unprocessed_tasks)
                if not unprocessed_successors:
                    # LF is the earliest LS among the successors
                    min_successor_LS = min(project_data_dict[s]['LS'] for s in successors)
                    project_data_dict[task]['LF'] = min_successor_LS
                    project_data_dict[task]['LS'] = min_successor_LS - project_data_dict[task]['time']
                    processed_tasks.add(task)
            else:
                project_data_dict[task]['LF'] = project_length
                project_data_dict[task]['LS'] = project_length - project_data_dict[task]['time']
                processed_tasks.add(task)
    
        if not processed_tasks:
            print(' - Backward pass iteration completed without update. Something is wrong. Check project data')
            break
        for task in processed_tasks:
            unprocessed_tasks.remove(task)
    
    # slack computations
    for task in project_data_dict:
        project_data_dict[task]['slack'] = project_data_dict[task]['LF'] - project_data_dict[task]['EF']
    
    # determine critical activities
    critical_activities = []
    for task in project_data_dict:
        if project_data_dict[task]['slack'] == 0:
            project_data_dict[task]['critical'] = True
            critical_activities.append(task)
        else:
            project_data_dict[task]['critical'] = False

    return project_data_dict

## End-to-end usage

In [None]:
my_project_data = {
    "A": {"time": 4, "successors": ["C"]},
    "B": {"time": 3, "successors": ["C", "D"]},
    "C": {"time": 5, "successors": ["E"]},
    "D": {"time": 5, "successors": ["E"]},
    "E": {"time": 4, "successors": []},
}
my_project_data = execute_CPM(my_project_data)
make_gantt_chart(my_project_data)