# Day 3 - Parallel and High-Performance Python: Threads, AsyncIO, NumPy, Numba, GPUs

Welcome to Day 3 of the advanced Python course. Today we focus on **performance** and **parallelism**.

We will connect Python language features with how modern CPUs and GPUs execute code. The examples are oriented around physics and mechanical **size/length measurement** data (surface roughness, thickness, diameter measurements, repeated measurements, etc.).

## What we will cover today

- CPU vs I/O bound work and a high level mental model of performance
- The Global Interpreter Lock (GIL) and why it matters
- Threads, processes, and when to use which
- AsyncIO for I/O-bound tasks (simulated sensor queries)
- NumPy vectorized computations for numerical work
- Numba JIT compilation for accelerating pure Python loops
- A short overview of GPU tools: **cuDF**, **CuPy** and where they fit
- A look at Python 3.13 and the future: experimental JIT and optional GIL
- A complex end-to-end example: processing multiple measurement files in parallel

## Daily agenda and course flow

**09:00 - 10:30 (1h 30m)**  
- CPU vs I/O bound tasks, performance model
- GIL recap and impact on threading
- Threads for I/O-bound workloads

**10:30 - 10:45 (15m)**  
- Short break

**10:45 - 12:00 (1h 15m)**  
- Processes for CPU-bound workloads
- Intro to AsyncIO and simulated asynchronous measurements

**12:00 - 13:00 (1h)**  
- Lunch break

**13:00 - 14:45 (1h 45m)**  
- NumPy recap and vectorized computations for measurement data
- Practical NumPy vectorization patterns and exercises (physics themed)

**14:45 - 15:00 (15m)**  
- Short break

**15:00 - 16:30 (1h 30m)**  
- Numba JIT compilation for numerical loops
- Overview of GPU tools (cuDF, CuPy) and limitations
- Python 3.13: experimental JIT and optional GIL, and what it may mean
- Complex example: parallel processing of multiple measurement data sets

Throughout the day, we will mark good points to pause, ask questions, or take a sip of water. Try to roughly follow the timing so that we finish comfortably.

## Topic 1 - Performance mental model: CPU vs I/O, GIL recap

In real scientific and engineering work, **performance** usually means one of:

- How fast we can process a large dataset (throughput)
- How fast we get a single answer (latency)

Python performance is strongly influenced by:

- The speed of the underlying CPU or GPU
- Whether our code is **CPU-bound** or **I/O-bound**
- How much work we keep in **C-accelerated libraries** like NumPy
- The **Global Interpreter Lock (GIL)** in CPython

### CPU-bound vs I/O-bound

- **CPU-bound**: most time is spent doing computations on the CPU.
  - Example: computing statistics on millions of surface height samples.
- **I/O-bound**: most time is spent waiting for input/output.
  - Example: reading files from disk or waiting for a measurement device over the network.

### GIL recap

The CPython interpreter uses a **Global Interpreter Lock (GIL)**. Only one thread at a time can execute Python bytecode. This means:

- Multiple threads do **not** speed up CPU-bound pure Python code.
- Threads can still help with I/O-bound tasks, because while one thread waits for I/O, another can run.
- Libraries that release the GIL internally (NumPy, some C extensions) can still run in parallel in C.

For a deeper dive, see:

- CPython GIL overview: https://wiki.python.org/moin/GlobalInterpreterLock
- CPython implementation notes: https://docs.python.org/3/c-api/init.html


In [1]:
import time

def cpu_bound(n: int) -> int:
    """Fake CPU work: sum of squares up to n."""
    total = 0
    for i in range(n):
        total += i * i
    return total

def io_bound(delay: float) -> None:
    """Fake I/O: sleep to simulate waiting for a device or disk."""
    time.sleep(delay)

start = time.perf_counter()
cpu_bound(2_000_000)
cpu_time = time.perf_counter() - start

start = time.perf_counter()
io_bound(0.2)
io_time = time.perf_counter() - start

print(f"CPU-bound function took ~{cpu_time:.3f} s")
print(f"I/O-bound function took ~{io_time:.3f} s")

CPU-bound function took ~0.083 s
I/O-bound function took ~0.205 s


### üí™ Exercise (advanced): Rough timing experiment

In this exercise you will run a tiny timing experiment to get a feel for the numbers.

1. Use the provided `cpu_bound` function.
2. Call it with different values, for example `50_000`, `200_000`, `500_000`, and measure the time for each.
3. For timing, use `time.perf_counter()` like in the example.
4. Print the `n` value and the time for each run.

Do this in a simple, sequential loop. The goal is to get a feeling for how quickly Python loops slow down as `n` grows.

In [10]:
import time

# Advanced exercise starter
# TODO: run cpu_bound with several n values and measure the time.

def cpu_bound(n: int) -> int:
    total = 0
    for i in range(n):
        total += i * i
    return total
    
ns = [50000, 200000, 500000]

for n in ns:
    # call cpu bound and measure time 

    print(f"n={n}, elapsed={elapsed:.4f} s")

n=500000, elapsed=0.0344 s
n=2000000, elapsed=0.1341 s
n=5000000, elapsed=0.3160 s


In [3]:
# Solution

import time

def cpu_bound(n: int) -> int:
    total = 0
    for i in range(n):
        total += i * i
    return total

ns = [50_000, 200_000, 500_000]
for n in ns:
    start = time.perf_counter()
    _ = cpu_bound(n)
    elapsed = time.perf_counter() - start
    print(f"n={n}, elapsed={elapsed:.4f} s")

n=50000, elapsed=0.0027 s
n=200000, elapsed=0.0131 s
n=500000, elapsed=0.0326 s


## Topic 2 - Threads for I/O-bound tasks

Because of the GIL, Python threads are usually **not** a good solution for CPU-bound acceleration. But for I/O-bound tasks (waiting for files, network, measurement devices), threads can help hide latency.

Typical pattern in lab / measurement setups:

- You need to query multiple instruments or devices.
- Each device responds in, say, 100 ms.
- With sequential code, talking to 5 devices takes roughly 5 x 100 ms.
- With threads, you can send requests in parallel and total time can be close to 100 ms.

Python module: [`threading`](https://docs.python.org/3/library/threading.html)

Note that this does not break the GIL for CPU work, but it is very useful to manage multiple slow I/O operations.

In [9]:
import threading
import time
from random import uniform

def measure_sensor(sensor_id: int, delay: float) -> None:
    """Simulate a measurement: wait 'delay' seconds, then print a value."""
    time.sleep(delay)
    value = uniform(0.9, 1.1)  # normalized size measurement
    print(f"Sensor {sensor_id}: value={value:.4f}, delay={delay:.2f}s")

delays = [0.4, 0.3, 0.6, 0.2]

threads = []
start = time.perf_counter()
for i, d in enumerate(delays, start=1):
    t = threading.Thread(target=measure_sensor, args=(i, d))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

elapsed = time.perf_counter() - start
print(f"Total elapsed (threaded): {elapsed:.3f} s")

Sensor 4: value=1.0816, delay=0.20s
Sensor 2: value=0.9065, delay=0.30s
Sensor 1: value=1.0224, delay=0.40s
Sensor 3: value=1.0352, delay=0.60s
Total elapsed (threaded): 0.627 s


### ‚úè Exercise (easy): Compare sequential vs threaded measurements

Use the `measure_sensor` function idea to compare sequential and threaded execution.

1. Re-implement a simple `measure_sensor` that sleeps and prints a message.
2. First, call it sequentially in a loop for all delays.
3. Then, call it using `threading.Thread` like in the example.
4. Measure and print the elapsed time for both cases.

You can reuse the list `delays = [0.4, 0.3, 0.6, 0.2]`.

In [2]:
import threading
import time
from random import uniform

# TODO: implement sequential and threaded measurement and compare their times.

def measure_sensor(sensor_id: int, delay: float) -> None:
    time.sleep(delay)
    value = uniform(0.9, 1.1)
    print(f"Sensor {sensor_id}: value={value:.4f}, delay={delay:.2f}s")

delays = [0.4, 0.3, 0.6, 0.2]

print("Sequential Run")
start = time.perf_counter()

# TODO: Loop through 'delays' (use enumerate for sensor_id starting at 1).

seq_elapsed = time.perf_counter() - start
print(f"Sequential elapsed: {seq_elapsed:.3f} s")


print("\nThreaded Run")
threads = []
start = time.perf_counter()

# TODO: Loop through delays again.
#       1. Create a Thread instance for each measure_sensor.
#       2. Start the thread.
#       3. Append the thread to the threads list.

# TODO: Loop through the 'threads' list and call join() on each to wait for completion.

thr_elapsed = time.perf_counter() - start
print(f"Threaded elapsed: {thr_elapsed:.3f} s")

Sequential Run
Sequential elapsed: 0.000 s

Threaded Run
Threaded elapsed: 0.000 s


In [5]:
# Solution

import threading
import time
from random import uniform

def measure_sensor(sensor_id: int, delay: float) -> None:
    time.sleep(delay)
    value = uniform(0.9, 1.1)
    print(f"Sensor {sensor_id}: value={value:.4f}, delay={delay:.2f}s")

delays = [0.4, 0.3, 0.6, 0.2]

# Sequential
start = time.perf_counter()
for i, d in enumerate(delays, start=1):
    measure_sensor(i, d)
seq_elapsed = time.perf_counter() - start
print(f"Sequential elapsed: {seq_elapsed:.3f} s")

# Threaded
threads = []
start = time.perf_counter()
for i, d in enumerate(delays, start=1):
    t = threading.Thread(target=measure_sensor, args=(i, d))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
thr_elapsed = time.perf_counter() - start
print(f"Threaded elapsed: {thr_elapsed:.3f} s")

Sensor 1: value=1.0886, delay=0.40s
Sensor 2: value=0.9940, delay=0.30s
Sensor 3: value=0.9065, delay=0.60s
Sensor 4: value=1.0301, delay=0.20s
Sequential elapsed: 1.505 s
Sensor 4: value=1.0565, delay=0.20s
Sensor 2: value=0.9084, delay=0.30s
Sensor 1: value=0.9360, delay=0.40s
Sensor 3: value=1.0763, delay=0.60s
Threaded elapsed: 0.608 s


### üí™ Exercise (advanced): Collect results into a shared list

Right now, `measure_sensor` just prints values. In real life, we want to **store** results.

1. Modify `measure_sensor` so that it appends `(sensor_id, value)` to a shared list.
2. Use a `threading.Lock` to protect the shared list.
3. After all threads finish, print the collected list of results.

This pattern mimics collecting measurement results from multiple devices into a shared in-memory structure.

In [21]:
import threading
import time
from random import uniform

results = []
lock = threading.Lock()

def measure_and_store(sensor_id: int, delay: float) -> None:
    """TODO: simulate measurement and safely append to results."""
    # time.sleep(delay)
    # value = uniform(0.9, 1.1)

    # TODO: store values
    

delays = [0.4, 0.3, 0.6, 0.2]
threads = []
# TODO: start threads using measure_and_store and join them, then print results.
for i, d in enumerate(delays, start=1):
    t = threading.Thread(target=measure_and_store, args=(i, d))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(results)

[(4, 0.9803299653662977), (2, 0.9926811121605301), (1, 0.9047594144771355), (3, 0.9093203053674115)]


In [6]:
# Solution

import threading
import time
from random import uniform

results = []
lock = threading.Lock()

def measure_and_store(sensor_id: int, delay: float) -> None:
    time.sleep(delay)
    value = uniform(0.9, 1.1)
    with lock:
        results.append((sensor_id, value))

delays = [0.4, 0.3, 0.6, 0.2]
threads = []
for i, d in enumerate(delays, start=1):
    t = threading.Thread(target=measure_and_store, args=(i, d))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("Collected results:", results)

Collected results: [(4, 1.0242552744792301), (2, 0.9700888704122314), (1, 1.0925002232553185), (3, 0.9773592353806989)]


---
# Short break (10:30 - 10:45)
---

## Topic 3 - Processes for CPU-bound workloads

Because of the GIL, threads do not speed up CPU-bound pure Python loops. For heavy numerical work in pure Python, we can use **processes** instead of threads.

Python module: [`multiprocessing`](https://docs.python.org/3/library/multiprocessing.html)

A **process** has its own Python interpreter and its own GIL, so multiple processes can truly run in parallel on multiple CPU cores.

Typical pattern in scientific computing:

- Split a large dataset into chunks (e.g. surface height maps for different samples).
- Spawn a pool of worker processes.
- Each process computes statistics for one chunk.
- Collect results in the main process.

Downside: processes have more overhead than threads (especially for sending large arrays between processes), but they allow true CPU-core scaling for pure Python loops.

In [3]:
# RUN FROM .py INSTEAD OF NOTEBOOK

from multiprocessing import Pool, cpu_count
import math
import random
import time

def compute_rms(values):
    """Compute RMS roughness of a list of heights."""
    s = 0.0
    for x in values:
        s += x * x
    return math.sqrt(s / len(values))

def main():
    # Create fake measurement data: 4 samples with 100_000 points each
    samples = [[random.uniform(-1e-6, 1e-6) for _ in range(100_000)] for _ in range(4)]

    print(f"Using up to {cpu_count()} CPU cores")
    start = time.perf_counter()
    with Pool() as pool:
        rms_values = pool.map(compute_rms, samples)
    elapsed = time.perf_counter() - start
    print(f"Total elapsed (threaded): {elapsed:.3f} s")
    
    print("RMS roughness per sample:")
    for i, rms in enumerate(rms_values, start=1):
        print(f"  Sample {i}: {rms:.3e} m")

if __name__ == "__main__":
    main()


Using up to 10 CPU cores


Process SpawnPoolWorker-1:
Traceback (most recent call last):
Process SpawnPoolWorker-5:
  File "/opt/homebrew/Cellar/python@3.13/3.13.7/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
    ~~~~~~~~^^
  File "/opt/homebrew/Cellar/python@3.13/3.13.7/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.13/3.13.7/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/opt/homebrew/Cellar/python@3.13/3.13.7/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/queues.py", line 387, in get
    return _ForkingPickler.loads(res)
           ~~~~~~~~~~~~~~~~~~~~~^^^^^
AttributeError: Can't get attribute 'compute_rms' on <module '__main__' (<class '_froz

KeyboardInterrupt: 

### ‚úè Exercise (easy): Parallel average diameter per batch

Imagine you have several batches of diameter measurements for different parts. Each batch is a list of floats.

1. Implement a function `average(values)` that computes the mean of the list.
2. Create a list of batches (for example 3-5 lists with random diameters in millimeters).
3. Use `multiprocessing.Pool.map` to compute the average diameter for each batch in parallel.
4. Print the result for each batch.

Keep the batch sizes small enough that the code finishes quickly.

In [None]:
# RUN FROM .py INSTEAD OF NOTEBOOK

from multiprocessing import Pool
import random

# TODO: compute average diameter per batch in parallel.

def average(values):
    # return ...

# Create example batches of diameters in mm
batches = [
    [random.uniform(9.95, 10.05) for _ in range(50)],
    [random.uniform(4.95, 5.05) for _ in range(80)],
    [random.uniform(19.9, 20.1) for _ in range(100)],
]

def main():
    # with Pool() as pool:
    #     averages = ...
    # for i, avg in enumerate(averages, start=1):
    #     print(f"Batch {i}: average diameter = {avg:.3f} mm")

if __name__ == "__main__":
    main()


In [None]:
# Solution RUN FROM .py INSTEAD OF NOTEBOOK

from multiprocessing import Pool
import random

def average(values):
    total = 0.0
    for v in values:
        total += v
    return total / len(values)

batches = [
    [random.uniform(9.95, 10.05) for _ in range(50)],
    [random.uniform(4.95, 5.05) for _ in range(80)],
    [random.uniform(19.9, 20.1) for _ in range(100)],
]

def main():
    with Pool() as pool:
        averages = pool.map(average, batches)
    
    for i, avg in enumerate(averages, start=1):
        print(f"Batch {i}: average diameter = {avg:.3f} mm")

if __name__ == "__main__":
    main()


### üí™ Exercise (advanced): Parallel min, max, mean

Extend the previous idea to compute **three** statistics per batch: `(min, max, mean)`.

1. Write a function `stats(values)` that returns a tuple `(min_value, max_value, mean_value)`.
2. Reuse or create batches of diameter or thickness measurements.
3. Use a process pool to compute stats for each batch in parallel.
4. Print the three statistics for each batch in a readable way.

This is close to what you would actually do with per-sample measurement datasets.

In [None]:
# RUN FROM .py INSTEAD OF NOTEBOOK

from multiprocessing import Pool
import random

def stats(values):
    """TODO: return (min, max, mean) for the list."""
    # m = min(values)
    # M = max(values)
    # mean = ...
    # return m, M, mean
    m = min(values)
    M = max(values)
    total = 0.0
    for v in values:
        total += v
    mean = total / len(values)
    return m, M, mean

batches = [
    [random.uniform(9.95, 10.05) for _ in range(50)],
    [random.uniform(4.95, 5.05) for _ in range(80)],
    [random.uniform(19.9, 20.1) for _ in range(100)],
]

def main():
    # TODO: use Pool to compute stats in parallel and print them.

if __name__ == "__main__":
    main()


In [None]:
# RUN FROM .py INSTEAD OF NOTEBOOK

from multiprocessing import Pool
import random

def stats(values):
    m = min(values)
    M = max(values)
    total = 0.0
    for v in values:
        total += v
    mean = total / len(values)
    return m, M, mean

batches = [
    [random.uniform(9.95, 10.05) for _ in range(50)],
    [random.uniform(4.95, 5.05) for _ in range(80)],
    [random.uniform(19.9, 20.1) for _ in range(100)],
]

def main():
    with Pool() as pool:
        results = pool.map(stats, batches)
    
    for i, (m, M, mean) in enumerate(results, start=1):
        print(f"Batch {i}: min={m:.3f}, max={M:.3f}, mean={mean:.3f}")

if __name__ == "__main__":
    main()


## Topic 4 - AsyncIO for concurrent I/O

Python's [`asyncio`](https://docs.python.org/3/library/asyncio.html) module lets you write **concurrent** programs using the `async` / `await` syntax.

Unlike `threading` or `multiprocessing`, AsyncIO usually runs **in a single OS thread**. It uses an **event loop** that rapidly switches between many "tasks" (coroutines) whenever they are waiting for I/O.

### Key concepts

- **Coroutine**: a function defined with `async def`. Calling it does **not** run it, it returns a coroutine object (a bit like creating a `Task` in LabVIEW or a job in a scheduler).
  ```python
  async def measure():
      ...
  c = measure()  # coroutine object, not a result yet
  ```
- **Event loop**: a scheduler that runs coroutines. In scripts you usually start it with `asyncio.run(main())`. In Jupyter, the notebook already runs an event loop for you, so you can use `await` directly at the top level.
- **`await`**: suspends the current coroutine until the awaited operation is finished, and lets the event loop run other tasks in the meantime.
  - You can only use `await` **inside** `async def` functions (or at the top level in special environments like Jupyter).
  - This is like saying "I am waiting for the ADC / network / disk - while I wait, please do something else".

### When does AsyncIO help?

AsyncIO is ideal when your program is **I/O bound** and spends most of its time waiting:

- Many concurrent HTTP requests (web scraping, microservices).
- Talking to many instruments over TCP/serial/USB at once.
- File or database operations that frequently wait on the OS.

It does **not** make pure Python CPU loops faster. The Global Interpreter Lock (GIL) still limits CPU bound code. For heavy numeric work you usually use:

- **NumPy / vectorization** (day 3 topic),
- **C / C++ extensions**, or
- **multiprocessing** to use multiple cores.

### How does control flow work?

Think of the event loop as a conductor for an orchestra of coroutines:

1. You define `async def` coroutines.
2. You create tasks from them (for example with `asyncio.create_task`).
3. The event loop picks a task, runs it until it hits an `await` on something that is not yet ready (e.g. `await asyncio.sleep(0.5)` or an async network call).
4. At that `await`, the coroutine "pauses", the loop switches to another ready task.
5. When the awaited I/O operation completes, the loop resumes the paused coroutine right after the `await`.

Because tasks yield control at `await`, many of them can **make progress in overlapping time** - even though there is a single underlying OS thread.

### Frequently asked questions

**Q: Why can I not `await` in a normal function?**  
Because the Python grammar only allows `await` inside `async def` (or in special interactive environments). A normal `def` function does not know how to suspend and resume itself. If you want to use `await`, you must either:
- make the function `async def`, or
- call an async function from the **outside** using `asyncio.run(my_async_main())` in a script.

**Q: When is the event loop actually running?**  
- In a **script**, when you call `asyncio.run(main())` (which internally creates and runs an event loop).
- In **Jupyter**, there is already a loop running; the notebook integrates with it so `await` works at the top level. That is why you can simply do `await main()` in a notebook cell.

**Q: How do I actually get real benefits?**  
1. Identify I/O operations that can run in parallel (waiting for TCP sockets, serial ports, HTTP, `asyncio.sleep`, etc.).
2. Wrap them in `async def` coroutines that `await` the I/O.
3. Start many tasks with `asyncio.create_task` or `asyncio.gather`.
4. Let the event loop interleave their waiting periods.

Below we compare sequential vs async measurements to see the effect in practice.

**Further reading:**
- Official docs: https://docs.python.org/3/library/asyncio.html
- "Async IO in Python: A Complete Walkthrough" (Real Python): https://realpython.com/async-io-python/


In [5]:
import asyncio
import time
from random import uniform

# Synchronous version: measure each sensor one after the other

def measure_sensor_sync(sensor_id: int, delay: float) -> float:
    """Simulate a blocking sensor measurement using time.sleep."""
    time.sleep(delay)
    value = uniform(0.9, 1.1)
    print(f"[sync ] Sensor {sensor_id}: value={value:.4f}, delay={delay:.2f}s")
    return value

# Async version: non-blocking wait with asyncio.sleep

async def measure_sensor_async(sensor_id: int, delay: float) -> float:
    """Async measurement: uses await asyncio.sleep instead of time.sleep."""
    await asyncio.sleep(delay)
    value = uniform(0.9, 1.1)
    print(f"[async] Sensor {sensor_id}: value={value:.4f}, delay={delay:.2f}s")
    return value

async def main_compare_sync_vs_async() -> None:
    delays = [0.4, 0.3, 0.6, 0.2]

    print("\n--- Sequential sync measurements ---")
    start_sync = time.perf_counter()
    sync_values = [measure_sensor_sync(i, d) for i, d in enumerate(delays, start=1)]
    elapsed_sync = time.perf_counter() - start_sync
    print(f"Sync total elapsed: {elapsed_sync:.3f} s")

    print("\n--- Concurrent async measurements ---")
    start_async = time.perf_counter()
    # Create tasks for all sensors
    tasks = [asyncio.create_task(measure_sensor_async(i, d))
             for i, d in enumerate(delays, start=1)]
    # Wait for all tasks to finish
    async_values = await asyncio.gather(*tasks)
    elapsed_async = time.perf_counter() - start_async
    print(f"Async total elapsed: {elapsed_async:.3f} s")

    print("\nSync values: ", [f"{v:.4f}" for v in sync_values])
    print("Async values:", [f"{v:.4f}" for v in async_values])
    print("Max individual delay:", max(delays), "s")

# In a Jupyter notebook you can run the async main with top-level await:
await main_compare_sync_vs_async()


--- Sequential sync measurements ---
[sync ] Sensor 1: value=0.9644, delay=0.40s
[sync ] Sensor 2: value=1.0050, delay=0.30s
[sync ] Sensor 3: value=0.9401, delay=0.60s
[sync ] Sensor 4: value=1.0310, delay=0.20s
Sync total elapsed: 1.508 s

--- Concurrent async measurements ---
[async] Sensor 4: value=0.9038, delay=0.20s
[async] Sensor 2: value=0.9559, delay=0.30s
[async] Sensor 1: value=0.9855, delay=0.40s
[async] Sensor 3: value=0.9426, delay=0.60s
Async total elapsed: 0.602 s

Sync values:  ['0.9644', '1.0050', '0.9401', '1.0310']
Async values: ['0.9855', '0.9559', '0.9426', '0.9038']
Max individual delay: 0.6 s


In [7]:
import asyncio
import time
from random import uniform

async def calculate_sync():
    print("Start normal sleep")
    start = time.perf_counter()
    time.sleep(10) # :(
    print(f"Normal sleep took {(time.perf_counter()-start):.4f}s")

async def sleep_async():
    print("Start async sleep")
    start = time.perf_counter()
    await asyncio.sleep(1)
    print(f"Async sleep took {(time.perf_counter()-start):.4f}s")

async def main_compare_sync_vs_async() -> None:
    tasks = [
        asyncio.create_task(sleep_async()),
        asyncio.create_task(calculate_sync())
    ]
    await asyncio.gather(*tasks)

# In a Jupyter notebook you can run the async main with top-level await:
await main_compare_sync_vs_async()

Start async sleep
Start normal sleep
Normal sleep took 10.0050s
Async sleep took 10.0058s


### ‚úè Exercise (easy): Measure async speedup factor

Using the example above as a starting point:

1. Copy the idea of `measure_sensor_async` and `asyncio.gather` into a new async function, for example `async_measure_many_sensors()`.
2. Use a list of delays like `[0.1, 0.5, 0.2, 0.8, 0.3]`.
3. Measure
   - how long it would take to run all measurements **sequentially** (with `time.sleep`), and
   - how long it takes to run them **concurrently** with AsyncIO.
4. Print the **speedup factor** as `sync_time / async_time`.

You already saw everything you need:
- `time.perf_counter()` for timing,
- `asyncio.create_task` + `asyncio.gather` for running tasks concurrently,
- `await` inside `async def`.

Think about why the async time is close to the **maximum** delay instead of the **sum** of delays.

In [4]:
import asyncio
import time
from random import uniform

# You can reuse or adapt these building blocks
def measure_sensor_sync(sensor_id: int, delay: float) -> float:
    time.sleep(delay)
    value = uniform(0.9, 1.1)
    print(f"[sync ] Sensor {sensor_id}: value={value:.4f}, delay={delay:.2f}s")
    return value

async def measure_sensor_async(sensor_id: int, delay: float) -> float:
    await asyncio.sleep(delay)
    value = uniform(0.9, 1.1)
    print(f"[async] Sensor {sensor_id}: value={value:.4f}, delay={delay:.2f}s")
    return value

async def async_measure_many_sensors() -> None:
    delays = [0.1, 0.5, 0.2, 0.8, 0.3]

    # TODO:
    # start = ...
    for i, d in enumerate(delays):
        measure_sensor_sync(i, d)
    # t1 = ...
    
    print(f"Sync time: {t1:.4f}s")
    
    # 2) Measure async time by creating tasks and awaiting asyncio.gather.
    # start = ...
    # tasks = [asyncio.create_task(...)
    #          for i, d in enumerate(delays, start=1)]
    # await asyncio.gather(*tasks)
    # t2 = ...
    
    # Print both times and the speedup factor (sync_time / async_time).
    print(f"Async time: {t2:.4f}s")
    print(f"Speedup factor: {t1 / t2:.4f}")

# TODO: run async_measure_many_sensors from this cell using await.


[sync ] Sensor 0: value=0.9168, delay=0.10s
[sync ] Sensor 1: value=0.9389, delay=0.50s
[sync ] Sensor 2: value=1.0828, delay=0.20s
[sync ] Sensor 3: value=1.0502, delay=0.80s
[sync ] Sensor 4: value=0.9927, delay=0.30s
Sync time: 1.9079s
[async] Sensor 1: value=1.0963, delay=0.10s
[async] Sensor 3: value=1.0537, delay=0.20s
[async] Sensor 5: value=1.0066, delay=0.30s
[async] Sensor 2: value=0.9508, delay=0.50s
[async] Sensor 4: value=1.0150, delay=0.80s
Async time: 0.8012s
Speedup factor: 2.3812


In [3]:
# Example solution for the easy async speedup exercise

import asyncio
import time
from random import uniform


def measure_sensor_sync(sensor_id: int, delay: float) -> float:
    time.sleep(delay)
    value = uniform(0.9, 1.1)
    return value

async def measure_sensor_async(sensor_id: int, delay: float) -> float:
    await asyncio.sleep(delay)
    value = uniform(0.9, 1.1)
    return value

async def async_measure_many_sensors() -> None:
    delays = [0.1, 0.5, 0.2, 0.8, 0.3]

    # Sequential measurements
    start_sync = time.perf_counter()
    sync_values = [measure_sensor_sync(i, d) for i, d in enumerate(delays, start=1)]
    elapsed_sync = time.perf_counter() - start_sync

    # Async concurrent measurements
    start_async = time.perf_counter()
    tasks = [asyncio.create_task(measure_sensor_async(i, d))
             for i, d in enumerate(delays, start=1)]
    async_values = await asyncio.gather(*tasks)
    elapsed_async = time.perf_counter() - start_async

    speedup = elapsed_sync / elapsed_async if elapsed_async > 0 else float("inf")

    print(f"Sync elapsed:  {elapsed_sync:.3f} s")
    print(f"Async elapsed: {elapsed_async:.3f} s")
    print(f"Speedup:       {speedup:.2f}x")
    print("Sync values: ", [f"{v:.4f}" for v in sync_values])
    print("Async values:", [f"{v:.4f}" for v in async_values])

await async_measure_many_sensors()

Sync elapsed:  1.904 s
Async elapsed: 0.807 s
Speedup:       2.36x
Sync values:  ['1.0394', '1.0554', '0.9323', '0.9256', '0.9803']
Async values: ['0.9262', '1.0007', '1.0895', '0.9597', '0.9325']


---
# Lunch break (12:00 - 13:00)
---

## Topic 5 - NumPy recap and basic vectorization

[NumPy](https://numpy.org/) is the standard array library for numerical computing in Python.

Key ideas:

- A `numpy.ndarray` is an efficient, typed, homogeneous n-dimensional array.
- Many operations are implemented in optimized C code.
- Operations like `a + b`, `a * b` on arrays are **vectorized**: they run in fast loops in C rather than Python.

In measurement and physics workflows, NumPy is ideal for:

- Handling long arrays of measured values (heights, diameters, thicknesses, voltages).
- Computing statistics and transformations (offset correction, unit conversion, normalization).
- Applying elementwise functions (e.g. calibration curves, non-linear corrections).

Make sure you have NumPy installed. In this course environment it should already be available.

In [4]:
import numpy as np

# Thickness measurements in micrometers for 10 samples
thickness_um = np.array([100.2, 99.8, 100.5, 100.1, 99.9, 100.3, 100.0, 99.7, 100.4, 100.1])
print("Raw thickness (um):", thickness_um)
print("Shape:", thickness_um.shape, "dtype:", thickness_um.dtype)

# Convert to millimeters
thickness_mm = thickness_um / 1000.0
print("Thickness (mm):", thickness_mm)

# Basic statistics
print("Mean (um):", thickness_um.mean())
print("Std (um):", thickness_um.std())

Raw thickness (um): [100.2  99.8 100.5 100.1  99.9 100.3 100.   99.7 100.4 100.1]
Shape: (10,) dtype: float64
Thickness (mm): [0.1002 0.0998 0.1005 0.1001 0.0999 0.1003 0.1    0.0997 0.1004 0.1001]
Mean (um): 100.1
Std (um): 0.24494897427831783


### ‚úè Exercise (easy): Diameter conversion and simple statistics

1. Create a NumPy array of diameters in millimeters for at least 8 parts.
2. Convert them to micrometers (multiply by 1000).
3. Compute and print the mean and standard deviation in micrometers.

Use the same patterns as in the example above.

In [9]:
import numpy as np

# TODO: diameter conversion and basic statistics with NumPy.

# diam_mm = 
# diam_um = 
# mean_um = 
# std_um = 
print("Diameters (um):", diam_um)
print("Mean (um):", mean_um)
print("Std (um):", std_um)

Diameters (um): [1000. 1000. 1000. ... 1000. 1000. 1000.]
Mean (um): 1000.0
Std (um): 0.0


In [5]:
# Solution

import numpy as np

diam_mm = np.array([10.01, 9.99, 10.02, 10.00, 10.03, 9.98, 10.01, 9.97])
diam_um = diam_mm * 1000.0
mean_um = diam_um.mean()
std_um = diam_um.std()
print("Diameters (um):", diam_um)
print("Mean (um):", mean_um)
print("Std (um):", std_um)

Diameters (um): [10010.  9990. 10020. 10000. 10030.  9980. 10010.  9970.]
Mean (um): 10001.25
Std (um): 18.99835519196333


### üí™ Exercise (advanced): Offset correction and normalized values

Imagine your thickness sensor has a calibration offset error of +0.3 micrometers.

1. Create an array of measured thicknesses in micrometers.
2. Subtract the offset from all values to get corrected thicknesses.
3. Compute the mean and standard deviation of the corrected values.
4. Compute a normalized array where you subtract the mean and divide by the standard deviation.

This pattern (offset correction + normalization) is common in data preprocessing.

In [None]:
import numpy as np

# TODO: offset correction and normalization.
# thickness_um = np.array([100.2, 100.0, 99.9, 100.4, 100.1, 99.8])
# offset = 0.3
# corrected = ...  # subtract offset
# mean_corr = ...
# std_corr = ...
# normalized = ...  # (corrected - mean_corr) / std_corr
# print("Corrected thickness (um):", corrected)
# print("Normalized values:", normalized)

In [6]:
# Solution

import numpy as np

thickness_um = np.array([100.2, 100.0, 99.9, 100.4, 100.1, 99.8])
offset = 0.3
corrected = thickness_um - offset
mean_corr = corrected.mean()
std_corr = corrected.std()
normalized = (corrected - mean_corr) / std_corr
print("Corrected thickness (um):", corrected)
print("Normalized values:", normalized)

Corrected thickness (um): [ 99.9  99.7  99.6 100.1  99.8  99.5]
Normalized values: [ 0.6761234  -0.3380617  -0.84515425  1.69030851  0.16903085 -1.35224681]


## Topic 6 - NumPy vectorized computations in practice

Now we go deeper into NumPy vectorization. We will use:

- Elementwise operations on arrays
- Boolean masks and filtering
- Aggregations along axes (2D arrays)
- Simple physics-themed computations

NumPy allows you to write **array expressions** instead of Python `for` loops. These expressions are executed in optimized C loops under the hood.

Useful links:

- NumPy user guide: https://numpy.org/doc/stable/user/index.html
- NumPy quickstart: https://numpy.org/doc/stable/user/quickstart.html


In [7]:
import numpy as np

# Example: filter diameters by tolerance
diam_mm = np.array([10.01, 9.97, 10.05, 10.02, 9.94, 10.00, 9.99])
target = 10.00
tolerance = 0.03

deviation = diam_mm - target
mask_in_spec = np.abs(deviation) <= tolerance
print("Diameters:", diam_mm)
print("Deviation:", deviation)
print("In spec mask:", mask_in_spec)
print("In spec diameters:", diam_mm[mask_in_spec])

Diameters: [10.01  9.97 10.05 10.02  9.94 10.    9.99]
Deviation: [ 0.01 -0.03  0.05  0.02 -0.06  0.   -0.01]
In spec mask: [ True  True False  True False  True  True]
In spec diameters: [10.01  9.97 10.02 10.    9.99]


### ‚úè Exercise (easy): Filter out-of-spec samples

1. Create a NumPy array of length measurements in millimeters.
2. Define a target and tolerance.
3. Build a mask of samples that are **out of spec** (absolute deviation larger than tolerance).
4. Print the array of out-of-spec values and their count.

Use boolean masks like in the example above.

In [None]:
import numpy as np

# TODO: filter out-of-spec length values using a boolean mask.

# lengths_mm = np.array([49.98, 50.02, 49.95, 50.04, 50.01, 49.92])
# target = ...
# tolerance = ...
# deviation = ...
# mask_out = ...  # abs(deviation) > tolerance
# print("Out-of-spec values:", lengths_mm[mask_out])
# print("Count:", mask_out.sum())

In [8]:
# Solution

import numpy as np

lengths_mm = np.array([49.98, 50.02, 49.95, 50.04, 50.01, 49.92])
target = 50.00
tolerance = 0.03
deviation = lengths_mm - target
mask_out = np.abs(deviation) > tolerance
print("Out-of-spec values:", lengths_mm[mask_out])
print("Count:", int(mask_out.sum()))

Out-of-spec values: [49.95 50.04 49.92]
Count: 3


### üí™ Exercise (advanced): Repeated measurements per part (2D arrays)

Suppose you measure the same part multiple times to estimate uncertainty.

1. Create a 2D NumPy array `data` of shape `(n_parts, n_repeats)` containing diameters in mm.
2. Compute the mean diameter **per part** (axis 1).
3. Compute the standard deviation per part (axis 1).
4. Compute the overall mean of all measurements.
5. Print the per-part mean and standard deviation.

You should use `data.mean(axis=1)` and `data.std(axis=1)`.

In [None]:
import numpy as np

# TODO: repeated measurement statistics with 2D arrays.

# Example: 4 parts, 5 repeated measurements each
# data = np.array([
#     [10.01, 9.99, 10.00, 10.02, 9.98],
#     [4.99, 5.01, 5.00, 5.02, 4.98],
#     [19.98, 20.00, 20.01, 19.99, 20.02],
#     [29.99, 30.01, 30.00, 30.02, 29.98],
# ])

# mean_per_part = ...  # data.mean(axis=1)
# std_per_part = ...   # data.std(axis=1)
# overall_mean = ...   # data.mean()

# print("Mean per part:", mean_per_part)
# print("Std per part:", std_per_part)
# print("Overall mean:", overall_mean)

In [16]:
# Solution

import numpy as np

data = np.array([
    [10.01, 9.99, 10.00, 10.02, 9.98],
    [4.99, 5.01, 5.00, 5.02, 4.98],
    [19.98, 20.00, 20.01, 19.99, 20.02],
    [29.99, 30.01, 30.00, 30.02, 29.98],
])

mean_per_part = data.mean(axis=1)
std_per_part = data.std(axis=1)
overall_mean = data.mean()

print("Mean per part:", mean_per_part)
print("Std per part:", std_per_part)
print("Overall mean:", overall_mean)

Mean per part: [10.  5. 20. 30.]
Std per part: [0.01414214 0.01414214 0.01414214 0.01414214]
Overall mean: 16.25


### Vectorized physics-style computation example

As a small example, imagine you measured heights `h` at given positions `x` along a line, and you want to approximate the area under the curve using the trapezoidal rule.

The trapezoidal rule for arrays `x` and `h` can be written as:

`area ‚âà sum( (h[i] + h[i+1]) / 2 * (x[i+1] - x[i]) )`

We can implement this with pure NumPy, using slicing, without explicit Python loops.

In [18]:
import numpy as np

# Simulate positions in mm and heights in micrometers
x = np.linspace(0.0, 10.0, 1001)  # 0..10 mm, 1001 points
h_um = 2.0 * np.sin(2 * np.pi * x / 10.0) + 10.0  # some periodic height pattern in um

# Trapezoidal rule using vectorized slices
dx = x[1:] - x[:-1]
h_avg = (h_um[1:] + h_um[:-1]) / 2.0
area_um_mm = np.sum(h_avg * dx)  # units: um * mm

print(f"Approximate area (um*mm): {area_um_mm:.3f}")

Approximate area (um*mm): 100.000


### üí™ Exercise (advanced - optional): Chain of vectorized operations

Combine several vectorized operations into a small pipeline:

1. Simulate an array of thicknesses in micrometers with `np.random.normal`.
2. Apply an offset correction.
3. Clip the values to a realistic range using `np.clip`.
4. Convert to millimeters.
5. Compute and print the mean and standard deviation in both units.

Do not use explicit Python loops. Use NumPy array operations only.

In [None]:
import numpy as np

# TODO: implement the vectorized processing pipeline.

# n = 1000
# thickness_um = np.random.normal(loc=100.0, scale=0.5, size=n)
# offset = 0.2
# corrected = ...
# clipped = ...  # np.clip
# thickness_mm = ...
# print statistics

In [11]:
# Solution

import numpy as np

n = 1000
thickness_um = np.random.normal(loc=100.0, scale=0.5, size=n)
offset = 0.2
corrected = thickness_um - offset
clipped = np.clip(corrected, 98.0, 102.0)
thickness_mm = clipped / 1000.0

print("Mean (um):", clipped.mean())
print("Std (um):", clipped.std())
print("Mean (mm):", thickness_mm.mean())
print("Std (mm):", thickness_mm.std())

Mean (um): 99.80521127498606
Std (um): 0.5115783054404365
Mean (mm): 0.09980521127498607
Std (mm): 0.0005115783054404363


---
# Short break (14:45 - 15:00)

Final stretch: Numba, GPUs, Python 3.13, and a complex example.
---

## Topic 7 - Numba JIT compilation

[Numba](https://numba.pydata.org/) is a Just-In-Time (JIT) compiler for Python functions that operate mainly on NumPy arrays and numbers.

Basic usage:

```python
from numba import njit

@njit
def f(x):
    # numerical code
    ...
```

When you first call `f`, Numba compiles it to machine code (using LLVM). Subsequent calls run at near-C speed.

### Important: how Numba and NumPy interact

- NumPy operations like `a + b` or `a.mean()` are already implemented in C.
- However, **Python loops around those operations** still run in the Python interpreter.
- Numba helps most when you have **custom loops and logic** that cannot be expressed as simple NumPy expressions.

**Answering the question:** "If Numba works with NumPy + Python code, and NumPy is already implemented in C, how does Numba JIT help?"

- NumPy is fast for each individual operation, but if you write Python like:
  - `for i in range(n): result[i] = complex_expression(a[i], b[i])`
  - this loop runs in Python and pays Python overhead per iteration.
- Numba compiles the **whole loop** (and the operations inside it) into one optimized machine code function.
- This removes Python overhead and can fuse several operations into one pass.

In practice, combine them:

- Use NumPy vectorization where it is natural.
- Use Numba for custom numeric kernels that are hard to write as a single NumPy expression.

In [19]:
!pip install numba

Collecting numba
  Downloading numba-0.63.1-cp313-cp313-macosx_12_0_arm64.whl.metadata (2.9 kB)
Collecting llvmlite<0.47,>=0.46.0dev0 (from numba)
  Downloading llvmlite-0.46.0-cp313-cp313-macosx_12_0_arm64.whl.metadata (5.0 kB)
Collecting numpy<2.4,>=1.22 (from numba)
  Using cached numpy-2.3.5-cp313-cp313-macosx_14_0_arm64.whl.metadata (62 kB)
Downloading numba-0.63.1-cp313-cp313-macosx_12_0_arm64.whl (2.7 MB)
[2K   [38;2;114;156;31m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m2.7/2.7 MB[0m [31m32.2 MB/s[0m  [33m0:00:00[0m
[?25hDownloading llvmlite-0.46.0-cp313-cp313-macosx_12_0_arm64.whl (37.2 MB)
[2K   [38;2;114;156;31m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m37.2/37.2 MB[0m [31m51.9 MB/s[0m  [33m0:00:00[0m [31m52.8 MB/s[0m eta [36m0:00:01[0m
[?25hUsing cached numpy-2.3.5-cp313-cp313-macosx_

In [1]:
import numpy as np
import math
import time

try:
    from numba import njit
except ImportError:
    njit = None
    print("Numba is not installed in this environment. The examples will fall back to pure Python.")

def rms_python(arr: np.ndarray) -> float:
    s = 0.0
    n = arr.size
    for i in range(n):
        x = float(arr[i])
        s += x * x
    return math.sqrt(s / n)

if njit is not None:
    @njit
    def rms_numba(arr):
        s = 0.0
        n = arr.size
        for i in range(n):
            x = arr[i]
            s += x * x
        return math.sqrt(s / n)
else:
    rms_numba = None

# Test on a large array
arr = np.random.normal(loc=0.0, scale=1.0, size=1_000_00)

# Python version
start = time.perf_counter()
r1 = rms_python(arr)
t_python = time.perf_counter() - start
print(f"Python RMS: {r1:.6f}, time={t_python:.4f} s")

if rms_numba is not None:
    # First call includes compilation time
    start = time.perf_counter()
    r2 = rms_numba(arr)
    t_first = time.perf_counter() - start
    # Second call is fast
    start = time.perf_counter()
    r3 = rms_numba(arr)
    t_numba = time.perf_counter() - start
    print(f"Numba RMS first call: {r2:.6f}, time={t_first:.4f} s (includes compile)")
    print(f"Numba RMS second call: {r3:.6f}, time={t_numba:.4f} s")

Python RMS: 1.000134, time=0.0116 s
Numba RMS first call: 1.000134, time=0.4065 s (includes compile)
Numba RMS second call: 1.000134, time=0.0001 s


### ‚úè Exercise (easy): Numba-accelerated difference of squares

1. Implement a function `diff_squares_python(a, b)` that for each element computes `a[i]**2 - b[i]**2`.
2. Time it on large NumPy arrays.
3. If Numba is available, implement `diff_squares_numba` with `@njit`.
4. Compare the timings.

Make sure you do not create new Python lists inside the function. Work directly with NumPy arrays.

In [2]:
import numpy as np
import time

try:
    from numba import njit
except ImportError:
    njit = None
    print("Numba not available - you can still implement the pure Python version.")

def diff_squares_python(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    n = a.size
    out = np.empty_like(a)
    # for i ...
    return out

if njit is not None:
    @njit
    def diff_squares_numba(a, b):
        n = a.size
        out = np.empty_like(a)
        for i # ... (same logic as above)
        return out

# a = np.random.normal(size=200_000)
# b = np.random.normal(size=200_000)
# start = time.perf_counter()
# out_py = diff_squares_python(a, b)
# t_py = time.perf_counter() - start
# print(f"Python time: {t_py:.4f} s")

# if njit is not None:
#     start = time.perf_counter()
#     out_nb1 = diff_squares_numba(a, b)
#     t_nb1 = time.perf_counter() - start
#     start = time.perf_counter()
#     out_nb2 = diff_squares_numba(a, b)
#     t_nb2 = time.perf_counter() - start
#     print(f"Numba first call: {t_nb1:.4f} s, second call: {t_nb2:.4f} s")

SyntaxError: invalid syntax (417586745.py, line 21)

In [5]:
# Solution

import numpy as np
import time

try:
    from numba import njit
except ImportError:
    njit = None
    print("Numba not available - only Python version will run.")

def diff_squares_python(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    n = a.size
    out = np.empty_like(a)
    for i in range(n):
        out[i] = a[i] * a[i] - b[i] * b[i]
    return out

if njit is not None:
    @njit
    def diff_squares_numba(a, b):
        n = a.size
        out = np.empty_like(a)
        for i in range(n):
            out[i] = a[i] * a[i] - b[i] * b[i]
        return out

a = np.random.normal(size=200_000)
b = np.random.normal(size=200_000)

start = time.perf_counter()
out_py = diff_squares_python(a, b)
t_py = time.perf_counter() - start
print(f"Python time: {t_py:.4f} s")

if njit is not None:
    start = time.perf_counter()
    out_nb1 = diff_squares_numba(a, b)
    t_nb1 = time.perf_counter() - start
    start = time.perf_counter()
    out_nb2 = diff_squares_numba(a, b)
    t_nb2 = time.perf_counter() - start
    print(f"Numba first call: {t_nb1:.4f} s, second call: {t_nb2:.4f} s")

Python time: 0.1173 s
Numba first call: 0.5256 s, second call: 0.0005 s


### üí™ Exercise (advanced): Custom statistic with Numba

Define a custom function that is harder to express with pure NumPy:

1. Implement `moving_rms_python(arr, window)` that computes RMS roughness over a sliding window.
   - For each position `i`, compute RMS of `arr[i : i+window]`.
2. If Numba is available, implement `moving_rms_numba` using `@njit`.
3. Compare performance on a large array.

This is a common pattern when analyzing profiles from surface measurement devices.

In [None]:
import numpy as np
import math
import time

try:
    from numba import njit
except ImportError:
    njit = None
    print("Numba not available - you can still implement the Python version.")

def moving_rms_python(arr: np.ndarray, window: int) -> np.ndarray:
    """TODO: pure Python moving RMS."""
    n = arr.size
    out = np.empty(n - window + 1, dtype=float)
    for i in range(n - window + 1):
        sum_squares = 0.0

        # For each position i, compute RMS of arr[i : i+window].
        for j in range(window):
            # ...
        out[i] = math.sqrt(sum_squares / window)
    return out

if njit is not None:
    @njit
    def moving_rms_numba(arr, window):
        n = arr.size
        out = np.empty(n - window + 1, dtype=float)
        for i in range(n - window + 1):
            sum_squares = 0.0
    
            # For each position i, compute RMS of arr[i : i+window].
            for j in range(window):
                # ... # Same logic as above
            out[i] = math.sqrt(sum_squares / window)
        return out

# arr = np.random.normal(size=200_000)
# window = 50
# start = time.perf_counter()
# r_py = moving_rms_python(arr, window)
# t_py = time.perf_counter() - start
# print(f"Python moving RMS time: {t_py:.4f} s")

# if njit is not None:
#     start = time.perf_counter()
#     r_nb1 = moving_rms_numba(arr, window)
#     t_nb1 = time.perf_counter() - start
#     start = time.perf_counter()
#     r_nb2 = moving_rms_numba(arr, window)
#     t_nb2 = time.perf_counter() - start
#     print(f"Numba first call: {t_nb1:.4f} s, second call: {t_nb2:.4f} s")

In [3]:
# Solution

import numpy as np
import math
import time

try:
    from numba import njit
except ImportError:
    njit = None
    print("Numba not available - only Python version will run.")

def moving_rms_python(arr: np.ndarray, window: int) -> np.ndarray:
    n = arr.size
    out = np.empty(n - window + 1, dtype=float)
    for i in range(n - window + 1):
        s = 0.0
        for j in range(window):
            x = float(arr[i + j])
            s += x * x
        out[i] = math.sqrt(s / window)
    return out

if njit is not None:
    @njit
    def moving_rms_numba(arr, window):
        n = arr.size
        out = np.empty(n - window + 1, dtype=np.float64)
        for i in range(n - window + 1):
            s = 0.0
            for j in range(window):
                x = arr[i + j]
                s += x * x
            out[i] = math.sqrt(s / window)
        return out

arr = np.random.normal(size=200_000)
window = 50

start = time.perf_counter()
r_py = moving_rms_python(arr, window)
t_py = time.perf_counter() - start
print(f"Python moving RMS time: {t_py:.4f} s")

if njit is not None:
    start = time.perf_counter()
    r_nb1 = moving_rms_numba(arr, window)
    t_nb1 = time.perf_counter() - start
    start = time.perf_counter()
    r_nb2 = moving_rms_numba(arr, window)
    t_nb2 = time.perf_counter() - start
    print(f"Numba first call: {t_nb1:.4f} s, second call: {t_nb2:.4f} s")

Python moving RMS time: 0.5878 s
Numba first call: 0.0707 s, second call: 0.0017 s


---

## Examples of when you cannot / should not use numba

In [4]:
# Code using arbitrary dictionaries 

from numba import njit

@njit
def count_keys(d):
    total = 0
    for k in d:
        if len(k) > 3:
            total += 1
    return total

data = {"short": 1, "verylongkey": 2, "x": 3}
print(count_keys(data))


TypingError: Failed in nopython mode pipeline (step: nopython frontend)
non-precise type pyobject
During: typing of argument at /var/folders/fr/gyc8hsnd1834rb308w5cy65r0000gn/T/ipykernel_13631/3817865517.py (5)

File "../../../../../../../var/folders/fr/gyc8hsnd1834rb308w5cy65r0000gn/T/ipykernel_13631/3817865517.py", line 5:
<source missing, REPL/exec in use?>

During: Pass nopython_type_inference 

This error may have been caused by the following argument(s):
- argument 0: Cannot determine Numba type of <class 'dict'>


In [5]:
# Pandas / high-level libraries

import pandas as pd
from numba import njit

@njit
def transform(df):
    df["z"] = df["x"] + df["y"]  # Pandas Series ops
    return df

df = pd.DataFrame({"x": range(10_000), "y": range(10_000)})
transform(df)


TypingError: Failed in nopython mode pipeline (step: nopython frontend)
non-precise type pyobject
During: typing of argument at /var/folders/fr/gyc8hsnd1834rb308w5cy65r0000gn/T/ipykernel_13631/4283451336.py (6)

File "../../../../../../../var/folders/fr/gyc8hsnd1834rb308w5cy65r0000gn/T/ipykernel_13631/4283451336.py", line 6:
<source missing, REPL/exec in use?>

During: Pass nopython_type_inference 

This error may have been caused by the following argument(s):
- argument 0: Cannot determine Numba type of <class 'pandas.DataFrame'>


In [8]:
# I/O-bound or non-numeric code

from numba import njit

@njit
def copy_file(in_path, out_path):
    with open(in_path, "r") as fin, open(out_path, "w") as fout:
        for line in fin:
            fout.write(line.upper())

copy_file("test.txt", "test2.txt")

UnsupportedBytecodeError: The 'with (context manager) as (variable):' construct is not supported.. Raised from None

In [14]:
# Numba accelerates tight numeric loops on simple typed arrays; for object-heavy code (like strings, dicts, Python lists), it can be neutral or flat-out worse than pure Python

import time
from numba import jit

def slow_py(xs):
    s = 0
    for x in xs:
        s += len(str(x))  # string conversion, non-numeric
    return s

@njit
def slow_numba(xs):
    s = 0
    for x in xs:
        s += len(str(x))
    return s

xs = list(range(100_000))

# Warmup
slow_py(xs)
slow_numba(xs)  # triggers compilation

def bench(fn, xs, n=5):
    best = float("inf")
    for _ in range(n):
        t0 = time.perf_counter()
        fn(xs)
        dt = time.perf_counter() - t0
        best = min(best, dt)
    return best

print("Python best:", bench(slow_py, xs))
print("Numba best:", bench(slow_numba, xs))

Python best: 0.0038002079963916913
Numba best: 0.036472667001362424


In [13]:
# Tiny function call overhead: Numba loses

import time
from numba import njit

@njit
def f_numba(x):
    return x + 1

def f_py(x):
    return x + 1

def bench(fn, n=5_000_000):
    t0 = time.perf_counter()
    s = 0
    for i in range(n):
        s += fn(i)
    dt = time.perf_counter() - t0
    return dt, s

# Warm up Numba compile
f_numba(0)

t_py, s_py = bench(f_py)
t_numba, s_numba = bench(f_numba)

print("Python:", t_py, s_py)
print("Numba:", t_numba, s_numba)

Python: 0.1890889580026851 12500002500000
Numba: 0.318271708994871 12500002500000


## Topic 8 - GPU acceleration overview: cuDF and CuPy

For very large datasets and heavy numerical work, GPUs can be useful.

Two popular libraries in the Python ecosystem:

- [CuPy](https://cupy.dev/):
  - NumPy-like interface for arrays stored on a CUDA GPU.
  - Many functions mirror the NumPy API (`cupy.array`, `cupy.mean`, etc.).
- [cuDF](https://docs.rapids.ai/api/cudf/stable/):
  - Part of the RAPIDS ecosystem: https://rapids.ai/
  - Pandas-like DataFrame library running on the GPU.

In many cases, the workflow is:

- Move large arrays or tables to the GPU once.
- Perform many operations there.
- Move reduced results (e.g. aggregates) back to the CPU.

In this course environment, GPUs might not be available, so the examples below are illustrative only. Do not worry if they raise `ImportError` - that is expected on a CPU-only machine.

In [15]:
import time
import numpy as np

N = 1_000_000  # size of the array

print(f"Array size: {N:,}")

# ---------------- CPU: NumPy ----------------
t0 = time.perf_counter()
a_cpu = np.random.normal(size=N)
mean_cpu = a_cpu.mean()
std_cpu = a_cpu.std()
t1 = time.perf_counter()
cpu_time = t1 - t0

print(f"[NumPy / CPU]  mean={mean_cpu:.5f}, std={std_cpu:.5f}, time={cpu_time:.4f} s")

# ---------------- GPU: CuPy (if available) ----------------
try:
    import cupy as cp
    print("CuPy version:", cp.__version__)

    t0 = time.perf_counter()
    a_gpu = cp.random.normal(size=N)
    mean_gpu = a_gpu.mean()
    std_gpu = a_gpu.std()
    t1 = time.perf_counter()
    gpu_time = t1 - t0

    print(f"[CuPy / GPU]  mean={float(mean_gpu):.5f}, std={float(std_gpu):.5f}, time={gpu_time:.4f} s")
    print(f"Speedup (CPU time / GPU time): {cpu_time / gpu_time:.2f}x")

except ImportError:
    print("CuPy is not installed. This example is for illustration only.")


Array size: 1,000,000
[NumPy / CPU]  mean=-0.00046, std=1.00085, time=0.0252 s
CuPy is not installed. This example is for illustration only.


## Topic 9 - Python 3.13 and the future: experimental JIT and optional GIL

Recent and upcoming CPython releases are adding major performance features:

- **Python 3.13** (released in 2024) includes:
  - An experimental **free-threaded build** (optional no-GIL mode). See PEP 703.
  - An experimental **JIT compiler** (PEP 744) that can speed up some workloads.
  - These features are **off by default** and require special builds / flags.
- Future versions (3.14 and beyond) are expected to improve JIT performance and evolve the no-GIL story.

What this means for you in the medium term:

- Well-written numeric Python might become faster without you changing code.
- True multi-threaded CPU-bound Python code may become possible without having to use `multiprocessing`.
- Libraries like Numba, cuDF, CuPy, and others will likely evolve to take advantage of new capabilities.

Official resources:

- What's new in Python 3.13: https://docs.python.org/3/whatsnew/3.13.html
- PEP 703 (optional GIL): https://peps.python.org/pep-0703/
- PEP 744 (JIT): https://peps.python.org/pep-0744/

For now, you should still learn threads, processes, AsyncIO, NumPy, and Numba - these skills remain valuable regardless of how the interpreter evolves.

In [57]:
import sys

print("Running Python version:", sys.version)
print("Executable:", sys.executable)
print("Note: in standard CPython 3.13+, JIT and no-GIL builds are optional and may need explicit enabling.")

Running Python version: 3.13.7 (main, Sep  2 2025, 14:16:00) [MSC v.1944 64 bit (AMD64)]
Executable: C:\Users\gregk\Desktop\winpython\WPy64-31700\python\python.exe
Note: in standard CPython 3.13+, JIT and no-GIL builds are optional and may need explicit enabling.


## Comparing parallel strategies and numeric backends

Up to now we have seen several different ways to "go faster":

- `threading` for overlapping I/O waits
- `multiprocessing` for real parallel CPU work
- `asyncio` for cooperative, single-threaded concurrency
- `numpy` for vectorized C loops
- `numba` for JIT-compiling Python loops
- GPU libraries like `cupy` (if available)

This knowledge bit puts them **side by side** in a single script and measures them on small, controlled benchmarks. The goal is not to find the "fastest ever" configuration, but to build a **mental model**:

- When does the **GIL** block speedups?
- When do threads help, and when are they useless?
- When do processes help?
- When do NumPy and Numba "escape" the GIL and scale across cores, even with threads?
- What happens if you misuse `asyncio` and call `time.sleep` instead of `await asyncio.sleep`?
- Where do GPUs fit in if `cupy` is available?

### Architecture recap and what we measure

**1. Threads vs GIL**

- Python threads share a single interpreter and a single **Global Interpreter Lock (GIL)**.
- For **CPU-bound pure Python loops**, only one thread can run Python bytecode at a time.
  - Expect: threads do **not** speed up CPU-bound Python loops.
- For **I/O-bound work** (`time.sleep`, network, disk), threads can overlap waiting.
  - Expect: threads can reduce total wall-clock time for many I/O waits.

**2. Processes and `multiprocessing.Pool`**

- Each process has its **own interpreter and its own GIL**.
- CPU-bound work can truly run in parallel on multiple cores.
- Downside: data must be serialized and sent between processes (overhead).
- Expect: good speedup for heavy CPU-bound loops, as long as data is not too huge or passed too often.

**3. `asyncio` and cooperative concurrency**

- `asyncio` runs a single OS thread with an **event loop**.
- Coroutines `async def ...` yield control when they hit `await` on something that is not ready (e.g. `await asyncio.sleep(0.3)` or network I/O).
- If you accidentally use `time.sleep` inside `async def`, you block the entire event loop.
- Expect:
  - `asyncio` + `await asyncio.sleep` -> total time close to the **max** delay.
  - `asyncio` + `time.sleep` -> behaves like slow sequential code.

**4. NumPy: vectorized C loops, GIL released**

- Elementwise and reduction operations in NumPy are implemented in C.
- During these C loops, NumPy usually **releases the GIL**, so other threads can run in parallel C code.
- Expect:
  - Pure Python loop vs NumPy: NumPy should win for large arrays.
  - Running several independent NumPy operations in threads can scale across cores, because they are not fighting over the GIL most of the time.

Docs: https://numpy.org/doc/stable/  

**5. Numba: JIT for custom numeric kernels**

- `numba.njit` compiles numeric Python functions (using only numbers and arrays) to machine code via LLVM.
- In "nopython" mode, the compiled code **releases the GIL**.
- You can call the JIT-compiled function from several Python threads and get real parallel speedups.
- Expect:
  - Numba vs pure Python: Numba should be much faster for large numeric loops.
  - Numba in threads: multiple JIT functions can run in parallel on multiple cores.

Docs: https://numba.pydata.org/  

**6. CuPy: NumPy-like arrays on the GPU**

- `cupy` mirrors a large part of the NumPy API but executes on a CUDA GPU.
- Moving data to/from GPU has a cost, but for large arrays and heavy operations the GPU can be much faster.
- In this script, CuPy is **optional**:
  - If `cupy` is not installed or no GPU is present, the script simply prints a notice and skips those parts.
  - If it is available, we compare Python vs NumPy vs CuPy.

Docs: https://cupy.dev/  

### Overview table: when to expect speedups

| Scenario                                      | Tool / approach                     | GIL released?            | When to expect speedup                                               | Typical use case                                                                 |
|-----------------------------------------------|-------------------------------------|--------------------------|------------------------------------------------------------------------|----------------------------------------------------------------------------------|
| CPU-bound pure Python loop                    | `threading`                         | No                       | Almost never; threads fight over the GIL                              | Do **not** use threads to speed up pure Python CPU work                          |
| I/O-bound waits (sleep, network, disk)        | `threading`                         | Not relevant for waiting | Yes, if many independent I/O waits                                    | Query many instruments, HTTP endpoints, or files in parallel                     |
| CPU-bound pure Python loop                    | `multiprocessing.Pool`              | Each process has own GIL | Yes, if per-task work is heavy enough to amortize process overhead    | Heavy numeric loops on independent chunks of data                                |
| I/O-bound tasks with async APIs               | `asyncio` + `await`                 | Single-threaded          | Yes, many small I/O tasks can overlap                                 | High-concurrency network clients, async DB calls, many small I/O operations      |
| I/O-bound but using `time.sleep` in async     | `asyncio` + `time.sleep` (bug)      | Single-threaded          | No, behaves like slow sequential code                                 | Anti-pattern: what *not* to do                                                   |
| Vectorized numeric operations on arrays       | `numpy` (single-thread)             | Yes (inside NumPy)       | Large arrays, heavy elementwise/reduction operations                  | Numeric computing, physics data processing                                      |
| Independent NumPy operations in several tasks | `numpy` + `threading`               | Yes                      | Yes, if each NumPy call is heavy and independent                      | Multiple independent signals or measurement channels in parallel                 |
| Custom numeric loops on arrays                | `numba.njit`                        | Yes                      | Yes, especially if loops are complex and large                        | Custom kernels that are hard to express as simple NumPy expressions              |
| Numba in several threads                      | `numba.njit` + `threading`          | Yes                      | Yes, if each JIT kernel call is heavy enough and data is independent  | Parallel processing of many independent profiles or signals                      |
| Very large arrays, heavy elementwise ops      | `cupy` on GPU                       | N/A for GIL              | Yes, if data transfer is amortized and GPU is available               | Large-scale numerical work, ML preprocessing, array-heavy simulations            |


In [None]:
# RUN FROM A .py FILE (NOT DIRECTLY IN JUPYTER) FOR RELIABLE MULTIPROCESSING
#
# Example:
#   python compare_parallel_backends.py
#
# This script compares:
# - threading vs sequential for CPU-bound and I/O-bound tasks
# - multiprocessing.Pool for CPU-bound work
# - asyncio: correct (await asyncio.sleep) vs incorrect (time.sleep)
# - NumPy vs pure Python, and NumPy in threads (GIL released)
# - Numba vs pure Python, and Numba in threads (GIL released)
# - Optional CuPy vs NumPy vs pure Python (if CuPy is installed)

import time
import math
import sys
import threading
import asyncio
from multiprocessing import Pool, cpu_count
from typing import List

import numpy as np

# Optional imports
try:
    from numba import njit
except ImportError:
    njit = None

try:
    import cupy as cp
except ImportError:
    cp = None


# -------------------------------
# Utility helpers
# -------------------------------

def print_section(title: str) -> None:
    print("\n")
    print("=" * 80)
    print(title)
    print("=" * 80)


def print_result(label: str, seconds: float) -> None:
    print(f"{label:<45} {seconds:8.3f} s")


# -------------------------------
# 1. CPU-bound and I/O-bound basic functions
# -------------------------------

def cpu_bound_python(n: int) -> float:
    """Simple CPU-bound loop: sum of squares."""
    s = 0.0
    for i in range(n):
        s += i * i
    return math.sqrt(s)


def io_bound_sleep(delay: float) -> None:
    """Simple I/O-bound stand-in: sleep to simulate waiting."""
    time.sleep(delay)


# For multiprocessing we need top-level worker functions

def cpu_worker(n: int) -> float:
    """Worker function for multiprocessing."""
    return cpu_bound_python(n)


# -------------------------------
# 2. Threading: CPU-bound vs I/O-bound
# -------------------------------

def benchmark_threading() -> None:
    print_section("1) Threading: CPU-bound vs I/O-bound")

    num_tasks_cpu = min(4, max(2, cpu_count()))
    num_tasks_io = num_tasks_cpu

    n_per_task = 300_000  # Adjust if this is too slow/fast
    delays = [0.4, 0.3, 0.6, 0.2][:num_tasks_io]

    # CPU-bound: sequential
    print("CPU-bound Python loop (sum of squares)")
    start = time.perf_counter()
    for _ in range(num_tasks_cpu):
        cpu_bound_python(n_per_task)
    t_seq = time.perf_counter() - start
    print_result("Sequential (CPU-bound, pure Python)", t_seq)

    # CPU-bound: threaded
    threads: List[threading.Thread] = []
    start = time.perf_counter()
    for _ in range(num_tasks_cpu):
        t = threading.Thread(target=cpu_bound_python, args=(n_per_task,))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    t_thr = time.perf_counter() - start
    print_result("Threaded (CPU-bound, pure Python)", t_thr)

    if t_thr > 0:
        print(f"Speedup (threaded / sequential)          {t_seq / t_thr:8.3f} x")
    print("Expected: little or no speedup because of the GIL for CPU-bound Python.\n")

    # I/O-bound: sequential
    print("I/O-bound simulation (sleep)")
    start = time.perf_counter()
    for d in delays:
        io_bound_sleep(d)
    t_io_seq = time.perf_counter() - start
    print_result("Sequential (I/O-bound)", t_io_seq)

    # I/O-bound: threaded
    threads = []
    start = time.perf_counter()
    for d in delays:
        t = threading.Thread(target=io_bound_sleep, args=(d,))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    t_io_thr = time.perf_counter() - start
    print_result("Threaded (I/O-bound)", t_io_thr)
    if t_io_thr > 0:
        print(f"Speedup (threaded / sequential)          {t_io_seq / t_io_thr:8.3f} x")
    print("Expected: near speedup by approx sum(delays) / max(delays).")


# -------------------------------
# 3. Multiprocessing for CPU-bound work
# -------------------------------

def benchmark_multiprocessing() -> None:
    print_section("2) Multiprocessing.Pool for CPU-bound work")

    num_tasks = min(4, max(2, cpu_count()))
    n_per_task = 5_000_000

    print(f"Using up to {num_tasks} processes out of {cpu_count()} CPU cores.")

    # Sequential
    start = time.perf_counter()
    for _ in range(num_tasks):
        cpu_worker(n_per_task)
    t_seq = time.perf_counter() - start
    print_result("Sequential (CPU-bound, pure Python)", t_seq)

    # Multiprocessing
    start = time.perf_counter()
    with Pool(processes=num_tasks) as pool:
        pool.map(cpu_worker, [n_per_task] * num_tasks)
    t_mp = time.perf_counter() - start
    print_result("Multiprocessing Pool (CPU-bound)", t_mp)

    if t_mp > 0:
        print(f"Speedup (multiprocessing / sequential)   {t_seq / t_mp:8.3f} x")
    print("Expected: clear speedup if per-task work is large enough.")


# -------------------------------
# 4. asyncio: correct vs incorrect sleeping
# -------------------------------

async def async_measure_blocking(delay: float) -> float:
    """Bad async: uses time.sleep, blocks event loop."""
    time.sleep(delay)  # This blocks the whole event loop
    return delay


async def async_measure_proper(delay: float) -> float:
    """Good async: uses asyncio.sleep, yields control."""
    await asyncio.sleep(delay)
    return delay


async def _async_benchmark_inner() -> None:
    delays = [0.1, 0.5, 0.2, 0.8, 0.3]

    print_section("3) asyncio: blocking vs non-blocking usage")

    # Bad case: async functions using time.sleep
    start = time.perf_counter()
    tasks = [asyncio.create_task(async_measure_blocking(d)) for d in delays]
    await asyncio.gather(*tasks)
    t_blocking = time.perf_counter() - start
    print_result("asyncio + time.sleep (anti-pattern)", t_blocking)

    # Good case: async functions using asyncio.sleep
    start = time.perf_counter()
    tasks = [asyncio.create_task(async_measure_proper(d)) for d in delays]
    await asyncio.gather(*tasks)
    t_proper = time.perf_counter() - start
    print_result("asyncio + await asyncio.sleep()", t_proper)

    if t_proper > 0:
        print(f"Speedup (proper / blocking)             {t_blocking / t_proper:8.3f} x")
    print("Expected: proper async time ~ max(delay), blocking async time ~ sum(delays).")


def benchmark_asyncio() -> None:
    # Run the async benchmark in a fresh event loop
    asyncio.run(_async_benchmark_inner())


# -------------------------------
# 5. NumPy vs pure Python, and NumPy in threads
# -------------------------------

def cpu_numpy(arr: np.ndarray) -> float:
    # Simple computation that forces a pass over the data
    return float(np.sqrt(np.sum(arr * arr)))


def cpu_python_from_array(arr: np.ndarray) -> float:
    s = 0.0
    # iterate over a Python list copy to emphasize Python loop overhead
    for x in arr.tolist():
        s += x * x
    return math.sqrt(s)


def benchmark_numpy() -> None:
    print_section("4) NumPy vs pure Python, and NumPy in threads")

    n = 10_000_000
    num_tasks = min(4, max(2, cpu_count()))
    print(f"Using array size n = {n}, tasks = {num_tasks}")

    base_arr = np.random.normal(size=n)

    # Pure Python
    start = time.perf_counter()
    cpu_python_from_array(base_arr)
    t_py = time.perf_counter() - start
    print_result("Pure Python loop (single task)", t_py)

    # NumPy single task
    start = time.perf_counter()
    cpu_numpy(base_arr)
    t_np = time.perf_counter() - start
    print_result("NumPy vectorized (single task)", t_np)

    # NumPy sequential multi-task
    arrays = [np.random.normal(size=n) for _ in range(num_tasks)]
    start = time.perf_counter()
    for a in arrays:
        cpu_numpy(a)
    t_np_seq = time.perf_counter() - start
    print_result("NumPy multi-task (sequential)", t_np_seq)

    # NumPy threaded multi-task
    threads: List[threading.Thread] = []
    start = time.perf_counter()
    for a in arrays:
        t = threading.Thread(target=cpu_numpy, args=(a,))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    t_np_thr = time.perf_counter() - start
    print_result("NumPy multi-task (threaded)", t_np_thr)

    if t_np_thr > 0:
        print(f"Speedup (threaded / sequential)         {t_np_seq / t_np_thr:8.3f} x")
    print("Expected: NumPy should be much faster than pure Python,")
    print("and multiple independent NumPy calls can scale with threads because NumPy releases the GIL.")


# -------------------------------
# 6. Numba vs pure Python, and Numba in threads
# -------------------------------

if njit is not None:
    def cpu_python(n) -> float:
        s = 0.0
        for i in range(n):
            s += i * i
        return math.sqrt(s)
    
    @njit(nogil=True)
    def cpu_numba(n: int) -> float:
        s = 0.0
        for i in range(n):
            s += i * i
        return math.sqrt(s)
else:
    cpu_numba = None  # type: ignore


def benchmark_numba() -> None:
    print_section("5) Numba vs pure Python, and Numba in threads")

    if cpu_numba is None:
        print("Numba is not installed. Skipping Numba benchmarks.")
        return

    n = 10_000_000
    num_tasks = min(4, max(2, cpu_count()))

    # Pure Python
    start = time.perf_counter()
    cpu_python(n)
    t_py = time.perf_counter() - start
    print_result("Pure Python loop (single task)", t_py)

    # Numba single task: first call (includes JIT compile)
    start = time.perf_counter()
    cpu_numba(n)
    t_nb_first = time.perf_counter() - start
    print_result("Numba single task (first call - compile)", t_nb_first)

    # Numba single task: second call (fast)
    start = time.perf_counter()
    cpu_numba(n)
    t_nb = time.perf_counter() - start
    print_result("Numba single task (warm)", t_nb)

    arrays = [n for _ in range(num_tasks)]

    # Numba sequential multi-task
    start = time.perf_counter()
    for a in arrays:
        cpu_numba(a)
    t_nb_seq = time.perf_counter() - start
    print_result("Numba multi-task (sequential)", t_nb_seq)

    # Numba threaded multi-task
    threads: List[threading.Thread] = []
    start = time.perf_counter()
    for a in arrays:
        t = threading.Thread(target=cpu_numba, args=(a,))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    t_nb_thr = time.perf_counter() - start
    print_result("Numba multi-task (threaded)", t_nb_thr)

    if t_nb > 0:
        print(f"Speedup (Numba / pure Python, single)   {t_py / t_nb:8.3f} x")
    if t_nb_thr > 0:
        print(f"Speedup (threaded / sequential, Numba)  {t_nb_seq / t_nb_thr:8.3f} x")
    print("Expected: Numba should drastically beat pure Python for large arrays,")
    print("and multiple Numba kernels can run in parallel threads because Numba releases the GIL.")


# -------------------------------
# 7. Optional CuPy vs NumPy: heavy pairwise distances
# -------------------------------

def benchmark_cupy() -> None:
    print_section("6) CuPy vs NumPy vs pure Python (optional GPU)")

    if cp is None:
        print("CuPy is not installed or no GPU is available. Skipping CuPy benchmark.")
        return

    # Size and iterations chosen so that:
    # - NumPy is clearly faster than pure Python
    # - CuPy has enough math work to amortize GPU overhead and usually win
    n = 20_000_000
    iters = 10
    print(f"Array size for this benchmark: n = {n}, iterations = {iters}")

    # -------------------------------
    # Pure Python baseline (simple work)
    # -------------------------------
    arr_py = np.random.normal(size=n)

    start = time.perf_counter()
    cpu_python_from_array(arr_py)
    t_py = time.perf_counter() - start
    print_result("Pure Python loop (CPU)", t_py)

    # -------------------------------
    # Heavy NumPy compute
    # -------------------------------
    arr_np = np.random.normal(size=n).astype(np.float32)

    def heavy_numpy(x: np.ndarray, iters: int) -> float:
        # Repeat a non-trivial math kernel several times
        for _ in range(iters):
            x = np.sin(x) + np.cos(x) * 1.000001 + np.sqrt(x * x + 0.1234)
        return float(x.mean())

    start = time.perf_counter()
    mean_np = heavy_numpy(arr_np, iters)
    t_np = time.perf_counter() - start
    print_result("NumPy (CPU, heavy trig loop)", t_np)

    # -------------------------------
    # Heavy CuPy compute (GPU)
    # -------------------------------
    # Transfer once, then ONLY measure math-heavy GPU work.
    arr_gpu = cp.asarray(arr_np)

    def heavy_cupy(x: "cp.ndarray", iters: int) -> float:
        for _ in range(iters):
            x = cp.sin(x) + cp.cos(x) * 1.000001 + cp.sqrt(x * x + 0.1234)
        return float(x.mean())  # brings back a single scalar

    # Just to see the copy cost, not included in t_gpu:
    start_copy = time.perf_counter()
    _ = cp.asarray(arr_np)
    t_copy = time.perf_counter() - start_copy
    print_result("Host -> GPU copy (one large array)", t_copy)

    # Now time only the compute-heavy kernel
    start = time.perf_counter()
    mean_gpu = heavy_cupy(arr_gpu, iters)
    t_gpu = time.perf_counter() - start
    print_result("CuPy (GPU, heavy trig loop)", t_gpu)

    # -------------------------------
    # Speedups
    # -------------------------------
    if t_np > 0:
        print(f"Speedup (NumPy / Python) {t_py / t_np:8.3f} x")
    if t_gpu > 0:
        print(f"Speedup (CuPy compute / NumPy compute)    {t_np / t_gpu:8.3f} x")

    print("Note: Here the GPU does repeated trig and sqrt operations,")
    print("so the work is compute-heavy rather than copy-heavy.")
    print("On a decent GPU, CuPy should clearly beat NumPy on the heavy loop above.")

# -------------------------------
# Main entry point
# -------------------------------

def main() -> None:
    print("Python executable:", sys.executable)
    print("Python version:   ", sys.version.split()[0])
    print("Detected CPU cores:", cpu_count())
    print("Numba available:", bool(njit))
    print("CuPy available:", cp is not None)
    print()

    benchmark_threading()
    benchmark_multiprocessing()
    benchmark_asyncio()
    benchmark_numpy()
    benchmark_numba()
    benchmark_cupy()

    print("\nAll benchmarks finished.")


if __name__ == "__main__":
    main()


## Topic 10 - Complex example: parallel processing of measurement datasets

In this final example we combine multiple ideas from today:

- NumPy arrays and vectorized computations
- Numba JIT for a custom numeric kernel (optional)
- `multiprocessing` to process multiple independent datasets in parallel

### Scenario

You have several measurement files from a surface profiler. Each file contains a 1D height profile (in micrometers). For each profile, you want to:

1. Load the data (in this notebook we will just simulate it).
2. Apply an offset correction (subtract mean).
3. Compute RMS roughness and peak-to-valley height (max - min).
4. Return a small summary dictionary.

Then, for many profiles (e.g. 8 or 16), you want to process them in parallel using multiple CPU cores.

We will build:

- A pure NumPy summary function.
- Optionally a Numba-accelerated variant.
- A small wrapper that can be used with `multiprocessing.Pool.map`.

Your task is to fill in the missing parts.

In [None]:
# RUN FROM .py INSTEAD OF NOTEBOOK

import numpy as np
import math
from multiprocessing import Pool

try:
    from numba import njit
except ImportError:
    njit = None

def simulate_profile(n_points: int = 50_000) -> np.ndarray:
    """Simulate a 1D surface profile in micrometers."""
    x = np.linspace(0.0, 10.0, n_points)
    base = 5.0 * np.sin(2 * np.pi * x / 5.0)
    noise = np.random.normal(loc=0.0, scale=0.5, size=n_points)
    return base + noise

def summarize_profile_numpy(profile: np.ndarray) -> dict:
    """TODO: center profile and compute RMS and peak-to-valley using NumPy only."""
    # mean = ...
    # centered = ...
    # rms = ...
    # ptv = ...
    # return {"mean": float(mean), "rms": float(rms), "ptv": float(ptv)}
    mean = profile.mean()
    centered = profile - mean
    rms = math.sqrt((centered * centered).mean())
    ptv = float(centered.max() - centered.min())
    return {"mean": float(mean), "rms": float(rms), "ptv": ptv}

if njit is not None:
    @njit
    def summarize_profile_numba_kernel(profile):
        n = profile.size
        # Compute mean
        s = 0.0
        for i in range(n):
            s += profile[i]
        mean = s / n
        # Compute RMS and min, max of centered profile
        s2 = 0.0
        min_c = 1e30
        max_c = -1e30
        for i in range(n):
            c = profile[i] - mean
            s2 += c * c
            if c < min_c:
                min_c = c
            if c > max_c:
                max_c = c
        rms = math.sqrt(s2 / n)
        ptv = max_c - min_c
        return mean, rms, ptv

def summarize_profile_numba(profile: np.ndarray) -> dict:
    if njit is None:
        return summarize_profile_numpy(profile)
    mean, rms, ptv = summarize_profile_numba_kernel(profile)
    return {"mean": float(mean), "rms": float(rms), "ptv": float(ptv)}

def process_one_profile(args):
    """Wrapper for Pool.map: args could be (index, use_numba)."""
    index, use_numba = args
    profile = simulate_profile()
    if use_numba:
        summary = summarize_profile_numba(profile)
    else:
        summary = summarize_profile_numpy(profile)
    summary["index"] = index
    return summary

def main():
    # TODO:
    # 1) Create a list of indices, e.g. range(8)
    # 2) Use Pool to process them in parallel
    # 3) Print the summaries sorted by index
    
    # indices = ...
    # args_list = [(i, True) for i in indices]
    # with Pool() as pool:
    #     results = pool.map(process_one_profile, args_list)
    # results_sorted = sorted(results, key=lambda d: d["index"])
    # for r in results_sorted:
    #     print(r)

if __name__ == "__main__":
    main()


In [None]:
# Solution RUN FROM .py INSTEAD OF NOTEBOOK

import numpy as np
import math
from multiprocessing import Pool

try:
    from numba import njit
except ImportError:
    njit = None

def simulate_profile(n_points: int = 50_000) -> np.ndarray:
    x = np.linspace(0.0, 10.0, n_points)
    base = 5.0 * np.sin(2 * np.pi * x / 5.0)
    noise = np.random.normal(loc=0.0, scale=0.5, size=n_points)
    return base + noise

def summarize_profile_numpy(profile: np.ndarray) -> dict:
    mean = profile.mean()
    centered = profile - mean
    rms = math.sqrt((centered * centered).mean())
    ptv = float(centered.max() - centered.min())
    return {"mean": float(mean), "rms": float(rms), "ptv": ptv}

if njit is not None:
    @njit
    def summarize_profile_numba_kernel(profile):
        n = profile.size
        s = 0.0
        for i in range(n):
            s += profile[i]
        mean = s / n
        s2 = 0.0
        min_c = 1e30
        max_c = -1e30
        for i in range(n):
            c = profile[i] - mean
            s2 += c * c
            if c < min_c:
                min_c = c
            if c > max_c:
                max_c = c
        rms = math.sqrt(s2 / n)
        ptv = max_c - min_c
        return mean, rms, ptv

def summarize_profile_numba(profile: np.ndarray) -> dict:
    if njit is None:
        return summarize_profile_numpy(profile)
    mean, rms, ptv = summarize_profile_numba_kernel(profile)
    return {"mean": float(mean), "rms": float(rms), "ptv": float(ptv)}

def process_one_profile(args):
    index, use_numba = args
    profile = simulate_profile()
    if use_numba:
        summary = summarize_profile_numba(profile)
    else:
        summary = summarize_profile_numpy(profile)
    summary["index"] = index
    return summary

if __name__ == "__main__":
    indices = list(range(8))
    args_list = [(i, True) for i in indices]
    with Pool() as pool:
        results = pool.map(process_one_profile, args_list)
    results_sorted = sorted(results, key=lambda d: d["index"])
    for r in results_sorted:
        print(r)

## Day 3 summary

Today you:

- Built a mental model of **CPU-bound** vs **I/O-bound** workloads.
- Reviewed the impact of the **GIL** on threads and why processes are used for CPU-bound speedups.
- Used **threads** for concurrent I/O-style tasks, collecting results safely with locks.
- Used **multiprocessing** to process independent batches of measurement data in parallel.
- Learned the basics of **AsyncIO** and coordinated multiple async measurement coroutines.
- Revisited **NumPy**, created arrays, applied vectorized transformations, and computed statistics for physics/measurement data.
- Practiced boolean masks, axis-wise aggregations, and small vectorized physics-style calculations.
- Used **Numba** to JIT-compile custom numeric kernels and understood how it complements NumPy.
- Saw an overview of **GPU tools** like [CuPy](https://cupy.dev/) and [cuDF](https://docs.rapids.ai/api/cudf/stable/) for GPU acceleration.
- Discussed **Python 3.13** and its experimental JIT and optional no-GIL builds, and how future CPython versions may affect performance.
- Combined multiple concepts in a complex example: parallel processing of simulated surface profiles with NumPy, Numba, and multiprocessing.

These tools and concepts form a practical toolbox for high-performance numerical work in Python, especially in physics and engineering contexts. On the next days you can build on this knowledge for more advanced machine learning and deep learning workloads.