In [1]:
import numpy as np
import pandas as pd

In [2]:
def get_data(filename):
    """Load an activity table from a CSV file and build the task network.

    The CSV must provide the columns 'From', 'To' and 'Time'. Every row is
    one task, named '<From> -> <To>'.

    Args:
        filename: Path (or anything ``pd.read_csv`` accepts) of the CSV file.

    Returns:
        tuple: ``(start, end, duration, network)``. The first three are the
        'From', 'To' and 'Time' columns as lists. ``network`` maps each task
        name to a dict with the keys 'name', 'duration', 'dependencies',
        'neighbors' and the schedule fields 'ES', 'EF', 'LS', 'LF', which
        all start at 0.
    """
    table = pd.read_csv(filename)
    start, end, duration = (table[column].tolist() for column in ('From', 'To', 'Time'))

    network = {}
    for origin, destination, length in zip(start, end, duration):
        task = f'{origin} -> {destination}'
        network[task] = {
            'name': task,
            'duration': length,
            # Tasks ending at this task's start node.
            'dependencies': get_dependencies(origin, start, end),
            # Tasks starting at this task's end node.
            'neighbors': get_neighbors(destination, start, end),
            'ES': 0,
            'EF': 0,
            'LS': 0,
            'LF': 0,
        }
    return start, end, duration, network

In [3]:
def print_network(start, end, duration):
    """Print one line per task showing its start node, end node and duration.

    Args:
        start: Start node of each task.
        end: End node of each task, indexed like ``start``.
        duration: Duration of each task, indexed like ``start``.
    """
    print("Network:")
    for index, origin in enumerate(start):
        print(f"Task: {origin} -> {end[index]}\tDuration: {duration[index]}")

In [4]:
def check_feasability(start, end):
    """Tell whether the edges ``start[i] -> end[i]`` form a usable network.

    A network is usable when it has at least one edge and contains no cycle.
    Merely having an entry or exit node is not enough: a network such as
    1->2, 2->3, 3->2 has an entry node but still loops forever.

    Args:
        start: Start node of each task (nodes must be hashable).
        end: End node of each task, paired position by position with ``start``.

    Returns:
        bool: True if the network is non-empty and acyclic, False otherwise.
    """
    successors = {}
    indegree = {}
    for source, target in zip(start, end):
        successors.setdefault(source, []).append(target)
        successors.setdefault(target, [])
        indegree.setdefault(source, 0)
        indegree[target] = indegree.get(target, 0) + 1

    if not indegree:
        # No edges at all: nothing to schedule.
        return False

    # Kahn's algorithm: repeatedly remove nodes without incoming edges.
    # Every node gets removed exactly when the graph has no cycle.
    ready = [node for node, count in indegree.items() if count == 0]
    removed = 0
    while ready:
        node = ready.pop()
        removed += 1
        for target in successors[node]:
            indegree[target] -= 1
            if indegree[target] == 0:
                ready.append(target)
    return removed == len(indegree)

In [5]:
def get_dependencies(task_name, start, end):
    """List the tasks whose end node equals ``task_name``.

    Args:
        task_name: Node to look for among the end nodes.
        start: Start node of each task.
        end: End node of each task, indexed like ``start``.

    Returns:
        list: Names of the form '<start> -> <end>' for every matching task,
        in input order. The list is empty when nothing matches.
    """
    dependencies = []
    for position, destination in enumerate(end):
        if destination == task_name:
            dependencies.append(f"{start[position]} -> {end[position]}")
    return dependencies

In [6]:
def get_neighbors(task_name, start, end):
    """List the tasks whose start node equals ``task_name``.

    Args:
        task_name: Node to look for among the start nodes.
        start: Start node of each task.
        end: End node of each task, indexed like ``start``.

    Returns:
        list: Names of the form '<start> -> <end>' for every matching task,
        in input order. The list is empty when nothing matches.
    """
    neighbors = []
    for position, origin in enumerate(start):
        if origin == task_name:
            neighbors.append(f"{start[position]} -> {end[position]}")
    return neighbors

In [7]:
def get_attribute(network, dependencies, attribute, cond="min"):
    """Find the smallest or largest value of one field among the given tasks.

    Args:
        network: Mapping of task name to that task's dict of fields.
        dependencies: Names of the tasks to inspect.
        attribute: Field to read from each task, e.g. 'EF' or 'LS'.
        cond: "min" for the smallest value, "max" for the largest.

    Returns:
        A ``(value, task_name)`` tuple, where ``task_name`` is the first task
        holding that value. For any other ``cond`` a message is printed and
        the bare integer -1 is returned instead of a tuple.

    Raises:
        ValueError: If ``dependencies`` is empty and ``cond`` is valid.
    """
    values = [network[name][attribute] for name in dependencies]

    if cond not in ("min", "max"):
        print("Wrong condition passed")
        return -1

    pick = min if cond == "min" else max
    best = pick(values)
    # index() returns the first match, so ties go to the earliest task.
    return best, dependencies[values.index(best)]

In [8]:
def forward_pass(network):
    """Compute the earliest start (ES) and earliest finish (EF) of every task.

    For a task without dependencies ES is 0. Otherwise ES is the largest EF
    among its immediate predecessors. In both cases EF = ES + duration.

    Tasks are processed in topological order, so the result does not depend
    on the order in which tasks were inserted into ``network``. Every
    predecessor is finished before any task that depends on it is evaluated.

    Args:
        network: Mapping of task name to a dict with at least the keys
            'duration' and 'dependencies' (a list of task names). The dicts
            are updated in place.

    Returns:
        The same ``network`` object, with 'ES' and 'EF' filled in.

    Raises:
        KeyError: If a dependency names a task that is not in ``network``.
        ValueError: If the dependencies contain a cycle.
    """
    # Build successor lists and count the unfinished predecessors per task.
    successors = {task: [] for task in network}
    pending = {}
    for task, info in network.items():
        # dict.fromkeys de-duplicates while keeping order.
        predecessors = list(dict.fromkeys(info['dependencies']))
        pending[task] = len(predecessors)
        for predecessor in predecessors:
            successors[predecessor].append(task)

    # Tasks with no unfinished predecessor can be scheduled right away.
    ready = [task for task in network if pending[task] == 0]
    processed = 0
    while ready:
        task = ready.pop()
        info = network[task]
        # default=0 covers starting tasks, which have no predecessors.
        info['ES'] = max((network[p]['EF'] for p in info['dependencies']), default=0)
        info['EF'] = info['ES'] + info['duration']
        processed += 1
        for successor in successors[task]:
            pending[successor] -= 1
            if pending[successor] == 0:
                ready.append(successor)

    if processed != len(network):
        raise ValueError("Cannot run forward pass: the task dependencies contain a cycle")
    return network

In [9]:
def backward_pass(network):
    """Compute the latest start (LS) and latest finish (LF) of every task.

    Must run after the forward pass, because it reads each task's 'EF'.
    For a task without successors LF is the project finish time, i.e. the
    largest EF in the network. Otherwise LF is the smallest LS among its
    immediate successors. In both cases LS = LF - duration.

    Tasks are processed in reverse topological order, so the result does not
    depend on the order in which tasks were inserted into ``network``. Every
    successor is finished before any task that precedes it is evaluated.

    Args:
        network: Mapping of task name to a dict with at least the keys
            'duration', 'EF' and 'neighbors' (a list of successor task
            names). The dicts are updated in place.

    Returns:
        The same ``network`` object, with 'LS' and 'LF' filled in. An empty
        network is returned unchanged.

    Raises:
        KeyError: If a neighbor names a task that is not in ``network``.
        ValueError: If the successor links contain a cycle.
    """
    if not network:
        return network

    # The project ends when the latest-finishing task ends.
    project_finish = max(info['EF'] for info in network.values())

    # Build predecessor lists and count the unfinished successors per task.
    predecessors = {task: [] for task in network}
    pending = {}
    for task, info in network.items():
        # dict.fromkeys de-duplicates while keeping order.
        followers = list(dict.fromkeys(info['neighbors']))
        pending[task] = len(followers)
        for follower in followers:
            predecessors[follower].append(task)

    # Tasks with no unfinished successor can be evaluated right away.
    ready = [task for task in network if pending[task] == 0]
    processed = 0
    while ready:
        task = ready.pop()
        info = network[task]
        # default covers final tasks, which have no successors.
        info['LF'] = min((network[s]['LS'] for s in info['neighbors']), default=project_finish)
        info['LS'] = info['LF'] - info['duration']
        processed += 1
        for predecessor in predecessors[task]:
            pending[predecessor] -= 1
            if pending[predecessor] == 0:
                ready.append(predecessor)

    if processed != len(network):
        raise ValueError("Cannot run backward pass: the task successors contain a cycle")
    return network

In [10]:
def print_network_table(network):
    """Print a tab-separated table with one row of schedule values per task.

    Args:
        network: Mapping of task name to a dict holding the keys 'name',
            'duration', 'ES', 'EF', 'LS' and 'LF'.
    """
    print('task name\tduration\tES\t\tEF\t\tLS\t\tLF')
    columns = ('name', 'duration', 'ES', 'EF', 'LS', 'LF')
    for row in network.values():
        # Cells are separated by two tabs, as in the header's wider columns.
        print("\t\t".join(f"{row[column]}" for column in columns))

In [11]:
def critical_nodes(network):
    """Print the set of tasks that have no slack.

    A task is reported when its ES equals its LS and its EF equals its LF.

    Args:
        network: Mapping of task name to a dict holding the keys 'ES', 'EF',
            'LS' and 'LF'.
    """
    nodes = [
        task
        for task, info in network.items()
        if info['ES'] == info['LS'] and info['EF'] == info['LF']
    ]
    print(f"\nCritical Nodes: {set(nodes)}")

In [12]:
# Load the task network from the CSV file and run the critical-path analysis.
filename = 'critical_path_problem.csv'
start, end, duration, network = get_data(filename)

if not check_feasability(start, end):
    print("The provided network is incorrect!")
else:
    # The forward pass fills ES/EF; the backward pass then fills LS/LF.
    network = backward_pass(forward_pass(network))
    print_network_table(network)
    critical_nodes(network)

task name	duration	ES		EF		LS		LF
1 -> 2		10		0		10		0		10
1 -> 3		8		0		8		1		9
1 -> 4		9		0		9		1		10
2 -> 5		8		10		18		10		18
4 -> 6		7		9		16		10		17
3 -> 7		16		8		24		9		25
5 -> 7		7		18		25		18		25
6 -> 7		7		16		23		18		25
5 -> 8		6		18		24		18		24
6 -> 9		5		16		21		17		22
7 -> 10		12		25		37		25		37
8 -> 10		13		24		37		24		37
9 -> 10		15		21		36		22		37

Critical Nodes: {'5 -> 8', '1 -> 2', '5 -> 7', '8 -> 10', '7 -> 10', '2 -> 5'}
