# Assignment - 2

1. Word Count
2. Extended Word Count
3. Page Rank Algorithm
4. Dijkstra's Algorithm

# Word Count

In [25]:
import findspark
findspark.init()
from pyspark.sql.functions import explode, split, col, desc
from pyspark.sql import SparkSession
import nltk
from nltk.corpus import stopwords
import shutil
import os
import string

# The stop-word corpus is distributed separately from the nltk package itself.
# Fetch it on demand so that a fresh environment does not fail with LookupError.
try:
    stop_words = set(stopwords.words('english'))
except LookupError:
    nltk.download('stopwords')
    stop_words = set(stopwords.words('english'))

output_path_1 = "output_1"

# Remove the output directory if it already exists; saveAsTextFile will not
# write into an existing directory.
if os.path.exists(output_path_1):
    shutil.rmtree(output_path_1)

try:
    spark = SparkSession.builder.appName("WordCountApp").master("local[*]").getOrCreate()
    sc = spark.sparkContext
except Exception as e:
    print(f"An error occurred: {e}")
    # Every later cell needs `sc`; surface the failure here instead of letting
    # the notebook continue with the name undefined.
    raise


# 1. Normal Word Count Problem

In [26]:
try:
    # Load both novels, one RDD element per text line
    text_file_1 = sc.textFile("littlewoman.txt")
    text_file_2 = sc.textFile("pride_and_prejudice.txt")

    # Combine both text files
    count_combined = text_file_1.union(text_file_2)

    # Whitespace tokenisation only: case and punctuation are left untouched here
    count_combined_basic = count_combined.flatMap(lambda line: line.split())
    count_combined_mapping = count_combined_basic.map(lambda word: (word, 1))
    count_combined_unique = count_combined_mapping.reduceByKey(lambda a, b: a + b)

    # Save basic word count output.
    # saveAsTextFile refuses to write into an existing directory, so clear any
    # leftover from a previous run to keep this cell re-runnable on its own.
    output_path_1 = "output_1"
    if os.path.exists(output_path_1):
        shutil.rmtree(output_path_1)
    count_combined_unique.saveAsTextFile(output_path_1)

    # Merge partition files into a single file.
    # Spark writes UTF-8; name the encoding explicitly so the merge does not
    # depend on the platform default, and stream instead of loading whole files.
    with open("output_1.txt", "w", encoding="utf-8") as outfile:
        for filename in sorted(os.listdir(output_path_1)):
            if filename.startswith("part-"):
                with open(os.path.join(output_path_1, filename), "r", encoding="utf-8") as infile:
                    shutil.copyfileobj(infile, outfile)

except Exception as e:
    print(f"An error occurred: {e}")
    # Do not continue silently with missing or stale output files.
    raise



                                                                                

In [29]:
# Stop the SparkContext used by the basic word count.
sc.stop()

# 2. Extended Word Count Program

In [38]:
import string
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("Word Count Extended").master("local[*]").getOrCreate()
sc = spark.sparkContext

stop_words = set(stopwords.words('english'))

try:
    # Load the text files again
    text_file_1 = sc.textFile("littlewoman.txt")
    text_file_2 = sc.textFile("pride_and_prejudice.txt")

    # Combine both text files
    count_combined = text_file_1.union(text_file_2)

    # Transformations: strip ASCII punctuation, lower-case, split on whitespace.
    # The translation table is built once here rather than once per input line.
    punctuation_table = str.maketrans("", "", string.punctuation)
    count_combined_transformation = count_combined.flatMap(
        lambda line: line.translate(punctuation_table).lower().split()
    )
    count_combined_filtered = count_combined_transformation.filter(lambda word: word not in stop_words)

    # Mapping and Counting
    count_combined_mapping = count_combined_filtered.map(lambda word: (word, 1))
    count_combined_reduced = count_combined_mapping.reduceByKey(lambda a, b: a + b)

    # Sorting in descending order of count
    count_combined_sorted = count_combined_reduced.sortBy(lambda x: x[1], ascending=False)

    # saveAsTextFile refuses to write into an existing directory, so clear any
    # leftover from a previous run (os and shutil are imported in the first cell).
    output_path_extended = "output_1_extended"
    if os.path.exists(output_path_extended):
        shutil.rmtree(output_path_extended)
    count_combined_sorted.saveAsTextFile(output_path_extended)

    # Merge partition files into a single output file for transformed word count.
    # Part files are concatenated in name order, which keeps the sorted order.
    # The encoding is explicit because Spark writes UTF-8.
    with open("output_1_extended.txt", "w", encoding="utf-8") as outfile:
        for filename in sorted(os.listdir(output_path_extended)):
            if filename.startswith("part-"):
                with open(os.path.join(output_path_extended, filename), "r", encoding="utf-8") as infile:
                    shutil.copyfileobj(infile, outfile)

except Exception as e:
    print(f"An error occurred: {e}")
    # The next cell reads output_1_extended.txt; fail here rather than let it
    # display stale or missing results.
    raise


                                                                                

In [39]:
# Read the merged, already-sorted word counts back in; each line of the file
# becomes one row in a single string column.
text_df = spark.read.text("output_1_extended.txt")

# Keep the first 25 rows and print them without truncating the cell contents.
top_25_words = text_df.limit(25)
top_25_words.show(n=25, truncate=False)

+------------------+
|value             |
+------------------+
|('jo', 1293)      |
|('said', 1245)    |
|('one', 1159)     |
|('mr', 1123)      |
|('little', 961)   |
|('would', 929)    |
|('could', 893)    |
|('much', 704)     |
|('like', 676)     |
|('meg', 653)      |
|('mrs', 606)      |
|('never', 605)    |
|('elizabeth', 601)|
|('amy', 588)      |
|('see', 574)      |
|('good', 572)     |
|('laurie', 564)   |
|('well', 557)     |
|('know', 557)     |
|('dont', 552)     |
|('time', 522)     |
|('go', 501)       |
|('think', 496)    |
|('must', 462)     |
|('away', 453)     |
+------------------+



In [40]:
# Stop the SparkContext used by the extended word count.
sc.stop()

# 3. Page Rank Algorithm

In [41]:
import re
from pyspark.sql import SparkSession
import findspark
findspark.init()

# Build (or reuse) a local Spark session for the PageRank section.
spark = (
    SparkSession.builder
    .appName("PageRank")
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext

# Raw link data, one RDD element per line of the input file.
lines = sc.textFile("question3.txt")

def parse_neighbors(line):
    """Parse one adjacency line of the form ``page: [n1, n2, ...]``.

    Args:
        line: A raw text line from the links file.

    Returns:
        ``(page, neighbors)`` where ``page`` is the stripped text before the
        ``:[`` delimiter and ``neighbors`` is a list of stripped, non-empty
        neighbor ids (an empty list for ``page: []``), or ``None`` when the
        line contains neither delimiter.
    """
    parts = re.split(r':\s*\[|\]', line)
    if len(parts) < 2:
        return None
    page = parts[0].strip()
    # Split on bare commas and strip each token so "2,3" and "2,  3" parse the
    # same as "2, 3"; dropping empty tokens keeps "[]" from yielding [''].
    neighbors = [token.strip() for token in parts[1].split(',') if token.strip()]
    return page, neighbors

# Create an RDD of (page, list of neighbors).
# Cached because every iteration below joins against it.
links = lines.map(parse_neighbors).filter(lambda x: x is not None).cache()

# Keep the shuffle width fixed. Without an explicit numPartitions, the
# join/union/reduceByKey chain below inherits the summed partition count of its
# inputs and grows on every iteration.
num_partitions = links.getNumPartitions()

# Initialize each page's rank to 1.0
ranks = links.mapValues(lambda _: 1.0)

iterations = 15 # Number of iterations for convergence
damping_factor = 0.85  # Damping factor for PageRank

# PageRank algorithm for a fixed number of iterations
for _iteration in range(iterations):
    # Each page splits its current rank evenly among its outgoing neighbors.
    contributions = links.join(ranks).flatMap(
        lambda page_neighbors_rank: [(neighbor, page_neighbors_rank[1][1] / len(page_neighbors_rank[1][0]))
                                     for neighbor in page_neighbors_rank[1][0]]
    )

    # Seed every known page with a zero contribution. A page that nobody links
    # to would otherwise disappear from `ranks` after the first iteration and
    # stop passing rank to its neighbors; with the seed it keeps the base rank
    # (1 - damping_factor).
    zero_contributions = links.mapValues(lambda _: 0.0)

    # Calculate new ranks
    ranks = contributions.union(zero_contributions) \
        .reduceByKey(lambda a, b: a + b, numPartitions=num_partitions) \
        .mapValues(lambda rank: (1 - damping_factor) + damping_factor * rank)

page_ranks = ranks.collect()
with open("output_page_ranks.txt", "w") as f:
    for page, rank in page_ranks:
        f.write(f"{page}: {rank}\n")

# Find the page with the highest and lowest PageRank
if page_ranks:
    max_page = max(page_ranks, key=lambda x: x[1])
    min_page = min(page_ranks, key=lambda x: x[1])

    print(f"Page with highest rank: {max_page}")
    print(f"Page with lowest rank: {min_page}")
else:
    # max()/min() raise ValueError on an empty list (e.g. no line parsed).
    print("No pages were ranked.")





Page with highest rank: ('50', 4.176468539659126)
Page with lowest rank: ('35', 0.2592591391247631)


                                                                                

In [42]:
# Stop the SparkContext used by the PageRank computation.
sc.stop()

# 4. Dijkstra's Algorithm

In [43]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
import findspark
findspark.init()

spark = SparkSession.builder.appName("Dijkstra").master("local[*]").getOrCreate()
sc = spark.sparkContext

edges_rdd = sc.textFile('question2_1.txt').union(sc.textFile('question2_2.txt'))

# Parse the edges into (source_node, destination_node, weight),
# summing the weights of edges that share the same (source, destination) pair.
# Blank lines are dropped first so that a stray empty line does not crash int().
edges = edges_rdd.map(lambda line: line.strip()) \
                 .filter(lambda line: line) \
                 .map(lambda line: line.split(',')) \
                 .map(lambda parts: ((int(parts[0]), int(parts[1])), float(parts[2]))) \
                 .reduceByKey(lambda w1, w2: w1 + w2) \
                 .map(lambda x: (x[0][0], x[0][1], x[1]))

# Create an adjacency list RDD: source -> [(destination, weight), ...].
# Cached because it is joined against on every relaxation round.
adjacency_list = edges.map(lambda x: (x[0], (x[1], x[2]))) \
                      .groupByKey() \
                      .mapValues(list) \
                      .cache()

# Every node that appears as either end of an edge.
nodes_from = edges.map(lambda x: x[0])
nodes_to = edges.map(lambda x: x[1])
all_nodes = nodes_from.union(nodes_to).distinct().cache()


# Initial distances: 0.0 for the start node, infinity for every other node.
start_node = 0
infinity = float('inf')
distances = all_nodes.map(lambda node: (node, 0.0 if node == start_node else infinity))
distances = distances.cache()


updated = True
iteration = 0
# A shortest path visits each node at most once, so count - 1 relaxation
# rounds are always enough.
max_iterations = all_nodes.count() - 1
# Fixed shuffle width used by every join/reduce below. A union has the summed
# partition count of its inputs, and reduceByKey without numPartitions inherits
# it, so the partition count would otherwise grow on every round.
num_partitions = 8

while updated and iteration < max_iterations:
    iteration += 1

    # Nodes still at infinity can only propose infinite distances, which never
    # win the min() below, so keep them out of the join.
    reachable = distances.filter(lambda kv: kv[1] != infinity)
    joined = reachable.join(adjacency_list, numPartitions=num_partitions)

    # Relax every outgoing edge of every reached node.
    tentative_distances = joined.flatMap(lambda x: [
        (neighbor[0], x[1][0] + neighbor[1]) for neighbor in x[1][1]
    ])

    # Cache before first use: new_distances feeds both the change check in this
    # round and the join in the next one.
    new_distances = distances.union(tentative_distances) \
                             .reduceByKey(lambda x, y: min(x, y), numPartitions=num_partitions) \
                             .cache()

    # Stop as soon as a full round leaves every distance unchanged.
    changes = new_distances.join(distances, numPartitions=num_partitions) \
                           .filter(lambda x: x[1][0] != x[1][1])
    updated = not changes.isEmpty()

    # Release the previous round's cached copy. This only affects memory use;
    # Spark can still recompute from lineage if it is needed again.
    previous_distances = distances
    distances = new_distances
    previous_distances.unpersist()


# Bring the final (node, distance) pairs back to the driver as a dict.
final_distances = distances.collectAsMap()


# One line per node in ascending node order; nodes that still carry the
# infinity sentinel are reported as unreachable.
with open('output_2.txt', 'w') as f:
    for node, dist in sorted(final_distances.items()):
        if dist == infinity:
            line = f"{node} unreachable\n"
        else:
            line = f"{node} {dist}\n"
        f.write(line)


# Summary over every reached node other than the start node itself.
reachable_nodes = {
    node: dist
    for node, dist in final_distances.items()
    if not (dist == infinity or node == start_node)
}
if not reachable_nodes:
    print("No reachable nodes from the starting node.")
else:
    max_distance = max(reachable_nodes.values())
    min_distance = min(reachable_nodes.values())

    # Several nodes may tie for the extreme distance; report all of them.
    max_nodes = [node for node in reachable_nodes if reachable_nodes[node] == max_distance]
    min_nodes = [node for node in reachable_nodes if reachable_nodes[node] == min_distance]

    print(f"Nodes with greatest distance from {start_node} (Distance: {max_distance}): {max_nodes}")
    print(f"Nodes with least distance from {start_node} (Distance: {min_distance}): {min_nodes}")


                                                                                

Nodes with greatest distance from 0 (Distance: 14.0): [15]
Nodes with least distance from 0 (Distance: 2.0): [16]


In [44]:
# Stop the SparkContext used by the shortest-path computation.
sc.stop()