# Lab - Parallel Processing in Python

## Introduction

Parallel processing is a mode of operation where tasks are executed simultaneously across multiple processors on the same computer. This approach is designed to reduce overall processing time for computationally intensive operations.

However, starting processes and passing data between them carries some overhead, which can make small tasks take longer in parallel than they would serially. Understanding when and how to apply parallel processing is therefore crucial for effective optimization.

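In Python, the `multiprocessing` module runs independent parallel processes by using subprocesses (instead of threads). Each process has its own Python interpreter and its own memory space, so the processes can run on multiple processors at the same time without sharing state. This is particularly important in Python because of the Global Interpreter Lock (GIL): in standard CPython builds, the GIL allows only one thread per process to execute Python bytecode at a time, so threads cannot speed up CPU-bound Python code, whereas separate processes each have their own GIL.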
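To make the "separate memory" point concrete, here is a minimal sketch. The names `counter` and `bump_counter` are hypothetical, chosen only for illustration. Each worker process gets its own copy of the module-level `counter`, so increments made inside a worker never reach the parent. It assumes Linux, where the `fork` start method is available (as on the lab's EC2 instance).

```python
import multiprocessing as mp

# Hypothetical module-level state, used only to illustrate process isolation.
counter = 0

def bump_counter(_):
    """Increment this process's own copy of `counter` and return it."""
    global counter
    counter += 1
    return counter

# Assumption: Linux, where the "fork" start method is available.
ctx = mp.get_context("fork")
with ctx.Pool(2) as pool:
    worker_values = pool.map(bump_counter, range(4))

print("values seen in workers:", worker_values)  # every value is >= 1
print("counter in parent:", counter)             # still 0
```

The parent's `counter` stays at 0 because each worker modified only its private copy; to share results, processes must send them back explicitly (here, through the return values of `pool.map`).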

## Learning Objectives

By the end of this lab you will:

* Understand how to structure code and use syntax for parallel processing with `multiprocessing`
* Implement both synchronous and asynchronous parallel processing
* Learn to parallelize operations on Pandas DataFrames
* Solve 3 different use cases using the `multiprocessing.Pool()` interface
* Compare performance between serial and parallel implementations

## Environment Note

This lab is designed to run on your Amazon EC2 instance accessed through VSCode. All multiprocessing features will work correctly in this Linux environment.

Import `multiprocessing` (it is part of the Python standard library, so there is nothing to install).

In [23]:
import multiprocessing as mp

Determine the maximum number of parallel processes you can run. Note: you usually don't want to use all the available cores, since your computer also needs CPU time for other work.

In [24]:
print("Number of processors: ", mp.cpu_count())

Number of processors:  8
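One way to act on the note above is to size the pool from the number of CPUs and leave one free. This is a minimal sketch of a common heuristic, not a rule; `n_workers` is an illustrative name.

```python
import multiprocessing as mp
import os

# Total logical CPUs reported by the OS.
total_cpus = mp.cpu_count()

# On Linux, os.sched_getaffinity(0) returns the set of CPUs this process may run on,
# which can be smaller than cpu_count() when the process is pinned to a subset of CPUs.
if hasattr(os, "sched_getaffinity"):
    usable_cpus = len(os.sched_getaffinity(0))
else:
    usable_cpus = total_cpus

# Leave one core for the OS, the notebook kernel and VSCode, but keep at least one worker.
n_workers = max(1, usable_cpus - 1)
print(f"total={total_cpus} usable={usable_cpus} workers={n_workers}")
```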


## Synchronous and Asynchronous execution

In parallel processing, there are two types of execution models:

* **_Synchronous_** execution returns results in the same order in which the tasks were submitted. This is achieved by blocking the main program until all of the respective processes have finished.

* **_Asynchronous_** execution, on the other hand, doesn't block the main program. Tasks can complete in any order, and the main program can keep working while they run, which often reduces the total run time.

There are two main classes in `multiprocessing` for executing a function in parallel: the `Pool` class and the `Process` class.

### `Pool` Class

* Synchronous execution
    * `Pool.map()` and `Pool.starmap()`
    * `Pool.apply()` (not used in this lab's exercises)

* Asynchronous execution
    * `Pool.map_async()` and `Pool.starmap_async()`
    * `Pool.apply_async()` (not used in this lab's exercises)

### `Process` Class

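The `Process` class starts one process per function call and gives you lower-level control over each one. In this lab, we stick to the `Pool` class, because it is the most convenient to use and serves most common practical applications. Let's take up a typical problem and implement parallelization using the techniques above.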
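For comparison, a minimal `Process`-based sketch shows the bookkeeping that `Pool` hides: you create, start and join each process yourself, and you need a `Queue` to get results back. The helper `square_into` is hypothetical, and the sketch assumes the Linux `fork` start method.

```python
import multiprocessing as mp

def square_into(queue, index, value):
    """Compute value**2 in a child process and send (index, result) back."""
    queue.put((index, value ** 2))

# Assumption: Linux "fork" start method, as on the lab's EC2 instance.
ctx = mp.get_context("fork")
queue = ctx.Queue()

# One Process object per task.
processes = [ctx.Process(target=square_into, args=(queue, i, v))
             for i, v in enumerate([3, 1, 4])]
for p in processes:
    p.start()

# Read one message per process before joining, so no child blocks on a full queue.
results = dict(queue.get() for _ in processes)
for p in processes:
    p.join()

print([results[i] for i in range(len(processes))])  # [9, 1, 16]
```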

More information on these classes is available [here](https://docs.python.org/3/library/multiprocessing.html).

Read about the differences between apply, map, and starmap [here](https://stackoverflow.com/questions/8533318/multiprocessing-pool-when-to-use-apply-apply-async-or-map)

## Example

Given a 2D matrix (or list of lists), count how many numbers in each row fall within a given range. We will convert the matrix into a list of rows.

Import the following libraries:

In [25]:
import numpy as np
import time

Use `np.random.seed(100)` to set the random seed for NumPy's global random number generator.

In [26]:
np.random.seed(100)

Let's create a large NumPy matrix of random integers between 0 and 9 with 1,000,000 rows and 5 columns. You can use `np.random.randint` for this (its upper bound is exclusive, so `np.random.randint(0, 10, ...)` yields values from 0 to 9).

In [27]:
arr = np.random.randint(0, 10, size=[1000000, 5])
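Note that `np.random.RandomState(100)` on its own only constructs a new, independent generator; it does not seed the global generator that `np.random.randint` draws from. This short sketch contrasts two reproducible patterns (the variable names are illustrative):

```python
import numpy as np

# Pattern 1: seed NumPy's global generator, then call np.random.* functions.
np.random.seed(100)
first = np.random.randint(0, 10, size=5)
np.random.seed(100)
second = np.random.randint(0, 10, size=5)
print(np.array_equal(first, second))  # True: same seed, same draws

# Pattern 2: create a RandomState and draw from that object directly.
rng_a = np.random.RandomState(100)
rng_b = np.random.RandomState(100)
draws_a = rng_a.randint(0, 10, size=5)
draws_b = rng_b.randint(0, 10, size=5)
print(np.array_equal(draws_a, draws_b))  # True
```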

Convert the array you created in the previous step to a list using `tolist()`.

In [28]:
data = arr.tolist()

Explore your array

In [29]:
arr

array([[4, 7, 0, 1, 3],
       [4, 9, 4, 2, 3],
       [8, 4, 3, 0, 7],
       ...,
       [9, 1, 8, 0, 0],
       [5, 7, 1, 8, 0],
       [4, 4, 5, 6, 7]], shape=(1000000, 5))

Check that the length of your list is 1,000,000

In [30]:
len(data)

1000000

### Implement solution without parallelization

Let’s see how long the computation takes without parallelization. For this, we define a function called `howmany_within_range()` and apply it to every row of the matrix. Since we converted the matrix to a list, this means iterating over every element of the list. The function receives all the values in a row (one list element) as input and returns the count.

In [31]:
def howmany_within_range(row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count

Iterate the function `howmany_within_range` over every row in the matrix and measure the time.

In [None]:
import time

start = time.time()
# TODO: Fill in code here to iterate howmany_within_range over every row
# Process each row in data with minimum=4, maximum=8

end = time.time()
print(end - start)

Print the first 10 results

In [None]:
# TODO: Print the first 10 results

Check that the length of results is 1,000,000 and your implementation didn't cheat!

In [None]:
# TODO: Check that the length of results is 1,000,000

## How to parallelize any function?

The general way to parallelize any operation is to take a particular function that should be run multiple times and make it run in parallel using different processors.

To do this, you initialize a _Pool_ with n worker processes (typically no more than one per processor core) and pass the function you want to parallelize to one of the _Pool's_ parallelization methods.

`multiprocessing.Pool()` provides the `apply()`, `map()` and `starmap()` methods to make any function run in parallel.

### Key Differences Between Methods

Both `apply()` and `map()` take the function to be "parallelized" as their main argument. The difference is:
- `apply()` takes an _args_ argument holding the parameters for a single call of the _function-to-be-parallelized_
- `map()` takes a single _iterable_ and calls the function once for each of its elements

Because `apply()` runs one call in one worker and blocks until it returns, calling it repeatedly does not give true parallelization of the function, while `map()` distributes the elements of the iterable across all workers in the pool.

So `map()` is better suited to applying a function over an iterable, and it does the job faster because it keeps all the workers busy.
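A minimal side-by-side sketch of the two calls, using hypothetical toy functions `add` and `double` rather than the lab's function. It assumes the Linux `fork` start method.

```python
import multiprocessing as mp

def add(x, y):
    """Toy two-argument function (hypothetical, for illustration)."""
    return x + y

def double(x):
    """Toy one-argument function (hypothetical, for illustration)."""
    return 2 * x

ctx = mp.get_context("fork")  # assumption: Linux, as on the lab's EC2 instance
with ctx.Pool(4) as pool:
    # apply(): a single call, arguments passed via `args`; blocks until it returns.
    single = pool.apply(add, args=(2, 3))
    # map(): one call per element of a single iterable, spread over the workers.
    doubled = pool.map(double, [1, 2, 3, 4])

print(single)   # 5
print(doubled)  # [2, 4, 6, 8]
```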

We will explore `starmap()` once we see how to parallelize `howmany_within_range()` function with `apply()` and `map()`.

### Parallelizing using `Pool.map()`

`Pool.map()` accepts only _one iterable as an argument_. So as a workaround, we modify the `howmany_within_range` function by giving the `minimum` and `maximum` parameters default values, creating a new `howmany_within_range_rowonly()` function that takes a single row from the _iterable list_ of rows as input. This is not an ideal use case for `map()`, but it clearly shows how it differs from `apply()`.
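An alternative to redefining the function with default arguments is `functools.partial` from the standard library, which binds the extra parameters ahead of time. This is a different technique from the one the exercise asks for; `scale` is a hypothetical toy function, and the sketch assumes the Linux `fork` start method.

```python
import multiprocessing as mp
from functools import partial

def scale(value, factor):
    """Hypothetical two-argument function used only for illustration."""
    return value * factor

# partial() fixes `factor`, producing a one-argument callable that map() accepts.
# A partial of a module-level function is picklable, so it can be sent to workers.
times_ten = partial(scale, factor=10)

ctx = mp.get_context("fork")  # assumption: Linux, as on the lab's EC2 instance
with ctx.Pool(4) as pool:
    scaled = pool.map(times_ten, [1, 2, 3])

print(scaled)  # [10, 20, 30]
```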

In [None]:
# Parallelizing using Pool.map()

# TODO: Redefine function with only 1 mandatory argument
# Create howmany_within_range_rowonly() with default parameters

start = time.time()
pool = mp.Pool(mp.cpu_count())

t1 = time.time()
# TODO: Use pool.map function

t2 = time.time()
pool.close()
end = time.time()

print('total time',end - start)
print('time to set up pool',t1 - start)
print('time to multiprocess',t2 - t1)

# TODO: Print results after implementing
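The timing split above separates pool start-up from the actual work. When each task is tiny, the cost of sending every item to a worker and its result back can outweigh the work itself. `Pool.map()` accepts a `chunksize` argument that controls how many items travel in each message (by default it picks a chunk size automatically; `chunksize=1` forces one message per item). This sketch, with a hypothetical `tiny_task`, lets you compare chunk sizes against a serial loop on your own machine; exact timings will vary.

```python
import multiprocessing as mp
import time

def tiny_task(x):
    """A deliberately cheap task, where inter-process overhead dominates."""
    return x + 1

items = list(range(20_000))
ctx = mp.get_context("fork")  # assumption: Linux, as on the lab's EC2 instance

with ctx.Pool(4) as pool:
    for chunksize in (1, 1_000):
        start = time.perf_counter()
        out = pool.map(tiny_task, items, chunksize=chunksize)
        print(f"chunksize={chunksize:>5}: {time.perf_counter() - start:.3f} s")

serial_start = time.perf_counter()
serial_out = list(map(tiny_task, items))
print(f"serial:          {time.perf_counter() - serial_start:.3f} s")
```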

### Parallelizing using `Pool.starmap()`

In the previous example, we had to redefine the `howmany_within_range` function so that a couple of its parameters take default values. Using `starmap()`, you can avoid doing this. How, you ask?

Like `Pool.map()`, `Pool.starmap()` accepts only one iterable as an argument, but in `starmap()` each element of that iterable is itself an iterable. You put the arguments for the _function-to-be-parallelized_ in this inner iterable, in the same order as the function's parameters, and they are unpacked during execution. Internally, for each element `args` of the iterable supplied to `Pool.starmap()`, the pool calls `function_to_be_parallelized(*args)`.

So effectively, `Pool.starmap()` is a version of `Pool.map()` that accepts multiple arguments per call.
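A minimal sketch of the unpacking, using a hypothetical toy function `power` rather than the lab's function, and assuming the Linux `fork` start method:

```python
import multiprocessing as mp

def power(base, exponent):
    """Hypothetical two-argument function used only for illustration."""
    return base ** exponent

arg_tuples = [(2, 3), (3, 2), (10, 0)]

ctx = mp.get_context("fork")  # assumption: Linux, as on the lab's EC2 instance
with ctx.Pool(3) as pool:
    # Each tuple is unpacked: the workers call power(2, 3), power(3, 2), power(10, 0).
    results = pool.starmap(power, arg_tuples)

# Same answer as unpacking each tuple serially.
assert results == [power(*args) for args in arg_tuples]
print(results)  # [8, 9, 1]
```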

In [None]:
# Parallelizing with Pool.starmap()
import multiprocessing as mp

pool = mp.Pool(mp.cpu_count())

# TODO: Use pool.starmap with the original howmany_within_range function

pool.close()

# TODO: Print results after implementing

## Asynchronous Parallel Processing

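The asynchronous equivalents `apply_async()`, `map_async()` and `starmap_async()` let you execute the processes in parallel asynchronously: each call returns an `AsyncResult` immediately instead of blocking, and each worker picks up the next task as soon as it finishes the previous one, without regard for the starting order. Tasks can therefore complete in any order. Results collected through `apply_async()` callbacks arrive in completion order, so they are not guaranteed to match the input order; `map_async()` and `starmap_async()` still return their results in input order when you call `.get()`.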
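This sketch contrasts the two behaviours. The helper `slow_square` is hypothetical; like `howmany_within_range2()` in the cell below, it returns its input index alongside the result, so results gathered in completion order can be sorted back into input order. It assumes the Linux `fork` start method.

```python
import multiprocessing as mp
import time

def slow_square(i, x, delay):
    """Sleep for `delay` seconds, then return (i, x**2). Hypothetical helper."""
    time.sleep(delay)
    return (i, x * x)

inputs = [(0, 1, 0.3), (1, 2, 0.1), (2, 3, 0.2)]
ctx = mp.get_context("fork")  # assumption: Linux, as on the lab's EC2 instance

with ctx.Pool(3) as pool:
    # apply_async + callback: the callback runs in the parent as each task completes.
    completed = []
    handles = [pool.apply_async(slow_square, args=args, callback=completed.append)
               for args in inputs]
    for h in handles:
        h.wait()  # block until this task has finished (its callback has already run)

    # starmap_async returns immediately; .get() waits and preserves input order.
    ordered = pool.starmap_async(slow_square, inputs).get()

print("completion order:", completed)
print("sorted by index: ", sorted(completed))  # [(0, 1), (1, 4), (2, 9)]
print("starmap_async:   ", ordered)            # [(0, 1), (1, 4), (2, 9)]
```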

### Parallelizing with `Pool.starmap_async()`

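`starmap_async()` works like `starmap()`, but it returns an `AsyncResult` immediately; call its `.get()` method to wait for and retrieve the results. In the cell below, `howmany_within_range2()` also returns the row index `i` along with the count, so each result can be matched to its input row even when results are gathered in completion order (for example, through `apply_async()` callbacks). Try writing the equivalent `map_async()` version as well.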

In [None]:
# Parallelizing with Pool.starmap_async()

def howmany_within_range2(i, row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return (i, count)

import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())

# TODO: Use starmap_async to process the data
# Remember to use .get() to retrieve results from async call

pool.close()
# TODO: Print results after implementing

# Exercises

## Problem 1

Use `Pool.starmap()` to get the row-wise common items in `list_a` and `list_b`. Each task should receive row `i` of `list_a` together with row `i` of `list_b`:

```
list_a = [[1, 2, 3], [5, 6, 7, 8], [10, 11, 12], [20, 21]]
list_b = [[2, 3, 4, 5], [6, 9, 10], [11, 12, 13, 14], [21, 24, 25]]
```

In [None]:
import multiprocessing as mp

list_a = [[1, 2, 3], [5, 6, 7, 8], [10, 11, 12], [20, 21]]
list_b = [[2, 3, 4, 5], [6, 9, 10], [11, 12, 13, 14], [21, 24, 25]]

# TODO: Define a function to find common elements between two lists
def common_item(list_a, list_b):
    pass

# TODO: Use Pool.starmap to apply common_item to corresponding rows
# Save result as prob1_answer

print("Implement the solution to see results")

## Problem 2: CPU-Intensive Work Performance Comparison

### Overview
Compare serial vs parallel execution for CPU-intensive tasks using the `do_busy_work` function from `script.py`.

### The do_busy_work Function
The `do_busy_work(time_in_seconds: int) -> float` function:
- Takes an integer representing seconds of work to simulate
- Sleeps for that many seconds (a stand-in for long-running, CPU-intensive work; note that a sleeping process does not actually keep a core busy)
- Prints process ID and timing information
- Returns the actual elapsed time as a float

### Instructions

Your task:

1. **Part A - Serial Execution**: 
   - Process the list `[1, 2, 4, 6]` serially using the built-in `map()` function
   - Expected total time: ~13 seconds (1 + 2 + 4 + 6), regardless of the number of cores

2. **Part B - Parallel with CPU Count**:
   - Use `multiprocessing.Pool` with size = `mp.cpu_count()`
   - Expected time: ~6 seconds on a machine with at least 4 cores (limited by the longest task), plus a little pool start-up overhead

3. **Part C - Parallel with Optimal Pool Size**:
   - Use `multiprocessing.Pool` with size = `len(work)`
   - Compare performance with Part B

### Key Concepts
- Observe how parallel execution reduces total time
- Understand relationship between pool size and performance
- See why parallel time is limited by the longest individual task
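A back-of-the-envelope model of these expectations: if each task is handed, in input order, to whichever worker frees up first, the wall-clock time is the finishing time of the busiest worker. The helper `estimated_makespan` is hypothetical and idealized (it ignores process start-up and communication costs), but it shows why serial time is the sum of the tasks while a pool with at least as many workers as tasks is bounded by the longest one.

```python
import heapq
from typing import List

def estimated_makespan(durations: List[float], pool_size: int) -> float:
    """Idealized wall-clock time for running `durations` on `pool_size` workers.

    Each task, in input order, goes to the worker that becomes free first.
    Ignores process start-up and inter-process communication costs.
    """
    free_at = [0.0] * pool_size  # time at which each worker becomes free
    heapq.heapify(free_at)
    for d in durations:
        start = heapq.heappop(free_at)      # earliest available worker
        heapq.heappush(free_at, start + d)  # that worker is busy until start + d
    return max(free_at)

work = [1, 2, 4, 6]
print("serial:", sum(work))  # 13
for n in (1, 2, 4, 8):
    print(f"pool size {n}:", estimated_makespan(work, n))
# pool size 1: 13.0, 2: 8.0, 4: 6.0, 8: 6.0
```

Under this model, Parts B and C should both land near 6 seconds on an 8-core machine; any measured difference would mostly come from the cost of starting the extra idle workers.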

In [None]:
import time
from typing import List
import multiprocessing as mp
from script import do_busy_work

work: List = [1, 2, 4, 6]

print("=== Part A: Serial Execution ===")
start = time.perf_counter()

# TODO: Serial execution using map()

time_elapsed = time.perf_counter() - start
prob2_part1_answer = time_elapsed
print(f"\nSerial execution time: {time_elapsed:.2f} seconds")
print(f"Expected ~13 seconds on an 8-core machine, got {time_elapsed:.2f} seconds")

print("\n=== Part B: Parallel with CPU Count ===")
start = time.perf_counter()

# TODO: Parallel execution with CPU count

time_elapsed = time.perf_counter() - start
prob2_part2_answer = time_elapsed
print(f"\nParallel execution time (CPU count={mp.cpu_count()}): {time_elapsed:.2f} seconds")

print("\n=== Part C: Parallel with Optimal Pool Size ===")
start = time.perf_counter()

# TODO: Parallel execution with pool size = work list size

time_elapsed = time.perf_counter() - start
prob2_part3_answer = time_elapsed
print(f"\nParallel execution time (pool size={len(work)}): {time_elapsed:.2f} seconds")

# TODO: Uncomment after implementing all parts to see performance summary
# print("\n" + "="*50)
# print("PERFORMANCE SUMMARY")
# print("="*50)
# speedup_cpu = prob2_part1_answer/prob2_part2_answer
# speedup_opt = prob2_part1_answer/prob2_part3_answer
# print(f"Serial: {prob2_part1_answer:.2f}s | Parallel (CPU): {prob2_part2_answer:.2f}s ({speedup_cpu:.2f}x) | Parallel (optimal): {prob2_part3_answer:.2f}s ({speedup_opt:.2f}x)")

## Problem 3: Large-Scale Parallel Data Processing

### Overview
Real-world data processing often involves operations on large datasets where parallelization can provide significant performance benefits. In this problem, you'll normalize a large dataset and observe how parallel processing scales with data size.
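As a worked example of the min-max formula in the `normalize()` docstring below, take the first test row $x = [2, 3, 4, 5]$, where $\min(x) = 2$ and $\max(x) = 5$:

$$
z_i = \frac{x_i - \min(x)}{\max(x) - \min(x)}, \qquad
z = \left[\tfrac{2-2}{3},\ \tfrac{3-2}{3},\ \tfrac{4-2}{3},\ \tfrac{5-2}{3}\right] = \left[0,\ \tfrac{1}{3},\ \tfrac{2}{3},\ 1\right]
$$

For a constant row such as $[100, 100, 100, 100]$, the denominator $\max(x) - \min(x)$ is $0$, so the formula is undefined; that is the edge case the docstring asks you to handle explicitly.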

In [None]:
import multiprocessing as mp
import numpy as np
import time

# TODO: Create your normalization function
def normalize(row):
    """
    Normalize a row to [0, 1] range using min-max normalization.
    Handle edge case where all values are the same.
    
    Formula: z_i = (x_i - min(x)) / (max(x) - min(x))
    """
    pass

# Test with small example first
test_data = [
    [2, 3, 4, 5],
    [6, 9, 10, 12],
    [11, 12, 13, 14],
    [21, 24, 25, 26],
    [100, 100, 100, 100],  # Edge case: all same values
]

print("=== Testing normalization function ===")
# TODO: Test your normalize function with test_data

print("\n" + "="*70)
print("PERFORMANCE COMPARISON WITH DIFFERENT DATASET SIZES")
print("="*70)

np.random.seed(42)  # For reproducibility
sizes = [
    ("Small", 100, 50),
    ("Medium", 1000, 50),
    ("Large", 10000, 50),
    ("XL", 50000, 100),
    ("XXL", 100000, 100),
]

results = {}

for size_name, rows, cols in sizes:
    print(f"\n--- {size_name} Dataset: {rows:,} rows × {cols} columns ---")
    
    # Generate random data
    data = np.random.randint(0, 1000, size=(rows, cols)).tolist()
    
    print(f"Running serial processing...")
    start = time.perf_counter()
    # TODO: Serial processing (store the normalized rows in serial_results)
    serial_time = time.perf_counter() - start
    print(f"  Serial time: {serial_time:.4f} seconds")
    
    print(f"Running parallel processing ({mp.cpu_count()} cores)...")
    start = time.perf_counter()
    # TODO: Parallel processing (store the normalized rows in parallel_results, used for prob3_answer below)
    parallel_time = time.perf_counter() - start
    print(f"  Parallel time: {parallel_time:.4f} seconds")
    
    # TODO: Calculate and display speedup
    
    # TODO: Store results for summary table
    
    # Store XXL results for submission
    if size_name == "XXL":
        prob3_answer = parallel_results[:5]

# TODO: Display performance summary table and insights after implementation

## **Save your analytics results to a JSON file, then add, commit, and push your notebook and JSON file to GitHub!**

In [None]:
import json
import socket
from typing import Dict
from pathlib import Path

# TODO: Make sure all problem variables are defined before running this cell:
# - prob1_answer (from Problem 1)
# - prob2_part1_answer, prob2_part2_answer, prob2_part3_answer (from Problem 2)
# - prob3_answer (from Problem 3)

answers: Dict = dict(
    prob1=str(prob1_answer),
    prob2=dict(part1=prob2_part1_answer, part2=prob2_part2_answer, part3=prob2_part3_answer),
    prob3=str(prob3_answer),
    host=socket.gethostname()
)

Path("soln.json").write_text(json.dumps(answers, indent=2))
print("Results saved to soln.json")