In [1]:
# pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=a674b35d9e3dca2de9d3e4bfe0ee915325eb104f015b6d36f2d54e14fff8658b
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
from google.colab import drive
import os
from pyspark import SparkConf, SparkContext
import time
# drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# Create a SparkSession
SparkContext.getOrCreate().stop()
conf = SparkConf().setAppName("PageTitleAnalysis")
sc = SparkContext(conf=conf)

In [4]:
os.chdir(r"/content/drive/MyDrive/data/")
lines = sc.textFile("/content/drive/MyDrive/data/pagecounts-20160101-000000_parsed.out")

In [None]:
# problem 1
# Using map reduce

start_time = time.time()

# Define the map function to extract the page size from each line
def extract_page_size(line):
    fields = line.split()
    if len(fields) >= 4:
        try:
            page_size = int(fields[3])
            return page_size
        except ValueError:
            pass
    return 0

# Define the reduce function to find the minimum, maximum, and average page sizes
def find_min_max_avg(page_sizes):
    min_size = float('inf')
    max_size = float('-inf')
    total_size = 0
    total_count = 0
    for size in page_sizes:
        if size < min_size:
            min_size = size
        if size > max_size:
            max_size = size
        total_size += size
        total_count += 1
    avg_size = total_size / total_count if total_count > 0 else 0
    return (min_size, max_size, avg_size)

# Apply the map function to extract the page sizes and group them by a dummy key
page_sizes = lines.map(extract_page_size).groupBy(lambda x: 0)

# Apply the reduce function to find the minimum, maximum, and average page sizes
min_max_avg_sizes = page_sizes.mapValues(find_min_max_avg)

# Extract the results from the RDD

# Extract the minimum, maximum, and average page sizes from the RDD
min_max_avg = min_max_avg_sizes.take(1)[0]

for i in range(1000000):
    pass

# stop measuring the execution time
end_time = time.time()

# Print the minimum, maximum, and average sizes
print("using map-reduce: \n")
print("Minimum page size:", min_max_avg[1][0])
print("Maximum page size:", min_max_avg[1][1])
print("Average page size:", min_max_avg[1][2])
print("Time taken:", end_time - start_time)

with open("output 1.txt", "w") as f2:
  print("using map-reduce \n\n", "Minimum page size (Spark loops):", min_max_avg[1][0] ,"\n", "Maximum page size (Spark loops):", min_max_avg[1][1] ,"\n", "Average page size (Spark loops):",  min_max_avg[1][2] ,"\n","time taken:" , end_time - start_time ,"\n", file = f2)

using map-reduce: 

Minimum page size: 0
Maximum page size: 141180155987
Average page size: 132211.70136297357
Time taken: 14.23216438293457


In [None]:
# problem 1
# Using spark loops

start_time = time.time()

# Compute the min, max, and average page size using Spark loops
min_size_spark = float('inf')
max_size_spark = float('-inf')
total_size_spark = 0
total_count_spark = 0
for line in lines.toLocalIterator():
    fields = line.split()
    if len(fields) >= 4:
        try:
            page_size = int(fields[3])
            if page_size < min_size_spark:
                min_size_spark = page_size
            if page_size > max_size_spark:
                max_size_spark = page_size
            total_size_spark += page_size
            total_count_spark += 1
        except ValueError:
            pass
avg_size_spark = total_size_spark / total_count_spark if total_count_spark > 0 else 0

for i in range(1000000):
    pass

# stop measuring the execution time
end_time = time.time()

# Print the minimum, maximum, and average page sizes using Spark loops
print("using spark loops: \n")
print("Minimum page size (Spark loops):", min_size_spark)
print("Maximum page size (Spark loops):", max_size_spark)
print("Average page size (Spark loops):", avg_size_spark)
print("time taken:" , end_time - start_time)

with open("output 1.txt", "a") as f2:
  print("using spark loops: \n \n","Minimum page size (Spark loops):", min_size_spark ,"\n", "Maximum page size (Spark loops):", max_size_spark ,"\n", "Average page size (Spark loops):", avg_size_spark ,"\n","time taken:" , end_time - start_time ,"\n", file = f2)

using spark loops: 

Minimum page size (Spark loops): 0
Maximum page size (Spark loops): 141180155987
Average page size (Spark loops): 132215.6390405622
time taken: 35.70821499824524


In [None]:
# problem 2
# Using map reduce

map_reduce_start_time = time.time()

def mapper(line):
    fields = line.split(" ")
    title = fields[1]
    language = fields[0] 

    if(language != ("en")):
        if(title.startswith("The_")):
            return (("the", "the_not_english"), (1, 1))
        else:
            return (("the", "the_not_english"), (0, 0))
    else:
        if(title.startswith("The_")):
            return (("the", "the_not_english"), (1, 0))
        else:
            return (("the", "the_not_english"), (0, 0))

def reducer(x, y):  # x = (1,1)  -----> y = (1,1)    x[0]="the" in the first key,  y[0]= "the" (in the next key)         
    return (x[0] + y[0], x[1] + y[1])              # x[1]= "the_not_english" in the first key      y[1]= "the_not_english" in the next key

mapped = lines.map(mapper)
counts = mapped.reduceByKey(reducer).collectAsMap() # counts = {("The_","the_not_english"),(41901,9160)}
result = list(counts[("the", "the_not_english")])  # result = [41901,9160]

for i in range(1000000):
    pass

# stop measuring the execution time
map_reduce_end_time = time.time()
map_reduce_time = map_reduce_end_time - map_reduce_start_time

print("Using map-reduce: \n \n", "Total number of page titles that start with 'The':", result[0],"\n",
      "Number of non-English page titles that start with 'The':", result[1],"\n",
      "map reduce Execution time:", map_reduce_time, "seconds")

with open("output 2.txt", "w") as f:
  print("Using map-reduce: \n \n", "Total number of page titles that start with 'The':", result[0],"\n",
      "Number of non-English page titles that start with 'The':", result[1],"\n",
      "map reduce Execution time:", map_reduce_time, "seconds", "\n", file=f)

Using map-reduce: 
 
 Total number of page titles that start with 'The': 41901 
 Number of non-English page titles that start with 'The': 9160 
 map reduce Execution time: 6.638685703277588 seconds


In [None]:
# problem 2
# Using Spark loops

loops_start_time = time.time()

num_of_the_pages = 0
num_of_the_non_english_pages = 0

for line in lines.toLocalIterator():
    # check if the page title starts with "The"
    fields = line.split(" ")
    if (fields[1].startswith("The_")):
        num_of_the_pages += 1
        # check if the page is not part of the English project
        if (fields[0] != "en"):
            num_of_the_non_english_pages += 1

for i in range(1000000):
    pass

# stop measuring the execution time
loops_end_time = time.time()
loops_time = loops_end_time - loops_start_time

dic = {loops_time:"spark loops algorithm", map_reduce_time:"map reduce algorithm"}  # dictionary to store the time of two algorithms
best_algo = dic[min(loops_time, map_reduce_time)]

print("Using Spark loops: \n \n" , "Total number of page titles that start with 'The':", num_of_the_pages ,"\n", 
      "Number of non-English page titles that start with 'The':", num_of_the_non_english_pages, "\n",
      "Spark loops Execution time:", loops_time, "seconds \n \n",
      "best algorithm in terms of performance:", best_algo)

with open("output 2.txt", "a") as f:
  print("Using Spark loops: \n \n" , "Total number of page titles that start with 'The':", num_of_the_pages ,"\n", 
      "Number of non-English page titles that start with 'The':", num_of_the_non_english_pages, "\n",
      "Spark loops Execution time:", loops_time, "seconds \n \n",
      "best algorithm in terms of performance:", best_algo, file=f)

Using Spark loops: 
 
 Total number of page titles that start with 'The': 41901 
 Number of non-English page titles that start with 'The': 9160 
 Spark loops Execution time: 25.064776182174683 seconds 
 
 best algorithm in terms of performance: map reduce algorithm
