<a href="https://www.kaggle.com/code/rahulchauhan016/rna-3d-structure-prediction?scriptVersionId=298667313" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# ðŸ§¬ RNA 3D Structure Prediction
**Competition:** Stanford RNA 3D Folding 2  
**Approach:** Homology-based template matching with adaptive geometry refinement  


## 1. Environment Setup
Install BioPython from the offline Kaggle dataset (no internet required).

In [6]:
import subprocess, sys

WHEEL = (
    '/kaggle/input/biopython-cp312/biopython-1.86-cp312-cp312-'
    'manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl'
)

import os
if os.path.exists(WHEEL):
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', '--no-index', WHEEL])
    print('BioPython installed from offline wheel.')
else:
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', 'biopython'])
    print('BioPython installed from PyPI.')

BioPython installed from PyPI.


## 2. Imports & Data Loading
Load train/test sequences and ground-truth coordinates.  
We also attach a robust FASTA parser and stoichiometry utilities.

In [7]:
import pandas as pd
import numpy as np
import random
import time
import warnings
import os, sys

warnings.filterwarnings('ignore')

DATA_PATH = '/kaggle/input/stanford-rna-3d-folding-2/'
train_seqs = pd.read_csv(DATA_PATH + 'train_sequences.csv')
test_seqs  = pd.read_csv(DATA_PATH + 'test_sequences.csv')
train_labels = pd.read_csv(DATA_PATH + 'train_labels.csv')

sys.path.append(os.path.join(DATA_PATH, "extra"))
print(f"Train sequences : {len(train_seqs)}")
print(f"Test  sequences : {len(test_seqs)}")
print(f"Train labels    : {len(train_labels)}")

Train sequences : 5716
Test  sequences : 28
Train labels    : 7794971


## 3. FASTA Parser & Stoichiometry Utilities
Parse multi-chain sequences and stoichiometry strings (e.g. `A:2;B:1`) to
identify per-chain residue segments in the full concatenated sequence.

In [8]:
# --- Robust import for Kaggle's extra/parse_fasta_py.py ---
try:
    import typing as _typing
    import builtins as _builtins
    _builtins.Dict  = getattr(_typing, "Dict")
    _builtins.Tuple = getattr(_typing, "Tuple")
    _builtins.List  = getattr(_typing, "List")
    from parse_fasta_py import parse_fasta as _parse_fasta_raw
    def parse_fasta(fasta_content: str):
        d = _parse_fasta_raw(fasta_content)
        out = {}
        for k, v in d.items():
            out[k] = v[0] if isinstance(v, tuple) else v
        return out
except Exception:
    def parse_fasta(fasta_content: str):
        out = {}
        cur = None
        seq_parts = []
        for line in str(fasta_content).splitlines():
            line = line.strip()
            if not line: continue
            if line.startswith(">"):
                if cur is not None: out[cur] = "".join(seq_parts)
                header = line[1:]
                cur = header.split()[0]
                seq_parts = []
            else: seq_parts.append(line.replace(" ", ""))
        if cur is not None: out[cur] = "".join(seq_parts)
        return out

def parse_stoichiometry(stoich: str):
    if pd.isna(stoich) or str(stoich).strip() == "": return []
    out = []
    for part in str(stoich).split(';'):
        ch, cnt = part.split(':')
        out.append((ch.strip(), int(cnt)))
    return out

def get_chain_segments(row):
    seq    = row['sequence']
    stoich = row.get('stoichiometry', '')
    all_seq = row.get('all_sequences', '')
    if pd.isna(stoich) or pd.isna(all_seq) or str(stoich).strip()=="" or str(all_seq).strip()=="":
        return [(0, len(seq))]
    try:
        chain_dict = parse_fasta(all_seq)
        order = parse_stoichiometry(stoich)
        segs = []
        pos = 0
        for ch, cnt in order:
            base = chain_dict.get(ch)
            if base is None: return [(0, len(seq))]
            for _ in range(cnt):
                segs.append((pos, pos + len(base)))
                pos += len(base)
        if pos != len(seq): return [(0, len(seq))]
        return segs
    except Exception: return [(0, len(seq))]

def build_segments_map(df):
    seg_map, stoich_map = {}, {}
    for _, r in df.iterrows():
        tid = r['target_id']
        seg_map[tid]   = get_chain_segments(r)
        stoich_map[tid] = str(r.get('stoichiometry', '') if not pd.isna(r.get('stoichiometry', '')) else '')
    return seg_map, stoich_map

train_segs_map, train_stoich_map = build_segments_map(train_seqs)
test_segs_map,  test_stoich_map  = build_segments_map(test_seqs)

def process_labels(labels_df):
    coords_dict = {}
    prefixes = labels_df['ID'].str.rsplit('_', n=1).str[0]
    for id_prefix, group in labels_df.groupby(prefixes):
        coords_dict[id_prefix] = group.sort_values('resid')[['x_1', 'y_1', 'z_1']].values
    return coords_dict

train_coords_dict = process_labels(train_labels)
print(f"Loaded {len(train_coords_dict)} training coordinate sets")

Loaded 5716 training coordinate sets


## 4. Sequence Alignment (BioPython PairwiseAligner)
We use **global alignment** with strong gap penalties to prevent residue index
sliding â€” a critical requirement because the evaluation uses TM-score which is
sensitive to residue correspondence.

In [9]:
from Bio.Align import PairwiseAligner

aligner = PairwiseAligner()
aligner.mode = 'global'
aligner.match_score      =  2
aligner.mismatch_score   = -1.5
aligner.open_gap_score   = -8
aligner.extend_gap_score = -0.4
# Penalise terminal gaps to prevent end-gap semi-global behaviour
for side in ['query_left','query_right','target_left','target_right']:
    setattr(aligner, f'{side}_open_gap_score',   -8)
    setattr(aligner, f'{side}_extend_gap_score', -0.4)

print("PairwiseAligner configured (global, strong gap penalties)")

PairwiseAligner configured (global, strong gap penalties)


## 5. Template Search & Coordinate Transfer
For each test sequence:
1. **`find_similar_sequences`** â€” score all training templates via length-filtered
   fast alignment scoring and return the top-N candidates.
2. **`adapt_template_to_query`** â€” vectorised alignment mapping transfers 3-D
   coordinates from template to query positions; gaps are filled by linear
   interpolation.

In [10]:
def find_similar_sequences(query_seq, train_seqs_df, train_coords_dict, top_n=5):
    similar_seqs = []
    for _, row in train_seqs_df.iterrows():
        target_id, train_seq = row['target_id'], row['sequence']
        if target_id not in train_coords_dict: continue
        if abs(len(train_seq) - len(query_seq)) / max(len(train_seq), len(query_seq)) > 0.3: continue
        raw_score = aligner.score(query_seq, train_seq)
        normalized_score = raw_score / (2 * min(len(query_seq), len(train_seq)))
        similar_seqs.append((target_id, train_seq, normalized_score, train_coords_dict[target_id]))
    similar_seqs.sort(key=lambda x: x[2], reverse=True)
    return similar_seqs[:top_n]

def adapt_template_to_query(query_seq, template_seq, template_coords):
    alignment = next(iter(aligner.align(query_seq, template_seq)))
    new_coords = np.full((len(query_seq), 3), np.nan)
    for (q_start, q_end), (t_start, t_end) in zip(*alignment.aligned):
        t_chunk = template_coords[t_start:t_end]
        if len(t_chunk) == (q_end - q_start):
            new_coords[q_start:q_end] = t_chunk
    # Interpolate / extrapolate unmapped residues
    for i in range(len(new_coords)):
        if np.isnan(new_coords[i, 0]):
            prev_v = next((j for j in range(i-1,-1,-1) if not np.isnan(new_coords[j,0])), -1)
            next_v = next((j for j in range(i+1, len(new_coords)) if not np.isnan(new_coords[j,0])), -1)
            if prev_v >= 0 and next_v >= 0:
                w = (i - prev_v) / (next_v - prev_v)
                new_coords[i] = (1-w)*new_coords[prev_v] + w*new_coords[next_v]
            elif prev_v >= 0: new_coords[i] = new_coords[prev_v] + [3, 0, 0]
            elif next_v >= 0: new_coords[i] = new_coords[next_v] + [3, 0, 0]
            else:             new_coords[i] = [i*3, 0, 0]
    return np.nan_to_num(new_coords)

## 6. Adaptive Geometry Refinement
After coordinate transfer we apply within-chain physical constraints:
- **Bond correction** â€” nudge consecutive residue pairs toward ~5.95 Ã…
- **i,i+2 distance** â€” soft target ~10.2 Ã… to enforce local backbone curvature
- **Laplacian smoothing** â€” removes sharp kinks
- **Self-avoidance** â€” repels residues closer than 3.2 Ã… (for long chains)

Correction strength scales inversely with template similarity confidence.

In [11]:
def adaptive_rna_constraints(coordinates, target_id, confidence=1.0, passes=2):
    coords = coordinates.copy()
    segments = test_segs_map.get(target_id, [(0, len(coords))])
    strength = max(0.75 * (1.0 - min(confidence, 0.97)), 0.02)
    for _ in range(passes):
        for (s, e) in segments:
            X = coords[s:e]
            L = e - s
            if L < 3: coords[s:e] = X; continue
            # (1) bond i,i+1 â†’ ~5.95Ã…
            d = X[1:] - X[:-1]
            dist = np.linalg.norm(d, axis=1) + 1e-6
            adj = (d * ((5.95 - dist)/dist)[:, None]) * (0.22 * strength)
            X[:-1] -= adj;  X[1:] += adj
            # (2) i,i+2 â†’ ~10.2Ã…
            d2 = X[2:] - X[:-2]
            dist2 = np.linalg.norm(d2, axis=1) + 1e-6
            adj2 = (d2 * ((10.2 - dist2)/dist2)[:, None]) * (0.10 * strength)
            X[:-2] -= adj2; X[2:] += adj2
            # (3) Laplacian smoothing
            lap = 0.5*(X[:-2] + X[2:]) - X[1:-1]
            X[1:-1] += (0.06 * strength) * lap
            # (4) Self-avoidance (large chains only)
            if L >= 25:
                k = min(L, 160) if L > 220 else L
                idx = np.linspace(0, L-1, k).astype(int) if k < L else np.arange(L)
                P = X[idx]
                diff = P[:, None, :] - P[None, :, :]
                distm = np.linalg.norm(diff, axis=2) + 1e-6
                sep   = np.abs(idx[:, None] - idx[None, :])
                mask  = (sep > 2) & (distm < 3.2)
                if np.any(mask):
                    force = (3.2 - distm) / distm
                    vec   = (diff * force[:,:,None] * mask[:,:,None]).sum(axis=1)
                    X[idx] += (0.015 * strength) * vec
            coords[s:e] = X
    return coords

## 7. Structural Diversity Generators
The competition evaluates 5 predictions per target (best-of-5 TM-score).  
We generate diversity through three complementary perturbations:
- **Hinge rotation** â€” rigid rotation of one chain segment around a pivot
- **Chain jitter** â€” independent rigid-body rotation + translation per chain
- **Smooth wiggle** â€” low-frequency spline-interpolated displacement field

In [12]:
def _rotmat(axis, ang):
    axis = np.asarray(axis, float)
    axis = axis / (np.linalg.norm(axis) + 1e-12)
    x, y, z = axis
    c, s = np.cos(ang), np.sin(ang); C = 1.0 - c
    return np.array([[c+x*x*C, x*y*C-z*s, x*z*C+y*s],
                      [y*x*C+z*s, c+y*y*C, y*z*C-x*s],
                      [z*x*C-y*s, z*y*C+x*s, c+z*z*C]], dtype=float)

def apply_hinge(coords, seg, rng, max_angle_deg=25):
    s, e = seg; L = e - s
    if L < 30: return coords
    pivot = s + int(rng.integers(10, L - 10))
    R = _rotmat(rng.normal(size=3), np.deg2rad(float(rng.uniform(-max_angle_deg, max_angle_deg))))
    X = coords.copy(); p0 = X[pivot].copy()
    X[pivot+1:e] = (X[pivot+1:e] - p0) @ R.T + p0
    return X

def jitter_chains(coords, segments, rng, max_angle_deg=12, max_trans=1.5):
    X = coords.copy()
    global_center = X.mean(axis=0, keepdims=True)
    for (s, e) in segments:
        R = _rotmat(rng.normal(size=3), np.deg2rad(float(rng.uniform(-max_angle_deg, max_angle_deg))))
        shift = rng.normal(size=3); shift = shift/(np.linalg.norm(shift)+1e-12)*float(rng.uniform(0,max_trans))
        c = X[s:e].mean(axis=0, keepdims=True)
        X[s:e] = (X[s:e] - c) @ R.T + c + shift
    X -= X.mean(axis=0, keepdims=True) - global_center
    return X

def smooth_wiggle(coords, segments, rng, amp=0.8):
    X = coords.copy()
    for (s, e) in segments:
        L = e - s
        if L < 20: continue
        ctrl_x = np.linspace(0, L-1, 6)
        ctrl_disp = rng.normal(0, amp, size=(6, 3))
        t = np.arange(L)
        disp = np.vstack([np.interp(t, ctrl_x, ctrl_disp[:, k]) for k in range(3)]).T
        X[s:e] += disp
    return X

## 8. Main Prediction Pipeline
`predict_rna_structures` orchestrates the full workflow per target:
1. Retrieve top-30 template candidates via fast alignment scoring
2. Generate **5 diverse predictions** using the transforms above
3. Refine each prediction with the adaptive geometry constraints

In [13]:
def predict_rna_structures(row, train_seqs_df, train_coords_dict, n_predictions=5):
    tid = row['target_id']
    seq = row['sequence']
    assert set(seq).issubset(set("ACGU")), f"Non-ACGU in {tid}"
    segments = test_segs_map.get(tid, [(0, len(seq))])
    cands = find_similar_sequences(seq, train_seqs_df, train_coords_dict, top_n=30)
    assert all(len(c[3]) == len(c[1]) for c in cands)
    predictions = []; used = set()
    for i in range(n_predictions):
        seed = (abs(hash(tid)) + i * 10007) % (2**32)
        rng = np.random.default_rng(seed)
        if not cands:
            coords = np.zeros((len(seq), 3), dtype=float)
            for (s, e) in segments:
                for j in range(s+1, e): coords[j] = coords[j-1] + [5.95, 0, 0]
            predictions.append(coords); continue
        if i == 0:
            t_id, t_seq, sim, t_coords = cands[0]
        else:
            K = min(12, len(cands))
            sims = np.array([cands[k][2] for k in range(K)], float)
            w = np.exp((sims - sims.max()) / 0.08)
            for k in range(K):
                if cands[k][0] in used: w[k] *= 0.10
            w = w / (w.sum() + 1e-12)
            k = int(rng.choice(np.arange(K), p=w))
            t_id, t_seq, sim, t_coords = cands[k]
        used.add(t_id)
        adapted = adapt_template_to_query(seq, t_seq, t_coords)
        if   i == 0: X = adapted
        elif i == 1: X = adapted + rng.normal(0, max(0.01, (0.40-sim)*0.06), adapted.shape)
        elif i == 2: X = apply_hinge(adapted, max(segments, key=lambda se: se[1]-se[0]), rng, 22)
        elif i == 3: X = jitter_chains(adapted, segments, rng, 10, 1.0)
        else:        X = smooth_wiggle(adapted, segments, rng, 0.7)
        predictions.append(adaptive_rna_constraints(X, tid, confidence=sim, passes=2))
    return predictions

## 9. Generate Submission
Iterate over all test targets, produce 5 coordinate sets each, then write
`submission.csv` in the competition-required format.

In [14]:
all_predictions = []
start_time = time.time()
for idx, row in test_seqs.iterrows():
    if idx % 10 == 0: print(f"Processing {idx} | {time.time()-start_time:.1f}s")
    tid, seq = row['target_id'], row['sequence']
    preds = predict_rna_structures(row, train_seqs, train_coords_dict)
    for j in range(len(seq)):
        res = {'ID': f"{tid}_{j+1}", 'resname': seq[j], 'resid': j+1}
        for i in range(5):
            res[f'x_{i+1}'], res[f'y_{i+1}'], res[f'z_{i+1}'] = preds[i][j]
        all_predictions.append(res)

sub = pd.DataFrame(all_predictions)
cols = ['ID', 'resname', 'resid'] + [f'{c}_{i}' for i in range(1,6) for c in ['x','y','z']]
coord_cols = [c for c in cols if c.startswith(('x_','y_','z_'))]
sub[coord_cols] = sub[coord_cols].clip(-999.999, 9999.999)
sub[cols].to_csv('submission.csv', index=False)
print("submission.csv saved!")

Processing 0 | 0.0s
Processing 10 | 114.6s
Processing 20 | 117.3s
submission.csv saved!
