In [1]:
from sklearn.model_selection import KFold, cross_val_score
import numpy as np

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim

In [23]:
# FUNCTIONS TO GENERATE RANDOM BINARY FEATURE TABLES

def random_binary_vector(vector_len=64):
    rng = np.random.default_rng()
    # return ''.join(rng.integers(low=0, high=2, size=vector_len).astype(str))
    return rng.integers(low=0, high=2, size=vector_len)

def concat_random_binary_vector(num_vectors=6,vector_len=64):
    return [random_binary_vector(vector_len) for i in range(num_vectors)]

def random_input_hash_matrix(num_individuals=2600,num_vector_per_i=6,vector_len=64):
    rng = np.random.default_rng()    
    return [concat_random_binary_vector(num_vector_per_i,vector_len) for i in range(num_individuals)]

def random_output_vector(out_len=2600):
    rng = np.random.default_rng()    
    return rng.random(size=out_len)

In [28]:
num_individuals=2600
num_vector_per_i=6
vector_len=64

X = random_input_hash_matrix(num_individuals,num_vector_per_i,vector_len)
y = random_output_vector(num_individuals)

In [45]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error

random_state = 42
n_splits = 5

# Holdout test
X_train_val, X_test, y_train_val, y_test = train_test_split(
    X, y, test_size=0.1, random_state=random_state)

# KFold
kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)

In [55]:
def predict(X_tensor, alpha, u, bias):
    """Forward pass using learned parameters."""
    u_norm = u / (u.norm(dim=1, keepdim=True) + 1e-12)
    proj = (X_tensor * u_norm.unsqueeze(0)).sum(dim=2)  # (n_samples, n_blocks)
    y_pred = proj @ alpha + bias
    return y_pred

def linear_regression_projection_model(X,y,loss_thresh=0.01,num_epochs=1000,lr=0.001):
    
    X_tensor = torch.tensor(np.array(X).astype(float)).float()
    n_samples, n_features, d = X_tensor.shape
    
    y_tensor = torch.tensor(y).float()
    y_tensor = y_tensor.view(-1,1)
    
    alpha = nn.Parameter(torch.randn(n_features,1))   # (n_features, 1) - haploblock specific
    u = nn.Parameter(torch.randn(n_features,d)).float()       # (n_features, d) - projection vector
    bias = nn.Parameter(torch.zeros(1))
    
    optimizer = optim.Adam([alpha, u, bias], lr=lr,weight_decay=1e-4)
    loss_fn = nn.MSELoss()

    results = {'alpha': alpha, 'u': u, 'bias': bias}
    
    for epoch in range(num_epochs):
        optimizer.zero_grad()
        y_pred = predict(X_tensor,alpha,u,bias)
        loss = loss_fn(y_pred, y_tensor)
        loss.backward()
        optimizer.step()
        
        if loss < loss_thresh:
            print(f'End loop at {epoch} epochs!')
            break
        if epoch == num_epochs-1:
            print(f'Ending at loss {loss}')

    print_flag=False
    if print_flag:
        print("Learned alpha:", alpha.detach().numpy().flatten())
        print("Learned u vectors:\n", u.detach().numpy())
        print("Bias:", bias.item())

    results['alpha'] = alpha.detach().numpy().flatten()
    results['u'] = u.detach().numpy()
    results['bias'] = bias.item()
    
    results['train_r2'] = r2_score(y,y_pred.detach().numpy())
    results['train_mse'] = mean_squared_error(y, y_pred.detach().numpy())    

    return results

In [62]:
def compute_prediction_metrics(X_val, y_val, train_results,metric_string):    
    
    alpha = train_results['alpha']
    u = train_results['u']
    bias = train_results['bias']

    return_results = train_results.copy()
    
    # Compute validation metrics if provided
    if X_val is not None and y_val is not None:
        
        X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
        y_val_tensor = torch.tensor(y_val, dtype=torch.float32).unsqueeze(1)

        y_val_pred = predict(X_val_tensor, alpha, u, bias).detach().numpy()
        
        return_results[f'r2_{metric_string}'] = r2_score(y_val, y_val_pred)
        return_results[f'mse_{metric_string}'] = mean_squared_error(y_val, y_val_pred)
        
    return return_results

In [184]:
np.array(X).shape

(2600, 6, 64)

In [9]:
# ARCHIVED PARSING CODE:
# # PARSE INPUT TSV FILES
# import os
# import pandas as pd

# haplo_file_dict = {'hap1' : '../data/example_model_input/tnfa/tnfa_oneHaplo.tsv', 
#                   'hap2' : '../data/example_model_input/tnfa/tnfa_oneHaplo.tsv'}

# hash_table_dict = {}

# for haplo_key in haplo_file_dict.keys():

#     in_tsv = haplo_file_dict[haplo_key]    
#     hash_table = pd.read_csv(in_tsv,sep='\t')
#     hash_table['HASH_ARRAY'] = hash_table['HASH'].map(lambda v : np.array([float(char) for char in v]))

#     hash_table_dict[haplo_key] = hash_table
    
# pd.read_csv('../data/sample_avg_height_map.csv',sep='\t')
# individual_haplo_feat = pd.concat(hash_table_dict).reset_index().groupby('INDIVIDUAL')['HASH_ARRAY'].apply(list)
# X = list(individual_haplo_feat.array)
# y = random_output_vector(len(X))