<a href="https://colab.research.google.com/github/Enso-bio/Neuroimmune-Synchronization-Across-the-Sleep-Wake-Cycl/blob/main/T_Info_Necrpot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Full-automatic necroptosis information analysis
# Works in Google Colab or locally (Python 3.9+)

import os, re, json, gzip, textwrap, math, warnings
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional

import requests
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Suprimir advertencias de Matplotlib
warnings.filterwarnings("ignore", category=UserWarning)

BASE_DIR = os.getcwd()
OUT_DIR = os.path.join(BASE_DIR, "necroptosis_information")
DATA_DIR = os.path.join(OUT_DIR, "data_fasta")
FIG_DIR  = os.path.join(OUT_DIR, "figures")
TAB_DIR  = os.path.join(OUT_DIR, "tables")
TEX_DIR  = os.path.join(OUT_DIR, "manuscript")

for d in [OUT_DIR, DATA_DIR, FIG_DIR, TAB_DIR, TEX_DIR]:
    os.makedirs(d, exist_ok=True)

print("Output folder:", OUT_DIR)
print("Directories created successfully.")

# Species (UniProt taxonomy IDs)
SPECIES = {
    "Homo_sapiens": 9606,
    "Mus_musculus": 10090,
    "Danio_rerio": 7955,
}

# Protein panel: necroptosis core + controls
# gene: primary gene symbol used in UniProt query
# role: label for grouping
PROTEINS = [
    {"gene": "MLKL",     "role": "necroptosis"},
    {"gene": "RIPK1",    "role": "necroptosis"},
    {"gene": "RIPK3",    "role": "necroptosis"},
    {"gene": "FADD",     "role": "apoptosis_control"},
    {"gene": "CASP8",    "role": "apoptosis_control"},
    {"gene": "CASP3",    "role": "apoptosis_control"},
    {"gene": "ACTB",     "role": "housekeeping_control"},
    {"gene": "GAPDH",    "role": "housekeeping_control"},
]

UNIPROT_SEARCH_URL = "https://rest.uniprot.org/uniprotkb/search"
UNIPROT_FASTA_URL  = "https://rest.uniprot.org/uniprotkb/{}.fasta"

HEADERS = {"User-Agent": "necroptosis-info-paper/1.0", "Accept": "application/json"}

def uniprot_search_best_accession(gene: str, taxid: int) -> Optional[Dict]:
    """
    Returns best UniProt entry dict with accession for a gene in a species.
    Prefers reviewed: true. If none, falls back to unreviewed.
    """
    def _query(reviewed: bool):
        q = f'(gene_exact:{gene} AND organism_id:{taxid}'
        q += f' AND reviewed:{str(reviewed).lower()})'
        params = {
            "query": q,
            "format": "json",
            "size": 5,
            "fields": "accession,id,protein_name,gene_primary,organism_name,reviewed,length"
        }
        try:
            r = requests.get(UNIPROT_SEARCH_URL, params=params, headers=HEADERS, timeout=30)
            r.raise_for_status()
            data = r.json()
            results = data.get("results", [])
            return results
        except requests.exceptions.RequestException as e:
            print(f"[ERROR] Query failed for {gene} taxid {taxid}: {e}")
            return []
        except json.JSONDecodeError as e:
            print(f"[ERROR] JSON decode failed for {gene} taxid {taxid}: {e}")
            return []

    # try reviewed first
    res = _query(True)
    if not res:
        print(f"[INFO] No reviewed entries found for {gene} in taxid {taxid}, trying unreviewed...")
        res = _query(False)
        if not res:
            return None

    # choose: exact primary gene match if available, else longest
    def get_primary_gene(entry):
        genes = entry.get("genes", [])
        if genes and "geneName" in genes[0] and "value" in genes[0]["geneName"]:
            return genes[0]["geneName"]["value"]
        return None

    exact = [e for e in res if (get_primary_gene(e) or "").upper() == gene.upper()]
    candidates = exact if exact else res

    if not candidates:
        return None

    # pick longest sequence
    def get_len(entry):
        return entry.get("sequence", {}).get("length", 0)

    best = sorted(candidates, key=get_len, reverse=True)[0]
    return best

def search_with_aliases(gene: str, taxid: int) -> Optional[Dict]:
    """Intenta con alias si la búsqueda principal falla."""
    result = uniprot_search_best_accession(gene, taxid)
    if result:
        return result

    # Diccionario de alias comunes por gen
    GENE_ALIASES = {
        "ACTB": ["ACT", "ACTB1", "ACTG", "beta-actin"],
        "GAPDH": ["GAPD", "G3PD", "GAPDHS"],
        "MLKL": ["MLKL_HUMAN", "MLKL_MOUSE"],
        "RIPK1": ["RIP", "RIP1", "RIP-1"],
        "RIPK3": ["RIP3", "RIP-3"],
        "FADD": ["MORT1"],
        "CASP8": ["CASP-8", "MACH", "FLICE"],
        "CASP3": ["CASP-3", "CPP32", "YAMA"],
    }

    aliases = GENE_ALIASES.get(gene, [])
    for alias in aliases:
        if alias != gene:
            print(f"[INFO] Trying alias '{alias}' for gene {gene}")
            result = uniprot_search_best_accession(alias, taxid)
            if result:
                print(f"[SUCCESS] Found {gene} as alias '{alias}'")
                return result

    # Si es Danio rerio, intentar con nombres de ortólogos
    if taxid == 7955:  # Danio rerio
        ZEBRAFISH_ORTHOLOGS = {
            "ACTB": ["actb1", "actb2", "beta-actin"],
            "MLKL": ["mlkl", "ripk4"],  # mlkl puede no existir, ripk4 es un pariente
            "RIPK1": ["ripk1a", "ripk1b", "ripk1"],
        }
        fish_aliases = ZEBRAFISH_ORTHOLOGS.get(gene, [])
        for alias in fish_aliases:
            print(f"[INFO] Trying zebrafish ortholog '{alias}' for {gene}")
            result = uniprot_search_best_accession(alias, taxid)
            if result:
                print(f"[SUCCESS] Found zebrafish ortholog '{alias}' for {gene}")
                return result

    return None

def fetch_fasta(accession: str) -> str:
    try:
        r = requests.get(UNIPROT_FASTA_URL.format(accession), timeout=30)
        r.raise_for_status()
        return r.text
    except requests.exceptions.RequestException as e:
        print(f"[ERROR] Failed to fetch FASTA for {accession}: {e}")
        return ""

def parse_fasta(fasta_text: str) -> Tuple[str, str]:
    if not fasta_text:
        return "", ""
    lines = [l.strip() for l in fasta_text.splitlines() if l.strip()]
    if not lines:
        return "", ""
    header = lines[0]
    seq = "".join(lines[1:]).replace(" ", "").upper()
    seq = re.sub(r"[^ACDEFGHIKLMNPQRSTVWY]", "", seq)  # keep standard AA
    return header, seq

print("\n" + "="*60)
print("FETCHING DATA FROM UNIPROT")
print("="*60)

records = []
missing_proteins = []

for sp_name, taxid in SPECIES.items():
    print(f"\nProcessing species: {sp_name} (taxid: {taxid})")
    print("-" * 40)

    for p in PROTEINS:
        gene = p["gene"]
        role = p["role"]

        print(f"  Searching for: {gene} ({role})...", end=" ")

        # Usar la función mejorada con alias
        best = search_with_aliases(gene, taxid)

        if best is None:
            print(f"NOT FOUND")
            missing_proteins.append(f"{gene} in {sp_name}")
            continue

        acc = best["primaryAccession"]
        reviewed = best.get("entryType", "") == "UniProtKB reviewed (Swiss-Prot)"
        org = best.get("organism", {}).get("scientificName", sp_name.replace("_"," "))
        prot_name = best.get("proteinDescription", {}).get("recommendedName", {}).get("fullName", {}).get("value", "")

        print(f"FOUND: {acc} {'(reviewed)' if reviewed else '(unreviewed)'}")

        fasta = fetch_fasta(acc)
        if not fasta:
            print(f"    [WARN] Failed to fetch FASTA for {acc}")
            continue

        header, seq = parse_fasta(fasta)

        if not seq:
            print(f"    [WARN] Empty sequence for {acc}")
            continue

        fname = f"{gene}__{sp_name}__{acc}.fasta"
        fpath = os.path.join(DATA_DIR, fname)
        with open(fpath, "w", encoding="utf-8") as f:
            f.write(fasta)

        records.append({
            "gene": gene,
            "role": role,
            "species": sp_name,
            "taxid": taxid,
            "accession": acc,
            "organism": org,
            "protein_name": prot_name,
            "length": len(seq),
            "fasta_file": fpath
        })

if missing_proteins:
    print(f"\n[SUMMARY] Missing proteins: {len(missing_proteins)}")
    for mp in missing_proteins:
        print(f"  - {mp}")
else:
    print("\n[SUMMARY] All proteins found successfully!")

if not records:
    print("\n[ERROR] No records found! Exiting.")
    exit(1)

df_meta = pd.DataFrame(records).sort_values(["role","gene","species"])
df_meta.to_csv(os.path.join(TAB_DIR, "uniprot_metadata.csv"), index=False)

print(f"\nMetadata saved: {len(df_meta)} records")
print(df_meta[['gene', 'species', 'accession', 'length']].head(10))

AA = list("ACDEFGHIKLMNPQRSTVWY")
AA_TO_I = {a:i for i,a in enumerate(AA)}

def shannon_entropy(seq: str) -> float:
    if not seq:
        return np.nan
    counts = np.zeros(len(AA), dtype=float)
    for ch in seq:
        if ch in AA_TO_I:
            counts[AA_TO_I[ch]] += 1
    p = counts / counts.sum() if counts.sum() > 0 else counts
    p = p[p > 0]
    return float(-(p * np.log2(p)).sum()) if len(p) else 0.0

def sliding_entropy(seq: str, w: int = 50, step: int = 5) -> Tuple[np.ndarray, np.ndarray]:
    xs, hs = [], []
    for start in range(0, max(1, len(seq) - w + 1), step):
        window = seq[start:start+w]
        xs.append(start + w/2)
        hs.append(shannon_entropy(window))
    return np.array(xs), np.array(hs)

def gzip_compress_ratio(seq: str) -> float:
    if not seq:
        return np.nan
    try:
        raw = seq.encode("utf-8")
        comp = gzip.compress(raw, compresslevel=9)
        return len(comp) / max(1, len(raw))
    except Exception as e:
        print(f"[ERROR] gzip compression failed: {e}")
        return np.nan

def split_halves(seq: str) -> Tuple[str, str]:
    mid = len(seq)//2
    return seq[:mid], seq[mid:]

def load_seq_from_fasta(path: str) -> str:
    try:
        with open(path, "r", encoding="utf-8") as f:
            header, seq = parse_fasta(f.read())
        return seq
    except Exception as e:
        print(f"[ERROR] Failed to load FASTA from {path}: {e}")
        return ""

print("\n" + "="*60)
print("CALCULATING INFORMATION METRICS")
print("="*60)

rows = []
W = 50
STEP = 5

for idx, r in df_meta.iterrows():
    print(f"Processing: {r['gene']} ({r['species']})...", end=" ")
    seq = load_seq_from_fasta(r["fasta_file"])

    if not seq:
        print(f"SKIPPED (empty sequence)")
        continue

    n_half, c_half = split_halves(seq)

    ent_full = shannon_entropy(seq)
    ent_n = shannon_entropy(n_half)
    ent_c = shannon_entropy(c_half)

    gz_full = gzip_compress_ratio(seq)
    gz_n = gzip_compress_ratio(n_half)
    gz_c = gzip_compress_ratio(c_half)

    rows.append({
        "role": r["role"],
        "gene": r["gene"],
        "species": r["species"],
        "accession": r["accession"],
        "length": len(seq),
        "entropy_full": ent_full,
        "entropy_Nhalf": ent_n,
        "entropy_Chalf": ent_c,
        "gzip_ratio_full": gz_full,
        "gzip_ratio_Nhalf": gz_n,
        "gzip_ratio_Chalf": gz_c,
        "delta_entropy_NminusC": ent_n - ent_c,
        "delta_gzip_NminusC": gz_n - gz_c
    })
    print(f"DONE")

df = pd.DataFrame(rows).sort_values(["role","gene","species"])
df.to_csv(os.path.join(TAB_DIR, "main_metrics.csv"), index=False)

print(f"\nMetrics calculated for {len(df)} sequences")
print(df[['gene', 'species', 'entropy_full', 'gzip_ratio_full']].head(10))

print("\n" + "="*60)
print("GENERATING FIGURES")
print("="*60)

# Figura 1: Entropía por rol
print("Generating Figure 1...")
roles = df["role"].unique().tolist()
data = [df.loc[df["role"]==role, "entropy_full"].dropna().values for role in roles]

plt.figure(figsize=(8,4.5))
box = plt.boxplot(data, showfliers=False)
plt.xticks(range(1, len(roles)+1), roles, rotation=20, ha="right")
plt.ylabel("Shannon entropy (bits)")
plt.title("Global sequence entropy by functional group")
plt.grid(True, alpha=0.3, linestyle='--')
plt.tight_layout()
out = os.path.join(FIG_DIR, "Fig1_entropy_by_role.png")
plt.savefig(out, dpi=300)
plt.close()
print(f"  Saved: {out}")

# Figura 2: Compresibilidad por rol
print("Generating Figure 2...")
data = [df.loc[df["role"]==role, "gzip_ratio_full"].dropna().values for role in roles]

plt.figure(figsize=(8,4.5))
box = plt.boxplot(data, showfliers=False)
plt.xticks(range(1, len(roles)+1), roles, rotation=20, ha="right")
plt.ylabel("gzip compressed size / raw size")
plt.title("Algorithmic compressibility by functional group")
plt.grid(True, alpha=0.3, linestyle='--')
plt.tight_layout()
out = os.path.join(FIG_DIR, "Fig2_gzip_by_role.png")
plt.savefig(out, dpi=300)
plt.close()
print(f"  Saved: {out}")

# Figura 3: Perfiles de ventana deslizante
print("Generating Figure 3...")
def plot_sliding_profiles(species="Homo_sapiens", genes=("MLKL","RIPK3","CASP3","ACTB"), w=W, step=STEP):
    plt.figure(figsize=(9,4.8))
    colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728']

    for idx, g in enumerate(genes):
        sub = df_meta[(df_meta["species"]==species) & (df_meta["gene"]==g)]
        if sub.empty:
            print(f"  [WARN] {g} not found in {species}, skipping...")
            continue
        fpath = sub.iloc[0]["fasta_file"]
        seq = load_seq_from_fasta(fpath)
        if seq:
            x, h = sliding_entropy(seq, w=w, step=step)
            plt.plot(x, h, label=f"{g} ({species})", color=colors[idx % len(colors)], linewidth=2)

    plt.xlabel("Position (aa)")
    plt.ylabel(f"Shannon entropy in window (w={w})")
    plt.title("Sliding-window entropy profiles")
    plt.legend()
    plt.grid(True, alpha=0.3, linestyle='--')
    plt.tight_layout()
    out = os.path.join(FIG_DIR, "Fig3_sliding_entropy_profiles.png")
    plt.savefig(out, dpi=300)
    plt.close()
    return out

fig3_path = plot_sliding_profiles()
print(f"  Saved: {fig3_path}")

# Figura 4: Comparación N vs C terminal
print("Generating Figure 4...")
sub = df.copy()
plt.figure(figsize=(7.5,5))

# Colores por rol
role_colors = {
    "necroptosis": "#1f77b4",
    "apoptosis_control": "#ff7f0e",
    "housekeeping_control": "#2ca02c"
}

for role in sub["role"].unique():
    mask = sub["role"] == role
    plt.scatter(sub.loc[mask, "entropy_Nhalf"],
                sub.loc[mask, "entropy_Chalf"],
                label=role,
                color=role_colors.get(role, "#000000"),
                alpha=0.7, s=80)

plt.xlabel("Entropy N-half (bits)")
plt.ylabel("Entropy C-half (bits)")
plt.title("N-terminal vs C-terminal sequence entropy")

# Línea de identidad
mn = np.nanmin([sub["entropy_Nhalf"].min(), sub["entropy_Chalf"].min()]) - 0.1
mx = np.nanmax([sub["entropy_Nhalf"].max(), sub["entropy_Chalf"].max()]) + 0.1
plt.plot([mn,mx], [mn,mx], linestyle="--", color="gray", alpha=0.7, label="y=x")

plt.legend()
plt.grid(True, alpha=0.3, linestyle='--')
plt.tight_layout()
out = os.path.join(FIG_DIR, "Fig4_N_vs_C_entropy.png")
plt.savefig(out, dpi=300)
plt.close()
print(f"  Saved: {out}")

print("\n" + "="*60)
print("SUMMARY STATISTICS")
print("="*60)

# Estadísticas descriptivas
print("\nEntropy statistics by role:")
entropy_stats = df.groupby("role")["entropy_full"].agg(['mean', 'std', 'min', 'max']).round(3)
print(entropy_stats)

print("\nGzip ratio statistics by role:")
gzip_stats = df.groupby("role")["gzip_ratio_full"].agg(['mean', 'std', 'min', 'max']).round(3)
print(gzip_stats)

# Generar reporte de resumen
print("\n" + "="*60)
print("ANALYSIS COMPLETE")
print("="*60)

print(f"\nTotal sequences analyzed: {len(df)}")
print(f"Total species: {len(df['species'].unique())}")
print(f"Total genes: {len(df['gene'].unique())}")

print(f"\nOutput directory: {OUT_DIR}")
print(f"  FASTA files: {len(os.listdir(DATA_DIR)) if os.path.exists(DATA_DIR) else 0}")
print(f"  Figures: {len(os.listdir(FIG_DIR)) if os.path.exists(FIG_DIR) else 0}")
print(f"  Tables: {len(os.listdir(TAB_DIR)) if os.path.exists(TAB_DIR) else 0}")

print(f"\nMain output files:")
print(f"  - {os.path.join(TAB_DIR, 'uniprot_metadata.csv')}")
print(f"  - {os.path.join(TAB_DIR, 'main_metrics.csv')}")
print(f"  - {os.path.join(FIG_DIR, 'Fig1_entropy_by_role.png')}")
print(f"  - {os.path.join(FIG_DIR, 'Fig2_gzip_by_role.png')}")
print(f"  - {os.path.join(FIG_DIR, 'Fig3_sliding_entropy_profiles.png')}")
print(f"  - {os.path.join(FIG_DIR, 'Fig4_N_vs_C_entropy.png')}")

# Guardar resumen en archivo
summary_path = os.path.join(OUT_DIR, "analysis_summary.txt")
with open(summary_path, "w") as f:
    f.write("NECROPTOSIS INFORMATION ANALYSIS - SUMMARY REPORT\n")
    f.write("="*60 + "\n\n")
    f.write(f"Total sequences: {len(df)}\n")
    f.write(f"Species analyzed: {', '.join(sorted(df['species'].unique()))}\n")
    f.write(f"Genes analyzed: {', '.join(sorted(df['gene'].unique()))}\n\n")

    f.write("Missing proteins:\n")
    for mp in missing_proteins:
        f.write(f"  - {mp}\n")

    f.write("\nStatistics by role:\n")
    f.write("\nEntropy (bits):\n")
    f.write(entropy_stats.to_string())
    f.write("\n\nGzip ratio:\n")
    f.write(gzip_stats.to_string())

print(f"\nSummary saved to: {summary_path}")
print("\nAnalysis completed successfully!")


Output folder: /content/necroptosis_information
Directories created successfully.

FETCHING DATA FROM UNIPROT

Processing species: Homo_sapiens (taxid: 9606)
----------------------------------------
  Searching for: MLKL (necroptosis)... FOUND: Q8NB16 (reviewed)
  Searching for: RIPK1 (necroptosis)... FOUND: Q13546 (reviewed)
  Searching for: RIPK3 (necroptosis)... FOUND: Q9Y572 (reviewed)
  Searching for: FADD (apoptosis_control)... FOUND: Q13158 (reviewed)
  Searching for: CASP8 (apoptosis_control)... FOUND: Q14790 (reviewed)
  Searching for: CASP3 (apoptosis_control)... FOUND: P42574 (reviewed)
  Searching for: ACTB (housekeeping_control)... FOUND: P60709 (reviewed)
  Searching for: GAPDH (housekeeping_control)... FOUND: P04406 (reviewed)

Processing species: Mus_musculus (taxid: 10090)
----------------------------------------
  Searching for: MLKL (necroptosis)... FOUND: Q9D2Y4 (reviewed)
  Searching for: RIPK1 (necroptosis)... FOUND: Q60855 (reviewed)
  Searching for: RIPK3 (necro