### 1. Understanding Concurrency
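Concurrency means several tasks make progress during overlapping time periods; parallelism means several tasks execute at the same instant on different CPU cores. Threading and multiprocessing are two Python mechanisms for achieving them. The cell below runs three tasks sequentially as a baseline for the later sections.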

In [None]:
import time

# Sequential execution
def task(name, duration):
    """Run one simulated unit of work and report its progress.

    Prints a start message, blocks the calling thread for ``duration``
    seconds, prints a completion message, and returns a string that names
    the task.
    """
    print(f"Task {name} starting")
    time.sleep(duration)  # stand-in for real work
    print(f"Task {name} completed")
    outcome = f"Result from {name}"
    return outcome

# Baseline: run the three two-second tasks back to back and time the batch.
print("Sequential Execution:")
start = time.time()
for label in ("A", "B", "C"):
    task(label, 2)
end = time.time()
print(f"\nTotal time: {end - start:.2f} seconds")
print("\nAll tasks run one after another")

### 2. Basic Threading
Using the threading module to run tasks concurrently.

In [None]:
import threading
import time

def worker(name, duration):
    """Announce the start, sleep for ``duration`` seconds, announce the end.

    Intended as a thread target; it returns nothing.
    """
    print(f"Worker {name} starting")
    time.sleep(duration)  # the sleeping thread lets other threads run
    print(f"Worker {name} finished")

print("Threading Example:")
start = time.time()

# One thread per worker; each worker sleeps for two seconds.
thread1, thread2, thread3 = (
    threading.Thread(target=worker, args=(label, 2))
    for label in ("A", "B", "C")
)
all_threads = (thread1, thread2, thread3)

# Launch every thread before waiting on any so that the sleeps overlap.
for running in all_threads:
    running.start()

# Block until each worker has finished.
for running in all_threads:
    running.join()

end = time.time()
print(f"\nTotal time: {end - start:.2f} seconds")
print("All tasks ran concurrently!")

### 3. Thread with Return Values
Getting results from threads.

In [None]:
import threading
import time

class ThreadWithReturn(threading.Thread):
    """Thread that keeps the return value of its target callable.

    ``threading.Thread`` discards whatever its target returns. This subclass
    stores the value so the caller can read it after ``join()``. If the
    target raises, the exception is stored as well and re-raised by
    ``get_result()`` so that a failure is not mistaken for a ``None`` result.

    Args:
        target: Callable executed in the new thread.
        args: Positional arguments passed to ``target``.
        kwargs: Optional mapping of keyword arguments passed to ``target``.
    """

    def __init__(self, target, args=(), kwargs=None):
        super().__init__()
        self.target = target
        self.args = args
        # Copy so later changes to the caller's dict cannot affect the call.
        self.kwargs = {} if kwargs is None else dict(kwargs)
        self.result = None
        self.exception = None

    def run(self):
        """Call the target and record either its result or its exception."""
        try:
            self.result = self.target(*self.args, **self.kwargs)
        except Exception as exc:
            self.exception = exc
            raise  # still reported through the normal threading.excepthook

    def get_result(self):
        """Return the target's result; call only after ``join()``.

        Raises:
            Exception: Whatever exception the target raised, if any.
        """
        if self.exception is not None:
            raise self.exception
        return self.result

def calculate_square(n, delay=1):
    """Return ``n`` squared after a simulated processing delay.

    Args:
        n: Number to square.
        delay: Seconds to sleep before computing. Defaults to 1, the value
            that was previously hard-coded.

    Returns:
        ``n ** 2``.
    """
    time.sleep(delay)
    return n ** 2

# Build one thread per input, then start them all so their sleeps overlap.
threads = [
    ThreadWithReturn(target=calculate_square, args=(i,))
    for i in range(1, 6)
]
for thread in threads:
    thread.start()

# Join before reading: a result is only set once run() has returned.
for thread in threads:
    thread.join()
results = [thread.get_result() for thread in threads]

print(f"Squares: {results}")

### 4. Thread Synchronization with Locks
Preventing race conditions.

In [None]:
import threading
import time

# Module-level state shared by every thread started in this cell.
lock = threading.Lock()
counter = 0

def increment_without_lock(name, times):
    """Add one to the global ``counter`` ``times`` times, unsynchronized.

    The read and the write-back are separated by a short sleep, so other
    threads can overwrite the counter in between. ``name`` is accepted but
    not used by the body.
    """
    global counter
    for _step in range(times):
        snapshot = counter      # read
        time.sleep(0.0001)      # simulated processing between read and write
        counter = snapshot + 1  # write back a possibly stale value

def increment_with_lock(name, times):
    """Add one to the global ``counter`` ``times`` times while holding ``lock``.

    The whole read-sleep-write sequence runs with the module-level ``lock``
    held. ``name`` is accepted but not used by the body.
    """
    global counter
    for _step in range(times):
        lock.acquire()
        try:
            snapshot = counter
            time.sleep(0.0001)
            counter = snapshot + 1
        finally:
            # Release even if the body fails so other threads are not blocked.
            lock.release()

def _run_five_threads(target):
    """Start five threads that each call ``target`` 100 times, then join them."""
    started = [
        threading.Thread(target=target, args=(f"T{i}", 100))
        for i in range(5)
    ]
    for running in started:
        running.start()
    for running in started:
        running.join()
    return started


# First pass: unsynchronized increments, so updates can be lost.
print("Without Lock (Race Condition):")
counter = 0
threads = _run_five_threads(increment_without_lock)

print(f"Final counter (without lock): {counter}")
print(f"Expected: {5 * 100}")

# Second pass: the same workload with every increment done under the lock.
print("\nWith Lock (Safe):")
counter = 0
threads = _run_five_threads(increment_with_lock)

print(f"Final counter (with lock): {counter}")
print(f"Expected: {5 * 100}")

### 5. ThreadPoolExecutor
Managing a pool of threads efficiently.

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def download_file(file_id, delay=2):
    """Simulate downloading a file.

    Args:
        file_id: Identifier shown in the progress messages and the result.
        delay: Seconds to sleep in place of the network transfer. Defaults
            to 2, the value that was previously hard-coded.

    Returns:
        A placeholder string standing in for the downloaded data.
    """
    print(f"Downloading file {file_id}...")
    time.sleep(delay)  # Simulate network delay
    print(f"Completed file {file_id}")
    return f"Data from file {file_id}"

print("Using ThreadPoolExecutor:")
start = time.time()

# Three workers serve five downloads; leaving the with-block waits for them.
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = []
    for file_id in range(1, 6):
        futures.append(executor.submit(download_file, file_id))

    # as_completed yields each future as it finishes, not in submit order.
    for future in as_completed(futures):
        result = future.result()
        print(f"Result: {result}")

end = time.time()
print(f"\nTotal time: {end - start:.2f} seconds")

### 6. Introduction to Multiprocessing
Using multiple processes for CPU-bound tasks.

In [None]:
import multiprocessing
import time

def cpu_intensive_task(n):
    """Return the sum of ``i * i`` for every ``i`` in ``range(n)``.

    Prints the name of the current process before and after the computation.
    """
    print(f"Process {multiprocessing.current_process().name} starting with {n}")
    total = 0
    for i in range(n):
        total += i * i
    print(f"Process {multiprocessing.current_process().name} finished")
    return total

if __name__ == '__main__':
    print("Multiprocessing Example:")
    start = time.time()

    # Four identical jobs, each in its own process.
    processes = [
        multiprocessing.Process(target=cpu_intensive_task, args=(10000000,))
        for _ in range(4)
    ]
    for process in processes:
        process.start()

    # Block until every child process has exited.
    for process in processes:
        process.join()

    end = time.time()
    print(f"\nTotal time: {end - start:.2f} seconds")
    print(f"Available CPUs: {multiprocessing.cpu_count()}")

### 7. ProcessPoolExecutor
Managing a pool of processes.

In [None]:
from concurrent.futures import ProcessPoolExecutor
import time

def compute_factorial(n):
    """Return ``n!`` by repeated multiplication.

    For ``n`` below 2 the loop does not run and the result is 1.
    """
    product = 1
    # Starting at 2 skips the no-op multiplication by 1.
    for factor in range(2, n + 1):
        product = product * factor
    return product

def count_digits(value):
    """Return the number of decimal digits in the integer ``value``.

    Avoids ``len(str(value))``: since Python 3.11 converting an int with more
    than 4300 digits to ``str`` raises ``ValueError`` by default, and the
    factorials printed below are far larger than that.

    Args:
        value: Any integer; the sign is ignored.

    Returns:
        The count of base-10 digits (1 for zero).
    """
    import math  # local import: this cell has no top-level math import

    value = abs(value)
    if value < 10:
        return 1
    # log10 works on arbitrarily large ints but is a float, so the estimate
    # can be off by one near a power of ten; correct it with exact int math.
    estimate = int(math.log10(value)) + 1
    if 10 ** (estimate - 1) > value:
        estimate -= 1
    elif 10 ** estimate <= value:
        estimate += 1
    return estimate


if __name__ == '__main__':
    numbers = [5000, 6000, 7000, 8000]

    print("Using ProcessPoolExecutor:")
    start = time.time()

    # map() returns the results in the same order as the inputs.
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(compute_factorial, numbers))

    end = time.time()

    for num, result in zip(numbers, results):
        print(f"Factorial({num}) has {count_digits(result)} digits")

    print(f"\nTotal time: {end - start:.2f} seconds")

### 8. Comparing Threading vs Multiprocessing
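When to use which approach: threads suit I/O-bound work that spends most of its time waiting, while processes suit CPU-bound work that needs several cores.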

In [None]:
import time
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# I/O-bound task (good for threading)
def io_bound_task(n):
    """Sleep for 0.1 s to imitate an I/O wait, then return ``n * 2``."""
    time.sleep(0.1)  # Simulate I/O wait
    doubled = n * 2
    return doubled

# CPU-bound task (good for multiprocessing)
def cpu_bound_task(n):
    """Return the sum of the squares of every integer in ``range(n)``."""
    total = 0
    for i in range(n):
        total += i * i
    return total

numbers = [1000000] * 8

# Both benchmarks live under the __main__ guard. With the "spawn" start
# method each worker process re-imports the main module, and anything left
# at module level (such as the threading benchmark) would run again there.
if __name__ == '__main__':
    # Threading for I/O-bound
    print("I/O-bound with Threading:")
    start = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(io_bound_task, range(8)))
    thread_time = time.time() - start
    print(f"Time: {thread_time:.2f} seconds\n")

    # Multiprocessing for CPU-bound
    print("CPU-bound with Multiprocessing:")
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_bound_task, numbers))
    process_time = time.time() - start
    print(f"Time: {process_time:.2f} seconds")

    print("\nGuidelines:")
    print("- Use Threading for I/O-bound tasks (network, file operations)")
    print("- Use Multiprocessing for CPU-bound tasks (calculations, data processing)")
    print("- Threading: Limited by GIL, shares memory")
    print("- Multiprocessing: True parallelism, separate memory")

### 9. Queue for Thread Communication
Safely passing data between threads.

In [None]:
import threading
import queue
import time
import random

def producer(q, items):
    """Feed ``items`` into ``q`` one at a time, then enqueue ``None``.

    A random pause of 0.1 to 0.3 seconds precedes each put. The trailing
    ``None`` tells consumers that no more items will arrive.
    """
    for item in items:
        pause = random.uniform(0.1, 0.3)
        time.sleep(pause)
        q.put(item)
        print(f"Produced: {item}")
    # End-of-stream marker.
    q.put(None)

def consumer(q, name):
    """Consume items from ``q`` until the ``None`` sentinel arrives.

    Each item is "processed" by sleeping 0.1 to 0.2 seconds and printing it.
    On the sentinel, ``None`` is put back so the next consumer also stops.

    Every ``get()`` is paired with exactly one ``task_done()``, including the
    sentinel, so the queue's unfinished-task count stays balanced. The
    forwarded sentinel itself remains in the queue after the last consumer
    exits; drain it before relying on ``q.join()``.

    Args:
        q: A ``queue.Queue`` shared with the producer.
        name: Label used in the printed messages.
    """
    while True:
        item = q.get()
        try:
            if item is None:
                q.put(None)  # Pass signal to other consumers
                break
            time.sleep(random.uniform(0.1, 0.2))
            print(f"{name} consumed: {item}")
        finally:
            # Runs on the break path too, and if processing raises.
            q.task_done()

# Channel shared by the producer and both consumers.
work_queue = queue.Queue()

items_to_produce = ['Item1', 'Item2', 'Item3', 'Item4', 'Item5']

# One producer fills the queue; two consumers drain it.
producer_thread = threading.Thread(
    target=producer, args=(work_queue, items_to_produce)
)
consumer_threads = []
for i in range(2):
    consumer_threads.append(
        threading.Thread(target=consumer, args=(work_queue, f"Consumer-{i}"))
    )

# Producer first, then the consumers, for both start and join.
all_threads = [producer_thread, *consumer_threads]
for thread in all_threads:
    thread.start()
for thread in all_threads:
    thread.join()

print("\nAll work completed!")

### 10. Multiprocessing with Shared Memory
Sharing data between processes.

In [None]:
import multiprocessing
import time

def increment_value(shared_value, lock, iterations):
    """Add one to ``shared_value.value`` ``iterations`` times under ``lock``.

    The lock is taken around every single increment so the read-modify-write
    is not interleaved with another holder of the same lock.
    """
    for _step in range(iterations):
        lock.acquire()
        try:
            shared_value.value += 1
        finally:
            lock.release()

if __name__ == '__main__':
    # A C int in shared memory plus a lock, both handed to every child.
    shared_value = multiprocessing.Value('i', 0)  # 'i' for integer
    lock = multiprocessing.Lock()

    # Four processes, 1000 increments each.
    processes = [
        multiprocessing.Process(
            target=increment_value, args=(shared_value, lock, 1000)
        )
        for _ in range(4)
    ]
    for process in processes:
        process.start()

    # Wait until all increments are done before reading the total.
    for process in processes:
        process.join()

    print(f"Final value: {shared_value.value}")
    print(f"Expected: {4 * 1000}")

### 11. Async IO Basics
Introduction to asynchronous programming.

In [None]:
import asyncio
import time

async def async_task(name, duration):
    """Coroutine that simulates ``duration`` seconds of awaitable work.

    Prints a start message, awaits ``asyncio.sleep`` (which yields to the
    event loop instead of blocking it), prints a completion message, and
    returns a string that names the task.
    """
    print(f"Task {name} starting")
    await asyncio.sleep(duration)  # other coroutines run while this waits
    print(f"Task {name} completed")
    outcome = f"Result from {name}"
    return outcome

async def main():
    """Run three two-second async tasks together and report the elapsed time."""
    print("Starting async tasks...")
    start = time.time()

    # gather() schedules all three coroutines and returns their results in
    # argument order.
    pending = [async_task(label, 2) for label in ("A", "B", "C")]
    results = await asyncio.gather(*pending)

    end = time.time()
    print(f"\nResults: {results}")
    print(f"Total time: {end - start:.2f} seconds")

# Run async code.
# NOTE: a bare top-level `await` is only accepted by interactive front ends
# that already run an event loop, such as Jupyter/IPython. In a plain .py
# script this line is a SyntaxError; use `asyncio.run(main())` there instead.
await main()

### 12. Practical Example: Parallel Web Scraping

In [None]:
from concurrent.futures import ThreadPoolExecutor
import time

def fetch_url(url, delay=1):
    """Simulate fetching a URL.

    No network request is made; the function only sleeps.

    Args:
        url: Address shown in the progress message and the result.
        delay: Seconds to sleep in place of the request. Defaults to 1, the
            value that was previously hard-coded.

    Returns:
        A placeholder string standing in for the page content.
    """
    print(f"Fetching {url}...")
    time.sleep(delay)  # Simulate network delay
    return f"Content from {url}"

urls = [
    "http://example.com/page1",
    "http://example.com/page2",
    "http://example.com/page3",
    "http://example.com/page4",
    "http://example.com/page5"
]

# Baseline: fetch the pages one after another.
print("Sequential fetching:")
start = time.time()
for page in urls:
    fetch_url(page)
sequential_time = time.time() - start
print(f"Sequential time: {sequential_time:.2f} seconds\n")

# Same workload with one worker thread per URL.
print("Parallel fetching:")
start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
    results = [*executor.map(fetch_url, urls)]
parallel_time = time.time() - start
print(f"\nParallel time: {parallel_time:.2f} seconds")
print(f"Speedup: {sequential_time / parallel_time:.2f}x")

### 13. Thread-Safe Data Structures

In [None]:
import threading
import queue
import time

# queue.Queue is thread-safe, so both threads below can share this instance.
safe_queue = queue.Queue()


def add_to_queue(items):
    """Put every element of ``items`` on ``safe_queue``, 0.1 s apart."""
    pause = 0.1
    for item in items:
        safe_queue.put(item)
        print(f"Added: {item}")
        time.sleep(pause)

def remove_from_queue(count):
    """Take up to ``count`` items off ``safe_queue``.

    Each ``get`` waits at most two seconds. If nothing arrives in that time
    the function reports an empty queue and stops early.
    """
    for _attempt in range(count):
        try:
            item = safe_queue.get(timeout=2)
        except queue.Empty:
            print("Queue is empty")
            break
        # Only reached when an item was actually received.
        print(f"Removed: {item}")
        time.sleep(0.15)

# One thread fills the queue while the other drains it.
adder = threading.Thread(target=add_to_queue, args=([1, 2, 3, 4, 5],))
remover = threading.Thread(target=remove_from_queue, args=(5,))

pair = (adder, remover)
for running in pair:
    running.start()
for running in pair:
    running.join()

print("\nOperations completed safely!")