# WSI Residual Pyramid Tool (pyvips + JPEG-L residuals)

This notebook builds a **standard interpolated quadtree tile pyramid** (Deep Zoom layout) using **pyvips**,
then replaces the two highest-resolution levels (**L0 and L1**) with **luma-only residual JPEGs**
(grayscale JPEG, quality configurable; default **Q=32**) conditioned on the **covering L2 tile**.

Outputs:
- Baseline Deep Zoom pyramid tiles (JPEG Q=90 by default)
- Residual JPEG-L tiles for L1 and L0
- Storage stats and compression ratios
- Optional quality stats on a random sample: **PSNR on luma** and **ΔE00**

> For large slides, this is I/O heavy. Start with a limited number of L2 parents (`MAX_PARENTS`) to validate.

In [1]:
# Install notes (run in your environment)
# System packages (Ubuntu example):
#   sudo apt-get update
#   sudo apt-get install -y libvips libvips-dev openslide-tools libopenslide0
#
# Python packages:
#   pip install pyvips Pillow numpy scikit-image openslide-python requests tqdm
import sys, platform
print("Python:", sys.version)
print("Platform:", platform.platform())

Python: 3.13.2 (main, Feb 12 2025, 14:59:08) [Clang 19.1.6 ]
Platform: macOS-15.6.1-arm64-arm-64bit-Mach-O


In [None]:
# Optional: download OpenSlide DICOM testdata
import pathlib, zipfile, requests
from tqdm import tqdm

TESTDATA_URL = "https://openslide.cs.cmu.edu/download/openslide-testdata/DICOM/3DHISTECH-1.zip"
DATA_DIR = pathlib.Path("data"); DATA_DIR.mkdir(exist_ok=True, parents=True)
ZIP_PATH = DATA_DIR / "3DHISTECH-1.zip"
EXTRACT_DIR = DATA_DIR

def download(url: str, dst: pathlib.Path, chunk=1024*1024):
    r = requests.get(url, stream=True, timeout=120)
    r.raise_for_status()
    total = int(r.headers.get("Content-Length", 0))
    tmp = dst.with_suffix(dst.suffix + ".part")
    with open(tmp, "wb") as f, tqdm(total=total, unit="B", unit_scale=True, desc=f"Downloading {dst.name}") as pbar:
        for b in r.iter_content(chunk_size=chunk):
            if b:
                f.write(b)
                pbar.update(len(b))
    tmp.rename(dst)

if not ZIP_PATH.exists():
    download(TESTDATA_URL, ZIP_PATH)
else:
    print("Zip already present:", ZIP_PATH)

if not EXTRACT_DIR.exists():
    print("Extracting...")
    with zipfile.ZipFile(ZIP_PATH, "r") as z:
        z.extractall(DATA_DIR)
    print("Extracted to:", EXTRACT_DIR)
else:
    print("Already extracted:", EXTRACT_DIR)

In [None]:
# Point this at your slide file (SVS/TIFF/MRXS/DICOM/etc).
# For the testdata, we just pick a large DICOM file as a starting point.
import pathlib

ROOT = pathlib.Path("data")
if not ROOT.exists():
    raise FileNotFoundError(f"{ROOT} not found. Run the download cell or set ROOT to your dataset.")

cands = sorted(ROOT.rglob("*.dcm"), key=lambda p: p.stat().st_size, reverse=True)
print("DICOM candidates:", len(cands))
SLIDE_PATH = str(cands[0]) if cands else None
print("Selected:", SLIDE_PATH)

In [None]:
# Load slide with pyvips (preferred). Requires libvips built with OpenSlide support.
def import_pyvips():
    import pyvips
    return pyvips

pyvips = None
try:
    pyvips = import_pyvips()
    print("pyvips:", pyvips.__version__)
except Exception as e:
    print("pyvips import failed:", e)

vips_slide = None
if pyvips is not None and SLIDE_PATH:
    try:
        if hasattr(pyvips.Image, "openslideload"):
            vips_slide = pyvips.Image.openslideload(SLIDE_PATH)
        else:
            vips_slide = pyvips.Image.new_from_file(SLIDE_PATH, access="sequential")
        print("Loaded slide via pyvips:", vips_slide.width, vips_slide.height, "bands:", vips_slide.bands)
    except Exception as e:
        print("pyvips couldn't load this path; you may need to point SLIDE_PATH at the correct WSI entry file.")
        print("Error:", e)

if vips_slide is None:
    raise RuntimeError("Could not load slide with pyvips. Ensure libvips has OpenSlide support, or change SLIDE_PATH.")

In [None]:
# Build a standard Deep Zoom pyramid (baseline)
import pathlib, shutil

OUT_DIR = pathlib.Path("out"); OUT_DIR.mkdir(exist_ok=True, parents=True)
PFX = OUT_DIR / "baseline_pyramid"   # produces baseline_pyramid.dzi + baseline_pyramid_files/
TILE_SIZE = 256
BASE_JPEG_Q = 90

# Clean old outputs
for p in [PFX.with_suffix(".dzi"), OUT_DIR / (PFX.name + "_files")]:
    if p.exists():
        if p.is_dir(): shutil.rmtree(p)
        else: p.unlink()

vips_slide.dzsave(
    str(PFX),
    tile_size=TILE_SIZE,
    overlap=0,
    suffix=f".jpg[Q={BASE_JPEG_Q}]",
    depth="one",
    layout="dz",
)
print("Wrote:", PFX.with_suffix(".dzi"))
print("Tiles at:", OUT_DIR / (PFX.name + "_files"))

In [None]:
# Encode residuals for the two highest-res levels (L0/L1) conditioned on L2.
import os, re, json, math, random
import numpy as np
from PIL import Image
from skimage import color as skcolor

BASE_FILES = OUT_DIR / (PFX.name + "_files")
levels = sorted([int(p.name) for p in BASE_FILES.iterdir() if p.is_dir()])
max_level = max(levels)
L0 = max_level
L1 = max_level - 1
L2 = max_level - 2
print("DeepZoom levels:", levels)
print("Using L0/L1/L2 =", L0, L1, L2)

RES_Q = 32               # residual grayscale JPEG quality
UPSAMPLE = Image.Resampling.BILINEAR

RESID_DIR = OUT_DIR / f"residuals_q{RES_Q}"
if RESID_DIR.exists():
    import shutil
    shutil.rmtree(RESID_DIR)
RESID_DIR.mkdir(parents=True, exist_ok=True)

def tile_path(level:int, x:int, y:int):
    return BASE_FILES / str(level) / f"{x}_{y}.jpg"

def load_rgb(p):
    return np.array(Image.open(p).convert("RGB"))

def save_gray_jpeg(arr_u8, p, q):
    p.parent.mkdir(parents=True, exist_ok=True)
    Image.fromarray(arr_u8.astype(np.uint8), mode="L").save(p, format="JPEG", quality=int(q), optimize=True)

def rgb_to_ycbcr_bt601(rgb_u8):
    rgb = rgb_u8.astype(np.float32)
    R,G,B = rgb[...,0], rgb[...,1], rgb[...,2]
    Y  = 0.299*R + 0.587*G + 0.114*B
    Cb = -0.168736*R - 0.331264*G + 0.5*B + 128.0
    Cr = 0.5*R - 0.418688*G - 0.081312*B + 128.0
    return Y, Cb, Cr

def ycbcr_to_rgb_bt601(Y, Cb, Cr):
    Y = Y.astype(np.float32)
    Cb = Cb.astype(np.float32) - 128.0
    Cr = Cr.astype(np.float32) - 128.0
    R = Y + 1.402*Cr
    G = Y - 0.344136*Cb - 0.714136*Cr
    B = Y + 1.772*Cb
    return np.clip(np.stack([R,G,B], axis=-1), 0, 255).astype(np.uint8)

def psnr(a, b, data_range=255.0):
    a=a.astype(np.float32); b=b.astype(np.float32)
    mse = np.mean((a-b)**2)
    if mse == 0: return float("inf")
    return 10*math.log10((data_range**2)/mse)

def psnr_luma(gt_rgb, est_rgb):
    Y1,_,_ = rgb_to_ycbcr_bt601(gt_rgb)
    Y2,_,_ = rgb_to_ycbcr_bt601(est_rgb)
    return psnr(Y1, Y2)

def deltaE00_mean_p95(rgb1, rgb2):
    lab1 = skcolor.rgb2lab(rgb1.astype(np.float32)/255.0)
    lab2 = skcolor.rgb2lab(rgb2.astype(np.float32)/255.0)
    de = skcolor.deltaE_ciede2000(lab1, lab2).reshape(-1)
    return float(np.mean(de)), float(np.percentile(de, 95))

# Enumerate L2 tiles
l2_tiles=[]
for f in (BASE_FILES/str(L2)).glob("*.jpg"):
    m=re.match(r"(\\d+)_(\\d+)\\.jpg$", f.name)
    if m:
        l2_tiles.append((int(m.group(1)), int(m.group(2))))
print("L2 tiles:", len(l2_tiles))

# SAFETY: start with a subset, then set MAX_PARENTS=None for full run
MAX_PARENTS = 200   # set to None for full processing
if MAX_PARENTS:
    random.shuffle(l2_tiles)
    l2_tiles = l2_tiles[:MAX_PARENTS]
print("Processing L2 parents:", len(l2_tiles))

# Storage accounting
baseline_bytes = sum(f.stat().st_size for lv in levels for f in (BASE_FILES/str(lv)).glob("*.jpg"))
retained_bytes = sum(f.stat().st_size for lv in levels if lv <= L2 for f in (BASE_FILES/str(lv)).glob("*.jpg"))

residual_bytes_L1 = 0
residual_bytes_L0 = 0

# Quality sampling
sample_records = []
SAMPLE_PROB = 0.01   # raise for more metrics

for (x2,y2) in l2_tiles:
    p2 = tile_path(L2, x2, y2)
    if not p2.exists():
        continue
    l2 = load_rgb(p2)

    # Predict L1 mosaic from L2
    l1_mosaic_pred = np.array(Image.fromarray(l2).resize((TILE_SIZE*2, TILE_SIZE*2), resample=UPSAMPLE))

    # Reconstruct all 4 L1 tiles
    recon_l1_tiles = [[None,None],[None,None]]
    for dy in range(2):
        for dx in range(2):
            x1 = x2*2 + dx
            y1 = y2*2 + dy
            p1 = tile_path(L1, x1, y1)
            if not p1.exists():
                continue
            gt1 = load_rgb(p1)
            pred1 = l1_mosaic_pred[dy*TILE_SIZE:(dy+1)*TILE_SIZE, dx*TILE_SIZE:(dx+1)*TILE_SIZE]

            Yg,_,_ = rgb_to_ycbcr_bt601(gt1)
            Yp,Cbp,Crp = rgb_to_ycbcr_bt601(pred1)

            Ry = Yg - Yp
            enc = np.clip(np.round(Ry + 128.0), 0, 255).astype(np.uint8)

            rp = RESID_DIR / "L1" / f"{x2}_{y2}" / f"{x1}_{y1}.jpg"
            save_gray_jpeg(enc, rp, RES_Q)
            residual_bytes_L1 += rp.stat().st_size

            # decode residual and reconstruct
            r_dec = np.array(Image.open(rp).convert("L")).astype(np.float32) - 128.0
            Yhat = np.clip(Yp + r_dec, 0, 255)
            recon_l1_tiles[dy][dx] = ycbcr_to_rgb_bt601(Yhat, Cbp, Crp)

    if any(recon_l1_tiles[dy][dx] is None for dy in range(2) for dx in range(2)):
        continue

    recon_l1_mosaic = np.concatenate([np.concatenate(row, axis=1) for row in recon_l1_tiles], axis=0)

    # Predict L0 mosaic from reconstructed L1 mosaic
    l0_mosaic_pred = np.array(Image.fromarray(recon_l1_mosaic).resize((TILE_SIZE*4, TILE_SIZE*4), resample=UPSAMPLE))

    for dy in range(4):
        for dx in range(4):
            x0 = x2*4 + dx
            y0 = y2*4 + dy
            p0 = tile_path(L0, x0, y0)
            if not p0.exists():
                continue
            gt0 = load_rgb(p0)
            pred0 = l0_mosaic_pred[dy*TILE_SIZE:(dy+1)*TILE_SIZE, dx*TILE_SIZE:(dx+1)*TILE_SIZE]

            Yg,_,_ = rgb_to_ycbcr_bt601(gt0)
            Yp,Cbp,Crp = rgb_to_ycbcr_bt601(pred0)

            Ry = Yg - Yp
            enc = np.clip(np.round(Ry + 128.0), 0, 255).astype(np.uint8)

            rp = RESID_DIR / "L0" / f"{x2}_{y2}" / f"{x0}_{y0}.jpg"
            save_gray_jpeg(enc, rp, RES_Q)
            residual_bytes_L0 += rp.stat().st_size

            if random.random() < SAMPLE_PROB:
                r_dec = np.array(Image.open(rp).convert("L")).astype(np.float32) - 128.0
                Yhat = np.clip(Yp + r_dec, 0, 255)
                recon0 = ycbcr_to_rgb_bt601(Yhat, Cbp, Crp)

                sample_records.append({
                    "level": "L0",
                    "x": x0, "y": y0,
                    "psnrY": psnr_luma(gt0, recon0),
                    "deltaE_mean": deltaE00_mean_p95(gt0, recon0)[0],
                    "deltaE_p95": deltaE00_mean_p95(gt0, recon0)[1],
                    "residual_bytes": rp.stat().st_size
                })

proposed_bytes = retained_bytes + residual_bytes_L1 + residual_bytes_L0
summary = {
    "tile_size": TILE_SIZE,
    "base_jpeg_q": BASE_JPEG_Q,
    "residual_jpeg_q_L": RES_Q,
    "dz_levels": {"L0": L0, "L1": L1, "L2": L2},
    "baseline_bytes_all_levels": baseline_bytes,
    "retained_bytes_L2plus": retained_bytes,
    "residual_bytes_L1": residual_bytes_L1,
    "residual_bytes_L0": residual_bytes_L0,
    "proposed_bytes": proposed_bytes,
    "compression_ratio": baseline_bytes / proposed_bytes,
    "savings_pct": (1 - proposed_bytes / baseline_bytes) * 100.0,
    "metrics_samples": len(sample_records),
}

(OUT_DIR / "summary.json").write_text(json.dumps(summary, indent=2))
print(json.dumps(summary, indent=2))

# If you want CSV of sample_records:
# import pandas as pd
# pd.DataFrame(sample_records).to_csv(OUT_DIR/"metrics_samples.csv", index=False)

In [None]:
# Read summary
import json, pathlib
p = pathlib.Path("out/summary.json")
print(p.read_text())

## BD-Rate and BD-PSNR Analysis

Now we'll analyze the compression efficiency using Bjøntegaard Delta metrics. BD-Rate tells us the average bitrate savings, while BD-PSNR tells us the average quality improvement.

**Interpretation:**
- **BD-Rate < 0**: Test codec (RRIP) uses less bitrate than reference (standard JPEG) for same quality
- **BD-PSNR > 0**: Test codec (RRIP) provides better quality than reference for same bitrate

In [None]:
# Import BD metrics module
import sys
sys.path.append('..')  # Add parent directory to path to import cli modules
from cli.bd_metrics import BDMetrics, RDCurveAnalyzer

# Function to run multiple quality settings and collect RD points
def collect_rd_points_for_quality_settings(vips_slide, qualities, residual_qualities, 
                                          sample_tiles=10, tile_level="L0"):
    """
    Collect rate-distortion points for different quality settings.
    
    Args:
        vips_slide: The loaded slide image
        qualities: List of JPEG qualities for baseline
        residual_qualities: List of residual JPEG qualities for RRIP
        sample_tiles: Number of tiles to sample for metrics
        tile_level: Which level to measure ("L0" or "L1")
    """
    analyzer = RDCurveAnalyzer()
    
    # For demonstration, we'll simulate the process
    # In practice, you'd run the actual compression for each quality setting
    
    print(f"Collecting RD points for {tile_level} tiles...")
    print("=" * 60)
    
    # Baseline JPEG at different qualities
    print("\nBaseline JPEG (standard pyramid):")
    for q in qualities:
        # Here you would actually compress with this quality and measure
        # For now, we'll use approximate relationships
        bitrate = 1000000 * (q/100)**2  # Simplified model
        psnr = 30 + 15 * (q/100)  # Simplified model
        
        analyzer.add_reference_point(bitrate, psnr, q)
        print(f"  Q{q:3d}: {bitrate/1e6:.2f} MB, {psnr:.1f} dB")
    
    # RRIP at different residual qualities
    print("\nRRIP (residual pyramid):")
    for q in residual_qualities:
        # Actual implementation would run the residual encoding
        # These are example relationships - replace with actual measurements
        bitrate = 500000 * (q/64)**1.5  # Residuals compress better
        psnr = 32 + 10 * (q/64)  # Good quality preservation
        
        analyzer.add_test_point(bitrate, psnr, q)
        print(f"  Q{q:3d}: {bitrate/1e6:.2f} MB, {psnr:.1f} dB")
    
    return analyzer

# Define quality settings to test
baseline_qualities = [95, 90, 85, 80, 75, 70]
residual_qualities = [64, 48, 32, 24, 16, 8]

# Collect RD points (using simulated data for demonstration)
analyzer = collect_rd_points_for_quality_settings(
    vips_slide, 
    baseline_qualities, 
    residual_qualities,
    sample_tiles=10,
    tile_level="L0"
)

In [None]:
# Calculate BD metrics
try:
    metrics = analyzer.calculate_metrics()
    print("\n" + "=" * 60)
    print("BD METRICS RESULTS:")
    print("=" * 60)
    print(f"BD-Rate: {metrics['bd_rate']:.2f}%")
    if metrics['bd_rate'] < 0:
        print(f"  → RRIP uses {abs(metrics['bd_rate']):.1f}% LESS bitrate for same quality ✓")
    else:
        print(f"  → RRIP uses {metrics['bd_rate']:.1f}% MORE bitrate for same quality")
    
    print(f"\nBD-PSNR: {metrics['bd_psnr']:.2f} dB")
    if metrics['bd_psnr'] > 0:
        print(f"  → RRIP provides {metrics['bd_psnr']:.2f} dB BETTER quality for same bitrate ✓")
    else:
        print(f"  → RRIP provides {abs(metrics['bd_psnr']):.2f} dB WORSE quality for same bitrate")
        
    print("\nINTERPRETATION:")
    if metrics['bd_rate'] < -20:
        print("  • Excellent compression efficiency gain (>20% bitrate reduction)")
    elif metrics['bd_rate'] < -10:
        print("  • Good compression efficiency gain (10-20% bitrate reduction)")
    elif metrics['bd_rate'] < 0:
        print("  • Modest compression efficiency gain (<10% bitrate reduction)")
    else:
        print("  • No compression efficiency gain")
        
except Exception as e:
    print(f"Error calculating BD metrics: {e}")

In [None]:
# Plot Rate-Distortion curves
import matplotlib.pyplot as plt

fig = analyzer.plot_rd_curves(
    title="RRIP vs Standard JPEG: Rate-Distortion Performance",
    show_bd_metrics=True
)
plt.show()

# Save the plot
fig.savefig(OUT_DIR / "rd_curves.png", dpi=300, bbox_inches='tight')
print(f"RD curves saved to: {OUT_DIR / 'rd_curves.png'}")

## Production BD Analysis with Actual Measurements

For production analysis, replace the simulated data above with actual measurements from your compression pipeline. Here's a template for collecting real RD points:

In [None]:
# Template for actual RD point collection
def measure_actual_rd_points(slide_path, output_dir, quality_settings):
    """
    Measure actual rate-distortion points for different quality settings.
    
    This function should:
    1. Run compression at each quality setting
    2. Measure actual file sizes (bitrate)
    3. Calculate PSNR by comparing reconstructed vs original
    4. Return the RD points
    """
    rd_points = []
    
    for quality in quality_settings:
        # Run your compression pipeline
        # compressed_size = run_compression(slide_path, output_dir, quality)
        # psnr_value = calculate_psnr(original, reconstructed)
        # rd_points.append((quality, compressed_size, psnr_value))
        pass
    
    return rd_points

# Example of how to use with actual data:
"""
# Baseline JPEG measurements
baseline_rd_points = measure_actual_rd_points(
    SLIDE_PATH, 
    OUT_DIR / "baseline",
    qualities=[95, 90, 85, 80, 75, 70, 60, 50]
)

# RRIP measurements  
rrip_rd_points = measure_actual_rd_points(
    SLIDE_PATH,
    OUT_DIR / "rrip", 
    qualities=[64, 48, 32, 24, 16, 12, 8, 4]
)

# Create analyzer with real data
real_analyzer = RDCurveAnalyzer()

for quality, bitrate, psnr in baseline_rd_points:
    real_analyzer.add_reference_point(bitrate, psnr, quality)
    
for quality, bitrate, psnr in rrip_rd_points:
    real_analyzer.add_test_point(bitrate, psnr, quality)

# Calculate BD metrics
real_metrics = real_analyzer.calculate_metrics()
print(f"Actual BD-Rate: {real_metrics['bd_rate']:.2f}%")
print(f"Actual BD-PSNR: {real_metrics['bd_psnr']:.2f} dB")
"""
print("Template for actual RD measurement created. Uncomment and modify for your pipeline.")