In [None]:
# ==== Import libraries ====
import math
import random
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.nn.functional as F

RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)


<torch._C.Generator at 0x2bfa31bead0>

In [4]:
DATA_CSV = "data/GenomeCRISPR_+_strands.csv"
SEQ_LEN  = 23
VAL_FRAC = 0.10
TEST_FRAC= 0.10

seq_col   = "sequence"
cell_col  = "cellline"
phen_col  = "condition"   # phenotype
chr_col   = "chr"
target_col= "log2fc"      # target


df = pd.read_csv(DATA_CSV)
missing = [c for c in [seq_col, cell_col, phen_col, chr_col, target_col] if c not in df.columns]
if missing:
    raise KeyError(f"Missing expected columns: {missing}. Got: {list(df.columns)}")

df = df[[seq_col, cell_col, phen_col, chr_col, target_col]].copy()
df = df.dropna(subset=[seq_col, cell_col, phen_col, chr_col, target_col])

df[seq_col] = df[seq_col].astype(str).str.upper().str.strip()
df = df[df[seq_col].str.len() == SEQ_LEN]
df = df[df[seq_col].str.match(r"^[ACGT]+$")]


cell_codes, cell_uniques = pd.factorize(df[cell_col].astype(str).str.strip(), sort=True)
phen_codes, phen_uniques = pd.factorize(df[phen_col].astype(str).str.strip(), sort=True)
chr_codes,  chr_uniques  = pd.factorize(df[chr_col].astype(str).str.strip(),  sort=True)

n_cell, n_ph, n_chr = len(cell_uniques), len(phen_uniques), len(chr_uniques)

# One-hot the 23-mer sequences
BASE2IDX = {"A":0, "C":1, "G":2, "T":3}
def onehot_batch(seqs, L=SEQ_LEN):
    N = len(seqs)
    X = np.zeros((N, 4, L), dtype=np.float32)
    for i, s in enumerate(seqs):
        for j, ch in enumerate(s):
            X[i, BASE2IDX[ch], j] = 1.0
    return X

X_seq = onehot_batch(df[seq_col].tolist())
X_cell = cell_codes.astype(np.int64)
X_ph   = phen_codes.astype(np.int64)
X_chr  = chr_codes.astype(np.int64)
y      = df[target_col].astype(np.float32).to_numpy()

# Simple random split
idx_all = np.arange(len(df))
idx_train, idx_test = train_test_split(idx_all, test_size=TEST_FRAC, random_state=42)
idx_train, idx_val  = train_test_split(idx_train, test_size=VAL_FRAC/(1-TEST_FRAC), random_state=42)

def take(a, idx): return a[idx]
Xtr_seq, Xva_seq, Xte_seq = take(X_seq, idx_train), take(X_seq, idx_val), take(X_seq, idx_test)
Xtr_cel, Xva_cel, Xte_cel = take(X_cell, idx_train), take(X_cell, idx_val), take(X_cell, idx_test)
Xtr_ph,  Xva_ph,  Xte_ph  = take(X_ph,  idx_train), take(X_ph,  idx_val), take(X_ph,  idx_test)
Xtr_chr, Xva_chr, Xte_chr = take(X_chr, idx_train), take(X_chr, idx_val), take(X_chr, idx_test)
y_tr,    y_va,    y_te    = take(y,     idx_train), take(y,     idx_val), take(y,     idx_test)

print(f"train={len(idx_train)}  val={len(idx_val)}  test={len(idx_test)}")
print(f"cells={n_cell}  phenotypes={n_ph}  chrs={n_chr}")


  df = pd.read_csv(DATA_CSV)


train=29452509  val=3681564  test=3681564
cells=420  phenotypes=34  chrs=301


In [None]:
class BaselineCNN(nn.Module):
    def __init__(self, out_channels=16, kernel_size=5):
        super().__init__()
        self.conv = nn.Conv1d(4, out_channels, kernel_size=kernel_size, padding=0)
        self.head = nn.Linear(out_channels, 1)

    def forward(self, seq4x23, *_):
        x = self.conv(seq4x23)
        x = F.relu(x)
        x = F.adaptive_max_pool1d(x, 1).squeeze(-1)
        return self.head(x).squeeze(-1)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BaselineCNN(out_channels=16, kernel_size=5).to(device)
print("params:", sum(p.numel() for p in model.parameters()))

params: 353


In [7]:
criterion = nn.MSELoss(reduction="mean")
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

EPOCHS = 3
BATCH  = 256

def iter_minibatches(indexes, batch_size=256, shuffle=True):
    idx = np.asarray(indexes)
    if shuffle:
        rng = np.random.default_rng(42)
        rng.shuffle(idx)
    for start in range(0, len(idx), batch_size):
        mb = idx[start:start+batch_size]
        yield (
            torch.from_numpy(X_seq[mb]).to(device),
            torch.from_numpy(X_cell[mb]).long().to(device),
            torch.from_numpy(X_ph[mb]).long().to(device),
            torch.from_numpy(X_chr[mb]).long().to(device),
            torch.from_numpy(y[mb]).to(device),
        )

for epoch in range(1, EPOCHS + 1):
    # ---- train ----
    model.train()
    train_sum, n_train = 0.0, 0
    for seq, cl, ph, ch, tgt in iter_minibatches(idx_train, batch_size=BATCH, shuffle=True):
        pred = model(seq, cl, ph, ch)
        loss = criterion(pred, tgt)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_sum += loss.item() * tgt.size(0)
        n_train   += tgt.size(0)

    # ---- val ----
    model.eval()
    val_sum, n_val = 0.0, 0
    with torch.no_grad():
        for seq, cl, ph, ch, tgt in iter_minibatches(idx_val, batch_size=BATCH, shuffle=False):
            pred = model(seq, cl, ph, ch)
            loss = criterion(pred, tgt)
            val_sum += loss.item() * tgt.size(0)
            n_val   += tgt.size(0)

    print(f"Epoch {epoch:02d} | train MSE: {train_sum/n_train:.4f} | val MSE: {val_sum/n_val:.4f}")

Epoch 01 | train MSE: 0.6529 | val MSE: 0.6498
Epoch 02 | train MSE: 0.6519 | val MSE: 0.6505
Epoch 03 | train MSE: 0.6516 | val MSE: 0.6495


In [9]:
def mse_doc(yhat, y):
    # (1/n) * sum (yi - yhat_i)^2
    yhat = np.asarray(yhat, dtype=np.float64)
    y    = np.asarray(y,    dtype=np.float64)
    n = y.size
    return float(np.sum((y - yhat)**2) / n)

def pearson_doc(x, y):
    # r = (n Σxy − (Σx)(Σy)) / sqrt((n Σx^2 − (Σx)^2)(n Σy^2 − (Σy)^2))
    x = np.asarray(x, dtype=np.float64)
    y = np.asarray(y, dtype=np.float64)
    n      = x.size
    sum_x  = np.sum(x)
    sum_y  = np.sum(y)
    sum_xy = np.sum(x * y)
    sum_x2 = np.sum(x * x)
    sum_y2 = np.sum(y * y)
    denom = np.sqrt((n * sum_x2 - sum_x * sum_x) * (n * sum_y2 - sum_y * sum_y))
    return float((n * sum_xy - sum_x * sum_y) / denom) if denom != 0.0 else 0.0

def _ranks_avg(a):
    # average ranks (1..n), stable; supports ties
    a = np.asarray(a, dtype=np.float64)
    order = np.argsort(a, kind="mergesort")
    ranks = np.empty_like(order, dtype=np.float64)
    sa = a[order]
    diff = np.concatenate(([True], sa[1:] != sa[:-1], [True]))
    idx = np.flatnonzero(diff)
    for s, e in zip(idx[:-1], idx[1:]):
        ranks[order[s:e]] = 0.5 * (s + e - 1) + 1.0
    return ranks

def spearman_doc(x, y):
    # ρ_s = 1 − (6 Σ d_i^2)/(n(n^2 − 1)), with d_i = rank(x_i) − rank(y_i)
    rx = _ranks_avg(x)
    ry = _ranks_avg(y)
    d  = rx - ry
    n  = rx.size
    denom = n * (n * n - 1.0)
    return float(1.0 - (6.0 * np.sum(d * d)) / denom) if denom != 0.0 else 0.0

# -- accuracy as a decimal --
def accuracy_direction(yhat, y):
    # fraction where predicted and true share the same sign (no rounding/percent)
    yhat = np.asarray(yhat, dtype=np.float64)
    y    = np.asarray(y,    dtype=np.float64)
    return float(np.mean((yhat >= 0) == (y >= 0)))

@torch.no_grad()
def preds_and_trues(indexes, batch_size=256):
    model.eval()
    ps, ys = [], []
    for seq, cl, ph, ch, tgt in iter_minibatches(indexes, batch_size=batch_size, shuffle=False):
        out = model(seq, cl, ph, ch)
        ps.append(out.detach().cpu().numpy())
        ys.append(tgt.detach().cpu().numpy())
    return np.concatenate(ps), np.concatenate(ys)

def eval_split(indexes):
    yhat, y = preds_and_trues(indexes, batch_size=256)
    return {
        "MSE": mse_doc(yhat, y),
        "Pearson": pearson_doc(yhat, y),
        "Spearman": spearman_doc(yhat, y),
        "Accuracy": accuracy_direction(yhat, y),
    }

# --- print results ---
print("Validation:", eval_split(idx_val))
print("Test:",       eval_split(idx_test))

Validation: {'MSE': 0.6495035677847352, 'Pearson': 0.13991078002324805, 'Spearman': 0.14329448193871497, 'Accuracy': 0.5219727268084977}
Test: {'MSE': 0.6518217131747005, 'Pearson': 0.13975895583427453, 'Spearman': 0.14319985258015533, 'Accuracy': 0.5222514127148136}
