In [8]:
from pyspark import SparkContext, SparkConf
import time
import os
import random

In [9]:
def setup_spark_context(app_name="MapReduceLab", master="local[*]"):
    """Initialize and return a SparkContext for the application.

    Uses ``SparkContext.getOrCreate`` so that re-running a notebook cell (or
    calling this after an earlier run died before ``sc.stop()``) reuses the
    live context instead of raising "Cannot run multiple SparkContexts at once".

    Args:
        app_name: Spark application name (default ``"MapReduceLab"``).
        master: Spark master URL (default ``"local[*]"``, all local cores).

    Returns:
        The active SparkContext with its log level set to ERROR.
    """
    conf = SparkConf().setAppName(app_name).setMaster(master)
    # If a context already exists it is returned as-is and `conf` is ignored.
    sc = SparkContext.getOrCreate(conf=conf)
    # Reduce console output from Spark's logging.
    sc.setLogLevel("ERROR")
    return sc

In [10]:
def word_count_mapreduce(sc, input_file, verbose=True):
    """Perform word count using PySpark's MapReduce approach.

    Args:
        sc: An active SparkContext.
        input_file: Path passed to ``sc.textFile``.
        verbose: When True (default), print diagnostics about the input and the
            intermediate RDDs. Each diagnostic (``count``/``take``) is a separate
            Spark action and is included in the measured time; pass False for a
            cleaner timing.

    Returns:
        ``(result, execution_time)``: ``result`` is a list of ``(word, count)``
        pairs sorted by descending count; ``execution_time`` is wall-clock
        seconds spent in this call.
    """
    start_time = time.time()

    # 1. Read the input file into an RDD (lazy until an action runs).
    lines_rdd = sc.textFile(input_file)

    if verbose:
        print("Number of partitions: ", lines_rdd.getNumPartitions())
        print("Total number of lines: ", lines_rdd.count())
        print("First 10 lines of the input file:")
        for i, line in enumerate(lines_rdd.take(10)):
            print(f"Line {i}: {line}")

    # 2. MAP phase: split each line into words and emit (word, 1) pairs.
    words_rdd = lines_rdd.flatMap(lambda line: line.split())
    word_pairs_rdd = words_rdd.map(lambda word: (word, 1))

    if verbose:
        print("First 10 word pairs:")
        for i, word_pair in enumerate(word_pairs_rdd.take(10)):
            print(f"Word pair {i}: {word_pair}")

    # 3. REDUCE phase: sum the counts per key (the key is the word).
    word_counts_rdd = word_pairs_rdd.reduceByKey(lambda a, b: a + b)

    # 4. Sort results by count (descending).
    sorted_counts = word_counts_rdd.sortBy(lambda x: x[1], ascending=False)

    # 5. Collect once (this triggers execution). The preview below is sliced from
    # `result` instead of running a separate take(10) job, so it is guaranteed
    # to match the returned list even when counts are tied.
    result = sorted_counts.collect()

    if verbose:
        print("First 10 word counts:")
        for i, word_count in enumerate(result[:10]):
            print(f"Word count {i}: {word_count}")

    execution_time = time.time() - start_time
    return result, execution_time

In [11]:
def word_count_single_machine(input_file, encoding="utf-8"):
    """Perform word count using a traditional single-machine approach.

    Args:
        input_file: Path to a text file.
        encoding: Text encoding used to decode the file (default UTF-8, the
            same decoding PySpark's ``textFile`` applies). Being explicit keeps
            results independent of the platform's locale default.

    Returns:
        ``(sorted_counts, execution_time)``: ``(word, count)`` pairs sorted by
        descending count, and wall-clock seconds spent in this call.
    """
    from collections import Counter

    start_time = time.time()

    word_counts = Counter()
    with open(input_file, "r", encoding=encoding) as f:
        for line in f:
            # str.split() with no argument already ignores surrounding whitespace.
            word_counts.update(line.split())

    # Sort results by count (descending); ties keep first-seen order.
    sorted_counts = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)

    execution_time = time.time() - start_time
    return sorted_counts, execution_time

In [12]:
def demonstrate_partitioning(sc, input_file, num_partitions=4):
    """Demonstrate how data is partitioned in Spark.

    Reads ``input_file`` passing ``num_partitions`` as ``minPartitions`` (a
    suggested minimum), then prints how many lines landed in each partition.

    Args:
        sc: An active SparkContext.
        input_file: Path passed to ``sc.textFile``.
        num_partitions: Suggested minimum number of partitions (default 4).

    Returns:
        A list of ``(partition_id, line_count)`` tuples, one per partition.
    """
    lines_rdd = sc.textFile(input_file, minPartitions=num_partitions)

    print(f"Number of partitions: {lines_rdd.getNumPartitions()}")

    # mapPartitionsWithIndex passes each partition's index and an iterator over
    # its items; count the items without materializing them.
    partition_info = lines_rdd.mapPartitionsWithIndex(
        lambda idx, items: [(idx, sum(1 for _ in items))]
    ).collect()

    print("Items per partition:")
    for partition_id, count in partition_info:
        print(f"  Partition {partition_id}: {count} items")
    return partition_info

In [23]:
def main_processor(word_count_mr, word_count_single, input_file=None,
                   partition_sizes=(2, 4, 8, 16)):
    """Main function to run the MapReduce lab.

    Args:
        word_count_mr: Callable ``(sc, input_file) -> (results, seconds)`` used
            for the PySpark run.
        word_count_single: Callable ``(input_file) -> (results, seconds)`` used
            for the single-machine comparison.
        input_file: Text file to process; defaults to
            ``data/RevengeoftheSithScript.txt``.
        partition_sizes: Partition counts benchmarked in Part 4.

    The SparkContext is stopped in a ``finally`` block, so an exception in any
    part does not leave a live context behind in the kernel.
    """
    if input_file is None:
        data_dir = "data"
        input_file = f"{data_dir}/RevengeoftheSithScript.txt"

    sc = setup_spark_context()
    try:
        print("\n" + "="*50)
        print("MAPREDUCE LAB - DISTRIBUTED SYSTEMS")
        print("="*50 + "\n")

        # Part 1: basic word count using PySpark
        print("Part 1: Word Count with PySpark MapReduce")
        print("-"*40)
        spark_result, spark_time = word_count_mr(sc, input_file)

        print("\nTop 10 words by frequency:")
        for word, count in spark_result[:10]:
            print(f"  {word}: {count}")

        print(f"\nPySpark execution time: {spark_time:.2f} seconds")

        # Part 2: compare with the single-machine approach
        print("\nPart 2: Comparison with Single-Machine Processing")
        print("-"*40)
        single_result, single_time = word_count_single(input_file)

        print(f"Single-machine execution time: {single_time:.2f} seconds")
        # Ratio > 1 means PySpark was faster; < 1 means it was slower.
        if spark_time > 0:
            print(f"Speed improvement with PySpark: {single_time/spark_time:.2f}x")
        else:
            print("Speed improvement with PySpark: n/a (PySpark time too small to measure)")

        # Part 3: demonstrate partitioning
        print("\nPart 3: Understanding Data Partitioning")
        print("-"*40)
        demonstrate_partitioning(sc, input_file)

        # Part 4: effect of increasing partitions
        print("\nPart 4: Effect of Increasing Partitions")
        print("-"*40)

        for partitions in partition_sizes:
            lines_rdd = sc.textFile(input_file, minPartitions=partitions)
            words_rdd = lines_rdd.flatMap(lambda line: line.split())
            word_pairs_rdd = words_rdd.map(lambda word: (word, 1))

            # The transformations above are lazy, so the timed region covers the
            # whole pipeline (read, map, shuffle, reduce) triggered by collect().
            start_time = time.time()
            word_pairs_rdd.reduceByKey(lambda a, b: a + b, numPartitions=partitions).collect()
            execution_time = time.time() - start_time

            print(f"Execution with {partitions} partitions: {execution_time:.2f} seconds")
    finally:
        # Clean up even if any part above raised.
        sc.stop()
    
# Run the whole lab with the baseline (no cleaning) word-count implementations.
main_processor(word_count_mr=word_count_mapreduce, word_count_single=word_count_single_machine)
    


MAPREDUCE LAB - DISTRIBUTED SYSTEMS

Part 1: Word Count with PySpark MapReduce
----------------------------------------
Number of partitions:  2


                                                                                

Total number of lines:  4080
First 10 lines of the input file:
Line 0: 
Line 1: 
Line 2: STAR WARS EPISODE 3: REVENGE OF THE SITH SCRIPT 
Line 3: 
Line 4: George Lucas 
Line 5: 
Line 6: 
Line 7: 1 EXT. SPACE 
Line 8: 
Line 9: A long time ago in a galaxy far, far away. 
First 10 word pairs:
Word pair 0: ('STAR', 1)
Word pair 1: ('WARS', 1)
Word pair 2: ('EPISODE', 1)
Word pair 3: ('3:', 1)
Word pair 4: ('REVENGE', 1)
Word pair 5: ('OF', 1)
Word pair 6: ('THE', 1)
Word pair 7: ('SITH', 1)
Word pair 8: ('SCRIPT', 1)
Word pair 9: ('George', 1)
First 10 word counts:
Word count 0: ('the', 1723)
Word count 1: ('.', 805)
Word count 2: ('to', 704)
Word count 3: ('and', 675)
Word count 4: ('of', 522)
Word count 5: ('a', 467)
Word count 6: ('I', 387)
Word count 7: ('is', 351)
Word count 8: ('The', 315)
Word count 9: ('ANAKIN:', 312)

Top 10 words by frequency:
  the: 1723
  .: 805
  to: 704
  and: 675
  of: 522
  a: 467
  I: 387
  is: 351
  The: 315
  ANAKIN:: 312

PySpark execution time: 1.01 se

In [22]:
# Common English function words excluded from the "cleaned" word counts.
# Entries are lower-case because they are compared against clean_text() output.
stop_words = {
    "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
    "with", "by", "of", "from", "as",
    "is", "was", "were", "be", "been", "being", "am", "are", "has", "have",
    "this", "that", "these", "those", "it", "its",
    "i", "he", "she", "they", "we", "you", "him", "her", "them",
    "their", "our", "your", "my",
}

import re

def clean_text(text):
    r"""Normalize a line of text into a list of lower-case words.

    Every character that is neither a word character (``\w``) nor whitespace
    is replaced by a space before splitting, so "Obi-Wan," yields
    ["obi", "wan"].
    """
    normalized = re.sub(r'[^\w\s]', ' ', text.lower())
    return list(filter(None, normalized.split()))

In [None]:
def word_count_single_machine_with_cleaning(input_file, encoding="utf-8"):
    """Perform word count with cleaning using a traditional single-machine approach.

    Each line is normalized with ``clean_text`` (lower-cased, punctuation
    replaced by spaces). Words in the module-level ``stop_words`` set and
    single-character words are then dropped.

    Args:
        input_file: Path to a text file.
        encoding: Text encoding used to decode the file (default UTF-8).

    Returns:
        ``(sorted_counts, execution_time)``: ``(word, count)`` pairs sorted by
        descending count, and wall-clock seconds spent in this call.
    """
    from collections import Counter

    start_time = time.time()

    word_counts = Counter()
    with open(input_file, "r", encoding=encoding) as f:
        for line in f:
            # clean_text returns a list of words, so iterate it directly
            # (a list has no .split()).
            word_counts.update(
                word for word in clean_text(line)
                if word not in stop_words and len(word) > 1
            )

    # Sort results by count (descending); ties keep first-seen order.
    sorted_counts = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)

    execution_time = time.time() - start_time
    return sorted_counts, execution_time

In [None]:
def word_count_with_cleaning(sc, input_file, verbose=True):
    """Perform word count with stop-word and punctuation removal using PySpark's MapReduce approach.

    Lines are normalized with ``clean_text``. Words found in the module-level
    ``stop_words`` set and single-character words are dropped. This is the
    same filter the single-machine cleaning variant uses, so both share one
    stop-word list.

    Args:
        sc: An active SparkContext.
        input_file: Path passed to ``sc.textFile``.
        verbose: When True (default), print diagnostics about intermediate RDDs.
            Each diagnostic is a separate Spark action included in the timing.

    Returns:
        ``(result, execution_time)``: ``(word, count)`` pairs sorted by
        descending count, and wall-clock seconds spent in this call.
    """
    start_time = time.time()

    # 1. Read the input file into an RDD (lazy until an action runs).
    lines_rdd = sc.textFile(input_file)

    # 2. MAP phase: clean text, split into words, drop stop words, emit (word, 1).
    words_rdd = lines_rdd.flatMap(clean_text)

    if verbose:
        print("First 10 words:")
        for i, word in enumerate(words_rdd.take(10)):
            print(f"Word {i}: {word}")

    filtered_words_rdd = words_rdd.filter(lambda word: word not in stop_words and len(word) > 1)
    word_pairs_rdd = filtered_words_rdd.map(lambda word: (word, 1))

    if verbose:
        print("First 10 word pairs:")
        for i, word_pair in enumerate(word_pairs_rdd.take(10)):
            print(f"Word pair {i}: {word_pair}")

    # 3. REDUCE phase: sum the counts for each word.
    word_counts_rdd = word_pairs_rdd.reduceByKey(lambda a, b: a + b)

    # 4. Sort results by count (descending).
    sorted_counts = word_counts_rdd.sortBy(lambda x: x[1], ascending=False)

    # 5. Collect once (this triggers execution). The preview is sliced from
    # `result` so it always matches the returned list, even for tied counts.
    result = sorted_counts.collect()

    if verbose:
        print("First 10 word counts:")
        for i, word_count in enumerate(result[:10]):
            print(f"Word count {i}: {word_count}")

    execution_time = time.time() - start_time
    return result, execution_time

In [24]:
# Re-run the lab with the cleaning variants (lower-casing, punctuation and stop-word removal).
main_processor(word_count_mr=word_count_with_cleaning, word_count_single=word_count_single_machine_with_cleaning)


MAPREDUCE LAB - DISTRIBUTED SYSTEMS

Part 1: Word Count with PySpark MapReduce
----------------------------------------
First 10 words:
Word 0: star
Word 1: wars
Word 2: episode
Word 3: 3
Word 4: revenge
Word 5: of
Word 6: the
Word 7: sith
Word 8: script
Word 9: george
First 10 word pairs:
Word pair 0: ('star', 1)
Word pair 1: ('wars', 1)
Word pair 2: ('episode', 1)
Word pair 3: ('revenge', 1)
Word pair 4: ('sith', 1)
Word pair 5: ('script', 1)
Word pair 6: ('george', 1)
Word pair 7: ('lucas', 1)
Word pair 8: ('ext', 1)
Word pair 9: ('space', 1)
First 10 word counts:
Word count 0: ('anakin', 701)
Word count 1: ('wan', 566)
Word count 2: ('obi', 566)
Word count 3: ('jedi', 256)
Word count 4: ('padme', 251)
Word count 5: ('his', 246)
Word count 6: ('palpatine', 208)
Word count 7: ('int', 153)
Word count 8: ('yoda', 140)
Word count 9: ('will', 134)

Top 10 words by frequency:
  anakin: 701
  wan: 566
  obi: 566
  jedi: 256
  padme: 251
  his: 246
  palpatine: 208
  int: 153
  yoda: 140
 

In [None]:
#ADJUST THE MB SIZE TO GENERATE A LARGE OR SMALLER FILE
def generate_large_sample_data(filename, size_in_mb=500, seed=None):
    """Generate a large sample text file with random words.

    Args:
        filename: Output path. Missing parent directories are created; a bare
            file name (no directory part) is written to the current directory.
        size_in_mb: Approximate target size in MiB (fractional values allowed).
        seed: Optional seed for a private ``random.Random`` so the output is
            reproducible. When None, the module-level ``random`` generator is used.
    """
    print(f"Generating large sample data (~{size_in_mb}MB)...")

    # List of sample words to generate random text
    words = ["hadoop", "spark", "distributed", "computing", "mapreduce",
             "cluster", "data", "algorithm", "parallel", "processing",
             "scale", "node", "system", "framework", "cloud", "partition",
             "memory", "storage", "driver", "executor", "job", "task",
             "shuffle", "reduce", "map", "cache", "persist", "lineage",
             "transformation", "action", "rdd", "dataframe", "dataset"]

    rng = random if seed is None else random.Random(seed)

    # Each line has between min_words and max_words words (uniform).
    min_words, max_words = 4, 8
    # Estimate bytes per line from the actual vocabulary: average words per line
    # times average word length, plus separating spaces and the trailing newline.
    avg_words = (min_words + max_words) / 2
    avg_word_len = sum(map(len, words)) / len(words)
    chars_per_line = avg_words * avg_word_len + (avg_words - 1) + 1
    # int() so range() also works when size_in_mb is fractional.
    lines_needed = int(size_in_mb * 1024 * 1024 // chars_per_line)

    # os.makedirs("") raises, so only create a directory when there is one.
    directory = os.path.dirname(filename)
    if directory:
        os.makedirs(directory, exist_ok=True)

    with open(filename, 'w', encoding='utf-8') as f:
        for i in range(lines_needed):
            line_length = rng.randint(min_words, max_words)
            line = ' '.join(rng.choice(words) for _ in range(line_length))
            f.write(line + '\n')

            # Print progress every million lines
            if i % 1000000 == 0 and i > 0:
                print(f"  Generated {i:,} lines (~{i*chars_per_line/(1024*1024):.1f}MB)")

    file_size_mb = os.path.getsize(filename) / (1024 * 1024)
    print(f"Large sample data generated and saved to {filename} ({file_size_mb:.1f}MB)")