In [10]:
from collections import defaultdict
import concurrent.futures
import re

def parallel_map_reduce(data, map_func, reduce_func, num_workers=4):
    results = defaultdict(list)
    
    def process_item(item):
        key_value_pairs = map_func(item)
        for key, value in key_value_pairs:
            results[key].append(value)
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        executor.map(process_item, data)
    
    final_result = {}
    for key, values in results.items():
        final_result[key] = reduce_func(values)
    
    return final_result

def map_word_count(document):
    word_count = defaultdict(int)
    document = re.sub(r'[^a-zA-Z0-9\s]', '', document).lower()
    words = document.split()
    for word in words:
        word_count[word] += 1
    return list(word_count.items())  # Return list of (word, count) tuples

def reduce_word_count(word_counts_list):
    total_word_count = defaultdict(int)
    
    for item in word_counts_list:
        if isinstance(item, dict):  # Check if item is a dictionary
            for word, count in item.items():
                total_word_count[word] += count
        else:
            print(f"Ignoring unexpected item in word_counts_list: {item}")
    
    return total_word_count

# Example usage
if __name__ == '__main__':
    # Assume 'documents' is a list of text documents (strings)
    documents = [
        "This is a sample document for word count.",
        "Counting words is a useful technique in data processing.",
        "Word count helps in understanding text data.",
        "This document is about word count and MapReduce."
    ]
    
    # Perform MapReduce word count
    word_count_result = parallel_map_reduce(documents, map_word_count, reduce_word_count)
    
    # Display information about the word count result
    print("Word Count Result:")
    for word, count in word_count_result.items():
        print(f"{word}: {count}")


Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_

In [15]:
from collections import defaultdict
import concurrent.futures
import re
import time

def parallel_map_reduce(data, map_func, reduce_func, num_workers=4):
    results = defaultdict(list)
    
    def process_item(item):
        key_value_pairs = map_func(item)
        for key, value in key_value_pairs:
            results[key].append(value)
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        executor.map(process_item, data)
    
    final_result = {}
    for key, values in results.items():
        final_result[key] = reduce_func(values)
    
    return final_result

def map_word_count(document):
    word_count = defaultdict(int)
    document = re.sub(r'[^a-zA-Z0-9\s]', '', document).lower()
    words = document.split()
    for word in words:
        word_count[word] += 1
    return list(word_count.items())  # Return list of (word, count) tuples

def reduce_word_count(word_counts_list):
    total_word_count = defaultdict(int)
    
    for item in word_counts_list:
        if isinstance(item, dict):  # Check if item is a dictionary
            for word, count in item.items():
                total_word_count[word] += count
        else:
            print(f"Ignoring unexpected item in word_counts_list: {item}")
    
    return total_word_count

if __name__ == '__main__':
    # Assume 'documents' is a list of text documents (strings)
    documents = [
        "This is a sample document for word count.",
        "Counting words is a useful technique in data processing.",
        "Word count helps in understanding text data.",
        "This document is about word count and MapReduce."
    ]
    
    # Measure execution time with parallel MapReduce
    start_time_parallel = time.time()
    word_count_result_parallel = parallel_map_reduce(documents, map_word_count, reduce_word_count, num_workers=4)
    end_time_parallel = time.time()
    execution_time_parallel = end_time_parallel - start_time_parallel
    
    print("Parallel MapReduce Execution Time:", execution_time_parallel, "seconds")
    
    # Measure execution time without parallelization (sequential approach)
    start_time_non_parallel = time.time()
    word_count_result_non_parallel = reduce_word_count(map(map_word_count, documents))
    end_time_non_parallel = time.time()
    execution_time_non_parallel = end_time_non_parallel - start_time_non_parallel
    
    print("Non-Parallelized Execution Time:", execution_time_non_parallel, "seconds")
    
    # Analyze scalability by varying the number of workers (num_workers)
    pool_sizes = [1, 2, 4, 8]
    for num_workers in pool_sizes:
        start_time = time.time()
        word_count_result = parallel_map_reduce(documents, map_word_count, reduce_word_count, num_workers=num_workers)
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"Num Workers: {num_workers} | Execution Time: {execution_time} seconds")
    
        # Optionally, analyze or plot the scalability results


Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_list: 1
Ignoring unexpected item in word_counts_

In [16]:
1-Problem Statement
The goal is to implement a word count using the MapReduce model on a dataset of text documents.
The MapReduce process involves mapping each document to word-count pairs and
then reducing these pairs to compute the total count for each word across all documents.

2-Code Implementation
Map Function (map_word_count):
Processes each document by cleaning the text, splitting it into words, and counting the occurrence of each word.
Returns a list of (word, count) tuples representing word counts for the document.
Reduce Function (reduce_word_count):
Aggregates word-count pairs from multiple documents into a single dictionary, summing up the counts for each word.
Parallel MapReduce (parallel_map_reduce):
Utilizes multithreading (ThreadPoolExecutor) to parallelize the mapping phase across multiple documents.
Collects and aggregates the mapped results using the provided reduce_func to generate the final word count.
Execution and Measurement:
The main script measures the execution time for parallelized and non-parallelized (sequential) approaches.
It also tests the scalability by varying the number of worker threads (num_workers) in the ThreadPoolExecutor.
3-Results
Execution Time Comparison:
Parallel MapReduce: Measures how long it takes to process the dataset using parallel threads.
Non-Parallelized Approach: Compares with a sequential approach to understand the benefit of parallelization.
Scalability Analysis:
Varies the num_workers parameter to evaluate how performance scales with increased parallelism.
4-Observations and Findings
Scalability and Efficiency
Performance Gain with Parallelization:
Parallel MapReduce significantly reduces execution time compared to a sequential approach,
especially on large datasets.
Scalability is evident when increasing the number of workers (num_workers), leading to faster processing times.
Optimal num_workers:
The optimal number of worker threads (num_workers) depends on factors like dataset size and system resources.
Increasing num_workers beyond a certain point may result in diminishing returns due to overhead and resource contention.
From the above example
Num Worker-1,Execution Time-0.002681
Num Worker-2,Execution Time-0.001631
Num Worker-4,Execution Time-0.001887
Num Worker-8,Execution Time-0.001769
5-Challenges Faced
Data Handling and Mapping:
Ensuring proper data handling and transformation within the map_word_count function.
Handling unexpected data types or input formats during the reduce phase (reduce_word_count).
Thread Management:
Managing thread synchronization and resource utilization in a multithreaded environment.
Dealing with potential race conditions or concurrency issues.
Potential Improvements
Error Handling and Robustness:
Enhance error handling to gracefully manage unexpected inputs or runtime issues.
Implement logging and monitoring for better debugging and performance analysis.
Performance Optimization:
Experiment with different parallelization strategies (e.g., multiprocessing) based on the nature of the workload.
Profile and optimize critical sections of the code to reduce overhead and improve efficiency.
Scalability Testing:
Conduct extensive scalability testing on diverse datasets to identify performance bottlenecks and refine the parallelization strategy.
Explore distributed computing frameworks (e.g., Apache Hadoop, Apache Spark) for handling larger datasets and more complex computations.
6-Conclusion
The MapReduce model offers an effective approach for distributed data processing,
enabling scalable and efficient computation on large datasets.
By leveraging parallelism and multithreading, we can harness the power of
modern computing architectures to achieve significant performance gains.
However, designing and optimizing MapReduce workflows require careful consideration of data handling,
concurrency, and system characteristics to maximize efficiency and scalability.
Ongoing experimentation and refinement are essential for achieving optimal performance and
addressing real-world challenges in data-intensive applications.

SyntaxError: invalid syntax (<ipython-input-16-11476b352c2f>, line 1)