# Python Advanced Topics
## Generators, Iterators, Async/Await, and More

---

## Table of Contents
1. [Generators and Iterators](#generators)
2. [Advanced Comprehensions](#comprehensions)
3. [Exception Handling](#exceptions)
4. [File I/O](#file-io)
5. [Regular Expressions](#regex)
6. [Threading and Multiprocessing](#threading)
7. [Async/Await](#async)
8. [Type Hints](#type-hints)
9. [Testing](#testing)
10. [Practice Problems](#practice)

---
# 1. GENERATORS AND ITERATORS

## 1.1 Understanding Iterators

In [None]:
# An iterator is an object with __iter__ and __next__ methods

class CountUp:
    """Single-use iterator over the integers ``start`` .. ``end`` (inclusive).

    The object is its own iterator: ``iter(obj)`` returns ``obj``, so once it
    has been exhausted it stays exhausted.
    """

    def __init__(self, start, end):
        self.end = end
        self.current = start

    def __iter__(self):
        # Iterator protocol: an iterator returns itself from __iter__.
        return self

    def __next__(self):
        emitted = self.current
        if emitted > self.end:
            raise StopIteration
        # Advance the cursor before handing the value back.
        self.current = emitted + 1
        return emitted

# Use the iterator
# A for loop calls iter() once and then next() until StopIteration is raised.
counter = CountUp(1, 5)
print("Using for loop:")
for num in counter:
    print(num, end=' ')
print()

# Manual iteration
# A fresh instance is needed: the one above is exhausted after the loop.
counter = CountUp(1, 3)
print("\nManual iteration:")
print(next(counter))
print(next(counter))
print(next(counter))

## 1.2 Generators - The Easy Way

In [None]:
# Generator function - uses yield instead of return
def count_up(start, end):
    """Lazily yield ``start``, ``start + 1``, ... while the value is <= ``end``."""
    value = start
    while True:
        if not (value <= end):
            return
        # Execution pauses here until the consumer asks for the next value.
        yield value
        value += 1

# Use generator
print("Generator output:")
for num in count_up(1, 5):
    print(num, end=' ')
print()

# Generator is lazy - creates values on demand
# Calling count_up() only builds the generator object; no body code has run yet.
gen = count_up(1, 3)
print(f"\nGenerator object: {gen}")
print(f"Type: {type(gen)}")

# Convert to list
# list() drives the generator to exhaustion and collects every yielded value.
numbers = list(count_up(1, 5))
print(f"As list: {numbers}")

In [None]:
# Why generators? Memory efficiency!
import sys

# List - stores all values in memory
# NOTE: sys.getsizeof() is shallow: it measures the list object itself (its
# array of references), not the integer objects those references point to.
list_comp = [x**2 for x in range(1000)]
print(f"List size: {sys.getsizeof(list_comp)} bytes")

# Generator - creates values on demand
# Only the generator object is measured here; none of the 1000 squares exist yet.
gen_exp = (x**2 for x in range(1000))
print(f"Generator size: {sys.getsizeof(gen_exp)} bytes")

# Both produce same results
# sum() accepts any iterable, so the generator form needs no intermediate list.
print(f"\nSum from list: {sum([x**2 for x in range(1000)])}")
print(f"Sum from generator: {sum(x**2 for x in range(1000))}")
## 1.3 Practical Generator Examples

In [None]:
# Fibonacci generator - infinite sequence
def fibonacci():
    """Yield the Fibonacci sequence 0, 1, 1, 2, 3, ... without end.

    The generator never terminates on its own; the caller decides how many
    values to take.
    """
    pair = (0, 1)
    while True:
        yield pair[0]
        # Slide the window: (a, b) -> (b, a + b).
        pair = (pair[1], pair[0] + pair[1])

# Get first 10 Fibonacci numbers
# The generator is infinite, so values are pulled one at a time with next().
fib = fibonacci()
fib_numbers = [next(fib) for _ in range(10)]
print(f"First 10 Fibonacci: {fib_numbers}")

# Using itertools.islice
# islice() takes a bounded prefix of an iterator; a new generator is used so
# the sequence starts again from 0.
from itertools import islice
fib_slice = list(islice(fibonacci(), 15))
print(f"First 15 Fibonacci: {fib_slice}")

In [None]:
# File reader generator - memory efficient
def read_large_file(file_path, encoding=None):
    """Lazily yield the lines of a text file, one at a time.

    Args:
        file_path: Path of the file to read.
        encoding: Text encoding passed to ``open()``. ``None`` (the default)
            keeps the platform default; pass e.g. ``"utf-8"`` for a
            platform-independent result.

    Yields:
        Each line with leading and trailing whitespace removed. Note that
        ``str.strip()`` removes indentation as well as the newline.

    The file is opened when iteration starts. It is closed when the generator
    is exhausted or closed.
    """
    with open(file_path, 'r', encoding=encoding) as f:
        for line in f:
            yield line.strip()

# Pipeline of generators
def numbers():
    """Yield the integers 0 through 9 (pipeline source stage)."""
    yield from range(10)

def squared(nums):
    """Yield the square of every item of ``nums``, lazily and in order."""
    for value in nums:
        result = value ** 2
        yield result

def filtered(nums):
    """Yield only the items of ``nums`` that are greater than 20."""
    for candidate in nums:
        if not candidate > 20:
            continue
        yield candidate

# Chain generators
# Each stage pulls from the one inside it; nothing runs until list() consumes
# the outermost generator.
pipeline = filtered(squared(numbers()))
result = list(pipeline)
print(f"Pipeline result: {result}")

## 1.4 Generator send() and throw()

In [None]:
# send() - send values into generator
def accumulator():
    """Coroutine-style generator that keeps a running total.

    Each ``send(x)`` adds ``x`` to the total and returns the new total.
    A plain ``next()`` (which sends ``None``) returns the total unchanged.
    """
    running = 0
    while True:
        received = yield running
        if received is None:
            continue
        running += received

# The generator must be advanced to its first yield with next() before
# send() can deliver a value to it.
acc = accumulator()
print(f"Initial: {next(acc)}")
print(f"Send 10: {acc.send(10)}")
print(f"Send 5: {acc.send(5)}")
print(f"Send 3: {acc.send(3)}")

---
# 2. ADVANCED COMPREHENSIONS

## 2.1 Nested Comprehensions

In [None]:
# Flatten 2D list
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
flat = []
for row in matrix:
    flat.extend(row)
print(f"Flattened: {flat}")

# Create 2D matrix
# Row k holds the three consecutive integers starting at 3 * k.
matrix = [list(range(start, start + 3)) for start in range(0, 9, 3)]
print("\nMatrix:")
for row in matrix:
    print(row)

# Transpose matrix
# zip(*matrix) pairs up the i-th element of every row, i.e. yields columns.
transposed = [list(column) for column in zip(*matrix)]
print("\nTransposed:")
for row in transposed:
    print(row)

In [None]:
# Dictionary comprehension with conditions
words = ['apple', 'banana', 'cherry', 'date', 'elderberry']
word_lengths = {
    word: size
    for word, size in zip(words, map(len, words))
    if size > 4
}
print(f"Word lengths (>4 chars): {word_lengths}")

# Swap keys and values
original = {'a': 1, 'b': 2, 'c': 3}
swapped = {value: key for key, value in original.items()}
print(f"\nSwapped: {swapped}")

# Set comprehension
sentence = "the quick brown fox jumps over the lazy dog"
unique_lengths = set(map(len, sentence.split()))
print(f"\nUnique word lengths: {unique_lengths}")

## 2.2 Walrus Operator (Python 3.8+)

In [None]:
# := assigns and returns value

# Without walrus operator
# NOTE: the variables are called `values` / `large_squares` rather than
# `numbers` / `filtered`, so this cell does not overwrite the generator
# functions numbers() and filtered() defined in section 1.3.
values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
large_squares = []
for n in values:
    square = n ** 2
    if square > 25:
        large_squares.append(square)
print(f"Without walrus: {large_squares}")

# With walrus operator
# (square := n ** 2) computes the square once, binds it, and yields the value
# for the comparison; the bound name is then reused as the output expression.
large_squares = [square for n in values if (square := n ** 2) > 25]
print(f"With walrus: {large_squares}")

# In while loop
# The draw happens inside the loop condition, so no priming assignment is needed.
import random
while (num := random.randint(1, 10)) != 5:
    print(f"Got {num}, not 5")
print(f"Finally got {num}!")

---
# 3. EXCEPTION HANDLING

## 3.1 Complete Exception Handling

In [None]:
def divide(a, b):
    """Divide ``a`` by ``b``, showing every clause of a ``try`` statement.

    Returns:
        The quotient on success, or ``None`` when the division raised
        ``ZeroDivisionError`` or ``TypeError`` (a message is printed instead).

    A completion message is printed in all cases.
    """
    outcome = None
    try:
        quotient = a / b
    except ZeroDivisionError:
        print("Error: Division by zero")
    except TypeError as exc:
        print(f"Error: Invalid types - {exc}")
    else:
        # Reached only when the division raised nothing.
        print(f"Success: {a} / {b} = {quotient}")
        outcome = quotient
    finally:
        # Runs on every path, before the function returns.
        print("Division operation completed\n")
    return outcome

# Test cases
# One call per path: success, ZeroDivisionError, TypeError.
divide(10, 2)
divide(10, 0)
divide("10", 2)

## 3.2 Custom Exceptions

In [None]:
class ValidationError(Exception):
    """Raised when a value fails validation.

    Args:
        message: Human-readable description of the problem.
        field: Optional name of the field that failed validation.
    """

    def __init__(self, message, field=None):
        super().__init__(message)
        self.field = field      # name of the offending field, or None
        self.message = message  # description without the field prefix

    def __str__(self):
        # Prefix the message with the field name when one was given.
        if self.field:
            return f"{self.field}: {self.message}"
        return self.message


class AgeError(ValidationError):
    """Raised when an age is an integer but outside the accepted range."""


def validate_age(age):
    """Check that ``age`` is an integer between 0 and 150 inclusive.

    Returns:
        ``True`` when the age is valid.

    Raises:
        ValidationError: ``age`` is not an integer. Booleans are rejected
            too: ``bool`` is a subclass of ``int``, so ``True`` would
            otherwise be accepted as the age 1.
        AgeError: ``age`` is negative or greater than 150.
    """
    if isinstance(age, bool) or not isinstance(age, int):
        raise ValidationError("Age must be an integer", field="age")
    if age < 0:
        raise AgeError("Age cannot be negative", field="age")
    if age > 150:
        raise AgeError("Age cannot exceed 150", field="age")
    return True

# Test custom exceptions
test_ages = [25, -5, "thirty", 200]
for age in test_ages:
    try:
        validate_age(age)
        print(f"Age {age} is valid")
    # AgeError is a subclass of ValidationError, so the more specific handler
    # has to come first; otherwise it would never be reached.
    except AgeError as e:
        print(f"AgeError: {e}")
    except ValidationError as e:
        print(f"ValidationError: {e}")

## 3.3 Exception Chaining

In [None]:
def fetch_data(url):
    """Stand-in for a network call; always raises ``ConnectionError``."""
    failure = ConnectionError("Failed to connect")
    raise failure

def process_data():
    """Fetch from the example API, translating low-level failures.

    Raises:
        RuntimeError: The fetch failed. The original ``ConnectionError`` is
            attached as ``__cause__`` through ``raise ... from``.
    """
    try:
        fetch_data("http://api.example.com")
    except ConnectionError as original:
        # Explicit chaining keeps the low-level error available to callers.
        raise RuntimeError("Data processing failed") from original

try:
    process_data()
except RuntimeError as e:
    print(f"Error: {e}")
    # __cause__ holds the exception named after `from` in the raise statement.
    print(f"Caused by: {e.__cause__}")

---
# 4. FILE I/O

## 4.1 Reading and Writing Files

In [None]:
# Writing to file
# Mode 'w' creates the file, or truncates it if it already exists.
with open('example.txt', 'w') as f:
    f.writelines(f"Line {n}\n" for n in range(1, 5))

# Reading entire file
with open('example.txt', 'r') as f:
    content = f.read()
print("Full content:")
print(content)

# Reading lines
# Iterating a text file yields its lines with the trailing newline kept.
with open('example.txt', 'r') as f:
    lines = list(f)
print(f"Lines: {lines}")

# Reading line by line (memory efficient)
print("\nLine by line:")
with open('example.txt', 'r') as f:
    i = 0
    for line in f:
        i += 1
        print(f"{i}: {line.strip()}")

In [None]:
# Append mode
# 'a' writes at the end of the file (creating it if it does not exist).
with open('example.txt', 'a') as f:
    f.write("Line 5\n")

# Read and write mode
# 'r+' requires an existing file. read() leaves the position at the end of
# the file, so the following write() appends rather than overwrites.
with open('example.txt', 'r+') as f:
    content = f.read()
    f.write("Line 6\n")

# Binary mode
# 'wb' / 'rb' work with bytes objects; no text encoding or newline
# translation is applied.
with open('binary.bin', 'wb') as f:
    f.write(b'\x00\x01\x02\x03')

with open('binary.bin', 'rb') as f:
    data = f.read()
    print(f"Binary data: {data}")

# Clean up
# Remove the files created by these examples.
import os
os.remove('example.txt')
os.remove('binary.bin')

## 4.2 Working with JSON

In [None]:
import json
import os  # imported here so the clean-up works without an earlier cell

# Python to JSON
# True -> true and None -> null in the JSON output.
data = {
    "name": "Alice",
    "age": 30,
    "cities": ["NYC", "LA", "Chicago"],
    "active": True,
    "balance": None
}

# Convert to JSON string
json_string = json.dumps(data, indent=2)
print("JSON string:")
print(json_string)

# Parse JSON string
parsed = json.loads(json_string)
print(f"\nParsed: {parsed}")
print(f"Name: {parsed['name']}")

# Write to file
# JSON text is UTF-8 by convention, so the encoding is stated explicitly
# instead of relying on the platform default.
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, indent=2)

# Read from file
with open('data.json', 'r', encoding='utf-8') as f:
    loaded = json.load(f)
    print(f"\nLoaded from file: {loaded}")

# Clean up the file created above
os.remove('data.json')

## 4.3 Working with CSV

In [None]:
import csv
import os  # imported here so the clean-up works without an earlier cell

# Write CSV
data = [
    ['Name', 'Age', 'City'],
    ['Alice', 30, 'NYC'],
    ['Bob', 25, 'LA'],
    ['Charlie', 35, 'Chicago']
]

# The csv module does its own newline handling, so files handed to it are
# opened with newline='' for both writing and reading.
with open('data.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerows(data)

# Read CSV
# Every field comes back as a string, including the ages.
print("CSV content:")
with open('data.csv', 'r', newline='') as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)

# Read as dictionary
# DictReader uses the first row as the keys for each following row.
print("\nAs dictionaries:")
with open('data.csv', 'r', newline='') as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(dict(row))

# Clean up the file created above
os.remove('data.csv')

---
# 5. REGULAR EXPRESSIONS

## 5.1 Basic Patterns

In [None]:
import re

text = "Contact us at support@example.com or sales@company.org"

# Find all emails
pattern = r'[\w.-]+@[\w.-]+\.[a-z]+'
emails = [found.group() for found in re.finditer(pattern, text)]
print(f"Emails found: {emails}")

# Search for pattern
# search() scans the whole string and returns the first match, or None.
match = re.search(r'support@\w+\.\w+', text)
if match is not None:
    print(f"Found: {match.group()}")
    print(f"Position: {match.start()}-{match.end()}")

# Match at beginning
# match() only succeeds when the pattern matches at position 0.
if re.match(r'Contact', text) is not None:
    print("Starts with 'Contact'")

In [None]:
# Common patterns
patterns = dict(
    phone=r'\d{3}-\d{3}-\d{4}',
    email=r'[\w.-]+@[\w.-]+\.[a-z]+',
    url=r'https?://[\w.-]+\.[a-z]+/?\S*',
    date=r'\d{4}-\d{2}-\d{2}',
    ip=r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
)

test_text = """
Call us at 555-123-4567
Email: info@example.com
Website: https://www.example.com/page
Date: 2024-01-15
Server: 192.168.1.1
"""

# Run every named pattern against the same sample text.
for name in patterns:
    pattern = patterns[name]
    matches = re.findall(pattern, test_text)
    print(f"{name}: {matches}")

## 5.2 Groups and Substitution

In [None]:
# Named groups
# (?P<name>...) labels a group so it can be read back by name.
pattern = r'(?P<area>\d{3})-(?P<exchange>\d{3})-(?P<number>\d{4})'
phone = "555-123-4567"

match = re.search(pattern, phone)
if match is not None:
    # match['x'] is shorthand for match.group('x').
    print(f"Area code: {match['area']}")
    print(f"Exchange: {match['exchange']}")
    print(f"Number: {match['number']}")
    print(f"Groups dict: {match.groupdict()}")

# Substitution
text = "The price is $10 and $20"
price_re = re.compile(r'\$\d+')
new_text = price_re.sub('$XXX', text)
print(f"\nOriginal: {text}")
print(f"Substituted: {new_text}")

# Substitution with function
def double_price(match):
    """Replacement callback for ``re.sub``: double a matched ``$<digits>`` price.

    Args:
        match: Match object whose full text is a dollar sign followed by digits.

    Returns:
        The doubled amount as a string with the dollar sign restored.
    """
    amount = int(match.group(0)[1:])  # drop the leading '$'
    doubled_amount = amount * 2
    return f'${doubled_amount}'

# When the replacement is a function, re.sub() calls it with each match
# object and inserts the string it returns.
doubled = re.sub(r'\$\d+', double_price, text)
print(f"Doubled: {doubled}")

---
# 6. THREADING AND MULTIPROCESSING

## 6.1 Threading Basics

In [None]:
import threading
import time

def worker(name, delay):
    """Announce start, block for ``delay`` seconds, then announce completion.

    Args:
        name: Label printed in both messages.
        delay: Number of seconds to sleep, standing in for real work.
    """
    print(f"{name} starting")
    time.sleep(delay)  # simulated work
    print(f"{name} finished")
    return None

# Create threads
# Thread objects are only constructed here; nothing runs until start().
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(f"Thread-{i}", 1))
    threads.append(t)

# Start threads
start = time.time()
for t in threads:
    t.start()

# Wait for completion
# join() blocks until the given thread has finished.
for t in threads:
    t.join()

# The three 1-second sleeps overlap, so the total should be roughly 1 s
# rather than 3 s.
print(f"\nTotal time: {time.time() - start:.2f}s")

In [None]:
# Thread with return value using Queue
from queue import Queue

def calculate(num, result_queue):
    """Square ``num`` and hand the result back through ``result_queue``.

    Args:
        num: Value to square.
        result_queue: Queue that receives the tuple ``(num, num ** 2)``.
    """
    squared_value = num ** 2
    pair = (num, squared_value)
    result_queue.put(pair)

# Create queue and threads
# A thread target's return value is discarded, so results travel back
# through the queue.
result_queue = Queue()
threads = []

for i in range(5):
    t = threading.Thread(target=calculate, args=(i, result_queue))
    threads.append(t)
    t.start()

# Wait and collect results
for t in threads:
    t.join()

# Get results
# Every producer thread has been joined, so no more items can arrive while
# the queue is being drained.
results = []
while not result_queue.empty():
    results.append(result_queue.get())

# Sorted before printing; the order in which threads finish is not fixed.
print(f"Results: {sorted(results)}")

## 6.2 Thread Synchronization

In [None]:
# Lock for thread safety
counter = 0
lock = threading.Lock()

def increment():
    """Add 1 to the shared ``counter`` 100000 times, holding ``lock`` for each update."""
    global counter
    remaining = 100000
    while remaining > 0:
        # The read-modify-write on the shared counter happens under the lock.
        with lock:
            counter += 1
        remaining -= 1

# Create threads
threads = [
    threading.Thread(target=increment),
    threading.Thread(target=increment),
]

for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Final counter: {counter}")
print("Expected: 200000")

## 6.3 ThreadPoolExecutor

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def download_page(url):
    """Pretend to download ``url``: sleep for half a second, then report success.

    Returns:
        The string ``"Downloaded <url>"``.
    """
    time.sleep(0.5)  # stands in for network latency
    message = f"Downloaded {url}"
    return message

urls = [f"http://example.com/page{i}" for i in range(5)]

# Using ThreadPoolExecutor
start = time.time()

# Leaving the with-block waits for all submitted work to finish.
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit all tasks
    # Map each Future back to the URL it was created for.
    futures = {executor.submit(download_page, url): url for url in urls}
    
    # Process as completed
    # as_completed() yields futures in finishing order, not submission order.
    for future in as_completed(futures):
        url = futures[future]  # looked up for reference; not used below
        result = future.result()
        print(result)

print(f"\nTotal time: {time.time() - start:.2f}s")

---
# 7. ASYNC/AWAIT

## 7.1 Async Basics

In [None]:
import asyncio

async def say_hello(name, delay):
    """Wait ``delay`` seconds without blocking the event loop, then greet ``name``.

    Returns:
        The string ``"Greeted <name>"``.
    """
    await asyncio.sleep(delay)
    greeting = f"Hello, {name}!"
    print(greeting)
    return f"Greeted {name}"

# Run single coroutine
async def main():
    """Await a single ``say_hello`` coroutine and print what it returned."""
    outcome = await say_hello("Alice", 1)
    print("Result: " + f"{outcome}")

# In Jupyter, we can use await directly
# await main()

# Or use asyncio.run() in scripts
# asyncio.run(main())

print("Async example (run in script with asyncio.run())")

In [None]:
# Running multiple tasks concurrently
async def fetch_data(url, delay):
    """Simulate an asynchronous fetch of ``url`` that takes ``delay`` seconds.

    Returns:
        The string ``"Data from <url>"``.
    """
    print(f"Fetching {url}")
    await asyncio.sleep(delay)  # yields control to the event loop while waiting
    payload = f"Data from {url}"
    return payload

async def main():
    """Run three simulated fetches concurrently and print their results."""
    # Calling fetch_data() only creates coroutine objects; they start running
    # once gather() schedules them.
    delays = {"url1": 2, "url2": 1, "url3": 3}
    pending = [fetch_data(url, delay) for url, delay in delays.items()]

    # gather() returns results in argument order, whichever finishes first.
    results = await asyncio.gather(*pending)
    print(f"\nResults: {results}")

# To run: asyncio.run(main())
print("Concurrent fetch example")

---
# 8. TYPE HINTS

## 8.1 Basic Type Hints

In [None]:
from typing import List, Dict, Tuple, Optional, Union, Callable

# Basic types
def greet(name: str) -> str:
    """Return the greeting ``"Hello, <name>!"``."""
    greeting = f"Hello, {name}!"
    return greeting

def add(a: int, b: int) -> int:
    """Return the sum of ``a`` and ``b``."""
    total = a + b
    return total

# Complex types
def process_items(items: List[str]) -> Dict[str, int]:
    """Map each string in ``items`` to its length.

    A string that occurs more than once appears as a single key.
    """
    lengths: Dict[str, int] = {}
    for item in items:
        lengths[item] = len(item)
    return lengths

# Optional (can be None)
def find_item(items: List[str], target: str) -> Optional[int]:
    """Return the index of the first occurrence of ``target``, or ``None`` if absent."""
    for position, candidate in enumerate(items):
        if candidate == target:
            return position
    return None

# Union (multiple types)
def format_value(value: Union[int, float, str]) -> str:
    """Return the string form of an int, float, or str value."""
    return f"{value}"

# Test
# Type hints do not change what these calls do at run time.
print(greet("Alice"))
print(process_items(["apple", "banana", "cherry"]))
print(find_item(["a", "b", "c"], "b"))
print(find_item(["a", "b", "c"], "z"))

In [None]:
# Callable type
def apply_func(func: Callable[[int, int], int], a: int, b: int) -> int:
    """Call ``func`` with ``a`` and ``b`` and return its result.

    ``Callable[[int, int], int]`` describes a callable that takes two ints
    and returns an int.
    """
    outcome = func(a, b)
    return outcome

# A lambda with two parameters satisfies the Callable[[int, int], int] hint.
result = apply_func(lambda x, y: x + y, 3, 4)
print(f"Apply func result: {result}")

# Type aliases
# An alias is an ordinary assignment; it gives a complex type a readable name.
Vector = List[float]
Matrix = List[Vector]

def dot_product(v1: Vector, v2: Vector) -> float:
    """Return the dot product of two equal-length vectors.

    Raises:
        ValueError: The vectors differ in length. Without this check
            ``zip`` would silently ignore the extra elements of the longer one.
    """
    if len(v1) != len(v2):
        raise ValueError(
            f"Vectors must have the same length (got {len(v1)} and {len(v2)})"
        )
    return sum(x * y for x, y in zip(v1, v2))

print(f"Dot product: {dot_product([1, 2, 3], [4, 5, 6])}")

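# Built-in generic types (Python 3.9+): list, dict, tuple, etc. can be
# subscripted directly, so typing.List / typing.Dict are no longer needed.
# def process(items: list[str]) -> dict[str, int]:
#     pass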

---
# 9. TESTING

## 9.1 Unit Testing with unittest

In [None]:
import unittest

# Function to test
def factorial(n):
    """Return n! for a non-negative whole number ``n``.

    The product is computed with a loop rather than recursion, so large
    arguments do not hit the interpreter's recursion limit.

    Raises:
        ValueError: ``n`` is negative, or is not a whole number.
    """
    if n < 0:
        raise ValueError("Factorial not defined for negative numbers")
    if n != int(n):
        raise ValueError("Factorial only defined for whole numbers")
    result = 1
    # 0! and 1! are both 1: the range is empty and the loop does not run.
    for factor in range(2, int(n) + 1):
        result *= factor
    return result

# Test class
class TestFactorial(unittest.TestCase):
    """Unit tests for ``factorial``: base case, typical values, invalid input."""

    def test_zero(self):
        # 0! is 1 by definition.
        result = factorial(0)
        self.assertEqual(result, 1)

    def test_positive(self):
        for argument, expected in ((5, 120), (3, 6)):
            self.assertEqual(factorial(argument), expected)

    def test_negative(self):
        # Callable form of assertRaises: the call happens inside the check.
        self.assertRaises(ValueError, factorial, -1)

    def test_large(self):
        result = factorial(10)
        self.assertEqual(result, 3628800)

# Run tests (in Jupyter)
if __name__ == '__main__':
    # Create test suite
    # Load only the tests of TestFactorial and run them with a text runner.
    # verbosity=2 prints one line per test method.
    suite = unittest.TestLoader().loadTestsFromTestCase(TestFactorial)
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite)

## 9.2 Testing with pytest (concept)

In [None]:
# pytest style (would run with: pytest test_file.py)

def add(a, b):
    """Return ``a + b``."""
    total = a + b
    return total

# Simple test functions (no class needed)
# pytest collects any function whose name starts with "test_".
def test_add_positive():
    outcome = add(2, 3)
    assert outcome == 5

def test_add_negative():
    outcome = add(-1, -1)
    assert outcome == -2

def test_add_zero():
    outcome = add(0, 0)
    assert outcome == 0

# Parametrized tests (concept)
# @pytest.mark.parametrize("a,b,expected", [
#     (2, 3, 5),
#     (-1, 1, 0),
#     (0, 0, 0),
# ])
# def test_add(a, b, expected):
#     assert add(a, b) == expected

# Run simple tests
# Called by hand here; a failing assert would raise AssertionError and the
# final message would not be printed.
test_add_positive()
test_add_negative()
test_add_zero()
print("All tests passed!")

---
# 10. PRACTICE PROBLEMS

## Problem 1: Implement a Generator for Prime Numbers

In [None]:
def is_prime(n):
    """Return ``True`` if ``n`` is a prime number, ``False`` otherwise.

    Values below 2 and non-whole numbers are never prime. Whole-valued
    floats such as ``7.0`` are treated as the corresponding integer.
    """
    import math  # local import keeps this cell self-contained

    if n < 2:
        return False
    if n != int(n):
        return False
    n = int(n)
    if n < 4:
        return True  # 2 and 3
    if n % 2 == 0:
        return False
    # math.isqrt() is exact for arbitrarily large ints, unlike int(n ** 0.5),
    # which goes through a float. Only odd divisors remain to be checked.
    for divisor in range(3, math.isqrt(n) + 1, 2):
        if n % divisor == 0:
            return False
    return True

def prime_generator():
    """Yield the primes 2, 3, 5, 7, ... in increasing order, without end."""
    candidate = 1
    while True:
        candidate += 1
        if not is_prime(candidate):
            continue
        yield candidate

# Get first 20 primes
# The generator is infinite; islice() stops after 20 values.
from itertools import islice
primes = list(islice(prime_generator(), 20))
print(f"First 20 primes: {primes}")

## Problem 2: Thread-safe Counter

In [None]:
import threading

class ThreadSafeCounter:
    """Integer counter whose updates and reads are all made under one lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._value = 0

    def _shift(self, delta):
        # Single place where the counter is modified, always under the lock.
        with self._lock:
            self._value += delta

    def increment(self):
        """Add one to the counter."""
        self._shift(1)

    def decrement(self):
        """Subtract one from the counter."""
        self._shift(-1)

    @property
    def value(self):
        """Current count, read while holding the lock."""
        with self._lock:
            current = self._value
        return current

# Test
counter = ThreadSafeCounter()

# Each thread calls increment() 10000 times on the shared counter above.
def increment_many():
    for _ in range(10000):
        counter.increment()

# 5 threads x 10000 increments each.
threads = [threading.Thread(target=increment_many) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Counter value: {counter.value}")
print(f"Expected: 50000")

## Problem 3: Context Manager for Timing

In [None]:
import time
from contextlib import contextmanager

@contextmanager
def timer(description="Operation"):
    """Context manager that prints how long the enclosed block took.

    Args:
        description: Label used at the start of the printed message.

    The elapsed time is printed even when the block raises, because the
    report is made in a ``finally`` clause. The exception then propagates.
    """
    # perf_counter() is a monotonic, high-resolution clock meant for measuring
    # intervals. time.time() is wall-clock time and can jump when the system
    # clock is adjusted.
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"{description} took {elapsed:.4f} seconds")

# Test
# The timing line is printed when each with-block exits, so it appears after
# any output produced inside the block.
with timer("Sum calculation"):
    total = sum(range(1000000))
    print(f"Sum: {total}")

with timer("List comprehension"):
    squares = [x**2 for x in range(100000)]

## Problem 4: Regex Parser for Log Files

In [None]:
import re
from collections import Counter

def parse_log(log_text):
    """Extract structured entries from log text and count them per level.

    Lines are expected in the form ``[LEVEL] YYYY-MM-DD HH:MM:SS: message``.
    Text that does not match is ignored.

    Returns:
        A tuple ``(entries, levels)``. ``entries`` is a list of dicts with
        the keys ``level``, ``timestamp`` and ``message``. ``levels`` is a
        ``Counter`` mapping each level to its number of entries.
    """
    # '.' does not match a newline, so each match stays within one line.
    entry_re = re.compile(
        r'\[(?P<level>\w+)\] (?P<timestamp>[\d-]+ [\d:]+): (?P<message>.+)'
    )
    entries = [found.groupdict() for found in entry_re.finditer(log_text)]

    # Tally entries per level
    levels = Counter()
    for entry in entries:
        levels[entry['level']] += 1

    return entries, levels

# Test
# Sample input: five entries across three levels.
log = """
[INFO] 2024-01-15 10:30:00: Server started
[ERROR] 2024-01-15 10:31:00: Connection failed
[WARNING] 2024-01-15 10:32:00: High memory usage
[INFO] 2024-01-15 10:33:00: Request processed
[ERROR] 2024-01-15 10:34:00: Database timeout
"""

entries, levels = parse_log(log)
print("Log entries:")
for entry in entries:
    print(f"  {entry}")
# levels is a Counter; dict() gives it a plain-dict repr for printing.
print(f"\nLevel counts: {dict(levels)}")

---
## Summary

### Key Concepts:

1. **Generators**: Memory-efficient iterators using `yield`
2. **Comprehensions**: List, dict, set, and generator expressions
3. **Exception Handling**: try/except/else/finally, custom exceptions
4. **File I/O**: Text, binary, JSON, CSV
5. **Regular Expressions**: Pattern matching and substitution
6. **Threading**: Concurrent execution, locks, ThreadPoolExecutor
7. **Async/Await**: Asynchronous programming with asyncio
8. **Type Hints**: Annotations for static type checkers and readers (not enforced at runtime)
9. **Testing**: unittest and pytest

### Best Practices:

- Use generators for large data processing
- Always use context managers for file operations
- Compile regex patterns for repeated use
- Use ThreadPoolExecutor over manual threading
- Add type hints for better code documentation
- Write tests for critical functions

---

**Next Steps:**
1. Practice generator pipelines
2. Build async web scrapers
3. Learn pytest fixtures and mocking
4. Move on to Data Science with NumPy/Pandas