In [1]:
import torch
import torchmetrics

import numpy  # NOTE: install v1.26.4
import sklearn

In [2]:
# Floating-point dtype that BinaryLogisticRegression.test() and .train_epoch()
# cast every batch (features and labels) to before the forward pass.
dtype_torch_f32 = torch.float32

In [3]:
class BinaryLogisticRegression(torch.nn.Module):
    """Binary logistic regression implemented as a single linear layer.

    ``forward`` returns raw logits; probabilities are obtained by applying a
    sigmoid (see ``compute_prob``). Training supports an explicit L1 penalty on
    the weights (``reg_loss``) and weight decay through the optimizer.
    """

    def __init__(self, input_dim: int, output_dim: int = 1):
        """
        Args:
            input_dim: number of input features.
            output_dim: number of outputs of the linear layer (1 for binary).
        """
        super().__init__()

        self.linear = torch.nn.Linear(in_features=input_dim, out_features=output_dim, bias=True)

        # Kept for backward compatibility; no method of this class reads it.
        self.metrics = {'accuracy': torchmetrics.Accuracy(task="binary")}

        self.to(torch.device("cpu"))

    @property
    def device(self) -> torch.device:
        """Device the parameters currently live on.

        Derived from the weights so it can never go stale after ``model.to(...)``.
        """
        return self.linear.weight.device

    @device.setter
    def device(self, value) -> None:
        # Assigning ``model.device = ...`` keeps working: it moves the module.
        self.to(torch.device(value))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return logits; the trailing singleton dimension is squeezed ([n, 1] -> [n])."""
        return self.linear(x).squeeze(-1)

    def reg_loss(self, l1: float) -> torch.Tensor:
        """Return the L1 (LASSO) penalty ``l1 * sum(|weight|)``, or 0 when ``l1 <= 0``.

        Only ``self.linear.weight`` is penalised; the bias is not.
        """
        loss = torch.tensor(0.0, device=self.device)

        if l1 > 0.0:
            loss = loss + l1 * torch.sum(torch.abs(self.linear.weight))

        return loss

    # inference
    def compute_prob(self, x) -> numpy.ndarray:
        """Return the predicted probabilities as a flat numpy array.

        Args:
            x: ``numpy.ndarray`` (converted to float32) or ``torch.Tensor``.

        Raises:
            TypeError: if ``x`` is neither a numpy array nor a torch tensor.

        Note:
            Switches the module to eval mode and leaves it there.
        """
        self.eval()

        if isinstance(x, numpy.ndarray):
            x = torch.from_numpy(x).float()

        elif not isinstance(x, torch.Tensor):
            raise TypeError("Instance must have type: 1) numpy.ndarray or 2) torch.Tensor")

        with torch.no_grad():
            logits = self(x.to(self.device))

        return torch.sigmoid(logits).cpu().numpy().ravel()

    @torch.no_grad()
    def predict(self, x, cl_threshold: float = 0.5) -> numpy.ndarray:
        """Return hard 0/1 predictions: 1 where the probability exceeds ``cl_threshold``."""
        return (self.compute_prob(x) > cl_threshold).astype(int)

    # test
    def test(self, dataloader: torch.utils.data.DataLoader, loss_fun, metrics=None):
        """Evaluate on ``dataloader`` and return the mean per-sample loss.

        Args:
            dataloader: yields ``(X, y)`` batches.
            loss_fun: callable ``loss_fun(logits, targets)`` returning a scalar tensor.
            metrics: optional iterable of metric objects exposing
                ``update(probabilities, int_targets)``; each is updated per batch.

        The loss is averaged over the samples actually iterated, so the result
        stays correct when the loader drops or resamples items.
        """
        if metrics is None:
            metrics = []

        self.eval()

        total_loss = 0.0
        n_samples = 0
        with torch.no_grad():

            for X, y in dataloader:

                X = X.to(self.device, dtype=dtype_torch_f32)
                y = y.to(self.device, dtype=dtype_torch_f32)

                logits = self(X)

                batch_size = len(X)
                total_loss += loss_fun(logits, y).item() * batch_size
                n_samples += batch_size

                probabilities = torch.sigmoid(logits)
                targets = y.int()
                for m in metrics:
                    m.update(probabilities, targets)

        return total_loss / n_samples

    # training
    def train_epoch(self, dataloader: torch.utils.data.DataLoader, optim: torch.optim.Optimizer, loss_fun: callable, l1: float, l2: float):
        """Run one optimisation pass over ``dataloader``; return the mean per-sample loss.

        Args:
            dataloader: yields ``(X, y)`` batches.
            optim: optimizer stepping this module's parameters.
            loss_fun: callable ``loss_fun(logits, targets)`` returning a scalar tensor.
            l1: L1 penalty strength added to every batch loss (see ``reg_loss``).
            l2: unused here; weight decay is applied by the optimizer built in
                ``train_model``. Kept so existing callers keep working.

        The returned loss includes the L1 penalty.
        """
        # train mode
        self.train()

        total_loss = 0.0
        n_samples = 0
        for X, y in dataloader:

            X = X.to(self.device, dtype=dtype_torch_f32)
            y = y.to(self.device, dtype=dtype_torch_f32)

            # reset gradients
            optim.zero_grad()

            # prediction error + L1 penalty
            loss = loss_fun(self(X), y) + self.reg_loss(l1)

            # training loss
            batch_size = len(X)
            total_loss += loss.item() * batch_size
            n_samples += batch_size

            # backpropagation
            loss.backward()
            optim.step()

        return total_loss / n_samples

    def _build_optimizer(self, optimizer: str, learning_rate: float, l2: float, momentum: float) -> torch.optim.Optimizer:
        """Map the ``optimizer`` name accepted by ``train_model`` to a torch optimizer."""
        if optimizer == 'Adam':
            # Plain Adam deliberately applies no weight decay (L1 only). Say so
            # loudly instead of silently dropping a non-zero l2.
            if l2 > 0.0:
                import warnings
                warnings.warn(
                    "optimizer='Adam' ignores l2; use 'AdamW' or 'RMS' to apply weight decay",
                    stacklevel=3,
                )
            return torch.optim.Adam(self.parameters(), lr=learning_rate)

        if optimizer == 'AdamW':
            # Decoupled weight decay, as the name promises.
            return torch.optim.AdamW(self.parameters(), lr=learning_rate, weight_decay=l2)

        if optimizer == 'RMS':
            # The error message below documents 'RMS' as RMSProp.
            return torch.optim.RMSprop(self.parameters(), lr=learning_rate, weight_decay=l2, momentum=momentum)

        raise ValueError("@optimizer should be either 'Adam' (ADAM optimizer), 'AdamW' (idem. weight decay) or 'RMS' for RMSProp")

    def train_model(self,
                    train_dataloader: torch.utils.data.DataLoader,
                    validation_dataloader: torch.utils.data.DataLoader,
                    learning_rate: float,
                    l1: float = 0.0,
                    l2: float = 0.0,
                    momentum: float = 0.9,
                    loss_fun: callable = torch.nn.BCEWithLogitsLoss(),
                    optimizer: str = "Adam",
                    num_epochs: int = 100):
        """Train for ``num_epochs`` epochs, validating after each one.

        Args:
            train_dataloader: training batches ``(X, y)``.
            validation_dataloader: validation batches, or ``None`` to skip validation.
            learning_rate: optimizer learning rate.
            l1: L1 penalty strength added to the loss.
            l2: weight decay handed to the optimizer ('AdamW' and 'RMS' only).
            momentum: momentum for the 'RMS' optimizer.
            loss_fun: callable ``loss_fun(logits, targets)``.
            optimizer: one of 'Adam', 'AdamW', 'RMS'.
            num_epochs: number of passes over ``train_dataloader``.

        Returns:
            ``(train_losses, validation_losses, metrics_on_validation_data)``:
            two per-epoch lists and a dict mapping each metric name to its
            per-epoch list (lists stay empty when validation is skipped).

        Raises:
            ValueError: if ``optimizer`` is not a supported name.
        """
        train_losses = []
        validation_losses = []

        # Initialize dictionary to store results
        metrics_on_validation_data = {k: [] for k in ['accuracy', 'precision', 'prauc', 'recall', 'f1', 'auroc', 'specificity', 'mcc', 'logauc']}

        # The metric objects are only needed when there is something to validate on.
        val_metrics = None
        if validation_dataloader is not None:
            val_metrics = torchmetrics.MetricCollection(
                {
                    'accuracy':    torchmetrics.Accuracy(task="binary"),
                    'precision':   torchmetrics.Precision(task="binary"),
                    'prauc':       torchmetrics.AveragePrecision(task="binary"),
                    'recall':      torchmetrics.Recall(task="binary"),
                    'f1':          torchmetrics.F1Score(task="binary"),
                    'auroc':       torchmetrics.AUROC(task="binary"),
                    'specificity': torchmetrics.Specificity(task="binary"),
                    'mcc':         torchmetrics.MatthewsCorrCoef(task="binary"),
                    'logauc':      torchmetrics.LogAUC(task="binary")
                }
            ).to(self.device)

        optim = self._build_optimizer(optimizer, learning_rate, l2, momentum)

        for _ in range(num_epochs):

            train_loss = self.train_epoch(dataloader=train_dataloader, loss_fun=loss_fun, optim=optim, l1=l1, l2=l2)
            train_losses.append(train_loss)

            if val_metrics is not None:

                # Reset all metrics at the start of validation
                val_metrics.reset()

                metric_list = list(val_metrics.values())

                validation_loss = self.test(dataloader=validation_dataloader, loss_fun=loss_fun, metrics=metric_list)
                validation_losses.append(validation_loss)

                computed_metrics = val_metrics.compute()

                for k, v in computed_metrics.items():
                    metrics_on_validation_data[k].append(v.item())

        return train_losses, validation_losses, metrics_on_validation_data
        

In [6]:
import torch
import numpy as np
from torch.utils.data import Subset, DataLoader
from sklearn.model_selection import KFold, ParameterGrid
from tqdm import tqdm

def grid_search_cv(training_dataset, 
                   param_dict, 
                   loss_factory: callable, 
                   k_folds=5, 
                   num_epochs=20, 
                   batch_size=32, 
                   device="cpu"):
    """
    Performs K-Fold Cross Validation over a grid of hyperparameters.

    Args:
        training_dataset: map-style dataset whose items are ``(features, label)``
            pairs; the feature count is read from the first item.
        param_dict: mapping ``name -> list of values``; every combination is
            forwarded to ``BinaryLogisticRegression.train_model`` as keyword
            arguments.
        loss_factory: zero-argument callable returning a fresh loss function;
            called once per fold.
        k_folds: number of cross-validation folds.
        num_epochs: training epochs per fold.
        batch_size: batch size of the train and validation loaders.
        device: device the model is trained on (string or ``torch.device``).

    Returns:
        ``(best_params, best_avg_score, results)`` where ``results`` is a list
        of ``{'params', 'avg_score', 'std_score'}`` dicts sorted by descending
        mean last-epoch PR-AUC.
    """
    
    # 0. Number of features, taken from the last dimension of the first sample
    #    so any (features, label) dataset works, not only TensorDataset.
    input_dim = training_dataset[0][0].shape[-1]

    # 1. Create the grid of all hyperparameter combinations
    grid = list(ParameterGrid(param_dict))
    
    # 2. Setup K-Fold splitter (Shuffle=True is important!)
    kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)
    
    results = []
    
    # Outer Loop: Iterate over hyperparameter combinations
    print(f"Starting Grid Search with {len(grid)} candidates...")
    
    for params in tqdm(grid, desc="Grid Search"):
        
        fold_scores = []
        
        # Inner Loop: Cross Validation
        # kfold.split yields indices directly, no need for manual math
        for train_idx, val_idx in kfold.split(training_dataset):
            
            # Create Subsets and Loaders
            train_sub = Subset(training_dataset, train_idx)
            val_sub   = Subset(training_dataset, val_idx)
            
            train_loader = DataLoader(train_sub, batch_size=batch_size, shuffle=True)
            val_loader   = DataLoader(val_sub,   batch_size=batch_size, shuffle=False)
            
            model = BinaryLogisticRegression(input_dim=input_dim)
            model.to(device)
            # The model moves each batch to ``model.device``; keep it in sync
            # with where the parameters were just moved, otherwise any
            # non-CPU ``device`` ends in a device-mismatch error.
            model.device = torch.device(device)
            
            # Create a fresh loss function (in case it holds state)
            loss_fn = loss_factory()

            _, _, metrics = model.train_model(
                train_dataloader=train_loader,
                validation_dataloader=val_loader,
                loss_fun=loss_fn,
                num_epochs=num_epochs,
                **params 
            )
            
            # Store the metric of interest (e.g., PR-AUC from the last epoch)
            # You might want to take max() instead of last index [-1] depending on stability
            fold_scores.append(metrics['prauc'][-1])
        
        # Average score across folds
        avg_score = np.mean(fold_scores)
        std_score = np.std(fold_scores)
        
        results.append({
            'params': params,
            'avg_score': avg_score,
            'std_score': std_score
        })
    
    # Best candidate first (highest mean PR-AUC)
    results.sort(key=lambda x: x['avg_score'], reverse=True)
    
    best_result = results[0]
    return best_result['params'], best_result['avg_score'], results

In [17]:
# TODO: load data from .xlsx to X_training, y_training, X_test and y_test <'numpy.ndarray'>
#X_tensor_training = torch.tensor(X_training, dtype=torch.float32)
#y_tensor_training = torch.tensor(y_training, dtype=torch.float32)

#training_tensor_dataset = torch.utils.data.TensorDataset(X_tensor_training, y_tensor_training)

#X_tensor_testing = torch.tensor(X_testing, dtype=torch.float32)
#y_tensor_testing = torch.tensor(y_testing, dtype=torch.float32)

#testing_tensor_dataset = torch.utils.data.TensorDataset(X_tensor_testing, y_tensor_testing)


## DUMMY DATA
# Synthetic, imbalanced binary-classification data used as a stand-in until the
# real data is loaded; it is split, standardised and fed to grid_search_cv.

from sklearn.datasets import make_classification
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Number of synthetic features generated below.
INPUT_DIM = 20

X, y = make_classification(n_samples=1000, n_features=INPUT_DIM, weights=[0.9, 0.1], random_state=42)

# make_classification already returns numpy arrays (see the printed types below).
X = numpy.array(X)
y = numpy.array(y)


# TODO: replace this with respect to time
# Stratified 80/20 split on y.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y  
)


# The scaler is fitted on the training split only.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
# The test split is not scaled or used in this cell; when it is needed, reuse
# the fitted scaler: X_test_scaled = scaler.transform(X_test)

# y not scaled due to binary 1/0 

print(type(X), type(y))

# Features and labels are both stored as float32 tensors.
training_tensor_dataset_scaled = torch.utils.data.TensorDataset(torch.tensor(X_train_scaled, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32))


# B. Define Hyperparameter Grid
# The keys are forwarded as keyword arguments to
# BinaryLogisticRegression.train_model, so they must match its parameter names.
# NOTE(review): train_model defaults to optimizer="Adam", which does not pass l2
# to the optimizer, so the 'l2' dimension has no effect on training unless
# 'optimizer' is added to the grid - confirm this is intended.
parameter_grid = {
    'learning_rate': [0.01, 0.001],
    'l1': [0.0, 0.01],
    'l2': [0.0, 0.01]
}
    
# TODO: import from log_reg_bin_loss_factories
# DiceLoss, FocalLoss, JaccardLoss, OptimizedAsymmetricFocalLoss, TverskyBCELoss, TverskyLoss
# NOTE(review): ComboLoss is presumably provided by this star import (the module
# is not visible here) - prefer an explicit `import ComboLoss` once confirmed.
from log_reg_bin_loss_factories.TverskyBCELoss import *
# grid_search_cv calls loss_factory() with no arguments once per fold.
loss_factory = ComboLoss

# D. Run Grid Search
optimum_parameters, result_optimum_parameters, result_grid = grid_search_cv(training_tensor_dataset_scaled, param_dict=parameter_grid, loss_factory=loss_factory)

print(f"WINNER PARAMS: {optimum_parameters}")
print(f"WINNER PR-AUC: {result_optimum_parameters:.4f}")


<class 'numpy.ndarray'> <class 'numpy.ndarray'>
Starting Grid Search with 8 candidates...


  bounds = torch.log10(torch.tensor(fpr_range))
Grid Search: 100%|██████████| 8/8 [00:14<00:00,  1.76s/it]

WINNER PARAMS: {'l1': 0.01, 'l2': 0.0, 'learning_rate': 0.01}
WINNER PR-AUC: 0.7066





In [None]:
## POLARS

import polars as pl
import torch

# Example Polars DF
df = pl.read_excel("your_file.xlsx")  # or pl.read_csv(), etc.

# Separate features and target (adjust column names)
X_pl = df.select(pl.col("^feature.*$"))  # regex selector; or an explicit list of feature columns
y_pl = df.select("target")

# Direct to torch tensor.
# .float() guarantees float32 whatever the column dtypes are, matching the
# float32 tensors used for the TensorDataset elsewhere in this notebook.
X_tensor = X_pl.to_torch().float()              # shape: (n_samples, n_features)

# A one-column frame converts to shape (n_samples, 1); drop the trailing axis so
# the labels line up with the model's squeezed (n_samples,) logits in the loss.
y_tensor = y_pl.to_torch().float().squeeze(-1)  # shape: (n_samples,)

