# Module 9: Files, APIs & Larger-than-Memory Tactics

**Learning Objectives:**
- Ingest large CSV files in chunks without running out of RAM
- Use the Parquet format for faster, compressed data storage
- Pull data from REST APIs into a Pandas workflow
- Apply caching and profiling techniques to speed up ML pipelines

> **Why does this matter?** Real-world datasets are large and live in many places — flat files, spreadsheets, APIs, databases. Knowing how to read, transform, and store them efficiently is what separates a production data pipeline from a notebook prototype.

In [None]:
import os
import io
import time
import json
import gzip
import pickle
import hashlib
import requests
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

# Create the two working directories this module uses:
# 'data' for generated files and 'cache' for pickled results.
for _dirname in ('data', 'cache'):
    os.makedirs(_dirname, exist_ok=True)
print("Working directories created: data/ and cache/")

---
## 1. Generating a Large Synthetic Dataset

Before practicing chunked ingestion, we need a big file to work with.

In [None]:
# Write a 500k-row CSV to simulate a 'large' file.
# Seeding the global NumPy RNG makes the generated rows identical on every run.
np.random.seed(42)
N = 500_000


def _build_transactions(n_rows):
    """Return `n_rows` synthetic transactions drawn from the global NumPy RNG."""
    columns = {}
    columns['transaction_id'] = range(1, n_rows + 1)
    columns['user_id'] = np.random.randint(1000, 9999, n_rows)
    columns['amount'] = np.random.exponential(scale=200, size=n_rows).round(2)
    columns['category'] = np.random.choice(
        ['Food', 'Electronics', 'Clothing', 'Travel', 'Health'], n_rows
    )
    columns['status'] = np.random.choice(
        ['completed', 'refunded', 'failed'], n_rows, p=[0.85, 0.1, 0.05]
    )
    columns['timestamp'] = (
        pd.date_range('2022-01-01', periods=n_rows, freq='1min')
        .strftime('%Y-%m-%d %H:%M:%S')
    )
    return pd.DataFrame(columns)


large_df = _build_transactions(N)

csv_path = 'data/transactions_large.csv'
large_df.to_csv(csv_path, index=False)
file_size_mb = os.path.getsize(csv_path) / 1024 ** 2
print(f"Wrote {N:,} rows to '{csv_path}' ({file_size_mb:.1f} MB)")

---
## 2. Chunked CSV Ingestion

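When a file is too large to fit in RAM, we read and process it in **chunks**. Each chunk is a regular DataFrame — we process it and discard it before reading the next one, so only one chunk (plus any accumulated results) is in memory at a time. If you keep a filtered subset of every chunk, as Example 2 does, that combined subset must itself fit in memory.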

In [None]:
# Example 1: Sum total revenue by category across all chunks
CHUNK_SIZE = 50_000
category_totals = {}
rows_processed = 0

# perf_counter is a monotonic, high-resolution clock meant for timing code
start = time.perf_counter()
# usecols: parse only the three columns this aggregation needs, which cuts
# per-chunk memory and parsing time compared to loading every column
with pd.read_csv(csv_path, chunksize=CHUNK_SIZE,
                 usecols=['category', 'amount', 'status']) as reader:
    for chunk in reader:
        # Count rows actually read instead of assuming the file holds N rows
        rows_processed += len(chunk)
        # Keep only completed transactions
        chunk_filtered = chunk[chunk['status'] == 'completed']
        # Aggregate within the chunk, then fold into the running totals
        chunk_agg = chunk_filtered.groupby('category')['amount'].sum()
        for cat, total in chunk_agg.items():
            category_totals[cat] = category_totals.get(cat, 0) + total

elapsed = time.perf_counter() - start
result_df = pd.Series(category_totals).reset_index()
result_df.columns = ['Category', 'Total Revenue ($)']
result_df = result_df.sort_values('Total Revenue ($)', ascending=False)

print(f"Processed {rows_processed:,} rows in {elapsed:.2f}s using chunks of {CHUNK_SIZE:,}")
print("\nRevenue by Category (completed transactions):")
result_df

In [None]:
# Example 2: Filter and save only 'refunded' rows.
# Each chunk is reduced to its refunded rows before being kept, so only the
# matching subset accumulates in memory.
with pd.read_csv(csv_path, chunksize=CHUNK_SIZE) as reader:
    refunded_chunks = [part[part['status'] == 'refunded'] for part in reader]

refunded_df = pd.concat(refunded_chunks, ignore_index=True)
print(f"Total refunded transactions: {len(refunded_df):,}")
refunded_df.head()

In [None]:
# Reusable chunk processing function
def process_csv_in_chunks(filepath, chunk_size, filter_fn=None, agg_fn=None,
                          **read_csv_kwargs):
    """
    Generic chunked CSV processor.

    Reads ``filepath`` ``chunk_size`` rows at a time, so only one chunk is
    parsed into memory at once (plus whatever results are accumulated).

    - filter_fn: function(chunk) -> filtered_chunk, applied first when given
    - agg_fn: function(chunk) -> value to accumulate, applied after filter_fn
    - read_csv_kwargs: extra keyword arguments forwarded to ``pd.read_csv``
      (e.g. ``usecols=[...]`` or ``dtype={...}`` to shrink each chunk further)

    Returns a list with one entry per chunk: ``agg_fn(chunk)`` when
    ``agg_fn`` is given, otherwise the (possibly filtered) chunk itself.
    Note that without ``agg_fn`` every chunk is kept, so the returned list
    holds the whole (filtered) file in memory.
    """
    results = []
    with pd.read_csv(filepath, chunksize=chunk_size, **read_csv_kwargs) as reader:
        for chunk in reader:
            if filter_fn is not None:
                chunk = filter_fn(chunk)
            results.append(agg_fn(chunk) if agg_fn is not None else chunk)
    return results

# Example usage: count completed transactions, 100k rows at a time
def _only_completed(frame):
    """Keep the rows whose status is 'completed'."""
    return frame[frame['status'] == 'completed']


count_results = process_csv_in_chunks(
    csv_path,
    chunk_size=100_000,
    filter_fn=_only_completed,
    agg_fn=len,
)
print(f"Total completed transactions: {sum(count_results):,}")

---
## 3. Parquet & Compression

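Parquet is a **columnar storage format** that is:
- Much faster to read (binary and columnar, so you can load only the columns you need)
- Significantly smaller on disk (pandas writes it with Snappy compression by default)
- Schema-aware (column dtypes are stored in the file, so nothing has to be re-parsed or type-inferred on read)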

In [None]:
# Use the refunded dataset as our example: save the same rows three ways
csv_out_path = 'data/refunded.csv'
parquet_path = 'data/refunded.parquet'
parquet_gz_path = 'data/refunded_gzip.parquet'

refunded_df.to_csv(csv_out_path, index=False)
refunded_df.to_parquet(parquet_path, index=False)  # pandas default codec (snappy)
refunded_df.to_parquet(parquet_gz_path, index=False, compression='gzip')

# Format label -> size on disk in KB
sizes = {
    label: os.path.getsize(path) / 1024
    for label, path in (
        ('CSV', csv_out_path),
        ('Parquet (snappy)', parquet_path),
        ('Parquet (gzip)', parquet_gz_path),
    )
}

print("File size comparison:")
for fmt, kb in sizes.items():
    print(f"  {fmt:<22}: {kb:>8.1f} KB")

In [None]:
# Read speed comparison
def time_read(path, read_fn, label, **read_kwargs):
    """
    Time one ``read_fn(path, **read_kwargs)`` call, print it, and return the result.

    ``read_kwargs`` are forwarded to ``read_fn`` (e.g. ``columns=[...]`` for
    ``pd.read_parquet``), so pruned and full reads can be timed the same way.
    Uses ``time.perf_counter`` (monotonic, high resolution) rather than the
    wall clock. The printed row count is ``len()`` of the returned object.
    """
    start = time.perf_counter()
    df = read_fn(path, **read_kwargs)
    elapsed = time.perf_counter() - start
    print(f"{label:<28}: {elapsed:.4f}s  ({len(df):,} rows)")
    return df

# Same rows, three storage formats: compare how long each takes to load.
print("Read performance comparison:")
time_read(csv_out_path, read_fn=pd.read_csv, label='CSV')
time_read(parquet_path, read_fn=pd.read_parquet, label='Parquet (snappy)')
time_read(parquet_gz_path, read_fn=pd.read_parquet, label='Parquet (gzip)')

In [None]:
# Parquet column pruning — only load the columns you need (huge speed gain!)
wanted_columns = ['user_id', 'amount']
t0 = time.time()
partial_df = pd.read_parquet(parquet_path, columns=wanted_columns)
elapsed = time.time() - t0
print(f"Parquet with column pruning (2 cols): {elapsed:.4f}s")
partial_df.head()

---
## 4. API Ingestion Basics

APIs (Application Programming Interfaces) are one of the most common ways to pull fresh data from external services — financial data, weather, CRM systems, etc.

In [None]:
# Example 1: Public JSON API (Open-Meteo weather — no auth required)
def fetch_weather(latitude=1.3521, longitude=103.8198, days=7,
                  timezone='Asia/Singapore', timeout=10):
    """
    Fetch a daily temperature/precipitation forecast from the Open-Meteo API.

    Defaults point at Singapore. ``timezone`` is exposed so other locations
    can be queried without getting Singapore-local dates; ``timeout`` is the
    per-request timeout in seconds passed to ``requests.get``.

    Returns the decoded JSON body. Raises ``requests.exceptions.HTTPError``
    (via ``raise_for_status``) on an HTTP error status, and other ``requests``
    exceptions on connection problems or timeouts.
    """
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        'latitude': latitude,
        'longitude': longitude,
        'daily': 'temperature_2m_max,temperature_2m_min,precipitation_sum',
        'timezone': timezone,
        'forecast_days': days
    }
    response = requests.get(url, params=params, timeout=timeout)
    response.raise_for_status()  # Raises exception on HTTP error
    return response.json()

try:
    weather_json = fetch_weather()
    weather_df = pd.DataFrame(weather_json['daily'])
except (requests.exceptions.RequestException, KeyError, TypeError, ValueError) as e:
    # RequestException: offline, timeout, or HTTP error status.
    # ValueError: body was not valid JSON, or the 'daily' arrays differ in length.
    # KeyError/TypeError: JSON did not have the expected {'daily': {...}} shape.
    print(f"⚠️  API call failed (expected offline): {e}")
    # Fallback mock data so the rest of the notebook can still run
    weather_df = pd.DataFrame({
        'time': pd.date_range('2024-01-01', periods=7, freq='D').strftime('%Y-%m-%d'),
        'temperature_2m_max': [32.1, 33.5, 31.8, 34.0, 32.7, 33.2, 31.5],
        'temperature_2m_min': [25.4, 26.1, 24.9, 26.5, 25.8, 26.0, 24.7],
        'precipitation_sum': [0.2, 0.0, 5.4, 0.0, 2.1, 0.0, 0.0]
    })
    print("Using mock weather data instead.")
else:
    print("✅ Weather API call successful!")
# Display the result; a notebook only renders a cell's final bare expression.
weather_df

In [None]:
# Robust API pattern: retry with exponential backoff
def api_get_with_retry(url, params=None, max_retries=3, backoff=2.0, timeout=10):
    """
    GET ``url`` and return the decoded JSON body, retrying on failure.

    Any ``requests.exceptions.RequestException`` (connection errors, timeouts,
    and HTTP error statuses raised by ``raise_for_status``) triggers a retry.
    After failed attempt k the function sleeps ``backoff ** k`` seconds
    (2, 4, 8, ... with the default ``backoff=2.0``) before trying again; it
    does not sleep after the final attempt. Note that client errors such as
    404 are retried too, even though they are unlikely to succeed.

    Parameters:
        url: endpoint to request.
        params: optional query parameters passed to ``requests.get``.
        max_retries: total number of attempts.
        backoff: base of the exponential wait between attempts, in seconds.
        timeout: per-request timeout in seconds.

    Raises:
        RuntimeError: when every attempt failed; the last underlying
            exception is attached as ``__cause__``.
    """
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            response = requests.get(url, params=params, timeout=timeout)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            last_error = e
            if attempt == max_retries:
                # No point waiting after the last attempt — fail immediately.
                print(f"Attempt {attempt}/{max_retries} failed: {e}. Giving up.")
                break
            wait = backoff ** attempt
            print(f"Attempt {attempt}/{max_retries} failed: {e}. Retrying in {wait}s...")
            time.sleep(wait)
    raise RuntimeError(f"API call failed after {max_retries} attempts.") from last_error

print("api_get_with_retry() defined and ready to use.")

In [None]:
# Pattern: ingest paginated API
def fetch_paginated(base_url, page_param='page', per_page=100, max_pages=5,
                    per_page_param='per_page'):
    """
    Yield one DataFrame per page from a paginated JSON API.

    Requests pages 1..max_pages via ``api_get_with_retry``, sending
    ``{page_param: page, per_page_param: per_page}`` as query parameters.
    APIs name these differently, which is why both keys are configurable.
    Pagination stops early when a page comes back empty, or when fetching
    or parsing a page fails (the error is printed, not raised).
    """
    for page in range(1, max_pages + 1):
        params = {page_param: page, per_page_param: per_page}
        try:
            data = api_get_with_retry(base_url, params=params)
            if not data:
                print(f"Empty page {page}, stopping.")
                break
            frame = pd.DataFrame(data)
        except Exception as e:  # best-effort: stop paginating on any fetch/parse error
            print(f"Stopped at page {page}: {e}")
            break
        # yield outside the try so errors raised in the consumer are not swallowed here
        yield frame

# Usage example (JSONPlaceholder - free test API).
# JSONPlaceholder is built on json-server, which paginates with `_page`/`_limit`;
# `page`/`per_page` are not pagination parameters there.
frames = list(fetch_paginated('https://jsonplaceholder.typicode.com/posts',
                              page_param='_page', per_page_param='_limit',
                              per_page=10, max_pages=2))
if frames:
    posts_df = pd.concat(frames, ignore_index=True)
    print(f"Fetched {len(posts_df)} posts from API.")
    # Inside an `if`, a bare expression is not rendered by the notebook — print it.
    print(posts_df[['userId', 'id', 'title']].head())

---
## 5. Caching & Performance Tips for ML Pipelines

Caching avoids re-running expensive operations (API calls, heavy data processing) every time.

In [None]:
# Simple disk-based cache using pickle
def cache_key(fn_name, *args, **kwargs):
    """
    Return a deterministic hex digest identifying one call of ``fn_name``.

    The key is built from the string form of the positional-args tuple and
    the sorted keyword-args items, so arguments need a stable, distinguishing
    ``repr`` (strings, numbers, tuples are fine; objects with truncated reprs,
    such as large DataFrames, can collide).
    """
    raw = f"{fn_name}_{args}_{sorted(kwargs.items())}"
    return hashlib.md5(raw.encode()).hexdigest()  # used for file naming, not security


def cached_call(fn, cache_dir='cache', ttl_seconds=3600, *args, **kwargs):
    """
    Call ``fn(*args, **kwargs)``, caching the pickled result to disk.

    Because ``*args`` follows the defaulted parameters, positional arguments
    for ``fn`` can only be given after passing ``cache_dir`` and
    ``ttl_seconds`` positionally, e.g. ``cached_call(f, 'cache', 3600, x)``.

    Returns the cached result if a cache file exists, is younger than
    ``ttl_seconds``, and can be unpickled; otherwise runs ``fn`` and stores
    its result. ``cache_dir`` is created if missing, and the file is written
    to a temporary name and then moved into place with ``os.replace``, so an
    interrupted write never leaves a truncated pickle under the real name.

    Security: ``pickle.load`` can execute arbitrary code, so only point
    ``cache_dir`` at a directory you control.
    """
    key = cache_key(fn.__name__, *args, **kwargs)
    cache_file = os.path.join(cache_dir, f"{key}.pkl")

    if os.path.exists(cache_file):
        age = time.time() - os.path.getmtime(cache_file)
        if age < ttl_seconds:
            try:
                with open(cache_file, 'rb') as f:
                    cached = pickle.load(f)
            except (pickle.UnpicklingError, EOFError, AttributeError, ImportError) as err:
                # Corrupt/truncated file, or it references code that no longer exists
                print(f"[CACHE BAD]  {fn.__name__}: {err!r}; recomputing")
            else:
                print(f"[CACHE HIT]  {fn.__name__} (age: {age:.0f}s)")
                return cached

    print(f"[CACHE MISS] Running {fn.__name__}...")
    result = fn(*args, **kwargs)

    os.makedirs(cache_dir, exist_ok=True)
    tmp_file = f"{cache_file}.{os.getpid()}.tmp"
    try:
        with open(tmp_file, 'wb') as f:
            pickle.dump(result, f)
        os.replace(tmp_file, cache_file)  # atomic rename on the same filesystem
    finally:
        # Only still present if dump/replace failed; don't leave it behind.
        if os.path.exists(tmp_file):
            os.remove(tmp_file)
    return result


# Demo: slow data loading function
def load_and_aggregate_data(filepath):
    """Wait 1s to mimic an expensive step, then total `amount` per `category`."""
    time.sleep(1)  # Simulating expensive operation
    return (
        pd.read_csv(filepath)
        .groupby('category')['amount']
        .sum()
    )


# Identical arguments both times, so both calls share one cache file.
_cache_args = (load_and_aggregate_data, 'cache', 3600, csv_path)
result1 = cached_call(*_cache_args)  # first call: cache miss (computes and stores)
result2 = cached_call(*_cache_args)  # second call: cache hit (loads from disk)
result2

In [None]:
# Memory optimization: downcast numeric types
def optimize_dtypes(df, category_threshold=0.5):
    """
    Reduce memory usage by downcasting column dtypes, modifying ``df`` in place.

    - float64 columns -> smallest float dtype ``pd.to_numeric`` allows (float32).
      float32 keeps only ~7 significant digits, so large sums can lose precision.
    - int64 columns -> smallest integer dtype that holds every value.
    - object columns whose unique-value ratio (nunique / rows) is below
      ``category_threshold`` -> ``category``.

    ``df`` itself is mutated and also returned; pass ``df.copy()`` to keep the
    original. Prints total memory (deep) before and after.
    """
    before = df.memory_usage(deep=True).sum() / 1024 / 1024
    n_rows = len(df)

    for col in df.select_dtypes(include=['float64']).columns:
        df[col] = pd.to_numeric(df[col], downcast='float')
    for col in df.select_dtypes(include=['int64']).columns:
        df[col] = pd.to_numeric(df[col], downcast='integer')
    if n_rows:  # the cardinality ratio is undefined for an empty frame
        for col in df.select_dtypes(include=['object']).columns:
            if df[col].nunique() / n_rows < category_threshold:  # Low cardinality
                df[col] = df[col].astype('category')

    after = df.memory_usage(deep=True).sum() / 1024 / 1024
    reduction = (1 - after / before) * 100 if before else 0.0
    print(f"Memory: {before:.2f} MB → {after:.2f} MB  ({reduction:.1f}% reduction)")
    return df

# Profile the first 100k rows; optimize a copy so `sample` keeps its original dtypes
sample = pd.read_csv(csv_path, nrows=100_000)
sample_optimized = optimize_dtypes(df=sample.copy())
sample_optimized.dtypes

---
## 6. Exercises

1. Modify `process_csv_in_chunks` to calculate the **median** amount per category across 100k-row chunks. (*Hint: true median across chunks requires storing all values — why?*)
2. Compare Parquet with the `brotli` compression codec. How does it compare to gzip in size and speed?
3. Call the Open Library API at `https://openlibrary.org/search.json?q=python` and load the first 10 results into a DataFrame with columns: title, author, first_publish_year. (*Hint: the results are under the `docs` key, and authors come back as an `author_name` list.*)
4. Add a `from_date` parameter to the cache key in `cached_call` so the same function with different date args gets different cache files.
5. Apply `optimize_dtypes` to the full 500k-row CSV and measure the total reduction.