<a href="https://colab.research.google.com/github/amkayhani/wm9QF_programming_for_artificial_intelligence_labs/blob/main/Python_concurrency_and_parallelism.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Python Advanced: Concurrency and Parallelism

This notebook mirrors the LiaScript lesson and keeps the examples runnable with extra guidance for beginners. You will learn when to use threading, multiprocessing, and asyncio to keep programs responsive and fast.

**Prerequisites:** comfortable with Python functions, loops, and basic exception handling.

**You will be able to:**
- Distinguish concurrency, parallelism, and asynchronous programming
- Choose between threading, multiprocessing, and asyncio for different workloads
- Spot issues like race conditions and deadlocks
- Apply these ideas to web scraping, data processing, and API-heavy code

## Concurrency vs. Parallelism

- **Concurrency**: dealing with many tasks at once by switching between them (great when tasks wait for I/O).
- **Parallelism**: doing multiple tasks at the exact same time on different CPU cores.
- Python gives you three main tools:
  1. **Threading**: many threads in one process (best for I/O-bound work).
  2. **Multiprocessing**: many processes with separate memory (best for CPU-bound work).
  3. **Asyncio**: cooperative multitasking with `async`/`await` (great for lots of network-bound tasks).

## The Global Interpreter Lock (GIL)

The GIL is a lock in CPython that allows only one thread to run Python bytecode at a time. Effects:
- Threading can speed up I/O-bound work because threads often wait for I/O, but it will **not** speed up CPU-heavy code.
- Multiprocessing avoids the GIL by using separate processes, so it can speed up CPU-bound work.

In [None]:
import threading
import time


def cpu_intensive_task(n: int) -> int:
    """Simulate CPU-bound work by doing many math operations."""
    total = 0
    for i in range(n):
        total += i ** 2
    return total


# Single-threaded execution
start = time.perf_counter()
result1 = cpu_intensive_task(1_000_000)
result2 = cpu_intensive_task(1_000_000)
single_duration = time.perf_counter() - start
print(f"Single-threaded: {single_duration:.3f}s")

# Multi-threaded execution (the GIL prevents a speedup for CPU-bound work)
start = time.perf_counter()
thread1 = threading.Thread(target=cpu_intensive_task, args=(1_000_000,))
thread2 = threading.Thread(target=cpu_intensive_task, args=(1_000_000,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
threaded_duration = time.perf_counter() - start
print(f"Multi-threaded: {threaded_duration:.3f}s")
print(f"Threading speedup: {single_duration / threaded_duration:.2f}x")


Single-threaded: 0.217s
Multi-threaded: 0.219s
Threading speedup: 0.99x


**What to notice:** even with two threads, the CPU-heavy loop does not get faster because the GIL lets only one thread execute Python bytecode at a time. Use multiprocessing for CPU-bound work instead.

## Threading Basics

Threading runs multiple threads inside one process. Threads share memory, which makes sharing data easy but requires synchronization when writing to shared objects.

In [None]:
import threading
import time


def download_file(file_id: int) -> str:
    """Pretend to download a file; here we just sleep to simulate network delay."""
    print(f"Starting download {file_id}")
    time.sleep(0.5)  # Simulate network latency
    print(f"Finished download {file_id}")
    return f"data_{file_id}"


print("=== Sequential Downloads ===")
start = time.perf_counter()
for i in range(4):
    download_file(i)
sequential_duration = time.perf_counter() - start
print(f"Sequential time: {sequential_duration:.2f}s")

print("=== Concurrent Downloads (Threading) ===")
start = time.perf_counter()
threads = []
for i in range(4):
    thread = threading.Thread(target=download_file, args=(i,))
    threads.append(thread)
    thread.start()

# Wait for every thread to finish before measuring time
for thread in threads:
    thread.join()

threaded_duration = time.perf_counter() - start
print(f"Threaded time: {threaded_duration:.2f}s")
print(f"Speedup: {sequential_duration / threaded_duration:.2f}x")


=== Sequential Downloads ===
Starting download 0
Finished download 0
Starting download 1
Finished download 1
Starting download 2
Finished download 2
Starting download 3
Finished download 3
Sequential time: 2.00s
=== Concurrent Downloads (Threading) ===
Starting download 0
Starting download 1
Starting download 2
Starting download 3
Finished download 0
Finished download 2
Finished download 3
Finished download 1
Threaded time: 0.50s
Speedup: 3.98x


**Quiz:** When is threading most beneficial?

- [ ] CPU-intensive math computations
- [x] Downloading many files from the internet
- [ ] Sorting huge arrays entirely in memory
- [ ] Computing prime numbers

Threading shines when tasks mostly wait for I/O.

## Thread Safety and Locks

When threads update shared data, use locks to avoid race conditions. A `Lock` ensures only one thread enters a protected block at a time.

In [None]:
import threading
import time
import random


class BankAccount:
    """Simulates a bank account with thread-safe operations."""

    def __init__(self, initial_balance: float = 1000.0):
        self.balance = initial_balance
        self.lock = threading.Lock()
        self.transaction_count = 0

    def deposit(self, amount: float, account_holder: str):
        """Safely deposit money into the account."""
        with self.lock:
            print(f"{account_holder} depositing ${amount:.2f}")
            current_balance = self.balance
            time.sleep(0.01)  # Simulate processing time
            self.balance = current_balance + amount
            self.transaction_count += 1
            print(f"{account_holder} deposited ${amount:.2f}. New balance: ${self.balance:.2f}")

    def withdraw(self, amount: float, account_holder: str) -> bool:
        """Safely withdraw money from the account."""
        with self.lock:
            print(f"{account_holder} attempting to withdraw ${amount:.2f}")
            if self.balance >= amount:
                current_balance = self.balance
                time.sleep(0.01)  # Simulate processing time
                self.balance = current_balance - amount
                self.transaction_count += 1
                print(f"{account_holder} withdrew ${amount:.2f}. New balance: ${self.balance:.2f}")
                return True
            else:
                print(f"{account_holder} withdrawal failed - insufficient funds")
                return False

    def get_balance(self) -> float:
        """Thread-safe balance inquiry."""
        with self.lock:
            return self.balance


def perform_transactions(account: BankAccount, person_name: str, num_transactions: int):
    """Perform random deposits and withdrawals."""
    for _ in range(num_transactions):
        action = random.choice(['deposit', 'withdraw'])
        amount = random.uniform(10, 100)

        if action == 'deposit':
            account.deposit(amount, person_name)
        else:
            account.withdraw(amount, person_name)

        time.sleep(random.uniform(0.01, 0.05))


# Create a shared bank account
print("=== Bank Account Simulation with Thread Safety ===")
account = BankAccount(initial_balance=1000.0)
print(f"Initial balance: ${account.get_balance():.2f}\n")

# Multiple people accessing the same account concurrently
people = ["Alice", "Bob", "Charlie"]
transaction_threads = []

for person in people:
    thread = threading.Thread(target=perform_transactions, args=(account, person, 5))
    transaction_threads.append(thread)
    thread.start()

# Wait for all transactions to complete
for thread in transaction_threads:
    thread.join()

print("\n=== Final Results ===")
print(f"Final balance: ${account.get_balance():.2f}")
print(f"Total transactions: {account.transaction_count}")

=== Bank Account Simulation with Thread Safety ===
Initial balance: $1000.00

Alice depositing $79.73
Alice deposited $79.73. New balance: $1079.73
Bob depositing $17.47
Bob deposited $17.47. New balance: $1097.20
Charlie depositing $86.10
Charlie deposited $86.10. New balance: $1183.31
Alice depositing $61.66
Alice deposited $61.66. New balance: $1244.97
Bob depositing $57.70
Bob deposited $57.70. New balance: $1302.67
Charlie depositing $67.43
Charlie deposited $67.43. New balance: $1370.10
Alice depositing $43.19
Alice deposited $43.19. New balance: $1413.29
Charlie depositing $48.25
Charlie deposited $48.25. New balance: $1461.54
Bob depositing $83.83
Bob deposited $83.83. New balance: $1545.37
Alice attempting to withdraw $92.39
Alice withdrew $92.39. New balance: $1452.98
Charlie attempting to withdraw $35.20
Charlie withdrew $35.20. New balance: $1417.78
Alice attempting to withdraw $28.16
Alice withdrew $28.16. New balance: $1389.63
Bob depositing $65.77
Bob deposited $65.77. N

## Thread Safety and Locks Exercise
Run the counter below with and without a `Lock` and compare the results.

In [None]:
import threading
import time


class UnsafeCounter:
    def __init__(self):
        self.count = 0

    def increment(self):
        # No lock: two threads can read the same value and overwrite each other
        current = self.count
        current += 1
        time.sleep(0.01)
        self.count = current


class SafeCounter:
    def __init__(self):
        self.count = 0
        self.lock = threading.Lock()

    def increment(self):
        # Lock prevents overlapping writes
        with self.lock:
            current = self.count
            current += 1
            time.sleep(0.01)
            self.count = current


def worker(counter, iterations: int):
    for _ in range(iterations):
        counter.increment()


# Unsafe counter test
unsafe = UnsafeCounter()
threads = [threading.Thread(target=worker, args=(unsafe, 1000)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Unsafe counter: {unsafe.count}")

# Safe counter test
safe = SafeCounter()
threads = [threading.Thread(target=worker, args=(safe, 1000)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Safe counter: {safe.count}")


Unsafe counter: 1000
Safe counter: 5000


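Locks add overhead but prevent data races. In the unsafe run above, threads read the same `count` before any of them writes it back, so most increments are lost. Hold a lock around every read-modify-write of shared mutable state, or prefer thread-safe data structures.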
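
As one illustration of the "thread-safe data structures" option, the sketch below has each worker count privately and hand its subtotal to the main thread through a `queue.Queue`. The `worker` function, the thread count, and the iteration count are illustrative assumptions chosen to match the counter exercise, not part of the original lesson.

```python
import queue
import threading


def worker(results: queue.Queue, iterations: int) -> None:
    """Count locally, then hand the subtotal to the main thread via the queue."""
    local_count = 0
    for _ in range(iterations):
        local_count += 1  # Private to this thread, so no lock is needed
    results.put(local_count)  # Queue.put is safe to call from many threads


results: queue.Queue = queue.Queue()
threads = [threading.Thread(target=worker, args=(results, 1000)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# All producers have finished, so the main thread can drain the queue safely
total = 0
while not results.empty():
    total += results.get()
print(f"Queue-based counter: {total}")
```

Because no thread writes to a shared variable, no explicit lock appears in the code; the queue handles its own synchronization internally.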

## ThreadPoolExecutor for Easier Threading

`concurrent.futures.ThreadPoolExecutor` manages a pool of worker threads for you. Submit callables and collect results as they finish.

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def process_data(item_id: int):
    """Simulate processing a data item that is I/O-bound."""
    time.sleep(0.2)
    return f"Processed item {item_id}", item_id * 2


print("=== Processing with ThreadPoolExecutor ===")
start = time.perf_counter()

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(process_data, i) for i in range(10)]

    for future in as_completed(futures):
        message, result = future.result()
        print(f"{message}: {result}")

duration = time.perf_counter() - start
print(f"Total time: {duration:.2f}s")


=== Processing with ThreadPoolExecutor ===
Processed item 0: 0
Processed item 1: 2
Processed item 2: 4
Processed item 3: 6
Processed item 4: 8
Processed item 6: 12
Processed item 7: 14
Processed item 5: 10
Processed item 8: 16
Processed item 9: 18
Total time: 0.61s


`ThreadPoolExecutor` is ideal when you want simple concurrent I/O without manually creating and tracking threads.
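
When results are needed in input order rather than completion order, `executor.map` is a possible alternative to `submit` plus `as_completed`. The sketch below assumes a hypothetical `double_after_delay` task, with a short sleep standing in for I/O.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def double_after_delay(item_id: int) -> int:
    """Hypothetical I/O-bound task: wait briefly, then return a value."""
    time.sleep(0.1)  # Stand-in for network or disk latency
    return item_id * 2


with ThreadPoolExecutor(max_workers=4) as executor:
    # map() yields results in input order, even if later items finish first
    ordered_results = list(executor.map(double_after_delay, range(6)))

print(ordered_results)
```

Use `as_completed` when you want to react to each result as soon as it arrives, and `map` when the order of the results matters more.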

## Multiprocessing for CPU-Bound Tasks

Use the `multiprocessing` module to run CPU-heavy code in separate processes. Each process has its own Python interpreter, so the GIL is not a bottleneck.

In [None]:
import multiprocessing
import os
import time

def cpu_heavy_task(x):
    """
    A CPU-intensive task that calculates the sum of squares up to x.
    It prints the Process ID (PID) to show which process is executing it.
    """
    pid = os.getpid()
    print(f"Task {x} started on Process ID: {pid}")

    # Simulate heavy computation
    result = sum(i * i for i in range(x))

    return f"Task {x} result: {result} (Processed by PID {pid})"

# Prepare a list of inputs for the heavy task
inputs = [1000000, 2000000, 3000000, 4000000]

print(f"Main Process ID: {os.getpid()}")
print("Starting multiprocessing pool...")

# Create a Pool of worker processes.
# By default, it creates one worker per CPU core available.
# Since these are separate processes, each has its own memory and Python interpreter (GIL).
with multiprocessing.Pool() as pool:
    # pool.map distributes the inputs across the worker processes
    # and collects the results in a list.
    results = pool.map(cpu_heavy_task, inputs)

print("\nAll tasks completed. Results:")
for res in results:
    print(res)

Main Process ID: 422
Starting multiprocessing pool...
Task 1000000 started on Process ID: 26330Task 2000000 started on Process ID: 26331

Task 3000000 started on Process ID: 26330
Task 4000000 started on Process ID: 26331

All tasks completed. Results:
Task 1000000 result: 333332833333500000 (Processed by PID 26330)
Task 2000000 result: 2666664666667000000 (Processed by PID 26331)
Task 3000000 result: 8999995500000500000 (Processed by PID 26330)
Task 4000000 result: 21333325333334000000 (Processed by PID 26331)


Run the code below as a script in VS Code, then consider: why is Colab not a good place to run this code?

In [None]:
import multiprocessing
import os
import time

def cpu_heavy_task(x):
    """
    A CPU-intensive task that calculates the sum of squares up to x.
    It prints the Process ID (PID) to show which process is executing it.
    """
    pid = os.getpid()
    print(f"Task {x} started on Process ID: {pid}")

    # Simulate heavy computation
    result = sum(i * i for i in range(x))

    return f"Task {x} result: {result} (Processed by PID {pid})"

if __name__ == "__main__":
    # Prepare a list of inputs for the heavy task
    inputs = [1000000, 2000000, 3000000, 4000000]
    print(f"Main Process ID: {os.getpid()}")
    print("Starting multiprocessing pool...")

    # Create a Pool of worker processes.
    # By default, it creates one worker per CPU core available.
    # Since these are separate processes, each has its own memory and Python interpreter (GIL).
    with multiprocessing.Pool() as pool:
        # pool.map distributes the inputs across the worker processes
        # and collects the results in a list.
        results = pool.map(cpu_heavy_task, inputs)

    print("\nAll tasks completed. Results:")
    for res in results:
        print(res)


Main Process ID: 422
Starting multiprocessing pool...
Task 2000000 started on Process ID: 26360Task 1000000 started on Process ID: 26359

Task 3000000 started on Process ID: 26359
Task 4000000 started on Process ID: 26360

All tasks completed. Results:
Task 1000000 result: 333332833333500000 (Processed by PID 26359)
Task 2000000 result: 2666664666667000000 (Processed by PID 26360)
Task 3000000 result: 8999995500000500000 (Processed by PID 26359)
Task 4000000 result: 21333325333334000000 (Processed by PID 26360)


Multiprocessing can approach linear speedup for CPU work when you have enough cores. Remember that processes do not share memory.
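
To check the speedup claim on your own machine, a rough timing comparison might look like the sketch below. The helper names (`sum_of_squares`, `run_serial`, `run_parallel`) and the input sizes are illustrative assumptions. The measured speedup depends on how many cores are available, so small or shared machines may show little gain.

```python
import multiprocessing
import time


def sum_of_squares(n: int) -> int:
    """CPU-bound work: sum of i*i for i in range(n)."""
    return sum(i * i for i in range(n))


def run_serial(inputs: list[int]) -> list[int]:
    """Run every task one after another in the current process."""
    return [sum_of_squares(n) for n in inputs]


def run_parallel(inputs: list[int]) -> list[int]:
    """Distribute the tasks across a pool of worker processes."""
    with multiprocessing.Pool() as pool:
        return pool.map(sum_of_squares, inputs)


if __name__ == "__main__":
    inputs = [1_000_000] * 4

    start = time.perf_counter()
    serial_results = run_serial(inputs)
    serial_duration = time.perf_counter() - start

    start = time.perf_counter()
    parallel_results = run_parallel(inputs)
    parallel_duration = time.perf_counter() - start

    # Both strategies must compute the same answers
    assert serial_results == parallel_results
    print(f"Serial:   {serial_duration:.3f}s")
    print(f"Parallel: {parallel_duration:.3f}s")
    print(f"Speedup:  {serial_duration / parallel_duration:.2f}x")
```

Starting processes and transferring arguments and results between them has a cost, so very small tasks can run slower in a pool than in a plain loop.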

## Process Communication with Queues

Because processes do not share memory, you need explicit communication. `multiprocessing.Queue` provides safe, cross-process message passing.

In [None]:
import multiprocessing
import time


def producer(queue, items):
    """Produce items and put them on the queue."""
    for item in items:
        print(f"Producing {item}")
        queue.put(item)
        time.sleep(0.1)
    queue.put(None)  # Sentinel value to signal completion


def consumer(queue):
    """Consume items until the sentinel arrives."""
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consuming {item}")
        time.sleep(0.15)


queue = multiprocessing.Queue()
items = ["data_1", "data_2", "data_3", "data_4"]

producer_process = multiprocessing.Process(target=producer, args=(queue, items))
consumer_process = multiprocessing.Process(target=consumer, args=(queue,))

producer_process.start()
consumer_process.start()

producer_process.join()
consumer_process.join()

print("All processing complete")


Producing data_1
Consuming data_1
Producing data_2
Consuming data_2
Producing data_3
Producing data_4
Consuming data_3
Consuming data_4
All processing complete


Queues make it easy to build producer-consumer pipelines without sharing mutable state directly.

## Introduction to Asyncio

Asyncio lets one thread juggle many I/O-bound tasks by switching when tasks await I/O. It is ideal for thousands of network operations.

In [None]:
import asyncio   # Built-in library for asynchronous (non-blocking) programming
import time      # Used here just to measure elapsed time


# Define an *asynchronous* function (a coroutine) using `async def`
async def fetch_data(source_id: int, delay: float):
    """Simulate fetching data from a remote source."""
    # This prints immediately when the coroutine starts running
    print(f"Fetching from source {source_id}...")

    # Non-blocking sleep: while this task is "waiting", other tasks can run
    # In async code we use `await asyncio.sleep(...)` instead of `time.sleep(...)`
    await asyncio.sleep(delay)

    # This prints after the delay has passed
    print(f"Received data from source {source_id}")

    # Return some fake "data" for this source
    return f"data_{source_id}"


# Another coroutine that coordinates everything
async def main():
    # Take a high-precision timestamp so we can measure total runtime
    start = time.perf_counter()

    # Create a list of coroutine objects for four different "fetch" operations.
    # Note: these are NOT running yet; they only start once `asyncio.gather` schedules them.
    tasks = [
        fetch_data(1, 0.5),  # Source 1 will "take" 0.5 seconds
        fetch_data(2, 0.3),  # Source 2 will "take" 0.3 seconds
        fetch_data(3, 0.4),  # Source 3 will "take" 0.4 seconds
        fetch_data(4, 0.2),  # Source 4 will "take" 0.2 seconds
    ]

    # `asyncio.gather` runs all the tasks *concurrently*.
    # `await` waits until they are all finished and then returns their results.
    # The results are returned in the same order as `tasks`, not by completion time.
    results = await asyncio.gather(*tasks)

    # Work out how long everything took in total
    duration = time.perf_counter() - start

    # Show the collected results (a list of strings like "data_1", "data_2", ...)
    print(f"Results: {results}")

    # Show total time. Because tasks run concurrently, this should be close to
    # the *longest* individual delay (0.5s), not the sum of all delays.
    print(f"Total time: {duration:.2f}s (close to the longest delay, not the sum)")


# --- NOTE FOR COLAB / JUPYTER ---
# In Google Colab and Jupyter, an event loop is already running in the background,
# so `asyncio.run(main())` raises a RuntimeError there.
# Instead, we can directly `await main()` at the top level of a cell.
# This line must be run in a cell by itself (or at the bottom of the cell) in Colab.
await main()
# --- NOTE FOR VS CODE ---
# When running this code as a script in VS Code, replace the line above with:
# asyncio.run(main())

Fetching from source 1...
Fetching from source 2...
Fetching from source 3...
Fetching from source 4...
Received data from source 4
Received data from source 2
Received data from source 3
Received data from source 1
Results: ['data_1', 'data_2', 'data_3', 'data_4']
Total time: 0.50s (close to the longest delay, not the sum)


Asyncio uses cooperative multitasking: tasks run until they `await` something slow, then another task runs. Great for sockets, HTTP calls, and database queries.
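
Cooperative multitasking only works if every task actually yields. The sketch below contrasts `await asyncio.sleep(...)`, which yields to the event loop, with `time.sleep(...)`, which does not. The coroutine names and delays are illustrative assumptions. It uses `asyncio.run`, so in Colab or Jupyter you would `await compare()` instead.

```python
import asyncio
import time


async def cooperative(delay: float) -> None:
    await asyncio.sleep(delay)  # Yields to the event loop while waiting


async def blocking(delay: float) -> None:
    time.sleep(delay)  # Never yields, so no other task can run meanwhile


async def timed(coro_func, delay: float, count: int) -> float:
    """Run `count` copies of a coroutine function together and time them."""
    start = time.perf_counter()
    await asyncio.gather(*(coro_func(delay) for _ in range(count)))
    return time.perf_counter() - start


async def compare() -> tuple[float, float]:
    cooperative_time = await timed(cooperative, 0.1, 3)
    blocking_time = await timed(blocking, 0.1, 3)
    return cooperative_time, blocking_time


# Script entry point; in Colab/Jupyter use `await compare()` instead
cooperative_time, blocking_time = asyncio.run(compare())
print(f"Cooperative: {cooperative_time:.2f}s")
print(f"Blocking:    {blocking_time:.2f}s")
```

The three cooperative tasks overlap their waits, while the three blocking ones run back to back. This is why blocking calls inside coroutines defeat the purpose of asyncio.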

**Quiz:** What is the main advantage of asyncio over threading?

- [ ] Better for CPU-bound tasks
- [x] Lower overhead for managing many concurrent I/O operations
- [ ] Automatically uses multiple CPU cores
- [ ] Simpler syntax than threads

## Async/Await Syntax

Use `async def` to declare coroutines and `await` to pause them. `asyncio.gather` runs many coroutines at once and collects their results.

In [None]:
import asyncio
from pathlib import Path

import aiohttp
# aiohttp is an async HTTP client library.
# It integrates nicely with asyncio, allowing us to make non-blocking HTTP requests.


async def download_file(session: aiohttp.ClientSession, url: str, filename: str) -> str:
    """
    Download a page and save it to a file.

    - `session` is reused across requests for efficiency (connection pooling).
    - `url` is the address of the page to download.
    - `filename` is where we’ll save the content locally.
    """
    print(f"Starting download: {url}")

    # `async with` ensures the request is properly closed when we're done,
    # even if an exception happens. This is important for freeing resources.
    async with session.get(url) as response:
        # `.raise_for_status()` will raise an exception for HTTP errors (4xx/5xx),
        # making debugging easier instead of silently failing.
        response.raise_for_status()

        # `await response.read()` is non-blocking; it lets other tasks run while
        # waiting for the network data to arrive.
        content = await response.read()

    # Use `pathlib.Path` instead of raw strings for safer, clearer path handling.
    downloads_dir = Path("downloads")
    # `exist_ok=True` stops it raising an error if the directory already exists.
    downloads_dir.mkdir(exist_ok=True)

    filepath = downloads_dir / filename
    # `.write_bytes()` writes the raw bytes to disk in one go (no need to open/close manually).
    filepath.write_bytes(content)

    print(f"Completed download: {url} -> {filepath}")
    return f"Downloaded {len(content)} bytes from {url} to {filepath}"


async def download_all_files():
    """
    Create and run all download tasks concurrently.

    This function:
    - Defines which URLs to download.
    - Opens a single shared HTTP session (for efficiency).
    - Starts all downloads at once using `asyncio.gather`.
    """
    files = [
        ("https://warwick.ac.uk/", "warwick.html"),
        (
            "https://warwick.ac.uk/fac/sci/wmg/study/masters-degrees/applied-artificial-intelligence/",
            "applied_ai.html",
        ),
        ("https://warwick.ac.uk/fac/sci/wmg/", "wmg.html"),
    ]

    # Use one ClientSession for multiple requests:
    # - Reuses TCP connections (connection pooling)
    # - Faster and more resource-efficient than creating a new session per request.
    async with aiohttp.ClientSession() as session:
        # Build a list of coroutine objects (not yet executed):
        tasks = [
            download_file(session, url, filename)
            for url, filename in files
        ]

        # `asyncio.gather(*tasks)` schedules all the tasks to run concurrently
        # and waits for all of them to finish.
        #
        # This is where we get the benefit of async: while one download is waiting
        # on network I/O, another one can progress.
        results = await asyncio.gather(*tasks)

    # Return the list of result strings from all the downloads.
    return results


async def main():
    """
    Top-level async entry point.

    This function orchestrates the whole workflow:
    - Calls `download_all_files` to perform all network work.
    - Prints the results returned for each download.
    """
    results = await download_all_files()
    for result in results:
        print(result)


# In a normal Python script (run from the terminal), you’d typically do:
# if __name__ == "__main__":
#     asyncio.run(main())
#
# But in Jupyter / IPython, the event loop is already running,
# so we can directly `await main()` instead:
await main()


Starting download: https://warwick.ac.uk/
Starting download: https://warwick.ac.uk/fac/sci/wmg/study/masters-degrees/applied-artificial-intelligence/
Starting download: https://warwick.ac.uk/fac/sci/wmg/
Completed download: https://warwick.ac.uk/ -> downloads/warwick.html
Completed download: https://warwick.ac.uk/fac/sci/wmg/study/masters-degrees/applied-artificial-intelligence/ -> downloads/applied_ai.html
Completed download: https://warwick.ac.uk/fac/sci/wmg/ -> downloads/wmg.html
Downloaded 72931 bytes from https://warwick.ac.uk/ to downloads/warwick.html
Downloaded 78613 bytes from https://warwick.ac.uk/fac/sci/wmg/study/masters-degrees/applied-artificial-intelligence/ to downloads/applied_ai.html
Downloaded 133110 bytes from https://warwick.ac.uk/fac/sci/wmg/ to downloads/wmg.html


`await` hands control back to the event loop so other tasks can run while one is waiting on I/O.

Async context managers guarantee cleanup (even on errors), and async iterators make it easy to work with streaming data without blocking the loop.
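
A minimal sketch of both ideas follows, using a hypothetical `open_stream` resource and a `read_chunks` async generator. Both names are assumptions made for illustration, and the sleep stands in for waiting on the network. In Colab or Jupyter, replace `asyncio.run(consume())` with `await consume()`.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def open_stream(name: str, log: list[str]):
    """Hypothetical resource that records open/close events in `log`."""
    log.append(f"open {name}")
    try:
        yield name
    finally:
        log.append(f"close {name}")  # Runs even if the body raises


async def read_chunks(stream: str, count: int):
    """Async generator: yields one chunk at a time, awaiting between chunks."""
    for i in range(count):
        await asyncio.sleep(0.01)  # Other tasks can run during this wait
        yield f"{stream}-chunk-{i}"


async def consume() -> tuple[list[str], list[str]]:
    log: list[str] = []
    chunks: list[str] = []
    async with open_stream("demo", log) as stream:
        async for chunk in read_chunks(stream, 3):
            chunks.append(chunk)
    return log, chunks


# Script entry point; in Colab/Jupyter use `await consume()` instead
log, chunks = asyncio.run(consume())
print(log)
print(chunks)
```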

## Choosing the Right Approach

Use this quick guide:

| Task type | Best approach | Why |
| --- | --- | --- |
| I/O-bound (few connections) | Threading | Simple and low overhead |
| I/O-bound (many connections) | Asyncio | Scales to thousands with low overhead |
| CPU-bound | Multiprocessing | Uses multiple cores and skips the GIL |
| Mixed I/O + CPU | Asyncio plus process pool | Async for I/O, processes for heavy work |

### Choosing the Right Concurrency Model

Below is a quick reference for why each method is a good fit for the given example.

---

## 1. Web scraping 200 URLs → `asyncio`

**Best because:**

- **I/O-bound workload**  
  - Web scraping spends most of its time **waiting for network responses**, not using much CPU.
- **High concurrency needed (200+ tasks)**  
  - Doing 200 requests sequentially is slow; you want to **overlap the waiting time**.
- **`asyncio` scales well to many tasks**  
  - Runs everything in **one thread, one process**.
  - While one request is waiting (`await`), the event loop runs other coroutines.
  - Coroutines are **lighter than threads**, so hundreds or thousands of concurrent tasks are practical.
- **Threads are possible but heavier**  
  - 200 threads use more memory and incur more context-switching overhead.
  - `asyncio` is designed specifically for “lots of I/O, lots of tasks”.
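
The points above can be sketched as follows. The `fetch` coroutine is a stand-in that sleeps instead of making a real HTTP request, the `example.com` URLs are placeholders, and the limit of 20 concurrent requests is an assumed politeness setting rather than a recommendation from the lesson. In Colab or Jupyter, use `await scrape(urls)` instead of `asyncio.run(...)`.

```python
import asyncio


async def fetch(url: str, limit: asyncio.Semaphore) -> str:
    """Hypothetical fetch: the sleep stands in for a real HTTP request."""
    async with limit:  # Only a bounded number of fetches run at once
        await asyncio.sleep(0.01)
        return f"body of {url}"


async def scrape(urls: list[str], max_concurrent: int = 20) -> list[str]:
    """Fetch every URL concurrently, capped at `max_concurrent` in flight."""
    limit = asyncio.Semaphore(max_concurrent)
    return await asyncio.gather(*(fetch(url, limit) for url in urls))


urls = [f"https://example.com/page/{i}" for i in range(200)]
# Script entry point; in Colab/Jupyter use `await scrape(urls)` instead
pages = asyncio.run(scrape(urls))
print(f"Fetched {len(pages)} pages")
```

The semaphore keeps all 200 coroutines cheap to create while avoiding 200 simultaneous requests to the same server.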

> **Summary:** Many network-bound tasks (200 URLs) → `asyncio` gives efficient, scalable concurrency.

---

## 2. Image processing 50 files → `multiprocessing`

**Best because:**

- **CPU-bound workload**  
  - Image decoding, resizing, filtering, and transforms are **computationally heavy**.
- **The GIL limits threads for CPU work**  
  - In CPython, the **Global Interpreter Lock (GIL)** means only one thread executes Python bytecode at a time **per process**.
  - Threads do **not** give true parallelism for CPU-heavy pure Python code.
- **`multiprocessing` uses multiple processes**  
  - Each process has its own interpreter and its own GIL.
  - Multiple processes can run on **different CPU cores in parallel**.
- **Good match for 50 images**  
  - Use a process pool (e.g. 4–8 workers).
  - Each process handles images one by one, fully using multiple cores.
- **Threads here would underuse the CPU**  
  - They would still contend on the GIL and not speed things up much.

> **Summary:** Heavy CPU-bound work (image processing) → `multiprocessing` bypasses the GIL and uses multiple cores.

---

## 3. Database queries (10) → `threading`

**Best because:**

- **I/O-bound workload**  
  - Database queries spend most time **waiting on the database server** to respond.
- **Only a small number of concurrent tasks (10)**  
  - Overhead of 10 threads is tiny; we do not need the massive scalability of `asyncio`.
- **Typical DB drivers are blocking and thread-safe**  
  - Most database libraries expose a **blocking** API: `cursor.execute(query)` blocks until the result arrives.
  - They are usually designed to work well with threads.
- **Threads are easy to use here**  
  - Each thread can run a query independently.
  - The main thread waits for all threads to finish.
  - No need to convert the codebase to async or manage an event loop.
- **`multiprocessing` is overkill**  
  - Processes are heavier (more memory, more coordination).
  - For 10 I/O-bound operations, the extra complexity is not worth it.

> **Summary:** A small number of blocking I/O tasks (10 DB queries) → `threading` is simple, effective, and works well with typical database drivers.

`loop.run_in_executor` is the bridge that lets async code cooperate with blocking libraries without freezing the event loop.
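
A minimal sketch of that bridge is shown below, assuming a hypothetical `blocking_lookup` function that stands in for a library with no async API. Passing `None` as the executor uses the loop's default thread pool. In Colab or Jupyter, use `await lookup_all()` instead of `asyncio.run(...)`.

```python
import asyncio
import time


def blocking_lookup(key: str) -> str:
    """Hypothetical blocking call, e.g. a library with no async API."""
    time.sleep(0.1)
    return key.upper()


async def lookup_all() -> list[str]:
    loop = asyncio.get_running_loop()
    # Each call runs in a worker thread, so the event loop stays responsive
    futures = [
        loop.run_in_executor(None, blocking_lookup, key)
        for key in ("a", "b", "c")
    ]
    return await asyncio.gather(*futures)


# Script entry point; in Colab/Jupyter use `await lookup_all()` instead
lookup_results = asyncio.run(lookup_all())
print(lookup_results)
```

For CPU-heavy functions, a `concurrent.futures.ProcessPoolExecutor` can be passed in place of `None`, which matches the "Asyncio plus process pool" row in the table above.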

## Example
Try the example below on both CPU and GPU. The code automatically chooses the GPU when one is available.

In [None]:
# =======================
# 1. Install dependencies
# =======================
!pip install -q openai-whisper aiohttp
!apt-get -y install ffmpeg > /dev/null

# =======================
# 2. Imports & setup
# =======================
import asyncio
import aiohttp
from pathlib import Path
import whisper
import torch

# Folder where files will be stored
DOWNLOAD_DIR = Path("downloads")
DOWNLOAD_DIR.mkdir(exist_ok=True)

# Choose device (GPU if available)
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

# Load Whisper model (you can change "small" to "tiny", "base", "medium", "large", etc.)
model = whisper.load_model("small", device=device)

# ==========================================
# 3. Async download helpers (audio / video)
# ==========================================
async def download_one(session: aiohttp.ClientSession, url: str, dest_folder: Path = DOWNLOAD_DIR) -> Path:
    """
    Download a single file using aiohttp and save it to dest_folder.
    Returns the local file path.
    """
    # Derive a filename from the URL
    name_from_url = url.split("?")[0].rstrip("/").split("/")[-1] or "file"
    filepath = dest_folder / name_from_url

    print(f"Starting download: {url}")
    async with session.get(url) as resp:
        resp.raise_for_status()
        with open(filepath, "wb") as f:
            async for chunk in resp.content.iter_chunked(1024 * 64):
                f.write(chunk)
    print(f"Completed download: {filepath}")
    return filepath


async def download_all(urls):
    """
    Download all URLs concurrently and return a list of local file paths.
    """
    async with aiohttp.ClientSession() as session:
        tasks = [download_one(session, url) for url in urls]
        files = await asyncio.gather(*tasks)
    return files

# ===================================
# 4. Transcription using Whisper
# ===================================
def transcribe_files(filepaths, language: str = None):
    """
    Transcribe each file in filepaths with Whisper.
    Saves a .txt file with the same name and returns a dict of {Path: text}.
    """
    transcripts = {}
    for path in filepaths:
        print(f"\nTranscribing: {path}")
        # language=None lets Whisper auto-detect; set e.g. language="en" to force English
        result = model.transcribe(str(path), language=language)
        text = result["text"].strip()

        transcripts[path] = text

        # Save transcript next to the file
        txt_path = path.with_suffix(path.suffix + ".txt")
        txt_path.write_text(text, encoding="utf-8")
        print(f"Saved transcript to: {txt_path}")

    return transcripts

# ===================================
# 5. Define URLs and run everything
# ===================================

# Real sample audio/video URLs intended for testing
# - Short sample MP3 and MP4 from samplelib.com
# - Sample MP3 and MP4 from an open GitHub repo
URLS = [
    "https://download.samplelib.com/mp3/sample-3s.mp3",   # short MP3 sample
    "https://github.com/rafaelreis-hotmart/Audio-Sample-files/raw/master/sample.mp3",  # sample MP3
    "https://download.samplelib.com/mp4/sample-5s.mp4",   # short MP4 sample
    "https://github.com/rafaelreis-hotmart/Audio-Sample-files/raw/master/sample.mp4",  # sample MP4
]

async def main():
    # Step 1: download all media files concurrently
    downloaded_files = await download_all(URLS)

    # Step 2: transcribe them one by one with Whisper
    transcripts = transcribe_files(downloaded_files, language=None)  # or language="en"

    # Optional: print a short preview
    print("\n=== Transcript previews ===")
    for path, text in transcripts.items():
        preview = text[:200].replace("\n", " ")
        print(f"{path.name}: {preview}...")
    return transcripts

# In Colab/Jupyter we use top-level `await` instead of asyncio.run(...)
transcripts = await main()





Using device: cpu


100%|████████████████████████████████████████| 461M/461M [00:01<00:00, 258MiB/s]


Starting download: https://download.samplelib.com/mp3/sample-3s.mp3
Starting download: https://github.com/rafaelreis-hotmart/Audio-Sample-files/raw/master/sample.mp3
Starting download: https://download.samplelib.com/mp4/sample-5s.mp4
Starting download: https://github.com/rafaelreis-hotmart/Audio-Sample-files/raw/master/sample.mp4
Completed download: downloads/sample-3s.mp3
Completed download: downloads/sample.mp4
Completed download: downloads/sample.mp3
Completed download: downloads/sample-5s.mp4

Transcribing: downloads/sample-3s.mp3




Saved transcript to: downloads/sample-3s.mp3.txt

Transcribing: downloads/sample.mp3




Saved transcript to: downloads/sample.mp3.txt

Transcribing: downloads/sample-5s.mp4




Saved transcript to: downloads/sample-5s.mp4.txt

Transcribing: downloads/sample.mp4




Saved transcript to: downloads/sample.mp4.txt

=== Transcript previews ===
sample-3s.mp3: you...
sample.mp3: Music...
sample-5s.mp4: ...
sample.mp4: you...
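
The pipeline above calls the synchronous `transcribe_files` directly inside `async def main()`. That is fine for a linear notebook, but the event loop cannot run any other coroutine while a blocking call is in progress. A minimal sketch of one way around this, assuming Python 3.9+ for `asyncio.to_thread`, is shown below. `blocking_transcribe` and `heartbeat` are hypothetical stand-ins (not part of the lesson code) so the example runs without Whisper.

```python
import asyncio
import time


def blocking_transcribe(path: str) -> str:
    """Hypothetical stand-in for a blocking call such as model.transcribe(...)."""
    time.sleep(0.2)  # pretend to do slow, blocking work
    return f"transcript of {path}"


async def heartbeat(ticks: list) -> None:
    """Append a timestamp a few times to show the event loop stays responsive."""
    for _ in range(3):
        await asyncio.sleep(0.05)
        ticks.append(time.perf_counter())


async def transcribe_without_blocking(paths):
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))

    texts = []
    for path in paths:
        # The blocking function runs in a worker thread; the loop keeps running.
        texts.append(await asyncio.to_thread(blocking_transcribe, path))

    await beat
    return texts, ticks


# In a notebook: texts, ticks = await transcribe_without_blocking(["a.mp3", "b.mp3"])
# In a script:   texts, ticks = asyncio.run(transcribe_without_blocking(["a.mp3", "b.mp3"]))
```

This keeps the loop responsive; it is not meant to make the transcription itself faster.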


## Common Pitfalls: Race Conditions and Deadlocks

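Race conditions happen when threads interleave reads and writes to shared data without coordination, so the result depends on timing. Deadlocks happen when two or more threads each hold a lock that another one needs, typically because the locks are acquired in an inconsistent order. Protect shared writes with a lock, and always acquire multiple locks in the same order.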

In [None]:
import threading
import time

# Race condition example
shared_list = []


def unsafe_append(value: int):
    if value not in shared_list:  # Check
        time.sleep(0.001)  # Simulate work between check and write
        shared_list.append(value)  # Modify (race window)


threads = [threading.Thread(target=unsafe_append, args=(1,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Unsafe list (should have 1 item): {shared_list}")

# Safe version with lock
shared_list_safe = []
lock = threading.Lock()


def safe_append(value: int):
    with lock:
        if value not in shared_list_safe:
            time.sleep(0.001)
            shared_list_safe.append(value)


threads_safe = [threading.Thread(target=safe_append, args=(1,)) for _ in range(5)]
for t in threads_safe:
    t.start()
for t in threads_safe:
    t.join()

print(f"Safe list (should have 1 item): {shared_list_safe}")


Unsafe list (should have 1 item): [1]
Safe list (should have 1 item): [1]
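
Because the race depends on thread timing, a given run of the unsafe version may or may not show duplicates (the recorded run above shows a single item). The sketch below forces the bad interleaving with a `threading.Barrier`, so every thread passes the check before any thread appends. The names `forced_list` and `forced_unsafe_append` are illustrative and not part of the original lesson.

```python
import threading

NUM_THREADS = 5
barrier = threading.Barrier(NUM_THREADS)
forced_list = []


def forced_unsafe_append(value: int) -> None:
    if value not in forced_list:   # every thread sees an empty list here
        barrier.wait()             # wait until all threads have passed the check
        forced_list.append(value)  # now every thread appends


workers = [
    threading.Thread(target=forced_unsafe_append, args=(1,))
    for _ in range(NUM_THREADS)
]
for t in workers:
    t.start()
for t in workers:
    t.join()

print(f"Forced race: {len(forced_list)} items")  # prints "Forced race: 5 items"
```

Wrapping the check and the append in a single `with lock:` block, as `safe_append` does, removes the window between them.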


In [None]:
import threading
import time

# Deadlock example and fix
lock1 = threading.Lock()
lock2 = threading.Lock()


def task1_safe():
    # Always acquire locks in the same order
    with lock1:
        print("Task1 acquired lock1")
        time.sleep(0.1)
        with lock2:
            print("Task1 acquired lock2")


def task2_safe():
    # Same lock order prevents deadlock
    with lock1:
        print("Task2 acquired lock1")
        time.sleep(0.1)
        with lock2:
            print("Task2 acquired lock2")


t1 = threading.Thread(target=task1_safe)
t2 = threading.Thread(target=task2_safe)
t1.start(); t2.start()
t1.join(); t2.join()
print("No deadlock!")


Task1 acquired lock1
Task1 acquired lock2
Task2 acquired lock1
Task2 acquired lock2
No deadlock!
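
The cell above shows only the fixed version. For contrast, here is a minimal sketch of the inconsistent ordering that causes a deadlock. So that the demo can finish, it uses `Lock.acquire(timeout=...)` instead of a plain `with` block, and a `Barrier` to line the two threads up. All names in it (`lock_a`, `lock_b`, `grab_in_order`) are illustrative.

```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()
sync_point = threading.Barrier(2)  # lines the two threads up deterministically
got_second_lock = {}


def grab_in_order(name, first, second):
    with first:
        sync_point.wait()  # both threads now hold their first lock
        # A plain `with second:` would block forever at this point.
        acquired = second.acquire(timeout=0.5)
        if acquired:
            second.release()
        got_second_lock[name] = acquired
        sync_point.wait()  # keep holding `first` until the other thread has given up too


# t1 takes A then B; t2 takes B then A (the inconsistent order).
t1 = threading.Thread(target=grab_in_order, args=("t1", lock_a, lock_b))
t2 = threading.Thread(target=grab_in_order, args=("t2", lock_b, lock_a))
t1.start(); t2.start()
t1.join(); t2.join()

print(got_second_lock)  # both values are False: neither thread got its second lock
```

Without the timeout, both threads would wait on each other indefinitely. Giving both threads the same order (A then B) removes the cycle.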


## Practice Projects

Try these exercises to reinforce the ideas. Each block includes a reference solution you can run and modify.

### Exercise 1: Parallel Data Processor (Multiprocessing)

Goal: split a list of numbers across processes and compare sequential vs. parallel execution.

Steps:
1. Write a CPU-intensive function (prime check works well).
2. Process numbers sequentially and time it.
3. Process numbers with a process pool and time it.
4. Compare the speedup.

In [None]:
import multiprocessing
import time
import math


def is_prime(n: int) -> bool:
    """Check if n is prime (expensive for large numbers)."""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, int(math.sqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True


def process_number(n: int):
    return {
        "number": n,
        "is_prime": is_prime(n),
        "square": n * n,
        "cube": n * n * n,
    }


def sequential_processing(numbers):
    start = time.perf_counter()
    results = [process_number(n) for n in numbers]
    duration = time.perf_counter() - start
    return results, duration


def parallel_processing(numbers, num_processes: int = 4):
    start = time.perf_counter()
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(process_number, numbers)
    duration = time.perf_counter() - start
    return results, duration


# Test with large primes (primality check is expensive)
test_numbers = [104729, 104743, 104759, 104761, 104773, 104779, 104789, 104801]

print("=== Sequential Processing ===")
seq_results, seq_time = sequential_processing(test_numbers)
print(f"Time: {seq_time:.3f}s")
for r in seq_results[:3]:
    print(f"  {r['number']}: prime={r['is_prime']}")

print("\n=== Parallel Processing ===")
par_results, par_time = parallel_processing(test_numbers)
print(f"Time: {par_time:.3f}s")
for r in par_results[:3]:
    print(f"  {r['number']}: prime={r['is_prime']}")

print(f"Speedup: {seq_time / par_time:.2f}x")


### Exercise 2: Async Web Scraper (Asyncio)

Goal: fetch many URLs concurrently with asyncio, handle errors, and limit concurrency.

In [None]:
import asyncio
import time


class AsyncWebScraper:
    """Asynchronous web scraper with concurrency control."""

    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_url(self, url: str, delay: float = 0.5):
        async with self.semaphore:
            try:
                print(f"Fetching: {url}")
                await asyncio.sleep(delay)  # Simulate network request

                if "error" in url:
                    raise ConnectionError(f"Failed to connect to {url}")

                data = f"Content from {url}"
                print(f"Completed: {url}")
                return {"url": url, "status": "success", "data": data}

            except ConnectionError as exc:
                print(f"Error: {exc}")
                return {"url": url, "status": "error", "data": None}

    async def scrape_urls(self, urls):
        tasks = [self.fetch_url(url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)


async def main():
    urls = [
        "http://example.com/page1",
        "http://example.com/page2",
        "http://example.com/page3",
        "http://example.com/error",  # Will fail
        "http://example.com/page4",
        "http://example.com/page5",
        "http://example.com/page6",
    ]

    scraper = AsyncWebScraper(max_concurrent=3)

    start = time.perf_counter()
    results = await scraper.scrape_urls(urls)
    duration = time.perf_counter() - start

    print("\n=== Results ===")
    successful = sum(1 for r in results if isinstance(r, dict) and r.get("status") == "success")
    print(f"Successful: {successful}/{len(urls)}")
    print(f"Total time: {duration:.2f}s")
    print(f"Average time per URL: {duration / len(urls):.2f}s")


# In Colab/Jupyter an event loop is already running, so use top-level `await`.
# In a plain script, call asyncio.run(main()) instead.
await main()


Try adding retries with exponential backoff for failed URLs to make the scraper more robust.
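
As a starting point for that extension, here is a minimal sketch of retrying with exponential backoff. It assumes the wrapped coroutine raises `ConnectionError` on failure (unlike `fetch_url` above, which catches the error and returns a dict). The helpers `fetch_with_retries` and `make_flaky_fetch` and their parameters are hypothetical.

```python
import asyncio
import random


async def fetch_with_retries(fetch, url, max_attempts=4, base_delay=0.1):
    """Call `await fetch(url)`, retrying on ConnectionError with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await fetch(url)
        except ConnectionError:
            if attempt == max_attempts:
                raise  # out of attempts: let the caller see the error
            delay = base_delay * 2 ** (attempt - 1)  # 0.1s, 0.2s, 0.4s, ...
            delay += random.uniform(0, delay / 2)    # jitter spreads out retries
            await asyncio.sleep(delay)


def make_flaky_fetch(failures_before_success: int):
    """Build a fake fetch coroutine that fails a fixed number of times, then succeeds."""
    calls = {"count": 0}

    async def flaky_fetch(url):
        calls["count"] += 1
        if calls["count"] <= failures_before_success:
            raise ConnectionError(f"Failed to connect to {url}")
        return f"Content from {url}"

    return flaky_fetch, calls


# In a notebook:
#   fetch, calls = make_flaky_fetch(2)
#   data = await fetch_with_retries(fetch, "http://example.com/page1")
```

The delay doubles after each failed attempt, and the random jitter keeps many failing tasks from retrying at the same instant.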

### Exercise 3: Producer-Consumer Pipeline (Threading)

Goal: implement a producer-consumer pattern with a queue and multiple consumer threads.

In [None]:
import threading
import queue
import time
import random


class DataPipeline:
    """Producer-consumer pipeline with a thread-safe queue."""

    def __init__(self, queue_size: int = 10):
        self.queue = queue.Queue(maxsize=queue_size)
        self.stop_event = threading.Event()

    def producer(self, producer_id: int, num_items: int):
        for i in range(num_items):
            if self.stop_event.is_set():
                break

            item = {
                "id": f"P{producer_id}-{i}",
                "value": random.randint(1, 100),
                "timestamp": time.time(),
            }

            print(f"Producer {producer_id}: Creating {item['id']}")
            self.queue.put(item)
            time.sleep(random.uniform(0.1, 0.3))

        print(f"Producer {producer_id}: Finished")

    def consumer(self, consumer_id: int):
        while not self.stop_event.is_set():
            try:
                item = self.queue.get(timeout=1)
                print(f"Consumer {consumer_id}: Processing {item['id']}")
                time.sleep(random.uniform(0.2, 0.5))
                result = item['value'] * 2
                print(f"Consumer {consumer_id}: Completed {item['id']} -> {result}")
                self.queue.task_done()
            except queue.Empty:
                continue

        print(f"Consumer {consumer_id}: Finished")

    def run(self, num_producers: int = 2, num_consumers: int = 3, items_per_producer: int = 5):
        threads = []

        # Start consumers first so they are ready to work
        for i in range(num_consumers):
            t = threading.Thread(target=self.consumer, args=(i,))
            t.start()
            threads.append(t)

        # Start producers
        for i in range(num_producers):
            t = threading.Thread(target=self.producer, args=(i, items_per_producer))
            t.start()
            threads.append(t)

        # Wait for all producers to finish
        for t in threads[num_consumers:]:
            t.join()

        # Wait until queue is empty
        self.queue.join()
        self.stop_event.set()

        # Stop consumers
        for t in threads[:num_consumers]:
            t.join()

        print("\n=== Pipeline Complete ===")


pipeline = DataPipeline(queue_size=5)
pipeline.run(num_producers=2, num_consumers=3, items_per_producer=4)


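Try replacing the regular queue with `queue.PriorityQueue()` and use the item value as the priority so larger numbers are processed first. Note that `PriorityQueue` returns the smallest entry first, so you will need to negate the value, and that dicts cannot be compared with each other, so each entry needs a tie-breaker.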
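
A minimal sketch of the queueing part of that exercise, without threads, is shown below. The sample items and the `tie_breaker` counter are illustrative assumptions. Entries are `(priority, counter, item)` tuples, so two items with the same value never force Python to compare the dicts.

```python
import itertools
import queue

pq = queue.PriorityQueue()
tie_breaker = itertools.count()  # increasing counter keeps insertion order on ties

items = [
    {"id": "P0-0", "value": 17},
    {"id": "P0-1", "value": 93},
    {"id": "P1-0", "value": 42},
    {"id": "P1-1", "value": 93},
]

for item in items:
    # Negate the value: PriorityQueue pops the smallest entry first.
    pq.put((-item["value"], next(tie_breaker), item))

processed = []
while not pq.empty():
    _, _, item = pq.get()
    processed.append(item["id"])
    pq.task_done()

print(processed)  # ['P0-1', 'P1-1', 'P1-0', 'P0-0']
```

In the pipeline, the producer would `put` the tuple and the consumer would unpack it after `get`.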

### Bonus: Hybrid Async + Multiprocessing

Goal: combine asyncio for I/O-bound fetching with a process pool for CPU-heavy analysis.

In [None]:
import asyncio
import time
import math
from concurrent.futures import ProcessPoolExecutor
import random


def cpu_intensive_analysis(data):
    numbers = data.get("numbers", [])
    result = sum(math.sqrt(n) * math.log(n + 1) for n in numbers if n > 0)
    return {"id": data["id"], "result": result, "count": len(numbers)}


async def fetch_dataset(dataset_id: str, delay: float = 0.3):
    print(f"Fetching dataset {dataset_id}...")
    await asyncio.sleep(delay)
    data = {"id": dataset_id, "numbers": [random.randint(1, 1000) for _ in range(10000)]}
    print(f"Fetched dataset {dataset_id}")
    return data


async def process_with_hybrid_approach(dataset_ids):
    # Inside a coroutine, get_running_loop() is the recommended way to reach the loop
    loop = asyncio.get_running_loop()

    with ProcessPoolExecutor(max_workers=4) as executor:
        # Phase 1: async fetch
        print("=== Phase 1: Fetching Datasets (Async I/O) ===")
        datasets = await asyncio.gather(*(fetch_dataset(did) for did in dataset_ids))

        # Phase 2: process in separate processes
        print("\n=== Phase 2: Processing Datasets (Multiprocessing) ===")
        process_tasks = [loop.run_in_executor(executor, cpu_intensive_analysis, ds) for ds in datasets]
        return await asyncio.gather(*process_tasks)


async def main():
    dataset_ids = ["DS001", "DS002", "DS003", "DS004"]

    start = time.perf_counter()
    results = await process_with_hybrid_approach(dataset_ids)
    duration = time.perf_counter() - start

    print("\n=== Results ===")
    for result in results:
        print(f"Dataset {result['id']}: {result['result']:.2f} (processed {result['count']} numbers)")

    print(f"\nTotal time: {duration:.2f}s")


# In Colab/Jupyter an event loop is already running, so use top-level `await`.
# In a plain script, call asyncio.run(main()) instead.
await main()


## Additional Resources

- Python Threading docs: https://docs.python.org/3/library/threading.html
- Python Multiprocessing docs: https://docs.python.org/3/library/multiprocessing.html
- Python Asyncio docs: https://docs.python.org/3/library/asyncio.html
- `concurrent.futures` module: https://docs.python.org/3/library/concurrent.futures.html
- Understanding the Python GIL: https://realpython.com/python-gil/

## Recap

- Threading: great for I/O-bound tasks with moderate concurrency.
- Asyncio: best for very high I/O concurrency with low overhead.
- Multiprocessing: best for CPU-bound tasks because it bypasses the GIL.
- Use locks for shared state, and avoid deadlocks by acquiring locks in a consistent order.
- Mix models (async + executors) when your app has both I/O and CPU-heavy parts.