In [46]:
from pyspark import SparkContext
from itertools import combinations
import time

# Stop the existing SparkContext (if it exists) so this cell can be re-run.
try:
    sc.stop()
except NameError:
    # Fresh kernel: no SparkContext has been bound to `sc` yet.
    pass
except Exception as stop_error:
    # Stay best-effort, but report the problem instead of silently hiding it
    # (and let KeyboardInterrupt / SystemExit propagate).
    print("Ignoring error while stopping previous SparkContext: {}".format(stop_error))

# create a SparkContext
sc = SparkContext(appName="spark-task")

# load the page view statistics data into an RDD (one element per line)
data_file = "pagecounts-20160101-000000_parsed.out"
lines_rdd = sc.textFile(data_file)

# Split each line on whitespace and store the fields as a tuple.
# data format is : projectCode pageTitle pageHits pageSize
# NOTE: malformed or blank lines yield shorter tuples; downstream code must
# check len(data) before indexing.
# The RDD is cached because every problem below runs its own job over it;
# without caching the file would be re-read and re-split for each job.
data_rdd = lines_rdd.map(lambda line: tuple(line.split())).cache()

In [47]:
#Problem 1 -> Get min, max, and average of page size

def _merge_size_stats(left, right):
    """Combine two (min, max, sum, count) summaries into a single summary.

    A summary whose count is 0 is the "empty" summary and acts as the
    identity element, so it can be used as the zero value of ``aggregate``.
    """
    if left[3] == 0:
        return right
    if right[3] == 0:
        return left
    return (
        min(left[0], right[0]),
        max(left[1], right[1]),
        left[2] + right[2],
        left[3] + right[3],
    )


# start the timer
start_time = time.time()

# Keep only well-formed records (exactly 4 fields, numeric page size) and
# extract the page size as an int.
page_sizes_rdd = (
    data_rdd
    .filter(lambda data: len(data) == 4 and data[3].isdigit())
    .map(lambda data: int(data[3]))
)

# Compute min, max, sum and count in ONE pass over the data instead of
# running four separate jobs over the same filtered RDD.
min_size, max_size, sum_size, count_size = page_sizes_rdd.aggregate(
    (None, None, 0, 0),
    lambda summary, size: _merge_size_stats(summary, (size, size, size, 1)),
    _merge_size_stats,
)

# Guard against an input with no valid records (min/max are None then).
avg_size = sum_size / count_size if count_size else None

end_time = time.time()
elapsed_time = end_time - start_time

# Write the results to a file ("w" starts a fresh report)
with open("spark-MapReduce.txt", "w",encoding="utf-8") as f:
    f.write("Spark Map Reduce \n")
    f.write("---------------------- Problem 1 ------------------ \n")
    # Use a separate name so elapsed_time stays a float.
    elapsed_line = "Elapsed time:"+str(elapsed_time)+" seconds\n"
    f.write(elapsed_line)
    f.write("Min page size: {}\n".format(min_size))
    f.write("Max page size: {}\n".format(max_size))
    f.write("Avg page size: {}\n".format(avg_size))
    f.write("______________________________________________________\n")

In [48]:
#Problem 2 -> Get the number of page titles that start with "The" and are not part of the English project

# start the timer
start_time = time.time()

# Map every matching record to (True, 1) and sum the ones per key.
# The len() guard skips malformed lines that lack a project code or title.
# NOTE(review): only the exact project code "en" is excluded, and only titles
# beginning with "The_" match — confirm this is the intended definition.
num_non_en_the_pages = (
    data_rdd
    .filter(lambda data: len(data) >= 2 and data[0] != "en" and data[1].startswith("The_"))
    .map(lambda data: (True, 1))
    .reduceByKey(lambda x, y: x + y)
)

# Transformations are lazy: run the action INSIDE the timed region so the
# elapsed time reflects the real job. The result holds at most one
# (True, total) pair; it is empty when nothing matched.
the_page_totals = num_non_en_the_pages.collect()
num_the_pages = the_page_totals[0][1] if the_page_totals else 0

end_time = time.time()
elapsed_time = end_time - start_time

# Write the results to a file
with open("spark-MapReduce.txt", "a",encoding="utf-8") as f:
    f.write("---------------------- Problem 2 ------------------ \n")
    # Same line format as Problem 1; the prefix is written exactly once.
    elapsed_line = "Elapsed time:"+str(elapsed_time)+" seconds\n"
    f.write(elapsed_line)
    f.write("Number of 'The' page titles that are not part of the English project: {}\n".format(num_the_pages))
    f.write("______________________________________________________\n")

In [49]:
#Problem 3 -> Get the number of unique terms in the page titles

# start the timer
start_time = time.time()

# Split every page title on "_" to obtain its terms.
# - The len() guard skips malformed lines that have no title field.
# - Empty strings (produced by leading, trailing or doubled underscores)
#   are dropped so they are not counted as a term.
terms_rdd = (
    data_rdd
    .filter(lambda data: len(data) >= 2)
    .flatMap(lambda data: data[1].split("_"))
    .filter(lambda term: term)
)

# Map each term to a key-value pair with value 1
term_map_rdd = terms_rdd.map(lambda term: (term, 1))

# Reduce by key to get the count of each term
term_count_rdd = term_map_rdd.reduceByKey(lambda x, y: x + y)

# Count the number of unique terms (this action triggers the job, so it is
# correctly inside the timed region)
unique_terms = term_count_rdd.count()

end_time = time.time()
elapsed_time = end_time - start_time

# Write the results to a file
with open("spark-MapReduce.txt", "a",encoding="utf-8") as f:
    f.write("---------------------- Problem 3 ------------------ \n")
    # Same line format as Problem 1; the prefix is written exactly once.
    elapsed_line = "Elapsed time:"+str(elapsed_time)+" seconds\n"
    f.write(elapsed_line)
    f.write("Number of unique terms in the page titles : {}\n".format(unique_terms))
    f.write("______________________________________________________\n")

In [50]:
#Problem 4 -> Get the number of occurrences of each page title

# start the timer
start_time = time.time()

# Extract the page titles (skip malformed lines that have no title field)
titles_rdd = data_rdd.filter(lambda data: len(data) >= 2).map(lambda data: data[1])

# Count the number of occurrences of each title.
# The result is cached so the timing job below and the later write loop
# share a single computation.
title_count_rdd = (
    titles_rdd
    .map(lambda title: (title, 1))
    .reduceByKey(lambda x, y: x + y)
    .cache()
)

# Transformations are lazy: force the job to run inside the timed region so
# the elapsed time reflects the real work instead of just building the DAG.
title_count_rdd.count()

end_time = time.time()
elapsed_time = end_time - start_time

# Write the results to a file
with open("spark-MapReduce.txt", "a", encoding="utf-8") as f:
    f.write("---------------------- Problem 4 ------------------ \n")
    # Same line format as Problem 1; the prefix is written exactly once.
    elapsed_line = "Elapsed time:"+str(elapsed_time)+" seconds\n"
    f.write(elapsed_line)
    # Stream the results partition by partition rather than collect()-ing
    # every (title, count) pair into driver memory at once.
    for title, count in title_count_rdd.toLocalIterator():
        f.write("{}: {}\n".format(title, count))
    f.write("______________________________________________________\n")

In [51]:
#Problem 5 -> Get pairs of pages with the same title and combination of pairs

# start the timer
start_time = time.time()

# Group pages with the same title together (skip malformed lines that have no
# title field). The grouping is cached because it is the expensive shuffle
# step and is used both by the timing job and by the write loop below.
grouped_pages_rdd = (
    data_rdd
    .filter(lambda data: len(data) >= 2)
    .groupBy(lambda data: data[1])
    .cache()
)

# For each group of pages with the same title, emit every unordered pair of
# records as (title, (record_a, record_b)). Titles that occur only once
# produce no pairs.
page_pairs_rdd = grouped_pages_rdd.flatMap(lambda x: [(x[0], pair) for pair in combinations(x[1], 2)])

# Transformations are lazy: force the job to run inside the timed region so
# the elapsed time reflects the real work instead of just building the DAG.
num_page_pairs = page_pairs_rdd.count()

end_time = time.time()
elapsed_time = end_time - start_time

# Write the results to a file
with open("spark-MapReduce.txt", "a", encoding="utf-8") as f:
    f.write("---------------------- Problem 5 ------------------ \n")
    # Same line format as Problem 1; the prefix is written exactly once.
    elapsed_line = "Elapsed time:"+str(elapsed_time)+" seconds\n"
    f.write(elapsed_line)

    # Stream the pairs to the driver one partition at a time.
    for title, pair in page_pairs_rdd.toLocalIterator():
        f.write("Pages with title '{}':".format(title))
        f.write("\n")
        for data in pair:
            f.write("\t{}\n".format(data))
    f.write("______________________________________________________\n")
