# Phase 2 - Unimodal Deep Learning Models
---

## Import all necessary libraries

In [1]:
## Import required libraries and modules
# Add src to path
import sys
import os

# Add the src directory to the Python path
sys.path.append(os.path.abspath(os.path.join("..", "src")))

import logging
import torch
import pandas as pd

from utils import load_config

# Load Config to ensure reproducibility and syncing with other scripts
config = load_config("../config.yaml")

# Set logging configurations
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(message)s",
)

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

logging.info(f"Device: {device}")

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
import numpy as np
from preprocess.preprocess import split_data

# from models import FlexibleFCNN

2024-12-13 19:54:12,697 - Device: cuda


In [2]:
## Load, Split and Preprocess Dataset
# Load the dataset
tf_df = pd.read_csv(config["data_paths"]["preprocessed_tf_file"])
landmark_df = pd.read_csv(config["data_paths"]["preprocessed_landmark_file"])

# Define chunk size
chunk_size = 1000

# Initialize an empty list to store processed chunks
chunks = []

# Read the CSV file in chunks
for chunk in pd.read_csv(
    config["data_paths"]["preprocessed_gene_file"], chunksize=chunk_size):
    # Optionally process the chunk (e.g., drop columns, filter rows)
    chunks.append(chunk)

# Combine chunks into a single DataFrame (if needed)
gene_df = pd.concat(chunks, axis=0)

# Only sample a subset of the data for faster training
# tf_df = tf_df.sample(n=10000, random_state=42)
# landmark_df = landmark_df.sample(n=10000, random_state=42)

# Split data into train/validation/test sets
X_tf_train, y_tf_train, X_tf_val, y_tf_val, X_tf_test, y_tf_test = split_data(
    tf_df, target_name="viability", config=config
)
(
    X_landmark_train,
    y_landmark_train,
    X_landmark_val,
    y_landmark_val,
    X_landmark_test,
    y_landmark_test,
) = split_data(landmark_df, target_name="viability", config=config)
X_gene_train, y_gene_train, X_gene_val, y_gene_val, X_gene_test, y_gene_test = (
    split_data(gene_df, target_name="viability", config=config)
)


# Convert data to PyTorch tensors
def create_dataloader(X, y, batch_size=32):
    dataset = TensorDataset(
        torch.tensor(X.values, dtype=torch.float32),
        torch.tensor(y.values, dtype=torch.float32),
    )
    return DataLoader(dataset, batch_size=batch_size, shuffle=True)


tf_train_loader = create_dataloader(X_tf_train, y_tf_train)
tf_val_loader = create_dataloader(X_tf_val, y_tf_val)
tf_test_loader = create_dataloader(X_tf_test, y_tf_test)

landmark_train_loader = create_dataloader(X_landmark_train, y_landmark_train)
landmark_val_loader = create_dataloader(X_landmark_val, y_landmark_val)
landmark_test_loader = create_dataloader(X_landmark_test, y_landmark_test)

gene_train_loader = create_dataloader(X_gene_train, y_gene_train)
gene_val_loader = create_dataloader(X_gene_val, y_gene_val)
gene_test_loader = create_dataloader(X_gene_test, y_gene_test)

In [3]:
feature_sets = {
    "TF Data": (X_tf_train, y_tf_train, tf_train_loader, tf_val_loader, tf_test_loader),
    "Landmark Data": (
        X_landmark_train,
        y_landmark_train,
        landmark_train_loader,
        landmark_val_loader,
        landmark_test_loader,
    ),
    "Gene Data": (
        X_gene_train,
        y_gene_train,
        gene_train_loader,
        gene_val_loader,
        gene_test_loader,
    ),
}

# Initialize a DataFrame to store results
combined_metrics = []

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import logging

ACTIVATION_MAP = {
    "relu": nn.ReLU,
    "leakyrelu": nn.LeakyReLU,
    "tanh": nn.Tanh,
    "sigmoid": nn.Sigmoid,
    "gelu": nn.GELU,
    "identity": nn.Identity,
    "prelu": nn.PReLU,
    "elu": nn.ELU,
}


class FlexibleFCNN(nn.Module):
    def __init__(
        self,
        input_dim,
        hidden_dims,
        output_dim,
        activation_fn="relu",
        dropout_prob=0.0,
        residual=False,
        use_batchnorm=True,
    ):
        """
        Flexible Fully-Connected Neural Network

        Args:
            input_dim (int): Dimensionality of input features.
            hidden_dims (list of int): List with the size of each hidden layer.
            output_dim (int): Dimension of the output.
            activation_fn (str or callable): Activation function to use.
                If str, must be in ACTIVATION_MAP. If callable, it should be a nn.Module or lambda returning Tensor.
            dropout_prob (float): Dropout probability. 0.0 means no dropout.
            residual (bool): Whether to use residual connections when possible.
            use_batchnorm (bool): Whether to use BatchNorm after each linear layer.
        """
        super(FlexibleFCNN, self).__init__()
        self.input_dim = input_dim
        self.hidden_dims = hidden_dims
        self.output_dim = output_dim
        self.residual = residual
        self.use_batchnorm = use_batchnorm
        self.dropout_prob = dropout_prob

        # Determine the activation function
        if isinstance(activation_fn, str):
            if activation_fn.lower() not in ACTIVATION_MAP:
                raise ValueError(f"Unknown activation function {activation_fn}")
            self.activation_fn = ACTIVATION_MAP[activation_fn.lower()]()
        else:
            # Assume callable
            self.activation_fn = activation_fn

        # Construct layers
        layer_dims = [input_dim] + hidden_dims
        self.layers = nn.ModuleList()
        self.bns = nn.ModuleList() if use_batchnorm else None
        self.dropouts = nn.ModuleList()

        for i in range(len(layer_dims) - 1):
            in_dim = layer_dims[i]
            out_dim = layer_dims[i + 1]
            self.layers.append(nn.Linear(in_dim, out_dim))

            if use_batchnorm:
                self.bns.append(nn.BatchNorm1d(out_dim))

            if dropout_prob > 0.0:
                self.dropouts.append(nn.Dropout(dropout_prob))
            else:
                # Placeholder for consistency
                self.dropouts.append(nn.Identity())

        self.output_layer = nn.Linear(layer_dims[-1], output_dim)

        # Initialize weights
        self._initialize_weights()

    def _initialize_weights(self):
        # Xavier initialization
        for layer in self.layers:
            nn.init.xavier_uniform_(layer.weight)
            if layer.bias is not None:
                nn.init.zeros_(layer.bias)

        nn.init.xavier_uniform_(self.output_layer.weight)
        if self.output_layer.bias is not None:
            nn.init.zeros_(self.output_layer.bias)

    def forward(self, x):
        # Keep track of input for possible residual connections
        for i, layer in enumerate(self.layers):
            residual_input = x
            x = layer(x)

            # BatchNorm
            if self.use_batchnorm:
                x = self.bns[i](x)

            # Dropout before activation
            x = self.dropouts[i](x)

            # Activation
            x = self.activation_fn(x)

            # Residual connection (only if dimensions match)
            if self.residual and residual_input.shape == x.shape:
                x = x + residual_input

        x = self.output_layer(x)
        return x

    def get_regularization_loss(self, l1_lambda=0.0, l2_lambda=0.0):
        """
        Compute L1 and L2 regularization losses for all linear layers.
        l1_lambda and l2_lambda are coefficients for L1 and L2 penalties respectively.

        Args:
            l1_lambda (float): Weight for L1 regularization.
            l2_lambda (float): Weight for L2 regularization.

        Returns:
            reg_loss (Tensor): The regularization loss (scalar).
        """
        reg_loss = torch.tensor(0.0, device=next(self.parameters()).device)
        if l1_lambda == 0.0 and l2_lambda == 0.0:
            return reg_loss

        for layer in self.layers:
            if l1_lambda > 0.0:
                reg_loss += l1_lambda * torch.sum(torch.abs(layer.weight))
            if l2_lambda > 0.0:
                reg_loss += l2_lambda * torch.sum(layer.weight**2)

        # Also consider output layer
        if l1_lambda > 0.0:
            reg_loss += l1_lambda * torch.sum(torch.abs(self.output_layer.weight))
        if l2_lambda > 0.0:
            reg_loss += l2_lambda * torch.sum(self.output_layer.weight**2)

        return reg_loss

In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau

from evaluation import evaluate_model
from training import train_model

# Step 1: Prepare the DataLoaders

# Step 2: Model Configuration
input_dim = tf_train_loader.dataset[0][0].shape[0]  # Automatically detect input size
hidden_layers = (512, 256, 128)  # Define architecture
dropout_rate = 0.3  # Dropout rate
activation = nn.GELU  # Activation function
output_activation = None  # No output activation for regression

model = FlexibleFCNN(
    input_dim=input_dim,
    hidden_dims=[512, 256, 128, 64],
    output_dim=1,
    activation_fn="prelu",
    dropout_prob=0.2,
    residual=True,
)


# Step 3: Define Training Components
device = "cuda" if torch.cuda.is_available() else "cpu"
criterion = nn.MSELoss()  # Loss function
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)  # Optimizer
scheduler = ReduceLROnPlateau(
    optimizer, mode="min", patience=5, verbose=True
)  # LR Scheduler

# Step 4: Train the Model
num_epochs = 20
train_losses, val_losses = train_model(
    model=model,
    train_loader=tf_train_loader,
    val_loader=tf_val_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    epochs=num_epochs,
    device=device,
    gradient_clipping=1.0,
    early_stopping_patience=10,
)

# Step 5: Evaluate the Model
test_loss, y_true, y_pred, metrics = evaluate_model(
    model=model,
    test_loader=tf_test_loader,
    criterion=criterion,
    device=device,
    calculate_metrics=True,
)

# Display evaluation metrics
print("Test Loss:", test_loss)
print("Evaluation Metrics:", metrics)

# Step 6: Save the Model (Optional)
torch.save(model.state_dict(), "fcnn_model.pth")

  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch 1/20 - Model, Train Loss: 0.1925, Val Loss: 0.0609
Epoch 2/20 - Model, Train Loss: 0.0649, Val Loss: 0.0606
Epoch 3/20 - Model, Train Loss: 0.0612, Val Loss: 0.0618
Epoch 4/20 - Model, Train Loss: 0.0599, Val Loss: 0.0600
Epoch 5/20 - Model, Train Loss: 0.0592, Val Loss: 0.0602
Epoch 6/20 - Model, Train Loss: 0.0588, Val Loss: 0.0600
Epoch 7/20 - Model, Train Loss: 0.0585, Val Loss: 0.0599
Epoch 8/20 - Model, Train Loss: 0.0581, Val Loss: 0.0599
Epoch 9/20 - Model, Train Loss: 0.0582, Val Loss: 0.0607
Epoch 10/20 - Model, Train Loss: 0.0577, Val Loss: 0.0607
Epoch 11/20 - Model, Train Loss: 0.0578, Val Loss: 0.0599
Epoch 12/20 - Model, Train Loss: 0.0577, Val Loss: 0.0599
Epoch 13/20 - Model, Train Loss: 0.0576, Val Loss: 0.0602
Epoch 14/20 - Model, Train Loss: 0.0575, Val Loss: 0.0598
Epoch 15/20 - Model, Train Loss: 0.0574, Val Loss: 0.0598
Epoch 16/20 - Model, Train Loss: 0.0574, Val Loss: 0.0599
Epoch 17/20 - Model, Train Loss: 0.0573, Val Loss: 0.0598
Epoch 18/20 - Model, Tr

  return F.mse_loss(input, target, reduction=self.reduction)
