In [1]:
from pyspark import SparkConf, SparkContext
import re
import time

In [2]:
# Record the wall-clock start so the final cells can report the total run time.
Start_time = time.time()

In [3]:
# Create (or reuse) the SparkContext for this notebook.
# getOrCreate() hands back the already-running context when this cell is
# re-executed in the same kernel; constructing SparkContext(conf=conf) a second
# time fails because only one SparkContext may be active per process.
conf = SparkConf().setMaster("local").setAppName("PopularWikipediaPagesMapReduce")
sc = SparkContext.getOrCreate(conf=conf)

In [4]:
def parseLine(line):
    """Parse one whitespace-separated record into a typed tuple.

    Args:
        line: A text line whose first four whitespace-separated fields are
            project, title, hits and size. Extra trailing fields are ignored.

    Returns:
        (project, title, hits, size) with hits and size converted to int, or
        None when the line has fewer than four fields or hits/size are not
        valid integers.
    """
    fields = line.split()
    try:
        project, title = fields[0], fields[1]
        hits = int(fields[2])
        size = int(fields[3])
    except (IndexError, ValueError):
        # Malformed record: return None so the caller can filter it out.
        # Only these two expected parse failures are swallowed; anything else
        # (e.g. a non-string input) is a programming error and should surface.
        return None
    return (project, title, hits, size)

# Read the data from the file using parseLine function.
# The parsed RDD feeds several independent actions later in the notebook
# (min/max/mean, counts, distinct, reduceByKey, groupByKey). cache() keeps the
# parsed records after the first action instead of re-reading and re-parsing
# the input file for every one of them.
lines = sc.textFile("pagecounts-20160101-000000_parsed.out")
rdd = lines.map(parseLine).filter(lambda x: x is not None).cache()


In [5]:
# Page size is the fourth field of every parsed record.
page_sizes = rdd.map(lambda record: record[3])

# Smallest, largest and mean page size (one Spark action each).
minSize = page_sizes.min()
maxSize = page_sizes.max()
avgSize = page_sizes.mean()

# print("Min page size: {}, Max page size: {}, Average page size: {}".format(minSize, maxSize, avgSize))


In [6]:
# # Count page titles that start with "The"
# the_titles_count = rdd.filter(lambda x: x[1].startswith("The")).count()

# Count page titles that start with "The" and are not part of the English project.
# NOTE: despite its name, english_the_titles_count holds the NON-English count
# (project != "en"); the name is kept because a later cell references it.
def _is_non_english_the_title(record):
    """Return True for a record outside project "en" whose title starts with "The"."""
    project, title = record[0], record[1]
    return project != "en" and title.startswith("The")

english_the_titles_count = rdd.filter(_is_non_english_the_title).count()
# print("The titles count: {}, English The titles count: {}".format(the_titles_count, english_the_titles_count))

# print("English The titles count: {}".format(english_the_titles_count))

In [7]:
def preprocess_title(title):
    """Normalize a page title.

    The title is lowercased, then every character that is not a lowercase
    ASCII letter, a digit or an underscore is removed.
    """
    lowered = title.lower()
    return re.sub(r'[^a-z0-9_]', '', lowered)

# Preprocess page titles
preprocessed_titles = rdd.map(lambda x: preprocess_title(x[1]))
print(preprocessed_titles.take(5))

# Split titles into terms and flatten.
# split("_") yields empty strings for leading/trailing/consecutive underscores
# and for titles that became empty after preprocessing; drop them so that ""
# is not counted as a term.
terms = preprocessed_titles.flatMap(lambda title: [term for term in title.split("_") if term])
print(terms.take(5))

# Count unique (non-empty) terms
unique_terms_count = terms.distinct().count()

# print("Number of unique terms appearing in the page titles:", unique_terms_count)


# re.sub(): This function is used for performing substitutions based on regular expressions. It takes three main arguments:
# The first argument is the regular expression pattern to search for.
# The second argument is the replacement string, which will replace the matched pattern.
# The third argument is the string on which the operation is performed.
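# r'[^a-z0-9_]': This regular expression pattern matches any character that is not a lowercase letter (a-z), a digit (0-9) or an underscore _. The title is lowercased before the substitution, so uppercase letters never reach this pattern.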
# '': This is the replacement string, which is an empty string. It means that any character that matches the pattern will be removed (replaced with nothing).
# title: This is the string on which the substitution operation is performed. In this case, it's the (already lowercased) page title.
# So, the re.sub() function will remove all characters from the page title that are not alphanumeric or underscores.

# ["hello world", "foo bar", "baz"]
# map
# ["hello", "world"]
# ["foo", "bar"]
# ["baz"]
# flatMap
# ["hello", "world", "foo", "bar", "baz"]

['271_ac', 'categoryuser_th', 'chiron_elias_krase', 'dassault_rafaele', 'edesv']
['271', 'ac', 'categoryuser', 'th', 'chiron']


In [8]:
# Count how many records share each title: emit (title, 1) per record,
# then sum the ones per title.
title_ones = rdd.map(lambda record: (record[1], 1))
title_counts = title_ones.reduceByKey(lambda total, one: total + one)

# for title, count in title_counts.take(10):
#     print("Title: {}, Count: {}".format(title, count))

In [9]:
# Group the (project, hits, size) data of pages that share a title, keeping only
# titles that occur more than once so every entry lists at least two pages.
records_by_title = rdd.map(lambda record: (record[1], (record[0], record[2], record[3]))).groupByKey()
repeated_titles = records_by_title.filter(lambda pair: len(pair[1]) > 1)
# Turn each group's iterable of values into a plain list for display/writing.
combined_titles = repeated_titles.mapValues(list)

# print("Combined titles:")
# for title, data in combined_titles.take(10):
#     print("Title: {}, Data: {}".format(title, data))

# mapValues(list) to convert the iterator of values into a list. This will give you each title paired with a list of its corresponding data tuples.

In [10]:
# Create a document that includes the results of every query.
with open("map_reduce_results.txt", "w", encoding="utf-8") as f:
    f.write("Min page size: {}\n".format(minSize))
    f.write("Max page size: {}\n".format(maxSize))
    f.write("Average page size: {}\n".format(avgSize))
    # english_the_titles_count is computed with project != "en", i.e. it is the
    # count of NON-English titles starting with "The"; label it accordingly.
    f.write("Non-English The titles count: {}\n".format(english_the_titles_count))
    f.write("Number of unique terms appearing in the page titles: {}\n".format(unique_terms_count))
    f.write("\n")
    f.write("Title Counts:\n")
    # toLocalIterator() streams the results partition by partition instead of
    # materializing every (title, count) pair on the driver at once.
    for title, count in title_counts.toLocalIterator():
        f.write("{}: {}\n".format(title, count))
    f.write("\n")
    f.write("Combined Titles:\n")
    for title, data_list in combined_titles.toLocalIterator():
        f.write("{}:\n".format(title))
        for data in data_list:
            f.write("{}\n".format(data))


In [1]:
# Stop the existing SparkContext.
# `sc` is created by the SparkContext cell earlier in this notebook, so this
# cell only works in the same kernel session as that cell. The NameError
# recorded below this cell shows it was last run in a kernel where `sc` did
# not exist (presumably after a restart -- note the execution count of 1).
sc.stop()  


NameError: name 'sc' is not defined

In [12]:
# Record the wall-clock end time; the next cell subtracts Start_time from it.
end_time = time.time()

In [13]:
# Elapsed wall-clock time of the notebook run, in seconds and in minutes.
total_time = end_time - Start_time
total_time_minutes = total_time / 60

print("Total time: ", total_time)
print("Total time in minutes: ", total_time_minutes)

Total time:  232.958838224411
Total time in minutes:  3.8826473037401836
