#  Single-nuclei Pseudobulk Preprocessing (RNA-seq and ATAC-seq) Pipeline

## Overview

This pipeline preprocesses single-nuclei pseudobulk **count** data (snATAC-seq or snRNA-seq) for downstream QTL analysis and region-specific studies.

**Goals:**
- Transform raw pseudobulk counts into analysis-ready formats
- Remove technical confounders
- Generate QTL-ready phenotype files or region-specific datasets

## Pipeline Structure

```
Step 0: Sample ID Mapping                                 [sampleid_mapping]
                    ↓
Step 1: Pseudobulk QC                                     [pseudobulk_qc]
        (optional) Region Peak/Gene Filtering          
        (optional) Batch Correction (ComBat or limma)
        (optional) Quantile Normalization
                    ↓
Step 2: Phenotype Reformatting  → BED                     [phenotype_formatting]
        (genome-wide QTL mapping, snATAC-seq only)  
```

## Modality Support

| Feature | snATAC-seq | snRNA-seq |
|---------|-----------|-----------|
| Sample ID mapping | ✓ | ✓ |
| Region/gene filtering | ✓ (`--regions`) | ✓ (`--gene-list`) |
| Blacklist filtering | ✓ | — |
| `pseudobulk_qc` step | ✓ | ✓ |
| `phenotype_formatting` step | ✓ | — (refer to this [pipeline](https://github.com/StatFunGen/xqtl-protocol/blob/main/code/data_preprocessing/phenotype/phenotype_formatting.ipynb)) |

## Input Files

All toy input files required to run this pipeline can be downloaded
[here](https://drive.google.com/drive/folders/13ORslmqWTpICMIufhj_mrdL1KxQsG4lH?usp=drive_link).

| File | Used in |
|------|---------|
| `pseudobulk_peaks_counts_{celltype}.csv.gz` *(snATAC-seq)* | Step 0, Step 1 |
| `pseudobulk_counts_{celltype}.csv.gz` *(snRNA-seq)* | Step 0, Step 1 |
| `metadata_{celltype}.csv` | Step 0, Step 1 |
| `rosmap_sample_mapping_data.csv` | Step 0 |
| `tech_vars_{celltype}.csv` | Step 1 |
| `hg38-blacklist.v2.bed.gz` | Step 1 (snATAC-seq only) |


## Minimal Working Example

## Step 0: Sample ID Mapping

Maps original sample identifiers (`individualID`) to standardized sample IDs (`sampleid`)
across metadata and count matrix files.

### Input

| File | Description |
|------|-------------|
| `rosmap_sample_mapping_data.csv` | Mapping reference: `individualID → sampleid` |
| `metadata_{celltype}.csv` | Per-cell-type sample metadata |
| `pseudobulk_peaks_counts_{celltype}.csv.gz` *(snATAC-seq)* | Per-cell-type peak count matrices |
| `pseudobulk_counts_{celltype}.csv.gz` *(snRNA-seq)* | Per-cell-type gene count matrices |

### Process

**Part 1 — Metadata files**

For each metadata file:
1. Look up each `individualID` in the mapping reference
2. Assign `sampleid` — falls back to `individualID` if no mapping found
3. Reorder columns: `sampleid` first, then `individualID`, then the rest
4. Save updated file

**Part 2 — Count matrix files**

For each count file:
1. Extract the header row (column names only)
2. Keep the first column (peak or gene IDs) unchanged
3. Map remaining column names (`individualID` → `sampleid`) where mapping exists, otherwise keep original
4. Write new header and stream data rows unchanged
5. Recompress with gzip

### Parameters

| Parameter | Default | Description |
|-----------|---------|-------------|
| `map_file` | *required* | CSV with `individualID` → `sampleid` mapping |
| `meta_files` | *required* | Metadata CSV files to remap |
| `count_files` | *required* | Count CSV.gz files to remap |
| `output_dir` | *required* | Parent output directory; writes to `{output_dir}/1_files_with_sampleid/` |

### Output

Output directory: `{output_dir}/1_files_with_sampleid/`

| File | Description |
|------|-------------|
| `metadata_{celltype}.csv` | Metadata with `sampleid` column prepended |
| `pseudobulk_peaks_counts_{celltype}.csv.gz` *(snATAC-seq)* | Count matrices with mapped column headers |
| `pseudobulk_counts_{celltype}.csv.gz` *(snRNA-seq)* | Count matrices with mapped column headers |


**Timing:** < 1 min

In [1]:
sos run pipeline/pseudobulk_preprocessing.ipynb sampleid_mapping \
    --output-dir output/snatac_seq \
    --map-file data/rosmap_sample_mapping_data.csv \
    --meta-files data/snatac_seq/metadata_Mic_50nuc.csv \
    --count-files data/snatac_seq/pseudobulk_peaks_counts_Mic_50nuc.csv.gz


ERROR: Error in parse(text = input): <text>:1:5: unexpected symbol
1: sos run
        ^


## Step 1: Pseudobulk QC

Regresses out technical covariates for downstream QTL analysis. Works for both snATAC-seq and snRNA-seq.

### Input

| File | Description |
|------|-------------|
| `metadata_{celltype}.csv` | Sample-level metadata (nuclei counts, batch info) |
| `pseudobulk_*counts_{celltype}.csv.gz` | Pseudobulk count matrix |
| `tech_vars.csv` | Technical covariates (sampleid + tech var columns, pre-processed) |
| `hg38-blacklist.v2.bed.gz` *(snATAC-seq, optional)* | Blacklisted genomic regions |

### Process

1. Load count matrix and auto-detect modality (snATAC-seq vs snRNA-seq)
2. ***(Optional)*** Filter to specific genomic regions (snATAC-seq) or gene list (snRNA-seq)
3. Load metadata; filter samples with fewer than `min_nuclei` nuclei (default: 20)
4. Align samples between metadata and count matrix
5. ***(Optional)*** Filter blacklisted genomic regions (snATAC-seq only)
6. Merge tech vars from `tech_vars_file` by `sampleid` 
7. Drop samples with NA in any tech var
8. Apply expression filtering (`filterByExpr`):
   - `min_count = 5`: minimum reads in at least one sample
   - `min_total_count = 15`: minimum total reads across all samples
   - `min_prop = 0.1`: feature expressed in ≥10% of samples
9. TMM normalization
10. ***(Optional)*** Batch correction on `sequencingBatch`:
    - `limma::removeBatchEffect` (default)
    - `ComBat` (on log-CPM)
11. Add `sequencingBatch` and `Library` to model if present and multi-level
12. Fit linear model (`voom` + `lmFit` + `eBayes`) with **tech vars + batch vars only** 
13. Compute `offset + residuals` as final adjusted values:
    - `offset`: intercept + batch effects at reference level
    - `residuals`: variation after removing technical effects; biological signal retained
14. ***(Optional)*** Quantile normalization of final values

**Model formula:**
```
~ {tech_vars} + [sequencingBatch] + [Library]
```
> `sequencingBatch` and `Library` included only if present and have more than one level.
> Biological variables (`pmi`, `study`, `msex`, `age_death` etc.) are **not** included — they should not be regressed out as they may be associated with genotype.

### Parameters

| Parameter | Default | Description |
|-----------|---------|-------------|
| `meta_files` | *required* | Metadata CSV files (one per cell type) |
| `count_files` | *required* | Count CSV.gz files (one per cell type, same order as `meta_files`) |
| `output_dir` | *required* | Parent output directory; writes to `{output_dir}/2_residuals/{ct}/` |
| `tech_vars_file` | *required* | CSV with `sampleid` + tech var columns |
| `blacklist_file` | `''` | Genomic blacklist BED file (snATAC-seq only) |
| `regions` | `''` | Comma-separated genomic regions e.g. `chr7:28000000-28300000` (snATAC-seq) |
| `gene_list` | `''` | Comma-separated gene IDs e.g. `ENSG00000000010` (snRNA-seq) |
| `batch_correction` | `FALSE` | Apply batch correction (`TRUE`/`FALSE`) |
| `batch_method` | `limma` | Batch correction method (`limma` or `combat`) |
| `quant_norm` | `FALSE` | Apply quantile normalization after residuals |
| `min_count` | `5` | Min reads in at least one sample |
| `min_total_count` | `15` | Min total reads across all samples |
| `min_prop` | `0.1` | Min proportion of samples with expression |
| `min_nuclei` | `20` | Min nuclei per sample |

### Output

Output directory: `{output_dir}/2_residuals/{celltype}/`

| File | Description |
|------|-------------|
| `{celltype}_residuals.txt` | Tech-covariate-adjusted values (log2-CPM) |
| `{celltype}_residuals_qn.txt` | Quantile-normalized adjusted values *(if `quant_norm=TRUE`)* |
| `{celltype}_results.rds` | Full results: DGEList, fit, offset, residuals, design, parameters |
| `{celltype}_filtered_raw_counts.txt` | Filtered raw counts before normalization |

**Timing:** < 5 min per cell type

### Pseudobulk QC


In [None]:
sos run pipeline/pseudobulk_preprocessing.ipynb pseudobulk_qc \
    --meta-files output/snatac_seq/1_files_with_sampleid/metadata_Mic_50nuc.csv \
    --count-files output/snatac_seq/1_files_with_sampleid/pseudobulk_peaks_counts_Mic_50nuc.csv.gz  \
    --output-dir output/snatac_seq \
    --tech-vars-file data/snatac_seq/tech_vars_MIC.csv \
    --blacklist-file data/hg38-blacklist.v2.bed.gz #only for snATAC-seq

sos run pipeline/pseudobulk_preprocessing.ipynb pseudobulk_qc \
    --meta-files output/snrna_seq/1_files_with_sampleid/metadata_MIC.csv \
    --count-files output/snrna_seq/1_files_with_sampleid/pseudobulk_counts_MIC.csv.gz  \
    --output-dir output/snrna_seq \
    --tech-vars-file data/snrna_seq/tech_vars_MIC.csv \
    --gene-list ENSG00000000010,ENSG00000000020 

### Additional parameters


In [None]:
--min-count 5
--min-total-count 15
--min-prop 0.1
--min-nuclei 20
--quant-norm TRUE
--batch-correction TRUE 
--batch-method combat # or limma
--gene-list ENSG00000000010,ENSG00000000020  # for snRNA-seq
--regions chr7:28000000-28300000  # for snATAC-seq

## Step 2: Phenotype Reformatting (snATAC-seq only)

Converts residuals into a QTL-ready BED format for genome-wide caQTL mapping.

> For snRNA-seq, please follow this [pipeline](https://github.com/StatFunGen/xqtl-protocol/blob/main/code/data_preprocessing/phenotype/phenotype_formatting.ipynb).

### Input

| File | Description |
|------|-------------|
| `{celltype}_residuals.txt` | Residuals from `pseudobulk_qc` |

### Process

1. Read residuals file with proper handling of feature IDs and sample columns
2. Parse peak coordinates from peak IDs (`chr-start-end` format)
3. Convert to midpoint coordinates (standard for QTLtools):
```
start = floor((peak_start + peak_end) / 2)
end   = start + 1
```
4. Build BED format: `#chr`, `start`, `end`, `ID` followed by per-sample values
5. Sort by chromosome and position
6. Compress with `bgzip` and index with `tabix`

### Parameters

| Parameter | Default | Description |
|-----------|---------|-------------|
| `residual_files` | *required* | Residual txt files from `pseudobulk_qc` |
| `output_dir` | *required* | Parent output directory; writes to `{output_dir}/3_pheno_reformat/` |

### Output

Output directory: `{output_dir}/3_pheno_reformat/`

| File | Description |
|------|-------------|
| `{celltype}_phenotype.bed.gz` | bgzip-compressed BED with midpoint coordinates |
| `{celltype}_phenotype.bed.gz.tbi` | tabix index for random-access queries |

Compatible with FastQTL, TensorQTL, and QTLtools.

**Timing:** < 1 min per cell type

In [1]:
sos run pipeline/pseudobulk_preprocessing.ipynb phenotype_formatting \
    --residual-files output/snatac_seq/2_residuals/Mic_50nuc/Mic_50nuc_residuals.txt \
    --output-dir output/snatac_seq

ERROR: Error in parse(text = input): <text>:1:5: unexpected symbol
1: sos run
        ^


## Command interface

In [None]:
sos run pipeline/pseudobulk_preprocessing.ipynb -h

## Setup and global parameters

In [None]:
[global]
parameter: cwd = path("output")
parameter: job_size = 1
parameter: walltime = "5h"
parameter: mem = "16G"
parameter: numThreads = 8
parameter: container = ""

import re
from sos.utils import expand_size

entrypoint = (
    'micromamba run -a "" -n' + ' ' +
    re.sub(r'(_apptainer:latest|_docker:latest|\.sif)$', '', container.split('/')[-1])
) if container else ""

cwd = path(f'{cwd:a}')

```
usage: sos run pipeline/pseudobulk_preprocessing.ipynb
               [workflow_name | -t targets] [options] [workflow_options]
  workflow_name:        Single or combined workflows defined in this script
  targets:              One or more targets to generate
  options:              Single-hyphen sos parameters (see "sos run -h" for details)
  workflow_options:     Double-hyphen workflow-specific parameters
Workflows:
  sampleid_mapping
  pseudobulk_qc
  phenotype_formatting
Global Workflow Options:
  --cwd output (as path)
  --job-size 1 (as int)
  --walltime 5h
  --mem 16G
  --numThreads 8 (as int)
  --container ''
Sections
  sampleid_mapping:
    Workflow Options:
      --map-file VAL (as str, required)
      --output-dir VAL (as str, required)
      --meta-files  (as list)
      --count-files  (as list)
  pseudobulk_qc:
    Workflow Options:
      --meta-files  (as list)
      --count-files  (as list)
      --output-dir VAL (as str, required)
      --tech-vars-file VAL (as str, required)
      --blacklist-file ''
      --batch-correction FALSE
      --batch-method limma
      --quant-norm FALSE
      --min-count 5 (as int)
      --min-total-count 15 (as int)
      --min-prop 0.1 (as float)
      --min-nuclei 20 (as int)
      --regions ''
      --gene-list ''
  phenotype_formatting:
    Workflow Options:
      --residual-files  (as list)
      --output-dir VAL (as str, required)
```

## `sampleid_mapping`

In [None]:
[sampleid_mapping]
parameter: map_file    = str
parameter: output_dir  = str
parameter: meta_files  = []
parameter: count_files = []

import os

input:  meta_files + count_files
output: [f'{output_dir}/1_files_with_sampleid/{os.path.basename(f)}' for f in meta_files + count_files]
         
python: expand = "${ }"
import pandas as pd
import gzip
import os
import subprocess
import csv
import numpy as np
import tempfile

map_df = pd.read_csv("${map_file}")
id_map = dict(zip(map_df["individualID"], map_df["sampleid"]))
output_dir  = "${output_dir}/1_files_with_sampleid"
meta_files  = ${meta_files}
count_files = ${count_files}

os.makedirs(output_dir, exist_ok=True)

def map_id(ind_id):
    return id_map.get(ind_id, ind_id)

def format_value(val):
    if pd.isna(val):
        return ''
    if isinstance(val, (int, np.integer)):
        return str(val)
    if isinstance(val, (float, np.floating)):
        if val == int(val):
            return str(int(val))
        else:
            return str(val)
    return str(val)

# ── Process metadata ───────────────────────────────────────────────────────
for in_path in meta_files:
    fname    = os.path.basename(in_path)
    out_path = os.path.join(output_dir, fname)
    meta = pd.read_csv(in_path)
    if "individualID" not in meta.columns:
        print(f"Warning: individualID column not found in {fname}, skipping.")
        continue
    meta["sampleid"] = meta["individualID"].map(map_id)
    cols = meta.columns.tolist()
    cols.remove("sampleid")
    cols.remove("individualID")
    meta = meta[["sampleid", "individualID"] + cols]
    with open(out_path, 'w', newline='') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        writer.writerow(meta.columns)
        for _, row in meta.iterrows():
            writer.writerow([format_value(val) for val in row])

# ── Process count files ────────────────────────────────────────────────────
for in_path in count_files:
    fname    = os.path.basename(in_path)
    out_path = os.path.join(output_dir, fname)
    with gzip.open(in_path, "rt") as fh:
        header_line = fh.readline().rstrip("\n")
    col_names       = header_line.split(",")
    peak_id_col     = col_names[0]
    new_sample_cols = [map_id(s) for s in col_names[1:]]
    new_header      = ",".join([peak_id_col] + new_sample_cols)
    tmp = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt')
    tmp.write(new_header + "\n")
    tmp.close()
    cmd = f"zcat {in_path} | tail -n +2 | cat {tmp.name} - | gzip -6 > {out_path}"
    subprocess.run(cmd, shell=True, check=True)
    os.unlink(tmp.name)

## `pseudobulk_qc`

In [None]:
[pseudobulk_qc]
parameter: meta_files       = []
parameter: count_files      = []
parameter: output_dir       = str
parameter: tech_vars_file   = str
parameter: blacklist_file   = ''
parameter: batch_correction = "FALSE"
parameter: batch_method     = "limma"
parameter: quant_norm       = "FALSE"
parameter: min_count        = 5
parameter: min_total_count  = 15
parameter: min_prop         = 0.1
parameter: min_nuclei       = 20
parameter: regions          = ''
parameter: gene_list        = ''

import os

_cts = [os.path.basename(f).replace('metadata_','').replace('.csv','') for f in meta_files]

input:  meta_files + count_files
output: [f'{output_dir}/2_residuals/{ct}/{ct}_residuals.txt' for ct in _cts]

task: trunk_workers = 1, trunk_size = 1, walltime = '6:00:00', mem = '64G', cores = 4

R: expand = "${ }", stdout = f'{_output[0]:n}.stdout', stderr = f'{_output[0]:n}.stderr'

    library(edgeR)
    library(limma)
    library(data.table)
    library(GenomicRanges)
    if (as.logical("${batch_correction}") && "${batch_method}" == "combat") library(sva)

    # ── predictOffset ──────────────────────────────────────────────────────
    predictOffset <- function(fit, tech_vars) {
        D  <- fit$design
        Dm <- D
        for (col in colnames(D)) {
            if (col == "(Intercept)") next
            is_tech <- any(sapply(tech_vars, function(v) grepl(paste0("^", v), col)))
            if (is_tech) {
                if (is.numeric(D[, col]) && !all(D[, col] %in% c(0, 1)))
                    Dm[, col] <- median(D[, col], na.rm=TRUE)
                else
                    Dm[, col] <- 0
            } else {
                Dm[, col] <- 0
            }
        }
        B <- fit$coefficients
        B[is.na(B)] <- 0
        off <- B %*% t(Dm)
        colnames(off) <- rownames(fit$design)
        return(off)
    }

    filter_blacklist <- function(mat, bed, feat_label) {
        peaks <- data.table(id = rownames(mat))
        peaks[, c("chr","start","end") := tstrsplit(gsub("_","-",id), "-")]
        peaks[, `:=`(start = as.numeric(start), end = as.numeric(end))]
        bl <- fread(bed)[, 1:3]
        setnames(bl, c("chr","start","end"))
        bl[, `:=`(start = as.numeric(start), end = as.numeric(end))]
        gr1 <- GRanges(peaks$chr, IRanges(peaks$start, peaks$end))
        gr2 <- GRanges(bl$chr,    IRanges(bl$start,    bl$end))
        blacklisted <- unique(queryHits(findOverlaps(gr1, gr2)))
        if (length(blacklisted) > 0) {
            message("Blacklisted ", feat_label, " removed: ", length(blacklisted))
            return(mat[-blacklisted, , drop=FALSE])
        }
        return(mat)
    }

    parse_regions <- function(region_str) {
        if (is.null(region_str) || region_str == "") return(NULL)
        lapply(strsplit(region_str, ",")[[1]], function(r) {
            parts <- strsplit(trimws(r), ":|−|-")[[1]]
            list(chr=parts[1], start=as.integer(parts[2]), end=as.integer(parts[3]))
        })
    }

    filter_regions <- function(mat, regions) {
        peaks <- data.table(id = rownames(mat))
        peaks[, c("chr","start","end") := tstrsplit(gsub("_","-",id), "-")]
        peaks[, `:=`(start = as.integer(start), end = as.integer(end))]
        gr_peaks <- GRanges(peaks$chr, IRanges(peaks$start, peaks$end))
        gr_regions <- GRanges(
            sapply(regions, `[[`, "chr"),
            IRanges(sapply(regions, `[[`, "start"), sapply(regions, `[[`, "end"))
        )
        keep <- unique(queryHits(findOverlaps(gr_peaks, gr_regions)))
        if (length(keep) == 0) stop("No peaks overlap the specified regions.")
        message("Peaks after region filter: ", length(keep))
        mat[keep, , drop=FALSE]
    }

    meta_files  <- c(${','.join([f'"{f}"' for f in meta_files])})
    count_files <- c(${','.join([f'"{f}"' for f in count_files])})

    if (length(meta_files) != length(count_files))
        stop("meta_files and count_files must have the same length and order.")

    # ── Load tech vars from file ───────────────────────────────────────────
    tech_df   <- fread("${tech_vars_file}")
    tech_vars <- setdiff(colnames(tech_df), "sampleid")
    message("Tech vars: ", paste(tech_vars, collapse=", "))

    regions   <- parse_regions("${regions}")
    gene_list <- trimws(strsplit("${gene_list}", ",")[[1]])
    gene_list <- gene_list[gene_list != ""]

    for (i in seq_along(meta_files)) {
        meta_file   <- meta_files[i]
        counts_file <- count_files[i]
        ct          <- sub("\\.csv$", "", sub("^metadata_", "", basename(meta_file)))

        message("\n", paste(rep("=", 40), collapse=""))
        message("Processing: ", ct)
        message("Batch correction: ", ifelse(as.logical("${batch_correction}"), "${batch_method}", "none"))
        message("Quantile normalization: ", as.logical("${quant_norm}"))
        message(paste(rep("=", 40), collapse=""))

        outdir <- file.path("${output_dir}/2_residuals", ct)
        dir.create(outdir, recursive=TRUE, showWarnings=FALSE)

        # ── 1. Load counts ─────────────────────────────────────────────────
        counts_raw       <- fread(counts_file)
        counts           <- as.matrix(counts_raw[, -1, with=FALSE])
        rownames(counts) <- counts_raw[[1]]
        rm(counts_raw)

        # ── Auto-detect modality ───────────────────────────────────────────
        is_atac    <- grepl("^chr.*-[0-9]+-[0-9]+$", rownames(counts)[1])
        feat_label <- ifelse(is_atac, "peaks", "genes")
        message("Modality: ", ifelse(is_atac, "snATAC-seq", "snRNA-seq"))
        message("Loaded: ", nrow(counts), " ", feat_label, " x ", ncol(counts), " samples")

        # ── 1b. Region/gene filtering (optional) ──────────────────────────
        if (is_atac && !is.null(regions)) {
            message("Filtering peaks to specified regions...")
            counts <- filter_regions(counts, regions)
        } else if (!is_atac && length(gene_list) > 0) {
            genes_present <- intersect(rownames(counts), gene_list)
            if (length(genes_present) == 0) stop("No matching genes found in count matrix.")
            message("Genes after gene_list filter: ", length(genes_present))
            counts <- counts[genes_present, , drop=FALSE]
        }

        # ── 2. Load metadata ───────────────────────────────────────────────
        meta  <- fread(meta_file)
        idcol <- intersect(c("sampleid","sampleID","individualID","projid"), colnames(meta))[1]
        if (is.na(idcol)) stop("Cannot find sample ID column in metadata.")

        # ── 3. Nuclei filter ──────────────────────────────────────────────
        n_nuclei_col <- intersect(c("n_nuclei","n.nuclei","nNuclei","nuclei_count"), colnames(meta))[1]
        if (!is.na(n_nuclei_col)) {
            meta <- meta[meta[[n_nuclei_col]] > ${min_nuclei}]
            message("Samples after nuclei (>${min_nuclei}) filter: ", nrow(meta))
        }

        # ── 4. Align samples ──────────────────────────────────────────────
        common <- intersect(meta[[idcol]], colnames(counts))
        if (length(common) == 0) stop("Zero sample overlap between metadata and count matrix.")
        counts <- counts[, common, drop=FALSE]
        message("Samples after alignment: ", length(common))

        # ── 5. Blacklist filtering ─────────────────────────────────────────
        if ("${blacklist_file}" != "" && file.exists("${blacklist_file}")) {
            counts <- filter_blacklist(counts, "${blacklist_file}", feat_label)
            message(feat_label, " after blacklist filter: ", nrow(counts))
        } else {
            message("No blacklist file - skipping.")
        }

        # ── 6. Merge tech vars by sampleid ────────────────────────────────
        tech_sub <- tech_df[tech_df$sampleid %in% common]
        tech_sub <- tech_sub[match(common, tech_sub$sampleid)]

        # ── 7. Drop samples with NA in tech vars ──────────────────────────
        keep_rows <- complete.cases(tech_sub[, ..tech_vars])
        tech_sub  <- tech_sub[keep_rows]
        counts    <- counts[, tech_sub$sampleid, drop=FALSE]
        message("Valid samples for modelling: ", nrow(tech_sub))

        # ── 8. Expression filtering ────────────────────────────────────────
        dge <- DGEList(counts=counts, samples=tech_sub)
        dge$samples$group <- factor(rep("all", ncol(dge)))
        message(feat_label, " before filter: ", nrow(dge))

        keep <- filterByExpr(dge, group=dge$samples$group,
                             min.count=${min_count},
                             min.total.count=${min_total_count},
                             min.prop=${min_prop})
        dge <- dge[keep,, keep.lib.sizes=FALSE]
        message(feat_label, " after filter: ", nrow(dge))

        write.table(dge$counts,
                    file.path(outdir, paste0(ct, "_filtered_raw_counts.txt")),
                    sep="\t", quote=FALSE, col.names=NA)

        # ── 9. TMM normalization ───────────────────────────────────────────
        dge <- calcNormFactors(dge, method="TMM")

        # ── 10. Optional batch correction ──────────────────────────────────
        if (as.logical("${batch_correction}") && "sequencingBatch" %in% colnames(dge$samples)) {
            batches       <- dge$samples$sequencingBatch
            batch_counts  <- table(batches)
            valid_batches <- names(batch_counts[batch_counts > 1])
            keep_bc       <- batches %in% valid_batches
            dge           <- dge[, keep_bc, keep.lib.sizes=FALSE]
            batches       <- batches[keep_bc]
            message("Samples after singleton batch removal: ", ncol(dge))

            if ("${batch_method}" == "combat") {
                logCPM     <- cpm(dge, log=TRUE, prior.count=1)
                logCPM     <- ComBat(dat=logCPM, batch=factor(batches))
                dge$counts <- round(pmax(2^logCPM, 0))
                message("ComBat applied on log-CPM.")
            } else {
                logCPM     <- cpm(dge, log=TRUE, prior.count=1)
                logCPM     <- removeBatchEffect(logCPM, batch=factor(batches))
                dge$counts <- round(pmax(2^logCPM, 0))
                message("limma removeBatchEffect applied.")
            }
        }

        # ── 11. Add batch vars to model if multi-level ────────────────────
        batch_vars <- c()
        if ("sequencingBatch" %in% colnames(dge$samples) &&
            length(unique(dge$samples$sequencingBatch)) > 1) {
            dge$samples$sequencingBatch_factor <- factor(dge$samples$sequencingBatch)
            batch_vars <- c(batch_vars, "sequencingBatch_factor")
        }
        if ("Library" %in% colnames(dge$samples) &&
            length(unique(dge$samples$Library)) > 1) {
            dge$samples$Library_factor <- factor(dge$samples$Library)
            batch_vars <- c(batch_vars, "Library_factor")
        }

        # ── 12. Build design matrix ────────────────────────────────────────
        all_model_vars <- intersect(c(tech_vars, batch_vars), colnames(dge$samples))
        form           <- as.formula(paste("~", paste(all_model_vars, collapse=" + ")))
        design         <- model.matrix(form, data=dge$samples)
        message("Formula: ", deparse(form))

        if (!is.fullrank(design)) {
            message("Design not full rank - trimming.")
            qr_d   <- qr(design)
            design <- design[, qr_d$pivot[seq_len(qr_d$rank)], drop=FALSE]
        }
        message("Design matrix: ", nrow(design), " x ", ncol(design))

        # ── 13. Voom + lmFit + eBayes ─────────────────────────────────────
        v   <- voom(dge, design, plot=FALSE)
        fit <- lmFit(v, design)
        fit <- eBayes(fit)

        # ── 14. Offset + residuals ─────────────────────────────────────────
        off   <- predictOffset(fit, tech_vars=tech_vars)
        res   <- residuals(fit, v$E)
        final <- off + res

        # ── 15. Save residuals ─────────────────────────────────────────────
        out_file <- file.path(outdir, paste0(ct, "_residuals.txt"))
        write.table(final, out_file, sep="\t", quote=FALSE, col.names=NA)
        message("Saved: ", out_file)
        message("  ", ifelse(is_atac,"Peaks","Genes"), ": ", nrow(final), " | Samples: ", ncol(final))

        # ── 16. Optional quantile normalization ───────────────────────────
        if (as.logical("${quant_norm}")) {
            final_qn <- t(apply(final, 1, rank, ties.method="average"))
            final_qn <- stats::qnorm(final_qn / (ncol(final_qn) + 1))
            qn_file  <- file.path(outdir, paste0(ct, "_residuals_qn.txt"))
            write.table(final_qn, qn_file, sep="\t", quote=FALSE, col.names=NA)
            message("Saved QN: ", qn_file)

            saveRDS(list(
                dge=dge, offset=off, residuals=res,
                final_data=final, final_data_qn=final_qn,
                valid_samples=colnames(dge), design=design, fit=fit, model=form,
                tech_vars=tech_vars, batch_vars=batch_vars,
                batch_correction=as.logical("${batch_correction}"),
                batch_method=ifelse(as.logical("${batch_correction}"), "${batch_method}", "none"),
                quant_norm=TRUE,
                modality=ifelse(is_atac, "snATAC-seq", "snRNA-seq")
            ), file.path(outdir, paste0(ct, "_results_qn.rds")))
        } else {
            saveRDS(list(
                dge=dge, offset=off, residuals=res,
                final_data=final,
                valid_samples=colnames(dge), design=design, fit=fit, model=form,
                tech_vars=tech_vars, batch_vars=batch_vars,
                batch_correction=as.logical("${batch_correction}"),
                batch_method=ifelse(as.logical("${batch_correction}"), "${batch_method}", "none"),
                quant_norm=FALSE,
                modality=ifelse(is_atac, "snATAC-seq", "snRNA-seq")
            ), file.path(outdir, paste0(ct, "_results.rds")))
        }

        message("Completed: ", ct, " -> ", outdir)
    }

## `phenotype_reformatting`

In [2]:
[phenotype_formatting]
parameter: residual_files = []
parameter: output_dir     = str

import os

_cts = [os.path.basename(os.path.dirname(f)) for f in residual_files]

input:  residual_files
output: [f'{output_dir}/3_pheno_reformat/{ct}_phenotype.bed.gz' for ct in _cts]

task: trunk_workers = 1, trunk_size = 1, walltime = '2:00:00', mem = '16G', cores = 2

python: expand = "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout'
    import os
    import subprocess
    import pandas as pd

    residual_files = ${residual_files}
    output_dir     = "${output_dir}"

    def read_residuals(path):
        first_line = open(path).readline().rstrip("\n")
        col_names  = first_line.split("\t")
        df = pd.read_csv(path, sep="\t", header=None, skiprows=1)
        if df.shape[1] > len(col_names):
            peak_ids   = df.iloc[:, 0].values
            df         = df.iloc[:, 1:]
            df.columns = col_names
        else:
            peak_ids   = df.iloc[:, 0].values
            df         = df.iloc[:, 1:]
            df.columns = col_names[1:]
        return peak_ids, df

    def to_midpoint_bed(peak_ids, residuals):
        parts  = pd.Series(peak_ids).str.split("-", expand=True)
        chrs   = parts[0].values
        starts = parts[1].astype(int).values
        ends   = parts[2].astype(int).values
        mids   = ((starts + ends) // 2).astype(int)
        bed = pd.DataFrame({
            "#chr":  chrs,
            "start": mids,
            "end":   mids + 1,
            "ID":    peak_ids
        })
        bed = pd.concat([bed, residuals.reset_index(drop=True)], axis=1)
        return bed.sort_values(["#chr", "start"]).reset_index(drop=True)

    def run_cmd(cmd, label):
        r = subprocess.run(cmd, capture_output=True)
        if r.returncode != 0:
            print(f"WARNING: {label} failed: {r.stderr.decode()}")
        else:
            print(f"{label}: OK")

    out_dir = os.path.join(output_dir, "3_pheno_reformat")
    os.makedirs(out_dir, exist_ok=True)

    for res_path in residual_files:
        ct = os.path.basename(os.path.dirname(res_path))

        print(f"\n{'='*40}\nPhenotype Formatting: {ct}\n{'='*40}")

        if not os.path.exists(res_path):
            print(f"WARNING: {res_path} not found, skipping.")
            continue

        peak_ids, residuals = read_residuals(res_path)
        print(f"Loaded {len(peak_ids)} peaks x {residuals.shape[1]} samples")

        bed     = to_midpoint_bed(peak_ids, residuals)
        out_bed = os.path.join(out_dir, f"{ct}_phenotype.bed")
        bed.to_csv(out_bed, sep="\t", index=False, float_format="%.15f")
        print(f"Written: {out_bed}")

        run_cmd(["bgzip", "-f", out_bed],                "bgzip")
        run_cmd(["tabix", "-p", "bed", f"{out_bed}.gz"], "tabix")
        print(f"Completed: {ct} -> {out_dir}")

ERROR: Error in parse(text = input): <text>:1:1: unexpected '['
1: [
    ^
