# Section 4 — Advanced Functions & Control
Focused examples for functional tools, decorators, context managers, error handling, logging, testing, and concurrency.

## 1) Lambda, `map`, `filter`, `reduce`
Anonymous functions and functional helpers.

In [None]:

from functools import reduce

# Sample data shared by every example in this cell.
nums = list(range(1, 6))

# lambda: an anonymous single-expression function, bound to a name here only
# so that it can be called and printed.
double = lambda value: value * 2
print("Lambda double(3):", double(3))

# map: lazily applies the callable to each element; unpacking into a list
# literal materialises the results.
doubled = [*map(lambda n: n * 2, nums)]
print("map doubled:", doubled)

# filter: keeps only the elements for which the predicate is truthy
# (a remainder of 0 is falsy, so `not n % 2` selects the even numbers).
evens = [*filter(lambda n: not n % 2, nums)]
print("filter evens:", evens)

# reduce: folds the sequence into one value, starting from the initial value 1.
product = reduce(lambda acc, n: acc * n, nums, 1)
print("reduce product:", product)


## 2) Decorators
Wrap or enhance functions without modifying their source.

In [None]:

import time
from functools import wraps

def timing(func):
    """Decorator that prints how long each call to ``func`` takes.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper that forwards all positional and keyword arguments to
        ``func``, prints ``"<name> took <ms> ms"`` and returns ``func``'s
        result. ``functools.wraps`` copies the wrapped function's metadata
        (``__name__``, ``__doc__``, ...) onto the wrapper.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter() is monotonic and high-resolution, so the measured
        # interval cannot be distorted by system clock adjustments the way
        # time.time() can.
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed_ms = (time.perf_counter() - start) * 1000
        print(f"{func.__name__} took {elapsed_ms:.2f} ms")
        return result
    return wrapper

@timing
def slow_add(a, b):
    """Return ``a + b`` after a 0.1 s pause, giving the decorator something to measure."""
    time.sleep(0.1)
    total = a + b
    return total


# Call the decorated function, then show that the decorated object still
# reports the original function's name.
print("slow_add:", slow_add(3, 4))
print("function name preserved:", slow_add.__name__)


## 3) Context Managers (`with`)
Guarantee setup/teardown via `__enter__`/`__exit__`, or use `contextlib`.

In [None]:

# Built-in example: file handling
# The file object is a context manager: it is closed when the block exits,
# even if write() raises. Passing an explicit encoding avoids depending on the
# platform's default text encoding.
with open("cm_example.txt", "w", encoding="utf-8") as f:
    f.write("Hello context manager!\n")

# Custom context manager
class managed_resource:
    """Toy context manager that announces when its resource is acquired and released."""

    def __enter__(self):
        """Print the acquisition message and return the value bound by ``as``."""
        print("Acquiring resource")
        resource = dict(status="ok")
        return resource

    def __exit__(self, exc_type, exc, tb):
        """Print the release message and let any exception from the body propagate."""
        print("Releasing resource")
        # A truthy return value would suppress the exception; a falsy one
        # lets it propagate to the caller.
        suppress = False
        return suppress

# `res` is bound to whatever __enter__ returned; __exit__ runs when the block ends.
with managed_resource() as res:
    print("Inside with:", res)


### `contextlib` helpers

In [None]:

from contextlib import contextmanager

@contextmanager
def timer():
    """Context manager that prints the time spent inside its ``with`` block.

    Yields:
        None. The value bound by ``as`` (if any) is ``None``.

    The ``"Elapsed: <ms> ms"`` line is printed from a ``finally`` clause, so it
    appears even when the block raises; the exception then propagates.
    """
    import time
    # perf_counter() is monotonic and high-resolution, so the interval is not
    # affected by system clock adjustments the way time.time() can be.
    start = time.perf_counter()
    try:
        yield
    finally:
        print(f"Elapsed: {(time.perf_counter() - start)*1000:.2f} ms")

# Time an arbitrary block of work; the elapsed-time line is printed when the block exits.
with timer():
    sum(range(1_000_000))


## 4) Exceptions & Error Handling
Use `try/except/else/finally`, raise custom exceptions.

In [None]:

class TooSmallError(Exception):
    """Signal that a value fell below the minimum the caller allows."""

    pass

def reciprocal(x):
    """Return ``1 / x``.

    Raises:
        ZeroDivisionError: If ``x`` equals zero, with the message
            ``"x cannot be zero"``.
    """
    if x != 0:
        return 1 / x
    raise ZeroDivisionError("x cannot be zero")

# Walk through every clause of try/except/else/finally.
try:
    print("Reciprocal(2):", reciprocal(2))
    # reciprocal(0) raises, so control jumps straight to the except clause.
    print("Reciprocal(0):", reciprocal(0))
except ZeroDivisionError as e:
    print("Caught ZeroDivisionError:", e)
else:
    # Runs only when the try body finishes without raising, so it is skipped here.
    print("No exceptions raised.")
finally:
    # Runs whether or not an exception occurred.
    print("finally: always runs")

# Raising custom exception
def check_min(value, min_value=10):
    """Return ``value`` unchanged unless it is below ``min_value``.

    Raises:
        TooSmallError: If ``value < min_value``; the message is
            ``"<value> < <min_value>"``.
    """
    if not value < min_value:
        return value
    raise TooSmallError(f"{value} < {min_value}")

# check_min(5) uses the default min_value of 10, so TooSmallError is raised and caught here.
try:
    check_min(5)
except TooSmallError as e:
    print("Custom exception:", e)


## 5) Logging
Leveled, formatted logs for debugging and observability.

In [None]:

import logging

# Basic configuration
# NOTE: basicConfig() does nothing if the root logger already has handlers
# (for example when this cell is re-run), unless force=True is passed.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("demo")

# One message per severity, lowest to highest. With the level set to INFO the
# DEBUG message is filtered out.
logger.debug("Debug message (may not show at INFO level)")
logger.info("Info message")
logger.warning("Warning message")
logger.error("Error message")
logger.critical("Critical message")

# Logging within try/except
# logger.exception() logs at ERROR level and appends the current traceback,
# so the exception object itself does not need to be bound to a name.
try:
    1/0
except ZeroDivisionError:
    logger.exception("Something went wrong")


## 6) Testing & Debugging
Minimal examples with `unittest` and `assert`.

In [None]:

# Simple asserts
def add(a, b):
    """Return the result of ``a + b``."""
    total = a + b
    return total


# A failing assert raises AssertionError, so the print below is only reached
# when the check holds.
assert add(2, 3) == 5
print("Assert passed.")

# Minimal unittest (can run via `python -m unittest` in a script)
import unittest

class TestMath(unittest.TestCase):
    """Unit tests for the ``add`` helper."""

    def test_add(self):
        """``add(2, 2)`` must equal 4."""
        self.assertEqual(add(2, 2), 4)


# Build the suite and run it inline instead of going through unittest.main().
loader = unittest.TestLoader()
suite = loader.loadTestsFromTestCase(TestMath)
runner = unittest.TextTestRunner(verbosity=2)
# The trailing semicolon presumably stops a notebook from echoing the returned result object.
runner.run(suite);


## 7) Concurrency: `threading`, `multiprocessing`, `asyncio`
Parallelism and async IO basics.

### 7.1 `threading` (I/O-bound tasks)

In [None]:

import threading, time

def io_task(name, delay=0.2):
    """Stand in for blocking I/O: sleep ``delay`` seconds, then report that ``name`` is done."""
    time.sleep(delay)
    message = f"[thread] {name} done"
    print(message)

# One thread per simulated I/O task.
threads = [threading.Thread(target=io_task, args=(f"t{i}", 0.2)) for i in range(3)]

# Start every thread before joining any of them so their waits overlap.
# Plain loops are used because start()/join() are called only for their side
# effects; a list comprehension would build a throwaway list of None values.
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print("All threads finished")


### 7.2 `multiprocessing` (CPU-bound tasks)

In [None]:

from multiprocessing import Pool, cpu_count

def square(n):
    """Return ``n`` multiplied by itself."""
    squared = n * n
    return squared

# Create the pool only when this code runs as the main module. Under the
# "spawn" start method each worker re-imports the main module; without this
# guard every worker would try to create a pool of its own.
if __name__ == "__main__":
    # Use at most 4 workers, fewer if the machine has fewer CPUs.
    with Pool(processes=min(4, cpu_count())) as pool:
        res = pool.map(square, range(10))
    print("Squares via multiprocessing:", res)


### 7.3 `asyncio` (async IO)

In [None]:

import asyncio
import random

async def fetch(name, delay=None):
    """Simulate an asynchronous fetch and report when it completes.

    Args:
        name: Label printed in the completion message.
        delay: Seconds to wait before completing. When ``None`` (the default),
            a random delay between 0.1 and 0.3 seconds is used.

    Returns:
        None. The completion message is printed as a side effect.
    """
    if delay is None:
        delay = random.uniform(0.1, 0.3)
    # asyncio.sleep() suspends only this coroutine; other coroutines keep running.
    await asyncio.sleep(delay)
    print(f"[async] {name} fetched")

async def main():
    """Run three ``fetch`` coroutines concurrently and wait until all have finished."""
    # gather() schedules every coroutine it is given and resumes this
    # coroutine once all of them are done.
    await asyncio.gather(*(fetch(f"task-{i}") for i in range(3)))

# Start an event loop, run main() to completion, then close the loop.
# NOTE(review): asyncio.run() raises RuntimeError when an event loop is already
# running in this thread (as is presumably the case inside a notebook kernel);
# there, `await main()` would be needed instead. Confirm the intended environment.
asyncio.run(main())
