In [3]:
import numpy as np
import pandas as pd
from pathlib import Path

DATA = Path("../data")
FIGS = Path("../reports/figures")

merged = pd.read_csv(DATA/"fake_variants_merged.csv")  # has seq31 + label
merged.head()

Unnamed: 0,chrom,pos,ref,alt,label,is_transition,consequence,gene,seq31,gc31
0,chr11,27658369,G,A,benign,1,missense,BDNF,GTAGGTAAGCGGGGTATTTGCACTTCCCTTA,0.483871
1,chr1,10123,A,G,pathogenic,1,synonymous,GENE1,ATCCATAAGGGCTTTTGCCGCGTGTTAGAGG,0.516129
2,chr11,27658370,C,T,benign,1,intron,BDNF,AAGCTATCCCACACTTGTGTATGGCATCTTC,0.451613
3,chr7,5501234,T,C,pathogenic,1,missense,CTNNA2,CCCCTCAGCCTCCCTCGTGTCGTACTATACG,0.612903
4,chr3,992211,G,A,benign,1,5_prime_UTR,GENE3,ATCATTTAAAGAAAGATATTTGGGATGGAGA,0.290323


In [4]:
def gc_content_vec(seq: str) -> float:
    s = np.frombuffer(seq.upper().encode(), dtype=np.uint8)
    #ASCII codes: G=71, C=67
    return np.mean((s==71) | (s == 67))

#Apply to all sequences
merged["gc31_vec"] = merged["seq31"].apply(gc_content_vec)
merged [["gc31", "gc31_vec"]].head() # should match previous gc31 closely

Unnamed: 0,gc31,gc31_vec
0,0.483871,0.483871
1,0.516129,0.516129
2,0.451613,0.451613
3,0.612903,0.612903
4,0.290323,0.290323


In [5]:
BASE_TO_INT = np.full(256, -1, dtype=np.int8)
BASE_TO_INT[ord("A")] = 0
BASE_TO_INT[ord("C")] = 1
BASE_TO_INT[ord("G")] = 2
BASE_TO_INT[ord("T")] = 3
BASE_TO_INT[ord("a")] = 0
BASE_TO_INT[ord("c")] = 1
BASE_TO_INT[ord("g")] = 2
BASE_TO_INT[ord("t")] = 3

def seq_to_int_vec(seq: str) -> np.ndarray:
    b = np.frombuffer(seq.encode(), dtype=np.uint8)
    return BASE_TO_INT[b] #shape: (L,)

In [6]:
seq = merged["seq31"].iloc[0]
ints = seq_to_int_vec(seq)
ints[:10], ints.shape

(array([2, 3, 0, 2, 2, 3, 0, 0, 2, 1], dtype=int8), (31,))

In [11]:
EYE4 = np.eye(4, dtype=np.float32)

In [15]:
def one_hot_vec(seq: str) -> np.ndarray:
    idx = seq_to_int_vec(seq)                     # (L,)
    # Mask unknowns (-1) to zeros
    valid = idx >= 0
    oh = np.zeros((idx.shape[0], 4), dtype=np.float32)
    oh[valid] = EYE4[idx[valid]]
    return oh    # shape: (L, 4)

# Sanity check
oh = one_hot_vec(merged["seq31"].iloc[0])
oh.shape, oh[:4]

((31, 4),
 array([[0., 0., 1., 0.],
        [0., 0., 0., 1.],
        [1., 0., 0., 0.],
        [0., 0., 1., 0.]], dtype=float32))

In [17]:
seqs = merged["seq31"].tolist()
X = np.stack([one_hot_vec(s) for s in seqs], axis=0) #(N, L, 4)

#Labels: map"pathogenic" =1, "benign" = 0
y = (merged["label"].str.lower() == "pathogenic").astype(np.int64).to_numpy()

X.shape, y.shape[:8]

((8, 31, 4), (8,))

In [24]:
pd.Series(y).value_counts()

0    4
1    4
Name: count, dtype: int64

In [25]:
from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()

y = le.fit_transform(merged["label"].str.lower())
pd.Series(y).value_counts()

0    4
1    4
Name: count, dtype: int64

In [26]:
from sklearn.model_selection import train_test_split

X_train, X_tmp, y_train, y_tmp = train_test_split(
    X, y, test_size=0.3, random_state=42, stratify=y
)
X_val, X_test, y_val, y_test = train_test_split(
    X_tmp, y_tmp, test_size=0.5, random_state=42, stratify=y_tmp
)

(X_train.shape, X_val.shape, X_test.shape), (y_train.mean(), y_val.mean(), y_test.mean())


ValueError: The least populated class in y has only 1 member, which is too few. The minimum number of groups for any class cannot be less than 2.

In [27]:
from sklearn.model_selection import train_test_split

# First split: Train (4) vs temp (4)
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.5, random_state=42, stratify=y
)

# Second split: Val (2) vs Test (2)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
)

In [28]:
print(pd.Series(y_train).value_counts(), "← Train")
print(pd.Series(y_val).value_counts(), "← Val")
print(pd.Series(y_test).value_counts(), "← Test")

1    2
0    2
Name: count, dtype: int64 ← Train
1    1
0    1
Name: count, dtype: int64 ← Val
1    1
0    1
Name: count, dtype: int64 ← Test


In [29]:
from sklearn.model_selection import train_test_split

# First split: Train (50%) vs Temp (50%)
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.5, random_state=42, stratify=y
)

# Second split: Val (50% of temp) vs Test (50% of temp)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
)

# Check class balance
print("Train:", pd.Series(y_train).value_counts().to_dict())
print("Val:  ", pd.Series(y_val).value_counts().to_dict())
print("Test: ", pd.Series(y_test).value_counts().to_dict())

Train: {1: 2, 0: 2}
Val:   {1: 1, 0: 1}
Test:  {1: 1, 0: 1}


In [31]:
np.savez_compressed(
    DATA/"preproc_fake_seq31.npz",
    X_train=X_train, y_train=y_train,
    X_val=X_val,     y_val=y_val,
    X_test=X_test,   y_test=y_test,
)
    

In [32]:
chk = np.load(DATA/"preproc_fake_seq31.npz")
list(chk.keys()), chk["X_train"].shape, chk["y_train"].shape

(['X_train', 'y_train', 'X_val', 'y_val', 'X_test', 'y_test'],
 (4, 31, 4),
 (4,))

In [38]:
def iterate_minibatches(Xa, ya, batch_size=8, shuffle=True):
    n = Xa.shape[0]
    idx = np.arange(n)
    if shuffle:
        rng = np.random.default_rng(42)
        rng.shuffle(idx)
    for start in range(0, n, batch_size):
        sel = idx[start:start+batch_size]
        yield Xa[sel], ya[sel]

#Quicl smoke test
bX, by = next(iterate_minibatches(X_train, y_train, batch_size=4))
bX.shape, by

((4, 31, 4), array([1, 0, 0, 1]))

In [39]:
#silent bug checks
assert X_train.ndim == 3 and X_train.shape[2] == 4
assert set(np.unique(y_train)).issubset({0, 1})
assert not np.isnan(X_train).any()
assert (X_train.sum(axis=2) == 1).all()  # one-hot rows sum to 1


In [41]:
def is_transition(ref, alt):
    return (ref, alt) in {("A","G"),("G","A"),("C","T"),("T","C")}

merged["is_transition"] = [
    int(is_transition(r, a)) for r, a in zip(merged["ref"], merged["alt"])
]
merged["is_transition"].value_counts()

is_transition
1    8
Name: count, dtype: int64