# Exercise: Implementing a MapReduce Algorithm to Calculate the Average Value for Each Key

# 1. Simplified implementation (runs directly)

## Step 1: Map Function
The Map function reads the input key-value pairs and emits intermediate key-value pairs in which the key is unchanged and the value is a tuple of the original value and a count of 1. Carrying the count with each value lets the reduce step sum values and counts separately before dividing.

In [6]:
def map_function(data):
    """Process each key-value pair and emit (key, (value, count))"""
    intermediate = []
    for key, value in data:
        intermediate.append((key, (value, 1)))
    return intermediate


## Step 2: Shuffle and Sort
In a real MapReduce framework this step is handled by the framework itself: it routes all intermediate values that share a key to the same reducer. In our implementation, we simulate it by grouping the values in a dictionary keyed by the original key.

In [7]:
def shuffle_and_sort(intermediate):
    """Organize data by keys"""
    grouped = {}
    for key, value in intermediate:
        if key in grouped:
            grouped[key].append(value)
        else:
            grouped[key] = [value]
    return grouped
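
Note that the dictionary above groups values but does not order the keys. If key order matters, a variant along the following lines could be used. This is a minimal sketch, not part of the original implementation; the name `shuffle_and_sort_ordered` is hypothetical, and it relies on dictionaries preserving insertion order (Python 3.7+).

```python
from collections import defaultdict

def shuffle_and_sort_ordered(intermediate):
    """Group values by key, then order the groups by key (hypothetical variant)."""
    grouped = defaultdict(list)
    for key, value in intermediate:
        grouped[key].append(value)
    # Rebuild a plain dict from the key-sorted items so iteration follows key order
    return dict(sorted(grouped.items(), key=lambda item: item[0]))

intermediate = [('b', (5, 1)), ('a', (4, 1)), ('a', (3, 1))]
grouped = shuffle_and_sort_ordered(intermediate)
print(grouped)  # {'a': [(4, 1), (3, 1)], 'b': [(5, 1)]}
```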

## Step 3: Reduce Function
The Reduce function will take the organized data from the shuffle and sort phase, sum the values and counts for each key, and then compute the average.

In [8]:
def reduce_function(grouped):
    """Aggregate sums and counts and compute the average for each key"""
    result = {}
    for key, values in grouped.items():
        total_sum = sum(value[0] for value in values)
        total_count = sum(value[1] for value in values)
        average = total_sum / total_count
        result[key] = average
    return result
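
The reason for carrying `(value, count)` pairs instead of averages can be seen in a small worked example. The sketch below assumes the values for one key are spread over two hypothetical nodes; `combine` is an illustrative helper, not a function from this notebook. Partial `(sum, count)` pairs can be merged in any grouping and still give the correct average, whereas averaging the per-node averages does not.

```python
def combine(pairs):
    """Fold a list of (sum, count) pairs into a single (sum, count) pair."""
    total_sum = sum(s for s, _ in pairs)
    total_count = sum(c for _, c in pairs)
    return (total_sum, total_count)

# Key 'a' has values 1 and 2 on one (hypothetical) node and 9 on another.
node_1 = [(1, 1), (2, 1)]
node_2 = [(9, 1)]

partial_1 = combine(node_1)  # (3, 2)
partial_2 = combine(node_2)  # (9, 1)

# Merging the partial pairs gives the true average of 1, 2 and 9.
merged_sum, merged_count = combine([partial_1, partial_2])
correct_average = merged_sum / merged_count
print(correct_average)  # 4.0

# Averaging the per-node averages weights the nodes equally and is wrong here.
average_of_averages = (3 / 2 + 9 / 1) / 2
print(average_of_averages)  # 5.25
```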


In [9]:
data = [('a', 4), ('b', 5), ('a', 3)]
# Combine the functions to compute the average value for each key
intermediate = map_function(data)
grouped = shuffle_and_sort(intermediate)
result = reduce_function(grouped)
# Output the results
print(result)

{'a': 3.5, 'b': 5.0}


# 2. Multi-process simulation implementation

To simulate a distributed environment more realistically in Python, I use multiprocessing to mimic the parallel processing that happens in a true MapReduce framework.

- Data Splitting: The dataset is split into chunks, with each chunk processed by a separate worker. This simulates the distribution of data across different nodes in a distributed system.

- Map Phase: Each worker applies the map_function to its chunk of data. This is done in parallel across all workers.

- Shuffling and Combining: The results from all workers are collected into a single list. In a true distributed system, this would correspond to the shuffle and sort phase, where data with the same keys are moved to the same reducer.

- Reduce Phase: The reduce_function processes the combined intermediate results to compute the final average for each key.

- Multiprocessing Pool: The multiprocessing.Pool distributes the chunks to worker processes and collects their results, playing the role of the coordinator in a MapReduce framework.

This approach, while still running on a single machine, better simulates the behavior of distributed systems by utilizing parallel processing capabilities.
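
The shuffle described above, where records with the same key are moved to the same reducer, can be sketched with a simple hash partitioner. This is an illustration only: `partition_for`, `partition`, and the choice of `zlib.crc32` are assumptions made for the sketch. The implementation in this notebook instead passes all intermediate results to a single reduce call.

```python
import zlib

def partition_for(key, num_reducers):
    """Pick a reducer index for a key (hypothetical helper).

    zlib.crc32 is used instead of the built-in hash() because string hashes
    are randomized per Python process by default, while crc32 is stable.
    """
    return zlib.crc32(key.encode("utf-8")) % num_reducers

def partition(intermediate, num_reducers):
    """Split (key, value) pairs into one bucket per reducer (hypothetical helper)."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in intermediate:
        buckets[partition_for(key, num_reducers)].append((key, value))
    return buckets

intermediate = [('a', (4, 1)), ('b', (5, 1)), ('a', (3, 1)), ('c', (7, 1))]
buckets = partition(intermediate, num_reducers=2)

# Every key lands in exactly one bucket, so each reducer sees all values for its keys.
for index, bucket in enumerate(buckets):
    print(index, bucket)
```

Each bucket could then be handed to its own reduce call, since no key is split across buckets.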

## 2.1 Multi-process Code

In [None]:
import multiprocessing
from collections import defaultdict

def map_function(data_chunk):
    """Map function to process each chunk of data."""
    result = []
    for key, value in data_chunk:
        result.append((key, (value, 1)))
    return result

def reduce_function(intermediate_data):
    """Group (value, count) pairs by key and compute the average for each key."""
    totals = defaultdict(list)
    for key, value in intermediate_data:
        totals[key].append(value)
    
    final_result = {}
    for key, values in totals.items():
        total_sum = sum(val[0] for val in values)
        total_count = sum(val[1] for val in values)
        final_result[key] = total_sum / total_count
    return final_result

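def distribute_computation(data, map_func, reduce_func, num_workers=None):
    """Run map_func over chunks of data in parallel, then reduce the combined output."""
    if num_workers is None:
        num_workers = multiprocessing.cpu_count()

    # Use at most one worker per record, but always at least one worker
    # (avoids dividing by zero when data is empty or num_workers is 0)
    num_workers = max(1, min(num_workers, len(data)))
    chunk_size = max(1, len(data) // num_workers)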
    
    with multiprocessing.Pool(processes=num_workers) as pool:
        # Split the data into chunks
        data_chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
        
        # Map phase: process each chunk in parallel
        map_results = pool.map(map_func, data_chunks)
        
        # Combine all intermediate results for the reduce phase
        combined_results = [item for sublist in map_results for item in sublist]
    
    # Reduce phase: compute final results
    final_result = reduce_func(combined_results)
    return final_result
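
The chunking arithmetic above can produce more chunks than workers, because the chunk size is rounded down. The sketch below mirrors that arithmetic in a standalone helper and compares it with a rounded-up chunk size. Both `split_floor` and `split_ceil` are hypothetical names introduced for illustration. Extra chunks are not a correctness problem, since `pool.map` simply hands remaining chunks to workers as they become free.

```python
def split_floor(data, num_workers):
    """Split as in distribute_computation: chunk size rounded down."""
    num_workers = min(num_workers, len(data))
    chunk_size = max(1, len(data) // num_workers)
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

def split_ceil(data, num_workers):
    """Alternative (assumption): chunk size rounded up, so chunks <= workers."""
    num_workers = min(num_workers, len(data))
    chunk_size = -(-len(data) // num_workers)  # ceiling division
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

data = list(range(10))

floor_chunks = split_floor(data, 4)
print(floor_chunks)  # [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

ceil_chunks = split_ceil(data, 4)
print(ceil_chunks)   # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```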

## 2.2 Related tool functions

### 2.2.1 Function that writes data to a file
This function generates the requested number of key-value pairs, each with a random lowercase letter as the key and a random integer from 1 to 100 as the value, and appends them to data.txt, one `key,value` pair per line.

In [10]:
import random
import string

def append_data_to_file(num_pairs, filename="data.txt"):
    """Append specified number of key-value pairs to a file."""
    with open(filename, "a", encoding="utf-8") as file:
        for _ in range(num_pairs):
            key = random.choice(string.ascii_lowercase)  # Generate a random letter
            value = random.randint(1, 100)  # Generate a random integer between 1 and 100
            file.write(f"{key},{value}\n")  # Write the key-value pair to the file

append_data_to_file(50)

### 2.2.2 Function that reads data from a file
This function reads key-value pairs from data.txt and returns them as a list of (key, value) tuples, the format expected by the map phase.

In [5]:
def read_data_from_file(filename="data.txt"):
    """Read key-value pairs from a file and return as a list of tuples."""
    data = []
    with open(filename, "r", encoding="utf-8") as file:
        for line in file:
            line = line.strip()
            if not line:
                continue  # Skip blank lines, e.g. an empty line at the end of the file
            key, value = line.split(',')
            data.append((key, int(value)))
    return data


### 2.2.3 A function that saves processing results

In [None]:
def save_results_to_file(results, filename="result.txt"):
    """Save the results dictionary to a file."""
    with open(filename, "w", encoding="utf-8") as file:
        for key, value in results.items():
            file.write(f"{key},{value}\n")

# Example usage:
# save_results_to_file({'a': 25.5, 'b': 75})
# data = read_data_from_file()


## Execute `python Multi-process.py` to get the result

{'d': 43.5, 'e': 54.0, 'w': 47.0, 'o': 42.0, 'l': 35.5, 'b': 64.0, 's': 37.666666666666664, 'f': 78.66666666666667, 'q': 46.666666666666664, 'm': 42.333333333333336, 'c': 48.333333333333336, 'r': 67.5, 't': 44.25, 'i': 45.333333333333336, 'z': 19.5, 'j': 37.666666666666664, 'h': 33.0, 'x': 92.5, 'u': 32.5, 'v': 79.66666666666667, 'g': 48.0, 'k': 31.0, 'p': 7.0}