<a href="https://colab.research.google.com/github/zhixin-Xiong/DeLVR/blob/main/DeLIVR_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import torch
import numpy as np
import pandas as pd
import statsmodels.api as sm
from sklearn.model_selection import train_test_split
from torch import nn, optim

# Load data
from google.colab import drive
drive.mount("/content/drive")

simulated_IV = pd.read_csv('/content/drive/MyDrive/colab_data/simulated_IV.txt', sep=' ',header=None).values
s1_beta = pd.read_csv('/content/drive/MyDrive/colab_data/s1_beta.txt', sep=' ',header=None).values.squeeze()

# Convert to PyTorch tensors
simulated_IV = torch.tensor(simulated_IV, dtype=torch.float32)
s1_beta = torch.tensor(s1_beta, dtype=torch.float32)

s1_beta.shape

def generate_data(IV, beta, true_g, gamma, seed=0):
    # Existing code for generate_data function ...
    torch.manual_seed(seed)
    s1_size = IV.shape[0] // 10
    indices = torch.randperm(IV.shape[0])
    s1_idx, s2_idx = indices[:s1_size], indices[s1_size:]

    s1_IV = IV[s1_idx]
    s2_IV = IV[s2_idx]

    # Stage 1 data
    s1_expr = torch.matmul(s1_IV, beta) + torch.randn(s1_size)
    # Stage 2 data
    e = torch.randn(s2_IV.shape[0], 2)
    s2_expr = torch.matmul(s2_IV, beta) + e[:, 0]
    print(s2_expr)
    s2_pheno = true_g(s2_expr) + e[:, 1]
    print(s2_pheno)
    s2_expr = s2_expr.reshape(-1,1)

    return s1_IV, s1_expr.view(-1, 1), s2_IV, s2_pheno.view(-1, 1)


class LinearModel(nn.Module):
    def __init__(self, input_size):
        super(LinearModel, self).__init__()
        self.linear = nn.Linear(input_size, 1)

    def forward(self, x):
        return self.linear(x)

class Stage2Model(nn.Module):
    def __init__(self, input_shape, layer_dims, l2):
        super(Stage2Model, self).__init__()
        layers = []
        for i in range(len(layer_dims)):
            layers.append(nn.Linear(input_shape if i == 0 else layer_dims[i-1], layer_dims[i]))
            layers.append(nn.ReLU())
            layers.append(nn.BatchNorm1d(layer_dims[i]))
        layers.append(nn.Linear(layer_dims[-1], 1))
        self.model = nn.Sequential(*layers)
        self.regularization = l2

    def forward(self, x):
        return self.model(x)

    def l2_regularization(self):
        l2_reg = None
        for W in self.model.parameters():
            if l2_reg is None:
                l2_reg = W.norm(2)
            else:
                l2_reg = l2_reg + W.norm(2)
        return self.regularization * l2_reg


def train_stage1(model, X_train, y_train, epochs=100, learning_rate=0.01):
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    for epoch in range(epochs):
        optimizer.zero_grad()
        outputs = model(X_train)
        loss = criterion(outputs, y_train)
        loss.backward()
        optimizer.step()

def train_stage2(model, X_train, y_train, X_val, y_val, epochs=8000, learning_rate=0.00033):
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    for epoch in range(epochs):
        optimizer.zero_grad()
        outputs = model(X_train)
        loss = criterion(outputs, y_train) + model.l2_regularization()
        loss.backward()
        optimizer.step()

def evaluate_model(model, X_test, y_test):
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():
        predictions = model(X_test)
        mse = nn.MSELoss()(predictions, y_test)
        mae = torch.mean(torch.abs(predictions - y_test))
    return mse.item(), mae.item()

def main():
    true_g = lambda x: 3 * x ** 2  # Replace with your true function
    gamma = 0.7  # Replace with your gamma value

    s1_IV, s1_expr, s2_IV, s2_pheno = generate_data(simulated_IV, s1_beta, true_g, gamma)

    # Split s2_IV and s2_pheno into training and testing sets
    s2_IV_train, s2_IV_test, s2_pheno_train, s2_pheno_test = train_test_split(s2_IV, s2_pheno, test_size=0.2)

    # Stage 1
    stage1_model = LinearModel(s1_IV.shape[1])
    train_stage1(stage1_model, s1_IV, s1_expr)

    # Use stage 1 model to predict s2_IV
    with torch.no_grad():
        s2_expr_train = stage1_model(s2_IV_train)
        s2_expr_test = stage1_model(s2_IV_test)

    # Stage 2
    stage2_model = Stage2Model(s2_IV_train.shape[1], [32, 16, 8, 8, 8], 0.05)
    train_stage2(stage2_model, s2_IV_train, s2_pheno_train, None, None)  # Assuming no validation set for simplicity

    # Evaluate models
    mse_stage1, mae_stage1 = evaluate_model(stage1_model, s2_IV_test, s2_expr_test)
    mse_stage2, mae_stage2 = evaluate_model(stage2_model, s2_IV_test, s2_pheno_test)

    print(f"Stage 1 - MSE: {mse_stage1}, MAE: {mae_stage1}")
    print(f"Stage 2 - MSE: {mse_stage2}, MAE: {mae_stage2}")

if __name__ == '__main__':
    main()


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
tensor([ 1.9670, -1.7020,  3.3290,  ...,  0.9864, -0.0566,  0.9445])
tensor([10.5890,  9.6268, 32.7296,  ...,  2.6634,  0.6466,  5.1600])
Stage 1 - MSE: 0.0, MAE: 0.0
Stage 2 - MSE: 554.4716186523438, MAE: 13.506820678710938
