### Part-2 Implement and analyze Dijkstra’s Shortest Path algorithm.
1. You must write a basic implementation of Dijkstra’s shortest path algorithm for the two text files that 
were generated (question2_1.txt and question2_2.txt). In these files, the first column of each 
row is the initial node, the second column of each row is the destination, and the third 
column is the weight associated with the connection.
2. The algorithm should read both files and compute the shortest path from the first 
node to all the other nodes, saving the results in a text file named output_2.txt.
3. Find the nodes with the greatest and the least distance from the starting node and
provide a screenshot of the same.

In [1]:
from pyspark import SparkContext, SparkConf
import sys
import os

#Setting both PySpark interpreter environment variables to 'python' in a single call
os.environ.update(PYSPARK_PYTHON='python',PYSPARK_DRIVER_PYTHON='python')

#Splitting the lines of file into parts (node, neighbor and weight)
def parse_line(line):
    """Parse one edge-list line into ``(node, (neighbor, weight))``.

    The three integer fields may be separated by whitespace, by commas, or
    by both, so ``"1 2 3"``, ``"1, 2, 3"`` and ``"1,2,3"`` all yield
    ``(1, (2, 3))``. Fields after the third are ignored.

    Raises:
        IndexError: if the line has fewer than three fields.
        ValueError: if one of the first three fields is not an integer.
    """
    #Treating commas as separators so comma-only lines ("1,2,3") are split too
    parts=line.replace(',',' ').split()
    node=int(parts[0])
    neighbor=int(parts[1])
    weight=int(parts[2])
    return node,(neighbor,weight)

#Parsing the input edge list and creating adjacency lists
def initialize_graph(input_paths,sc):
    """Build the initial ``(node, (distance, neighbors))`` RDD from edge-list files.

    Args:
        input_paths: non-empty sequence of text-file paths; every line of
            every file is parsed with ``parse_line``.
        sc: SparkContext used to read the files.

    Returns:
        An RDD of ``(node, (inf, [(neighbor, weight), ...]))`` with one record
        per node that has at least one outgoing edge. Every distance is
        infinity; setting the source distance to 0 is left to the caller.

    Raises:
        IndexError: if ``input_paths`` is empty.
    """
    #Converting the input files line into (node, (neighbor, weight)) form
    edges=sc.textFile(input_paths[0]).map(parse_line)
    for input_path in input_paths[1:]:
        edges=edges.union(sc.textFile(input_path).map(parse_line))

    #Edges sharing the same source and destination are merged by summing their weights,
    #then reformatted back to (node, (neighbor, weight))
    edges=(
        edges.map(lambda x: ((x[0],x[1][0]),x[1][1]))
        .reduceByKey(lambda x, y: x + y)
        .map(lambda x: (x[0][0], (x[0][1], x[1])))
    )

    #Creating adjacency list; cached because it feeds both the count below and the returned RDD
    adjacency_list=edges.groupByKey().mapValues(list).cache()
    #count() runs on the executors instead of collecting every record to the driver just to measure it
    print("Adjacency list size:", adjacency_list.count())

    #Initializing all nodes with distance as infinity (the caller sets the source to 0)
    return adjacency_list.map(lambda x: (x[0], (float('inf'), x[1])))

#Emitting the distance updates for each neighbor
def mapper(node,distance_neighbors):
    """Expand one node record into itself plus one candidate record per neighbor.

    ``distance_neighbors`` is ``(distance, [(neighbor, weight), ...])``. The
    returned list starts with the node's own unchanged record, followed by
    ``(neighbor, (distance + weight, []))`` for every outgoing edge.
    """
    current_distance,adjacent=distance_neighbors
    #The node's own record carries its adjacency list forward
    own_record=(node,(current_distance,adjacent))
    #Candidate distances reached through this node; they carry an empty adjacency list
    candidates=[(target,(current_distance+cost,[])) for target,cost in adjacent]
    return [own_record]+candidates

#Combining distances and adjacency lists, keeping the shortest distance
def reducer(data1,data2):
    """Merge two ``(distance, neighbors)`` records that belong to the same node.

    Returns the smaller of the two distances (the first one on a tie) paired
    with the two neighbor lists concatenated in argument order.
    """
    (dist_a,adj_a),(dist_b,adj_b)=data1,data2
    #Same selection rule as min(dist_a, dist_b): the second wins only when strictly smaller
    shortest=dist_b if dist_b<dist_a else dist_a
    return shortest,adj_a+adj_b

#Formatting the output
def format_output(node,distance_neighbors):
    """Reduce a ``(distance, neighbors)`` record to a ``(node, distance)`` pair."""
    #The adjacency list is not part of the final output
    shortest,_neighbors=distance_neighbors
    return (node,shortest)

#Dijkstra's algorithm
def dijkstra(sc,input_paths,output_path):
    """Compute shortest distances from a source node and write them to a file.

    The graph is loaded with ``initialize_graph``; the source is whichever key
    the resulting RDD yields first. Distances are then relaxed in rounds: each
    round applies ``mapper`` to every node and merges the results per node with
    ``reducer``, and the loop stops once a round leaves every record unchanged.
    NOTE(review): this relaxes all edges every round rather than using the
    priority-queue formulation of Dijkstra's algorithm.

    Args:
        sc: SparkContext used to load and process the graph.
        input_paths: edge-list text files passed on to ``initialize_graph``.
        output_path: file that receives one ``node<TAB>distance`` line per
            node; nodes that were never reached are written with distance inf.

    Side effects:
        Prints the source node, the number of rounds, the full result list and
        the reachable nodes with the greatest and the least positive distance.
    """
    nodes_rdd=initialize_graph(input_paths,sc)

    #Getting the first node as source node
    #NOTE(review): first() follows RDD partition order, which need not match the order of the input files
    source=nodes_rdd.keys().first()
    print("Source node: ",source)

    #Set the distance of the source node to 0; cached because every round reads it twice (flatMap and join)
    nodes_rdd=nodes_rdd.map(lambda x: (x[0],(0,x[1][1])) if x[0]==source else x).cache()

    # Dijkstra's algorithm iteration
    converged=False
    i=0
    while not converged:
        mapped=nodes_rdd.flatMap(lambda x:mapper(x[0],x[1]))
        #Caching each round's result stops later rounds from recomputing the whole lineage
        reduced=mapped.reduceByKey(reducer).cache()
        #Converged when every node present in both rounds kept the same (distance, neighbors) record
        converged=nodes_rdd.join(reduced).map(lambda x:x[1][0]==x[1][1]).reduce(lambda a,b: a and b)
        i=i+1
        #The previous round is no longer needed once the new one has been materialized
        nodes_rdd.unpersist()
        nodes_rdd=reduced

    # Formatting the output as (node, distance)
    print(i)
    data=nodes_rdd.map(lambda x: format_output(x[0],x[1])).collect()
    nodes_rdd.unpersist()
    print(data)

    #Write the output to a output_2.txt
    with open(output_path,'w',encoding='utf-8') as file:
        file.writelines(f'{node}\t{distance}\n' for node,distance in data)

    #Finding the nodes with the greatest and least distance from source.
    #Working on the collected list avoids two more Spark jobs; nodes still at
    #infinity were never reached, so they are left out of the comparison.
    unreachable=float('inf')
    reachable=[entry for entry in data if entry[1]!=unreachable]
    positive=[entry for entry in reachable if entry[1]>0]
    if not positive:
        #Nothing to compare: no node has a finite, positive distance
        print("No node with a positive finite distance from source node", source)
        return

    max_node=max(reachable,key=lambda x:x[1])
    min_node=min(positive,key=lambda x:x[1])

    print("Node with greatest distance from source node", source, "to", max_node[0], "is:", max_node[1])
    print("Node with least distance from source node ", source, "to", min_node[0], "is:", min_node[1])

#Stopping a SparkContext left over from an earlier run; when `sc` is not defined yet there is nothing to stop
try:
    sc.stop()
except NameError:
    pass

#Setting up the Spark context, one configuration setting per statement
conf=SparkConf()
conf.setAppName("Dijkstra")
conf.setMaster("local")
conf.set("spark.pyspark.python", "python")
conf.set("spark.pyspark.driver.python", "python")
sc=SparkContext(conf=conf)

#Input edge lists and the file that receives the computed distances
input_paths = ["question2_1.txt", "question2_2.txt"]
output_path = "output_2.txt"

dijkstra(sc, input_paths, output_path)


Adjacency list size: 99
Source node:  0
4
[(0, 0), (2, 3), (4, 9), (6, 11), (8, 8), (10, 10), (12, 13), (14, 10), (16, 9), (18, 11), (20, 10), (22, 4), (24, 5), (26, 10), (28, 7), (30, 11), (32, 12), (34, 9), (36, 8), (38, 7), (40, 9), (42, 10), (44, 14), (46, 4), (48, 10), (50, 11), (52, 11), (54, 9), (56, 8), (58, 10), (60, 6), (62, 9), (64, 8), (66, 9), (68, 5), (70, 9), (72, 9), (74, 6), (76, 8), (78, 8), (80, 9), (82, 7), (84, 4), (86, 9), (88, 10), (90, 9), (92, 8), (94, 6), (96, 7), (98, 7), (1, 12), (3, 8), (5, 8), (7, 9), (9, 6), (11, 12), (13, 10), (15, 13), (17, 13), (19, 11), (21, 5), (23, 12), (25, 8), (27, 9), (29, 8), (31, 10), (33, 8), (35, 4), (37, 9), (39, 4), (41, 7), (43, 13), (45, 11), (47, 7), (49, 10), (51, 8), (53, 8), (55, 7), (57, 7), (59, 7), (61, 8), (63, 8), (65, 11), (67, 9), (69, 6), (71, 9), (73, 6), (75, 4), (77, 8), (79, 6), (81, 9), (83, 10), (85, 6), (87, 6), (89, 8), (91, 9), (93, 7), (95, 5), (97, 8), (99, 9)]
Node with greatest distance from sourc

In [3]:
#Stopping the Spark context created for the Dijkstra run
sc.stop()

In [2]:
#Bare expression: in a notebook cell this displays the current SparkContext object
sc