# Lab 06: Out-of-Core, Streaming & Parallel Processing

**Course:** Big Data

---

## Student Information

**Name:** `Your Name Here`

**Date:** `DD/MM/YYYY`

---

**Goal:** Process datasets larger than RAM using chunking, implement streaming statistics, and leverage parallelization (threading/multiprocessing) for performance.

## Learning Objectives

By the end of this lab, you will be able to:

1. **Use PyArrow Directly**: Understand when to use PyArrow vs Pandas for I/O
2. **Apply Projection Pushdown**: Read only the columns you need
3. **Process Out-of-Core**: Handle datasets larger than RAM with chunking
4. **Implement Online Statistics**: Compute mean/std in a single pass (Welford's algorithm)
5. **Parallelize Work**: Use threading for I/O and multiprocessing for CPU-bound tasks
6. **Build Complete Pipelines**: Combine chunking + parallelization

## Instructions

1. **Fill in your information above** before starting the lab
2. Read each cell carefully before running it
3. Implement the **TODO functions** when you see them
4. Run cells **from top to bottom** (Shift+Enter)
5. Check that output makes sense after each cell

---

## Libraries Used in This Lab

### Core Libraries

- **`pyarrow`** — Direct Parquet reading and Arrow Table operations
- **`pandas`** — DataFrame operations and chunked CSV reading
- **`numpy`** — Numerical operations
- **`concurrent.futures`** — ThreadPoolExecutor and ProcessPoolExecutor
- **`psutil`** — Memory monitoring
- **`matplotlib`** — Plotting memory and speedup charts

### Why This Matters

Real-world datasets often don't fit in RAM. This lab teaches three strategies:

1. **Chunking**: Process data in pieces, one at a time
2. **Streaming statistics**: Compute results without storing all data
3. **Parallelization**: Use multiple cores to process faster

Combined, these make it possible to process datasets much larger than RAM. The limits then come mainly from disk space and run time rather than memory.

---

## 1. Imports and Setup

In [None]:
import json
import time
import os
import glob
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc
import psutil
import matplotlib.pyplot as plt

print("Imports successful!")
print(f"Pandas version: {pd.__version__}")
print(f"NumPy version: {np.__version__}")
print(f"PyArrow version: {pa.__version__}")

## 2. Define Paths

In [None]:
# Base directories
DATA_RAW = Path("../data/raw")
DATA_PROCESSED = Path("../data/processed")
RESULTS_DIR = Path("../results")

# File paths for this lab
WARMUP_PARQUET = DATA_PROCESSED / "sales_warmup.parquet"
SALES_CSV = DATA_RAW / "sales_large.csv"
SALES_PARTITIONED = DATA_PROCESSED / "sales_partitioned"
PARTITIONS_DIR = DATA_PROCESSED / "partitions"
ELECTRONICS_PARQUET = DATA_PROCESSED / "electronics_only.parquet"
METRICS_PATH = RESULTS_DIR / "lab06_metrics.json"

# Ensure directories exist
DATA_RAW.mkdir(parents=True, exist_ok=True)
DATA_PROCESSED.mkdir(parents=True, exist_ok=True)
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
PARTITIONS_DIR.mkdir(parents=True, exist_ok=True)

print("Paths defined:")
print(f"  Warmup Parquet: {WARMUP_PARQUET}")
print(f"  Sales CSV: {SALES_CSV}")
print(f"  Partitions: {PARTITIONS_DIR}")
print(f"  Metrics: {METRICS_PATH}")

---

## Exercise 0: PyArrow in Practice, Benchmark and Warm-up (15 min)

**Goal**: Get familiar with PyArrow by benchmarking it against plain Pandas, and learn `iter_batches()`, which we will use throughout the rest of the lab.

### TODO 1: `generate_warmup_data()`

Generate a warmup dataset (5M rows) and save as Parquet.

**Hints:**
- Use `np.random.seed(seed)` for reproducibility
- Create columns: `product_id`, `category`, `price`, `quantity`, `customer_id`
- Save with `df.to_parquet(WARMUP_PARQUET, index=False)`

In [None]:
def generate_warmup_data(n: int = 5_000_000, seed: int = 42) -> pd.DataFrame:
    """
    Generate a warmup dataset and save as Parquet.

    Args:
        n: Number of rows
        seed: Random seed

    Returns:
        DataFrame with columns: product_id, category, price, quantity, customer_id
    """
    # TODO: Implement this function
    # Step 1: Set random seed with np.random.seed(seed)
    # Step 2: Create DataFrame with columns:
    #   - 'product_id': np.random.randint(1, 10000, n)
    #   - 'category': np.random.choice(['Electronics','Clothing','Home','Sports','Food'], n)
    #   - 'price': np.random.uniform(1, 1000, n).round(2)
    #   - 'quantity': np.random.randint(1, 50, n)
    #   - 'customer_id': np.random.randint(1, 100000, n)
    # Step 3: Save to WARMUP_PARQUET with df.to_parquet(..., index=False)
    # Step 4: Print memory usage and return df
    pass

In [None]:
# Generate warmup data
print("Generating warmup dataset (5M rows)...")
df_warmup = generate_warmup_data()

if df_warmup is not None:
    print(f"Shape: {df_warmup.shape}")
    print(f"\nSample:")
    print(df_warmup.head())
else:
    print("TODO: Implement generate_warmup_data()")

### Task 1: Benchmark `pd.read_parquet` vs Direct PyArrow

### TODO 2: `benchmark_read_methods()`

Compare three approaches to read the Parquet file:
- **Method A**: `pd.read_parquet()` (Pandas; reads through Arrow, then builds a DataFrame)
- **Method B**: `pq.read_table()` (PyArrow directly, no conversion)
- **Method C**: Arrow Table → Pandas with `table.to_pandas()` (isolates the conversion cost)

**Hints:**
- Use `time.perf_counter()` for precise timing
- Use `table.nbytes` for Arrow memory usage
- Use `df.memory_usage(deep=True).sum()` for Pandas memory
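Single timings are easy to skew. The first read of a file is often slower because the OS page cache is still cold. One way to reduce that noise is a small helper that reports the best of several runs. This is a generic sketch: `timed` and its `repeats` parameter are hypothetical names, not part of the lab's API.

```python
import time


def timed(fn, *args, repeats=3, **kwargs):
    """Run fn(*args, **kwargs) `repeats` times; return (best_seconds, last_result)."""
    best = float("inf")
    result = None
    for _ in range(repeats):
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        best = min(best, time.perf_counter() - start)
    return best, result


# Cheap stand-in workload; in the lab you would pass e.g. pd.read_parquet and a path.
elapsed, total = timed(sum, range(1_000_000))
print(f"best of 3: {elapsed:.4f}s, result={total}")
```

Taking the minimum reports the least-disturbed run. Whichever statistic you choose, apply it consistently to all three methods.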

In [None]:
def benchmark_read_methods() -> dict:
    """
    Compare pd.read_parquet vs pq.read_table vs Arrow→Pandas conversion.

    Returns:
        Dictionary with timing results for each method.
    """
    # TODO: Implement this function
    # Method A: pd.read_parquet(WARMUP_PARQUET)
    #   - Measure time and RAM (df.memory_usage(deep=True).sum() / 1e6)
    #
    # Method B: pq.read_table(WARMUP_PARQUET)
    #   - Measure time and RAM (table.nbytes / 1e6)
    #
    # Method C: table.to_pandas()
    #   - Measure conversion time
    #
    # Print results and compute speedup
    pass

In [None]:
# Run benchmark
read_bench = benchmark_read_methods()

if read_bench is None:
    print("TODO: Implement benchmark_read_methods()")

**Question**: Why is reading into an Arrow Table faster than reading straight into Pandas, if Pandas uses Arrow internally?

*Your answer here:*

---

### Task 2: Projection Pushdown, Reading Only the Columns You Need

### TODO 3: `benchmark_projection_pushdown()`

Compare reading all columns vs only 2 columns (`price`, `quantity`).

**Hints:**
- Use `pq.read_table(path, columns=['price', 'quantity'])` for selective read
- Use `pc.multiply()` and `pc.sum()` (from `pyarrow.compute`, imported as `pc`) for Arrow-native computation

In [None]:
def benchmark_projection_pushdown() -> dict:
    """
    Compare reading all columns vs only needed columns from Parquet.

    Returns:
        Dictionary with timing and size results.
    """
    # TODO: Implement this function
    # 1. Read ALL columns: pq.read_table(WARMUP_PARQUET)
    #    - Measure time and table.nbytes
    #
    # 2. Read ONLY 2 columns: pq.read_table(WARMUP_PARQUET, columns=['price', 'quantity'])
    #    - Measure time and table.nbytes
    #
    # 3. Calculate revenue with Arrow:
    #    revenue = pc.multiply(table.column('price'), table.column('quantity'))
    #    total = pc.sum(revenue).as_py()
    #
    # 4. Print speedup and data reduction
    pass

In [None]:
# Run projection pushdown benchmark
proj_bench = benchmark_projection_pushdown()

if proj_bench is None:
    print("TODO: Implement benchmark_projection_pushdown()")

**Question**: In which real-world cases would you use this pattern instead of reading the full DataFrame?

*Your answer here:*

---

### Task 3: `iter_batches()`, the Bridge Between Arrow and Out-of-Core Processing

This is the function we will use for chunking Parquet files in the rest of the lab.

### TODO 4: `process_with_iter_batches()`

**Hints:**
- Create a `ParquetFile` with `pq.ParquetFile(WARMUP_PARQUET)`
- Read metadata: `pf.metadata.num_row_groups`, `pf.metadata.num_rows`
- Iterate: `pf.iter_batches(batch_size=500_000, columns=['price', 'quantity'])`
- Each `batch` is a `RecordBatch`, NOT a DataFrame

In [None]:
def process_with_iter_batches(batch_size: int = 500_000) -> dict:
    """
    Use iter_batches() to process the Parquet file in streaming fashion.

    Args:
        batch_size: Number of rows per batch

    Returns:
        Dictionary with total revenue, total rows, and batch info.
    """
    # TODO: Implement this function
    # 1. Open ParquetFile: pf = pq.ParquetFile(WARMUP_PARQUET)
    # 2. Print metadata (row groups, total rows)
    # 3. Iterate: for batch in pf.iter_batches(batch_size=..., columns=[...]):
    #    - Calculate revenue per batch using pc.multiply and pc.sum
    #    - Accumulate total_revenue and total_rows
    # 4. Print results and return dict
    pass

In [None]:
# Process with iter_batches
batch_results = process_with_iter_batches()

if batch_results is None:
    print("TODO: Implement process_with_iter_batches()")

### Task 4 (optional): Schema Inspection

### TODO 5: `inspect_parquet_schema()`

Inspect a Parquet file's metadata **without reading any data**.

**Hints:**
- `pf.schema_arrow` — Column types
- `pf.metadata.num_rows`, `num_row_groups`, `num_columns`
- `pf.metadata.row_group(0).column(0).statistics` — Min/max per column

In [None]:
def inspect_parquet_schema() -> dict:
    """
    Inspect Parquet file metadata without reading data.

    Returns:
        Dictionary with schema information.
    """
    # TODO: Implement this function
    # 1. Open ParquetFile: pf = pq.ParquetFile(WARMUP_PARQUET)
    # 2. Print schema: pf.schema_arrow
    # 3. Print metadata: num_rows, num_row_groups, num_columns
    # 4. Print statistics of first column in first row group
    pass

In [None]:
# Inspect schema
schema_info = inspect_parquet_schema()

if schema_info is None:
    print("TODO: Implement inspect_parquet_schema()")

### Exercise 0 — Summary Table

The values below are indicative for the 5M-row warm-up file. Your numbers will depend on hardware, disk cache, and library versions.

| Operation | Approx. time | Approx. RAM | When to use it |
|-----------|--------------|-------------|----------------|
| `pd.read_parquet()` | ~1.0s | ~150 MB | When you need the full DataFrame |
| `pq.read_table()` | ~0.5s | ~80 MB | When you work in Arrow or convert later |
| `pq.read_table(columns=[...])` | ~0.2s | ~20 MB | When you only need a few columns |
| `iter_batches()` | Similar total, streamed | ~10 MB/batch | When the file does not fit in RAM → Exercise 1 |

---

## Exercise 1: Out-of-Core Processing with Chunking (25 min)

**Goal**: Process a dataset that may not fit in RAM by reading it in chunks.

### TODO 6: `generate_large_dataset()`

Generate 20M rows and save as CSV + partitioned Parquet.

**Hints:**
- Columns: `date`, `product_id`, `category`, `price`, `quantity`, `customer_id`
- Use `df.to_csv(SALES_CSV, index=False)` for CSV
- Use `df.to_parquet(SALES_PARTITIONED, partition_cols=['category'], index=False)` for partitioned Parquet

In [None]:
def generate_large_dataset(n: int = 20_000_000, seed: int = 42) -> None:
    """
    Generate a large dataset and save as CSV and partitioned Parquet.

    Args:
        n: Number of rows
        seed: Random seed
    """
    # TODO: Implement this function
    # Step 1: Set random seed
    # Step 2: Create DataFrame with:
    #   - 'date': pd.date_range('2020-01-01', periods=n, freq='s')
    #   - 'product_id': np.random.randint(1, 10000, n)
    #   - 'category': np.random.choice(['Electronics', 'Clothing', 'Home', 'Sports', 'Food'], n)
    #   - 'price': np.random.uniform(1, 1000, n).round(2)
    #   - 'quantity': np.random.randint(1, 50, n)
    #   - 'customer_id': np.random.randint(1, 100000, n)
    # Step 3: Save as CSV to SALES_CSV
    # Step 4: Save as partitioned Parquet to SALES_PARTITIONED
    pass

In [None]:
# Generate large dataset
# WARNING: This may take several minutes and use significant disk space
print("Generating large dataset (20M rows)...")
print("(This may take 2-5 minutes)\n")

start = time.perf_counter()
generate_large_dataset()
elapsed = time.perf_counter() - start
print(f"\nGeneration completed in {elapsed:.1f} seconds")

### TODO 7: `chunked_statistics()`

Calculate average price using chunking — **without loading the full file in RAM**.

**Hints:**
- Use `pd.read_csv(SALES_CSV, chunksize=500_000)` to get an iterator
- Accumulate `total_sum` and `total_count` across chunks
- Average = `total_sum / total_count`
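Two small refinements are possible. `usecols` applies the projection idea from Exercise 0 to CSV parsing. `Series.count()` ignores missing values the same way `Series.sum()` does, whereas `len(chunk)` counts them. The sketch below uses an in-memory CSV (`io.StringIO` stands in for `SALES_CSV`), and `chunked_mean` is a hypothetical name.

```python
import io

import pandas as pd


def chunked_mean(source, column="price", chunksize=500_000):
    """Mean of one CSV column, parsing only that column, `chunksize` rows at a time."""
    total_sum, total_count = 0.0, 0
    for chunk in pd.read_csv(source, usecols=[column], chunksize=chunksize):
        total_sum += float(chunk[column].sum())    # NaN values are skipped
        total_count += int(chunk[column].count())  # counts non-NaN values only
    return total_sum / total_count if total_count else float("nan")


csv_text = "price,quantity\n1.5,2\n2.5,3\n,7\n4.0,1\n"  # third row has a missing price
print(chunked_mean(io.StringIO(csv_text), chunksize=2))
```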

In [None]:
def chunked_statistics(chunksize: int = 500_000) -> dict:
    """
    Calculate statistics using chunking over CSV.

    Args:
        chunksize: Number of rows per chunk

    Returns:
        Dictionary with total_sum, total_count, and avg_price.
    """
    # TODO: Implement this function
    # 1. Initialize total_sum = 0, total_count = 0
    # 2. Loop: for chunk in pd.read_csv(SALES_CSV, chunksize=chunksize):
    #      total_sum += chunk['price'].sum()
    #      total_count += len(chunk)
    # 3. Calculate and return avg_price = total_sum / total_count
    pass

In [None]:
# Run chunked statistics
stats = chunked_statistics()

if stats is None:
    print("TODO: Implement chunked_statistics()")

### TODO 8: `chunked_filter_save()`

Filter only "Electronics" sales and save to Parquet — processing chunk by chunk.

**Hints:**
- Filter each chunk: `filtered = chunk[chunk['category'] == 'Electronics']`
- Collect filtered chunks in a list
- Concatenate and save: `pd.concat(results).to_parquet(...)`

In [None]:
def chunked_filter_save(chunksize: int = 500_000) -> int:
    """
    Filter and save only Electronics sales using chunking.

    Args:
        chunksize: Number of rows per chunk

    Returns:
        Number of Electronics rows saved.
    """
    # TODO: Implement this function
    # 1. Initialize results = []
    # 2. Loop: for chunk in pd.read_csv(SALES_CSV, chunksize=chunksize):
    #      filtered = chunk[chunk['category'] == 'Electronics']
    #      results.append(filtered)
    # 3. Concatenate: electronics = pd.concat(results)
    # 4. Save: electronics.to_parquet(ELECTRONICS_PARQUET, index=False)
    # 5. Return len(electronics)
    pass

In [None]:
# Filter and save Electronics
n_electronics = chunked_filter_save()

if n_electronics is None:
    print("TODO: Implement chunked_filter_save()")
else:
    print(f"\nSaved {n_electronics:,} Electronics rows")

### TODO 9: `monitor_memory_chunking()`

Monitor memory usage during chunked processing to show that it stays roughly constant instead of growing with the file size.

**Hints:**
- Use `psutil.Process(os.getpid()).memory_info().rss / 1024**2` for memory in MB
- Record memory after each chunk
- Plot with `plt.plot(mem_usage)`

In [None]:
def monitor_memory_chunking(chunksize: int = 500_000) -> list:
    """
    Monitor memory usage during chunked processing.

    Args:
        chunksize: Number of rows per chunk

    Returns:
        List of memory usage (MB) per chunk.
    """
    # TODO: Implement this function
    # 1. Get process: process = psutil.Process(os.getpid())
    # 2. Initialize mem_usage = []
    # 3. Loop: for chunk in pd.read_csv(SALES_CSV, chunksize=chunksize):
    #      mem_mb = process.memory_info().rss / 1024**2
    #      mem_usage.append(mem_mb)
    #      chunk['price'].sum()  # Process
    # 4. Plot and save to RESULTS_DIR / 'memoria_chunking.png'
    # 5. Return mem_usage
    pass

In [None]:
# Monitor memory during chunking
mem = monitor_memory_chunking()

if mem is None:
    print("TODO: Implement monitor_memory_chunking()")
else:
    # Show the plot inline
    plt.figure(figsize=(10, 5))
    plt.plot(mem, 'b-o', markersize=3)
    plt.xlabel('Chunk number')
    plt.ylabel('Memory (MB)')
    plt.title('Constant memory with chunking')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

---

## Exercise 2: Online Statistics — Welford's Algorithm (20 min)

**Goal**: Implement streaming algorithms that compute statistics without storing all the data.

### TODO 10: `OnlineStats` class

Implement Welford's algorithm for streaming mean, variance, and std.

**Welford's algorithm:**
```
For each new value x:
  count += 1
  delta = x - mean
  mean += delta / count
  delta2 = x - mean    (note: uses UPDATED mean)
  M2 += delta * delta2

variance = M2 / count
std = sqrt(variance)
```
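This update is preferred over the textbook shortcut `mean(x**2) - mean(x)**2` for numerical reasons. When values share a large offset, the shortcut subtracts two nearly equal large numbers and loses precision. The sketch below contrasts the two on four made-up values near 10^9. `welford_variance` and `naive_variance` are illustrative names, separate from the `OnlineStats` class you implement next.

```python
def welford_variance(values):
    """Population variance in one pass using Welford's update."""
    count, mean, m2 = 0, 0.0, 0.0
    for x in values:
        count += 1
        delta = x - mean
        mean += delta / count
        delta2 = x - mean  # uses the updated mean
        m2 += delta * delta2
    return m2 / count if count else float("nan")


def naive_variance(values):
    """Population variance as mean(x**2) - mean(x)**2 (prone to cancellation)."""
    n = len(values)
    mean = sum(values) / n
    return sum(x * x for x in values) / n - mean * mean


data = [1e9 + 4, 1e9 + 7, 1e9 + 13, 1e9 + 16]  # exact population variance: 22.5
print(welford_variance(data), naive_variance(data))
```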

In [None]:
class OnlineStats:
    """
    Online (streaming) statistics using Welford's algorithm.
    """

    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self.M2 = 0.0
        self.min_val = float('inf')
        self.max_val = float('-inf')

    def update(self, x):
        """Implement Welford's algorithm."""
        # TODO: Implement this method
        # 1. self.count += 1
        # 2. delta = x - self.mean
        # 3. self.mean += delta / self.count
        # 4. delta2 = x - self.mean  (uses updated mean!)
        # 5. self.M2 += delta * delta2
        # 6. Update min_val and max_val
        pass

    def variance(self):
        """Return the population variance."""
        # TODO: return self.M2 / self.count (handle count < 2)
        pass

    def std(self):
        """Return the population standard deviation."""
        # TODO: return sqrt(variance)
        pass

### TODO 11: Validate OnlineStats against NumPy

In [None]:
def test_online_stats(n_rows: int = 100_000) -> dict:
    """
    Test OnlineStats against NumPy for validation.

    Args:
        n_rows: Number of rows to read for testing

    Returns:
        Dictionary with OnlineStats vs NumPy comparison.
    """
    # TODO: Implement this function
    # 1. Read first n_rows from CSV: df = pd.read_csv(SALES_CSV, nrows=n_rows)
    # 2. Create OnlineStats and update it with each value in df['price']
    # 3. Compare with NumPy: np.mean(df['price']) and np.std(df['price'])
    #    (np.std defaults to ddof=0, the population std, matching OnlineStats)
    # 4. Assert np.isclose() for both mean and std
    pass

In [None]:
# Validate OnlineStats
validation = test_online_stats()

if validation is None:
    print("TODO: Implement test_online_stats()")

### TODO 12: Streaming statistics over full dataset

In [None]:
def streaming_stats_full(chunksize: int = 500_000) -> dict:
    """
    Compute statistics over the full CSV using OnlineStats with chunking.

    Args:
        chunksize: Number of rows per chunk

    Returns:
        Dictionary with final statistics.
    """
    # TODO: Implement this function
    # 1. Create OnlineStats instance
    # 2. Loop: for chunk in pd.read_csv(SALES_CSV, chunksize=chunksize):
    #      for value in chunk['price'].values:
    #          stats.update(value)
    # 3. Print and return results
    pass

In [None]:
# Streaming stats on full dataset
print("Computing streaming statistics over full dataset...")
print("(This may take a few minutes)\n")

start = time.perf_counter()
full_stats = streaming_stats_full()
elapsed = time.perf_counter() - start

if full_stats is None:
    print("TODO: Implement streaming_stats_full()")
else:
    print(f"\nCompleted in {elapsed:.1f} seconds")

---

## Exercise 3: Practical Parallelization (25 min)

**Goal**: Compare threading, multiprocessing, and sequential execution.

### TODO 13: `create_partitions()`

Split the dataset into 16 Parquet partition files.

**Hints:**
- Read the CSV (this step loads the full file into RAM), then split it into `n_partitions` near-equal pieces
- Save each piece as `partitions/part_000.parquet`, `part_001.parquet`, etc.

In [None]:
def create_partitions(n_partitions: int = 16) -> list:
    """
    Create partition files from the large dataset.

    Args:
        n_partitions: Number of partitions to create

    Returns:
        List of partition file paths.
    """
    # TODO: Implement this function
    # 1. Read CSV: df = pd.read_csv(SALES_CSV)
    # 2. For each partition:
    #      start_idx = i * len(df) // n_partitions
    #      end_idx = (i + 1) * len(df) // n_partitions
    #      chunk = df.iloc[start_idx:end_idx]
    #      chunk.to_parquet(PARTITIONS_DIR / f'part_{i:03d}.parquet', index=False)
    # 3. Return list of file paths
    pass

In [None]:
# Create partitions
print("Creating partitions...")
partition_files = create_partitions()

if partition_files is None:
    print("TODO: Implement create_partitions()")
else:
    print(f"Created {len(partition_files)} partition files")

### TODO 14: `benchmark_threading()`

Compare sequential vs threaded **reading** of partition files.

**Hints:**
- Sequential: `[pd.read_parquet(f) for f in files]`
- Threaded: `ThreadPoolExecutor(max_workers=8)` with `executor.map(pd.read_parquet, files)`
- Reading files is mostly waiting on disk plus native Arrow decoding, both of which release the GIL → threads can overlap that work
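Threads help when most of the time is spent outside Python bytecode, for example waiting on the disk. The sketch below uses `time.sleep` (which releases the GIL) as a hypothetical stand-in for a file read, so it runs anywhere. It also shows that `executor.map` returns results in input order, so the threaded list lines up with `files`.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_read(name, delay=0.05):
    """Stand-in for an I/O-bound read: sleeps (releasing the GIL), then returns."""
    time.sleep(delay)
    return f"data from {name}"


files = [f"part_{i:03d}.parquet" for i in range(8)]

start = time.perf_counter()
sequential = [fake_read(f) for f in files]
t_seq = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as executor:
    threaded = list(executor.map(fake_read, files))  # results keep input order
t_thr = time.perf_counter() - start

print(f"sequential {t_seq:.2f}s, threaded {t_thr:.2f}s, speedup {t_seq / t_thr:.1f}x")
```

With real Parquet files, expect a smaller gain than in this toy. `pq.read_table` already decodes columns on several threads by default (`use_threads=True`).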

In [None]:
def benchmark_threading(n_workers: int = 8) -> dict:
    """
    Benchmark sequential vs threaded reading of partition files.

    Args:
        n_workers: Number of threads to use

    Returns:
        Dictionary with sequential/threaded times and speedup.
    """
    # TODO: Implement this function
    # 1. Get file list: files = sorted(glob.glob(str(PARTITIONS_DIR / '*.parquet')))
    # 2. Sequential: [pd.read_parquet(f) for f in files]
    # 3. Threaded: ThreadPoolExecutor(max_workers=n_workers)
    # 4. Print times and speedup
    pass

In [None]:
# Benchmark threading
thread_bench = benchmark_threading()

if thread_bench is None:
    print("TODO: Implement benchmark_threading()")

### TODO 15: `benchmark_multiprocessing()`

Compare sequential vs multiprocessing for **heavy computation** on each partition.

**Hints:**
- The `heavy_process()` function does computation (sqrt, log1p, groupby)
- Sequential: `[heavy_process(f) for f in files]`
- Parallel: `ProcessPoolExecutor(max_workers=4)`
- CPU-bound work → separate processes sidestep the GIL and can use several cores
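Two practical points are worth knowing before you run this benchmark:

- **Pickling**: `ProcessPoolExecutor` pickles the function and its arguments to send them to worker processes. Under the spawn or forkserver start methods (the default on Windows and macOS, and on Linux from Python 3.14), workers often cannot find a function defined in a notebook cell. A common workaround is to move `heavy_process` into a separate `.py` module and import it.
- **Entry-point guard**: scripts that create process pools should guard their entry point with `if __name__ == "__main__":`.

To stay runnable under any start method, the sketch below uses the importable stdlib function `math.factorial` as a CPU-bound stand-in. `run_sequential` and `run_parallel` are hypothetical names.

```python
import math
import time
from concurrent.futures import ProcessPoolExecutor

# CPU-bound stand-in; math.factorial is importable by child processes under any start method
INPUTS = [20_000, 20_001, 20_002, 20_003]


def run_sequential(inputs):
    return [math.factorial(n) for n in inputs]


def run_parallel(inputs, n_workers=4):
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        return list(executor.map(math.factorial, inputs))


if __name__ == "__main__":
    start = time.perf_counter()
    seq = run_sequential(INPUTS)
    t_seq = time.perf_counter() - start

    start = time.perf_counter()
    par = run_parallel(INPUTS)
    t_par = time.perf_counter() - start

    assert seq == par  # same results, same order
    print(f"sequential {t_seq:.2f}s, parallel {t_par:.2f}s")
```

With tasks this small, process start-up and result pickling can outweigh the gain. The lab's `heavy_process` does much more work per file.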

In [None]:
def heavy_process(filepath):
    """Process a partition: read, transform, aggregate."""
    df = pd.read_parquet(filepath)
    df['score'] = np.sqrt(df['price']) * np.log1p(df['quantity'])
    return df.groupby('category')['score'].agg(['mean', 'sum', 'count'])

In [None]:
def benchmark_multiprocessing(n_workers: int = 4) -> dict:
    """
    Benchmark sequential vs multiprocessing for heavy computation.

    Args:
        n_workers: Number of processes to use

    Returns:
        Dictionary with sequential/parallel times and speedup.
    """
    # TODO: Implement this function
    # 1. Get file list
    # 2. Sequential: [heavy_process(f) for f in files]
    # 3. Parallel: ProcessPoolExecutor(max_workers=n_workers)
    # 4. Print times and speedup
    pass

In [None]:
# Benchmark multiprocessing
proc_bench = benchmark_multiprocessing()

if proc_bench is None:
    print("TODO: Implement benchmark_multiprocessing()")

### TODO 16: `benchmark_workers_scaling()`

Experiment: vary the number of workers (1, 2, 4, 8) and compare the measured speedup against the ideal linear speedup.

**Hints:**
- Run `heavy_process` sequentially first (baseline)
- Then run with 1, 2, 4, 8 workers using ProcessPoolExecutor
- Plot actual speedup vs ideal linear speedup
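The ideal line assumes that every part of the job parallelises. Amdahl's Law (cited in the takeaways) gives a tighter ceiling when only a fraction `p` of the sequential run time can be parallelised: `S(n) = 1 / ((1 - p) + p / n)`. You could plot a small helper next to your measured curve. The name `amdahl_speedup` is hypothetical, and the `p = 0.9` used below is an assumed value; you would estimate `p` from your own timings.

```python
def amdahl_speedup(p, n_workers):
    """Upper bound on speedup when a fraction p of the run time is parallelisable."""
    if not 0.0 <= p <= 1.0:
        raise ValueError("p must be between 0 and 1")
    return 1.0 / ((1.0 - p) + p / n_workers)


# Example with an assumed parallel fraction of 90%
for n in [1, 2, 4, 8]:
    print(n, round(amdahl_speedup(0.9, n), 2))
```

Measured speedup usually falls below even this bound. Process start-up, pickling of results, and shared disk bandwidth add costs that the formula ignores.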

In [None]:
def benchmark_workers_scaling(max_workers: int = 8) -> dict:
    """
    Experiment: vary number of workers and measure speedup.

    Args:
        max_workers: Maximum number of workers to test

    Returns:
        Dictionary with worker count → speedup mapping.
    """
    # TODO: Implement this function
    # 1. Run sequential baseline
    # 2. For n_workers in [1, 2, 4, max_workers]:
    #      Run with ProcessPoolExecutor(max_workers=n_workers)
    #      Record speedup = seq_time / elapsed
    # 3. Plot: actual speedup vs ideal linear
    # 4. Save plot to RESULTS_DIR / 'speedup_workers.png'
    pass

In [None]:
# Benchmark worker scaling
print("Benchmarking worker scaling...")
print("(This may take a few minutes)\n")

scaling = benchmark_workers_scaling()

if scaling is None:
    print("TODO: Implement benchmark_workers_scaling()")
else:
    # Show the plot inline
    plt.figure(figsize=(8, 5))
    plt.plot(list(scaling.keys()), list(scaling.values()), 'bo-', linewidth=2, markersize=8, label='Measured')
    plt.plot([1, max(scaling.keys())], [1, max(scaling.keys())], 'r--', label='Ideal linear', alpha=0.7)
    plt.xlabel('Workers')
    plt.ylabel('Speedup')
    plt.legend()
    plt.title('Speedup vs Number of Workers')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

---

## Exercise 4: Complete Pipeline — Out-of-Core + Parallel (20 min)

**Goal**: Combine chunking and parallelization in a realistic pipeline.

### TODO 17: `process_partition()`

Process a single partition: read, transform, aggregate.

**Hints:**
- Calculate `revenue = price * quantity`
- Create `price_bin` with `pd.cut(df['price'], bins=[0, 50, 200, 500, 1000], labels=[...])`
- Group by `['category', 'price_bin']` and aggregate revenue and quantity
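Two details of `pd.cut` matter here:

- **Right-closed bins**: bins are right-closed by default. `(0, 50]` contains 50, and the top bin includes a price of exactly 1000, which is possible after `.round(2)`.
- **Out-of-range values**: a value at or below the lowest edge (0 here) falls outside every bin and becomes NaN.

Here is a quick check on made-up prices:

```python
import pandas as pd

prices = pd.Series([0.0, 0.5, 50.0, 50.01, 999.99, 1000.0])
bins = pd.cut(
    prices,
    bins=[0, 50, 200, 500, 1000],
    labels=["low", "mid", "high", "premium"],
)  # right=True by default: intervals are (0, 50], (50, 200], (200, 500], (500, 1000]
print(bins.tolist())
```

Because `price_bin` is categorical, grouping with `observed=True` keeps only the (category, bin) combinations that actually occur.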

In [None]:
def process_partition(filepath):
    """
    Process a partition: read, transform, aggregate.

    Args:
        filepath: Path to the Parquet partition file

    Returns:
        Aggregated DataFrame with revenue statistics by category and price_bin.
    """
    # TODO: Implement this function
    # 1. Read: df = pd.read_parquet(filepath)
    # 2. Transform:
    #    df['revenue'] = df['price'] * df['quantity']
    #    df['price_bin'] = pd.cut(df['price'], bins=[0, 50, 200, 500, 1000],
    #                            labels=['low', 'mid', 'high', 'premium'])
    # 3. Aggregate:
    #    result = df.groupby(['category', 'price_bin'], observed=True).agg({
    #        'revenue': ['sum', 'mean', 'count'],
    #        'quantity': 'sum'
    #    })
    # 4. Return result
    pass

### TODO 18: `run_parallel_pipeline()`

Run the complete pipeline: process all partitions in parallel, then combine results.

**Hints:**
- Sequential: `[process_partition(f) for f in files]`
- Parallel: `ProcessPoolExecutor(max_workers=4)` with `executor.map()`
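- Combine: `pd.concat(results).groupby(level=[0, 1], observed=True).sum()`, then recompute `('revenue', 'mean')` as `('revenue', 'sum') / ('revenue', 'count')`, because summing per-partition means does not give the overall mean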
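Sums and counts can be added across partitions, but means cannot: adding two partition means does not give the mean of the combined rows. A safe pattern is to add up the additive columns and then rebuild the mean from them. The sketch uses two made-up partial aggregates with the same column layout as `process_partition`. `combine_partials` and `AGG_SPEC` are hypothetical names.

```python
import pandas as pd

AGG_SPEC = {"revenue": ["sum", "mean", "count"], "quantity": "sum"}


def combine_partials(partials):
    """Merge per-partition aggregates; recompute the mean from sum and count."""
    combined = pd.concat(partials).groupby(level=[0, 1], observed=True).sum()
    combined[("revenue", "mean")] = (
        combined[("revenue", "sum")] / combined[("revenue", "count")]
    )
    return combined


part_a = pd.DataFrame({
    "category": ["A", "A", "B"],
    "price_bin": ["low", "low", "mid"],
    "revenue": [10.0, 20.0, 5.0],
    "quantity": [1, 2, 1],
})
part_b = pd.DataFrame({
    "category": ["A"], "price_bin": ["low"], "revenue": [60.0], "quantity": [3],
})
partials = [df.groupby(["category", "price_bin"]).agg(AGG_SPEC) for df in (part_a, part_b)]

final = combine_partials(partials)
naive = pd.concat(partials).groupby(level=[0, 1]).sum()  # sums the means: wrong
print(final.loc[("A", "low"), ("revenue", "mean")], naive.loc[("A", "low"), ("revenue", "mean")])
```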

In [None]:
def run_parallel_pipeline(n_workers: int = 4) -> dict:
    """
    Run the complete parallel pipeline and compare with sequential.

    Args:
        n_workers: Number of worker processes

    Returns:
        Dictionary with sequential/parallel times, speedup, and final results.
    """
    # TODO: Implement this function
    # 1. Get file list from PARTITIONS_DIR
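    # 2. Sequential pipeline:
    #      results = [process_partition(f) for f in files]
    #      final = pd.concat(results).groupby(level=[0, 1], observed=True).sum()
    #      # Summing per-partition means is wrong; rebuild the mean from sum / count:
    #      final[('revenue', 'mean')] = final[('revenue', 'sum')] / final[('revenue', 'count')]
    # 3. Parallel pipeline:
    #      with ProcessPoolExecutor(max_workers=n_workers) as executor:
    #          results = list(executor.map(process_partition, files))
    #      final = (combine exactly as in step 2, including the mean fix)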
    # 4. Print times, speedup, and final results
    pass

In [None]:
# Run complete pipeline
print("Running complete pipeline (sequential vs parallel)...")
print("(This may take a few minutes)\n")

pipeline_results = run_parallel_pipeline()

if pipeline_results is None:
    print("TODO: Implement run_parallel_pipeline()")

---

## Save Metrics

In [None]:
# Collect all metrics
metrics = {
    'exercise_0': {
        'read_benchmark': read_bench if 'read_bench' in dir() and read_bench else {},
        'projection': proj_bench if 'proj_bench' in dir() and proj_bench else {},
        'iter_batches': batch_results if 'batch_results' in dir() and batch_results else {},
    },
    'exercise_1': {
        'chunked_stats': stats if 'stats' in dir() and stats else {},
        'electronics_rows': n_electronics if 'n_electronics' in dir() and n_electronics else 0,
    },
    'exercise_2': {
        'validation': validation if 'validation' in dir() and validation else {},
        'full_stats': full_stats if 'full_stats' in dir() and full_stats else {},
    },
    'exercise_3': {
        'threading': thread_bench if 'thread_bench' in dir() and thread_bench else {},
        'multiprocessing': proc_bench if 'proc_bench' in dir() and proc_bench else {},
        'scaling': scaling if 'scaling' in dir() and scaling else {},
    },
    'exercise_4': pipeline_results if 'pipeline_results' in dir() and pipeline_results else {},
}

with open(METRICS_PATH, 'w') as f:
    json.dump(metrics, f, indent=2, default=str)

print(f"\n✅ Metrics saved to {METRICS_PATH}")
print(json.dumps(metrics, indent=2, default=str))

---

## Key Takeaways

1. **Stay in Arrow when you can**: `pq.read_table()` skips the Arrow → Pandas conversion step that `pd.read_parquet()` performs
2. **Projection pushdown**: read only the columns you need to save time and memory
3. **Chunking keeps memory roughly constant**: peak RAM depends on chunk size, not file size
4. **Welford's algorithm**: compute mean and variance in a single pass without storing the data
5. **Threading for I/O**: parallelize file reading with threads
6. **Multiprocessing for CPU**: parallelize computation with processes
7. **Speedup is sub-linear**: Amdahl's Law and overheads (process start-up, data transfer) limit the gains from parallelization
8. **Combine everything**: chunking + parallelization = scalable pipelines

---

**Questions?** Check the [Tips & Reference Guide](../docs/labs/lab06_guide.md) or ask your instructor.