## Latency vs Bandwidth: Sequential vs Random Disk Access

This experiment shows that at scale the access pattern, not just the amount of data, determines performance: sequential reads are limited by bandwidth, while small random reads are limited by latency.

**Note:** Random read offsets are aligned to fixed 4 KB block boundaries (`ALIGN`), so each read covers exactly one block. The offsets are scattered across the whole file so that the OS and storage controller cannot merge them into an accidental sequential stream.

At scale, systems are limited by either bandwidth (sequential access) or latency (random access) — rarely both at once.

Sequential access pays latency once and then streams at full bandwidth.
Random access pays latency repeatedly and never reaches peak bandwidth.

The “cost” of sequential access is not speed, but selectivity: you must scan more data to find what you want.

In [32]:
import os
import random
import time
from pathlib import Path

# ----------------------------
# Parameters you can tweak
# ----------------------------
FILE = Path("bigfile.bin")
SIZE_GB = 4                       # size of the file to create
SEQ_CHUNK = 8 * 1024 * 1024       # 8 MB sequential chunk
RAND_READ_SIZE = 4096             # 4 KB random read size (classic "small random I/O")

TOTAL_RANDOM_BYTES = SIZE_GB * 1024**3  # if too slow, reduce this

ALIGN = 4096  # align random offsets to 4KB boundaries


def make_file():
    target = SIZE_GB * 1024**3
    if FILE.exists() and FILE.stat().st_size >= target:
        return
    print(f"Creating {SIZE_GB} GB file at {FILE} ...")
    with open(FILE, "wb") as f:
        remaining = target
        block = os.urandom(SEQ_CHUNK)
        while remaining > 0:
            n = min(SEQ_CHUNK, remaining)
            f.write(block[:n])
            remaining -= n
    print("Done.")

def open_readonly_nocache(path: Path) -> int:
    """
    Tries to open a file descriptor with OS cache disabled (macOS).
    Falls back to normal reads on other platforms (still works, but may be cache-influenced).
    """
    fd = os.open(str(path), os.O_RDONLY)

    # macOS: F_NOCACHE disables caching for this fd
    try:
        import fcntl
        if hasattr(fcntl, "F_NOCACHE"):
            fcntl.fcntl(fd, fcntl.F_NOCACHE, 1)
            return fd
    except Exception:
        pass

    # If we get here, we couldn't disable cache
    print("NOTE: Could not disable OS page cache on this platform.")
    print("      Results may look 'too fast' due to caching.")
    return fd

def sequential_read_nocache() -> tuple[float, float]:
    fd = open_readonly_nocache(FILE)
    try:
        t0 = time.perf_counter()
        total = 0
        while True:
            b = os.read(fd, SEQ_CHUNK)
            if not b:
                break
            total += len(b)
        dt = time.perf_counter() - t0
        mbps = total / (1024**2) / dt
        return dt, mbps
    finally:
        os.close(fd)

def random_reads_nocache(total_random_bytes: int) -> tuple[float, float, float]:
    size = FILE.stat().st_size
    n_random = total_random_bytes // RAND_READ_SIZE

    fd = open_readonly_nocache(FILE)
    try:
        t0 = time.perf_counter()
        total = 0
        for _ in range(n_random):
            # Choose a random aligned offset; the +1 keeps the last full block eligible
            off = random.randrange(0, size - RAND_READ_SIZE + 1, ALIGN)
            b = os.pread(fd, RAND_READ_SIZE, off)
            total += len(b)
        dt = time.perf_counter() - t0

        avg_us = (dt / n_random) * 1e6
        eff_mbps = total / (1024**2) / dt
        return dt, avg_us, eff_mbps
    finally:
        os.close(fd)


if __name__ == "__main__":
    make_file()

    n_random = TOTAL_RANDOM_BYTES // RAND_READ_SIZE
    print(f"\nRandom sample: {TOTAL_RANDOM_BYTES/1024**2:.0f} MB "
          f"= {n_random:,} ops × {RAND_READ_SIZE} bytes\n")
    
    dt, avg_us, eff_mbps = random_reads_nocache(TOTAL_RANDOM_BYTES)
    print(f"Random 4KB (nocache if supported): {dt:.2f}s, avg {avg_us:.1f} µs/op, "
          f"effective {eff_mbps:.2f} MB/s")

    dt, mbps = sequential_read_nocache()
    print(f"Sequential (nocache if supported): {dt:.2f}s, {mbps:.1f} MB/s")

    



Random sample: 4096 MB = 1,048,576 ops × 4096 bytes

Random 4KB (nocache if supported): 108.16s, avg 103.1 µs/op, effective 37.87 MB/s
Sequential (nocache if supported): 1.07s, 3842.3 MB/s
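
As a sanity check, the two timings can be reproduced from a simple cost model: random access pays the per-operation latency once per read, while sequential access is bounded by bandwidth. The sketch below only re-derives the figures printed above; the variable names are illustrative, and the numbers come from a single run on one machine.

```python
# Figures copied from the run above (one machine, one run).
n_ops = 1_048_576          # number of 4 KB random reads
avg_us_per_op = 103.1      # measured average latency per random read
seq_mb_per_s = 3842.3      # measured sequential throughput
total_mb = 4096            # data volume read in both tests

# Random access: latency is paid once per operation.
random_s = n_ops * avg_us_per_op / 1e6

# Sequential access: time is data volume divided by bandwidth.
sequential_s = total_mb / seq_mb_per_s

slowdown = random_s / sequential_s
print(f"random: {random_s:.1f} s, sequential: {sequential_s:.2f} s, "
      f"slowdown: {slowdown:.0f}x")
```

Both tests move the same number of bytes, so the roughly hundredfold gap comes entirely from how the bytes are requested.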


## From Full Scans to Partition Pruning: Why HDFS & Data Lakes Chunk Data

In distributed systems like **HDFS** (Hadoop Distributed File System), large datasets are split into **chunks** (typically 128MB blocks) and spread across nodes. This design enables parallel processing, but also introduces a key optimization opportunity: **reading only the chunks you need**.
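
To make "reading only the chunks you need" concrete, here is a minimal sketch of the block arithmetic, assuming the 128 MB block size mentioned above. The helper names `block_count` and `blocks_for_range` are hypothetical and not part of any HDFS API.

```python
BLOCK_SIZE = 128 * 1024**2  # 128 MB block size, as stated above
MiB = 1024**2


def block_count(file_size: int) -> int:
    """Number of blocks needed to store a file (ceiling division)."""
    return -(-file_size // BLOCK_SIZE)


def blocks_for_range(offset: int, length: int) -> range:
    """Indices of the blocks that a byte range [offset, offset + length) touches."""
    first = offset // BLOCK_SIZE
    last = (offset + length - 1) // BLOCK_SIZE
    return range(first, last + 1)


# A 4 GB file is split into 32 blocks.
n_blocks = block_count(4 * 1024**3)

# A 10 MB read starting at 380 MB crosses the boundary at 384 MB,
# so it touches two of those blocks and none of the others.
touched = list(blocks_for_range(380 * MiB, 10 * MiB))
print(n_blocks, touched)
```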

### The Problem with Flat Files

When you query a plain CSV, the engine must scan the **entire file** to find matching rows—even if you only want one day's worth of data from a multi-year dataset.

### The Solution: Partitioning + Columnar Storage

Modern data lakes solve this with two techniques:

1. **Partitioning** — Physically organizing data into folders by a key column (e.g., `end_ym=2026-01/`). Queries that filter on this column can skip irrelevant partitions entirely ("partition pruning").

2. **Columnar formats like Parquet** — Store data by column rather than row, enabling engines to read only the columns referenced in your query. Parquet also embeds min/max statistics per chunk, allowing further "row group pruning."
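
Partition pruning can be sketched without any query engine: the partition value is encoded in the directory name, so files can be excluded by looking at their paths alone. The file names below are hypothetical, and the helpers `partition_values` and `prune` are illustrative rather than part of any library.

```python
from pathlib import PurePosixPath


def partition_values(path: str) -> dict[str, str]:
    """Parse Hive-style `key=value` directory components from a file path."""
    directories = PurePosixPath(path).parts[:-1]  # drop the file name
    return dict(part.split("=", 1) for part in directories if "=" in part)


def prune(paths: list[str], key: str, wanted: str) -> list[str]:
    """Keep only the files whose partition value matches the filter."""
    return [p for p in paths if partition_values(p).get(key) == wanted]


# Hypothetical layout of a dataset partitioned by end_ym.
files = [
    "events_parquet/end_ym=2025-12/data_0.parquet",
    "events_parquet/end_ym=2026-01/data_0.parquet",
    "events_parquet/end_ym=2026-02/data_0.parquet",
]

selected = prune(files, "end_ym", "2026-01")
print(selected)
```

Only the selected files would ever be opened; the other partitions cost nothing beyond a directory listing.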

### What This Demo Shows

| Step | What Happens | I/O Cost |
|------|--------------|----------|
| CSV scan | Read entire file, filter in memory | **High** |
| Build partitioned Parquet | One-time ETL to reorganize data | One-time |
| Query partitioned Parquet | Read only the `end_ym=2026-01/` folder | **Low** |

> **Key takeaway:** The same logical query can have vastly different physical costs depending on how data is laid out on disk. This is why chunking strategies matter at scale.

In [34]:
import duckdb, time
from pathlib import Path

# Assume you have a CSV that includes at least the columns used below: end_ym and item
CSV = "combined_quarterly_financials.csv"

con = duckdb.connect()

# 1) Query unchunked CSV (scan everything)
t0 = time.perf_counter()
res1 = con.execute("""
  SELECT item, COUNT(*)
  FROM read_csv_auto($1)
  WHERE end_ym == '2026-01'
  GROUP BY item
""", [CSV]).fetchall()
dt1 = time.perf_counter() - t0
print("CSV scan:", dt1, res1[:5])

# 2) Convert to Parquet partitioned by end_ym (one-time cost)
out_dir = Path("events_parquet")
out_dir.mkdir(exist_ok=True)

t0 = time.perf_counter()
con.execute(f"""
  COPY (
    SELECT * FROM read_csv_auto('{CSV}')
  ) TO '{out_dir.as_posix()}'
  (FORMAT PARQUET, PARTITION_BY (end_ym));
""")
dt_build = time.perf_counter() - t0
print("Build partitioned parquet:", dt_build)

# 3) Query partitioned Parquet (reads fewer files)
t0 = time.perf_counter()
res2 = con.execute(f"""
  SELECT item, COUNT(*)
  FROM read_parquet('{out_dir.as_posix()}/**/*.parquet')
  WHERE end_ym == '2026-01'
  GROUP BY item
""").fetchall()
dt2 = time.perf_counter() - t0
print("Partitioned parquet:", dt2, res2[:5])


CSV scan: 1.7385910829761997 [('market_capitalization', 703)]
Build partitioned parquet: 31.15480912500061
Partitioned parquet: 0.447352499992121 [('market_capitalization', 703)]
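
The one-time build cost only pays off if the query runs often enough. A rough break-even estimate, using the three timings printed above (a single run, so treat the result as indicative only):

```python
import math

# Timings copied from the output above, in seconds.
csv_scan_s = 1.7385910829761997
build_s = 31.15480912500061
parquet_query_s = 0.447352499992121

speedup = csv_scan_s / parquet_query_s
saved_per_query_s = csv_scan_s - parquet_query_s

# Number of queries after which the one-time conversion has paid for itself.
break_even_queries = math.ceil(build_s / saved_per_query_s)

print(f"speedup: {speedup:.1f}x, saved per query: {saved_per_query_s:.2f} s, "
      f"break-even after {break_even_queries} queries")
```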


## Object Storage Basics: The S3 API

While HDFS uses a traditional filesystem model with directories and blocks, modern data lakes increasingly rely on **object storage** systems like Amazon S3, Google Cloud Storage, or open-source alternatives like **MinIO**.

### Key Concepts

- **Buckets** — Top-level containers (similar to root folders)
- **Objects** — Files stored with a unique key (path-like string) and metadata
- **Flat namespace** — No true directories; `folder/file.txt` is just a key containing a `/`
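
The flat namespace can be illustrated with plain strings. The sketch below imitates, in pure Python, how an S3-style listing with a prefix and a delimiter groups keys into "folders" (S3 calls these common prefixes). The function `list_prefix` and the sample keys are hypothetical; a real listing would go through the S3 API instead.

```python
def list_prefix(keys, prefix="", delimiter="/"):
    """Split keys under `prefix` into direct objects and folder-like common prefixes."""
    objects, common_prefixes = [], set()
    for key in sorted(keys):
        if not key.startswith(prefix):
            continue
        rest = key[len(prefix):]
        if delimiter and delimiter in rest:
            # Everything up to the next delimiter is presented as one "folder".
            common_prefixes.add(prefix + rest.split(delimiter, 1)[0] + delimiter)
        else:
            objects.append(key)
    return objects, sorted(common_prefixes)


# Hypothetical keys in one bucket; there are no directories, only strings.
keys = [
    "bigfile.bin",
    "logs/2026-01/a.json",
    "logs/2026-02/b.json",
    "logs/readme.txt",
]

top_objects, top_prefixes = list_prefix(keys)
log_objects, log_prefixes = list_prefix(keys, prefix="logs/")
print(top_objects, top_prefixes)
print(log_objects, log_prefixes)
```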

### Core S3 Operations

| Operation | Description |
|-----------|-------------|
| `PUT` | Upload an object to a bucket |
| `HEAD` | Retrieve metadata (size, content-type, etc.) without downloading the object |
| `GET` | Download the full object |
| `Range GET` | Download only a **byte range** — critical for reading chunks of large files |

### Why Range GETs Matter for Big Data

Columnar formats like Parquet store metadata footers at the end of files. A query engine can:

1. Issue a small Range GET to read the footer
2. Determine which row groups contain relevant data
3. Issue targeted Range GETs for only those chunks

This means you can query a 10GB Parquet file but only transfer a few MB over the network—the same "read only what you need" principle from HDFS, but over HTTP.
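
The three steps above can be sketched as byte-range arithmetic. A Parquet file ends with the footer metadata, followed by a 4-byte little-endian footer length and the magic bytes `PAR1`, so the last 8 bytes tell a reader where the footer starts. The row-group metadata below is a simplified, made-up stand-in for what a real footer contains, and the helper names are hypothetical.

```python
import struct


def range_header(start: int, end: int) -> str:
    """HTTP Range header value; both ends are inclusive."""
    return f"bytes={start}-{end}"


def footer_range(file_size: int, tail: bytes) -> tuple[int, int]:
    """Locate the footer from the last 8 bytes of the file (step 1)."""
    if len(tail) != 8 or tail[4:] != b"PAR1":
        raise ValueError("not the tail of a Parquet file")
    (footer_len,) = struct.unpack("<I", tail[:4])
    start = file_size - 8 - footer_len
    return start, file_size - 8 - 1


def plan_reads(row_groups: list[dict], lo: str, hi: str) -> list[str]:
    """Keep row groups whose [min, max] overlaps [lo, hi] (steps 2 and 3)."""
    return [
        range_header(rg["offset"], rg["offset"] + rg["length"] - 1)
        for rg in row_groups
        if rg["max"] >= lo and rg["min"] <= hi
    ]


# Synthetic example: a 10,000-byte file with a 500-byte footer.
file_size = 10_000
tail = struct.pack("<I", 500) + b"PAR1"
footer = footer_range(file_size, tail)

# Made-up row-group statistics for the end_ym column.
row_groups = [
    {"offset": 4, "length": 3000, "min": "2025-10", "max": "2025-12"},
    {"offset": 3004, "length": 3000, "min": "2026-01", "max": "2026-01"},
    {"offset": 6004, "length": 3488, "min": "2026-02", "max": "2026-03"},
]
reads = plan_reads(row_groups, "2026-01", "2026-01")
print(range_header(*footer), reads)
```

In this example only one of the three row groups needs to be fetched; the other two are skipped based on their statistics alone.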

### This Demo

Below we use **MinIO** (an S3-compatible server running locally) to demonstrate the basic operations. The same code works against real S3 by changing the `endpoint_url` and credentials.

In [35]:
import time

import boto3
from botocore.client import Config

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
    config=Config(signature_version="s3v4"),
    region_name="us-east-1",
)

bucket = "demo"
try:
    s3.create_bucket(Bucket=bucket)
except s3.exceptions.BucketAlreadyOwnedByYou:
    pass

# PUT
s3.upload_file("bigfile.bin", bucket, "bigfile.bin")

# HEAD (metadata)
head = s3.head_object(Bucket=bucket, Key="bigfile.bin")
print("Size:", head["ContentLength"])

# GET (full)
start = time.perf_counter()
s3.download_file(bucket, "bigfile.bin", "bigfile_downloaded.bin")
print("Full GET time:", time.perf_counter() - start)

# Range GET (partial read)
resp = s3.get_object(Bucket=bucket, Key="bigfile.bin", Range="bytes=0-1048575")
data = resp["Body"].read()
print("Range bytes:", len(data))

# Read the whole object as a series of Range GETs
start_time = time.perf_counter()
size = head["ContentLength"]
chunk_size = 4 * 1024 * 1024  # 4 MB
for start in range(0, size, chunk_size):
    # Range ends are inclusive; clamp the final chunk to the object size
    end = min(start + chunk_size, size) - 1
    resp = s3.get_object(Bucket=bucket, Key="bigfile.bin", Range=f"bytes={start}-{end}")
    data = resp["Body"].read()
print("Range GET time:", time.perf_counter() - start_time)

Size: 4294967296
Full GET time: 18.252469667000696
Range bytes: 1048576
Range GET time: 17.17798308300553
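
To put the two timings side by side, the sketch below converts them into throughput and per-request cost, using only the numbers printed above. It is a single local run against MinIO, so the comparison is indicative at best.

```python
# Figures copied from the output above.
size_bytes = 4_294_967_296
full_get_s = 18.252469667000696
range_get_s = 17.17798308300553
chunk_size = 4 * 1024 * 1024  # 4 MB per Range GET, as in the loop above

n_requests = size_bytes // chunk_size
size_mib = size_bytes / 1024**2

full_mib_s = size_mib / full_get_s
range_mib_s = size_mib / range_get_s
ms_per_request = range_get_s / n_requests * 1000

print(f"full GET:   {full_mib_s:.0f} MiB/s")
print(f"range GETs: {range_mib_s:.0f} MiB/s over {n_requests} requests "
      f"({ms_per_request:.1f} ms each)")
```

In this run, 4 MB ranges were large enough that per-request overhead did not slow the chunked read down relative to the single full GET. Much smaller ranges would be expected to shift the balance toward latency, as in the disk experiment at the start.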
