# Zoonomia bHLH Mapping

This notebook filters Zoonomia alignments for bHLH genes, maps human bHLH domain coordinates to orthologous sequences, and compares with Ensembl/TOGA annotations.

**Inputs**
- `data/raw/Zoonomia_protaln/*.fa`
- `data/intermediate/bHLH_StartEnd_withISO.csv`
- `data/intermediate/orthologs/TOGA_orthologs_allSpecies.csv`
- `data/intermediate/orthologs/annotated_bHLH_merged_data_with_gene_names.csv`

**Outputs**
- `data/intermediate/zoonomia/Zoonomia_Start_End_final.csv`
- `outputs/reports/report_comparison.csv`

**Note**: Set `BHLH_PROJECT_ROOT` if running from a different working directory.


In [None]:
import os
from pathlib import Path
from collections import defaultdict
import pandas as pd
from Bio import SeqIO
import re
import csv

project_root = Path(__import__("os").getenv("BHLH_PROJECT_ROOT", ".")).resolve()

def p(*parts):
    return str(project_root.joinpath(*parts))


## 1) Filter Zoonomia alignments for bHLH genes

In [None]:
input_dir = Path(p("data", "raw", "Zoonomia_protaln"))
filtered_dir = input_dir / "filtered_alignments"
filtered_dir.mkdir(exist_ok=True)

domain_csv = Path(p("data", "intermediate", "bHLH_StartEnd_withISO.csv"))
toga_file = Path(p("data", "intermediate", "orthologs", "TOGA_orthologs_allSpecies.csv"))
output_file = Path(p("data", "intermediate", "zoonomia", "Zoonomia_Start_End_final.csv"))
output_file.parent.mkdir(parents=True, exist_ok=True)

gene_symbols = [
    "TFAP4", "MLX", "MLXIPL", "MLXIP", "TCFL5", "SOHLH1", "SOHLH2", "MYC", "MYCN", "MYCL", "MAX", "MNT",
    "MXD3", "MXD4", "MXI1", "MXD1", "SREBF2", "SREBF1", "MITF", "TFE3", "TFEC", "TFEB", "USF3", "USF2", "USF1",
    "NCOA1", "NCOA2", "NCOA3", "NPAS2", "CLOCK", "ARNTL2", "ARNTL", "ARNT2", "ARNT", "NPAS4", "AHRR", "AHR",
    "SIM2", "SIM1", "NPAS3", "NPAS1", "HIF1A", "HIF3A", "EPAS1", "HELT", "BHLHE41", "BHLHE40", "HEYL", "HEY2",
    "HEY1", "HES7", "HES6", "HES5", "HES3", "HES2", "HES4", "HES1", "ATOH8", "TCF4", "TCF3", "TCF12", "MYOG",
    "MYF6", "MYOD1", "MYF5", "FIGLA", "ID1", "ID4", "ID3", "ID2", "ASCL2", "ASCL1", "ASCL4", "ASCL5", "ASCL3",
    "TAL1", "LYL1", "TAL2", "NHLH2", "NHLH1", "MESP2", "MSGN1", "MESP1", "PTF1A", "FERD3L", "ATOH7", "ATOH1",
    "BHLHA9", "BHLHA15", "BHLHE23", "BHLHE22", "OLIG1", "OLIG3", "OLIG2", "NEUROG2", "NEUROG3", "NEUROG1",
    "NEUROD2", "NEUROD6", "NEUROD4", "NEUROD1", "TCF21", "MSC", "TCF24", "TCF23", "TWIST2", "TWIST1", "HAND2",
    "HAND1", "TCF15", "SCX"
]

species_map = {
    "Mus_protaln": "mus_musculus",
    "Opossum_protaln": "monodelphis_domestica",
    "Macaca_protaln": "macaca_mulatta",
    "Bos_protaln": "bos_taurus",
    "Rat_protaln": "rattus_norvegicus",
    "Canis_protaln": "canis_lupus_familiaris",
    "Chimp_protaln": "pan_troglodytes",
    "Gorilla_protaln": "gorilla_gorilla",
}

# Load TOGA transcripts

df_toga = pd.read_csv(toga_file)
df_toga["t_transcript_clean"] = df_toga["t_transcript"].str.split(".").str[0]
toga_transcripts = set(df_toga["t_transcript_clean"])

# Filter fasta files

def extract_gene(header: str) -> str | None:
    parts = header.split(".")
    if len(parts) > 1:
        return parts[1].split()[0]
    return None

for file_path in input_dir.glob("*.fa"):
    with open(file_path) as f:
        lines = f.readlines()

    filtered_lines = []
    keep = False

    for line in lines:
        if line.startswith(">"):
            gene = extract_gene(line)
            header_transcript = line.split()[0].lstrip(">").split(".")[0]
            keep = gene in gene_symbols and header_transcript in toga_transcripts
        if keep:
            filtered_lines.append(line)

    if filtered_lines:
        out_file = filtered_dir / file_path.name
        with open(out_file, "w") as fo:
            fo.writelines(filtered_lines)

print("Fasta filtering completed")


## 2) Load human bHLH domain coordinates

In [None]:
df_domain = pd.read_csv(domain_csv, dtype=str)
df_domain.columns = [c.strip() for c in df_domain.columns]

domain_info = {}
for _, r in df_domain.iterrows():
    enst = r["ensembl_transcript_id"].strip()
    try:
        start = int(float(r["interpro_start"]))
        end = int(float(r["interpro_end"]))
    except Exception:
        continue
    domain_info[enst] = {
        "HGNC": r["HGNC symbol"].strip(),
        "human_start": start,
        "human_end": end,
    }


## 3) Map human domain positions to aligned orthologs

In [None]:
def map_domain(ref_seq: str, query_seq: str, domain_start: int, domain_end: int):
    if len(ref_seq) != len(query_seq):
        raise ValueError("Alignment length mismatch")
    human_pos = 0
    query_pos = 0
    q_start = None
    q_end = None
    for r, q in zip(ref_seq, query_seq):
        if r != "-":
            human_pos += 1
        if q != "-":
            query_pos += 1
        if human_pos == domain_start and q_start is None:
            q_start = query_pos
        if human_pos == domain_end and q_end is None:
            q_end = query_pos
            break
    return q_start, q_end

results = []
header_re = re.compile(r"^>?(.+) \| PROT \| (REFERENCE|QUERY)")

for filepath in filtered_dir.glob("*.fa"):
    raw_species = filepath.stem
    species = species_map.get(raw_species, raw_species)

    seqs_by_transcript = defaultdict(dict)
    for rec in SeqIO.parse(filepath, "fasta"):
        desc = rec.description
        m = header_re.search(desc)
        if not m:
            continue
        enst = m.group(1).split()[0]
        label = m.group(2).upper()
        seqs_by_transcript[enst][label] = str(rec.seq)

    for enst, seqs in seqs_by_transcript.items():
        enst_key = enst.split(".")[0]
        if enst_key not in domain_info:
            continue
        if "REFERENCE" not in seqs or "QUERY" not in seqs:
            continue
        ref_seq = seqs["REFERENCE"]
        query_seq = seqs["QUERY"]
        info = domain_info[enst_key]
        try:
            q_start, q_end = map_domain(ref_seq, query_seq, info["human_start"], info["human_end"])
        except Exception:
            q_start, q_end = None, None

        results.append({
            "HGNC": info["HGNC"],
            "ENST": enst_key,
            "full_enst_label": enst,
            "target_species": species,
            "human_start": info["human_start"],
            "human_end": info["human_end"],
            "query_start": q_start,
            "query_end": q_end,
        })

df_zoo = pd.DataFrame(results)


## 4) Merge with TOGA and export

In [None]:
df_toga_subset = df_toga[["t_transcript_clean", "homology_type", "HGNC symbol", "target_species"]].drop_duplicates()

df_final = df_zoo.merge(
    df_toga_subset,
    left_on=["ENST", "target_species"],
    right_on=["t_transcript_clean", "target_species"],
    how="left",
)

df_final = df_final.rename(columns={"HGNC symbol": "HGNC_final"})

final_cols = [
    "HGNC", "ENST", "full_enst_label", "target_species",
    "human_start", "human_end", "query_start", "query_end", "homology_type"
]

df_final = df_final[final_cols]

df_final.to_csv(output_file, index=False)
print(f"Saved: {output_file}")
print(f"Rows: {len(df_final)}, species: {df_final['target_species'].nunique()}, genes: {df_final['HGNC'].nunique()}")


## 5) Quality controls (optional)

In [None]:
# df_final = pd.read_csv(output_file)

null_coords = df_final[df_final['query_start'].isna() | df_final['query_end'].isna()]
null_counts = null_coords.groupby('target_species').size().reset_index(name='null_count')

print("Entries with null query_start or query_end per species:")
print(null_counts.to_string(index=False))

valid_coords = df_final[df_final['query_start'].notna() & df_final['query_end'].notna()]

gene_species_valid = valid_coords.groupby(['HGNC', 'target_species']).size().reset_index(name='count')

missing_genes = []
for gene in df_final['HGNC'].unique():
    for sp in df_final['target_species'].unique():
        if not ((gene_species_valid['HGNC'] == gene) & (gene_species_valid['target_species'] == sp)).any():
            missing_genes.append({'HGNC': gene, 'target_species': sp})

if missing_genes:
    print(f"Missing valid entries (sample): {len(missing_genes)}")
    print(pd.DataFrame(missing_genes).head(20))
else:
    print("All genes have at least one valid entry per species.")


## 6) Compare with Ensembl (optional)

In [None]:
zoo_file = p("data", "intermediate", "zoonomia", "Zoonomia_Start_End_final.csv")
annot_file = p("data", "intermediate", "orthologs", "annotated_bHLH_merged_data_with_gene_names.csv")
output_report = p("outputs", "reports", "report_comparison.csv")
Path(output_report).parent.mkdir(parents=True, exist_ok=True)

df_zoo = pd.read_csv(zoo_file)
df_annot = pd.read_csv(annot_file)

df_zoo_filtered = df_zoo[(df_zoo["query_start"] > 0) & (df_zoo["query_end"] > 0)].copy()

df_compare = df_zoo_filtered.merge(
    df_annot,
    left_on=['HGNC', 'target_species'],
    right_on=['HGNC symbol', 'target_species_name'],
    how='outer',
    suffixes=('_zoo', '_annot')
)

df_compare = df_compare.drop(columns=['HGNC symbol', 'target_species_name'], errors='ignore')

df_compare["target_species"] = df_compare["target_species_zoo"].fillna(df_compare["target_species_annot"])

df_compare = df_compare.drop(columns=['target_species_zoo', 'target_species_annot'], errors='ignore')

df_compare['diff_start'] = df_compare['query_start'].fillna(0) - df_compare['Start_Q'].fillna(0)
df_compare['diff_end'] = df_compare['query_end'].fillna(0) - df_compare['Stop_Q'].fillna(0)

homology_map = {
    'ortholog_one2one': 'one2one',
    'ortholog_one2many': 'one2many',
    'ortholog_many2many': 'many2many'
}

df_compare['homology_type_annot_mapped'] = df_compare['homology_type_annot'].map(homology_map)

df_compare['homology_match'] = (
    df_compare['homology_type_zoo'] == df_compare['homology_type_annot_mapped']
)

df_compare['present_in_zoo'] = df_compare['query_start'].notna()
df_compare['present_in_annot'] = df_compare['Start_Q'].notna()

report_cols = [
    'HGNC', 'ENST', 'target_species',
    'query_start', 'query_end',
    'Start_Q', 'Stop_Q',
    'diff_start', 'diff_end',
    'homology_type_zoo', 'homology_type_annot_mapped', 'homology_match',
    'present_in_zoo', 'present_in_annot'
]

df_report = df_compare[report_cols].sort_values(['HGNC', 'target_species'])

df_report.to_csv(output_report, index=False)
print(f"Saved report: {output_report}")


## Exploratory notes (optional)

- Zoonomia alignments include both REFERENCE and QUERY sequences; only pairs with both are used.
- The TOGA transcript IDs are normalized by removing version suffixes (e.g., `ENST...1.2` â†’ `ENST...1`).
- If column naming is inconsistent across sources, prefer standardizing to `HGNC`, `ENST`, and `target_species` before merging.
