# VCF statistics

- Modality-specific statistics
    - DeepSomatic
    - Mutect2
    - Strelka
- Consensus statistics
- Rescue statistics

```bash
micromamba install -n rnadnavar -c conda-forge -c bioconda cyvcf2=0.31.1 pysam=0.22.1 bcftools=1.21 htslib=1.21 pandas polars seaborn plotly ipykernel jupyterlab_widgets ipywidgets anywidget
```

In [19]:
import os
import sys
from pathlib import Path
from collections import defaultdict
from typing import Dict, List, Tuple, Optional
import warnings

warnings.filterwarnings("ignore")

# Data processing
import numpy as np
import pandas as pd

# VCF and BAM handling
from cyvcf2 import VCF
import pysam

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Set plotting style
sns.set_style("whitegrid")
plt.rcParams["figure.figsize"] = (12, 6)

print("✓ All libraries imported successfully")

✓ All libraries imported successfully


## Configuration and File Discovery

Define paths and discover all VCF and alignment files in the dataset.

In [20]:
# Absolute path to the reference genome FASTA (GRCh38 assembly, per the file name).
# NOTE(review): presumably handed to BAMValidator so CRAM files can be decoded —
# confirm at the call site. The path is machine-specific.
REFERENCE_FASTA = "/t9k/mnt/joey/bio_db/references/Homo_sapiens/GATK/GRCh38/Sequence/WholeGenomeFasta/Homo_sapiens_assembly38.fasta"

# Configuration
# Root of the pipeline output tree that VCFFileDiscovery scans (machine-specific path).
BASE_DIR = Path("/t9k/mnt/hdd/work/Vax/sequencing/aim_exp/rdv_test/COO8801.subset")

# Define file structure
# Variant callers; each name is used as a sub-directory under "variant_calling/"
# and "normalized/".
TOOLS = ["deepsomatic", "mutect2", "strelka"]
# Tumor-vs-normal comparisons; each name is used as a per-tool sub-directory and
# as the key of the consensus VCFs.
MODALITIES = ["DNA_TUMOR_vs_DNA_NORMAL", "RNA_TUMOR_vs_DNA_NORMAL"]

In [None]:
class VCFFileDiscovery:
    """Discover and organize the VCF and alignment files of a pipeline run.

    The expected layout below ``base_dir`` is::

        variant_calling/<tool>/<modality>/*.vcf.gz      (gVCFs are ignored)
        normalized/<tool>/<modality>/*.norm.vcf.gz
        consensus/vcf/<modality>/*.consensus.vcf.gz
        rescue/<any sub-directory>/*.rescued.vcf.gz
        preprocessing/recalibrated/<sample>/*.cram|*.bam

    When a directory holds several matching files, the alphabetically first
    one is selected so that repeated runs always pick the same file.

    Args:
        base_dir: Root directory of the pipeline output.
        tools: Variant caller directory names. Defaults to the module-level
            ``TOOLS`` constant.
        modalities: Modality directory names. Defaults to the module-level
            ``MODALITIES`` constant.
    """

    def __init__(self, base_dir: Path, tools=None, modalities=None):
        self.base_dir = Path(base_dir)
        # None means "use the module-level constant at discovery time".
        self.tools = None if tools is None else list(tools)
        self.modalities = None if modalities is None else list(modalities)
        # NOTE: "annotated" and "filtered" are declared for completeness but
        # nothing in this class populates them.
        self.vcf_files = {
            "variant_calling": {},  # Raw tool outputs
            "normalized": {},  # Normalized VCFs
            "annotated": {},  # VEP annotated
            "consensus": {},  # Consensus VCFs
            "rescue": {},  # Rescue VCFs
            "filtered": {},  # Filtered VCFs
        }
        self.alignment_files = {}

    @staticmethod
    def _first_match(
        directory: Path, pattern: str, skip_suffix: Optional[str] = None
    ) -> Optional[Path]:
        """Return the alphabetically first file in *directory* matching *pattern*.

        Files whose *name* ends with ``skip_suffix`` are ignored. Returns
        ``None`` when the directory is missing or nothing matches.
        """
        if not directory.is_dir():
            return None
        candidates = sorted(
            path
            for path in directory.glob(pattern)
            if skip_suffix is None or not path.name.endswith(skip_suffix)
        )
        return candidates[0] if candidates else None

    def discover_vcfs(self):
        """Populate and return ``self.vcf_files`` (category -> name -> Path)."""
        tools = TOOLS if self.tools is None else self.tools
        modalities = MODALITIES if self.modalities is None else self.modalities

        # 1 + 2. Per-tool raw and normalized outputs share the same key scheme.
        for tool in tools:
            for modality in modalities:
                key = f"{tool}_{modality}"

                raw = self._first_match(
                    self.base_dir / "variant_calling" / tool / modality,
                    "*.vcf.gz",
                    # Only the file name decides whether it is a gVCF; a
                    # directory name must not disqualify every file inside it.
                    skip_suffix=".g.vcf.gz",
                )
                if raw is not None:
                    self.vcf_files["variant_calling"][key] = raw

                normalized = self._first_match(
                    self.base_dir / "normalized" / tool / modality,
                    "*.norm.vcf.gz",
                )
                if normalized is not None:
                    self.vcf_files["normalized"][key] = normalized

        # 3. Consensus VCFs, keyed by modality only.
        consensus_dir = self.base_dir / "consensus" / "vcf"
        for modality in modalities:
            consensus = self._first_match(
                consensus_dir / modality, "*.consensus.vcf.gz"
            )
            if consensus is not None:
                self.vcf_files["consensus"][modality] = consensus

        # 4. Rescue VCFs, keyed by the name of the sub-directory holding them.
        rescue_dir = self.base_dir / "rescue"
        if rescue_dir.is_dir():
            for subdir in sorted(rescue_dir.iterdir()):
                rescued = self._first_match(subdir, "*.rescued.vcf.gz")
                if rescued is not None:
                    self.vcf_files["rescue"][subdir.name] = rescued

        return self.vcf_files

    def discover_alignments(self):
        """Populate and return ``self.alignment_files`` (sample dir name -> Path).

        A CRAM file is preferred over a BAM file when a sample directory
        contains both.
        """
        recal_dir = self.base_dir / "preprocessing" / "recalibrated"
        if not recal_dir.is_dir():
            return self.alignment_files

        for sample_dir in sorted(recal_dir.iterdir()):
            alignment = self._first_match(sample_dir, "*.cram")
            if alignment is None:
                alignment = self._first_match(sample_dir, "*.bam")
            if alignment is not None:
                self.alignment_files[sample_dir.name] = alignment

        return self.alignment_files

    def print_summary(self):
        """Print the discovered files, grouped by category, to stdout."""
        rule = "=" * 80
        print(rule)
        print("VCF FILE DISCOVERY SUMMARY")
        print(rule)

        for category, files in self.vcf_files.items():
            if not files:
                continue
            print(f"\n{category.upper()}:")
            for name, path in files.items():
                print(f"  ✓ {name}: {path.name}")

        if self.alignment_files:
            print("\nALIGNMENT FILES:")
            for name, path in self.alignment_files.items():
                print(f"  ✓ {name}: {path.name}")

        print("\n" + rule)


class VCFStatisticsExtractor:
    """Extract basic, INFO-field and FORMAT-field statistics from one VCF.

    Each ``extract_*`` method makes its own pass over the file with a freshly
    opened reader: a cyvcf2 ``VCF`` object is a one-shot iterator, so reusing
    a reader that an earlier pass already consumed would yield no records.

    Results are accumulated in ``self.stats`` under the keys ``"basic"``,
    ``"info"`` and ``"format"``.
    """

    # Maximum number of records sampled for INFO / FORMAT statistics.
    MAX_SAMPLED_VARIANTS = 10000
    # Scalar FORMAT fields summarised per sample. ("AD" used to be collected
    # as well but was never summarised, so it is no longer gathered.)
    FORMAT_FIELDS = ("DP", "AF", "GQ")

    def __init__(self, vcf_path: Path):
        self.vcf_path = vcf_path
        self.vcf = None  # most recently opened reader
        self.stats = {}

    def _open_vcf(self):
        """Open a fresh reader for ``self.vcf_path``, closing the previous one."""
        if self.vcf is not None:
            self.vcf.close()
        self.vcf = VCF(str(self.vcf_path))
        return self.vcf

    @staticmethod
    def _header_record_as_dict(record):
        """Return a mapping view of a header record.

        Prefers the record's ``info()`` method when it has one and otherwise
        returns the record itself, which is then expected to offer ``.get``.
        """
        describe = getattr(record, "info", None)
        if callable(describe):
            return describe()
        return record

    @staticmethod
    def _describe(values, with_std=False):
        """Summarise a non-empty numeric sequence (count, mean, quartiles, ...)."""
        summary = {
            "count": len(values),
            "mean": np.mean(values),
            "median": np.median(values),
        }
        if with_std:
            summary["std"] = np.std(values)
        summary["min"] = np.min(values)
        summary["max"] = np.max(values)
        summary["q25"] = np.percentile(values, 25)
        summary["q75"] = np.percentile(values, 75)
        return summary

    @staticmethod
    def _first_value(array, sample_index):
        """Return the first FORMAT value of one sample, or None if unusable.

        "Unusable" covers a missing field, an out-of-range sample index, a
        ``None`` entry, a non-numeric entry and a non-finite number (NaN/inf).
        """
        if array is None:
            return None
        try:
            value = array[sample_index][0]
        except (IndexError, TypeError):
            return None
        if value is None:
            return None
        try:
            finite = bool(np.isfinite(value))
        except TypeError:
            return None
        return value if finite else None

    def extract_basic_stats(self):
        """Count variants by type and filter status over the whole file.

        Returns:
            The statistics dict (also stored as ``self.stats["basic"]``), or
            ``None`` if the file could not be processed.
        """
        try:
            reader = self._open_vcf()

            stats = {
                "total_variants": 0,
                "snps": 0,
                "indels": 0,
                "mnps": 0,
                "complex": 0,
                "passed": 0,
                "filtered": 0,
                "chromosomes": set(),
                "qualities": [],
                "variant_types": defaultdict(int),
            }

            for variant in reader:
                stats["total_variants"] += 1
                stats["chromosomes"].add(variant.CHROM)

                # Quality scores (missing or non-positive QUAL is skipped)
                qual = variant.QUAL
                if qual is not None and qual > 0:
                    stats["qualities"].append(qual)

                # Filter status
                if variant.FILTER in (None, "PASS", "."):
                    stats["passed"] += 1
                else:
                    stats["filtered"] += 1

                # Variant type
                if variant.is_snp:
                    stats["snps"] += 1
                    stats["variant_types"]["SNP"] += 1
                elif variant.is_indel:
                    stats["indels"] += 1
                    if variant.is_deletion:
                        stats["variant_types"]["DEL"] += 1
                    else:
                        stats["variant_types"]["INS"] += 1
                elif getattr(variant, "is_mnp", False):
                    # The "mnps" counter existed but was never fed; MNPs used
                    # to be lumped into "complex".
                    stats["mnps"] += 1
                    stats["variant_types"]["MNP"] += 1
                else:
                    stats["complex"] += 1
                    stats["variant_types"]["COMPLEX"] += 1

            stats["chromosomes"] = sorted(stats["chromosomes"])

            self.stats["basic"] = stats
            return stats

        except Exception as e:
            print(f"Error processing {self.vcf_path}: {e}")
            return None

    def extract_info_fields(self):
        """Summarise the INFO fields declared in the header.

        Only the first ``MAX_SAMPLED_VARIANTS`` records are sampled. Fields
        whose values convert to ``float`` get numeric summaries; other fields
        are reported as ``{"count": n, "type": "categorical"}``.

        Returns:
            Mapping of INFO id to its summary (also ``self.stats["info"]``),
            or ``{}`` on failure.
        """
        try:
            reader = self._open_vcf()

            # Get available INFO fields from header
            info_fields = {}
            try:
                for record in reader.header_iter():
                    try:
                        described = self._header_record_as_dict(record)
                        if described.get("HeaderType", None) != "INFO":
                            continue
                        field_id = described.get("ID", None)
                        if field_id:
                            info_fields[field_id] = {
                                "type": described.get("Type", "unknown"),
                                "values": [],
                            }
                    except (KeyError, AttributeError, TypeError):
                        # Skip this header entry if we can't parse it
                        continue
            except Exception as header_err:
                print(
                    f"  Error parsing header: {type(header_err).__name__}: {header_err}"
                )
                raise

            # Collect values from a bounded sample of records
            for index, variant in enumerate(reader):
                if index >= self.MAX_SAMPLED_VARIANTS:
                    break
                for info_id, slot in info_fields.items():
                    try:
                        value = variant.INFO.get(info_id)
                    except Exception:
                        # Best effort: an unreadable value for one field must
                        # not abort the whole scan.
                        continue
                    if value is not None:
                        slot["values"].append(value)

            # Calculate statistics for numeric fields
            info_stats = {}
            for info_id, slot in info_fields.items():
                values = slot["values"]
                if not values:
                    continue
                try:
                    numbers = []
                    for value in values:
                        if isinstance(value, (list, tuple)):
                            numbers.extend(
                                float(item) for item in value if item is not None
                            )
                        else:
                            numbers.append(float(value))
                except (ValueError, TypeError):
                    # Non-numeric field
                    info_stats[info_id] = {
                        "count": len(values),
                        "type": "categorical",
                    }
                    continue
                if numbers:
                    info_stats[info_id] = self._describe(numbers, with_std=True)

            self.stats["info"] = info_stats
            return info_stats

        except Exception as e:
            import traceback

            print(f"Error extracting INFO fields from {self.vcf_path}:")
            print(f"  {type(e).__name__}: {str(e)}")
            if str(e) == "":
                traceback.print_exc()
            return {}

    def extract_format_fields(self):
        """Summarise the DP, AF and GQ FORMAT fields for every sample.

        Only the first ``MAX_SAMPLED_VARIANTS`` records are sampled and only
        the first value of each field is used per sample.

        Returns:
            Mapping of sample name to ``{field: summary}`` (also stored as
            ``self.stats["format"]``), or ``{}`` on failure.
        """
        try:
            reader = self._open_vcf()

            samples = reader.samples
            collected = {
                sample: {field: [] for field in self.FORMAT_FIELDS}
                for sample in samples
            }

            for index, variant in enumerate(reader):
                if index >= self.MAX_SAMPLED_VARIANTS:
                    break

                # Fetch each field once per record rather than once per sample.
                arrays = {}
                for field in self.FORMAT_FIELDS:
                    try:
                        arrays[field] = variant.format(field)
                    except Exception:
                        # Field absent for this record/file: treat as missing.
                        arrays[field] = None

                for position, sample in enumerate(samples):
                    depth = self._first_value(arrays["DP"], position)
                    if depth is not None and depth > 0:
                        collected[sample]["DP"].append(depth)

                    # Non-finite AF (NaN) is dropped by _first_value so it
                    # cannot poison the mean.
                    allele_freq = self._first_value(arrays["AF"], position)
                    if allele_freq is not None:
                        collected[sample]["AF"].append(allele_freq)

                    # NOTE(review): missing integers presumably arrive as a
                    # large negative sentinel — confirm against cyvcf2; a
                    # genuine GQ is never negative, so those are skipped.
                    genotype_qual = self._first_value(arrays["GQ"], position)
                    if genotype_qual is not None and genotype_qual >= 0:
                        collected[sample]["GQ"].append(genotype_qual)

            # Calculate statistics
            format_summary = {}
            for sample, fields in collected.items():
                format_summary[sample] = {
                    field: self._describe(values)
                    for field, values in fields.items()
                    if values
                }

            self.stats["format"] = format_summary
            return format_summary

        except Exception as e:
            print(f"Error extracting FORMAT fields from {self.vcf_path}: {e}")
            return {}

    def extract_all_stats(self, verbose: bool = True):
        """Run all three extraction passes and return ``self.stats``.

        With ``verbose`` a short per-file summary is printed to stdout.
        """
        if verbose:
            print(f"\nProcessing: {self.vcf_path.name}")

        basic = self.extract_basic_stats()
        self.extract_info_fields()
        self.extract_format_fields()

        if verbose and basic:
            print(f"  ✓ Total variants: {basic['total_variants']}")
            print(f"  ✓ SNPs: {basic['snps']}, INDELs: {basic['indels']}")
            print(
                f"  ✓ Passed filters: {basic['passed']}, Filtered: {basic['filtered']}"
            )

        return self.stats


def process_all_vcfs(vcf_files_dict):
    """Run the statistics extractor over every discovered VCF.

    Args:
        vcf_files_dict: Mapping of category -> {name: VCF path}. Categories
            with no files are skipped entirely.

    Returns:
        Mapping of category -> {name: {"path": ..., "stats": ...}}. A file
        whose processing raises is reported on stdout and left out.
    """
    banner = "=" * 80
    collected = {}

    for category in vcf_files_dict:
        entries = vcf_files_dict[category]
        if entries:
            print(f"\n{banner}")
            print(f"PROCESSING: {category.upper()}")
            print(f"{banner}")

            per_category = {}
            collected[category] = per_category

            for name in entries:
                vcf_path = entries[name]
                try:
                    file_stats = VCFStatisticsExtractor(vcf_path).extract_all_stats()
                    per_category[name] = {"path": vcf_path, "stats": file_stats}
                except Exception as e:
                    print(f"  ✗ Failed to process {name}: {e}")

    return collected


def _rescue_basic_stats(entry):
    """Return ``entry["stats"]["basic"]`` when both levels exist, else None."""
    if "stats" in entry and "basic" in entry["stats"]:
        return entry["stats"]["basic"]
    return None


def analyze_rescue_vcf(all_vcf_stats, show_plot: bool = True):
    """Print rescue VCF statistics and optionally plot them against consensus.

    Args:
        all_vcf_stats: Nested mapping category -> name -> {"stats": {...}};
            the "rescue" and "consensus" categories are read.
        show_plot: When true, show a bar chart of DNA consensus, RNA consensus
            and rescued variant counts.

    Returns:
        None. Everything is reported on stdout / in the figure.
    """
    rescue_entries = all_vcf_stats.get("rescue")
    if not rescue_entries:
        print("No rescue VCFs found")
        return

    consensus_entries = all_vcf_stats.get("consensus", {})
    dna_basic = None
    if "DNA_TUMOR_vs_DNA_NORMAL" in consensus_entries:
        dna_basic = _rescue_basic_stats(consensus_entries["DNA_TUMOR_vs_DNA_NORMAL"])
    rna_basic = None
    if "RNA_TUMOR_vs_DNA_NORMAL" in consensus_entries:
        rna_basic = _rescue_basic_stats(consensus_entries["RNA_TUMOR_vs_DNA_NORMAL"])

    print("=" * 80)
    print("RESCUE VCF ANALYSIS")
    print("=" * 80)

    for name, data in rescue_entries.items():
        basic = _rescue_basic_stats(data)
        if basic is None:
            continue

        print(f"\n{name}:")
        print(f"  Total rescued variants: {basic.get('total_variants', 0)}")
        print(f"  SNPs: {basic.get('snps', 0)}")
        print(f"  INDELs: {basic.get('indels', 0)}")
        print(f"  Passed filters: {basic.get('passed', 0)}")
        print(f"  Filtered: {basic.get('filtered', 0)}")

        # Compare with DNA consensus
        if dna_basic is not None:
            dna_total = dna_basic.get("total_variants", 0)
            rescue_total = basic.get("total_variants", 0)
            added = rescue_total - dna_total

            print(f"\n  DNA Consensus variants: {dna_total}")
            print(f"  After rescue (DNA + RNA): {rescue_total}")
            print(f"  Variants added by rescue: {added}")
            if dna_total > 0:
                print(f"  Increase: {(added / dna_total * 100):.1f}%")
            else:
                # Keep the label so the line is still identifiable when no
                # percentage can be computed.
                print("  Increase: N/A")

    if not show_plot:
        return

    # Create comparison plot. Each bar carries its own colour so the colours
    # stay aligned when a consensus bar is missing or several rescue VCFs exist.
    bars = []  # (label, count, colour)
    if dna_basic is not None:
        bars.append(("DNA Consensus", dna_basic.get("total_variants", 0), "skyblue"))
    if rna_basic is not None:
        bars.append(("RNA Consensus", rna_basic.get("total_variants", 0), "lightcoral"))

    several_rescues = len(rescue_entries) > 1
    for name, data in rescue_entries.items():
        basic = _rescue_basic_stats(data)
        if basic is None:
            continue
        # Identical x labels would be merged onto one axis position, so
        # disambiguate by name when there is more than one rescue VCF.
        label = "Rescued (DNA+RNA)"
        if several_rescues:
            label = f"Rescued (DNA+RNA): {name}"
        bars.append((label, basic.get("total_variants", 0), "lightgreen"))

    categories = [label for label, _, _ in bars]
    values = [count for _, count, _ in bars]
    colours = [colour for _, _, colour in bars]

    fig = go.Figure()
    fig.add_trace(
        go.Bar(
            x=categories,
            y=values,
            text=values,
            textposition="auto",
            marker_color=colours,
        )
    )

    fig.update_layout(
        title="Variant Counts: DNA Consensus → RNA Consensus → Rescue",
        xaxis_title="VCF Type",
        yaxis_title="Number of Variants",
        template="plotly_white",
        height=500,
    )

    fig.show()


def generate_summary_report(
    all_vcf_stats,
    vcf_files,
    variant_summary,
    quality_summary,
    tool_comparison,
    consensus_comparison,
    output_dir: Path,
    tools=None,
):
    """Build, print and save the plain-text summary report.

    Args:
        all_vcf_stats: Nested mapping category -> name -> {"stats": {...}}.
        vcf_files: Mapping category -> {name: path} of the analysed VCFs.
        variant_summary: DataFrame with "Passed" and "Filtered" columns.
        quality_summary: DataFrame with "Category", "Tool", "Modality",
            "Mean_QUAL" and "Median_QUAL" columns (may be empty).
        tool_comparison: DataFrame with "Tool", "Modality", "Total_Variants",
            "SNPs" and "INDELs" columns (may be empty).
        consensus_comparison: DataFrame with "Tool", "Modality",
            "Consensus_Variants", "Tool_Variants" and "Retention_Rate"
            columns (may be empty).
        output_dir: Directory receiving ``summary_report.txt``; created when
            missing.
        tools: Tool names listed in the overview. Defaults to the
            module-level ``TOOLS`` constant.

    Returns:
        The report text.
    """
    tool_names = TOOLS if tools is None else tools
    rule = "=" * 80

    report = []
    report.append(rule)
    report.append("VCF STATISTICS - COMPREHENSIVE SUMMARY REPORT")
    report.append(rule)
    report.append("")

    # 1. Overview
    report.append("## 1. OVERVIEW")
    report.append("")
    total_vcfs = sum(len(files) for files in vcf_files.values() if files)
    report.append(f"Total VCF files analyzed: {total_vcfs}")
    report.append(
        f"Categories: {', '.join([cat for cat, files in vcf_files.items() if files])}"
    )
    report.append(f"Tools: {', '.join(tool_names)}")
    report.append("Modalities: DNA, RNA")
    report.append("")

    # 2. Variant Calling Tools Comparison
    report.append("## 2. VARIANT CALLING TOOLS COMPARISON")
    report.append("")

    if not tool_comparison.empty:
        for heading, marker in (
            ("### DNA Modality:", "DNA_TUMOR"),
            ("### RNA Modality:", "RNA_TUMOR"),
        ):
            if marker == "RNA_TUMOR":
                report.append("")
            report.append(heading)
            subset = tool_comparison[
                tool_comparison["Modality"].str.contains(marker)
            ]
            for _, row in subset.iterrows():
                report.append(
                    f"  {row['Tool']:12} - {row['Total_Variants']:6} variants "
                    f"(SNPs: {row['SNPs']:5}, INDELs: {row['INDELs']:4})"
                )
    report.append("")

    # 3. Consensus Analysis
    report.append("## 3. CONSENSUS ANALYSIS")
    report.append("")

    if not consensus_comparison.empty:
        modality_column = consensus_comparison["Modality"].astype(str)
        for mod_name in ("DNA", "RNA"):
            # A plain substring test for "DNA" would also match
            # "RNA_TUMOR_vs_DNA_NORMAL"; match the tumor side (or the bare
            # short name) so each section only lists its own rows.
            mask = (modality_column == mod_name) | modality_column.str.contains(
                f"{mod_name}_TUMOR", regex=False
            )
            mod_data = consensus_comparison[mask]

            if not mod_data.empty:
                consensus_count = mod_data["Consensus_Variants"].iloc[0]
                report.append(f"### {mod_name} Consensus: {consensus_count} variants")
                report.append("")
                report.append("  Tool contributions:")
                for _, row in mod_data.iterrows():
                    retention = row["Retention_Rate"] * 100
                    report.append(
                        f"    {row['Tool']:12}: {row['Tool_Variants']:5} variants "
                        f"→ {retention:5.1f}% retained in consensus"
                    )
                report.append("")

    # 4. Rescue Statistics
    report.append("## 4. RESCUE (CROSS-MODALITY) ANALYSIS")
    report.append("")

    rescue_entries = all_vcf_stats.get("rescue")
    if rescue_entries:
        dna_consensus = all_vcf_stats.get("consensus", {}).get(
            "DNA_TUMOR_vs_DNA_NORMAL"
        )
        dna_has_basic = (
            dna_consensus is not None
            and "stats" in dna_consensus
            and "basic" in dna_consensus["stats"]
        )
        for name, data in rescue_entries.items():
            if not ("stats" in data and "basic" in data["stats"]):
                continue
            # Rescue numbers are only reported relative to the DNA consensus.
            if not dna_has_basic:
                continue

            basic = data["stats"]["basic"]
            rescue_total = basic.get("total_variants", 0)
            dna_total = dna_consensus["stats"]["basic"].get("total_variants", 0)
            added = rescue_total - dna_total
            pct_increase = (added / dna_total * 100) if dna_total > 0 else 0

            report.append(f"DNA Consensus: {dna_total} variants")
            report.append(f"After RNA rescue: {rescue_total} variants")
            # "+" format flag: a decrease renders as "-3.0%" instead of "+-3.0%".
            report.append(f"Variants added: {added} ({pct_increase:+.1f}%)")
            report.append(
                f"SNPs: {basic.get('snps', 0)}, INDELs: {basic.get('indels', 0)}"
            )
    else:
        report.append("No rescue VCFs found")
    report.append("")

    # 5. Quality Metrics
    report.append("## 5. QUALITY METRICS")
    report.append("")

    if not quality_summary.empty:
        report.append("Average quality scores by tool:")
        for _, row in quality_summary.iterrows():
            if row["Category"] == "variant_calling":
                report.append(
                    f"  {row['Tool']:12} ({row['Modality'][:3]}): "
                    f"Mean={row['Mean_QUAL']:7.2f}, Median={row['Median_QUAL']:7.2f}"
                )
    report.append("")

    # 6. Filter Status
    report.append("## 6. FILTER STATUS SUMMARY")
    report.append("")

    # An empty summary frame may have no columns at all; count that as zero.
    columns = set(variant_summary.columns)
    total_passed = variant_summary["Passed"].sum() if "Passed" in columns else 0
    total_filtered = (
        variant_summary["Filtered"].sum() if "Filtered" in columns else 0
    )
    total_all = total_passed + total_filtered
    pass_rate = (total_passed / total_all * 100) if total_all > 0 else 0

    report.append(f"Total variants across all VCFs: {total_all}")
    report.append(f"  Passed filters: {total_passed} ({pass_rate:.1f}%)")
    report.append(f"  Filtered out: {total_filtered} ({100 - pass_rate:.1f}%)")
    report.append("")

    # 7. Recommendations
    report.append("## 7. KEY INSIGHTS")
    report.append("")

    if not tool_comparison.empty:
        # Find most/least sensitive tool by raw call count on the DNA modality
        dna_tools = tool_comparison[
            tool_comparison["Modality"].str.contains("DNA_TUMOR")
        ]
        if not dna_tools.empty:
            most_sensitive = dna_tools.loc[dna_tools["Total_Variants"].idxmax()]
            least_sensitive = dna_tools.loc[dna_tools["Total_Variants"].idxmin()]

            report.append(
                f"• Most sensitive tool (DNA): {most_sensitive['Tool']} "
                f"({most_sensitive['Total_Variants']} variants)"
            )
            report.append(
                f"• Most conservative tool (DNA): {least_sensitive['Tool']} "
                f"({least_sensitive['Total_Variants']} variants)"
            )
            report.append("")

    if rescue_entries:
        report.append(
            "• Cross-modality rescue successfully recovered additional variants from RNA data"
        )
        report.append(
            "• RNA sequencing provides complementary variant detection to DNA"
        )

    report.append("")
    report.append(rule)
    report.append("END OF REPORT")
    report.append(rule)

    # Print report
    report_text = "\n".join(report)
    print(report_text)

    # Save report. The text contains non-ASCII characters ("→", "•"), so the
    # encoding is pinned instead of relying on the platform default.
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    report_path = output_dir / "summary_report.txt"
    with open(report_path, "w", encoding="utf-8") as f:
        f.write(report_text)

    print(f"\n✓ Report saved to {report_path}")

    return report_text


def export_results(
    variant_summary,
    quality_summary,
    tool_comparison,
    consensus_comparison,
    validation_df=None,
    output_dir: Path = Path("vcf_statistics_output"),
):
    """Export the summary tables to CSV files in ``output_dir``.

    Args:
        variant_summary: DataFrame written to ``variant_count_summary.csv``.
        quality_summary: DataFrame written to ``quality_summary.csv``.
        tool_comparison: DataFrame written to ``tool_comparison.csv``.
        consensus_comparison: DataFrame written to ``consensus_comparison.csv``.
        validation_df: Optional DataFrame written to
            ``bam_validation_results.csv``; skipped when ``None`` or empty.
        output_dir: Target directory (path or string). It is created together
            with any missing parent directories.
    """
    output_dir = Path(output_dir)
    # parents=True: a nested target such as "results/run1/stats" must not fail.
    output_dir.mkdir(parents=True, exist_ok=True)

    print("Exporting results...")

    exports = [
        ("variant_count_summary.csv", variant_summary),
        ("quality_summary.csv", quality_summary),
        ("tool_comparison.csv", tool_comparison),
        ("consensus_comparison.csv", consensus_comparison),
    ]
    # Validation results are optional and only exported when they hold rows.
    if validation_df is not None and not validation_df.empty:
        exports.append(("bam_validation_results.csv", validation_df))

    for file_name, frame in exports:
        destination = output_dir / file_name
        frame.to_csv(destination, index=False)
        print(f"✓ Exported: {destination}")

    print(f"\n✓ All results exported to {output_dir}/")


class BAMValidator:
    """Validate variants by counting supporting reads in BAM/CRAM files.

    NOTE: allele counting compares a single read base with REF and ALT, so
    only single-nucleotide variants can ever receive non-zero ref/alt counts;
    for indels and other multi-base alleles the counts and VAF stay at 0.

    Args:
        reference_fasta: Optional reference FASTA path, used when opening
            ``.cram`` files.
    """

    def __init__(self, reference_fasta: Optional[str] = None):
        self.reference_fasta = reference_fasta

    def _open_vcf(self, vcf_path):
        """Open the VCF to validate."""
        return VCF(str(vcf_path))

    def _open_alignment(self, bam_path):
        """Open one alignment file, passing the reference for CRAM input."""
        path_text = str(bam_path)
        if self.reference_fasta and path_text.endswith(".cram"):
            return pysam.AlignmentFile(
                path_text, "rc", reference_filename=self.reference_fasta
            )
        return pysam.AlignmentFile(path_text)

    @staticmethod
    def _count_alleles(bam_file, chrom, pos, ref, alts):
        """Count ref/alt bases at 1-based position *pos*.

        Returns:
            ``(total_depth, ref_count, alt_counts, counted_reads)`` where
            ``counted_reads`` is the number of reads with a base at the site
            (deletions and reference skips excluded).
        """
        total_depth = 0
        ref_count = 0
        counted_reads = 0
        alt_counts = {alt: 0 for alt in alts if alt}
        site = pos - 1  # pileup coordinates are 0-based

        for column in bam_file.pileup(
            chrom,
            site,
            pos,
            truncate=True,
            min_base_quality=20,
            max_depth=10000,
        ):
            if column.reference_pos != site:
                continue
            total_depth = column.n

            for read in column.pileups:
                if read.is_del or read.is_refskip:
                    continue
                base = read.alignment.query_sequence[read.query_position]
                if base == ref:
                    ref_count += 1
                elif base in alt_counts:
                    alt_counts[base] += 1
                counted_reads += 1

        return total_depth, ref_count, alt_counts, counted_reads

    def validate_variants(
        self, vcf_path: Path, bam_paths: Dict[str, Path], max_variants: int = 100
    ):
        """
        Validate variants by checking read support in BAM files

        Args:
            vcf_path: Path to VCF file
            bam_paths: Dictionary mapping sample names to BAM/CRAM paths
            max_variants: Maximum number of variants to validate

        Returns:
            One dict per validated variant with the variant description and,
            per sample, ``<sample>_total_depth``, ``<sample>_ref_count``,
            ``<sample>_alt_<ALT>_count`` and ``<sample>_vaf`` (or
            ``<sample>_error`` when that sample failed). An empty list is
            returned when no alignment file opens or validation fails.
        """
        validation_results = []
        vcf = None
        bam_files = {}

        try:
            vcf = self._open_vcf(vcf_path)

            # Open BAM files; a file that fails to open is reported and skipped
            for sample, bam_path in bam_paths.items():
                try:
                    bam_files[sample] = self._open_alignment(bam_path)
                except Exception as e:
                    print(f"Warning: Could not open {sample} BAM file: {e}")

            if not bam_files:
                print("No BAM files could be opened for validation")
                return []

            # Validate variants
            for variant in vcf:
                if len(validation_results) >= max_variants:
                    break

                chrom = variant.CHROM
                pos = variant.POS
                ref = variant.REF
                alts = variant.ALT

                variant_result = {
                    "chrom": chrom,
                    "pos": pos,
                    "ref": ref,
                    "alt": ",".join(alts) if alts else "",
                    "qual": variant.QUAL,
                    "filter": variant.FILTER if variant.FILTER else "PASS",
                }

                # Check each sample
                for sample_name, bam_file in bam_files.items():
                    try:
                        depth, ref_count, alt_counts, counted = self._count_alleles(
                            bam_file, chrom, pos, ref, alts
                        )

                        variant_result[f"{sample_name}_total_depth"] = depth
                        variant_result[f"{sample_name}_ref_count"] = ref_count
                        for alt, count in alt_counts.items():
                            variant_result[f"{sample_name}_alt_{alt}_count"] = count

                        # VAF over the reads that actually show a base here
                        if counted > 0:
                            vaf = sum(alt_counts.values()) / counted
                        else:
                            vaf = 0
                        variant_result[f"{sample_name}_vaf"] = vaf

                    except Exception as e:
                        variant_result[f"{sample_name}_error"] = str(e)

                validation_results.append(variant_result)

            return validation_results

        except Exception as e:
            print(f"Error during validation: {e}")
            return []

        finally:
            # Release every handle on all exit paths (success, early return
            # and failure); a failing close must not mask the result.
            handles = list(bam_files.values())
            if vcf is not None:
                handles.append(vcf)
            for handle in handles:
                try:
                    handle.close()
                except Exception as close_err:
                    print(f"Warning: Could not close file handle: {close_err}")

    def summarize_validation(self, validation_results: List[Dict]) -> pd.DataFrame:
        """Convert validation results to a DataFrame (empty for no results)."""
        if not validation_results:
            return pd.DataFrame()
        return pd.DataFrame(validation_results)


class StatisticsAggregator:
    """Aggregate per-VCF statistics into summary ``pandas`` tables.

    ``all_stats`` maps ``category -> entry name -> data``. An entry only
    contributes a row when ``data["stats"]`` contains the section a method
    needs (``"basic"`` or ``"info"``); all other entries are skipped.

    Entry names are read as ``"<tool>_<modality>"``. Names without an
    underscore, or names that start with a sample-type token (``DNA``/``RNA``,
    e.g. the consensus key ``DNA_TUMOR_vs_DNA_NORMAL``), carry no tool prefix:
    for those the category is reported as the tool and the whole name as the
    modality.

    Every summary method returns a DataFrame with its documented columns even
    when no entry qualifies, so callers never hit a ``KeyError`` from sorting
    an empty, column-less frame.
    """

    # Leading tokens that identify a bare modality key (no tool prefix).
    _SAMPLE_PREFIXES = ("DNA", "RNA")

    def __init__(self, all_stats: Dict):
        self.all_stats = all_stats

    @classmethod
    def _split_name(cls, name: str) -> Tuple[Optional[str], str]:
        """Split ``"<tool>_<modality>"`` into ``(tool, modality)``.

        Returns ``(None, name)`` when ``name`` has no tool prefix.
        """
        head, sep, rest = name.partition("_")
        if not sep or head in cls._SAMPLE_PREFIXES:
            return None, name
        return head, rest

    @staticmethod
    def _ratio(numerator, denominator) -> float:
        """Return ``numerator / denominator``, or 0.0 for a non-positive denominator."""
        return numerator / denominator if denominator > 0 else 0.0

    def _iter_stats(self, section: str, category: Optional[str] = None):
        """Yield ``(category, name, data["stats"][section])`` for usable entries.

        When ``category`` is given only that category is visited (a missing
        category yields nothing); otherwise all categories are visited.
        """
        if category is None:
            groups = list(self.all_stats.items())
        else:
            groups = [(category, self.all_stats.get(category, {}))]

        for group_name, files in groups:
            for name, data in files.items():
                if "stats" in data and section in data["stats"]:
                    yield group_name, name, data["stats"][section]

    def create_variant_count_summary(self) -> pd.DataFrame:
        """Create summary table of variant counts across all VCFs.

        Returns:
            One row per VCF with counts and ``Pass_Rate`` (passed / total,
            0 when the VCF has no variants), sorted by Category, Tool, Modality.
        """
        columns = [
            "Category",
            "Tool",
            "Modality",
            "Total_Variants",
            "SNPs",
            "INDELs",
            "Passed",
            "Filtered",
            "Pass_Rate",
        ]
        rows = []

        for category, name, basic in self._iter_stats("basic"):
            tool, modality = self._split_name(name)
            total = basic.get("total_variants", 0)
            passed = basic.get("passed", 0)

            rows.append(
                {
                    "Category": category,
                    "Tool": tool if tool is not None else category,
                    "Modality": modality,
                    "Total_Variants": total,
                    "SNPs": basic.get("snps", 0),
                    "INDELs": basic.get("indels", 0),
                    "Passed": passed,
                    "Filtered": basic.get("filtered", 0),
                    "Pass_Rate": self._ratio(passed, total),
                }
            )

        df = pd.DataFrame(rows, columns=columns)
        return df.sort_values(["Category", "Tool", "Modality"])

    def create_quality_summary(self) -> pd.DataFrame:
        """Create summary of quality score distributions.

        VCFs whose ``"qualities"`` list is missing or empty are omitted.

        Returns:
            One row per VCF with mean/median/min/max and the 25th/75th
            percentiles of QUAL, sorted by Category, Tool, Modality.
        """
        columns = [
            "Category",
            "Tool",
            "Modality",
            "Mean_QUAL",
            "Median_QUAL",
            "Min_QUAL",
            "Max_QUAL",
            "Q25",
            "Q75",
        ]
        rows = []

        for category, name, basic in self._iter_stats("basic"):
            qualities = basic.get("qualities", [])
            if not qualities:
                continue

            tool, modality = self._split_name(name)
            rows.append(
                {
                    "Category": category,
                    "Tool": tool if tool is not None else category,
                    "Modality": modality,
                    "Mean_QUAL": np.mean(qualities),
                    "Median_QUAL": np.median(qualities),
                    "Min_QUAL": np.min(qualities),
                    "Max_QUAL": np.max(qualities),
                    "Q25": np.percentile(qualities, 25),
                    "Q75": np.percentile(qualities, 75),
                }
            )

        df = pd.DataFrame(rows, columns=columns)
        return df.sort_values(["Category", "Tool", "Modality"])

    def create_info_field_summary(self, info_field: str) -> pd.DataFrame:
        """Create summary for specific INFO field across all VCFs.

        Only entries whose statistics for ``info_field`` are a dict containing
        a ``"mean"`` key are included; all keys of that dict become columns.

        Args:
            info_field: INFO field name to summarise (e.g. ``"TLOD"``).

        Returns:
            One row per VCF, sorted by Category, Tool, Modality. When no VCF
            has the field, an empty frame with the four identifying columns.
        """
        id_columns = ["Category", "Tool", "Modality", "Field"]
        rows = []

        for category, name, info_stats in self._iter_stats("info"):
            field_data = info_stats.get(info_field) if info_field in info_stats else None
            if not (isinstance(field_data, dict) and "mean" in field_data):
                continue

            tool, modality = self._split_name(name)
            row = {
                "Category": category,
                "Tool": tool if tool is not None else category,
                "Modality": modality,
                "Field": info_field,
            }
            row.update(field_data)
            rows.append(row)

        if not rows:
            # The statistic columns are data-dependent, so only the fixed
            # identifying columns can be provided for an empty result.
            return pd.DataFrame(columns=id_columns)

        df = pd.DataFrame(rows)
        return df.sort_values(["Category", "Tool", "Modality"])

    def compare_tools_by_modality(self) -> pd.DataFrame:
        """Compare variant calling tools within each modality.

        Only the ``"variant_calling"`` category is used, and entries without a
        tool prefix are skipped.

        Returns:
            One row per tool/modality with counts and SNP/INDEL fractions of
            the total (0 when the VCF has no variants), sorted by Modality, Tool.
        """
        columns = [
            "Tool",
            "Modality",
            "Total_Variants",
            "SNPs",
            "INDELs",
            "SNP_Ratio",
            "INDEL_Ratio",
        ]
        rows = []

        for _, name, basic in self._iter_stats("basic", "variant_calling"):
            tool, modality = self._split_name(name)
            if tool is None:
                continue

            total = basic.get("total_variants", 0)
            snps = basic.get("snps", 0)
            indels = basic.get("indels", 0)

            rows.append(
                {
                    "Tool": tool,
                    "Modality": modality,
                    "Total_Variants": total,
                    "SNPs": snps,
                    "INDELs": indels,
                    "SNP_Ratio": self._ratio(snps, total),
                    "INDEL_Ratio": self._ratio(indels, total),
                }
            )

        df = pd.DataFrame(rows, columns=columns)
        return df.sort_values(["Modality", "Tool"])

    def compare_consensus_to_individual(self) -> pd.DataFrame:
        """Compare consensus VCFs to individual tool outputs.

        Consensus entries are looked up by modality name; a tool whose modality
        has no consensus entry is reported with a consensus count of 0.

        Returns:
            One row per tool/modality with the tool count, the consensus count,
            their difference and ``Retention_Rate`` (consensus / tool, 0 when
            the tool called nothing), sorted by Modality, Tool.
        """
        columns = [
            "Tool",
            "Modality",
            "Tool_Variants",
            "Consensus_Variants",
            "Difference",
            "Retention_Rate",
        ]

        # Consensus entries are keyed by the bare modality name.
        consensus_counts = {
            modality: basic.get("total_variants", 0)
            for _, modality, basic in self._iter_stats("basic", "consensus")
        }

        rows = []
        for _, name, basic in self._iter_stats("basic", "variant_calling"):
            tool, modality = self._split_name(name)
            if tool is None:
                continue

            tool_count = basic.get("total_variants", 0)
            consensus_count = consensus_counts.get(modality, 0)

            rows.append(
                {
                    "Tool": tool,
                    "Modality": modality,
                    "Tool_Variants": tool_count,
                    "Consensus_Variants": consensus_count,
                    "Difference": tool_count - consensus_count,
                    "Retention_Rate": self._ratio(consensus_count, tool_count),
                }
            )

        df = pd.DataFrame(rows, columns=columns)
        return df.sort_values(["Modality", "Tool"])


class VCFVisualizer:
    """Create interactive Plotly visualizations for VCF statistics.

    ``all_stats`` maps ``category -> entry name -> data``; only entries with
    ``data["stats"]["basic"]`` are plotted. Every ``plot_*`` method prints a
    short message and returns ``None`` without drawing when it finds nothing
    to plot; otherwise it displays the figure via ``fig.show()``.
    """

    def __init__(self, all_stats: Dict):
        self.all_stats = all_stats

    @staticmethod
    def _short_modality(name: str) -> str:
        """Return ``"DNA"`` if ``name`` contains ``"DNA_TUMOR"``, else ``"RNA"``."""
        return "DNA" if "DNA_TUMOR" in name else "RNA"

    def _iter_basic(self, category=None):
        """Yield ``(category, name, basic_stats)`` for entries that have basic stats.

        When ``category`` is given only that category is visited (a missing
        category yields nothing); otherwise all categories are visited.
        """
        if category is None:
            groups = list(self.all_stats.items())
        else:
            groups = [(category, self.all_stats.get(category, {}))]

        for group_name, files in groups:
            for name, vcf_data in files.items():
                if "stats" in vcf_data and "basic" in vcf_data["stats"]:
                    yield group_name, name, vcf_data["stats"]["basic"]

    def plot_variant_counts_by_tool(self):
        """Bar plot comparing variant counts across tools and modalities.

        Uses the ``"variant_calling"`` category and draws one SNP trace and
        one INDEL trace per modality, grouped by tool.
        """
        data = [
            {
                "Tool": name.split("_")[0],
                "Modality": self._short_modality(name),
                "SNPs": basic.get("snps", 0),
                "INDELs": basic.get("indels", 0),
            }
            for _, name, basic in self._iter_basic("variant_calling")
        ]

        if not data:
            print("No data available for plotting")
            return

        df = pd.DataFrame(data)

        # Create grouped bar chart
        fig = go.Figure()

        for modality in df["Modality"].unique():
            df_mod = df[df["Modality"] == modality]
            for column in ("SNPs", "INDELs"):
                fig.add_trace(
                    go.Bar(
                        name=f"{modality} - {column}",
                        x=df_mod["Tool"],
                        y=df_mod[column],
                        text=df_mod[column],
                        textposition="auto",
                    )
                )

        fig.update_layout(
            title="Variant Counts by Tool and Modality",
            xaxis_title="Tool",
            yaxis_title="Number of Variants",
            barmode="group",
            height=500,
            template="plotly_white",
        )

        fig.show()

    def plot_quality_distributions(self, max_points_per_vcf: Optional[int] = 1000):
        """Box plot of quality score distributions.

        Args:
            max_points_per_vcf: Maximum number of QUAL values taken from the
                start of each VCF's quality list, to keep the figure light.
                ``None`` plots every value.
        """
        data = []

        for category, name, basic in self._iter_basic():
            qualities = basic.get("qualities", [])
            if not qualities:
                continue

            tool = name.split("_")[0]
            modality = self._short_modality(name)

            data.extend(
                {
                    "Category": category,
                    "Tool": tool,
                    "Modality": modality,
                    "Quality": qual,
                }
                for qual in qualities[:max_points_per_vcf]
            )

        if not data:
            print("No quality data available")
            return

        df = pd.DataFrame(data)

        fig = px.box(
            df,
            x="Tool",
            y="Quality",
            color="Modality",
            facet_col="Category",
            title="Quality Score Distributions",
            template="plotly_white",
            height=500,
        )

        fig.update_yaxes(title_text="QUAL Score")
        fig.show()

    def plot_variant_type_distribution(self):
        """Pie charts showing SNP vs INDEL distribution.

        Sums SNP and INDEL counts over all ``"variant_calling"`` entries per
        modality and draws one pie for DNA and one for RNA.
        """
        totals = {
            "DNA": {"snps": 0, "indels": 0},
            "RNA": {"snps": 0, "indels": 0},
        }
        found = False

        for _, name, basic in self._iter_basic("variant_calling"):
            found = True
            bucket = totals[self._short_modality(name)]
            bucket["snps"] += basic.get("snps", 0)
            bucket["indels"] += basic.get("indels", 0)

        if not found:
            # Match the sibling plot methods instead of showing an empty figure.
            print("No data available for plotting")
            return

        fig = make_subplots(
            rows=1,
            cols=2,
            subplot_titles=("DNA Modality", "RNA Modality"),
            specs=[[{"type": "pie"}, {"type": "pie"}]],
        )

        for col, modality in enumerate(("DNA", "RNA"), start=1):
            fig.add_trace(
                go.Pie(
                    labels=["SNPs", "INDELs"],
                    values=[totals[modality]["snps"], totals[modality]["indels"]],
                    name=modality,
                    marker_colors=["#636EFA", "#EF553B"],
                ),
                row=1,
                col=col,
            )

        fig.update_layout(
            title_text="Variant Type Distribution by Modality",
            height=400,
            template="plotly_white",
        )

        fig.show()

    def plot_consensus_comparison(self):
        """Compare consensus variants to individual tools.

        Draws each tool's total variant count as a bar and the consensus count
        of the matching modality as a marker. A modality without a consensus
        entry is plotted with a consensus count of 0.
        """
        # Consensus entries are keyed by the bare modality name.
        consensus_counts = {
            modality: basic.get("total_variants", 0)
            for _, modality, basic in self._iter_basic("consensus")
        }

        data = []
        for _, name, basic in self._iter_basic("variant_calling"):
            tool, sep, rest = name.partition("_")
            modality_key = rest if sep else name
            modality = self._short_modality(name)

            data.append(
                {
                    "Tool": tool,
                    "Modality": modality,
                    # One x position per tool/modality pair; plotting on the
                    # tool name alone would draw DNA and RNA on top of each other.
                    "Label": f"{tool} ({modality})",
                    "Tool_Variants": basic.get("total_variants", 0),
                    "Consensus_Variants": consensus_counts.get(modality_key, 0),
                }
            )

        if not data:
            print("No comparison data available")
            return

        df = pd.DataFrame(data)

        fig = go.Figure()

        # Tool variants
        fig.add_trace(
            go.Bar(
                name="Individual Tool",
                x=df["Label"],
                y=df["Tool_Variants"],
                marker_color="lightblue",
                text=df["Tool_Variants"],
                textposition="auto",
            )
        )

        # Consensus variants
        fig.add_trace(
            go.Scatter(
                name="Consensus",
                x=df["Label"],
                y=df["Consensus_Variants"],
                mode="markers+lines",
                marker=dict(size=12, color="red", symbol="diamond"),
                line=dict(color="red", dash="dash"),
            )
        )

        fig.update_layout(
            title="Consensus vs Individual Tool Variant Counts",
            xaxis_title="Tool (Modality)",
            yaxis_title="Number of Variants",
            template="plotly_white",
            height=500,
        )

        fig.show()

    def plot_filter_status(self):
        """Stacked bar chart showing pass/filter rates.

        Draws one stacked bar (passed + filtered) per VCF across all
        categories.
        """
        data = [
            {
                "Category": category,
                "Tool": name.split("_")[0],
                # The same entry name occurs in several categories, so the
                # category is part of the label to keep each bar separate.
                "Name": f"{category}/{name}",
                "Passed": basic.get("passed", 0),
                "Filtered": basic.get("filtered", 0),
            }
            for category, name, basic in self._iter_basic()
        ]

        if not data:
            print("No filter data available")
            return

        df = pd.DataFrame(data)

        fig = go.Figure()

        fig.add_trace(
            go.Bar(name="Passed", x=df["Name"], y=df["Passed"], marker_color="green")
        )

        fig.add_trace(
            go.Bar(name="Filtered", x=df["Name"], y=df["Filtered"], marker_color="red")
        )

        fig.update_layout(
            title="Filter Status Across VCFs",
            xaxis_title="VCF File",
            yaxis_title="Number of Variants",
            barmode="stack",
            template="plotly_white",
            height=500,
            xaxis={"tickangle": -45},
        )

        fig.show()

---

## EXECUTION SECTION

All reusable code (classes and functions) is defined above. Below are the execution cells that use it.

## Step 1: Discover VCF and Alignment Files

Discover all VCF files across the pipeline output and alignment files.

In [30]:
# Discover files
# Scan BASE_DIR for pipeline outputs. `vcf_files` and `alignment_files` are
# reused by the processing and validation cells further down.
discovery = VCFFileDiscovery(BASE_DIR)
vcf_files = discovery.discover_vcfs()
alignment_files = discovery.discover_alignments()
discovery.print_summary()

VCF FILE DISCOVERY SUMMARY

VARIANT_CALLING:
  ✓ deepsomatic_DNA_TUMOR_vs_DNA_NORMAL: DNA_TUMOR_vs_DNA_NORMAL.deepsomatic.vcf.gz
  ✓ deepsomatic_RNA_TUMOR_vs_DNA_NORMAL: RNA_TUMOR_vs_DNA_NORMAL.deepsomatic.vcf.gz
  ✓ mutect2_DNA_TUMOR_vs_DNA_NORMAL: DNA_TUMOR_vs_DNA_NORMAL.mutect2.vcf.gz
  ✓ mutect2_RNA_TUMOR_vs_DNA_NORMAL: RNA_TUMOR_vs_DNA_NORMAL.mutect2.vcf.gz
  ✓ strelka_DNA_TUMOR_vs_DNA_NORMAL: DNA_TUMOR_vs_DNA_NORMAL.strelka.variants.vcf.gz
  ✓ strelka_RNA_TUMOR_vs_DNA_NORMAL: RNA_TUMOR_vs_DNA_NORMAL.strelka.variants.vcf.gz

NORMALIZED:
  ✓ deepsomatic_DNA_TUMOR_vs_DNA_NORMAL: DNA_TUMOR_vs_DNA_NORMAL.deepsomatic.variants.dec.norm.vcf.gz
  ✓ deepsomatic_RNA_TUMOR_vs_DNA_NORMAL: RNA_TUMOR_vs_DNA_NORMAL.deepsomatic.variants.dec.norm.vcf.gz
  ✓ mutect2_DNA_TUMOR_vs_DNA_NORMAL: DNA_TUMOR_vs_DNA_NORMAL.mutect2.variants.dec.norm.vcf.gz
  ✓ mutect2_RNA_TUMOR_vs_DNA_NORMAL: RNA_TUMOR_vs_DNA_NORMAL.mutect2.variants.dec.norm.vcf.gz
  ✓ strelka_DNA_TUMOR_vs_DNA_NORMAL: DNA_TUMOR_vs_DNA_NORMAL

## Step 2: Process All VCF Files

Extract comprehensive statistics from all discovered VCF files.

In [31]:
# Process all VCFs
# `all_vcf_stats` is the input for the aggregator, visualizer, rescue analysis
# and report cells below.
print("Starting comprehensive VCF analysis...")
all_vcf_stats = process_all_vcfs(vcf_files)
print("\n✓ All VCF files processed successfully!")

Starting comprehensive VCF analysis...

PROCESSING: VARIANT_CALLING

Processing: DNA_TUMOR_vs_DNA_NORMAL.deepsomatic.vcf.gz
  ✓ Total variants: 27697
  ✓ SNPs: 26353, INDELs: 1344
  ✓ Passed filters: 52, Filtered: 27645

Processing: RNA_TUMOR_vs_DNA_NORMAL.deepsomatic.vcf.gz
  ✓ Total variants: 13719
  ✓ SNPs: 10866, INDELs: 2853
  ✓ Passed filters: 48, Filtered: 13671

Processing: DNA_TUMOR_vs_DNA_NORMAL.mutect2.vcf.gz
  ✓ Total variants: 758
  ✓ SNPs: 731, INDELs: 27
  ✓ Passed filters: 758, Filtered: 0

Processing: RNA_TUMOR_vs_DNA_NORMAL.mutect2.vcf.gz
  ✓ Total variants: 338
  ✓ SNPs: 303, INDELs: 35
  ✓ Passed filters: 338, Filtered: 0

Processing: DNA_TUMOR_vs_DNA_NORMAL.strelka.variants.vcf.gz
  ✓ Total variants: 15555
  ✓ SNPs: 15545, INDELs: 10
  ✓ Passed filters: 577, Filtered: 14978

Processing: RNA_TUMOR_vs_DNA_NORMAL.strelka.variants.vcf.gz
  ✓ Total variants: 8738
  ✓ SNPs: 8695, INDELs: 43
  ✓ Passed filters: 248, Filtered: 8490

PROCESSING: NORMALIZED

Processing: DNA_

In [33]:
# Display the raw nested statistics dictionary (category -> name -> path/stats).
# The output is long because it includes the per-variant quality lists.
all_vcf_stats

{'variant_calling': {'deepsomatic_DNA_TUMOR_vs_DNA_NORMAL': {'path': PosixPath('/t9k/mnt/hdd/work/Vax/sequencing/aim_exp/rdv_test/COO8801.subset/variant_calling/deepsomatic/DNA_TUMOR_vs_DNA_NORMAL/DNA_TUMOR_vs_DNA_NORMAL.deepsomatic.vcf.gz'),
   'stats': {'basic': {'total_variants': 27697,
     'snps': 26353,
     'indels': 1344,
     'mnps': 0,
     'complex': 0,
     'passed': 52,
     'filtered': 27645,
     'chromosomes': ['chr1',
      'chr10',
      'chr11',
      'chr12',
      'chr13',
      'chr14',
      'chr15',
      'chr16',
      'chr17',
      'chr18',
      'chr19',
      'chr2',
      'chr20',
      'chr21',
      'chr22',
      'chr3',
      'chr4',
      'chr5',
      'chr6',
      'chr7',
      'chr8',
      'chr9',
      'chrX'],
     'qualities': [37.400001525878906,
      0.10000000149011612,
      0.30000001192092896,
      8.399999618530273,
      34.400001525878906,
      36.599998474121094,
      38.900001525878906,
      39.099998474121094,
      37.40000152

## Step 3: BAM Validation (Optional)

Validate variants by checking read support in original BAM/CRAM alignment files.

### Run Validation Example

Example: Validate the first 50 variants from a consensus VCF.

In [32]:
# Example: Validate DNA consensus VCF
# NOTE(review): `validator` is never assigned in the visible execution cells, and
# the recorded output of this cell is "NameError: name 'validator' is not defined".
# A BAMValidator instance must be created before this cell runs — its
# constructor arguments are not visible here; TODO confirm and add the line.
if "DNA_TUMOR_vs_DNA_NORMAL" in vcf_files.get("consensus", {}):
    dna_consensus_vcf = vcf_files["consensus"]["DNA_TUMOR_vs_DNA_NORMAL"]

    # Map to BAM files
    # Only samples that were actually discovered are passed to the validator.
    bam_map = {}
    if "DNA_TUMOR" in alignment_files:
        bam_map["DNA_TUMOR"] = alignment_files["DNA_TUMOR"]
    if "DNA_NORMAL" in alignment_files:
        bam_map["DNA_NORMAL"] = alignment_files["DNA_NORMAL"]

    if bam_map:
        print(f"Validating {dna_consensus_vcf.name} with alignment files...")
        validation_results = validator.validate_variants(
            dna_consensus_vcf, bam_map, max_variants=50
        )

        if validation_results:
            validation_df = validator.summarize_validation(validation_results)
            print(f"\n✓ Validated {len(validation_results)} variants")
            print("\nFirst few validation results:")
            print(validation_df.head(10))
        else:
            print("No validation results obtained")
    else:
        print("No alignment files available for validation")
else:
    print("No consensus VCF found for validation example")

Validating DNA_TUMOR_vs_DNA_NORMAL.consensus.vcf.gz with alignment files...


NameError: name 'validator' is not defined

## Step 4: Data Aggregation & Summary Statistics

Aggregate statistics across all VCF files and create comprehensive summaries.

In [None]:
# Create aggregator
aggregator = StatisticsAggregator(all_vcf_stats)

# Generate summary tables
# Each table is printed in full and kept in a variable, because the export and
# report cells below take these four DataFrames as arguments.
print("=" * 80)
print("VARIANT COUNT SUMMARY")
print("=" * 80)
variant_summary = aggregator.create_variant_count_summary()
print(variant_summary.to_string(index=False))

print("\n" + "=" * 80)
print("QUALITY SCORE SUMMARY")
print("=" * 80)
quality_summary = aggregator.create_quality_summary()
print(quality_summary.to_string(index=False))

print("\n" + "=" * 80)
print("TOOL COMPARISON BY MODALITY")
print("=" * 80)
tool_comparison = aggregator.compare_tools_by_modality()
print(tool_comparison.to_string(index=False))

print("\n" + "=" * 80)
print("CONSENSUS vs INDIVIDUAL TOOLS")
print("=" * 80)
consensus_comparison = aggregator.compare_consensus_to_individual()
print(consensus_comparison.to_string(index=False))

## Step 5: Visualizations

Create comprehensive visualizations of VCF statistics.

In [None]:
# Create visualizer
# The same `visualizer` instance is used by every plot cell in this notebook.
visualizer = VCFVisualizer(all_vcf_stats)
print("✓ Visualizer created. Ready to generate plots.")

### Plot 1: Variant Counts by Tool

In [None]:
# Grouped bars of SNP and INDEL counts per tool, one pair of traces per modality.
visualizer.plot_variant_counts_by_tool()

### Plot 2: Quality Score Distributions

### Plot 3: Variant Type Distribution

### Plot 4: Consensus vs Individual Tools

### Plot 5: Filter Status

In [None]:
# Stacked bars of passed vs filtered variant counts for every processed VCF.
visualizer.plot_filter_status()

## Step 6: Advanced Analysis - Rescue VCF Statistics

Analyze the rescue VCFs that combine DNA and RNA modality variants.

In [None]:
# Run the rescue analysis on the statistics collected in Step 2.
analyze_rescue_vcf(all_vcf_stats)

## Step 7: Export Results

Export summary statistics to CSV files for further analysis.

In [None]:
# Export results
# Relative path: resolved against the notebook's current working directory.
# `output_dir` is reused by the summary-report cell in Step 8.
output_dir = Path("vcf_statistics_output")
export_results(
    variant_summary,
    quality_summary,
    tool_comparison,
    consensus_comparison,
    output_dir=output_dir,
)

## Step 8: Summary Report

Generate a comprehensive summary report of all analyses.

In [None]:
# Generate report
# Requires the Step 4 summary tables and the Step 7 `output_dir` to exist, so
# run those cells first.
summary_report = generate_summary_report(
    all_vcf_stats,
    vcf_files,
    variant_summary,
    quality_summary,
    tool_comparison,
    consensus_comparison,
    output_dir,
)

---

## Quick Reference Guide

### What This Notebook Does

This comprehensive VCF statistics notebook provides:

1. **File Discovery** - Automatically finds all VCF files across your pipeline
2. **Statistics Extraction** - Uses cyvcf2 to extract:
   - Variant counts (SNPs, INDELs, complex)
   - Quality scores and distributions
   - INFO field statistics (DP, AF, TLOD, etc.)
   - FORMAT field statistics (per-sample depth, allele frequency, genotype quality)
   - Filter status

3. **BAM Validation** - Uses pysam to:
   - Cross-reference variants with alignment files
   - Calculate read support (ref/alt counts)
   - Validate variant allele frequencies (VAF)

4. **Comprehensive Analysis**:
   - Tool comparison (DeepSomatic, Mutect2, Strelka)
   - Modality comparison (DNA vs RNA)
   - Consensus analysis (agreement across tools)
   - Rescue analysis (cross-modality variant recovery)

5. **Visualizations**:
   - Interactive Plotly charts
   - Quality distributions
   - Variant type breakdowns
   - Tool performance comparisons

6. **Export** - All results saved as CSV files

### Main Functions Available

- `VCFFileDiscovery`: Discover VCF and alignment files
- `VCFStatisticsExtractor`: Extract statistics from VCF files
- `BAMValidator`: Validate variants using BAM/CRAM files
- `StatisticsAggregator`: Aggregate and compare statistics
- `VCFVisualizer`: Create visualizations
- `process_all_vcfs()`: Batch process all VCFs
- `analyze_rescue_vcf()`: Analyze rescue variants
- `export_results()`: Export to CSV
- `generate_summary_report()`: Generate text report

### Quick Customization Examples

**Change base directory:**
```python
BASE_DIR = Path("/your/custom/path")
```

**Process single VCF:**
```python
extractor = VCFStatisticsExtractor(vcf_path)
stats = extractor.extract_all_stats()
```

**Analyze specific INFO field:**
```python
tlod_summary = aggregator.create_info_field_summary('TLOD')
print(tlod_summary)
```

**Custom validation:**
```python
# `validator` must be a BAMValidator instance created beforehand
results = validator.validate_variants(vcf_path, bam_paths, max_variants=200)
```

In [None]:
# Plot 4 (Step 5): consensus vs individual tool variant counts.
visualizer.plot_consensus_comparison()

In [None]:
# Plot 3 (Step 5): SNP vs INDEL pie charts for the DNA and RNA modalities.
visualizer.plot_variant_type_distribution()

In [None]:
# Plot 2 (Step 5): box plots of QUAL scores per tool, faceted by category.
visualizer.plot_quality_distributions()