In [None]:
import os
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.feature_selection import VarianceThreshold
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.feature_selection import SelectKBest, f_classif

In [None]:
# Subject-level table that carries the diagnosis flags used later as labels.
RAW_PATH = "/kaggle/input/processed-gene-mri-subject-id/Processed_gene_mri_subject_id (2).csv"
df = pd.read_csv(RAW_PATH)  # comma is read_csv's default delimiter

In [None]:
# (rows, columns) of the table loaded from RAW_PATH.
df.shape

In [None]:
# Preview the first rows only instead of rendering the whole frame.
df.head()

In [None]:
# Input genotype table (whitespace-delimited text) and the CSV the embeddings are written to.
RAW_PATH_2 = "/kaggle/input/adnisnp/ADNI_qc_pruned_target.raw"
OUT_EMBED_CSV = "genotype_embeddings_1028.csv"

In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
# Whitespace-delimited genotype table. The separator is a regex, so it must be a raw
# string: a plain '\s+' is an invalid escape sequence and triggers a warning on
# current Python versions.
df_2 = pd.read_csv(RAW_PATH_2, sep=r'\s+')

In [None]:
# (rows, columns) of the genotype table before the labels are merged in.
df_2.shape

In [None]:
# Preview the first rows only instead of rendering the whole genotype frame.
df_2.head()

In [None]:
# Attach the one-hot diagnosis flags to the genotype rows, matching on subject ID.
label_cols = ['DXNORM', 'DXMCI', 'DXAD']
labels_subset = df[['IID'] + label_cols]

# `df_2` is overwritten in place, so drop any label columns left behind by a previous
# run of this cell first; otherwise a re-run produces DXNORM_x / DXNORM_y suffixes and
# the later cells that read 'DXNORM' etc. break. On a first run the drop is a no-op.
# NOTE(review): if `df` holds several rows per IID this inner join duplicates genotype
# rows — confirm IID is unique in `df`.
df_2 = pd.merge(
    df_2.drop(columns=label_cols, errors='ignore'),
    labels_subset,
    on='IID',
    how='inner',
)

In [None]:
# Shape after the inner join: only subjects present in both tables remain.
print(df_2.shape)

In [None]:
# Preview the merged frame (first rows only) to check the label columns were attached.
df_2.head()

In [None]:
# Collapse the three diagnosis flag columns into one integer class label:
# the column holding the row-wise maximum decides the class.
dx_to_class = {'DXNORM': 0, 'DXMCI': 1, 'DXAD': 2}
winning_flag = df_2[list(dx_to_class)].idxmax(axis=1)
df_2['Diagnosis'] = winning_flag.map(dx_to_class)

In [None]:
# Identifier and label columns that must not be fed to the model as features.
meta_cols = ['FID', 'IID', 'PAT', 'MAT', 'SEX', 'PHENOTYPE',
             'DXNORM', 'DXMCI', 'DXAD', 'Diagnosis']

# Feature matrix = everything except the metadata columns (absent ones are skipped);
# target = the integer diagnosis class.
X = df_2.drop(columns=meta_cols, errors='ignore')
y = df_2['Diagnosis']

In [None]:
# Sanity check: feature matrix and label vector should have the same number of rows.
X.shape, y.shape

In [None]:
# Inspect the derived integer class labels (0 = DXNORM, 1 = DXMCI, 2 = DXAD).
df_2['Diagnosis']

In [None]:
# Hold out 20% of the subjects for validation; stratifying on y keeps the class
# proportions the same in both parts, and the fixed seed makes the split repeatable.
X_train, X_val, y_train, y_val = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)

In [None]:
# Mean-impute missing genotype calls. The imputer is fitted on the training split only
# and then applied to the validation split, so no validation statistics leak in.
# (SimpleImputer is already imported in the first cell.)
imputer = SimpleImputer(strategy='mean')

# Keep the original row index as well as the column names: the imputer returns a bare
# array, and rebuilding the frame without `index=` would silently renumber the rows,
# leaving them misaligned (by label) with y_train / y_val.
X_train_imputed = pd.DataFrame(
    imputer.fit_transform(X_train), index=X_train.index, columns=X_train.columns
)
X_val_imputed = pd.DataFrame(
    imputer.transform(X_val), index=X_val.index, columns=X_val.columns
)

In [None]:
# Supervised feature selection: keep the SNP columns with the highest ANOVA F-score
# against the diagnosis label. Fitted on the training split only.
# k is clamped so the cell still runs when fewer than 2000 features are available.
selector = SelectKBest(f_classif, k=min(2000, X_train_imputed.shape[1]))
X_train_selected = selector.fit_transform(X_train_imputed, y_train)
X_val_selected = selector.transform(X_val_imputed)

# Column positions, then names, of the retained SNPs. The same variable name is now
# used for assignment and lookup (the original assigned `selected_indi` but read
# `selected_indices`, which raised NameError).
selected_indices = selector.get_support(indices=True)
selected_snp = X_train.columns[selected_indices]

In [None]:
# (training rows, number of SNPs kept by the selector).
X_train_selected.shape


In [None]:
# Names of the first five retained SNP columns.
selected_snp[:5].tolist()

In [None]:
# Standardise each selected SNP column using statistics from the training split only,
# then apply the same transform to the validation split.
scaler = StandardScaler().fit(X_train_selected)
X_train_scaled = scaler.transform(X_train_selected)
X_val_scaled = scaler.transform(X_val_selected)

# float32 tensors for PyTorch. The validation tensor is moved to `device` here;
# the training tensor is left where it is.
X_train_tensor = torch.as_tensor(X_train_scaled, dtype=torch.float32)
X_val_tensor = torch.as_tensor(X_val_scaled, dtype=torch.float32).to(device)

In [None]:

# variances = np.var(g_im.values, axis=0)

# top_k = 15000
# top_indices = np.argsort(variances)[-top_k:]
# g_filtered = g_im.iloc[:, top_indices]
# g_filtered = g_filtered.reindex(sorted(g_filtered.columns), axis=1)

# selector = VarianceThreshold(threshold=0.05) 
# g_filtered = selector.fit_transform(g_im)
# print(g_filtered.shape)

In [None]:
# NOTE(review): `g_im` is not assigned in any cell visible in this notebook, so this
# cell fails with NameError on a fresh kernel — presumably a leftover from the
# variance-filter experiment that is now commented out; confirm, then remove the cell
# or restore the definition. `selector` here is whatever object that name was last
# bound to.
keep_cols = g_im.columns[selector.get_support()]
geno_filtered = g_im[keep_cols]

In [None]:
# NOTE(review): neither `g_df` nor `g_im` is assigned in any visible cell — looks like
# stale exploratory output; confirm and remove.
g_df.shape[1], g_im.shape[1]

In [None]:
# NOTE(review): `g_im` is not assigned in any visible cell — stale display; confirm and remove.
g_im

In [None]:
# NOTE(review): `g_filtered` is only assigned inside commented-out code in this
# notebook — stale display; confirm and remove.
g_filtered

In [None]:
# NOTE(review): depends on `g_filtered`, which is only assigned in commented-out code.
g_filtered.shape

In [None]:
# NOTE(review): repeated display of `g_filtered`, which is only assigned in
# commented-out code — confirm and remove.
g_filtered

In [None]:
# NOTE(review): `X_tensor` is not assigned in any visible cell (the tensors built above
# are `X_train_tensor` / `X_val_tensor`) — stale display; confirm and remove.
X_tensor

In [None]:
class GenotypeAE(nn.Module):
    """Fully connected autoencoder with batch-norm and dropout in the encoder.

    Encoder: input_dim -> 256 -> 128 -> latent_dim (BatchNorm + ReLU after the first
    two linear layers, dropout after the first). Decoder: latent_dim -> 128 -> 256 ->
    input_dim with ReLU between layers and a linear output.

    Args:
        input_dim: Number of input features per sample.
        latent_dim: Size of the bottleneck representation.
        p_drop: Dropout probability applied after the first encoder layer.
    """

    def __init__(self, input_dim, latent_dim=32, p_drop=0.4):
        super().__init__()
        wide, narrow = 256, 128

        # Layer order (and therefore the Sequential indices used in state_dict keys)
        # is part of the model's interface; keep it stable.
        encoder_layers = [
            nn.Linear(input_dim, wide),
            nn.BatchNorm1d(wide),
            nn.ReLU(),
            nn.Dropout(p_drop),
            nn.Linear(wide, narrow),
            nn.BatchNorm1d(narrow),
            nn.ReLU(),
            nn.Linear(narrow, latent_dim),
        ]
        self.encoder = nn.Sequential(*encoder_layers)

        decoder_layers = [
            nn.Linear(latent_dim, narrow),
            nn.ReLU(),
            nn.Linear(narrow, wide),
            nn.ReLU(),
            nn.Linear(wide, input_dim),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Return ``(reconstruction, latent_code)`` for a batch ``x``."""
        latent = self.encoder(x)
        return self.decoder(latent), latent

In [None]:
class ShallowAE(nn.Module):
    """Single-hidden-layer autoencoder with tanh activations.

    Encoder: input_dim -> 256 -> latent_dim. Decoder mirrors it:
    latent_dim -> 256 -> input_dim, with a linear output layer.

    Args:
        input_dim: Number of input features per sample.
        latent_dim: Size of the bottleneck representation.
    """

    def __init__(self, input_dim, latent_dim=64):
        super().__init__()
        hidden = 256

        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden),
            nn.Tanh(),
            nn.Linear(hidden, latent_dim),
        )

        # Decoder reconstructs the input directly from the latent code.
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden),
            nn.Tanh(),
            nn.Linear(hidden, input_dim),
        )

    def forward(self, x):
        """Return ``(reconstruction, latent_code)`` for a batch ``x``."""
        latent = self.encoder(x)
        reconstruction = self.decoder(latent)
        return reconstruction, latent

In [None]:
# Training hyperparameters.
Latent_dim = 64   # bottleneck width handed to the autoencoder constructor
LR = 0.001        # learning rate of the optimizer created together with the model
BATCH_SIZE = 32   # mini-batch size (NOTE(review): the DataLoader cell hard-codes 32 instead of reading this)

In [None]:
# Feature count per sample = width of the training tensor; this sizes the model's input layer.
input_dim = X_train_tensor.size(1)
print(input_dim)

In [None]:
# Active architecture is the shallow tanh autoencoder; the deeper GenotypeAE is kept
# as a commented-out alternative.
#model = GenotypeAE(input_dim=input_dim, latent_dim=Latent_dim).to(device)
model = ShallowAE(input_dim=input_dim, latent_dim=Latent_dim)
model = model.to(device)

# Reconstruction objective: mean squared error between input and output.
loss_fn = nn.MSELoss()
# NOTE(review): the training cell creates its own Adam optimizer with different
# settings, so this one only matters if that cell is changed.
optimizer = torch.optim.Adam(model.parameters(), lr=LR)

In [None]:
Epoch = 300
# Replaces the optimizer created alongside the model: smaller step size plus L2 weight decay.
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-4)

# Build the loader in this cell so the loop runs on a fresh kernel (Restart & Run All)
# without depending on a definition located further down the notebook.
# Inputs double as targets because this is an autoencoder.
train_loader = DataLoader(
    TensorDataset(X_train_tensor, X_train_tensor),
    batch_size=BATCH_SIZE,
    shuffle=True,
)

for epoch in range(1, Epoch+1):
    model.train()
    epoch_loss = 0.0
    for xb, _ in train_loader:
        xb = xb.to(device)

        # Denoising set-up: the model sees a slightly perturbed input but is scored
        # against the clean one.
        noise = 0.005 * torch.randn_like(xb)
        xb_noisy = xb + noise

        optimizer.zero_grad()
        xhat, _ = model(xb_noisy)
        loss = loss_fn(xhat, xb)
        loss.backward()
        optimizer.step()
        # `loss` is a batch mean; weight it by batch size so the epoch figure is a
        # true per-sample average even when the last batch is smaller.
        epoch_loss += loss.item() * xb.size(0)

    epoch_loss /= len(train_loader.dataset)

    # Log the first epoch and every tenth one (the separator was a mis-encoded em dash).
    if epoch % 10 == 0 or epoch==1:
        print(f"Epoch {epoch}/{Epoch} — Train MSE: {epoch_loss:.6f}")

In [None]:
# Shuffled mini-batches over the training tensor; each item is (input, target) with
# target == input because the model is an autoencoder. Batch size comes from the
# BATCH_SIZE constant instead of a second hard-coded 32. (The stray re-import of
# train_test_split was dropped: it is imported in the first cell and unused here.)
# NOTE(review): the training-loop cell earlier in the notebook iterates
# `train_loader`; on a fresh kernel this definition has to exist before that loop runs.
train_loader = DataLoader(
    TensorDataset(X_train_tensor, X_train_tensor),
    batch_size=BATCH_SIZE,
    shuffle=True,
)

In [None]:
# Both tensors should share the same second dimension (number of selected features).
X_train_tensor.shape, X_val_tensor.shape

In [None]:
# Number of training samples served by the loader (the divisor for the epoch-mean loss).
len(train_loader.dataset)

In [None]:
# Reconstruction error on the held-out split, computed in one pass with gradients off.
model.eval()
val_inputs = X_val_tensor.to(device)
with torch.no_grad():
    x_val_pred, _ = model(val_inputs)
    val_loss = loss_fn(x_val_pred, val_inputs)
print(f"Final Validation MSE: {val_loss.item():.6f}")

In [None]:
# `full_loader` was never defined in the notebook. Build it here from every merged
# subject (all rows of X, in df_2 order), pushed through the already-fitted
# imputer -> selector -> scaler pipeline. shuffle=False keeps the embedding rows in
# the same order as the subjects.
# NOTE(review): assumes embeddings are wanted for all subjects (train + validation) — confirm.
X_full_imputed = pd.DataFrame(imputer.transform(X), index=X.index, columns=X.columns)
X_full_scaled = scaler.transform(selector.transform(X_full_imputed))
X_full_tensor = torch.from_numpy(X_full_scaled).float()
full_loader = DataLoader(
    TensorDataset(X_full_tensor, X_full_tensor),
    batch_size=BATCH_SIZE,
    shuffle=False,
)

# Encode batch by batch with gradients off and stack the latent codes row-wise.
model.eval()
embeddings = []
with torch.no_grad():
    for xb, _ in full_loader:
        xb = xb.to(device)
        _, z = model(xb)
        embeddings.append(z.cpu().numpy())
embeddings = np.vstack(embeddings)

In [None]:
# (number of encoded samples, latent dimension).
embeddings.shape

In [None]:
# `iid` and `IID_col` were never defined in the notebook; derive them from the merged
# genotype frame so the cell runs on a fresh kernel.
# NOTE(review): assumes the rows of `embeddings` follow df_2's row order (i.e. they
# were produced from all of X without shuffling) — confirm.
IID_col = 'IID'
iid = df_2[IID_col].to_numpy()
assert len(iid) == embeddings.shape[0], (
    f"{embeddings.shape[0]} embedding rows but {len(iid)} subject IDs"
)

# One row per subject: the ID column followed by g_emb_1 .. g_emb_<latent_dim>.
emb_df = pd.DataFrame(embeddings, index=iid, columns=[f"g_emb_{i}" for i in range(1, embeddings.shape[1]+1)])
emb_df.index.name = IID_col
emb_df = emb_df.reset_index()
emb_df.to_csv(OUT_EMBED_CSV, index=False)

In [None]:
# Preview the saved embedding table (first rows only).
emb_df.head()

In [None]:
# Linear baseline: reconstruct the standardised training data from its top principal
# components and report the mean squared error over every element.
# (numpy and PCA are already imported in the first cell.)
# NOTE(review): 128 components here versus Latent_dim = 64 for the autoencoder, so the
# two bottlenecks are not the same size — confirm the comparison is intended.
latent_dim = 128
# The randomized solver is stochastic; pin the seed so the reported MSE is reproducible.
pca = PCA(n_components=latent_dim, svd_solver='randomized', random_state=42)
X_pca = pca.fit_transform(X_train_scaled)
Xhat_pca = pca.inverse_transform(X_pca)
mse_pca = np.mean((X_train_scaled - Xhat_pca)**2)
print("PCA MSE:", mse_pca)


In [None]:
# Fit the 128-component PCA on the training split only, then project the validation
# split onto it and map it back to feature space.
pca = PCA(n_components=128, svd_solver='randomized', random_state=42).fit(X_train_scaled)

X_val_pca = pca.transform(X_val_scaled)                 # validation data in component space
X_val_reconstructed = pca.inverse_transform(X_val_pca)  # back in the scaled feature space

In [None]:
# Validation reconstruction error of the PCA baseline, averaged over every element.
mse_pca_val = np.mean((X_val_scaled - X_val_reconstructed)**2)

In [None]:
# Same validation error computed through scikit-learn, overwriting the manual value.
from sklearn.metrics import mean_squared_error
mse_pca_val = mean_squared_error(y_true=X_val_scaled, y_pred=X_val_reconstructed)

In [None]:
# Display the PCA validation reconstruction error.
mse_pca_val

In [None]:
# Reference point: the error of reconstructing every validation value as 0, i.e. the
# mean of the squared scaled values.
baseline = np.square(X_val_scaled).mean()
print("MSE:", baseline)