# 🧵 Mastering Threading in Python: Concurrency for I/O-Bound Tasks

**Welcome!** This notebook focuses on Python's `threading` module, a fundamental tool for achieving concurrency (handling multiple tasks seemingly simultaneously) within a single process. We'll explore how to create and manage threads, the critical importance of synchronization for shared data, and modern approaches using `concurrent.futures`.

**Target Audience:** Python developers needing to improve the responsiveness of applications involving I/O operations (like network requests, file access, GUIs) or manage concurrent workflows.

**Learning Objectives:**
*   Create, start, and manage `Thread` objects.
*   Understand the concept of shared memory between threads and its implications (race conditions).
*   Implement thread synchronization using `Lock`, `RLock`, `Event`, `Semaphore`, `Condition`.
*   Use `queue.Queue` for thread-safe communication and task distribution.
*   Understand daemon threads and their lifecycle.
*   Utilize the high-level `concurrent.futures.ThreadPoolExecutor` for simplified thread pool management.
*   Learn best practices for thread safety, resource management, and debugging.
*   Identify common pitfalls like race conditions and deadlocks.

## 1. Introduction: Why Use Threading?

As discussed in the comparison notebook, threading allows a single process to manage multiple execution contexts (threads) that share the same memory space.

**Key Use Case: I/O-Bound Tasks**
In CPython, the Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time, so threads do not speed up CPU-bound pure-Python code. Their primary benefit is improving performance and responsiveness for **I/O-bound** tasks. When a thread makes a blocking I/O call (e.g., waiting for a network response, reading a large file), it releases the GIL, allowing other threads to run Python code or perform their own I/O. This prevents the entire application from freezing while one task waits.
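A quick way to see the overlap is to time the same waits run sequentially and in threads. This is a minimal sketch that assumes `time.sleep()` as a stand-in for a blocking I/O call (like real blocking I/O, it releases the GIL while waiting); the helper names `fake_io`, `run_sequential`, and `run_threaded` are illustrative, and the exact timings will vary by machine.

```python
import threading
import time

def fake_io(seconds: float) -> None:
    # Stand-in for a blocking I/O call; the GIL is released while sleeping.
    time.sleep(seconds)

def run_sequential(n: int, seconds: float) -> float:
    start = time.perf_counter()
    for _ in range(n):
        fake_io(seconds)          # each wait starts only after the previous one ends
    return time.perf_counter() - start

def run_threaded(n: int, seconds: float) -> float:
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_io, args=(seconds,)) for _ in range(n)]
    for t in threads:
        t.start()                 # all waits begin almost at once
    for t in threads:
        t.join()                  # wait for every thread to finish
    return time.perf_counter() - start

seq = run_sequential(4, 0.2)
thr = run_threaded(4, 0.2)
print(f"Sequential: {seq:.2f}s, threaded: {thr:.2f}s")
```

The sequential version takes roughly the sum of the waits, while the threaded version takes roughly the longest single wait.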

**Analogy: The Efficient Waiter**
Imagine a waiter (your program) serving multiple tables (tasks). 
*   **Without Threading:** The waiter takes an order from Table 1, goes to the kitchen, waits for the food, delivers it, takes the order from Table 2, waits, delivers, etc. Tables wait idly while the waiter is busy with *one* specific task (especially waiting).
*   **With Threading:** The waiter (main thread) can delegate tasks. One helper thread takes Table 1's order and goes to the kitchen. While waiting, the main waiter (or another thread) can take Table 2's order. Another thread might be waiting to deliver food. They coordinate (synchronization) but make progress on multiple tables concurrently, even if only one *chef* (CPU core executing Python) is active at a time due to the GIL. The overall throughput and responsiveness improve because waiting time is utilized effectively.

## 2. Creating and Managing Threads (`threading.Thread`)

The basic way to create a thread is using the `threading.Thread` class.

**Steps:**
1.  Define a target function that the thread will execute.
2.  Create a `Thread` instance, passing the `target` function and any arguments (`args` tuple or `kwargs` dict).
3.  Call the `start()` method on the `Thread` object. This invokes the target function in a new thread of control.
4.  Optionally, call the `join()` method on the `Thread` object. This makes the main thread (or calling thread) wait until the target thread finishes execution.

In [1]:
import threading
import time
import logging

logging.basicConfig(level=logging.INFO, 
                    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
                    force=True)

def worker_task(task_id: int, duration: float):
    """Simulates a task performed by a thread."""
    thread_name = threading.current_thread().name
    logging.info(f"Starting task {task_id}...")
    time.sleep(duration)
    logging.info(f"Finished task {task_id}.")

# --- Main Thread --- 
logging.info("Main thread starting.")

# Create thread objects
thread1 = threading.Thread(target=worker_task, args=(1, 1.5), name="Worker-A")
thread2 = threading.Thread(target=worker_task, args=(2, 0.5), name="Worker-B")

# Start the threads
logging.info("Starting threads...")
thread1.start()
thread2.start()
logging.info("Threads started.")

# Wait for threads to complete (optional but often needed)
logging.info("Waiting for threads to join...")
thread1.join()
thread2.join()
logging.info("All threads finished.")

logging.info("Main thread finished.")

[INFO] (MainThread) Main thread starting.
[INFO] (MainThread) Starting threads...
[INFO] (Worker-A  ) Starting task 1...
[INFO] (Worker-B  ) Starting task 2...
[INFO] (MainThread) Threads started.
[INFO] (MainThread) Waiting for threads to join...
[INFO] (Worker-B  ) Finished task 2.
[INFO] (Worker-A  ) Finished task 1.
[INFO] (MainThread) All threads finished.
[INFO] (MainThread) Main thread finished.
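`Thread` discards whatever its target returns. One common workaround, sketched below under the assumption that you want to stay with plain `threading` rather than `ThreadPoolExecutor` (Section 7), is to subclass `Thread` and override `run()`, which is the method `start()` invokes in the new thread. `ResultThread` and its `result`/`exception` attributes are hypothetical names for illustration.

```python
import threading

class ResultThread(threading.Thread):
    """Hypothetical Thread subclass that keeps the target's return value."""

    def __init__(self, target, args=(), kwargs=None, name=None):
        super().__init__(name=name)
        self._fn = target
        self._args = args
        self._kwargs = kwargs or {}
        self.result = None
        self.exception = None

    def run(self):
        # run() executes in the new thread once start() is called.
        try:
            self.result = self._fn(*self._args, **self._kwargs)
        except Exception as exc:  # keep the error so the caller can inspect it
            self.exception = exc

def square(x: int) -> int:
    return x * x

threads = [ResultThread(target=square, args=(i,), name=f"Square-{i}") for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # after join() returns, the worker has finished writing its result
print([t.result for t in threads])  # [0, 1, 4, 9]
```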


## 3. Shared Data and Race Conditions

**The Challenge:** Threads within the same process share memory. While convenient for communication, this is also the biggest source of complexity and bugs in threaded applications.

**Race Condition:** Occurs when multiple threads access and manipulate shared data concurrently, and the final result depends on the unpredictable order (timing) in which their operations are interleaved by the scheduler.

**Example:** Incrementing a shared counter.

In [2]:
import threading
import time

SHARED_COUNTER = 0
NUM_ITERATIONS = 100000 # Many iterations make a lost update more likely (but not guaranteed)

def increment_counter_unsafe():
    global SHARED_COUNTER
    for _ in range(NUM_ITERATIONS):
        # --- This sequence is NOT atomic --- 
        current_value = SHARED_COUNTER  # 1. Read
        # Context switch could happen here!
        current_value += 1            # 2. Increment local copy
        # Context switch could happen here!
        SHARED_COUNTER = current_value  # 3. Write back
        # ------------------------------------

print("--- Demonstrating Race Condition ---")
t1 = threading.Thread(target=increment_counter_unsafe)
t2 = threading.Thread(target=increment_counter_unsafe)

t1.start()
t2.start()

t1.join()
t2.join()

expected_value = 2 * NUM_ITERATIONS
print(f"Expected Counter Value: {expected_value}")
print(f"Actual Counter Value:   {SHARED_COUNTER}") # May be less than expected, but not on every run

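# Why? Both threads might read the *same* value, increment their local copy,
# and then both write back the *same* incremented value, losing one increment.
# The run below happened to print the correct total: recent CPython versions tend
# to switch threads at fewer points in a tight loop like this, so the bug can stay
# hidden for many runs. The code is still incorrect.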

--- Demonstrating Race Condition ---
Expected Counter Value: 200000
Actual Counter Value:   200000
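To make the lost updates easier to observe, you can deliberately yield between the read and the write. This sketch assumes `time.sleep(0)` as an artificial hook that gives up the GIL at the worst possible moment; it makes the race very likely to show, but still not guaranteed, and such a call has no place in real code.

```python
import threading
import time

counter = 0
ITERATIONS = 1_000

def increment_with_yield() -> None:
    global counter
    for _ in range(ITERATIONS):
        current = counter        # 1. read
        time.sleep(0)            # deliberately let the other thread run here
        counter = current + 1    # 2./3. increment and write back a possibly stale value

threads = [threading.Thread(target=increment_with_yield) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Expected {2 * ITERATIONS}, got {counter}")
```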


## 4. Synchronization Primitives

To prevent race conditions and coordinate threads, Python's `threading` module provides several synchronization primitives.

### 4.1 `threading.Lock`: Mutual Exclusion
*   The most basic synchronization primitive.
*   Has two states: locked and unlocked.
*   `lock.acquire(blocking=True, timeout=-1)`: Acquires the lock. 
    *   If unlocked, it locks it and returns `True` immediately.
    *   If locked, it *blocks* until the lock is released (if `blocking=True`).
    *   If `blocking=False`, it returns `True` if acquired, `False` otherwise (doesn't block).
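    *   `timeout` specifies the maximum number of seconds to block; if the lock is not acquired in time, `acquire()` returns `False`. The default `-1` means wait indefinitely.
*   `lock.release()`: Releases the lock. A plain `Lock` has no owner, so any thread may release it, not only the one that acquired it. Calling `release()` on an unlocked lock raises `RuntimeError`.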
*   **Best Practice:** Use a `Lock` as a context manager (`with lock:`) to ensure it's always released, even if errors occur.
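The `acquire()`/`release()` rules above can be checked directly. This short sketch uses only the documented `Lock` methods (`acquire`, `release`, `locked`); the variable names are illustrative.

```python
import threading

lock = threading.Lock()
lock.acquire()                                  # the main thread now holds the lock

got_nonblocking = lock.acquire(blocking=False)  # False: already held, returns at once
got_timed = lock.acquire(timeout=0.1)           # False: gave up after about 0.1 s
print(got_nonblocking, got_timed)               # False False

# A plain Lock records no owner, so a different thread may release it.
releaser = threading.Thread(target=lock.release)
releaser.start()
releaser.join()
still_locked = lock.locked()
print(still_locked)                             # False

# Releasing a Lock that is not held raises RuntimeError.
try:
    lock.release()
    raised = False
except RuntimeError:
    raised = True
print(raised)                                   # True
```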

In [3]:
import threading
import time

SHARED_COUNTER_LOCKED = 0
# NUM_ITERATIONS defined previously

# Create a lock object
counter_lock = threading.Lock()

def increment_counter_safe():
    global SHARED_COUNTER_LOCKED
    for _ in range(NUM_ITERATIONS):
        # Acquire the lock before accessing shared resource
        # --- Manual Acquire/Release (less safe) --- 
        # counter_lock.acquire()
        # try:
        #     current_value = SHARED_COUNTER_LOCKED
        #     current_value += 1
        #     SHARED_COUNTER_LOCKED = current_value
        # finally:
        #     counter_lock.release() # CRITICAL to release!
        
        # --- Using Lock as Context Manager (Recommended) --- 
        with counter_lock:
            current_value = SHARED_COUNTER_LOCKED
            current_value += 1
            SHARED_COUNTER_LOCKED = current_value
        # Lock is automatically released upon exiting the 'with' block

print("\n--- Fixing Race Condition with Lock --- ")
t3 = threading.Thread(target=increment_counter_safe)
t4 = threading.Thread(target=increment_counter_safe)

t3.start()
t4.start()

t3.join()
t4.join()

expected_value = 2 * NUM_ITERATIONS
print(f"Expected Counter Value: {expected_value}")
print(f"Actual Counter Value:   {SHARED_COUNTER_LOCKED}") # Should now be correct


--- Fixing Race Condition with Lock --- 
Expected Counter Value: 200000
Actual Counter Value:   200000


### 4.2 `threading.RLock`: Re-entrant Lock
*   A lock that can be acquired multiple times *by the same thread*.
*   It maintains an internal counter, incremented on `acquire()` and decremented on `release()` by the owning thread.
*   The lock is only fully released (and available to other threads) when the counter reaches zero.
*   Useful in recursive functions or complex scenarios where a thread might need to re-acquire a lock it already holds.
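A frequent "complex scenario" is a class whose public methods all take the same lock and also call one another. In this sketch, `Inventory` is a hypothetical example: `add_many()` holds the `RLock` and then calls `add()`, which acquires it again in the same thread. With a plain `Lock`, that inner acquisition would block forever.

```python
import threading

class Inventory:
    """Hypothetical example: public methods share one RLock and call each other."""

    def __init__(self):
        self._lock = threading.RLock()
        self._items = {}

    def add(self, name: str, qty: int) -> None:
        with self._lock:
            self._items[name] = self._items.get(name, 0) + qty

    def add_many(self, entries: dict) -> None:
        with self._lock:              # first acquisition by this thread
            for name, qty in entries.items():
                self.add(name, qty)   # re-acquires the same RLock: allowed for the owner

    def total(self) -> int:
        with self._lock:
            return sum(self._items.values())

inv = Inventory()
workers = [threading.Thread(target=inv.add_many, args=({"apple": 1, "pear": 2},))
           for _ in range(10)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(inv.total())  # 30
```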

In [4]:
import threading

r_lock = threading.RLock()

def recursive_function(depth):
    if depth <= 0:
        print(f"({threading.current_thread().name}) Base case reached.")
        return
    
    print(f"({threading.current_thread().name}) Acquiring RLock (depth {depth})...")
    with r_lock:
        print(f"({threading.current_thread().name}) Acquired RLock (depth {depth}). Owning thread can re-acquire.")
        # Call recursively - the same thread can acquire the RLock again
        recursive_function(depth - 1)
    print(f"({threading.current_thread().name}) Released RLock (depth {depth}).")

print("\n--- RLock Demonstration ---")
r_thread = threading.Thread(target=recursive_function, args=(3,))
r_thread.start()
r_thread.join()

# Note: If a regular Lock were used above, the second acquire() call 
# within the same thread would block indefinitely, causing a deadlock.


--- RLock Demonstration ---
(Thread-7 (recursive_function)) Acquiring RLock (depth 3)...
(Thread-7 (recursive_function)) Acquired RLock (depth 3). Owning thread can re-acquire.
(Thread-7 (recursive_function)) Acquiring RLock (depth 2)...
(Thread-7 (recursive_function)) Acquired RLock (depth 2). Owning thread can re-acquire.
(Thread-7 (recursive_function)) Acquiring RLock (depth 1)...
(Thread-7 (recursive_function)) Acquired RLock (depth 1). Owning thread can re-acquire.
(Thread-7 (recursive_function)) Base case reached.
(Thread-7 (recursive_function)) Released RLock (depth 1).
(Thread-7 (recursive_function)) Released RLock (depth 2).
(Thread-7 (recursive_function)) Released RLock (depth 3).


### 4.3 `threading.Semaphore`: Bounded Resource Access
*   Manages an internal counter which is decremented by `acquire()` and incremented by `release()`.
*   The counter represents the number of available resources or allowed concurrent accesses.
*   If the counter is zero on `acquire()`, the call blocks until another thread calls `release()`.
*   Useful for limiting access to a resource with a fixed capacity (e.g., limiting concurrent downloads, database connections).
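A plain `Semaphore` lets its counter grow past the initial value if `release()` is called too often, which can quietly admit more threads than intended. `threading.BoundedSemaphore` guards against this by raising `ValueError` on an extra release. A minimal comparison (variable names are illustrative):

```python
import threading

slots = threading.BoundedSemaphore(value=2)
slots.acquire()
slots.release()                 # counter is back at its initial value of 2

try:
    slots.release()             # one release too many
    over_release_caught = False
except ValueError:
    over_release_caught = True
print(over_release_caught)      # True

plain = threading.Semaphore(value=2)
plain.release()                 # no error: the counter silently grows to 3
extra_slot = all(plain.acquire(blocking=False) for _ in range(3))
print(extra_slot)               # True: three acquisitions succeed where only two were intended
```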

In [5]:
import threading
import time
import random

# Limit concurrent access to a resource to 3 threads
MAX_CONNECTIONS = 3
semaphore = threading.Semaphore(value=MAX_CONNECTIONS)

def access_resource(thread_id):
    print(f"Thread {thread_id}: Trying to acquire semaphore...")
    with semaphore: # Acquire semaphore (blocks if count is 0)
        print(f"Thread {thread_id}: Acquired semaphore. Accessing resource...")
        # Simulate using the resource
        time.sleep(random.uniform(0.5, 1.5))
        print(f"Thread {thread_id}: Releasing semaphore.")
    # Semaphore released automatically by 'with' statement

print("\n--- Semaphore Demonstration (Max 3 concurrent) ---")
threads = []
for i in range(7): # Create 7 threads trying to access the resource
    t = threading.Thread(target=access_resource, args=(i+1,))
    threads.append(t)
    t.start()

# Wait for all threads to complete
for t in threads:
    t.join()

print("Semaphore demo finished.")


--- Semaphore Demonstration (Max 3 concurrent) ---
Thread 1: Trying to acquire semaphore...
Thread 1: Acquired semaphore. Accessing resource...
Thread 2: Trying to acquire semaphore...
Thread 2: Acquired semaphore. Accessing resource...
Thread 3: Trying to acquire semaphore...
Thread 3: Acquired semaphore. Accessing resource...
Thread 4: Trying to acquire semaphore...
Thread 5: Trying to acquire semaphore...
Thread 6: Trying to acquire semaphore...
Thread 7: Trying to acquire semaphore...
Thread 3: Releasing semaphore.
Thread 4: Acquired semaphore. Accessing resource...
Thread 2: Releasing semaphore.
Thread 5: Acquired semaphore. Accessing resource...
Thread 1: Releasing semaphore.
Thread 6: Acquired semaphore. Accessing resource...
Thread 5: Releasing semaphore.
Thread 7: Acquired semaphore. Accessing resource...
Thread 4: Releasing semaphore.
Thread 6: Releasing semaphore.
Thread 7: Releasing semaphore.
Semaphore demo finished.


### 4.4 `threading.Event`: Signaling Between Threads
*   A simple mechanism for one thread to signal an event to other threads.
*   Manages an internal flag.
*   `event.set()`: Sets the internal flag to `True`.
*   `event.clear()`: Resets the internal flag to `False`.
*   `event.wait(timeout=None)`: Blocks until the internal flag is `True`. If `timeout` is set, blocks for at most that many seconds.
*   `event.is_set()`: Returns `True` if the internal flag is set.
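Besides one-shot signaling, an `Event` is a common way to ask a long-running worker to stop. Because `wait(timeout)` returns `True` as soon as the flag is set and `False` when the timeout expires, it can replace `time.sleep()` as an interruptible pause. This is a sketch; `poller` and `ticks` are hypothetical names, and appending a timestamp stands in for real periodic work.

```python
import threading
import time

stop_event = threading.Event()
ticks = []

def poller(stop: threading.Event, interval: float) -> None:
    # Loop until the event is set; each wait() doubles as the pause between iterations.
    while not stop.wait(timeout=interval):
        ticks.append(time.perf_counter())  # stand-in for periodic work

worker = threading.Thread(target=poller, args=(stop_event, 0.05), name="Poller")
worker.start()
time.sleep(0.3)         # the main thread does other things meanwhile
stop_event.set()        # ask the poller to stop...
worker.join(timeout=1)  # ...and wait for it to finish its current iteration
print(f"Poller alive: {worker.is_alive()}, iterations: {len(ticks)}")
```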

In [6]:
import threading
import time

event = threading.Event()

def waiter(event_obj: threading.Event, name: str):
    print(f"{name}: Waiting for event to be set...")
    event_obj.wait() # Blocks here until event.set() is called
    print(f"{name}: Event received! Proceeding.")
    # Do work after event
    time.sleep(0.1)
    print(f"{name}: Finished work.")

def setter(event_obj: threading.Event, delay: float):
    print(f"Setter: Sleeping for {delay} seconds...")
    time.sleep(delay)
    print("Setter: Setting the event!")
    event_obj.set()

print("\n--- Event Demonstration ---")

waiter1 = threading.Thread(target=waiter, args=(event, "Waiter-1"), name="Waiter-1")
waiter2 = threading.Thread(target=waiter, args=(event, "Waiter-2"), name="Waiter-2")
setter_thread = threading.Thread(target=setter, args=(event, 1.0), name="Setter")

waiter1.start()
waiter2.start()
setter_thread.start()

waiter1.join()
waiter2.join()
setter_thread.join()

print("Event demo finished.")


--- Event Demonstration ---
Waiter-1: Waiting for event to be set...
Waiter-2: Waiting for event to be set...
Setter: Sleeping for 1.0 seconds...
Setter: Setting the event!
Waiter-2: Event received! Proceeding.
Waiter-1: Event received! Proceeding.
Waiter-2: Finished work.
Waiter-1: Finished work.
Event demo finished.


### 4.5 `threading.Condition`: Complex Synchronization
*   Combines a `Lock` (or `RLock`) with the ability for threads to `wait()` for a condition and be `notify()`'d (or `notify_all()`) when the condition might be met.
*   Used for more complex producer-consumer scenarios or situations where threads need to wait for a specific state change protected by a lock.
*   `cond.acquire()` / `cond.release()`: Acquire/release the underlying lock.
*   `cond.wait()`: Releases the underlying lock and blocks until notified by another thread calling `notify()` or `notify_all()`. Re-acquires the lock before returning.
*   `cond.notify(n=1)`: Wakes up *at most* `n` threads waiting on the condition.
*   `cond.notify_all()`: Wakes up *all* threads waiting on the condition.

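**Important:** Threads must hold the lock *before* calling `wait()`, `notify()`, or `notify_all()`; otherwise `RuntimeError` is raised. Call `wait()` inside a `while` loop that re-checks the actual condition: by the time a woken thread re-acquires the lock, another thread may already have changed the state, so waking up does not guarantee the condition holds.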
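`Condition.wait_for(predicate, timeout=None)` packages that re-check loop for you: it keeps waiting until the predicate returns a true value (or the timeout expires) and returns the predicate's last result. A small sketch, with `wait_for_three` and `add_three` as hypothetical names:

```python
import threading

cond = threading.Condition()
ready_items = []

def wait_for_three(results: list) -> None:
    with cond:
        # Equivalent to: while not len(ready_items) >= 3: cond.wait()
        ok = cond.wait_for(lambda: len(ready_items) >= 3, timeout=2.0)
        results.append((ok, list(ready_items)))

def add_three() -> None:
    for i in range(3):
        with cond:
            ready_items.append(i)
            cond.notify_all()   # waiters re-evaluate the predicate

results = []
c = threading.Thread(target=wait_for_three, args=(results,))
p = threading.Thread(target=add_three)
c.start()
p.start()
p.join()
c.join()
print(results)  # [(True, [0, 1, 2])]
```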

In [7]:
import threading
import time
import collections
import random

MAX_QUEUE_SIZE = 3
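# Shared buffer. Capacity is enforced by the producer's while loop below; a deque
# created with maxlen would instead silently discard the oldest item when full.
item_queue = collections.deque()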
condition = threading.Condition() # Uses a Lock by default

def producer(cond: threading.Condition, queue: collections.deque):
    for i in range(7):
        with cond: # Acquire the lock
            while len(queue) >= MAX_QUEUE_SIZE:
                print(f"Producer: Queue full ({len(queue)} items). Waiting...")
                cond.wait() # Wait for consumer to make space
                print("Producer: Woke up.")
            
            item = f"Item-{i}"
            queue.append(item)
            print(f"Producer: Added {item}. Queue size: {len(queue)}")
            cond.notify() # Notify ONE waiting consumer
        time.sleep(0.3) # Simulate production time
    
    # Signal that production is finished; without a sentinel the consumers would wait forever
    with cond:
        queue.append(None) # Sentinel value
        cond.notify_all() # Wake all consumers to check for None

def consumer(cond: threading.Condition, queue: collections.deque, consumer_id: int):
    while True:
        with cond:
            while not queue: # Re-check the condition after every wakeup
                print(f"Consumer {consumer_id}: Queue empty. Waiting...")
                cond.wait() # Releases the lock while waiting; re-acquires it before returning
                print(f"Consumer {consumer_id}: Woke up.")

            item = queue.popleft()

            # Check for sentinel value
            if item is None:
                print(f"Consumer {consumer_id}: Received sentinel. Exiting.")
                # Put the sentinel back so any other consumer also sees it
                queue.append(None)
                cond.notify_all() # Wake any consumer still waiting
                break # Exit loop

            print(f"Consumer {consumer_id}: Consumed {item}. Queue size: {len(queue)}")
            cond.notify() # Notify producer that space is available

        # Simulate consumption time outside the lock
        time.sleep(random.uniform(0.4, 0.8))

print("\n--- Condition Variable Demonstration (Producer/Consumer) ---")
producer_thread = threading.Thread(target=producer, args=(condition, item_queue), name="Producer")
consumer1_thread = threading.Thread(target=consumer, args=(condition, item_queue, 1), name="Consumer-1")
consumer2_thread = threading.Thread(target=consumer, args=(condition, item_queue, 2), name="Consumer-2")

consumer1_thread.start()
consumer2_thread.start()
producer_thread.start()

producer_thread.join()
consumer1_thread.join()
consumer2_thread.join()

print("Condition demo finished.")


--- Condition Variable Demonstration (Producer/Consumer) ---
Consumer 1: Queue empty. Waiting...
Consumer 2: Queue empty. Waiting...
Producer: Added Item-0. Queue size: 1
Consumer 1: Woke up.
Consumer 1: Consumed Item-0. Queue size: 0
Consumer 2: Woke up.
Consumer 2: Queue empty. Waiting...
Producer: Added Item-1. Queue size: 1
Consumer 2: Woke up.
Consumer 2: Consumed Item-1. Queue size: 0
Producer: Added Item-2. Queue size: 1
Consumer 1: Consumed Item-2. Queue size: 0
Consumer 2: Queue empty. Waiting...
Producer: Added Item-3. Queue size: 1
Consumer 2: Woke up.
Consumer 2: Consumed Item-3. Queue size: 0
Consumer 1: Queue empty. Waiting...
Producer: Added Item-4. Queue size: 1
Consumer 1: Woke up.
Consumer 1: Consumed Item-4. Queue size: 0
Consumer 2: Queue empty. Waiting...
Producer: Added Item-5. Queue size: 1
Consumer 2: Woke up.
Consumer 2: Consumed Item-5. Queue size: 0
Consumer 1: Queue empty. Waiting...
Producer: Added Item-6. Queue size: 1
Consumer 1: Woke up.
Consumer 1: Con

## 5. Thread-Safe Data Exchange: `queue.Queue`

Locks protect shared variables, but hand-coding a producer/consumer handoff with them (as in the `Condition` example above) is verbose and easy to get wrong.

The `queue` module provides synchronized (thread-safe) queue classes (`Queue`, `LifoQueue`, `PriorityQueue`). These are excellent for passing tasks or data between threads without needing explicit locks for the queue operations themselves.

*   `q.put(item, block=True, timeout=None)`: Adds an item. Blocks if the queue is full (if `maxsize` was set).
*   `q.get(block=True, timeout=None)`: Removes and returns an item. Blocks if the queue is empty.
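*   `q.task_done()`: Indicates that a formerly enqueued task is complete. Consumers call it exactly once for each item they retrieved with `get()`.
*   `q.join()`: Blocks until all items in the queue have been gotten and processed (i.e., `task_done()` called for each item put into the queue).
*   `q.qsize()`: Approximate size.
*   `q.empty()` / `q.full()`: Check status. In concurrent code the answer may already be stale by the time you act on it, so don't use these to decide whether a following `get()` or `put()` will block.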
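Instead of checking `empty()`/`full()` first, the usual pattern is to attempt the operation without blocking (or with a timeout) and handle `queue.Full`/`queue.Empty`. The sketch below shows that, plus the ordering of `PriorityQueue`; the `(priority, payload)` tuple layout is a common convention, not a requirement.

```python
import queue

bounded = queue.Queue(maxsize=1)
bounded.put_nowait("first")
try:
    bounded.put_nowait("second")         # same as put("second", block=False)
    full_raised = False
except queue.Full:
    full_raised = True

first = bounded.get_nowait()             # "first"
try:
    bounded.get(timeout=0.1)             # waits up to 0.1 s, then gives up
    empty_raised = False
except queue.Empty:
    empty_raised = True

# PriorityQueue hands back the smallest entry first.
pq = queue.PriorityQueue()
for entry in [(3, "low"), (1, "urgent"), (2, "normal")]:
    pq.put(entry)
order = [pq.get()[1] for _ in range(3)]

print(full_raised, first, empty_raised, order)
# True first True ['urgent', 'normal', 'low']
```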

In [8]:
import queue
import threading
import time
import random

task_queue = queue.Queue(maxsize=5) # Optional max size
result_queue = queue.Queue()
SENTINEL = object() # Unique object to signal end of tasks

def task_producer(q: queue.Queue):
    """Produces tasks and puts them onto the queue."""
    for i in range(10):
        task_data = f"Task Data {i}"
        print(f"Producer: Adding '{task_data}' to queue.")
        q.put(task_data)
        time.sleep(random.uniform(0.1, 0.3))
    # Add sentinel values for each consumer thread
    for _ in range(NUM_WORKERS):
        q.put(SENTINEL) 
    print("Producer: Finished adding tasks and sentinels.")

def task_consumer(in_q: queue.Queue, out_q: queue.Queue, worker_id: int):
    """Consumes tasks from the queue, processes them, puts result on another queue."""
    while True:
        task_data = in_q.get() # Blocks if empty
        if task_data is SENTINEL:
            print(f"Worker {worker_id}: Received sentinel. Exiting.")
            in_q.task_done() # Mark sentinel as done
            break # Exit loop
        
        print(f"Worker {worker_id}: Processing '{task_data}'...")
        # Simulate work
        time.sleep(random.uniform(0.2, 0.6))
        result = task_data.upper()
        print(f"Worker {worker_id}: Finished '{task_data}'. Result: '{result}'")
        out_q.put((worker_id, result)) # Put result on output queue
        in_q.task_done() # Signal task completion

print("\n--- Queue Demonstration --- ")
NUM_WORKERS = 3

# Start producer thread
producer_t = threading.Thread(target=task_producer, args=(task_queue,), name="Producer")
producer_t.start()

# Start consumer threads
consumers = []
for i in range(NUM_WORKERS):
    consumer_t = threading.Thread(target=task_consumer, args=(task_queue, result_queue, i+1), name=f"Consumer-{i+1}")
    # No need for daemon=True if we use sentinels and join the queue
    consumer_t.start()
    consumers.append(consumer_t)

# Wait for producer to finish (optional)
producer_t.join()
print("Producer thread has finished.")

# Wait for all tasks in the queue to be processed
print("Main thread waiting for queue tasks to complete...")
task_queue.join() # Blocks until task_done() called for all items
print("All tasks processed (queue joined).")

# Consumers should exit now because of the sentinels
# Join consumer threads (good practice to ensure they terminate cleanly)
for c in consumers:
    c.join()

print("\n--- Processing Results --- ")
results_list = []
while not result_queue.empty():
    try:
        res = result_queue.get_nowait() # Use nowait as we know consumers are done
        results_list.append(res)
    except queue.Empty:
        break
print(f"Collected results: {results_list}")

print("Queue demo finished.")


--- Queue Demonstration --- 
Producer: Adding 'Task Data 0' to queue.
Worker 1: Processing 'Task Data 0'...
Producer: Adding 'Task Data 1' to queue.
Worker 2: Processing 'Task Data 1'...
Worker 1: Finished 'Task Data 0'. Result: 'TASK DATA 0'
Producer: Adding 'Task Data 2' to queue.
Worker 3: Processing 'Task Data 2'...
Worker 2: Finished 'Task Data 1'. Result: 'TASK DATA 1'
Producer: Adding 'Task Data 3' to queue.
Worker 1: Processing 'Task Data 3'...
Producer: Adding 'Task Data 4' to queue.
Worker 2: Processing 'Task Data 4'...
Worker 3: Finished 'Task Data 2'. Result: 'TASK DATA 2'
Producer: Adding 'Task Data 5' to queue.
Worker 3: Processing 'Task Data 5'...
Worker 1: Finished 'Task Data 3'. Result: 'TASK DATA 3'
Worker 2: Finished 'Task Data 4'. Result: 'TASK DATA 4'
Producer: Adding 'Task Data 6' to queue.
Worker 1: Processing 'Task Data 6'...
Producer: Adding 'Task Data 7' to queue.
Worker 2: Processing 'Task Data 7'...
Worker 3: Finished 'Task Data 5'. Result: 'TASK DATA 5'
Pr

## 6. Daemon Threads

A thread can be designated as a **daemon thread**. Daemon threads exit automatically when the main program exits (i.e., when all *non-daemon* threads have completed).

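*   Set it with `daemon=True` in the `Thread` constructor, or with `thread.daemon = True` **before** calling `start()`. If not set, a thread inherits the daemon status of the thread that created it (non-daemon for threads created by the main thread).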
*   Useful for background tasks that are not critical and can be safely interrupted (e.g., periodic health checks, cleanup tasks).
*   **Caution:** Daemon threads are abruptly stopped. Resources they hold (like open files) might not be cleaned up properly. Avoid using them for tasks that need guaranteed completion or cleanup. Using signaling mechanisms (like `Event` or sentinel values in a `Queue`) is often safer for controlling thread termination.
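Both daemon rules can be checked directly: the flag cannot change once a thread has started, and a new thread copies the flag of its creator. A sketch with hypothetical helper names (`idle`, `spawn_child`):

```python
import threading
import time

def idle() -> None:
    time.sleep(0.1)

t = threading.Thread(target=idle)
default_daemon = t.daemon   # inherited from the creating thread
t.start()
try:
    t.daemon = True         # too late: daemon status is fixed once start() is called
    set_after_start_failed = False
except RuntimeError:
    set_after_start_failed = True
t.join()

def spawn_child(out: list) -> None:
    child = threading.Thread(target=idle)
    out.append(child.daemon)  # a thread created by a daemon thread is a daemon too
    child.start()
    child.join()

inherited = []
parent = threading.Thread(target=spawn_child, args=(inherited,), daemon=True)
parent.start()
parent.join()
print(default_daemon, set_after_start_failed, inherited)
# False True [True] when run from the main thread
```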

In [13]:
import threading
import time
import logging

def background_task():
    counter = 0
    while True:
        counter += 1
        logging.info(f"Background task running... (Count: {counter})")
        time.sleep(0.5)
        # Infinite loop: daemon=True only means the interpreter won't wait for it at exit

print("\n--- Daemon Thread Demonstration ---")

daemon_thread = threading.Thread(target=background_task, name="DaemonTask", daemon=True)
# With daemon=False (the default for threads created by the main thread), the
# interpreter would wait for this infinite loop at shutdown and the program would never exit.

daemon_thread.start()

# Main thread does some work and then exits
print("Main thread doing some work...")
time.sleep(2.1) 
print("Main thread exiting. Daemon thread will be terminated automatically.")

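# No need to join the daemon thread explicitly if you want it to terminate with main.
# Note: in Jupyter the kernel process keeps running after this cell finishes, so the
# daemon thread keeps logging (as the output below shows) until the kernel shuts down.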

[INFO] (DaemonTask) Background task running... (Count: 1)



--- Daemon Thread Demonstration ---
Main thread doing some work...


[INFO] (DaemonTask) Background task running... (Count: 2)
[INFO] (DaemonTask) Background task running... (Count: 3)
[INFO] (DaemonTask) Background task running... (Count: 4)
[INFO] (DaemonTask) Background task running... (Count: 5)


Main thread exiting. Daemon thread will be terminated automatically.


[INFO] (DaemonTask) Background task running... (Count: 6)
[INFO] (DaemonTask) Background task running... (Count: 7)
[INFO] (DaemonTask) Background task running... (Count: 8)
[INFO] (DaemonTask) Background task running... (Count: 9)
[INFO] (DaemonTask) Background task running... (Count: 10)
[INFO] (DaemonTask) Background task running... (Count: 11)
[INFO] (DaemonTask) Background task running... (Count: 12)
[INFO] (DaemonTask) Background task running... (Count: 13)
[INFO] (DaemonTask) Background task running... (Count: 14)
[INFO] (DaemonTask) Background task running... (Count: 15)
[INFO] (DaemonTask) Background task running... (Count: 16)
[INFO] (DaemonTask) Background task running... (Count: 17)
[INFO] (DaemonTask) Background task running... (Count: 18)
[INFO] (DaemonTask) Background task running... (Count: 19)
[INFO] (DaemonTask) Background task running... (Count: 20)
[INFO] (DaemonTask) Background task running... (Count: 21)
[INFO] (DaemonTask) Background task running... (Count: 22)
[

## 7. Modern Approach: `concurrent.futures.ThreadPoolExecutor`

Managing individual threads, joining them, and handling results can be verbose. The `concurrent.futures` module provides a higher-level abstraction.

`ThreadPoolExecutor` manages a pool of worker threads:
*   You submit tasks (`submit(fn, *args, **kwargs)`).
*   It returns `Future` objects, representing the eventual result of the task.
*   You can check if futures are done (`.done()`), get their results (`.result()`, which blocks), or add callbacks (`.add_done_callback()`).
*   The executor handles thread creation, reuse, and joining automatically, often via a context manager.
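When results should be handled as soon as each task finishes, and failures attributed to specific inputs, `submit()` combined with `concurrent.futures.as_completed()` is a common pattern. In this sketch, `fetch` is a hypothetical stand-in for an I/O call that sleeps and deliberately fails for one input; `Future.exception()` returns `None` for a successful call.

```python
import concurrent.futures
import time

def fetch(item_id: int) -> str:
    # Hypothetical I/O stand-in: sleeps, and fails for one input.
    time.sleep(0.05 * item_id)
    if item_id == 2:
        raise ValueError(f"item {item_id} not found")
    return f"item-{item_id}"

results, errors = {}, {}
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Map each Future back to its input so outcomes can be attributed.
    future_to_id = {executor.submit(fetch, i): i for i in range(4)}
    for future in concurrent.futures.as_completed(future_to_id):
        item_id = future_to_id[future]
        exc = future.exception()       # None if the call succeeded
        if exc is None:
            results[item_id] = future.result()
        else:
            errors[item_id] = exc
# Leaving the with-block calls executor.shutdown(wait=True).

print(sorted(results.items()))  # [(0, 'item-0'), (1, 'item-1'), (3, 'item-3')]
print(errors)                   # {2: ValueError('item 2 not found')}
```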

In [10]:
import concurrent.futures
import threading
import time
import requests # Example I/O-bound task (pip install requests)
from typing import Tuple

URLS = [
    'https://httpbin.org/delay/1', # Simulate 1 second delay
    'https://httpbin.org/delay/2',
    'https://httpbin.org/delay/0.5',
    'https://httpbin.org/delay/1.5',
    'https://httpbin.org/status/404', # Example of a failed request
]

def download_url(url: str) -> Tuple[str, int, int]:
    """Downloads a URL and returns URL, status code, and content length."""
    thread_name = threading.current_thread().name
    print(f"({thread_name}) Fetching {url}...")
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
        print(f"({thread_name}) Finished {url}")
        return url, response.status_code, len(response.content)
    except requests.exceptions.RequestException as e:
        print(f"({thread_name}) Error fetching {url}: {e}")
        return url, -1, 0 # Indicate error

print("\n--- ThreadPoolExecutor Demonstration --- ")
start_time_pool = time.perf_counter()

# Use context manager for automatic shutdown
# max_workers defaults to min(32, os.cpu_count() + 4) in Python 3.8+
with concurrent.futures.ThreadPoolExecutor(max_workers=3, thread_name_prefix='Downloader') as executor:
    # Method 1: Using submit() and collecting futures
    # futures = [executor.submit(download_url, url) for url in URLS]
    # results = []
    # for future in concurrent.futures.as_completed(futures):
    #     try:
    #         result = future.result() # Get result (blocks if not done, re-raises exceptions)
    #         results.append(result)
    #         print(f"Got result: {result}")
    #     except Exception as exc:
    #         print(f"Future generated an exception: {exc}")
            
    # Method 2: Using executor.map() (simpler for applying the same function)
    # map() submits every task immediately and returns an iterator that yields
    # results *in the order the inputs were given*, blocking on each result in turn.
    # If a task raised an exception, it is re-raised when the iterator reaches that
    # result, and iteration stops there. (download_url catches request errors itself,
    # so failures come back here as (url, -1, 0) tuples instead.)
    print("Submitting tasks using executor.map...")
    results_iterator = executor.map(download_url, URLS)

    results = []
    try:
        for result in results_iterator:
            print(f"Got result from map: {result}")
            results.append(result)
    except Exception as exc:
        print(f"Map iterator raised an exception: {exc}")

end_time_pool = time.perf_counter()

print(f"\n--- Results --- ")
for res in results:
    print(res)
print(f"\nThreadPoolExecutor finished in {end_time_pool - start_time_pool:.2f} seconds")

# Compare with sequential execution (conceptual)
# sequential_start = time.perf_counter()
# sequential_results = [download_url(url) for url in URLS]
# sequential_end = time.perf_counter()
# print(f"\nSequential finished in {sequential_end - sequential_start:.2f} seconds")


--- ThreadPoolExecutor Demonstration --- 
Submitting tasks using executor.map...
(Downloader_0) Fetching https://httpbin.org/delay/1...
(Downloader_1) Fetching https://httpbin.org/delay/2...
(Downloader_2) Fetching https://httpbin.org/delay/0.5...
(Downloader_2) Finished https://httpbin.org/delay/0.5
(Downloader_2) Fetching https://httpbin.org/delay/1.5...
(Downloader_0) Finished https://httpbin.org/delay/1
(Downloader_0) Fetching https://httpbin.org/status/404...
Got result from map: ('https://httpbin.org/delay/1', 200, 356)
(Downloader_1) Finished https://httpbin.org/delay/2
Got result from map: ('https://httpbin.org/delay/2', 200, 356)
Got result from map: ('https://httpbin.org/delay/0.5', 200, 358)
(Downloader_0) Error fetching https://httpbin.org/status/404: 404 Client Error: NOT FOUND for url: https://httpbin.org/status/404
(Downloader_2) Finished https://httpbin.org/delay/1.5
Got result from map: ('https://httpbin.org/delay/1.5', 200, 358)
Got result from map: ('https://httpbin

**Benefit:** `ThreadPoolExecutor` abstracts away the manual thread creation, starting, joining, and result collection, making concurrent I/O tasks much easier to manage.
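To make the ordering and exception behaviour of `executor.map` concrete, here is a small sketch that needs no network. It assumes two hypothetical tasks: `slow_square`, which uses `time.sleep` as a stand-in for I/O latency, and `flaky`, which fails for one input. `map` yields results in submission order, `as_completed` yields futures as they finish, and an exception inside a task surfaces only when iteration reaches that task's result.

```python
import concurrent.futures
import time
from typing import List, Optional, Tuple


def slow_square(n: int) -> int:
    """Hypothetical I/O-bound stand-in: sleeps n * 0.1 s, then returns n squared."""
    time.sleep(0.1 * n)
    return n * n


def flaky(n: int) -> int:
    """Hypothetical task that fails for one specific input."""
    if n == 2:
        raise ValueError("bad input: 2")
    return n


def map_order(inputs: List[int]) -> List[int]:
    # map() yields results in input order, regardless of which task finishes first.
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        return list(executor.map(slow_square, inputs))


def completion_order(inputs: List[int]) -> List[int]:
    # as_completed() yields each future as soon as it finishes.
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(slow_square, n) for n in inputs]
        return [f.result() for f in concurrent.futures.as_completed(futures)]


def collect_with_map(inputs: List[int]) -> Tuple[List[int], Optional[str]]:
    # The first failing task re-raises its exception during iteration;
    # results after it are never yielded by this iterator.
    collected: List[int] = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        try:
            for result in executor.map(flaky, inputs):
                collected.append(result)
        except ValueError as exc:
            return collected, str(exc)
    return collected, None


if __name__ == "__main__":
    print(map_order([3, 1, 2]))         # [9, 1, 4]
    print(completion_order([3, 1, 2]))  # order follows finish time (shortest sleep first)
    print(collect_with_map([1, 2, 3]))  # ([1], 'bad input: 2')
```

If you need every result even when some tasks fail, `submit` plus `as_completed` (calling `future.result()` inside its own `try`) is usually the better fit than `map`.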

## 8. Best Practices & Enterprise Considerations

1.  **Use for I/O-Bound Tasks:** Threading is most effective in CPython when tasks spend significant time waiting for external operations.
2.  **Protect Shared Data:** *Always* use locks (`Lock`, `RLock`) or other appropriate synchronization primitives (`Semaphore`, `Event`, `Condition`) when multiple threads access mutable shared data. Use the `with` statement for locks.
3.  **Prefer `queue.Queue`:** Use thread-safe queues for passing data/tasks between threads instead of complex manual locking on shared variables.
4.  **Use `ThreadPoolExecutor`:** Prefer the high-level `concurrent.futures.ThreadPoolExecutor` for managing pools of threads over manual `threading.Thread` management for simpler use cases.
5.  **Avoid Daemon Threads for Critical Tasks:** Ensure critical tasks complete and resources are cleaned up. Use signaling mechanisms (`Event`, sentinel values in queues) instead of relying on daemon behavior for termination.
6.  **Beware of Deadlocks:** Deadlocks occur when threads are stuck waiting for each other to release locks (e.g., Thread A holds Lock 1 and waits for Lock 2, while Thread B holds Lock 2 and waits for Lock 1). Avoid complex locking patterns, and always acquire multiple locks in a consistent order.
7.  **Resource Management:** Ensure resources acquired by a thread (files, connections) are properly released, even if errors occur (use `try...finally` or context managers within the thread's task).
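8.  **Testing:** Threading bugs (race conditions, deadlocks) are timing-dependent and hard to reproduce. Combine careful design with stress tests (many threads, many iterations) and timeouts on blocking calls (`Lock.acquire(timeout=...)`, `Queue.get(timeout=...)`, `Thread.join(timeout=...)`) so that a hang can be detected instead of blocking forever. Race detectors such as `ThreadSanitizer` exist but are more common in C/C++ contexts.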
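9.  **Limit Thread Pool Size:** Creating too many threads consumes memory (each thread has its own stack) and degrades performance through context-switching overhead. The default `max_workers` of `ThreadPoolExecutor` is derived from the CPU count (`min(32, cpu_count + 4)` since Python 3.8) and is a reasonable starting point for I/O-bound work.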
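Point 6 (consistent lock ordering) can be sketched with a hypothetical `Account` type, each instance carrying its own `Lock`. Two threads transfer money in opposite directions; if each locked `src` and then `dst`, they could deadlock. Here `transfer` always acquires the lock of the account with the smaller hypothetical `account_id` first, so both threads request the locks in the same order.

```python
import threading
from dataclasses import dataclass, field
from typing import Tuple


@dataclass
class Account:
    """Hypothetical account type used only for this sketch."""
    account_id: int
    balance: int
    lock: threading.Lock = field(default_factory=threading.Lock, repr=False, compare=False)


def transfer(src: Account, dst: Account, amount: int) -> None:
    """Move `amount` from src to dst while holding both accounts' locks."""
    if src is dst:
        return  # same account: nothing to move, and locking it twice would self-deadlock
    # Consistent ordering: always lock the account with the smaller id first.
    first, second = sorted((src, dst), key=lambda acc: acc.account_id)
    with first.lock, second.lock:  # acquired left to right
        src.balance -= amount
        dst.balance += amount


def _repeat_transfers(src: Account, dst: Account, rounds: int) -> None:
    for _ in range(rounds):
        transfer(src, dst, 1)


def run_opposing_transfers(rounds: int = 10_000) -> Tuple[int, int]:
    a = Account(account_id=1, balance=1_000)
    b = Account(account_id=2, balance=1_000)
    # Opposite directions: the classic setup for an A-waits-for-B / B-waits-for-A deadlock.
    threads = [
        threading.Thread(target=_repeat_transfers, args=(a, b, rounds)),
        threading.Thread(target=_repeat_transfers, args=(b, a, rounds)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return a.balance, b.balance


if __name__ == "__main__":
    print(run_opposing_transfers())  # (1000, 1000)
```

Because every transfer updates both balances while holding both locks, the totals stay exact, and the fixed ordering removes the circular wait that a deadlock requires.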

## 9. Pitfalls and Common Interview Questions

**Common Pitfalls:**

*   **Race Conditions:** Unprotected access to shared mutable data.
*   **Deadlocks:** Threads waiting indefinitely for locks held by each other.
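*   **Forgetting `join()`:** Without `join()`, the main thread continues immediately and may use results before the workers have produced them. At interpreter exit Python still waits for non-daemon threads, but daemon threads are stopped abruptly.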
*   **Misusing Locks:** Forgetting to release a lock, using `Lock` where `RLock` is needed (recursion), unnecessary locking reducing concurrency.
*   **Ignoring Return Values/Exceptions:** Not properly retrieving results or handling exceptions from threads (easier with `concurrent.futures`).
*   **Using Threading for CPU-Bound Tasks (in CPython):** Expecting a parallel speedup, which the GIL prevents for pure-Python CPU-bound code.
*   **Resource Leaks:** Threads terminating (especially daemons) without releasing resources.
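For the "Ignoring Return Values/Exceptions" pitfall, the sketch below contrasts a plain `threading.Thread`, which discards its target's return value and reports an uncaught exception only through `threading.excepthook` (by default a traceback printed to stderr), with `ThreadPoolExecutor.submit`, whose `Future` hands back either the value or the exception. The `risky_task` function is hypothetical.

```python
import concurrent.futures
import threading
from typing import List


def risky_task(n: int) -> int:
    """Hypothetical task: fails on negative input, otherwise returns n * 10."""
    if n < 0:
        raise ValueError(f"negative input: {n}")
    return n * 10


def with_plain_thread(n: int) -> List[str]:
    # Thread discards the target's return value; an uncaught exception is passed
    # to threading.excepthook. A temporary hook records it so we can inspect it.
    seen: List[str] = []
    original_hook = threading.excepthook
    threading.excepthook = lambda args: seen.append(repr(args.exc_value))
    try:
        t = threading.Thread(target=risky_task, args=(n,))
        t.start()
        t.join()  # join() returns normally even though the target raised
    finally:
        threading.excepthook = original_hook
    return seen


def with_future(n: int) -> str:
    # Future.result() returns the value or re-raises the task's exception
    # in the calling thread, where an ordinary try/except can handle it.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(risky_task, n)
        try:
            return f"result: {future.result()}"
        except ValueError as exc:
            return f"caught in caller: {exc}"


if __name__ == "__main__":
    print(with_plain_thread(-1))  # ["ValueError('negative input: -1')"]
    print(with_future(3))         # result: 30
    print(with_future(-1))        # caught in caller: negative input: -1
```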

**Common Interview Questions:**

1.  What is the difference between a thread and a process?
2.  What is the Global Interpreter Lock (GIL) and how does it affect threading in CPython?
3.  When is threading a suitable concurrency model in Python?
4.  What is a race condition? How can you prevent it?
5.  Explain the purpose of `threading.Lock`. How should it typically be used?
6.  What is the difference between `Lock` and `RLock`?
7.  How can threads communicate safely? (Mention `queue.Queue`).
8.  What is a daemon thread?
9.  What are the advantages of using `concurrent.futures.ThreadPoolExecutor` over managing `threading.Thread` objects directly?
10. What is a deadlock?
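Question 6 is easiest to see in code. A minimal sketch, using a hypothetical `Counter` class: a method holding an `RLock` can call another method that acquires the same lock, while a plain `Lock` blocks its own holder. The plain-`Lock` case uses `acquire(timeout=...)` so the self-block shows up as a `False` return instead of a real hang.

```python
import threading


class Counter:
    """Hypothetical class: increment_twice() calls increment() while already holding the lock."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self.value = 0

    def increment(self) -> None:
        with self._lock:
            self.value += 1

    def increment_twice(self) -> None:
        with self._lock:       # first acquisition by this thread
            self.increment()   # re-acquires the same RLock: allowed for the owning thread
            self.increment()


def plain_lock_reentry_blocks() -> bool:
    # With a plain Lock, a second acquire by the same thread cannot succeed;
    # without the timeout this would block forever.
    lock = threading.Lock()
    with lock:
        acquired_again = lock.acquire(timeout=0.1)
        if acquired_again:
            lock.release()
    return not acquired_again


if __name__ == "__main__":
    c = Counter()
    c.increment_twice()
    print(c.value)                      # 2
    print(plain_lock_reentry_blocks())  # True
```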

## 10. Challenge: Threaded Web Status Checker

**Goal:** Create a script that checks the status code of multiple URLs concurrently using threads.

**Tasks:**

1.  **URL List:** Define a list of URLs to check (include some valid, some potentially slow, and maybe one invalid/404 URL).
2.  **Checker Function:** Create a function `check_url_status(url: str) -> Tuple[str, Optional[int]]` that:
    *   Takes a URL string as input.
    *   Uses the `requests` library (`pip install requests`) to make a GET request to the URL (use a timeout, e.g., 5 seconds).
    *   Handles potential `requests.exceptions.RequestException` errors.
    *   Returns a tuple containing the URL and the HTTP status code (or `None` if an error occurred).
    *   Include logging within the function to show which thread is checking which URL.
3.  **Concurrent Execution:**
    *   Use `concurrent.futures.ThreadPoolExecutor` to run the `check_url_status` function concurrently for all URLs in the list.
    *   Configure the executor with a reasonable number of worker threads (e.g., 5).
4.  **Collect and Print Results:**
    *   Retrieve the results from the executor (e.g., using `executor.map` or `submit`/`as_completed`).
    *   Print the status for each URL.
5.  **Timing:** Measure and print the total time taken for checking all URLs concurrently.

**(Bonus):** Implement the same task sequentially (without threads) and compare the execution time.

# --- Solution Space for Challenge ---
import concurrent.futures
import threading
import requests
import time
import logging
from typing import List, Tuple, Optional

logging.basicConfig(level=logging.INFO, 
                    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
                    force=True)

# 1. URL List
URLS_TO_CHECK = [
    'https://www.google.com',
    'https://httpbin.org/delay/1', # Delays 1 second
    'https://httpbin.org/delay/0.5',
    'https://httpbin.org/status/404', # Not Found
    'https://httpbin.org/delay/1.5',
    'https://invalid.url.that.does.not.exist',
    'https://www.github.com',
]

# 2. Checker Function
def check_url_status(url: str) -> Tuple[str, Optional[int]]:
    """Checks the HTTP status code for a given URL."""
    logging.info(f"Checking {url}...")
    try:
        response = requests.get(url, timeout=10) # Increased timeout
        # No raise_for_status needed if we just want the code
        status_code = response.status_code
        logging.info(f"Finished {url} - Status: {status_code}")
        return url, status_code
    except requests.exceptions.Timeout:
        logging.warning(f"Timeout occurred for {url}")
        return url, None
    except requests.exceptions.ConnectionError:
        logging.warning(f"Connection error for {url}")
        return url, None
    except requests.exceptions.RequestException as e:
        logging.error(f"Error checking {url}: {type(e).__name__}")
        return url, None # Return None for other request errors

# 3 & 4: Concurrent Execution and Result Collection
MAX_WORKERS = 5
all_results = []

print(f"--- Starting Concurrent URL Check (Max Workers: {MAX_WORKERS}) ---")
start_time = time.perf_counter()

with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix='Checker') as executor:
    # Using map for simplicity - order of results matches URLS_TO_CHECK
    results_iterator = executor.map(check_url_status, URLS_TO_CHECK)
    
    # Collect results (map implicitly waits)
    all_results = list(results_iterator)

end_time = time.perf_counter()

# 5. Print Results and Timing
print("\n--- Concurrent Check Results ---")
for url, status in all_results:
    print(f"  {url:<50} -> Status: {status if status is not None else 'Error/Timeout'}")

print(f"\nTotal time (concurrent): {end_time - start_time:.2f} seconds")

# Bonus: Sequential Comparison
print("\n--- Starting Sequential URL Check ---")
start_time_seq = time.perf_counter()
sequential_results = []
for url in URLS_TO_CHECK:
    # Runs in the main thread, one URL at a time
    sequential_results.append(check_url_status(url))
end_time_seq = time.perf_counter()

print("\n--- Sequential Check Results ---")
for url, status in sequential_results:
    print(f"  {url:<50} -> Status: {status if status is not None else 'Error/Timeout'}")
print(f"\nTotal time (sequential): {end_time_seq - start_time_seq:.2f} seconds")

[INFO] (Checker_0 ) Checking https://www.google.com...
[INFO] (Checker_1 ) Checking https://httpbin.org/delay/1...
[INFO] (Checker_2 ) Checking https://httpbin.org/delay/0.5...
[INFO] (Checker_3 ) Checking https://httpbin.org/status/404...
[INFO] (Checker_4 ) Checking https://httpbin.org/delay/1.5...


--- Starting Concurrent URL Check (Max Workers: 5) ---


[INFO] (Checker_0 ) Finished https://www.google.com - Status: 200
[INFO] (Checker_0 ) Checking https://invalid.url.that.does.not.exist...
[INFO] (Checker_0 ) Checking https://www.github.com...
[INFO] (Checker_0 ) Finished https://www.github.com - Status: 200
[INFO] (Checker_3 ) Finished https://httpbin.org/status/404 - Status: 404
[INFO] (Checker_2 ) Finished https://httpbin.org/delay/0.5 - Status: 200
[INFO] (Checker_1 ) Finished https://httpbin.org/delay/1 - Status: 200
[INFO] (Checker_4 ) Finished https://httpbin.org/delay/1.5 - Status: 200
[INFO] (MainThread) Checking https://www.google.com...



--- Concurrent Check Results ---
  https://www.google.com                             -> Status: 200
  https://httpbin.org/delay/1                        -> Status: 200
  https://httpbin.org/delay/0.5                      -> Status: 200
  https://httpbin.org/status/404                     -> Status: 404
  https://httpbin.org/delay/1.5                      -> Status: 200
  https://invalid.url.that.does.not.exist            -> Status: Error/Timeout
  https://www.github.com                             -> Status: 200

Total time (concurrent): 2.75 seconds

--- Starting Sequential URL Check ---


[INFO] (MainThread) Finished https://www.google.com - Status: 200
[INFO] (MainThread) Checking https://httpbin.org/delay/1...
[INFO] (MainThread) Finished https://httpbin.org/delay/1 - Status: 200
[INFO] (MainThread) Checking https://httpbin.org/delay/0.5...
[INFO] (MainThread) Finished https://httpbin.org/delay/0.5 - Status: 200
[INFO] (MainThread) Checking https://httpbin.org/status/404...
[INFO] (MainThread) Finished https://httpbin.org/status/404 - Status: 404
[INFO] (MainThread) Checking https://httpbin.org/delay/1.5...
[INFO] (MainThread) Finished https://httpbin.org/delay/1.5 - Status: 200
[INFO] (MainThread) Checking https://invalid.url.that.does.not.exist...
[INFO] (MainThread) Checking https://www.github.com...
[INFO] (MainThread) Finished https://www.github.com - Status: 200



--- Sequential Check Results ---
  https://www.google.com                             -> Status: 200
  https://httpbin.org/delay/1                        -> Status: 200
  https://httpbin.org/delay/0.5                      -> Status: 200
  https://httpbin.org/status/404                     -> Status: 404
  https://httpbin.org/delay/1.5                      -> Status: 200
  https://invalid.url.that.does.not.exist            -> Status: Error/Timeout
  https://www.github.com                             -> Status: 200

Total time (sequential): 8.76 seconds
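The task list also mentions `submit`/`as_completed` as an alternative to `map`. Below is a sketch of that variant. It assumes a hypothetical offline `fake_check` that sleeps for a per-URL delay instead of calling `requests`, with made-up `example.com` URLs and statuses, so it runs without network access. The dict from future to URL lets each result be reported as soon as it completes and attributed to the right URL.

```python
import concurrent.futures
import time
from typing import Dict, List, Optional, Tuple

# Hypothetical (delay in seconds, status) pairs standing in for real requests;
# a status of None models a failed request.
FAKE_RESPONSES: Dict[str, Tuple[float, Optional[int]]] = {
    "https://example.com/a": (0.3, 200),
    "https://example.com/b": (0.1, 404),
    "https://example.com/c": (0.2, None),
}


def fake_check(url: str) -> Tuple[str, Optional[int]]:
    """Hypothetical offline stand-in for check_url_status."""
    delay, status = FAKE_RESPONSES[url]
    time.sleep(delay)
    return url, status


def check_all(urls: List[str], max_workers: int = 5) -> Tuple[Dict[str, Optional[int]], float]:
    start = time.perf_counter()
    statuses: Dict[str, Optional[int]] = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its URL so results and failures can be attributed.
        future_to_url = {executor.submit(fake_check, url): url for url in urls}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                _, status = future.result()
            except Exception as exc:  # defensive: the task itself is not expected to raise
                print(f"{url} raised {exc!r}")
                status = None
            statuses[url] = status
            print(f"{url:<25} -> {status if status is not None else 'Error/Timeout'}")
    return statuses, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = check_all(list(FAKE_RESPONSES))
    print(f"Total time (concurrent): {elapsed:.2f} seconds")
```

Run sequentially, the three fake checks would take about 0.6 seconds (the sum of the delays); with the pool, the elapsed time should be close to 0.3 seconds, the longest single delay, which mirrors the concurrent vs. sequential gap in the real run above.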


## 11. Conclusion

Threading in Python, facilitated by the `threading` module and the high-level `concurrent.futures.ThreadPoolExecutor`, provides an effective mechanism for achieving concurrency, particularly for I/O-bound tasks. While the GIL prevents true parallelism for CPU-bound work in CPython, threading allows applications to remain responsive by performing other tasks while waiting for I/O.

The key challenge lies in managing shared data safely using synchronization primitives like Locks or thread-safe structures like Queues. By understanding these concepts and applying best practices, you can leverage threading to build more efficient and responsive Python applications.