# ***Install the PySpark Package***

In [None]:
#pip install pyspark

# **Run this cell only if you are working on Google Colab**

In [None]:
# Mount Google Drive under /content/drive so files stored there can be
# addressed by path. Intended for Google Colab only (see the heading above).
from google.colab import drive
drive.mount('/content/drive')

# ***Import Spark Modules***

In [59]:

from pyspark import SparkContext
import time
import re


# ***Initialize The Spark Session and Read the Data***

In [60]:
# Create (or reuse) the SparkContext used by every job in this notebook.
# getOrCreate() keeps this cell safe to re-run: constructing a second
# SparkContext while one is still active raises an error.
from pyspark import SparkConf

sc = SparkContext.getOrCreate(SparkConf().setAppName("WikimediaPageViews"))

# Google Colab: comment out the local path below and uncomment this line instead.
# Example path: "/content/drive/MyDrive/Big Data Ass 2/pagecounts-20160101-000000_parsed.out"
# data_path = "put-your-content-drive-path-to-the-input-file-here/pagecounts-20160101-000000_parsed.out"

# Local run: the input file is expected in the current working directory.
data_path = "./pagecounts-20160101-000000_parsed.out"

# RDD of raw text lines read from the input file.
lines = sc.textFile(data_path)

In [61]:
# Load and parse the data
def parse_line(line):
    """Parse one space-separated record into ``(project, title, hits, size)``.

    Args:
        line: A raw text line whose first four space-separated fields are the
            project code, the page title, the hit count and the page size.

    Returns:
        A ``(project, title, hits, size)`` tuple with ``hits`` and ``size``
        converted to ``int``, or ``None`` when the line has fewer than four
        fields or when ``hits``/``size`` are not valid integers.
    """
    parts = line.strip().split(' ')
    if len(parts) < 4:
        return None
    project, title = parts[0], parts[1]
    try:
        hits, size = int(parts[2]), int(parts[3])
    except ValueError:
        # A single malformed record must not abort the whole job; the caller
        # already filters out None results.
        return None
    return (project, title, hits, size)

In [62]:
# Parse every input line and keep only the records parse_line accepted
# (it returns None for lines it rejects).
parsed = lines.map(parse_line).filter(lambda record: record is not None)

# ***Function 1: Compute min, max, and average page size***

In [None]:
# Function_1 - Spark Map Reduce

def function_1_map_reduce(parsed_rdd):
    """Print the minimum, maximum and average page size using one Spark job.

    All four statistics (min, max, sum, count) are folded into a single
    ``aggregate`` pass instead of launching a separate job for each one.

    Args:
        parsed_rdd: RDD of ``(project, title, hits, size)`` tuples.

    Returns:
        None. Start/end timestamps, the statistics and the elapsed time are
        printed. For an empty RDD a notice is printed instead of statistics.
    """
    start = time.time()
    print(f"Start time: {start}")

    # Extract page sizes
    page_sizes = parsed_rdd.map(lambda x: x[3])  # x[3] is page size

    # Running (min, max, sum, count); the infinities are neutral elements for
    # min/max so empty partitions merge harmlessly.
    zero = (float('inf'), float('-inf'), 0, 0)

    def add_size(acc, size):
        # Fold one page size into a partition-local accumulator.
        return (min(acc[0], size), max(acc[1], size), acc[2] + size, acc[3] + 1)

    def merge(left, right):
        # Combine the accumulators of two partitions.
        return (
            min(left[0], right[0]),
            max(left[1], right[1]),
            left[2] + right[2],
            left[3] + right[3],
        )

    min_size, max_size, total_size, count = page_sizes.aggregate(zero, add_size, merge)

    end = time.time()
    print(f"End time: {end}")

    if count == 0:
        print("No records found; page size statistics are undefined.")
    else:
        print(f"Min Page Size: {min_size}")
        print(f"Max Page Size: {max_size}")
        print(f"Average Page Size: {total_size / count:.2f}")
    print(f"Time: {end - start:.4f} seconds")


# running function 1
function_1_map_reduce(parsed)

In [None]:
# Function_1 - Spark Loops
def function_1_spark_loop(rdd):
    """Print min, max and average page size using a single ``foreach`` pass.

    Sum, count, minimum and maximum are all gathered through accumulators
    while ``foreach`` visits each record, so no page sizes are collected to
    the driver.

    Args:
        rdd: RDD of ``(project, title, hits, size)`` tuples.

    Returns:
        None. Timestamps, the statistics and the elapsed time are printed.
        For an empty RDD a notice is printed instead of statistics.
    """
    # Imported here so the function brings its own dependency into scope.
    from pyspark.accumulators import AccumulatorParam

    class MinParam(AccumulatorParam):
        """Accumulator that keeps the smallest value added to it."""

        def zero(self, value):
            return float('inf')

        def addInPlace(self, value1, value2):
            return min(value1, value2)

    class MaxParam(AccumulatorParam):
        """Accumulator that keeps the largest value added to it."""

        def zero(self, value):
            return float('-inf')

        def addInPlace(self, value1, value2):
            return max(value1, value2)

    print("\nQ1 Spark foreach")
    start = time.time()
    print(f"Start Time: {start}")

    sum_size = sc.accumulator(0)
    count = sc.accumulator(0)
    min_size = sc.accumulator(float('inf'), MinParam())
    max_size = sc.accumulator(float('-inf'), MaxParam())

    def update_stats(x):
        size = x[3]
        sum_size.add(size)
        count.add(1)
        min_size.add(size)
        max_size.add(size)

    rdd.foreach(update_stats)

    end = time.time()
    print(f"End Time: {end}")
    if count.value == 0:
        print("No records found; page size statistics are undefined.")
    else:
        print(f"Min page size: {min_size.value}")
        print(f"Max page size: {max_size.value}")
        print(f"Average page size: {sum_size.value / count.value:.2f}")
    print(f"Time: {end - start:.4f} seconds")

function_1_spark_loop(parsed)

In [None]:
# Function_1 - Normal Loops
def function_1_normal_loop(rdd):
    """Print min, max and average page size using a plain driver-side loop.

    Records are streamed to the driver with ``toLocalIterator`` and the
    statistics are maintained in ordinary Python variables.

    Args:
        rdd: RDD of ``(project, title, hits, size)`` tuples.

    Returns:
        None. The statistics and the elapsed time are printed. For an empty
        RDD a notice is printed instead of statistics.
    """
    start_time = time.time()
    min_page_size = float('inf')
    max_page_size = float('-inf')
    sum_page_size = 0
    count = 0

    for _, _, _, size in rdd.toLocalIterator():
        if size < min_page_size:
            min_page_size = size

        if size > max_page_size:
            max_page_size = size

        sum_page_size += size
        count += 1

    # Stop the clock before printing so only the computation is timed.
    end_time = time.time()

    if count == 0:
        print("No records found; page size statistics are undefined.")
    else:
        # The fourth field is the page size, not a title length.
        print("Minimum page size:", min_page_size)
        print("Maximum page size:", max_page_size)
        print("Average page size:", sum_page_size / count)
    elapsed_time = end_time - start_time
    print("Elapsed time:", elapsed_time, "seconds")


function_1_normal_loop(parsed)

# ***Function 2: Count page titles starting with "The" not in English project***

In [None]:
# Function_2 - Spark Map Reduce
def function_2_map_reduce(parsed_rdd):
    """Count titles starting with "The", overall and outside the "en" project.

    Both counts are produced by one ``fold`` over the filtered records rather
    than by two separate ``count()`` jobs that each recompute the filter.

    Args:
        parsed_rdd: RDD of ``(project, title, hits, size)`` tuples.

    Returns:
        None. Timestamps, both counts and the elapsed time are printed.
    """
    start = time.time()
    print(f"Start time: {start}")

    titles_with_the = parsed_rdd.filter(lambda x: x[1].startswith("The"))  # x[1] is title

    # Each matching record contributes (1, 1) when its project code x[0] is
    # not "en" and (1, 0) otherwise; summing the pairs yields both counts.
    flags = titles_with_the.map(lambda x: (1, 1 if x[0] != "en" else 0))
    count_titles_with_the, count_non_english = flags.fold(
        (0, 0), lambda a, b: (a[0] + b[0], a[1] + b[1])
    )

    end = time.time()
    print(f"End time: {end}")

    print(f"Total titles starting with 'The': {count_titles_with_the}")
    print(f"Titles starting with 'The' and NOT in English project: {count_non_english}")
    print(f"Time: {end - start:.4f} seconds")


# running function 2
function_2_map_reduce(parsed)


Total titles starting with 'The': 45020
Titles starting with 'The' NOT in English project: 10292


In [None]:
# Function_2 - Spark Loops
def function_2_spark_loop(rdd):
    """Count titles starting with "The" via ``foreach`` and two accumulators.

    One accumulator tallies every matching title, the other only those whose
    project code differs from "en". Timestamps, both tallies and the elapsed
    time are printed.
    """
    print("\nQ2 Spark foreach")
    start = time.time()
    print(f"Start Time: {start:.4f}")

    the_titles = sc.accumulator(0)
    the_titles_not_en = sc.accumulator(0)

    def tally(record):
        # Guard clause: records whose title does not start with "The" are ignored.
        if not record[1].startswith("The"):
            return
        the_titles.add(1)
        if record[0] != "en":
            the_titles_not_en.add(1)

    rdd.foreach(tally)

    end = time.time()
    print(f"End Time: {end:.4f}")
    print(f"Titles starting with 'The': {the_titles.value}")
    print(f"Titles not in English: {the_titles_not_en.value}")
    print(f"Time: {end - start:.4f} seconds")


function_2_spark_loop(parsed)

In [None]:
# Function_2 - Normal Loops
def function_2_normal_loop(rdd):
    """Count titles starting with "The" with a plain driver-side loop.

    Records are streamed through ``toLocalIterator``; two Python counters
    track all matching titles and those outside the "en" project.
    """
    print("\nQ2 Normal Loops")
    matching = 0
    matching_not_en = 0
    start = time.time()
    print(f"Start Time: {start:.4f}")

    for record in rdd.toLocalIterator():
        project, title = record[0], record[1]
        if not title.startswith("The"):
            continue
        matching += 1
        if project != "en":
            matching_not_en += 1

    end = time.time()
    print(f"End Time: {end:.4f}")
    print(f"Titles starting with 'The': {matching}")
    print(f"Titles not in English: {matching_not_en}")
    print(f"Time: {end - start:.4f} seconds")


function_2_normal_loop(parsed)

# ***Function 3: Count unique terms in page titles***

In [None]:
# Function_3 - Spark Map Reduce

In [None]:
# Function_3 - Spark Loops
def function_3_spark_loop(rdd):
    """Print the number of unique terms found in page titles.

    A title is lower-cased and split on "_"; every piece then has its
    non-alphanumeric characters removed, the same normalisation used by
    ``function_3_normal_loop``, so both report the same number.
    De-duplication runs in Spark via ``distinct()``, so only the final count
    is returned to the driver.

    Args:
        rdd: RDD of ``(project, title, hits, size)`` tuples.

    Returns:
        None. Timestamps, the unique-term count and elapsed time are printed.
    """
    print("\nQ3 Spark foreach")
    start = time.time()
    print(f"Start Time: {start}")

    def title_terms(x):
        # x[1] is the title; normalise each "_"-separated piece.
        return [
            re.sub(r'[^a-zA-Z0-9]', '', term)
            for term in re.split(r'_', x[1].lower())
        ]

    unique_count = rdd.flatMap(title_terms).distinct().count()

    end = time.time()
    print(f"End Time: {end}")
    print(f"Number of unique terms: {unique_count}")
    print(f"Time: {end - start:.4f} seconds")


function_3_spark_loop(parsed)

In [None]:
# Function_3 - Normal Loops
def function_3_normal_loop(rdd):
    """Print the number of unique title terms using a driver-side loop.

    Each title is lower-cased and split on "_"; non-alphanumeric characters
    are stripped from every piece before it is added to a set.
    """
    print("\nQ3 Normal Loops")
    start = time.time()
    print(f"Start Time: {start}")

    non_alnum = re.compile(r'[^a-zA-Z0-9]')
    seen = set()
    for record in rdd.toLocalIterator():
        pieces = record[1].lower().split('_')
        seen.update(non_alnum.sub('', piece) for piece in pieces)

    end = time.time()
    print(f"End Time: {end}")
    print(f"Unique terms: {len(seen)}")
    print(f"Time: {end - start:.4f} seconds")


function_3_normal_loop(parsed)

# ***Function 4: Extract title counts***

In [None]:
# Function_4 - Spark Map Reduce

In [None]:
# Function_4 - Spark Loops
def function_4_spark_loop(rdd):
    """Count occurrences of each title and print the first five counts.

    ``(title, 1)`` pairs are produced in Spark, collected to the driver and
    summed per title in a dictionary; the first five entries in insertion
    order are printed along with timing information.
    """
    print("\nQ4 Spark foreach")
    start = time.time()
    print(f"Start Time: {start}")

    pairs = rdd.map(lambda record: (record[1], 1)).collect()

    tally = {}
    for title, one in pairs:
        if title in tally:
            tally[title] += one
        else:
            tally[title] = one

    end = time.time()
    print(f"End Time: {end}")
    print("First 5 title counts:")
    shown = 0
    for title, count in tally.items():
        if shown == 5:
            break
        print(f"{title}: {count}")
        shown += 1
    print(f"Time: {end - start:.4f} seconds")


function_4_spark_loop(parsed)

In [None]:
# Function_4 - Normal Loops
def function_4_normal_loop(rdd):
    """Count occurrences of each title with a driver-side loop.

    Records are streamed through ``toLocalIterator`` and tallied per title in
    a dictionary. The end timestamp is taken once, right after counting, and
    the first five entries in insertion order are printed.

    Args:
        rdd: RDD of ``(project, title, hits, size)`` tuples.

    Returns:
        None. Timestamps, five title counts and elapsed time are printed.
    """
    print("\nQ4 Normal Loops")
    start = time.time()
    print(f"Start Time: {start}")

    counts = {}
    for x in rdd.toLocalIterator():
        counts[x[1]] = counts.get(x[1], 0) + 1

    # Single end-of-work timestamp.
    end = time.time()
    print(f"End Time: {end}")

    results = list(counts.items())[:5]
    print("First 5 title counts:")
    for title, count in results:
        print(f"{title}: {count}")
    print(f"Time: {end - start:.4f} seconds")


function_4_normal_loop(parsed)

# ***Function 5: Combine pages with same title***

In [None]:
# Function_5 - Spark Map Reduce

In [None]:
# Function_5 - Spark Loops
def function_5_spark_loop(rdd):
    """Group records by title and print five titles that have several pages.

    ``(title, record)`` pairs are built in Spark and collected; grouping and
    the "more than one page" filter run on the driver. Project, hits and size
    are printed for each page of the first five such titles.
    """
    print("\nQ5 Spark foreach")
    start = time.time()
    print(f"Start Time: {start}")

    keyed = rdd.map(lambda record: (record[1], record)).collect()

    by_title = {}
    for title, record in keyed:
        by_title.setdefault(title, []).append(record)

    # Keep only titles that occur more than once, in first-seen order.
    repeated = [(title, pages) for title, pages in by_title.items() if len(pages) > 1]

    end = time.time()
    print(f"End Time: {end}")
    print("First 5 titles with multiple pages:")
    for title, pages in repeated[:5]:
        print(f"\nTitle: {title}")
        print("Pages:")
        for page in pages:
            print(f"  Project: {page[0]}, Hits: {page[2]}, Size: {page[3]}")
    print(f"Time: {end - start:.4f} seconds")


function_5_spark_loop(parsed)

In [None]:
# Function_5 - Normal Loops
def function_5_normal_loop(rdd):
    """Group records by title with a driver-side loop and print five groups.

    Records are streamed through ``toLocalIterator`` and appended to a
    per-title list; the first five titles with more than one record are
    printed with the project, hits and size of each page.
    """
    print("\nQ5 Normal Loops")
    start = time.time()
    print(f"Start Time: {start}")

    by_title = {}
    for record in rdd.toLocalIterator():
        title = record[1]
        if title not in by_title:
            by_title[title] = []
        by_title[title].append(record)

    repeated = []
    for title, pages in by_title.items():
        if len(pages) > 1:
            repeated.append((title, pages))
    results = repeated[:5]

    end = time.time()
    print(f"End Time: {end}")
    print("First 5 titles with multiple pages:")
    for title, pages in results:
        print(f"\nTitle: {title}")
        print("Pages:")
        for page in pages:
            print(f"  Project: {page[0]}, Hits: {page[2]}, Size: {page[3]}")
    print(f"Time: {end - start:.4f} seconds")


function_5_normal_loop(parsed)

# ***End The Spark Session***

In [57]:
# Stop the SparkContext (`sc`) created earlier in this notebook.
sc.stop()