# Lab - Parallel Processing in Python

## Introduction

Parallel processing is a mode of operation where tasks are executed simultaneously across multiple processors on the same computer. This approach is designed to reduce overall processing time for computationally intensive operations.

However, communication between processes carries some overhead, which for small tasks can increase the overall time taken instead of decreasing it. Understanding when and how to apply parallel processing is crucial for effective optimization.

In Python, the `multiprocessing` module runs independent parallel processes by using subprocesses instead of threads. This lets you leverage multiple processors on a machine, with each process running in its own separate memory space. This is particularly important in Python because of the Global Interpreter Lock (GIL), which in CPython allows only one thread at a time to execute Python bytecode within a single process.
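To make the overhead concrete, the sketch below runs the same tiny task serially and through a small pool. It is an illustration only: the built-in `abs` stands in for real work, the pool size of 2 is arbitrary, and the `fork` start method is an assumption that fits Linux environments. Timings vary by machine, so none are shown.

```python
import multiprocessing as mp
import time

numbers = list(range(-500, 500))

# Serial: apply abs() to every number in the current process.
start = time.perf_counter()
serial = [abs(n) for n in numbers]
serial_time = time.perf_counter() - start

# Parallel: same work, but we also pay for starting workers and for
# pickling arguments and results between processes.
ctx = mp.get_context("fork")  # assumption: Linux-style fork is available
start = time.perf_counter()
with ctx.Pool(2) as pool:
    parallel = pool.map(abs, numbers)
parallel_time = time.perf_counter() - start

print(f"serial:   {serial_time:.6f} s")
print(f"parallel: {parallel_time:.6f} s")
print("same result:", serial == parallel)
```

For a task this small, the parallel version will typically be the slower of the two, which is the effect described above.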

## Learning Objectives

By the end of this lab you will:

* Understand how to structure code and use syntax for parallel processing with `multiprocessing`
* Implement both synchronous and asynchronous parallel processing
* Learn to parallelize operations on Pandas DataFrames
* Solve 3 different use cases using the `multiprocessing.Pool()` interface
* Compare performance between serial and parallel implementations

## Environment Note

This lab is designed to run on your Amazon EC2 instance accessed through VSCode. All multiprocessing features will work correctly in this Linux environment.

Import `multiprocessing`. It is part of the Python standard library, so no installation is needed.

In [12]:
import multiprocessing as mp

Determine the maximum number of parallel processes you can run. Note: you usually don't want to use all the available cores, since your computer needs to process other things as well.

In [13]:
print("Number of processors: ", mp.cpu_count())

Number of processors:  10
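One simple way to act on that note is to reserve a core for the rest of the system. The rule below (one core held back, never fewer than one worker) is just a common convention, not something this lab prescribes.

```python
import multiprocessing as mp

total_cores = mp.cpu_count()

# Leave one core for the rest of the system, but never go below 1 worker.
n_workers = max(1, total_cores - 1)

print(f"cores: {total_cores}, workers to use: {n_workers}")
```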


## Synchronous and Asynchronous execution

In parallel processing, there are two types of execution models:

* **_Synchronous_** runs the processes in the same order in which they were started. This is achieved by locking the main program until the respective processes are finished.

* **_Asynchronous_**, on the other hand, doesn’t involve locking. As a result, the order of results can get mixed up, but the work usually gets done quicker.

There are 2 main objects in `multiprocessing` to implement parallel execution of a function: The `Pool` Class and the `Process` Class.

### `Pool` Class

* Synchronous execution
    * `Pool.map()` and `Pool.starmap()`
    * `Pool.apply()` not used in lab
    
* Asynchronous execution
    * `Pool.map_async()` and `Pool.starmap_async()`
    * `Pool.apply_async()` not used in lab

### `Process` Class

Let’s take up a typical problem and implement parallelization using the above techniques. In this lab, we stick to the `Pool` class, because it is most convenient to use and serves most common practical applications.
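For comparison, here is a minimal sketch of the lower-level `Process` class, which this lab does not otherwise use. The `square_into_queue` worker and the use of a `Queue` to pass values back are illustrative choices, and the `fork` start method is an assumption that fits Linux environments.

```python
import multiprocessing as mp

def square_into_queue(n, queue):
    """Hypothetical worker: compute n squared and send it back to the parent."""
    queue.put((n, n * n))

ctx = mp.get_context("fork")  # assumption: Linux-style fork is available
queue = ctx.Queue()

# Create and start one process per input value.
processes = [ctx.Process(target=square_into_queue, args=(n, queue)) for n in [1, 2, 3]]
for p in processes:
    p.start()

# Collect one result per process, then wait for the processes to exit.
squares = dict(queue.get() for _ in processes)
for p in processes:
    p.join()

print(sorted(squares.items()))  # [(1, 1), (2, 4), (3, 9)]
```

Compared with `Pool`, you manage process start-up, result passing, and shutdown yourself, which is part of why `Pool` is more convenient for the problems in this lab.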

More info on these classes [here](https://docs.python.org/3/library/multiprocessing.html).

Read about the differences between apply, map, and starmap [here](https://stackoverflow.com/questions/8533318/multiprocessing-pool-when-to-use-apply-apply-async-or-map)

## Example

Given a 2D matrix (or list of lists), count how many numbers in each row fall within a given range. We will transform the matrix into a list of rows.

Import the following libraries:

In [14]:
import numpy as np
import time

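Use `np.random.RandomState(100)` to create a seeded random number generator. Note that the global `np.random.randint` call used below does not draw from this object unless you keep a reference to it and call its `randint` method, so the array can differ from run to run.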

In [15]:
np.random.RandomState(100)

RandomState(MT19937) at 0x10BDEBE40

Let's create a large NumPy array of integers between 0 and 9 with 1,000,000 rows and 5 columns. You can use `np.random.randint` for this.

In [16]:
arr = np.random.randint(0, 10, size=[1000000, 5])

Convert the array you created in the previous step to a list using `tolist()`.

In [17]:
data = arr.tolist()

Explore your array

In [18]:
arr

array([[7, 3, 0, 2, 8],
       [7, 7, 2, 3, 9],
       [9, 2, 5, 4, 2],
       ...,
       [4, 8, 6, 6, 8],
       [5, 9, 6, 4, 0],
       [6, 5, 1, 3, 3]], shape=(1000000, 5))

Check that the length of your list is 1,000,000

In [19]:
len(data)

1000000

### Implement solution without parallelization

Let’s see how long it takes to compute this without parallelization. For this, we create a function called `howmany_within_range()` and apply it to every row of the matrix. Since we converted the matrix to a list, we iterate over every element of that list. The function receives all the values in a row (one list element) as input and returns the count.

In [20]:
def howmany_within_range(row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count

Iterate the function `howmany_within_range` over every row in the matrix and measure the time.

In [21]:
import time

start = time.time()

counts = [howmany_within_range(row, 4, 8) for row in data]

end = time.time()
print(end - start)

0.24921703338623047


Print the first 10 results

In [22]:
print(counts[:10])

[2, 2, 2, 2, 2, 1, 1, 3, 4, 4]


Check that the length of results is 1,000,000 and your implementation didn't cheat!

In [23]:
print(len(counts))

1000000


## How to parallelize any function?

The general way to parallelize any operation is to take a particular function that should be run multiple times and make it run in parallel using different processors.

To do this, you initialize a _Pool_ with n number of processors (or cores) and pass the function you want to parallelize to one of _Pool's_ parallelization methods.

`multiprocessing.Pool()` provides the `apply()`, `map()` and `starmap()` methods to make any function run in parallel.

### Key Differences Between Methods

Both `apply` and `map` take the function to be "parallelized" as the main argument. But the difference is:
- `apply()` takes an _args_ argument that accepts the parameters passed to the _function-to-be-parallelized_ as an argument
- `map()` can take only one _iterable_ as an argument

This is because `apply()` only runs one worker in the pool (so it's not true parallelization of the function), while `map()` distributes work across all workers in the pool.

So `map()` is really more suitable for simpler iterable operations and also does the job faster because it uses all available workers.
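The difference in call shape can be seen in a small sketch. The built-ins `pow` and `abs` stand in for a real function, the pool size of 2 is arbitrary, and the `fork` start method is an assumption that fits Linux environments.

```python
import multiprocessing as mp

ctx = mp.get_context("fork")  # assumption: Linux-style fork is available
with ctx.Pool(2) as pool:
    # apply(): one call, arguments passed through `args`; blocks until done.
    single = pool.apply(pow, args=(2, 10))

    # map(): the function is called once per element of a single iterable,
    # and the calls are spread across the workers.
    many = pool.map(abs, [-3, -2, -1, 0, 1])

print(single)  # 1024
print(many)    # [3, 2, 1, 0, 1]
```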

We will explore `starmap()` once we have seen how to parallelize the `howmany_within_range()` function with `map()`.

### Parallelizing using `Pool.map()`

`Pool.map()` accepts only _one iterable as argument_. So as a workaround, we modify the `howmany_within_range` function by giving the `minimum` and `maximum` parameters default values, creating a new `howmany_within_range_rowonly()` function that takes only a row as input. `Pool.map()` then calls it once for each row in the _iterable list_ of rows. This is not an ideal use case for `map()`, but it clearly shows how it differs from `apply()`.

In [24]:
# Parallelizing using Pool.map()
import importlib
import mp_helpers
importlib.reload(mp_helpers)
from mp_helpers import howmany_within_range_rowonly


start = time.time()
with mp.Pool(mp.cpu_count()) as pool:
    t1 = time.time()
    counts = pool.map(howmany_within_range_rowonly, data)
    t2 = time.time()

end = time.time()

print('total time', end - start)
print('time to set up pool', t1 - start)
print('time to multiprocess', t2 - t1)
print(counts[:10])
print(len(counts))


total time 0.286456823348999
time to set up pool 0.11023998260498047
time to multiprocess 0.16297078132629395
[2, 2, 2, 2, 2, 1, 1, 3, 4, 4]
1000000
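Another way to fit a multi-argument function into the one-iterable shape that `map()` expects is to bind the extra arguments with `functools.partial`, instead of writing a second function with defaults. The sketch below is not part of the lab's `mp_helpers` module: it redefines `howmany_within_range` locally and uses the built-in `map()` so that it runs on its own. Passing the same `partial` object to `Pool.map()` should also work, provided the wrapped function lives in an importable module so that it can be pickled.

```python
from functools import partial

def howmany_within_range(row, minimum, maximum):
    """Count the numbers in `row` that lie between `minimum` and `maximum`, inclusive."""
    return sum(1 for n in row if minimum <= n <= maximum)

# Bind minimum and maximum, leaving a function of `row` only.
count_4_to_8 = partial(howmany_within_range, minimum=4, maximum=8)

rows = [[7, 3, 0, 2, 8], [7, 7, 2, 3, 9], [9, 2, 5, 4, 2]]
counts = list(map(count_4_to_8, rows))
print(counts)  # [2, 2, 2]
```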


### Parallelizing using `Pool.starmap()`

In the previous example, we had to redefine the `howmany_within_range` function so that a couple of its parameters take default values. Using `starmap()`, you can avoid doing this. How, you ask?

Like `Pool.map()`, `Pool.starmap()` also accepts only one iterable as argument, but in `starmap()` each element of that iterable is itself an iterable. You provide the arguments to the _function-to-be-parallelized_ in order in this inner iterable, and they are unpacked during execution. Internally, Python calls `function_to_be_parallelized(*args)` for each element, where `args` is one inner iterable taken from the iterable supplied to `Pool.starmap()`.

So effectively, `Pool.starmap()` is like a version of `Pool.map()` that accepts arguments.
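The unpacking is easy to see on a toy input. In this sketch the built-in `pow` plays the role of the function to parallelize, the pool size of 2 is arbitrary, and the `fork` start method is an assumption that fits Linux environments.

```python
import multiprocessing as mp

arg_tuples = [(2, 3), (3, 2), (10, 2)]

# What starmap does conceptually: unpack each inner tuple into the call.
expected = [pow(*args) for args in arg_tuples]

ctx = mp.get_context("fork")  # assumption: Linux-style fork is available
with ctx.Pool(2) as pool:
    results = pool.starmap(pow, arg_tuples)

print(results)              # [8, 9, 100]
print(results == expected)  # True
```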

In [25]:
# Parallelizing with Pool.starmap()
import multiprocessing as mp
import time
importlib.reload(mp_helpers)
from mp_helpers import howmany_within_range

start = time.time()

with mp.Pool(mp.cpu_count()) as pool:
    args = [(row, 4, 8) for row in data]
    results = pool.starmap(howmany_within_range, args)

end = time.time()

print("total time:", end - start)
print(results[:10])
print("length:", len(results))



total time: 1.0252742767333984
[2, 2, 2, 2, 2, 1, 1, 3, 4, 4]
length: 1000000
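The `starmap()` run above was slower than the `map()` run. The lab does not investigate why; one plausible contributor is the cost of building and pickling a million small argument tuples. Both `map()` and `starmap()` accept an optional `chunksize` argument that controls how many items are sent to a worker per batch, which is one knob to experiment with. The sketch below only shows the call shape, using the built-in `sum` on a small made-up dataset; the chunk size of 250 is arbitrary and no timing claim is made.

```python
import multiprocessing as mp

rows = [[i, i + 1, i + 2] for i in range(1000)]  # made-up data

ctx = mp.get_context("fork")  # assumption: Linux-style fork is available
with ctx.Pool(2) as pool:
    # Default: the pool picks a chunk size on its own.
    default_chunks = pool.map(sum, rows)
    # Explicit: send rows to the workers in batches of 250.
    big_chunks = pool.map(sum, rows, chunksize=250)

print(default_chunks[:3])            # [3, 6, 9]
print(default_chunks == big_chunks)  # True
```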


## Asynchronous Parallel Processing

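The asynchronous equivalents `apply_async()`, `map_async()` and `starmap_async()` let you execute the processes in parallel asynchronously: the call returns immediately, and the next task can start as soon as a worker is free, without regard for the starting order. As a result, tasks may finish in a different order than they were submitted. Note that `map_async()` and `starmap_async()` still return their results in input order when you call `.get()`; with `apply_async()`, the order depends on how you collect the individual results.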
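A small sketch of the non-blocking behavior, using `map_async()` with `math.sqrt` as a stand-in task. The pool size of 2 is arbitrary, and the `fork` start method is an assumption that fits Linux environments.

```python
import math
import multiprocessing as mp

ctx = mp.get_context("fork")  # assumption: Linux-style fork is available
with ctx.Pool(2) as pool:
    # map_async() returns an AsyncResult right away instead of blocking.
    async_result = pool.map_async(math.sqrt, [1, 4, 9, 16])

    # The main program is free to do other work here.
    other_work = sum(range(10))

    # get() blocks until every task has finished and returns the list of results.
    roots = async_result.get()

print(other_work)  # 45
print(roots)       # [1.0, 2.0, 3.0, 4.0]
```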

### Parallelizing with `Pool.starmap_async()`

You saw how `starmap()` works. Can you write an equivalent version using `starmap_async()` or `map_async()`? One implementation is shown below anyway.

In [26]:
import importlib
import mp_helpers
importlib.reload(mp_helpers)

import multiprocessing as mp
import time

pool = mp.Pool(mp.cpu_count())

start = time.time()

# each task is: (i, row, min, max)
tasks = [(i, row, 4, 8) for i, row in enumerate(data)]

async_result = pool.starmap_async(mp_helpers.howmany_within_range2, tasks)

results = async_result.get()          # IMPORTANT: blocks until every task has finished
pool.close()
pool.join()

end = time.time()

# sort by index to restore input order (a safeguard: starmap_async().get() already returns results in input order)
results_sorted = [c for i, c in sorted(results, key=lambda x: x[0])]

print("time:", end - start)
print(results_sorted[:10])
print(len(results_sorted))


time: 0.8601081371307373
[2, 2, 2, 2, 2, 1, 1, 3, 4, 4]
1000000
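The lab names `apply_async()` but does not use it. As a rough sketch of how it could look: each call submits one task and returns an `AsyncResult` handle. Keeping those handles in submission order and calling `.get()` on each one yields results in input order, regardless of which task finished first. The built-in `pow` stands in for real work, and the `fork` start method is an assumption that fits Linux environments.

```python
import multiprocessing as mp

ctx = mp.get_context("fork")  # assumption: Linux-style fork is available
with ctx.Pool(2) as pool:
    # Submit one task per value; nothing blocks here.
    handles = [pool.apply_async(pow, args=(n, 2)) for n in range(5)]

    # Collect in submission order; each get() waits for that one task.
    squares = [h.get() for h in handles]

print(squares)  # [0, 1, 4, 9, 16]
```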


# Exercises

## Problem 1

Use `Pool.starmap()` to get the row-wise common items in `list_a` and `list_b`. Each iteration processes row `i` of `list_a` together with row `i` of `list_b`:

```
list_a = [[1, 2, 3], [5, 6, 7, 8], [10, 11, 12], [20, 21]]
list_b = [[2, 3, 4, 5], [6, 9, 10], [11, 12, 13, 14], [21, 24, 25]]
```

In [27]:
import multiprocessing as mp
import importlib
import mp_helpers

importlib.reload(mp_helpers)

list_a = [[1, 2, 3], [5, 6, 7, 8], [10, 11, 12], [20, 21]]
list_b = [[2, 3, 4, 5], [6, 9, 10], [11, 12, 13, 14], [21, 24, 25]]

with mp.Pool(mp.cpu_count()) as pool:
    prob1_answer = pool.starmap(mp_helpers.common_items, zip(list_a, list_b))

print(prob1_answer)



[[2, 3], [6], [11, 12], [21]]
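`common_items` is imported from `mp_helpers`, which is not shown in this lab. A hypothetical implementation consistent with the output above might look like the sketch below. `itertools.starmap` is used here as the single-process counterpart of `Pool.starmap()` so that the example runs on its own.

```python
from itertools import starmap

def common_items(row_a, row_b):
    """Hypothetical helper: items of row_a that also appear in row_b, in row_a's order."""
    lookup = set(row_b)
    return [item for item in row_a if item in lookup]

list_a = [[1, 2, 3], [5, 6, 7, 8], [10, 11, 12], [20, 21]]
list_b = [[2, 3, 4, 5], [6, 9, 10], [11, 12, 13, 14], [21, 24, 25]]

# starmap unpacks each (row_a, row_b) pair into common_items(row_a, row_b).
result = list(starmap(common_items, zip(list_a, list_b)))
print(result)  # [[2, 3], [6], [11, 12], [21]]
```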


## Problem 2: CPU-Intensive Work Performance Comparison

### Overview
Compare serial vs parallel execution for CPU-intensive tasks using the `do_busy_work` function from `script.py`.

### The do_busy_work Function
The `do_busy_work(time_in_seconds: int) -> float` function:
- Takes an integer representing seconds of work to simulate
- Sleeps for that many seconds (simulating CPU-intensive work)
- Prints process ID and timing information
- Returns the actual elapsed time as a float
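`script.py` is not reproduced in this lab. Based only on the description above, a hypothetical version of `do_busy_work` could look like this. The message format mirrors the output shown later in this problem; everything else is an assumption.

```python
import os
import time

def do_busy_work(time_in_seconds: int) -> float:
    """Hypothetical stand-in: sleep for the requested time and report how long it took."""
    pid = os.getpid()
    print(f"\ndo_busy_work, pid={pid}, enter")
    start = time.perf_counter()
    time.sleep(time_in_seconds)  # simulated work
    elapsed = time.perf_counter() - start
    print(f"do_busy_work, pid={pid}, exit, elapsed_time={elapsed:.2f} seconds")
    return elapsed

elapsed = do_busy_work(1)
```

Because the function prints its process ID, the output makes it easy to see whether calls ran in one process (serial) or in several (parallel).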

### Instructions

Your task:

1. **Part A - Serial Execution**: 
   - Process the list `[1, 2, 4, 6]` serially using the built-in `map()` function
   - Expected total time: ~13 seconds (1+2+4+6); serial time does not depend on the number of cores

2. **Part B - Parallel with CPU Count**:
   - Use `multiprocessing.Pool` with size = `mp.cpu_count()`
   - Expected time: ~6 seconds (limited by longest task)

3. **Part C - Parallel with Optimal Pool Size**:
   - Use `multiprocessing.Pool` with size = `len(work_list)`
   - Compare performance with Part B

### Key Concepts
- Observe how parallel execution reduces total time
- Understand relationship between pool size and performance
- See why parallel time is limited by the longest individual task
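The last point can be checked quickly with shorter tasks. In this sketch `time.sleep` stands in for the work, the durations are made up, and the `fork` start method is an assumption that fits Linux environments. Exact timings depend on the machine, so none are shown.

```python
import multiprocessing as mp
import time

durations = [0.2, 0.4, 0.8]  # made-up task lengths in seconds

ctx = mp.get_context("fork")  # assumption: Linux-style fork is available
start = time.perf_counter()
with ctx.Pool(len(durations)) as pool:
    # One worker per task, so the three sleeps can overlap.
    pool.map(time.sleep, durations)
parallel_time = time.perf_counter() - start

serial_estimate = sum(durations)  # cost of running them back to back
print(f"serial estimate: {serial_estimate:.2f} s")
print(f"parallel time:   {parallel_time:.2f} s (longest task: {max(durations):.2f} s)")
```

The parallel time cannot drop below the longest task, and it lands a little above it because of pool start-up and communication overhead.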

In [28]:
import time
from typing import List
import multiprocessing as mp
from script import do_busy_work

work: List = [1, 2, 4, 6]

print("=== Part A: Serial Execution ===")
start = time.perf_counter()

serial_results = list(map(do_busy_work, work))

time_elapsed = time.perf_counter() - start
prob2_part1_answer = time_elapsed
print(f"\nSerial execution time: {time_elapsed:.2f} seconds")
print(f"Expected ~13 seconds on an 8-core machine, got {time_elapsed:.2f} seconds")

print("\n=== Part B: Parallel with CPU Count ===")
start = time.perf_counter()

# Wrapped function for Part B
def run_parallel_cpu():
    with mp.Pool(mp.cpu_count()) as pool:
        return pool.map(do_busy_work, work)

# Execute Part B
if __name__ == "__main__":
    parallel_results_cpu = run_parallel_cpu()

time_elapsed = time.perf_counter() - start
prob2_part2_answer = time_elapsed
print(f"\nParallel execution time (CPU count={mp.cpu_count()}): {time_elapsed:.2f} seconds")

print("\n=== Part C: Parallel with Optimal Pool Size ===")
start = time.perf_counter()

# Wrapped function for Part C
def run_parallel_optimal():
    # Using pool size equal to number of tasks (4)
    with mp.Pool(len(work)) as pool:
        return pool.map(do_busy_work, work)

# Execute Part C
if __name__ == "__main__":
    parallel_results_opt = run_parallel_optimal()

time_elapsed = time.perf_counter() - start
prob2_part3_answer = time_elapsed
print(f"\nParallel execution time (pool size={len(work)}): {time_elapsed:.2f} seconds")

# Performance summary
print("\n" + "="*50)
print("PERFORMANCE SUMMARY")
print("="*50)
speedup_cpu = prob2_part1_answer/prob2_part2_answer if prob2_part2_answer > 0 else 0
speedup_opt = prob2_part1_answer/prob2_part3_answer if prob2_part3_answer > 0 else 0
print(f"Serial: {prob2_part1_answer:.2f}s | Parallel (CPU): {prob2_part2_answer:.2f}s ({speedup_cpu:.2f}x) | Parallel (optimal): {prob2_part3_answer:.2f}s ({speedup_opt:.2f}x)")

=== Part A: Serial Execution ===

do_busy_work, pid=34558, enter
do_busy_work, pid=34558, exit, elapsed_time=1.00 seconds

do_busy_work, pid=34558, enter
do_busy_work, pid=34558, exit, elapsed_time=2.00 seconds

do_busy_work, pid=34558, enter
do_busy_work, pid=34558, exit, elapsed_time=4.01 seconds

do_busy_work, pid=34558, enter
do_busy_work, pid=34558, exit, elapsed_time=6.01 seconds

Serial execution time: 13.15 seconds
Expected ~13 seconds on an 8-core machine, got 13.15 seconds

=== Part B: Parallel with CPU Count ===

do_busy_work, pid=36712, enter

do_busy_work, pid=36715, enter

do_busy_work, pid=36713, enter

do_busy_work, pid=36714, enter
do_busy_work, pid=36714, exit, elapsed_time=1.01 seconds
do_busy_work, pid=36712, exit, elapsed_time=2.01 seconds
do_busy_work, pid=36713, exit, elapsed_time=4.01 seconds
do_busy_work, pid=36715, exit, elapsed_time=6.01 seconds

Parallel execution time (CPU count=10): 6.12 seconds

=== Part C: Parallel with Optimal Pool Size ===

do_busy_wor

## Problem 3: Large-Scale Parallel Data Processing

### Overview
Real-world data processing often involves operations on large datasets where parallelization can provide significant performance benefits. In this problem, you'll normalize a large dataset and observe how parallel processing scales with data size.
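The `normalize` helper used in this problem comes from `mp_helpers`, which is not shown. A hypothetical min-max version that agrees with the sample output printed for the test rows in this problem might look like the following; treating a constant row as all zeros is inferred from that output.

```python
def normalize(row):
    """Hypothetical helper: min-max scale a row to the range [0, 1]."""
    low, high = min(row), max(row)
    span = high - low
    if span == 0:
        # All values are equal; avoid dividing by zero.
        return [0.0 for _ in row]
    return [(value - low) / span for value in row]

scaled = [round(x, 3) for x in normalize([2, 3, 4, 5])]
constant = normalize([100, 100, 100, 100])
print(scaled)    # [0.0, 0.333, 0.667, 1.0]
print(constant)  # [0.0, 0.0, 0.0, 0.0]
```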

In [29]:
import multiprocessing as mp
import numpy as np
import time
import importlib
import mp_helpers
importlib.reload(mp_helpers)
from mp_helpers import normalize

# Test with small example first
test_data = [
    [2, 3, 4, 5],
    [6, 9, 10, 12],
    [11, 12, 13, 14],
    [21, 24, 25, 26],
    [100, 100, 100, 100],
]

print("=== Testing normalization function ===")
for i, row in enumerate(test_data):
    normalized = normalize(row)
    print(f"Row {i}: {row} → {[round(x, 3) for x in normalized]}")

print("\n" + "="*70)
print("PERFORMANCE COMPARISON WITH DIFFERENT DATASET SIZES")
print("="*70)

np.random.seed(42)
sizes = [
    ("Small", 100, 50),
    ("Medium", 1000, 50),
    ("Large", 10000, 50),
    ("XL", 50000, 100),
    ("XXL", 100000, 100),
]

results = {}

for size_name, rows, cols in sizes:
    print(f"\n--- {size_name} Dataset: {rows:,} rows × {cols} columns ---")

    # Generate random data
    data = np.random.randint(0, 1000, size=(rows, cols)).tolist()

    print(f"Running serial processing...")
    start = time.perf_counter()
    serial_results = [normalize(row) for row in data]
    serial_time = time.perf_counter() - start
    print(f"  Serial time: {serial_time:.4f} seconds")

    print(f"Running parallel processing ({mp.cpu_count()} cores)...")
    start = time.perf_counter()

    with mp.Pool(mp.cpu_count()) as pool:
        parallel_results = pool.map(normalize, data)

    parallel_time = time.perf_counter() - start
    print(f"  Parallel time: {parallel_time:.4f} seconds")

    # Calculate speedup
    if parallel_time > 0:
        speedup = serial_time / parallel_time
    else:
        speedup = 0

    print(f"  Speedup: {speedup:.2f}x")

    # Store results
    results[size_name] = {
        'serial_time': serial_time,
        'parallel_time': parallel_time,
        'speedup': speedup,
        'rows': rows,
        'cols': cols
    }

    # Store XXL results for submission
    if size_name == "XXL":
        prob3_answer = parallel_results[:5]

# Summary table
print("\n" + "="*70)
print("PERFORMANCE SUMMARY")
print("="*70)
print(f"{'Dataset':<10} {'Rows':>8} {'Cols':>6} {'Serial (s)':>12} {'Parallel (s)':>12} {'Speedup':>10}")
print("-"*70)
for size_name in ['Small', 'Medium', 'Large', 'XL', 'XXL']:
    stats = results[size_name]
    print(f"{size_name:<10} {stats['rows']:>8,} {stats['cols']:>6} {stats['serial_time']:>12.4f} {stats['parallel_time']:>12.4f} {stats['speedup']:>10.2f}x")

print("\n" + "="*70)
print("INSIGHTS")
print("="*70)
print("1. The normalization function works correctly (tested).")
print("2. Serial processing time increases linearly with dataset size.")
print("3. Parallel processing shows speedup for larger datasets.")
print("4. Small datasets may not benefit due to overhead.")
print("5. The pattern matches the theory of parallel processing.")

print(f"\nprob3_answer (first 5 rows of XXL):")
for i, row in enumerate(prob3_answer[:5]):
    print(f"  Row {i}: First 3 values = {[round(x, 3) for x in row[:3]]}...")


=== Testing normalization function ===
Row 0: [2, 3, 4, 5] → [0.0, 0.333, 0.667, 1.0]
Row 1: [6, 9, 10, 12] → [0.0, 0.5, 0.667, 1.0]
Row 2: [11, 12, 13, 14] → [0.0, 0.333, 0.667, 1.0]
Row 3: [21, 24, 25, 26] → [0.0, 0.6, 0.8, 1.0]
Row 4: [100, 100, 100, 100] → [0.0, 0.0, 0.0, 0.0]

PERFORMANCE COMPARISON WITH DIFFERENT DATASET SIZES

--- Small Dataset: 100 rows × 50 columns ---
Running serial processing...
  Serial time: 0.0058 seconds
Running parallel processing (10 cores)...
  Parallel time: 0.2657 seconds
  Speedup: 0.02x

--- Medium Dataset: 1,000 rows × 50 columns ---
Running serial processing...
  Serial time: 0.0039 seconds
Running parallel processing (10 cores)...
  Parallel time: 0.0986 seconds
  Speedup: 0.04x

--- Large Dataset: 10,000 rows × 50 columns ---
Running serial processing...
  Serial time: 0.0396 seconds
Running parallel processing (10 cores)...
  Parallel time: 0.1744 seconds
  Speedup: 0.23x

--- XL Dataset: 50,000 rows × 100 columns ---
Running serial processin

## **Save your analytics results to a JSON file, then add, commit, and push your notebook and the JSON file to GitHub!**

In [30]:
import json
import socket
from typing import Dict
from pathlib import Path

# Make sure all variables are defined
print("Checking variables...")
try:
    print(f"prob1_answer: {prob1_answer}")
    print(f"prob2_part1_answer: {prob2_part1_answer}")
    print(f"prob2_part2_answer: {prob2_part2_answer}")
    print(f"prob2_part3_answer: {prob2_part3_answer}")
    print(f"prob3_answer (first row): {prob3_answer[0][:3]}...")  # Show first 3 values
    print(f"Hostname: {socket.gethostname()}")
except NameError as e:
    print(f"ERROR: Variable not defined: {e}")
    print("Please run the problem cells first!")
else:
    # Create the answers dictionary
    answers: Dict = dict(
        prob1=str(prob1_answer),
        prob2=dict(part1=prob2_part1_answer, part2=prob2_part2_answer, part3=prob2_part3_answer),
        prob3=str(prob3_answer),
        host=socket.gethostname()
    )

    # Write to file
    Path("soln.json").write_text(json.dumps(answers, indent=2))
    print("Results saved to soln.json")

    # Show preview
    print("\nPreview of soln.json:")
    print(json.dumps(answers, indent=2))

Checking variables...
prob1_answer: [[2, 3], [6], [11, 12], [21]]
prob2_part1_answer: 13.154151582999475
prob2_part2_answer: 6.12153812500037
prob2_part3_answer: 6.074820375000854
prob3_answer (first row): [0.2295247724974722, 0.042467138523761376, 0.9706774519716885]...
Hostname: MacBook-Pro-2.local
Results saved to soln.json

Preview of soln.json:
{
  "prob1": "[[2, 3], [6], [11, 12], [21]]",
  "prob2": {
    "part1": 13.154151582999475,
    "part2": 6.12153812500037,
    "part3": 6.074820375000854
  },
  "prob3": "[[0.2295247724974722, 0.042467138523761376, 0.9706774519716885, 0.6531850353892821, 0.42669362992922144, 0.980788675429727, 0.224469160768453, 0.3640040444893832, 0.5813953488372093, 0.05965621840242669, 0.29120323559150657, 0.4903943377148635, 0.23761375126390294, 0.9464105156723963, 0.7765419615773509, 0.8220424671385238, 0.4833164812942366, 0.7785642062689585, 0.5631951466127402, 0.6238624873609707, 0.3872598584428716, 0.7664307381193124, 0.0, 0.8665318503538928, 0.3781