# Generating temp files

In [None]:
import pandas as pd
import os

def merge_and_enrich_trim_data(
    trim_file_path="data/TRIM_Ded1_Dataset.csv", 
    pars_file_path="data/denome_wide_PARS_dataset.csv", 
    output_file_path="data/TRIM_Ded1_Dataset_Enhanced.csv"
):
    """
    Merge the PARS 'Max30Pos' column into the TRIM dataset and save the result.

    Steps:
    1. Load the TRIM and PARS CSV files.
    2. Left-join the PARS 'Max30Pos' column onto the TRIM rows, matching the
       TRIM 'tx_id' column against the PARS '#YORF' column.
    3. Add a constant 'split' column (value 'test') and a 'SystematicName'
       column copied from 'Gene'.
    4. Write the merged table to ``output_file_path`` (the parent directory
       is created if it does not exist) and print a short summary.

    PARAMS:
    - trim_file_path: path of the TRIM CSV (needs 'tx_id' and 'Gene' columns).
    - pars_file_path: path of the PARS CSV (needs '#YORF' and 'Max30Pos' columns).
    - output_file_path: path of the CSV to write.

    Returns None. If an input file is missing, the error is printed and
    nothing is written.
    """

    # Load CSV file
    try:
        df_trim = pd.read_csv(trim_file_path)
        df_pars = pd.read_csv(pars_file_path)
    except FileNotFoundError as e:
        print(f"Error: {e}")
        return

    # Left join keeps every TRIM row; rows without a PARS match get NaN.
    df_merged = pd.merge(
        df_trim,
        df_pars[['#YORF', 'Max30Pos']],
        left_on='tx_id',
        right_on='#YORF',
        how='left'
    )
    # The join key from the PARS side duplicates 'tx_id', so discard it.
    df_merged = df_merged.drop(columns=['#YORF'])
    df_merged['split'] = 'test'
    df_merged['SystematicName'] = df_merged['Gene']

    # to_csv does not create missing directories, so do it explicitly.
    out_dir = os.path.dirname(output_file_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    df_merged.to_csv(output_file_path, index=False)

    print("-" * 30)
    print(f"File saved as {output_file_path}")
    print(f"Raw data lines: {len(df_trim)}")
    print(f"Merged data lines: {len(df_merged)}")
    if len(df_merged) != len(df_trim):
        # A left join only changes the row count when the right-hand key
        # is not unique.
        print("Warning: duplicate '#YORF' entries in the PARS data duplicated TRIM rows.")
    print("-" * 30)

# Run the merge with its default paths when this code is executed directly
# rather than imported.
if __name__ == "__main__":
    merge_and_enrich_trim_data()

# Calculating Pairing Probability
**Caution: Switch to the RNA2d mamba env**

In [None]:
import pandas as pd
import subprocess
import os
from tqdm import tqdm

def predict_structure(sequence):
    """
    Predict the RNA secondary structure of ``sequence`` with RNAfold.

    Runs ``RNAfold --noPS`` with the sequence on stdin and reads the second
    line of its output; the first whitespace-separated token of that line is
    returned as the dot-bracket string.

    PARAMS:
    - sequence: nucleotide sequence as a string.

    Returns the dot-bracket string, or None when the input is empty or not a
    string, RNAfold cannot be started, RNAfold exits with a non-zero status
    or reports an error on stderr, or its output cannot be parsed.
    """
    # Nothing to fold; avoid spawning a process for empty or invalid input.
    if not isinstance(sequence, str) or not sequence.strip():
        return None

    try:
        completed = subprocess.run(
            ['RNAfold', '--noPS'],
            input=sequence,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        # OSError covers a missing or non-executable RNAfold binary.
        print(f"Execution Error: {e}")
        return None

    stderr = completed.stderr
    if completed.returncode != 0 or (stderr and "error" in stderr.lower()):
        print(f"RNAfold error: {stderr}")
        return None

    lines = completed.stdout.strip().split('\n')
    if len(lines) < 2:
        return None

    # The structure line is "<dot-bracket> (<energy>)"; keep the first token.
    tokens = lines[1].split()
    return tokens[0] if tokens else None

def main():
    """
    Predict a secondary structure for every transcript and save the results.

    Reads the enhanced TRIM CSV, concatenates 'utr5_sequence' and
    'cds_sequence' into 'full_sequence', folds each sequence with
    ``predict_structure``, drops rows without a structure, adds the 5'UTR
    length as 'utr_len', and writes the table to the step-1 output CSV
    (creating the output directory if needed).
    """
    input_file = 'data/TRIM_Ded1_Dataset_Enhanced.csv'
    output_file = '3_3_Plot/step1_full_structures_genomewide.csv'

    # Alternative paths for a small test run:
    # input_file = 'data/test_input.txt'
    # output_file = '3_3_Plot/step1_full_structures.csv'

    print(f"Loading {input_file}...")
    df = pd.read_csv(input_file)

    # Detect missing sequences on the raw columns. Searching the stringified
    # sequence for "nan" would also reject real sequences that contain the
    # bases N-A-N.
    has_sequence = df['utr5_sequence'].notna() & df['cds_sequence'].notna()

    # 1. Sequence Concatenate
    df['full_sequence'] = df['utr5_sequence'].astype(str) + df['cds_sequence'].astype(str)

    print("Predicting full-length structure...")
    structures = [
        predict_structure(seq) if ok else None
        for seq, ok in tqdm(zip(df['full_sequence'], has_sequence), total=len(df))
    ]

    df['structure'] = structures
    # Copy so that adding a column below modifies an independent frame
    # instead of a slice of df.
    df_clean = df.dropna(subset=['structure']).copy()
    df_clean['utr_len'] = df_clean['utr5_sequence'].astype(str).apply(len)

    # to_csv does not create missing directories.
    out_dir = os.path.dirname(output_file)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    df_clean.to_csv(output_file, index=False)
    print(f"Results saved to {output_file}")

# Run the structure prediction step when this code is executed directly
# rather than imported.
if __name__ == "__main__":
    main()

# Calculating Gradient Points
**Caution: Switch to the plot33 mamba env**

In [None]:
import sys
import types

class OmniMock:
    """
    Catch-all placeholder object.

    Any attribute that is not set on the instance, any call, and any
    subscript yields a new OmniMock; membership tests always report False.
    Each instance carries a handful of module-style dunder attributes.
    """

    def __init__(self, *args, **kwargs):
        # Constructor arguments are accepted and ignored.
        module_style_attrs = {
            "__spec__": types.SimpleNamespace(origin="mock"),
            "__path__": [],
            "__version__": "9.9.9",
            "__file__": "mock_file.py",
        }
        for attr_name, attr_value in module_style_attrs.items():
            setattr(self, attr_name, attr_value)

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails.
        replacement = OmniMock()
        return replacement

    def __call__(self, *args, **kwargs):
        replacement = OmniMock()
        return replacement

    def __getitem__(self, key):
        replacement = OmniMock()
        return replacement

    def __contains__(self, key):
        return False
# A single shared mock instance stands in for every blocked module.
mock_obj = OmniMock()

# Module names to shadow, grouped by top-level package.
BLOCK_LIST = (
    [
        "torchvision",
        "torchvision.ops",
        "torchvision.transforms",
        "torchvision.models",
    ]
    + [
        "matplotlib",
        "matplotlib.pyplot",
        "matplotlib.colors",
        "matplotlib.cm",
        "matplotlib.collections",
        "matplotlib.figure",
        "matplotlib.image",
        "matplotlib.axes",
        "mpl_toolkits",
        "mpl_toolkits.axes_grid1",
    ]
    + [
        "scipy",
        "scipy.stats",
        "scipy.signal",
        "scipy.interpolate",
        "scipy.spatial",
        "scipy.optimize",
        "scipy.sparse",
        "scipy.linalg",
    ]
    + [
        "sklearn",
        "sklearn.preprocessing",
        "sklearn.utils",
        "sklearn.base",
    ]
)

# Entries placed in sys.modules are what a later `import <name>` returns, so
# this must run before the imports that follow.
for lib_name in BLOCK_LIST:
    sys.modules[lib_name] = mock_obj

# =================================================================

import os
import numpy as np
import pandas as pd
import torch
from tqdm import tqdm
from captum.attr import IntegratedGradients
from src.model_5U import Expert_5U

NUC2IDX = {"A": 0, "T": 1, "C": 2, "G": 3, "U": 1}

# ====================== Configuration ======================
def get_cfg():
    """Return a new dict with the file paths and window-size settings."""
    paths = dict(
        in_csv_path="3_3_Plot/step1_full_structures_genomewide.csv",
        ckpt_path="outputs_5U/logs/version_0/checkpoints/local200-epoch=160-val/r2_reg=0.785.ckpt",
        out_csv_path="3_3_Plot/step2_gradient_scores_genomewide.csv",
    )
    window = dict(L=200, utr_tail_len=150, cds_head_len=50)
    return {**paths, **window}

# ====================== Encoding Function ======================

def encode_utr_cds_to_x200_with_text(utr_seq: str, cds_seq: str, L: int, utr_tail_len: int, cds_head_len: int):
    """
    One-hot encode the 5'UTR tail plus the CDS head into a (4, L) array.

    The last ``utr_tail_len`` bases of the UTR are joined with the first
    ``cds_head_len`` bases of the CDS. The joined text is trimmed to its last
    ``L`` characters and written right-aligned into the array, so any unused
    columns on the left stay all-zero.

    PARAMS:
    - utr_seq: 5'UTR sequence; missing values (per ``pd.notna``) count as "".
    - cds_seq: CDS sequence; missing values count as "".
    - L: number of columns of the encoded array.
    - utr_tail_len: maximum number of trailing UTR bases to keep.
    - cds_head_len: maximum number of leading CDS bases to keep.

    Returns a tuple ``(x, merged, start_pos)``:
    - x: float32 array of shape (4, L); rows are indexed through NUC2IDX.
      Characters that are not NUC2IDX keys leave their column all-zero.
    - merged: the upper-cased, U->T converted text that was encoded.
    - start_pos: column index of the first character of ``merged``.
    """
    utr = (str(utr_seq) if pd.notna(utr_seq) else "").upper().replace("U", "T")
    cds = (str(cds_seq) if pd.notna(cds_seq) else "").upper().replace("U", "T")

    # Slice from an explicit start offset: a negative-index slice such as
    # s[-0:] returns the whole string, which is wrong for a length of 0.
    utr_tail = utr[max(0, len(utr) - utr_tail_len):]
    cds_head = cds[:max(0, cds_head_len)]

    merged = utr_tail + cds_head
    merged = merged[max(0, len(merged) - L):]

    x = np.zeros((4, L), dtype=np.float32)
    start_pos = L - len(merged)

    # After trimming, every position start_pos + i lies inside [0, L).
    for pos, base in enumerate(merged, start=start_pos):
        idx = NUC2IDX.get(base)
        if idx is not None:
            x[idx, pos] = 1.0

    return x, merged, start_pos

# ====================== Load Model ======================

def load_expert5u(ckpt_path: str, device: torch.device) -> Expert_5U:
    """
    Load an Expert_5U model from a checkpoint file.

    The checkpoint is loaded with ``map_location=device``; the model is then
    moved to ``device`` and ``eval()`` is called on it before it is returned.

    Raises FileNotFoundError when ``ckpt_path`` is not an existing file.
    """
    checkpoint_exists = os.path.isfile(ckpt_path)
    if checkpoint_exists:
        print(f"[INFO] Loading checkpoint: {ckpt_path}")
        expert = Expert_5U.load_from_checkpoint(ckpt_path, map_location=device)
        expert.to(device)
        expert.eval()
        return expert

    raise FileNotFoundError(f"Checkpoint not found: {ckpt_path}")

# ====================== Gradient Calculation ======================

def run_gradient_analysis():
    """
    Compute per-position Integrated Gradients scores for every input row.

    Reads the CSV named by ``get_cfg()["in_csv_path"]``, encodes each row's
    'utr5_sequence' / 'cds_sequence' pair, attributes the model output to the
    encoded input with captum's IntegratedGradients, sums the absolute
    attributions over the channel axis, and min-max normalizes the result.
    The scores, the encoded text and its start column are stored in the
    'gradient_scores', 'effective_sequence' and 'start_pos' columns; rows
    whose attribution failed are dropped before the CSV is written.

    Raises FileNotFoundError when the input CSV does not exist.
    """
    cfg = get_cfg()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"[INFO] Device Used: {device}")

    in_csv_path = cfg["in_csv_path"]
    if not os.path.exists(in_csv_path):
        raise FileNotFoundError(f"Input not found: {in_csv_path}")

    df = pd.read_csv(in_csv_path)
    model = load_expert5u(cfg["ckpt_path"], device)
    ig = IntegratedGradients(model)

    window_len = cfg["L"]
    utr_tail_len = cfg["utr_tail_len"]
    cds_head_len = cfg["cds_head_len"]

    # One (scores, sequence, start_pos) triple per input row, in row order.
    records = []

    print("[INFO] Calculating Saliency Maps...")

    for _, row in tqdm(df.iterrows(), total=len(df)):
        tx_id = row['tx_id']
        utr = row['utr5_sequence']
        cds = row['cds_sequence']
        x_numpy, effective_seq, start_pos = encode_utr_cds_to_x200_with_text(
            utr, cds, window_len, utr_tail_len, cds_head_len
        )

        # Add a leading batch axis before moving the input to the device.
        input_tensor = torch.from_numpy(x_numpy).unsqueeze(0).to(device)
        input_tensor.requires_grad = True

        try:
            # Gradient Calculating
            attributions = ig.attribute(input_tensor, target=None, n_steps=50, internal_batch_size=1)

            # Fusion: absolute attribution summed over axis 1, batch axis removed.
            per_position = attributions.abs().sum(dim=1).squeeze(0)

            # Normalization to [0, 1]; a flat profile becomes all zeros.
            score_np = per_position.detach().cpu().numpy()
            lowest = score_np.min()
            span = score_np.max() - lowest
            if span == 0:
                score_norm = np.zeros_like(score_np)
            else:
                score_norm = (score_np - lowest) / span

            record = (score_norm.tolist(), effective_seq, start_pos)
        except Exception as e:
            # Keep going; the failed row is marked with None and dropped later.
            print(f"Error processing {tx_id}: {e}")
            record = (None, None, None)

        records.append(record)

    df['gradient_scores'] = [rec[0] for rec in records]
    df['effective_sequence'] = [rec[1] for rec in records]
    df['start_pos'] = [rec[2] for rec in records]

    df_clean = df.dropna(subset=['gradient_scores'])
    out_csv_path = cfg["out_csv_path"]
    os.makedirs(os.path.dirname(out_csv_path), exist_ok=True)
    df_clean.to_csv(out_csv_path, index=False)

    print(f"[INFO] Gradient calculated. Files saved to {out_csv_path}")

# Run the gradient analysis when this code is executed directly rather than
# imported.
if __name__ == "__main__":
    run_gradient_analysis()

# Generating the final plot
**Caution: Switch to draw mamba env**

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import ast

def analyze_trim_gradient_overlap(file_path, top_percent=10):
    """
    Analyze the coverage of TRIM gradient with MAX30 PARS region.

    For every row with a 'Max30Pos' value, the gradient scores are trimmed to
    the last (UTR length + 50) positions, min-max normalized, and compared
    with the 30-nt window that starts at (UTR length + Max30Pos). The function
    prints the share of top-scoring positions that fall inside that window
    and plots the mean normalized gradient (with its variance band) from
    -50 to +80 nt around the window start.

    PARAMS:
    - file_path: CSV Path; the file needs 'Max30Pos', 'gradient_scores' and
      'utr5_sequence' columns.
    - top_percent: Threshold of high gradient, e.g. 5 refers to the top 5%.

    Returns None. If the file cannot be read, or no row yields plot data, a
    message is printed and no figure is drawn.
    """
    print(f"Loading files {file_path}, for top {top_percent}% gradient points.")

    try:
        df = pd.read_csv(file_path)
    except Exception as e:
        print(f"Loading failed: {e}")
        return

    # --- Data Preprocessing ---
    df_clean = df.dropna(subset=['Max30Pos']).copy()
    print(f"Valid lines with Max30Pos: {len(df_clean)} / {len(df)}")

    # Parsing lists (the scores are stored as list literals in the CSV)
    df_clean['gradient_scores'] = df_clean['gradient_scores'].apply(
        lambda x: ast.literal_eval(x) if isinstance(x, str) else x
    )

    CDS_LEN = 50
    MAX30_LEN = 30
    # Plot window around the Max30 start (from -50 to +80 nt)
    WINDOW_LEFT = 50
    WINDOW_RIGHT = 80

    aligned_gradients = []
    hit_counts = 0
    total_top_points = 0
    skipped_rows = 0

    for _, row in df_clean.iterrows():
        try:
            full_grads = np.array(row['gradient_scores'])
            utr_len = len(row['utr5_sequence'])
            seq_len = utr_len + CDS_LEN

            # Cut for valid length
            if len(full_grads) >= seq_len:
                effective_grads = full_grads[-seq_len:]
            else:
                effective_grads = full_grads

            # Normalization to (0-1)
            g_min, g_max = np.min(effective_grads), np.max(effective_grads)
            if g_max - g_min == 0:
                norm_grads = effective_grads - g_min
            else:
                norm_grads = (effective_grads - g_min) / (g_max - g_min)

            # Position Max30
            # NOTE(review): this treats index 0 of effective_grads as the
            # first UTR base, which only holds when the scored window covers
            # the whole UTR. If the scoring step truncates long UTRs, such
            # rows are misaligned or skipped here - confirm.
            max30_rel_pos = int(row['Max30Pos'])
            start_idx = utr_len + max30_rel_pos
            end_idx = start_idx + MAX30_LEN

            # Boundary Check
            start_idx = max(0, start_idx)
            end_idx = min(len(effective_grads), end_idx)

            if start_idx >= end_idx:
                continue

            # Summarizing Top-K: count top-scoring positions inside the region
            top_k = max(1, int(len(effective_grads) * (top_percent / 100.0)))
            top_indices = np.argsort(effective_grads)[-top_k:]

            hits = np.sum((top_indices >= start_idx) & (top_indices < end_idx))
            hit_counts += hits
            total_top_points += top_k

            # Preparing plot data: copy the part of the window that exists
            # into a NaN-padded array so all rows share the same x axis.
            extract_start = start_idx - WINDOW_LEFT
            extract_end = start_idx + WINDOW_RIGHT

            aligned_window = np.full(WINDOW_LEFT + WINDOW_RIGHT, np.nan)

            src_s = max(0, extract_start)
            src_e = min(len(effective_grads), extract_end)
            dst_s = src_s - extract_start
            dst_e = dst_s + (src_e - src_s)

            if src_e > src_s:
                aligned_window[dst_s:dst_e] = norm_grads[src_s:src_e]
                aligned_gradients.append(aligned_window)

        except Exception:
            # Best effort: a malformed row must not abort the analysis, but
            # it is counted so the loss is visible in the summary.
            skipped_rows += 1
            continue

    # --- Calculating Statistic Index ---
    avg_hit_rate = hit_counts / total_top_points if total_top_points > 0 else 0
    print("-" * 30)
    print(f"Analysising finished")
    if skipped_rows:
        print(f"Rows skipped because they could not be processed: {skipped_rows}")
    print(f"Probabilities of top {top_percent}% gradients falling into Max30 regions: {avg_hit_rate:.2%}")

    # Without any aligned window there is no profile to average or draw.
    if not aligned_gradients:
        print("No rows produced plot data; skipping the plot.")
        return

    # --- Plot ---
    grad_matrix = np.array(aligned_gradients)
    mean_profile = np.nanmean(grad_matrix, axis=0)
    # Squared standard deviation, i.e. the variance shown in the legend.
    var_profile = (np.nanstd(grad_matrix, axis=0))**2
    fig, ax = plt.subplots(figsize=(12, 5), dpi=300, facecolor='white')
    ax.set_facecolor('white')
    x_axis = np.arange(-WINDOW_LEFT, WINDOW_RIGHT)

    ax.fill_between(x_axis,
                    mean_profile - var_profile,
                    mean_profile + var_profile,
                    color='#0072B2',
                    alpha=0.2,
                    label='Variance')

    # Main Curve (Deep Blue)
    ax.plot(x_axis, mean_profile, color='#0072B2', linewidth=3, label='Mean Gradient')

    # Max30 Region
    ax.axvspan(0, MAX30_LEN, color='#D55E00', alpha=0.15, label='Max30 Region')
    ax.axvline(0, color='black', linestyle='--', linewidth=1.5, alpha=0.5)
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.spines['bottom'].set_color('black')
    ax.spines['bottom'].set_linewidth(1.2)
    ax.spines['left'].set_color('black')
    ax.spines['left'].set_linewidth(1.2)

    ax.tick_params(axis='x', colors='black', labelsize=18, width=1.2)
    ax.tick_params(axis='y', colors='black', labelsize=18, width=1.2)
    ax.set_xlabel('Position relative to Max30 (nt)', fontweight="bold", fontsize=24, color='black', labelpad=10)
    ax.set_ylabel('Gradient Score', fontweight="bold", fontsize=24, color='black', labelpad=10)
    ax.legend(frameon=False, loc='upper right', fontsize=20)

    plt.tight_layout()
    plt.show()

# Analyze the step-2 gradient scores, counting the top 5% of positions as
# high-gradient.
analyze_trim_gradient_overlap('3_3_Plot/step2_gradient_scores_genomewide.csv', top_percent=5)