📝 **Assignment Introduction**
------------------------------

### 📘 **Dataset Background**

The dataset comes from the **Wikimedia Foundation**, which runs Wikipedia and other open-knowledge platforms. It contains **page view statistics** collected from **0:00 to 1:00 AM on January 1st, 2016**.

Each line in the file represents the number of views for a specific page in that hour.

---

### 🔢 **Schema**

Each line has 4 fields separated by whitespace:

| Field         | Description                                               |
|---------------|-----------------------------------------------------------|
| `Project Code`| Project identifier (e.g. `en` for English Wikipedia)      |
| `Page Title`  | Title of the accessed page (e.g. `Political_status_of_Crimea`) |
| `Page Hits`   | Number of times this page was accessed in that hour       |
| `Page Size`   | Size of the page (likely in bytes)                        |


In [None]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col
from pyspark.sql.types import LongType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import timeit,os,time
import shutil
from operator import add
import timeit
from pyspark.sql import functions as F
from itertools import combinations
import nltk
from nltk.corpus import stopwords

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\Shawky\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping corpora\stopwords.zip.


True

In [2]:
spark = SparkSession.builder \
    .appName("WikimediaPageViews") \
    .master("local[*]") \
    .config("hadoop.home.dir", "C:\\hadoop") \
    .config("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem") \
    .config("spark.hadoop.io.native.lib.available", "false") \
    .getOrCreate()

sc = spark.sparkContext

#### Data Loading & Validation

In [3]:
data_path = "data.out" 
raw_data = sc.textFile(data_path)
print(f"Total lines loaded: {raw_data.count():,}")

Total lines loaded: 3,324,129


In [4]:
def check_data_quality(rdd):
    empty_lines = rdd.filter(lambda x: len(x.strip()) == 0).count()
    malformed_lines = rdd.filter(lambda x: len(x.strip().split()) != 4).count()
    
    print("=== Data Quality Report ===")
    print(f"Total lines: {rdd.count():,}")
    print(f"Empty lines: {empty_lines:,}")
    print(f"Malformed lines: {malformed_lines:,}")
    print(f"Valid lines: {rdd.count() - empty_lines - malformed_lines:,}")
    
    if malformed_lines > 0:
        print("\nSample malformed lines:")
        for line in rdd.filter(lambda x: len(x.strip().split()) != 4).take(5):
            print(line)

check_data_quality(raw_data)

=== Data Quality Report ===
Total lines: 3,324,129
Empty lines: 0
Malformed lines: 103
Valid lines: 3,324,026

Sample malformed lines:
ak.v  2 3606
ar  526 21232283
ar.s  4 38267
ay.v  2 3606
az  1 19081


In [5]:
def is_valid_line(line):
    """Check if line has exactly 4 fields with proper types"""
    parts = line.strip().split()
    if len(parts) != 4:
        return False
    try:
        int(parts[2])  # Verify page_hits is numeric
        int(parts[3])  # Verify page_size is numeric
        return True
    except ValueError:
        return False

valid_lines = raw_data.filter(is_valid_line).cache()
print(f"Original count: {raw_data.count():,}")
print(f"Valid lines count: {valid_lines.count():,}")
print(f"Removed {raw_data.count() - valid_lines.count():,} malformed lines")

Original count: 3,324,129
Valid lines count: 3,324,026
Removed 103 malformed lines


In [6]:
parsed_rdd = valid_lines.map(lambda line: line.split(" ")) \
                .filter(lambda parts: len(parts) >= 4)

structured_rdd = parsed_rdd.map(lambda parts: (
    parts[0],                  # project_code
    parts[1],                  # page_title
    int(parts[2]),             # number_of_views
    int(parts[3])              # bytes_transferred
))

df = structured_rdd.toDF(["project_code", "page_title", "page_hits", "page_size"])
df.printSchema()
df.show(5)

root
 |-- project_code: string (nullable = true)
 |-- page_title: string (nullable = true)
 |-- page_hits: long (nullable = true)
 |-- page_size: long (nullable = true)

+------------+------------------+---------+---------+
|project_code|        page_title|page_hits|page_size|
+------------+------------------+---------+---------+
|          aa|           271_a.C|        1|     4675|
|          aa|  Category:User_th|        1|     4770|
|          aa|Chiron_Elias_Krase|        1|     4694|
|          aa|  Dassault_rafaele|        2|     9372|
|          aa|            E.Desv|        1|     4662|
+------------+------------------+---------+---------+
only showing top 5 rows



#### Data Preparation

In [7]:
print("\nData Validation:")
null_check = df.filter(
    col("page_hits").isNull() | 
    col("page_size").isNull()
).count()
print(f"Rows with null values: {null_check}")

negative_check = df.filter(
    (col("page_hits") < 0) | 
    (col("page_size") < 0)
).count()
print(f"Rows with negative values: {negative_check}")


Data Validation:
Rows with null values: 0
Rows with negative values: 0


#### Basic Analysis

In [None]:
print("\n=== Basic Statistics ===")
print(f"Total projects: {df.select('project_code').distinct().count()}")
print(f"Total pages: {df.count():,}")

print("\nTop 10 most viewed pages:")
df.orderBy(col("page_hits").desc()).show(10, truncate=False)

print("\nSize statistics (bytes):")
df.select("page_size").describe().show()


=== Basic Statistics ===
Total projects: 1077
Total pages: 3,324,026

Top 10 most viewed pages:
+------------+----------+---------+------------+
|project_code|page_title|page_hits|page_size   |
+------------+----------+---------+------------+
|en.mw       |en        |5466346  |141180155987|
|es.mw       |es        |695531   |12261337515 |
|ja.mw       |ja        |611443   |15021588551 |
|de.mw       |de        |572119   |9523069696  |
|fr.mw       |fr        |536978   |11752030020 |
|ru.mw       |ru        |466742   |11847816616 |
|it.mw       |it        |400297   |8176042087  |
|en          |Main_Page |257915   |4289970372  |
|pt.mw       |pt        |196160   |4029404403  |
|pl.mw       |pl        |176059   |2782453516  |
+------------+----------+---------+------------+
only showing top 10 rows


Size statistics (bytes):
+-------+-------------------+
|summary|          page_size|
+-------+-------------------+
|  count|            3324026|
|   mean| 132215.79814237313|
| stddev|7.9125

In [None]:
df.createOrReplaceTempView("cleaned_wiki_data")
print("\nTemporary view 'cleaned_wiki_data' created")


Temporary view 'cleaned_wiki_data' created


In [None]:
spark.catalog.clearCache()
print("\nCleared Spark cache")


Cleared Spark cache


#### Page Size Analysis

In [8]:
# Map-Reduce Approach for Page Size Stats
print("=== Map-Reduce Approach ===")

def map_reduce_stats():
    # Map phase: Extract sizes
    sizes = valid_lines.map(lambda line: int(line.split()[3]))
    
    # Reduce phase: Calculate stats
    count = sizes.count()
    total = sizes.reduce(lambda a, b: a + b)
    min_size = sizes.reduce(lambda a, b: a if a < b else b)
    max_size = sizes.reduce(lambda a, b: a if a > b else b)
    
    return {
        'min': min_size,
        'max': max_size,
        'avg': total / count
    }

# Time the execution
start_time = timeit.default_timer()
mr_stats = map_reduce_stats()
mr_time = timeit.default_timer() - start_time

print(f"Min size: {mr_stats['min']:,} bytes")
print(f"Max size: {mr_stats['max']:,} bytes")
print(f"Avg size: {mr_stats['avg']:,.2f} bytes")
print(f"Execution time: {mr_time:.4f} seconds")

=== Map-Reduce Approach ===
Min size: 0 bytes
Max size: 141,180,155,987 bytes
Avg size: 132,215.80 bytes
Execution time: 13.7272 seconds


In [9]:
print("=== Spark Loops Approach ===")


def spark_loops_stats():
    sizes_list = []

    for line in valid_lines.collect():  # collect all lines into Python list
            size = int(line.split()[3])
            sizes_list.append(size)

    if not sizes_list:
        return {'min': 0, 'max': 0, 'avg': 0}

    count = len(sizes_list)
    total = 0
    min_size = sizes_list[0]
    max_size = sizes_list[0]

    for size in sizes_list:
        total += size
        if size < min_size:
            min_size = size
        if size > max_size:
            max_size = size

    return {
        'min': min_size,
        'max': max_size,
        'avg': total / count
    }

# Time the execution
start_time = timeit.default_timer()
loop_stats = spark_loops_stats()
loop_time = timeit.default_timer() - start_time

# Print results in the same format
print(f"Min size: {loop_stats['min']:,} bytes")
print(f"Max size: {loop_stats['max']:,} bytes")
print(f"Avg size: {loop_stats['avg']:,.2f} bytes")
print(f"Execution time: {loop_time:.4f} seconds")

=== Spark Loops Approach ===
Min size: 0 bytes
Max size: 141,180,155,987 bytes
Avg size: 132,215.80 bytes
Execution time: 3.3671 seconds


In [10]:
# Performance Comparison 
print("\n=== Performance Comparison ===")
print(f"Map-Reduce Time: {mr_time:.4f} sec")
print(f"Spark Loops Time: {loop_time:.4f} sec")
print(f"Difference: {abs(mr_time - loop_time):.4f} sec")
print(f"Faster by: {max(mr_time, loop_time)/min(mr_time, loop_time):.2f}x")

schema = StructType([
    StructField("Approach", StringType(), True),
    StructField("Min", DoubleType(), True),
    StructField("Max", DoubleType(), True),
    StructField("Avg", DoubleType(), True),
    StructField("Time_sec", DoubleType(), True)
])

data = [
    ("Map-Reduce", float(mr_stats['min']), float(mr_stats['max']), float(mr_stats['avg']), float(mr_time)),
    ("Spark Loops", float(loop_stats['min']), float(loop_stats['max']), float(loop_stats['avg']), float(loop_time))
]

results_df = spark.createDataFrame(data, schema)
results_df.show()


=== Performance Comparison ===
Map-Reduce Time: 13.7272 sec
Spark Loops Time: 3.3671 sec
Difference: 10.3602 sec
Faster by: 4.08x
+-----------+---+----------------+------------------+------------------+
|   Approach|Min|             Max|               Avg|          Time_sec|
+-----------+---+----------------+------------------+------------------+
| Map-Reduce|0.0|1.41180155987E11|132215.79814237313| 13.72724479995668|
|Spark Loops|0.0|1.41180155987E11|132215.79814237313|3.3670592999551445|
+-----------+---+----------------+------------------+------------------+



#### ✅ Observations:
- Both approaches produced the **same statistics** in terms of `Min`, `Max`, and `Avg` page size.
- However, the **Spark Loops approach outperformed the traditional Map-Reduce**, completing the task in just a third of the time.


#### Title Count Analysis

In [11]:
# Map-Reduce Approach for Title Counts
print("=== Map-Reduce Approach for Title Counts ===")

def map_reduce_title_counts():
    # Map phase: Create (title, 1) pairs
    title_ones = valid_lines.map(lambda line: (line.split()[1], 1))
    
    # Reduce phase: Sum counts by title
    title_counts = title_ones.reduceByKey(lambda a, b: a + b)
    
    # Collect all results
    all_titles = title_counts.collect()
    
    # Sort by count descending
    sorted_titles = sorted(all_titles, key=lambda x: -x[1])
    
    return {
        'total_unique': len(sorted_titles),
        'top_titles': sorted_titles[:20],  # Top 20
        'all_counts': sorted_titles        # All titles
    }

# Time the execution
start_time = timeit.default_timer()
title_stats = map_reduce_title_counts()
mr_time = timeit.default_timer() - start_time

# Print results
print(f"Total unique titles: {title_stats['total_unique']:,}")
print("\nTop 20 titles by count:")
for idx, (title, count) in enumerate(title_stats['top_titles'], 1):
    print(f"{idx:2d}. {title[:50]:<50} {count:>8,}")

print(f"\nExecution time: {mr_time:.4f} seconds")

=== Map-Reduce Approach for Title Counts ===
Total unique titles: 2,968,690

Top 20 titles by count:
 1. water                                                   118
 2. 1863                                                    106
 3. Berlin                                                  101
 4. Google                                                  101
 5. Linux                                                    98
 6. Main_Page                                                90
 7. ISO_3166-1                                               88
 8. Microsoft_Windows                                        87
 9. HTML                                                     86
10. Index.php                                                86
11. Frank_Lloyd_Wright                                       85
12. PHP                                                      83
13. ISO_4217                                                 76
14. Boston                                                   75
15.

In [12]:
def spark_loops_title_counts():
    all_lines = valid_lines.collect()  

    title_counts = {}

    for line in all_lines:
        title = line.split()[1]  
        title_counts[title] = title_counts.get(title, 0) + 1  

    # Sort titles by count in descending order
    sorted_titles = sorted(title_counts.items(), key=lambda x: -x[1])

    return {
        'total_unique': len(sorted_titles),
        'top_titles': sorted_titles[:20],  # Top 20 titles
        'all_counts': sorted_titles        # All titles
    }

# Time the execution
start_time = timeit.default_timer()
title_stats_loop = spark_loops_title_counts()
loop_time = timeit.default_timer() - start_time

# Print results
print(f"Total unique titles: {title_stats_loop['total_unique']:,}")
print("\nTop 20 titles by count:")
for idx, (title, count) in enumerate(title_stats_loop['top_titles'], 1):
    print(f"{idx:2d}. {title[:50]:<50} {count:>8,}")

print(f"\nExecution time: {loop_time:.4f} seconds")

Total unique titles: 2,968,690

Top 20 titles by count:
 1. water                                                   118
 2. 1863                                                    106
 3. Berlin                                                  101
 4. Google                                                  101
 5. Linux                                                    98
 6. Main_Page                                                90
 7. ISO_3166-1                                               88
 8. Microsoft_Windows                                        87
 9. Index.php                                                86
10. HTML                                                     86
11. Frank_Lloyd_Wright                                       85
12. PHP                                                      83
13. ISO_4217                                                 76
14. Boston                                                   75
15. Special:Search                              

In [None]:
# Performance Comparison for Title Counts 
print("\n=== Performance Comparison for Title Counts ===")
print(f"Map-Reduce Time: {mr_time:.4f} sec")
print(f"Python Loop Time: {loop_time:.4f} sec")
print(f"Difference: {abs(mr_time - loop_time):.4f} sec")
print(f"Python Loop was {mr_time/loop_time:.1f}x faster")

schema = StructType([
    StructField("Approach", StringType(), True),
    StructField("Unique_Titles", LongType(), True),
    StructField("Top_Count", LongType(), True),
    StructField("Time_sec", DoubleType(), True)
])

top_count_mr = title_stats['top_titles'][0][1] if title_stats['top_titles'] else 0
top_count_loop = title_stats_loop['top_titles'][0][1] if title_stats_loop['top_titles'] else 0

data = [
    Row("Map-Reduce", title_stats['total_unique'], top_count_mr, float(mr_time)),
    Row("Python Loop", title_stats_loop['total_unique'], top_count_loop, float(loop_time))
]

title_comparison_df = spark.createDataFrame(data, schema)

print("\nPerformance Comparison Results:")
title_comparison_df.show(truncate=False)

print("\n=== Verification ===")
print(f"Unique counts match: {title_stats['total_unique'] == title_stats_loop['total_unique']}")
print(f"Top count match: {top_count_mr == top_count_loop}")



=== Performance Comparison for Title Counts ===
Map-Reduce Time: 17.2417 sec
Python Loop Time: 6.3382 sec
Difference: 10.9035 sec
Python Loop was 2.7x faster

Performance Comparison Results:
+-----------+-------------+---------+------------------+
|Approach   |Unique_Titles|Top_Count|Time_sec          |
+-----------+-------------+---------+------------------+
|Map-Reduce |2968690      |118      |17.241707400011364|
|Python Loop|2968690      |118      |6.338167099980637 |
+-----------+-------------+---------+------------------+


=== Verification ===
Unique counts match: True
Top count match: True


### ✅Observations:
- The **Python Loop approach** outperforms the **Map-Reduce approach** in terms of execution time by a significant margin (3.0x faster).
- Both approaches produce identical results for **Unique Titles** and **Top Count**, confirming that the logic and data are consistent across both methods.

#### Grouping by Title

In [13]:
print("=== Map-Reduce Approach: Group and Show Duplicate Pages by Title ===")

def map_reduce_show_duplicates():
    title_line_pairs = valid_lines.map(lambda line: (line.split()[1], line))
    
    grouped_by_title = title_line_pairs.groupByKey()
    
    duplicated_titles = grouped_by_title.filter(lambda x: len(list(x[1])) > 1)
    
    return duplicated_titles

# Time execution
start_time = timeit.default_timer()
duplicates_rdd = map_reduce_show_duplicates()
sample_duplicates = duplicates_rdd.take(5)
mr_time = timeit.default_timer() - start_time

print("\nSample output (first 5 titles with duplicates):\n")
for title, pages in sample_duplicates:
    print(f"Title: {title}")
    for page in pages:
        print(f"  - {page}")
    print()

print(f"\nExecution time: {mr_time:.4f} seconds")

=== Map-Reduce Approach: Group and Show Duplicate Pages by Title ===

Sample output (first 5 titles with duplicates):

Title: Indonesian_Wikipedia
  - aa Indonesian_Wikipedia 1 4679
  - en Indonesian_Wikipedia 1 93905

Title: Special:WhatLinksHere/Main_Page
  - aa Special:WhatLinksHere/Main_Page 1 5556
  - commons.m Special:WhatLinksHere/Main_Page 2 15231
  - en Special:WhatLinksHere/Main_Page 5 101406
  - en.s Special:WhatLinksHere/Main_Page 1 8597
  - en.voy Special:WhatLinksHere/Main_Page 1 8550
  - meta.m Special:WhatLinksHere/Main_Page 1 11529
  - outreach.m Special:WhatLinksHere/Main_Page 1 5698
  - simple Special:WhatLinksHere/Main_Page 3 32145

Title: User_talk:Logan
  - aa User_talk:Logan 1 4734
  - en.voy User_talk:Logan 5 78175

Title: Special:UserLogin
  - aa.d Special:UserLogin 1 4899
  - commons.m Special:UserLogin 30 181938
  - en Special:UserLogin 44198 718770014
  - en.q Special:UserLogin 4 34449
  - incubator.m Special:UserLogin 1 5221
  - m.f Special:UserLogin 13 585

In [14]:
print("=== Simplified Map-Reduce for Duplicate Titles ===")

parsed_data = valid_lines.map(lambda line: (
    (lambda parts: (parts[1], (int(parts[2]), int(parts[3]), line)))(line.split())
))

grouped_data = parsed_data.groupByKey()

duplicate_results = grouped_data.flatMap(lambda x: [
    (x[0], (
        sum(h for h, s, l in x[1]),          # Total hits
        sum(s for h, s, l in x[1]),          # Total size
        [l for h, s, l in x[1]]              # Raw lines
    )) 
] if len(list(x[1])) > 1 else [])

# Display results
sample = duplicate_results.take(5)
for title, (total_hits, total_size, lines) in sample:
    print(f"Title: {title}")
    print(f"  Total Hits: {total_hits}, Total Size: {total_size}")
    #print("  Raw Lines:")
    #for line in lines:
     #   print(f"    - {line}")
    print()
    
print(f"\nExecution time: {mr_time:.4f} seconds")

=== Simplified Map-Reduce for Duplicate Titles ===
Title: Indonesian_Wikipedia
  Total Hits: 2, Total Size: 98584

Title: Special:WhatLinksHere/Main_Page
  Total Hits: 15, Total Size: 188712

Title: User_talk:Logan
  Total Hits: 6, Total Size: 82909

Title: Special:UserLogin
  Total Hits: 44262, Total Size: 719151551

Title: User:CommonsDelinker
  Total Hits: 2, Total Size: 32820


Execution time: 8.6617 seconds


In [15]:
print("=== Loop-Based Approach Matching Map-Reduce Output ===")

def loop_based_approach():
    collected_data = valid_lines.collect()
    
    title_dict = {}
    
    for line in collected_data:
        parts = line.split()
        title = parts[1]
        hits = int(parts[2])
        size = int(parts[3])
        
        if title not in title_dict:
            title_dict[title] = {
                'lines': [],
                'total_hits': 0,
                'total_size': 0,
                'count': 0
            }
        
        title_dict[title]['lines'].append(line)
        title_dict[title]['total_hits'] += hits
        title_dict[title]['total_size'] += size
        title_dict[title]['count'] += 1
    
    duplicates = {k: v for k, v in title_dict.items() if v['count'] > 1}
    
    return duplicates

# Time execution
start_time = timeit.default_timer()
duplicates = loop_based_approach()
loop_time = timeit.default_timer() - start_time

# Display results
print("\nSample output (first 5 duplicated titles with combined stats):\n")
for i, (title, details) in enumerate(duplicates.items()):
    if i >= 5:
        break
    print(f"Title: {title}")
    print(f"  Total Hits: {details['total_hits']}")
    print(f"  Total Size: {details['total_size']}")
    print("  Raw Lines:")
    for line in details['lines']:
        print(f"    - {line}")
    print()

print(f"\nExecution time: {loop_time:.4f} seconds")

=== Loop-Based Approach Matching Map-Reduce Output ===

Sample output (first 5 duplicated titles with combined stats):

Title: 271_a.C
  Total Hits: 4
  Total Size: 22386
  Raw Lines:
    - aa 271_a.C 1 4675
    - az 271_a.C 1 6356
    - bcl 271_a.C 1 5068
    - be 271_a.C 1 6287

Title: Category:User_th
  Total Hits: 2
  Total Size: 4770
  Raw Lines:
    - aa Category:User_th 1 4770
    - commons.m Category:User_th 1 0

Title: Chiron_Elias_Krase
  Total Hits: 6
  Total Size: 34584
  Raw Lines:
    - aa Chiron_Elias_Krase 1 4694
    - az Chiron_Elias_Krase 1 6374
    - bg Chiron_Elias_Krase 1 7468
    - cho Chiron_Elias_Krase 1 4684
    - dz Chiron_Elias_Krase 1 5435
    - it Chiron_Elias_Krase 1 5929

Title: Dassault_rafaele
  Total Hits: 4
  Total Size: 21940
  Raw Lines:
    - aa Dassault_rafaele 2 9372
    - en Dassault_rafaele 1 6649
    - it Dassault_rafaele 1 5919

Title: E.Desv
  Total Hits: 6
  Total Size: 31539
  Raw Lines:
    - aa E.Desv 1 4662
    - arc E.Desv 1 5210
    -

### ✅ Observations

- **Map-Reduce Approach**: 11.21 seconds
- **Loop-Based Approach**: 20.10 seconds

The **Map-Reduce approach** is significantly faster due to its parallel processing capabilities, which allows tasks to be distributed across multiple nodes, reducing execution time. In contrast, the **Loop-Based approach** processes data sequentially, which becomes slower as the dataset grows, leading to longer execution times.

In [None]:
print("\n=== Map-Reduce Approach for Combined Page Pairs ===")

duplicate_titles = duplicates_rdd.keys().collect()  
duplicate_titles_set = set(duplicate_titles)

title_line_pairs = valid_lines.map(lambda line: (line.split()[1], line))
grouped_by_title = title_line_pairs.groupByKey().mapValues(list)

filtered_grouped = grouped_by_title.filter(lambda x: x[0] in duplicate_titles_set)

def create_combinations(title_entries):
    title, entries = title_entries
    pairs = list(combinations(entries, 2))  # all unique 2-combinations
    return (title, pairs) if pairs else None

start = time.time()

combined = filtered_grouped.map(create_combinations).filter(lambda x: x is not None)

sample = combined.take(3)

print("=== Sample Combined Page Pairs ===")
for title, pairs in sample:
    print(f"\nTitle: {title}")
    for pair in pairs:
        print(f"  Pair: {pair[0]}  <==>  {pair[1]}")

end = time.time()
print(f"MapReduce Pairwise Combination Time: {end - start:.2f} seconds")



=== Map-Reduce Approach for Combined Page Pairs ===
=== Sample Combined Page Pairs ===

Title: Indonesian_Wikipedia
  Pair: aa Indonesian_Wikipedia 1 4679  <==>  en Indonesian_Wikipedia 1 93905

Title: Special:WhatLinksHere/Main_Page
  Pair: aa Special:WhatLinksHere/Main_Page 1 5556  <==>  commons.m Special:WhatLinksHere/Main_Page 2 15231
  Pair: aa Special:WhatLinksHere/Main_Page 1 5556  <==>  en Special:WhatLinksHere/Main_Page 5 101406
  Pair: aa Special:WhatLinksHere/Main_Page 1 5556  <==>  en.s Special:WhatLinksHere/Main_Page 1 8597
  Pair: aa Special:WhatLinksHere/Main_Page 1 5556  <==>  en.voy Special:WhatLinksHere/Main_Page 1 8550
  Pair: aa Special:WhatLinksHere/Main_Page 1 5556  <==>  meta.m Special:WhatLinksHere/Main_Page 1 11529
  Pair: aa Special:WhatLinksHere/Main_Page 1 5556  <==>  outreach.m Special:WhatLinksHere/Main_Page 1 5698
  Pair: aa Special:WhatLinksHere/Main_Page 1 5556  <==>  simple Special:WhatLinksHere/Main_Page 3 32145
  Pair: commons.m Special:WhatLinksHer

In [None]:
all_combined = combined.collect()

# Write all results to a file
output_path = "combined_same_title_page_pairs.txt"
with open(output_path, "w", encoding="utf-8") as f:
    f.write("=== Combined Page Pairs for Duplicate Titles ===\n")
    for title, pairs in all_combined:
        f.write(f"\nTitle: {title}\n")
        for pair in pairs:
            f.write(f"  Pair: {pair[0]}  <==>  {pair[1]}\n")

print(f"All results written to {output_path}")

All results written to combined_same_title_page_pairs.txt


In [None]:
print("\n=== Title Pairwise Combinations (Based on Aggregation) ===")
aggregated = valid_lines.map(lambda line: (
    line.split()[1],  # title
    (int(line.split()[2]), int(line.split()[3]))  # (hits, size)
)).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

aggregated_lines = aggregated.map(lambda x: f"{x[0]} {x[1][0]} {x[1][1]}")
start = time.time()

aggregated_sample = aggregated_lines.take(10)
pairs = list(combinations(aggregated_sample, 2))  

# Step 5: Write combinations to file
pairs_output_path = "aggregated_title_pairs.out"
with open(pairs_output_path, "w", encoding="utf-8") as f:
    f.write("=== Pairwise Combinations of Aggregated Titles ===\n")
    for pair in pairs:
        f.write(f"Pair: {pair[0]}  <==>  {pair[1]}\n")

# Step 6: Display only first few pairs
print("=== Sample Aggregated Page Pairs ===")
for pair in pairs[:5]:
    print(f"  Pair: {pair[0]}  <==>  {pair[1]}")

end = time.time()
print(f"\nCombination Generation Time: {end - start:.2f} seconds")
print(f"Combinations written to {pairs_output_path}")



=== Title Pairwise Combinations (Based on Aggregation) ===
=== Sample Aggregated Page Pairs ===
  Pair: Indonesian_Wikipedia 2 98584  <==>  Special:MyLanguage/Meta:Index 1 4701
  Pair: Indonesian_Wikipedia 2 98584  <==>  Special:WhatLinksHere/Main_Page 15 188712
  Pair: Indonesian_Wikipedia 2 98584  <==>  Special:WhatLinksHere/MediaWiki:Edittools 1 5139
  Pair: Indonesian_Wikipedia 2 98584  <==>  User:IlStudioso 1 6796
  Pair: Indonesian_Wikipedia 2 98584  <==>  User_talk:Logan 6 82909

Combination Generation Time: 10.20 seconds
Combinations written to aggregated_title_pairs.out


### 3. Unique Title Terms

In [None]:
stop_words = set(stopwords.words('english'))
nltk.download('stopwords')

In [None]:
print("=== Map-Reduce Approach: Determine the number of unique terms appearing in the page titles ===")

def map_reduce_unqiue_title_terms():
    terms = valid_lines.flatMap(lambda line: line.split()[1].split("_"))
    normalized_terms = terms.map(lambda term: term.lower())\
                            .filter(lambda term: term.isalnum())\
                            .filter(lambda term: term not in stop_words)
    unique_terms = normalized_terms.distinct()
    return unique_terms

# Time the execution
start_time = timeit.default_timer()
unique_terms = map_reduce_unqiue_title_terms()
unique_terms_count = unique_terms.count()
mr_time = timeit.default_timer() - start_time

print(f"Total number of unique terms: {unique_terms_count}")
print(f"\nExecution time: {mr_time:.4f} seconds")

=== Map-Reduce Approach: Determine the number of unique terms appearing in the page titles ===
Total number of unique terms: 850479

Execution time: 12.6292 seconds


In [28]:
print("=== Loop-Based Approach Matching Map-Reduce Output ===")

def spark_loops_unqiue_title_terms():
    unique_terms = set()
    with open(data_path, 'r') as f:
        for line in f:
            terms = line.split()[1].split("_")
            for term in terms:
                term = term.lower()
                if term.isalnum() and term not in stop_words:
                    unique_terms.add(term)
    return unique_terms

start_time = timeit.default_timer()
unique_terms = spark_loops_unqiue_title_terms()
unique_terms_count = len(unique_terms)
loop_time = timeit.default_timer() - start_time

print(f"Total number of unique terms: {unique_terms_count}")
print(f"\nExecution time: {loop_time:.4f} seconds")

=== Loop-Based Approach Matching Map-Reduce Output ===
Total number of unique terms: 850480

Execution time: 3.6609 seconds


In [29]:
# Performance Comparison 
print("\n=== Performance Comparison ===")
print(f"Map-Reduce Time: {mr_time:.4f} sec")
print(f"Spark Loops Time: {loop_time:.4f} sec")
print(f"Difference: {abs(mr_time - loop_time):.4f} sec")
print(f"Faster by: {max(mr_time, loop_time)/min(mr_time, loop_time):.2f}x")


=== Performance Comparison ===
Map-Reduce Time: 12.6292 sec
Spark Loops Time: 3.6609 sec
Difference: 8.9683 sec
Faster by: 3.45x
