This notebook provides examples for interacting with Zarr stores created by `clam collect`, using Dask.

The Zarr store structure:
- Root group with `clam_metadata` containing contigs, column_names (sample names), and chunk_size
- One array per chromosome with shape `(chrom_length, n_samples)`
- Arrays store u32 depth values

In [None]:
from distributed import Client, LocalCluster

try:
    client = Client("tcp://localhost:8786", timeout="2s")
except OSError:
    cluster = LocalCluster(scheduler_port=8786)
    client = Client(cluster)
client.restart()
client

In [2]:
import zarr
import dask.array as da
import pandas as pd
from typing import Any, cast
from pathlib import Path


class ClamDepthStore:
    def __init__(self, path: str | Path):
        self.path = Path(path)

        self.store = zarr.open(self.path, mode="r")

        metadata = cast(dict[str, Any], self.store.attrs["clam_metadata"])
        self.chunk_size: int = metadata["chunk_size"]
        self.sample_names: list[str] = metadata["column_names"]
        self.contigs: dict[str, int] = {
            c["name"]: c["length"] for c in metadata["contigs"]
        }

    def get_dask_array(self, contig: str) -> da.Array:
        """Get a dask array for a contig."""
        return da.from_zarr(self.path / contig)

    def mean_depth_contig(self, contig: str) -> float:
        """Mean depth using dask."""
        return self.get_dask_array(contig).mean().compute()

    def mean_depth_all_contigs(self) -> pd.Series:
        """Mean depth per contig, computed in parallel."""

        means = {contig: self.get_dask_array(contig).mean() for contig in self.contigs}

        results = da.compute(means)[0]
        return pd.Series(results)

    def depth_statistics(self, contig: str) -> dict[str, float]:
        """Compute stats in single pass with dask."""
        arr = self.get_dask_array(contig)

        stats = {
            "mean": arr.mean(),
            "std": arr.std(),
            "min": arr.min(),
            "max": arr.max(),
        }

        return da.compute(stats)[0]

    def depth_statistics_all_contigs(self) -> pd.DataFrame:
        """Compute stats for all contigs in parallel."""

        all_stats = {}
        for contig in self.contigs:
            arr = self.get_dask_array(contig)
            all_stats[contig] = {
                "mean": arr.mean(),
                "std": arr.std(),
                "min": arr.min(),
                "max": arr.max(),
            }

        # Execute all at once
        results = da.compute(all_stats)[0]
        return pd.DataFrame(results).T

In [None]:
store = ClamDepthStore(path="")  # put your clam collect zarr path here.

In [4]:
depth_stats = store.depth_statistics_all_contigs()

In [None]:
depth_stats

In [None]:
import numpy as np


def make_threshold_file(
    depth_stats: pd.DataFrame, output_path: str | Path, n_std: float = 2.0
):
    """Create threshold TSV from depth statistics using Poisson assumption."""
    mean = depth_stats["mean"]
    poisson_std = np.sqrt(mean)

    thresholds = pd.DataFrame(
        {
            "contig": depth_stats.index,
            "min_depth": (mean - n_std * poisson_std).clip(lower=0),
            "max_depth": mean + n_std * poisson_std,
        }
    )
    thresholds.to_csv(output_path, sep="\t", index=False)
    return thresholds


make_threshold_file(depth_stats, "thresholds.tsv")