In [None]:
"""
Ancient DNA Damage Pipeline (Proof-of-Concept)

- Input: BAM and FASTA reference genome
- Computes:
    * mapDamage metrics (fragment lengths, base misincorporations)
    * Windowed C->T statistics (10 kb windows, z-scores)
- Output: Single CSV per genome with all relevant damage metrics
"""

'\nAncient DNA Damage Pipeline (Proof-of-Concept)\n\n- Input: BAM or FASTQ (if FASTQ, assumes pre-alignment steps done separately)\n- Computes:\n    * mapDamage metrics (fragment lengths, base misincorporations)\n    * Windowed C->T statistics (10 kb windows, z-scores)\n- Output: Single CSV per genome with all relevant damage metrics\n'

In [1]:
import os
import shutil
import subprocess
import pysam
import numpy as np
import pandas as pd

In [None]:
bam_file_path = "/Users/eviecolbeck/Documents/aDNA_project/J-44.merged.hg19.sort.rmdup.bam"
fasta_file_path = "/Users/eviecolbeck/Documents/aDNA_project/hs37d5.fa"
output_dir = "damage_output_full"
window_size = 10000
Z_DAMAGE_THRESHOLD = 2.58

os.makedirs(output_dir, exist_ok=True)

# Canonical chromosome list
target_chroms = [str(i) for i in range(1, 23)] + ["X", "Y", "MT"]

detect correct chromosome naming

In [3]:
with pysam.AlignmentFile(bam_file_path, "rb") as bam:
    header_chroms = set(bam.references)

use_chr_prefix = False
if "chr1" in header_chroms:
    use_chr_prefix = True

chrom_map = {}
for c in target_chroms:
    cand = f"chr{c}" if use_chr_prefix else c
    if cand in header_chroms:
        chrom_map[c] = cand

print("Chromosomes detected in BAM:", chrom_map)

Chromosomes detected in BAM: {'1': '1', '2': '2', '3': '3', '4': '4', '5': '5', '6': '6', '7': '7', '8': '8', '9': '9', '10': '10', '11': '11', '12': '12', '13': '13', '14': '14', '15': '15', '16': '16', '17': '17', '18': '18', '19': '19', '20': '20', '21': '21', '22': '22', 'X': 'X', 'Y': 'Y', 'MT': 'MT'}


In [None]:
def subset_chromosome(bam_path, chrom, out_bam):
    """Extract chromosome-specific BAM."""
    with pysam.AlignmentFile(bam_path, "rb") as bam_in:
        with pysam.AlignmentFile(out_bam, "wb", template=bam_in) as bam_out:
            for read in bam_in.fetch(chrom):
                bam_out.write(read)
    pysam.index(out_bam)


def run_mapdamage(bam, fasta, outdir):
    """Run mapDamage."""
    if os.path.exists(outdir):
        shutil.rmtree(outdir)
    os.makedirs(outdir)

    subprocess.run([
        "mapDamage",
        "-i", bam,
        "-r", fasta,
        "--merge-reference-sequences",
        "--no-stats",
        "-d", outdir
    ], check=True)


def summarize_damage(bam_path, chrom, window_size):
    """Compute C→T hotspot windows."""
    with pysam.AlignmentFile(bam_path, "rb") as bam:
        if chrom not in bam.references:
            return pd.DataFrame()

        chrom_len = dict((sq["SN"], sq["LN"]) for sq in bam.header["SQ"])[chrom]

        starts = []
        ct_counts = []

        for read in bam.fetch(chrom):
            if read.is_unmapped or read.query_sequence is None:
                continue
            starts.append(read.reference_start)

            # Approximate 5' C->T (count T in first 4 bases)
            seq = read.query_sequence
            ct_counts.append(sum(base == "T" for base in seq[:4]))

        if len(starts) == 0:
            return pd.DataFrame()

        # Windows
        bins = np.arange(0, chrom_len + window_size, window_size)
        num_windows = len(bins) - 1

        ct_window_sums = np.zeros(num_windows)
        ct_window_counts = np.zeros(num_windows)

        for start, val in zip(starts, ct_counts):
            win = start // window_size
            if win < num_windows:
                ct_window_sums[win] += val
                ct_window_counts[win] += 1

        ct_window_avg = np.where(ct_window_counts > 0,
                                 ct_window_sums / ct_window_counts,
                                 np.nan)

        # Z-scores
        if not np.isnan(ct_window_avg).all() and np.nanstd(ct_window_avg) > 0:
            ct_z = (ct_window_avg - np.nanmean(ct_window_avg)) / np.nanstd(ct_window_avg)
        else:
            ct_z = np.full_like(ct_window_avg, np.nan)

        # Collect hotspots
        hotspots = []
        for i in range(num_windows):
            if not np.isnan(ct_z[i]) and ct_z[i] >= Z_DAMAGE_THRESHOLD:
                hotspots.append({
                    "Chromosome": chrom,
                    "Start": int(bins[i]),
                    "End": int(bins[i+1]),
                    "CT_Avg": float(ct_window_avg[i]),
                    "CT_Z": float(ct_z[i])
                })

        df = pd.DataFrame(hotspots)
        return df

In [None]:
all_hotspots = []

for short_name, chrom in chrom_map.items():
    print(f"\n=== Processing chromosome {chrom} ===")

    chrom_bam = os.path.join(output_dir, f"{chrom}.bam")
    mapdamage_dir = os.path.join(output_dir, f"mapdamage_{chrom}")
    out_csv = os.path.join(output_dir, f"{chrom}_damage_summary.csv")

    print(" → Subsetting…")
    subset_chromosome(bam_file_path, chrom, chrom_bam)

    print(" → Running mapDamage…")
    run_mapdamage(chrom_bam, fasta_file_path, mapdamage_dir)

    print(" → Summarizing hotspots…")
    df = summarize_damage(chrom_bam, chrom, window_size)
    df.to_csv(out_csv, index=False)

    print(f" → Saved: {out_csv}")
    all_hotspots.append(df)

# Combine all chromosomes into one genome-wide file
combined = pd.concat(all_hotspots, ignore_index=True)
combined.to_csv(os.path.join(output_dir, "GENOME_DAMAGE_SUMMARY.csv"), index=False)

print("Combined output written to GENOME_DAMAGE_SUMMARY.csv")

Bayesian ?