In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=64f3a4bff61271ca923ce6fec53c095b6e20c8478693e4d10b3418c3d14c4f97
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
import heapq
import os
import time
from itertools import islice
from operator import add

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split

# Entry point for the DataFrame API used by the Q1/Q2 cells
spark = (
    SparkSession.builder
    .appName("Wikimedia Statistics")
    .getOrCreate()
)

# Google Drive holds both the input dataset and the results file
from google.colab import drive

drive.mount('/content/drive')



# Load data
data_path = "/content/drive/MyDrive/Databases/pagecounts-20160101-000000_parsed.out"  # Update this to the path of your file
df = spark.read.text(data_path)

# Parse the 4th space-separated field as the page size; the cast yields null when the
# field is missing or is not a valid integer.
df = df.withColumn("page_size", split(col("value"), " ")[3].cast("long"))

# Limit the rows pulled onto the driver to manage memory usage.
# NOTE(review): the map-reduce variant below reads the whole file without a limit,
# so the two results only match if this limit covers every line — confirm.
df = df.limit(3324129)  # Adjust the limit based on the memory capacity of your system


start_time = time.time()
# Compute min / max / average page size with a plain Python loop on the driver
total_page_sizes = 0
min_page_size = float('inf')
max_page_size = float('-inf')
num_page_sizes = 0

# Only page_size is needed, so don't ship the raw text of every line to the driver.
for row in df.select("page_size").collect():
    # A null size (malformed line) would make `+=` raise TypeError; count it as 0,
    # the same default extract_project_size applies in the map-reduce variant.
    page_size = 0 if row.page_size is None else row.page_size
    total_page_sizes += page_size
    num_page_sizes += 1
    if page_size < min_page_size:
        min_page_size = page_size
    if page_size > max_page_size:
        max_page_size = page_size

# Avoid division by zero if no rows are returned
avg_page_size = total_page_sizes / num_page_sizes if num_page_sizes > 0 else 0

end_time = time.time()
elapsed_time = end_time - start_time

print("Q1 loop\n")
print("Elapsed time:", elapsed_time, "seconds\n")




# Re-read the file as raw text lines (single "value" column) for the map-reduce variant
data_path = "/content/drive/MyDrive/Databases/pagecounts-20160101-000000_parsed.out"
text_reader = spark.read
df = text_reader.text(data_path)

# Define a function to extract project code and page size
def extract_project_size(line):
    """Return the page size (4th space-separated field) of a pagecounts line.

    Args:
        line: one raw text line of the pagecounts file.

    Returns:
        The size as an int, or 0 when the line has fewer than four fields or the
        field is not a plain unsigned decimal number.
    """
    fields = line.split(" ")
    # isdecimal (not isdigit): isdigit also accepts characters such as superscript
    # digits that int() rejects, which would crash the whole Spark job.
    if len(fields) > 3 and fields[3].isdecimal():
        return int(fields[3])
    return 0  # Default value if not enough fields or not a number

start_time = time.time()

# Map phase: extract the page size from every raw line
# (malformed lines count as size 0 — see extract_project_size).
page_sizes_rdd = df.rdd.map(lambda row: row.value).map(extract_project_size)

# Reduce phase: fold (count, sum, min, max) partials together in a single pass.
# Separate sum()/count()/min()/max() actions would each launch a job and re-read the
# uncached input file four times. An empty input leaves min/max at +inf/-inf.
num_page_sizes, total_page_sizes, min2_page_size, max2_page_size = (
    page_sizes_rdd
    .map(lambda size: (1, size, size, size))
    .fold(
        (0, 0, float('inf'), float('-inf')),
        lambda a, b: (a[0] + b[0], a[1] + b[1], min(a[2], b[2]), max(a[3], b[3])),
    )
)
avg2_page_size = total_page_sizes / num_page_sizes if num_page_sizes > 0 else 0

end_time = time.time()
elapsed_time = end_time - start_time


print("Q1 map-reduce\n")
print("Elapsed time:", elapsed_time, "seconds")







# Save results to Google Drive
output_path = "/content/drive/MyDrive/Results/output.txt"
# open() does not create missing folders; make sure Results/ exists on a fresh Drive.
os.makedirs(os.path.dirname(output_path), exist_ok=True)
# 'w' starts a fresh results file; the Q2-Q5 cells append to it.
with open(output_path, 'w') as file:
    file.write("Q1 Loop: \n")
    file.write("Min Page Size: " + str(min_page_size) + "\n")
    file.write("Max Page Size: " + str(max_page_size) + "\n")
    file.write("Avg Page Size: " + str(avg_page_size) + "\n")
    file.write("Q1 Map-Reduce: \n")
    file.write("Min Page Size: " + str(min2_page_size) + "\n")
    file.write("Max Page Size: " + str(max2_page_size) + "\n")
    file.write("Avg Page Size: " + str(avg2_page_size) + "\n")
    file.write("------------------------------------------------------------------------\n")




Mounted at /content/drive
Q1 loop

Elapsed time: 57.52911972999573 seconds

Q1 map-reduce

Elapsed time: 110.9327871799469 seconds


In [None]:

start_time = time.time()

# Load data
data_path = "/content/drive/MyDrive/Databases/pagecounts-20160101-000000_parsed.out"
df = spark.read.text(data_path)

# Split each line once and keep only the two fields the loop reads
line_fields = split(col("value"), " ")
df = df.select(
    line_fields[0].alias("project_code"),
    line_fields[1].alias("page_title"),
)

# Sample or limit data to reduce the size
sampled_df = df.limit(3324129)  # Adjust this number based on your environment's capacity

# Initialize counters
titles_starting_with_the = 0
english_project_titles_starting_with_the = 0

# Loop over the collected rows on the driver
for row in sampled_df.collect():
    title = row.page_title
    # A line without a second field yields a null title; skip it instead of raising
    # AttributeError on None.startswith.
    if title is not None and title.startswith("The_"):
        titles_starting_with_the += 1
        if row.project_code == "en":
            english_project_titles_starting_with_the += 1

# Calculate the number of titles starting with "The" that are not part of the English project
non_english_project_titles_starting_with_the = titles_starting_with_the - english_project_titles_starting_with_the

end_time = time.time()
elapsed_time = end_time - start_time

print("Q2 loop:")
print("Elapsed time:", elapsed_time, "seconds\n")






# Start timing before the load so this measurement covers the same steps as the loop variant
start_time = time.time()

data_path = "/content/drive/MyDrive/Databases/pagecounts-20160101-000000_parsed.out"
text_reader = spark.read
df = text_reader.text(data_path)  # one "value" column holding each raw line

# Define a function to extract project code and page title
def extract_project_title(line):
    """Return ``(project_code, page_title)`` from a space-separated pagecounts line.

    Args:
        line: one raw text line of the pagecounts file.

    Returns:
        A 2-tuple of the first and second fields. When the line has no second
        field the title is ``""`` instead of raising IndexError, so such lines
        never match a title-prefix filter.
    """
    fields = line.split(" ")
    title = fields[1] if len(fields) > 1 else ""
    return (fields[0], title)

# Map phase: Extract project code and page title
# Map phase: extract (project code, page title) from every raw line
project_title_rdd = df.rdd.map(lambda row: row.value).map(extract_project_title)

# Keep titles starting with "The_" and key each one by whether it is in the English project
the_title_pairs = (
    project_title_rdd
    .filter(lambda pair: pair[1].startswith("The_"))
    .map(lambda pair: ("en" if pair[0] == "en" else "other", 1))
)

# Reduce phase: one job counts both groups, instead of two separate filter().count()
# passes that would each re-read the uncached input.
the_title_counts = the_title_pairs.reduceByKey(lambda a, b: a + b).collectAsMap()

english_project_titles_starting_with_the_2 = the_title_counts.get("en", 0)
titles_starting_with_the_2 = english_project_titles_starting_with_the_2 + the_title_counts.get("other", 0)

# Calculate the number of titles starting with "The" that are not part of the English project
non_english_project_titles_starting_with_the_2 = titles_starting_with_the_2 - english_project_titles_starting_with_the_2

end_time = time.time()
elapsed_time = end_time - start_time

# Print results to console
print("Q2 map reduce")
print("Elapsed time:", elapsed_time, "seconds\n")



# Append the Q2 results (map-reduce first, then loop) to the shared results file
output_path = "/content/drive/MyDrive/Results/output.txt"
q2_report = [
    "Q2 Map-Reduce: \n",
    "Number of page titles starting with 'The': " + str(titles_starting_with_the_2) + "\n",
    "Number of those page titles that are not part of the English project: " + str(non_english_project_titles_starting_with_the_2) + "\n",
    "Q2 Loop: \n",
    "Number of page titles starting with 'The': " + str(titles_starting_with_the) + "\n",
    "Number of those page titles that are not part of the English project: " + str(non_english_project_titles_starting_with_the) + "\n",
    "------------------------------------------------------------------------\n",
]
with open(output_path, 'a') as file:
    file.writelines(q2_report)


# The DataFrame work is finished; stop the session
spark.stop()


Q2 loop:
Elapsed time: 53.31073784828186 seconds

Q2 map reduce
Elapsed time: 57.76884698867798 seconds



In [None]:
from google.colab import drive
from pyspark import SparkContext, SparkConf
import time
from operator import add

# Mount Google Drive (prints a notice instead of remounting if it is already mounted)
drive.mount('/content/drive')

# NOTE(review): master "local" runs Spark with a single worker thread, while the earlier
# SparkSession did not set a master — Q1/Q2 and Q3-Q5 timings may not be comparable; confirm.
conf = SparkConf().setMaster("local").setAppName("Wikimedia Project")
# getOrCreate reuses an already-running context when this cell is re-run, instead of
# failing because only one SparkContext may be active at a time.
sc = SparkContext.getOrCreate(conf=conf)
data = sc.textFile("/content/drive/MyDrive/Databases/pagecounts-20160101-000000_parsed.out")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Question 3

In [None]:
import time
from operator import add


def _normalized_title_terms(line):
    """Return the normalized terms of the page title on one pagecounts line.

    The title is the 2nd space-separated field (the 1st when there is no 2nd).
    It is split on '_', each piece is lowercased and stripped of non-alphanumeric
    characters, and pieces that end up empty are dropped (they are not terms).
    """
    fields = line.split(" ")
    title = fields[1] if len(fields) >= 2 else fields[0]
    terms = []
    for piece in title.split("_"):
        term = "".join(ch for ch in piece.lower() if ch.isalnum())
        if term:
            terms.append(term)
    return terms


# ---- Map-reduce ----
start_time = time.time()

# Map: line -> normalized title terms -> (term, 1); reduce: total occurrences per term
term_count = data.flatMap(_normalized_title_terms).map(lambda term: (term, 1)).reduceByKey(add)
# NOTE(review): this counts terms occurring exactly once across all titles. If the
# question asks for the number of *distinct* terms, use term_count.count() — confirm.
distinct_terms = term_count.filter(lambda pair: pair[1] == 1)
count = distinct_terms.count()

end_time = time.time()
elapsed_time = end_time - start_time

print("Q3 map reduce:")
print("Elapsed time:", elapsed_time, "seconds")


# ---- Spark loops ----
# Same normalization as above, but all counting happens in plain Python on the
# driver while the raw lines are streamed back with toLocalIterator().
start_time = time.time()

term_counts_loop = {}
for line in data.toLocalIterator():
    for term in _normalized_title_terms(line):
        term_counts_loop[term] = term_counts_loop.get(term, 0) + 1

# Count the terms that occur exactly once (same criterion as the map-reduce variant)
count2 = 0
for occurrences in term_counts_loop.values():
    if occurrences == 1:
        count2 += 1

end_time = time.time()
elapsed_time = end_time - start_time

print("Q3 Loop:")
print("Elapsed time:", elapsed_time, "seconds")


output_path = "/content/drive/MyDrive/Results/output.txt"
with open(output_path, 'a') as file:  # 'a' appends to the results file started by the Q1 cell
    file.write("Q3 Map-Reduce: \n")
    file.write(" count: "+ str(count) + "\n")
    file.write("Q3 Loop: \n")
    file.write("count: " + str(count2) + "\n")
    file.write("------------------------------------------------------------------------\n")


Q3 map reduce:
Elapsed time: 55.09373617172241 seconds
Q3 Loop:
Elapsed time: 85.36036324501038 seconds


Question 4

In [None]:
def _page_title(line):
    """Return the page title (2nd space-separated field) of a pagecounts line,
    falling back to the 1st field when the line has no 2nd one."""
    fields = line.split(" ")
    return fields[1] if len(fields) >= 2 else fields[0]


TOP_N = 20  # number of most frequent titles reported by both variants

page_title_rdd = data.map(_page_title)

start_time = time.time()
# Count the frequency of each title, then fetch the TOP_N most frequent ones.
# takeOrdered is the action that actually runs the job, so it must sit inside the
# timed region; it also avoids a full distributed sort when only TOP_N are needed.
freq_title = page_title_rdd.map(lambda title: (title, 1)).reduceByKey(lambda a, b: a + b)
top_titles_mr = freq_title.takeOrdered(TOP_N, key=lambda pair: -pair[1])

end_time = time.time()
elapsed_time = end_time - start_time
print("Question 4")
print("Map-Reduce")
print("Elapsed time:", elapsed_time, "seconds")

start_time = time.time()

# Count the frequency of each title
title_counts = {}
for line in data.toLocalIterator():
    title = _page_title(line)
    title_counts[title] = title_counts.get(title, 0) + 1

# Pick the TOP_N most frequent titles without sorting every distinct title
top_titles_loop = heapq.nlargest(TOP_N, title_counts.items(), key=lambda item: item[1])

end_time = time.time()
elapsed_time = end_time - start_time
print("Spark Loops")
print("Elapsed time:", elapsed_time, "seconds")


output_path = "/content/drive/MyDrive/Results/output.txt"
# Page titles may contain non-ASCII characters: write UTF-8 regardless of the locale.
with open(output_path, 'a', encoding='utf-8') as file:
    file.write("Q4 Map-Reduce: \n")
    file.write(" counts: \n" + str(top_titles_mr) + "\n")
    file.write("Q4 Loop: \n")
    file.write("counts: \n" + str(top_titles_loop) + "\n")
    file.write("------------------------------------------------------------------------\n")


Question 4
Map-Reduce
Elapsed time: 36.57109260559082 seconds
Spark Loops
Elapsed time: 38.06556558609009 seconds


Question 5

In [None]:
# Split each line into its space-separated fields. Lines with fewer than four fields
# carry no title/hits/size and would raise IndexError below, so drop them.
fields = data.map(lambda line: line.split(" ")).filter(lambda f: len(f) >= 4)

#using for loops
startTime1 = time.time()
# Collect every "page hits / page size" entry per title, then join each list once.
# Doing `pages[title] += ...` on a dict value rebuilds the growing string per append.
entries_by_title = {}
for f in fields.collect():
    entry = "page hits: " + str(f[2]) + " page size: " + str(f[3])
    entries_by_title.setdefault(f[1], []).append(entry)
pages = {title: ", ".join(entries) for title, entries in entries_by_title.items()}

endTime1 = time.time()

# Map-reduce variant: key each row by title and concatenate its "hits size" strings
startTime2 = time.time()

titles = fields.map(lambda parts: (parts[1], f"{parts[2]} {parts[3]}"))
resultRdd = titles.reduceByKey(lambda left, right: " , ".join((left, right)))
result = resultRdd.collect()

endTime2 = time.time()

loop_seconds = endTime1 - startTime1
mr_seconds = endTime2 - startTime2
print("Question 5")
print("Spark loops")
print(loop_seconds, "seconds")
print("Map-Reduce")
print(mr_seconds, "seconds")

print(result[:20])



output_path = "/content/drive/MyDrive/Results/output.txt"
# Page titles may contain non-ASCII characters: write UTF-8 regardless of the locale.
with open(output_path, 'a', encoding='utf-8') as file:
    file.write("Q5 Map-Reduce: \n")
    file.write("pages with same titles: \n")
    for r in result[:50]:
        file.write(str(r) + "\n")
    file.write("Q5 Loop: \n")
    # Take the first 50 entries lazily instead of materialising every (title, value) pair
    for key, value in islice(pages.items(), 50):
        file.write(f"{key}: {value}\n")
    file.write("------------------------------------------------------------------------\n")


Question 5
Spark loops
25.43810534477234 seconds
Map-Reduce
32.34749627113342 seconds
[('E.Desv', '1 4662 , 1 5210 , 1 4825 , 1 5237 , 1 7057 , 1 4548'), ('Special:Contributions/5.232.61.79', '1 5805'), ('Special:ListFiles/Nyttend', '1 5032'), ('Special:WhatLinksHere/Main_Page', '1 5556 , 2 15231 , 5 101406 , 1 8597 , 1 8550 , 1 11529 , 1 5698 , 3 32145'), ('Time_Inc', '1 4672 , 1 6182 , 1 4842 , 1 4923'), ('Special:Contributions/MBisanz', '1 5674'), ('Special:UserLogin', '1 4899 , 30 181938 , 44198 718770014 , 4 34449 , 1 5221 , 13 58547 , 3 12523 , 2 8696 , 1 5311 , 2 9960 , 1 5052 , 5 49952 , 1 4989'), ('Acanthophorus_serraticornis', '1 5942 , 1 5825 , 1 5480 , 1 5510 , 1 5429 , 1 4693'), ('Allen_R._Schindler,_Jr', '1 5957 , 1 5840 , 1 4711 , 1 4900 , 1 4881 , 1 7322'), ('Annales._Histoire,_Sciences_Sociales/en/Annales_d', '1 6008 , 1 7775 , 1 5678 , 1 6100'), ('N.P.R', '1 5900 , 1 5438 , 1 5000 , 1 4853 , 1 4659 , 1 6338 , 1 7159'), ('Nord-Pas-de-Calais', '1 5925 , 1 5806 , 1 5460 

In [None]:
# Release the SparkContext created for the RDD-based questions (Q3-Q5)
sc.stop()