
# PtyPy demo: Outliers, STXM & DPC, and Region Selection (synthetic data)

This notebook generates a synthetic soft X-ray ptychography dataset and demonstrates:

1. **Outlier detection** (e.g., saturated or anomalous frames) using robust metrics.  
2. **STXM** map (scalar) and **DPC** maps (x/y), computed directly from the diffraction stack.  
3. **Outlier map vs. STXM vs. DPC** side-by-side comparison.  
4. **Region selection** in scan space (circle or rectangle) and export of a **subset HDF5** ready for PtyPy.

> Notes  
> • STXM here is approximated as **total intensity per position**.  
> • DPC is computed as the **intensity-weighted center-of-mass (COM) shift** along x and y, per position.  
> • Outlier detection combines robust z-scores for total intensity, COM magnitude, and saturated pixel ratio.  
> • The exported subset follows a CXI-like structure: `/entry/data`, `/entry/pos`, plus optional metadata.


In [1]:

# If needed, install dependencies (uncomment in your environment)
# %pip install numpy h5py matplotlib

import numpy as np
import h5py
import matplotlib.pyplot as plt
from pathlib import Path

# Set a fixed RNG seed for reproducibility
rng = np.random.default_rng(42)

print("Libraries loaded.")


ModuleNotFoundError: No module named 'h5py'

## Helper functions

In [None]:

def center_of_mass(img):
    arr = np.asarray(img, dtype=np.float64)
    arr = np.clip(arr, 0, None)
    tot = arr.sum()
    if tot <= 0:
        ny, nx = arr.shape
        return (ny/2.0, nx/2.0)
    ny, nx = arr.shape
    y = np.arange(ny, dtype=np.float64)
    x = np.arange(nx, dtype=np.float64)
    cy = (arr.sum(axis=1) * y).sum() / tot
    cx = (arr.sum(axis=0) * x).sum() / tot
    return (cy, cx)

def robust_zscores(x):
    # Median & MAD-based z-score (scaled by 1.4826 for normal consistency)
    x = np.asarray(x, dtype=np.float64)
    med = np.median(x)
    mad = np.median(np.abs(x - med)) + 1e-12
    return 0.67448975 * (x - med) / (mad)  # z ~ (x - median) / (MAD/0.6745)

def make_scan_positions(ny, nx, ystep=1e-6, xstep=1e-6, y0=0.0, x0=0.0):
    ys = y0 + np.arange(ny) * ystep
    xs = x0 + np.arange(nx) * xstep
    Y, X = np.meshgrid(ys, xs, indexing='ij')
    return np.stack([Y.ravel(), X.ravel()], axis=1)  # (N,2) with (y,x)

def gaussian2d(ny, nx, cy, cx, sigma):
    y = np.arange(ny)[:, None]
    x = np.arange(nx)[None, :]
    return np.exp(-((y - cy)**2 + (x - cx)**2) / (2*sigma**2))

def generate_synthetic_stack(scan_ny=30, scan_nx=40, pix_ny=256, pix_nx=256,
                             base_sigma=12.0, dpc_scale=0.5, noise_level=2.0,
                             sat_level=60000, n_outliers=60):
    """
    Create a synthetic stack (N, Y, X) with positions (N,2).
    - Each frame is a Gaussian whose center is shifted linearly with scan position (DPC-like).
    - Add Poisson-like noise and occasional saturated frames or dead frames as outliers.
    """
    pos = make_scan_positions(scan_ny, scan_nx, ystep=1.0, xstep=1.0, y0=-1.0*scan_ny/2, x0=-1.0*scan_nx/2)
    N = pos.shape[0]
    stack = np.zeros((N, pix_ny, pix_nx), dtype=np.float32)

    # Base center (in pixels)
    base_cy, base_cx = pix_ny/2, pix_nx/2

    # Linear COM shift proportional to scan (DPC-like)
    # Normalize positions to [-1, 1]
    py = (pos[:,0] - pos[:,0].mean()) / (np.abs(pos[:,0]).max() + 1e-12)
    px = (pos[:,1] - pos[:,1].mean()) / (np.abs(pos[:,1]).max() + 1e-12)
    shift_y = dpc_scale * py * (pix_ny/10)
    shift_x = dpc_scale * px * (pix_nx/10)

    for i in range(N):
        cy = base_cy + shift_y[i]
        cx = base_cx + shift_x[i]
        img = 300.0 * gaussian2d(pix_ny, pix_nx, cy, cx, base_sigma)
        # Add noise
        img = img + noise_level * rng.normal(size=img.shape)
        img = np.clip(img, 0, None)
        stack[i] = img.astype(np.float32)

    # Inject outliers: saturated or dead frames
    idx = rng.choice(N, size=n_outliers, replace=False)
    half = n_outliers // 2
    sat_idx = idx[:half]
    dead_idx = idx[half:]
    for i in sat_idx:
        # introduce a hot stripe
        stack[i, :, rng.integers(0, pix_nx)] = sat_level
    for i in dead_idx:
        # zero frame
        stack[i] *= 0.0

    # Clip to 16-bit range
    stack = np.clip(stack, 0, sat_level).astype(np.uint16)

    return stack, pos, sat_level

print("Helper funcs defined.")


## Generate and save a synthetic dataset

In [None]:

out_dir = Path("./synthetic_demo")
out_dir.mkdir(exist_ok=True)

stack, pos, sat_level = generate_synthetic_stack(scan_ny=30, scan_nx=40, pix_ny=256, pix_nx=256,
                                                 base_sigma=12.0, dpc_scale=0.6, noise_level=2.5,
                                                 sat_level=60000, n_outliers=80)

h5_path = out_dir / "synthetic_demo.h5"
with h5py.File(h5_path, "w") as h5:
    e = h5.create_group("/entry")
    e.create_dataset("data", data=stack, compression="gzip", compression_opts=4)
    e.create_dataset("pos", data=pos)
    e.create_dataset("pixel_m", data=np.float64(5e-6))     # example
    e.create_dataset("det_dist_m", data=np.float64(1.2))   # example
    e.create_dataset("energy_eV", data=np.float64(700.0))  # example

print("Saved:", h5_path, "shape:", stack.shape)


## Compute per-frame metrics and outliers

In [None]:

# Metrics: total intensity, fraction saturated, COM shift magnitude
N, ny, nx = stack.shape
tot = stack.reshape(N, -1).sum(axis=1)
frac_sat = (stack >= sat_level).reshape(N, -1).mean(axis=1)

com = np.array([center_of_mass(img) for img in stack])
# Reference center (middle of detector)
ref_cy, ref_cx = ny/2, nx/2
com_shift = np.sqrt((com[:,0]-ref_cy)**2 + (com[:,1]-ref_cx)**2)

# Robust z-scores
z_tot = robust_zscores(tot)
z_sat = robust_zscores(frac_sat)
z_com = robust_zscores(com_shift)

# Combine a simple score; tweak weights if desired
score = np.abs(z_tot) + np.abs(z_sat) + np.abs(z_com)
thresh = 6.0  # heuristic
is_outlier = score > thresh

print("Outliers:", int(is_outlier.sum()), " / ", N)


## Build STXM and DPC maps

In [None]:

# STXM: scalar per position = total intensity
# DPCx, DPCy: COM offsets relative to center
ys = np.unique(pos[:,0])
xs = np.unique(pos[:,1])
scan_ny = len(ys)
scan_nx = len(xs)

if scan_ny * scan_nx != N:
    raise RuntimeError("Positions not on a perfect grid in this synthetic demo. Adjust generator.")

STXM = tot.reshape(scan_ny, scan_nx)
DPCy = (com[:,0] - ref_cy).reshape(scan_ny, scan_nx)
DPCx = (com[:,1] - ref_cx).reshape(scan_ny, scan_nx)
OUT = is_outlier.reshape(scan_ny, scan_nx)

print("Maps shaped:", STXM.shape, DPCx.shape, DPCy.shape, OUT.shape)


## Visualize: STXM vs DPCx/DPCy vs Outliers

In [None]:

plt.figure(figsize=(6,5))
plt.imshow(STXM)
plt.title("STXM (sum intensity)")
plt.colorbar()
plt.show()

plt.figure(figsize=(6,5))
plt.imshow(DPCx)
plt.title("DPCx (COM x-offset)")
plt.colorbar()
plt.show()

plt.figure(figsize=(6,5))
plt.imshow(DPCy)
plt.title("DPCy (COM y-offset)")
plt.colorbar()
plt.show()

plt.figure(figsize=(6,5))
plt.imshow(OUT.astype(float))
plt.title("Outlier map (1=flagged)")
plt.colorbar()
plt.show()


## Region selection in scan space (circle / rectangle)

In [None]:

def select_by_position(pos, shape='none', center=None, radius=None, size=None):
    pos = np.asarray(pos, dtype=float)
    if shape == 'none' or pos is None:
        return np.ones(len(pos), dtype=bool)
    if center is None:
        raise ValueError("center is required for shape != 'none'")
    cy, cx = center
    dy = pos[:,0] - cy
    dx = pos[:,1] - cx
    if shape == 'circle':
        if radius is None:
            raise ValueError("radius required for circle")
        return (dy*dy + dx*dx) <= (radius*radius)
    if shape == 'rect':
        if size is None:
            raise ValueError("size (dy,dx) required for rect")
        sy, sx = size
        return (np.abs(dy) <= sy/2.0) & (np.abs(dx) <= sx/2.0)
    raise ValueError(f"Unknown shape {shape!r}")

# Example selection: circle in scan space (units of the synthetic positions)
sel_circle = select_by_position(pos, shape='circle', center=(0.0, 0.0), radius=8.0)
sel_rect   = select_by_position(pos, shape='rect',   center=(0.0, 0.0), size=(10.0, 16.0))

print("Selected (circle):", sel_circle.sum(), "frames")
print("Selected (rect):  ", sel_rect.sum(), "frames")

# Show masks on the scan grid
plt.figure(figsize=(6,5))
plt.imshow(sel_circle.reshape(scan_ny, scan_nx).astype(float))
plt.title("Selection mask: circle")
plt.colorbar()
plt.show()

plt.figure(figsize=(6,5))
plt.imshow(sel_rect.reshape(scan_ny, scan_nx).astype(float))
plt.title("Selection mask: rectangle")
plt.colorbar()
plt.show()


## Export a subset HDF5 (ready for PtyPy)

In [None]:

subset_mask = sel_circle  # choose circle selection for this demo
clean_mask = (~is_outlier) & subset_mask  # optional: drop outliers within the selection

sub_path = out_dir / "synthetic_subset.h5"
with h5py.File(sub_path, "w") as h5:
    e = h5.create_group("/entry")
    e.create_dataset("data", data=stack[clean_mask].astype(np.float32), compression="gzip", compression_opts=4)
    e.create_dataset("pos", data=pos[clean_mask])
    e.create_dataset("pixel_m", data=np.float64(5e-6))
    e.create_dataset("det_dist_m", data=np.float64(1.2))
    e.create_dataset("energy_eV", data=np.float64(700.0))

print("Subset saved:", sub_path, "frames:", int(clean_mask.sum()))



## (Optional) Running PtyPy on the subset

If you have **PtyPy** and an HDF5 loader recipe ready (e.g., the `ptypy_recipe_hdf5.py` from this repo), you can run:

```bash
python pytpy_ingest/examples/ptypy_recipe_hdf5.py   --data ./synthetic_demo/synthetic_subset.h5   --engine DM   --iters 200
```
