# Programming with Python

## Lecture 12: Concurrency 4

### Armen Gabrielyan

#### Yerevan State University / ASDS

#### 10 May, 2025

# Multi-processing

## CPU-bound task

We already know that in Python CPU-intensive tasks are best handled with `multiprocessing` rather than `threading`, mainly due to the limitations of the Global Interpreter Lock (GIL).

In the standard CPython build, the GIL ensures that only one thread executes Python bytecode at a time, even on multi-core processors. This means that:

- Threading does not provide real parallelism for CPU-bound tasks.
- Threads take turns holding the GIL, so CPU-bound code gains little from extra threads and can even slow down because of context-switching overhead.
- In contrast, multiprocessing creates separate processes, each with its own Python interpreter and memory space, allowing for true parallel execution across multiple CPU cores.

**Key points:**

- Multiprocessing bypasses the GIL, enabling full CPU core usage.
- Threading is limited by the GIL for CPU-bound work.
- Multiprocessing is ideal for tasks like number crunching, image processing, or simulations.
- Threading is better suited for I/O-bound tasks (e.g., file reads, network requests).

In summary, due to the GIL, `threading` is ineffective for CPU-heavy workloads, whereas `multiprocessing` provides actual parallelism and improved performance.
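
The difference can be observed with a rough timing sketch. The function names `count_down`, `run_with_threads` and `run_with_processes` are hypothetical, chosen only for illustration, and the measured times depend on the machine, the number of cores and the cost of starting processes, so no particular numbers should be expected.

```python
import time
from multiprocessing import Pool
from threading import Thread


def count_down(n):
    """Pure-Python busy loop: CPU-bound, so the running thread holds the GIL."""
    while n > 0:
        n -= 1
    return n


def run_with_threads(n, workers):
    """Run `workers` copies of count_down in threads and return elapsed seconds."""
    threads = [Thread(target=count_down, args=(n,)) for _ in range(workers)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


def run_with_processes(n, workers):
    """Run `workers` copies of count_down in separate processes."""
    start = time.perf_counter()
    with Pool(processes=workers) as pool:
        pool.map(count_down, [n] * workers)
    return time.perf_counter() - start


if __name__ == "__main__":  # guard needed when worker processes are spawned
    N, WORKERS = 2_000_000, 4
    print(f"threads:   {run_with_threads(N, WORKERS):.2f} s")
    print(f"processes: {run_with_processes(N, WORKERS):.2f} s")
```

On a multi-core machine the process version is usually faster for a large enough `N`, while the thread version takes roughly as long as running the four loops one after another.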

Let's see this in action with one more example.

### Prime number checking

**See practical example 1**.

## References

- [multiprocessing — Process-based parallelism](https://docs.python.org/3/library/multiprocessing.html)

# Concurrent Executors and Futures

Python's `concurrent.futures` module provides a high-level interface for asynchronously executing tasks using threads or processes. It combines clean syntax with powerful concurrent programming capabilities.

## Key Components

### Futures

A `concurrent.futures.Future` represents the result of an asynchronous computation. It's a placeholder for a value that will be available when the computation completes. It's used to check on, retrieve, or cancel the result of a function submitted to an executor (via `Executor.submit()`).

Some key methods:

1. **`result(timeout=None)`**
   - Returns the value returned by the call.
   - Blocks until the result is ready; raises `TimeoutError` if `timeout` seconds pass first.
   - Re-raises the exception if the function raised one, and raises `CancelledError` if the future was cancelled.

2. **`exception(timeout=None)`**
   - Returns the exception raised by the function (if any), or `None`.
   - Blocks like `result()`.

3. **`done()`**
   - Returns `True` if the computation is complete (with or without success).

4. **`cancel()`**
   - Attempts to cancel the call. Returns `False` if the call is currently being executed or has already finished, since it can no longer be cancelled; otherwise the call is cancelled and the method returns `True`.

5. **`cancelled()`**
   - Returns `True` if the future was cancelled.

6. **`running()`**
   - Returns `True` if the function is currently being executed.

7. **`add_done_callback(fn)`**
   - Attaches a callable to be run when the future is done.

```python
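from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(pow, 2, 10)  # Returns a Future immediately

    # Check if done without blocking
    if future.done():
        print("Task completed")

    # Get the result (blocks until it is ready)
    result = future.result()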
```
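
The remaining methods can be exercised in a similar way. The following is a minimal sketch that assumes a thread pool as the executor; the helpers `divide` and `on_done` are hypothetical names used only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def divide(a, b):
    return a / b


finished = []  # futures seen by the callback


def on_done(future):
    # Called with the future once it is done, whether it succeeded or failed
    finished.append(future)


with ThreadPoolExecutor(max_workers=2) as executor:
    ok = executor.submit(divide, 10, 2)
    bad = executor.submit(divide, 1, 0)
    ok.add_done_callback(on_done)
    bad.add_done_callback(on_done)

# Leaving the with-block waits for both tasks to finish
print(ok.result())                     # 5.0
print(type(bad.exception()).__name__)  # ZeroDivisionError
print(bad.cancel())                    # False: a finished future cannot be cancelled
print(len(finished))                   # 2
```

Calling `bad.result()` here would re-raise the `ZeroDivisionError`, which is why `exception()` is used to inspect the failure instead.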

### Executors

A `concurrent.futures.Executor` is an abstract base class for asynchronous execution of tasks. It isn't used directly; instead, you use one of its concrete subclasses, `concurrent.futures.ThreadPoolExecutor` or `concurrent.futures.ProcessPoolExecutor`.

`Executor`'s key methods are:

1. **`submit(fn, *args, **kwargs)`**
   - Schedules a function `fn` to run with the given arguments.
   - Returns a `Future` object representing the eventual result.

**Example:**
```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
```

2. **`map(fn, *iterables, timeout=None, chunksize=1)`**
   - Works like the built-in `map()`, but executes the function calls concurrently.
   - Collects the input iterables immediately rather than lazily.
   - Returns an iterator that yields results in the order of the inputs.
   - Supports a `timeout` (in seconds, measured from the original call to `map()`); the iterator raises `TimeoutError` if a result isn't available in time.
   - In `ProcessPoolExecutor`, tasks are split into chunks (controlled by `chunksize`) for performance. `ThreadPoolExecutor` ignores `chunksize`.

3. **`shutdown(wait=True, cancel_futures=False)`**
   - Cleans up executor resources once tasks finish.
   - After calling `shutdown()`, you can't submit new tasks.
   - If `wait=True`, the call waits until all tasks complete.
   - If `cancel_futures=True`, it cancels tasks that haven't started yet.

**Best practice:** Use the `with` statement to ensure proper cleanup:

```python
import shutil
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as executor:
    executor.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    # Submit more copy tasks...
```
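
The behaviour of `map()` and `shutdown()` described above can be checked with a short sketch. The function `slow_square` is a hypothetical helper; its sleep times are chosen so that tasks finish in the reverse of their submission order.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(n):
    # Smaller inputs sleep longer, so the tasks finish in reverse order
    time.sleep(0.05 * (3 - n))
    return n * n


with ThreadPoolExecutor(max_workers=3) as executor:
    squares = list(executor.map(slow_square, [0, 1, 2]))
print(squares)  # [0, 1, 4]: results follow the input order

# Explicit shutdown, without the with-statement
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(slow_square, 2)
executor.shutdown(wait=True)  # blocks until the pending task completes
print(future.done())          # True

rejected = False
try:
    executor.submit(slow_square, 1)
except RuntimeError as exc:
    rejected = True  # new tasks are refused after shutdown()
    print("rejected:", exc)
```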

The `concurrent.futures` module provides two primary executor classes:

1. `ThreadPoolExecutor`: Uses threads for concurrent execution

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit tasks to be executed concurrently
    future = executor.submit(pow, 2, 10)
```

2. `ProcessPoolExecutor`: Uses processes for concurrent execution

```python
from concurrent.futures import ProcessPoolExecutor

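if __name__ == "__main__":  # guard needed when worker processes are spawned
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Submit tasks to be executed concurrently
        future = executor.submit(pow, 2, 10)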
```

### Module functions

1. **`concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)`**

This function waits for a group of `Future` objects (from one or more executors) to complete.

- **Parameters:**
  - `fs`: A collection of `Future` instances.
  - `timeout`: Maximum time (in seconds) to wait. If not set or `None`, it waits indefinitely.
  - `return_when`: Determines when the function returns. Can be:
    - `FIRST_COMPLETED`: Returns as soon as one future is done or cancelled.
    - `FIRST_EXCEPTION`: Returns when any future finishes by raising an exception. If no future raises, it behaves like `ALL_COMPLETED`.
    - `ALL_COMPLETED`: Waits until all futures are finished or cancelled.

- **Returns:** A named 2-tuple of sets:
  - `done`: Futures that are completed (including cancelled).
  - `not_done`: Futures still running or waiting.


2. **`concurrent.futures.as_completed(fs, timeout=None)`**

This function provides an iterator that yields futures in the order they finish.

- **Parameters:**
  - `fs`: A group of `Future` objects.
  - `timeout`: Maximum time in seconds, measured from the original call to `as_completed()`, to wait for results. If not set or `None`, it waits indefinitely.

- **Behavior:**
  - Already-finished futures are yielded first.
  - Yields each future as soon as it completes.
  - Raises `TimeoutError` if the iterator's `__next__()` is called and the next result isn't available within `timeout` seconds of the original call to `as_completed()`.
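
Both functions can be tried on the same set of futures. This is a minimal sketch: the helper `work` is a hypothetical stand-in for a real task, and the completion order shown in the comment is what the chosen delays usually produce, not a guarantee.

```python
import time
from concurrent.futures import (
    FIRST_COMPLETED,
    ThreadPoolExecutor,
    as_completed,
    wait,
)


def work(delay):
    time.sleep(delay)
    return delay


with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(work, d) for d in (0.3, 0.1, 0.2)]

    # Returns as soon as at least one future has finished
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    print(len(done), "done,", len(not_done), "still pending")

    # Yields every future, already-finished ones first, then the rest as they finish
    completion_order = [f.result() for f in as_completed(futures)]

print(completion_order)  # typically [0.1, 0.2, 0.3]
```

`wait()` is convenient when the program needs the two sets at a given moment, while `as_completed()` suits loops that handle each result as soon as it arrives.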

### `concurrent.futures.ThreadPoolExecutor`

A `concurrent.futures.Executor` subclass that uses a pool of at most `max_workers` threads (set by the `max_workers` argument) to execute calls asynchronously.

All threads enqueued to `ThreadPoolExecutor` will be joined before the interpreter can exit.
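
The `max_workers` limit can be observed by counting how many tasks are running at once. In this sketch the names `stats`, `lock` and `task` are hypothetical, and `time.sleep()` stands in for a blocking I/O call.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

stats = {"active": 0, "peak": 0}
lock = threading.Lock()


def task(i):
    with lock:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
    time.sleep(0.05)  # stand-in for blocking I/O
    with lock:
        stats["active"] -= 1
    return i


with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(task, range(10)))

print(results)        # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(stats["peak"])  # never more than 3
```

Ten tasks are submitted, but at most three run at the same time; the others wait in the executor's queue until a worker thread becomes free.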

### `concurrent.futures.ProcessPoolExecutor`

`ProcessPoolExecutor` is a high-level interface in the `concurrent.futures` module that lets you run CPU-bound tasks concurrently in separate processes. It's part of Python's standard library and provides a simple way to achieve parallelism with multiple processes, as opposed to `ThreadPoolExecutor`, which uses threads (better for I/O-bound tasks).
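
As a minimal sketch of CPU-bound work spread over processes, the following counts primes with `ProcessPoolExecutor.map()`. The helpers `is_prime` and `count_primes` and the `chunksize` value are illustrative assumptions, not tuned settings.

```python
import math
from concurrent.futures import ProcessPoolExecutor


def is_prime(n):
    """Trial division by odd numbers up to the integer square root of n."""
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2
    for d in range(3, math.isqrt(n) + 1, 2):
        if n % d == 0:
            return False
    return True


def count_primes(limit, workers=4):
    """Count the primes below `limit` using a pool of worker processes."""
    with ProcessPoolExecutor(max_workers=workers) as executor:
        # chunksize batches the inputs to reduce inter-process overhead
        flags = executor.map(is_prime, range(limit), chunksize=1_000)
        return sum(flags)


if __name__ == "__main__":  # guard needed when worker processes are spawned
    print(count_primes(100_000))  # 9592
```

The worker function must be defined at module level so that it can be pickled and sent to the worker processes.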

### Practical examples

**See practical example 2 about *futures***.

**See practical example 3 about *executors***.

**See practical example 4 about *thread pool executors***.

**See practical example 5 about *thread pool executors***.

**See practical example 6 about *web scraper***.

**See practical example 7 about *prime checking***.

## References

- [concurrent.futures — Launching parallel tasks](https://docs.python.org/3/library/concurrent.futures.html)

# Async programming with Async IO

**Asynchronous programming** is a powerful paradigm for writing efficient, non-blocking code, and Python's Async IO framework provides the tools for implementing it. Async IO is built around coroutines, event loops and the `async`/`await` syntax. The basic idea is to allow tasks to pause execution while waiting for slow operations (like network or file I/O), letting other tasks run during that wait time.

Async IO is a *single-threaded, single-process design* that uses **cooperative multitasking**. This is a fundamental characteristic that distinguishes it from other concurrency models:

1. **Single thread** - All code runs on a single thread, unlike threading or multiprocessing.   
2. **Explicit yield points** - Tasks voluntarily yield control at `await` statements, allowing other tasks to run.
3. **Non-preemptive** - The scheduler doesn't interrupt tasks; they must cooperate by yielding control.
4. **Event loop coordination** - The event loop manages the scheduling of coroutines as they yield control.

This cooperative approach means tasks must be well-behaved - a task that performs CPU-intensive operations without yielding will block the entire event loop, preventing other tasks from running.
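
The effect of a badly behaved task can be seen in a small sketch. The coroutine names `ticker` and `hog` are hypothetical; `time.sleep()` plays the role of a blocking or CPU-heavy call that never yields.

```python
import asyncio
import time


async def ticker(log):
    for i in range(3):
        log.append(f"tick {i}")
        await asyncio.sleep(0.01)  # yields control to the event loop


async def hog(log):
    log.append("hog start")
    time.sleep(0.1)  # blocking call: the event loop cannot switch tasks here
    log.append("hog end")


async def main():
    log = []
    await asyncio.gather(ticker(log), hog(log))
    return log


log = asyncio.run(main())
print(log)
# ['tick 0', 'hog start', 'hog end', 'tick 1', 'tick 2']
```

The ticker asked to sleep for only 0.01 seconds, yet it cannot resume until `hog` has finished its 0.1-second blocking call. In a Jupyter notebook, where an event loop is already running, `await main()` would be used in place of `asyncio.run(main())`.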

### Async IO explained

Async IO may at first seem counterintuitive and paradoxical. How does something that facilitates concurrent code use a single thread and a single CPU core? Miguel Grinberg’s 2017 PyCon talk explains everything quite beautifully:

> Chess master Judit Polgár hosts a chess exhibition in which she plays multiple amateur players. She has two ways of conducting the exhibition: synchronously and asynchronously.
>
> Assumptions:
>
> - 24 opponents
> - Judit makes each chess move in 5 seconds
> - Opponents each take 55 seconds to make a move
> - Games average 30 pair-moves (60 moves total)
>
> **Synchronous version:** Judit plays one game at a time, never two at the same time, until the game is complete. Each game takes (55 + 5) * 30 == 1800 seconds, or 30 minutes. The entire exhibition takes 24 * 30 == 720 minutes, or *12 hours*.
> 
> **Asynchronous version:** Judit moves from table to table, making one move at each table. She leaves the table and lets the opponent make their next move during the wait time. One move on all 24 games takes Judit 24 * 5 == 120 seconds, or 2 minutes. The entire exhibition is now cut down to 120 * 30 == 3600 seconds, or just *1 hour*. 
>
> [Source](https://www.youtube.com/watch?v=iG6fr81xHKA&t=269s)
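
The arithmetic in the analogy can be written out explicitly. This sketch only restates the numbers from the quote; the constant names are chosen for illustration. The asynchronous total relies on one extra condition that the quote leaves implicit: each opponent must have finished thinking by the time Judit comes back to their table.

```python
OPPONENTS = 24
JUDIT_MOVE = 5       # seconds per move
OPPONENT_MOVE = 55   # seconds per move
PAIR_MOVES = 30      # pair-moves per game

# Synchronous: every game is played to the end before the next one starts
sync_seconds = OPPONENTS * (JUDIT_MOVE + OPPONENT_MOVE) * PAIR_MOVES

# Asynchronous: Judit is away from a table while she moves at the other 23,
# which must be long enough for the opponent's move to be ready
away_seconds = (OPPONENTS - 1) * JUDIT_MOVE
assert away_seconds >= OPPONENT_MOVE

round_seconds = OPPONENTS * JUDIT_MOVE  # one move on every board
async_seconds = round_seconds * PAIR_MOVES

print(sync_seconds / 3600)   # 12.0 hours
print(async_seconds / 3600)  # 1.0 hours
```

The opponents' thinking time disappears from the total because it overlaps with Judit's moves at other tables, just as I/O waits overlap with other tasks in an event loop.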

## Key components:

1. **(Native) Coroutines**: Functions defined with `async def` that can be paused at `await` points to let other coroutines run and later be resumed.
2. **Event loop**: The central execution mechanism that runs and manages coroutines.
3. **Tasks**: Wrapped coroutines that run concurrently in the event loop.
4. **Awaitables**: Objects that can be used with the `await` keyword (coroutines, Tasks, Futures)

Async IO is provided by the `asyncio` library, which is used to write concurrent code with the `async`/`await` syntax.

```python
import asyncio
```

### Intro to `asyncio` functions

- `asyncio.sleep(delay, result=None)` is a coroutine function; calling it returns an awaitable that, when used with `await`, suspends the current task for `delay` seconds and lets other tasks run. If `result` is provided, it is returned to the caller when the sleep completes. While `time.sleep()` can represent any time-consuming blocking function call, `asyncio.sleep()` is used to stand in for a non-blocking call (but one that also takes some time to complete).
- `asyncio.run(coro, *, debug=None, loop_factory=None)` executes the coroutine `coro` on a new event loop, closes the loop when the coroutine finishes, and returns the coroutine's result.
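
Both functions fit into a very small script. This is a minimal sketch; the 0.1-second delay and the `"done"` value are arbitrary choices for illustration.

```python
import asyncio


async def main():
    # Suspends main() for 0.1 s; `result` is handed back when the sleep ends
    value = await asyncio.sleep(0.1, result="done")
    return value


# Creates an event loop, runs main() to completion, then closes the loop
outcome = asyncio.run(main())
print(outcome)  # done
```

`asyncio.run()` cannot be called while another event loop is already running in the same thread, so inside a Jupyter notebook the coroutine is awaited directly with `await main()`.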

## Coroutines

**Coroutines** are the foundation of Async IO's cooperative multitasking model. They're special functions that can pause execution, yield control back to the event loop and later resume from where they left off.

1. **Definition**: Native coroutines are defined using the `async def` syntax
2. **Suspension Points**: They can suspend execution at `await` expressions
3. **State Preservation**: When suspended, they maintain their state (local variables, execution position)
4. **Non-blocking**: They don't block the thread while waiting for operations to complete

```python
async def say_hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")
```

```python
say_hello
```

```
<function __main__.say_hello()>
```

```python
say_hello()
```

```
<coroutine object say_hello at 0x1238f0640>
```
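
Calling a coroutine function only creates a coroutine object; the body does not start until the object is awaited or handed to the event loop. The sketch below makes this visible. It uses a shorter sleep and adds a return value, both purely for illustration.

```python
import asyncio
import inspect


async def say_hello():
    print("Hello")
    await asyncio.sleep(0.1)
    print("World")
    return "finished"


print(inspect.iscoroutinefunction(say_hello))  # True

coro = say_hello()                # nothing is printed yet
print(inspect.iscoroutine(coro))  # True

result = asyncio.run(coro)        # prints "Hello", then "World"
print(result)                     # finished
```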

**See practical example 8.**

### More `asyncio` functions

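#### `asyncio.gather(*aws, return_exceptions=False)`

Runs the awaitable objects in the `aws` sequence concurrently and waits until all of them complete. Any coroutine in `aws` is automatically scheduled as a task. If all awaitables complete successfully, the result is a list of their return values, in the same order as in `aws`.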

If `return_exceptions` is `False` (default), the first raised exception is immediately propagated to the task that awaits on `gather()`. Other awaitables in the `aws` sequence won’t be cancelled and will continue to run.

If `return_exceptions` is `True`, exceptions are treated the same as successful results, and aggregated in the result list.
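
Both modes can be compared in one short sketch. The coroutine `fetch` is a hypothetical stand-in for a real I/O call, and it fails for the input `2` so that there is an exception to observe.

```python
import asyncio


async def fetch(n):
    await asyncio.sleep(0.01 * n)
    if n == 2:
        raise ValueError(f"bad input: {n}")
    return n * 10


async def main():
    # Results come back in argument order, not in completion order
    ok = await asyncio.gather(fetch(3), fetch(1))

    # Exceptions are collected alongside the successful results
    mixed = await asyncio.gather(
        fetch(1), fetch(2), fetch(3), return_exceptions=True
    )

    # Default mode: the first exception propagates to the awaiting task
    caught = None
    try:
        await asyncio.gather(fetch(1), fetch(2))
    except ValueError as exc:
        caught = exc
    return ok, mixed, caught


ok, mixed, caught = asyncio.run(main())
print(ok)      # [30, 10]
print(mixed)   # [10, ValueError('bad input: 2'), 30]
print(caught)  # bad input: 2
```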

**See practical example 9.**

**See practical example 10.**