## 1. ThreadPoolExecutor Basics

In [1]:
from concurrent.futures import ThreadPoolExecutor
import time

# Section banner: title framed above and below by a 50-character rule
print("=" * 50, "THREADPOOLEXECUTOR", "=" * 50, sep="\n")

def fetch_url(url, delay=2):
    """Simulate fetching ``url`` and return a canned response string.

    No real network access happens: the function announces the fetch,
    sleeps to imitate latency, and returns a string built from ``url``.

    Args:
        url: Identifier of the resource to "fetch"; only used in messages.
        delay: Seconds to sleep to imitate network latency. Defaults to 2,
            the value that used to be hard-coded, so existing single-argument
            callers (e.g. ``executor.map(fetch_url, urls)``) are unaffected.

    Returns:
        The string ``"Response from <url>"``.
    """
    print(f"Fetching {url}...")
    time.sleep(delay)  # Simulate network request
    return f"Response from {url}"

urls = ['url1', 'url2', 'url3', 'url4', 'url5']

# Using ThreadPoolExecutor
# perf_counter() is a monotonic, high-resolution clock intended for measuring
# durations; time.time() follows the wall clock and can jump if the system
# time is adjusted while the tasks are running.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as executor:
    # map returns results in order
    results = executor.map(fetch_url, urls)
    for result in results:
        print(result)
elapsed = time.perf_counter() - start
print(f"\nTime taken: {elapsed:.2f}s")

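==================================================
THREADPOOLEXECUTOR
==================================================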
Fetching url1...
Fetching url2...
Fetching url3...
Fetching url4...
Fetching url5...
Response from url1
Response from url2
Response from url3
Response from url4
Response from url5

Time taken: 4.01s


## 2. ThreadPoolExecutor with submit()

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# Section banner: blank line, then the title framed by 50-character rules
print("\n" + "=" * 50, "THREADPOOLEXECUTOR - SUBMIT", "=" * 50, sep="\n")

def download_file(filename, delay=1):
    """Simulate downloading ``filename`` and return a confirmation string.

    Nothing is actually downloaded: the function announces the download,
    sleeps to imitate transfer time, and returns a string built from
    ``filename``.

    Args:
        filename: Name of the file to "download"; only used in messages.
        delay: Seconds to sleep to imitate the transfer. Defaults to 1, the
            value that used to be hard-coded, so existing single-argument
            callers are unaffected.

    Returns:
        The string ``"Downloaded <filename>"``.
    """
    print(f"Downloading {filename}...")
    time.sleep(delay)
    return f"Downloaded {filename}"

with ThreadPoolExecutor(max_workers=3) as executor:
    # Queue all five downloads up front, keeping the Future for each one
    futures = [
        executor.submit(download_file, f"file{i}.txt")
        for i in range(5)
    ]

    # Report each download in completion order rather than submission order
    print("\nResults (as completed):")
    for done in as_completed(futures):
        print(done.result())

## 3. ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
import time
import os

# Section banner: blank line, then the title framed by 50-character rules
print("\n" + "=" * 50, "PROCESSPOOLEXECUTOR", "=" * 50, sep="\n")

def cpu_intensive(n):
    """Return the sum of ``i ** 2`` for every ``i`` in ``range(n)``.

    Before computing, prints ``n`` together with the id of the process
    running the call, so the output shows which process handled which input.
    """
    print(f"Processing {n} in PID {os.getpid()}...")
    return sum(i ** 2 for i in range(n))

# The __main__ guard matters for process pools: worker processes may import
# the main module, and without the guard they would re-run this block.
if __name__ == '__main__':
    numbers = [10_000_000, 10_000_000, 10_000_000, 10_000_000]

    # perf_counter() is a monotonic clock meant for timing intervals;
    # time.time() can jump if the system clock is adjusted mid-run.
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as executor:
        # map yields results in the same order as `numbers`
        results = executor.map(cpu_intensive, numbers)
        for result in results:
            print(f"Result: {result}")
    elapsed = time.perf_counter() - start
    print(f"\nTime taken: {elapsed:.2f}s")

## 4. Error Handling with Executors

from concurrent.futures import ThreadPoolExecutor, as_completed

# Section banner: blank line, then the title framed by 50-character rules
print("\n" + "=" * 50, "ERROR HANDLING", "=" * 50, sep="\n")

def task_with_error(task_id):
    """Succeed for odd ``task_id`` values and fail for even ones.

    Returns:
        ``"Task <task_id> completed"`` when ``task_id % 2`` is non-zero.

    Raises:
        ValueError: with message ``"Task <task_id> failed"`` when
            ``task_id`` is divisible by 2.
    """
    if task_id % 2:
        return f"Task {task_id} completed"
    raise ValueError(f"Task {task_id} failed")

with ThreadPoolExecutor(max_workers=3) as executor:
    # Map each Future back to the task id it was submitted with, so a
    # failure can be reported against the right task.
    futures = {}
    for i in range(5):
        futures[executor.submit(task_with_error, i)] = i

    for finished in as_completed(futures):
        task_id = futures[finished]
        try:
            # result() re-raises whatever the task raised in its worker thread
            result = finished.result()
            print(f"Success: {result}")
        except Exception as e:
            print(f"Error: Task {task_id} - {e}")

## 5. Timeout Handling

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

# Section banner: blank line, then the title framed by 50-character rules
print("\n" + "=" * 50, "TIMEOUT HANDLING", "=" * 50, sep="\n")

def slow_task(duration):
    """Sleep for ``duration`` seconds, then return a completion message.

    Returns:
        The string ``"Completed in <duration>s"``.
    """
    time.sleep(duration)
    message = f"Completed in {duration}s"
    return message

with ThreadPoolExecutor(max_workers=2) as executor:
    # A 1-second task and a 5-second task, submitted in that order
    futures = [executor.submit(slow_task, seconds) for seconds in (1, 5)]

    for pending in futures:
        try:
            # Wait at most 2 seconds for this particular future.
            # A timeout only stops the wait; it does not cancel the task.
            result = pending.result(timeout=2)  # 2 second timeout
            print(f"Result: {result}")
        except TimeoutError:
            print("Task timed out!")

## 6. Executor Comparison

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

# Section banner: blank line, then the title framed by 50-character rules
print("\n" + "=" * 50, "EXECUTOR COMPARISON", "=" * 50, sep="\n")

# Plain-text cheat sheet contrasting the two executor types and listing the
# main executor methods. It is a literal string printed verbatim below;
# nothing in it is computed.
comparison = """
ThreadPoolExecutor:
- Use for: I/O-bound tasks
- Pros: Simple, low overhead, shared memory
- Cons: GIL limits CPU-bound performance
- Example: Web scraping, API calls, file operations

ProcessPoolExecutor:
- Use for: CPU-bound tasks
- Pros: True parallelism, multi-core support
- Cons: Higher overhead, separate memory spaces
- Example: Data processing, ML training, simulations

Executor Methods:
- map():          Applies function to each item, returns iterator
- submit():       Returns Future immediately, non-blocking
- shutdown():     Wait for all tasks to complete
"""
# The literal starts and ends with a newline, so the printed text is
# surrounded by blank lines.
print(comparison)

## 7. Real-World Example: Batch Processing

from concurrent.futures import ThreadPoolExecutor
import time
from typing import List

# Section banner: blank line, then the title framed by 50-character rules
print("\n" + "=" * 50, "BATCH PROCESSING", "=" * 50, sep="\n")

class DataProcessor:
    """Process batches of items concurrently on a private thread pool.

    The pool is created in ``__init__`` and lives until ``shutdown()`` is
    called. The class also works as a context manager, which guarantees the
    pool is shut down even if the body raises::

        with DataProcessor(max_workers=3) as processor:
            results = processor.process_batch(data)
    """

    def __init__(self, max_workers=4):
        """Create the processor and its thread pool.

        Args:
            max_workers: Maximum number of worker threads in the pool.
        """
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def __enter__(self):
        """Return the processor itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Shut the pool down on leaving the ``with`` block.

        Returns ``False`` so an exception raised in the block propagates.
        """
        self.shutdown()
        return False

    def process_batch(self, data: List, timeout=5):
        """Process every item of ``data`` and return the outcomes in order.

        All items are submitted to the pool first, then their results are
        collected in submission order.

        Args:
            data: Items to process; each is passed to ``_process_item``.
            timeout: Maximum seconds to wait for each individual result
                (not for the batch as a whole). Defaults to 5, the value
                that used to be hard-coded.

        Returns:
            A list with one entry per input item: the item's result, or a
            string starting with ``"Error: "`` if it failed or timed out.
        """
        # Imported locally under an alias so this method does not depend on
        # which names the surrounding file has imported.
        from concurrent.futures import TimeoutError as FutureTimeoutError

        futures = [
            self.executor.submit(self._process_item, item) for item in data
        ]

        results = []
        for future in futures:
            try:
                results.append(future.result(timeout=timeout))
            except FutureTimeoutError:
                # str() of a timeout exception is empty, which used to
                # produce the uninformative entry "Error: ".
                results.append(f"Error: timed out after {timeout}s")
            except Exception as e:
                # Deliberately broad: one failing item must not abort the
                # rest of the batch; the failure is reported in its slot.
                results.append(f"Error: {e}")
        return results

    def _process_item(self, item):
        """Simulate work on one item: announce it, sleep 0.5 s, return a label."""
        print(f"Processing {item}...")
        time.sleep(0.5)
        return f"Processed {item}"

    def shutdown(self):
        """Shut the pool down, blocking until pending tasks have finished."""
        self.executor.shutdown(wait=True)

# Usage
processor = DataProcessor(max_workers=3)
data = ['item1', 'item2', 'item3', 'item4', 'item5']
try:
    results = processor.process_batch(data)
    for result in results:
        print(result)
finally:
    # Always release the worker threads, even if processing or printing
    # raised part-way through.
    processor.shutdown()