## Import packages you need

In [None]:
import numpy as np
import pandas as pd
from torch.utils.data import Dataset
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.data import random_split
import csv

In [None]:
myseed=666
torch.manual_seed(myseed)
np.random.seed(myseed)

## Helper Functions

In [None]:
class EarlyStopper:
    """Early stopping utility to stop training when validation loss stops improving."""

    def __init__(self, num_trials, save_path):
        """Initialize EarlyStopper.
        
        Args:
            num_trials: Number of trials to wait before stopping
            save_path: Path to save the best model weights
        """
        self.num_trials = num_trials
        self.trial_counter = 0
        self.best_loss = 1000000.0
        self.save_path = save_path

    def is_continuable(self, model, loss):
        """Check if training should continue.
        
        Args:
            model: Model to save if loss improves
            loss: Current validation loss
            
        Returns:
            bool: True if training should continue, False otherwise
        """
        if loss < self.best_loss:
            self.best_loss = loss
            self.trial_counter = 0
            torch.save(model.state_dict(), self.save_path)
            return True
        elif self.trial_counter + 1 < self.num_trials:
            self.trial_counter += 1
            return True
        return False


def calculate_loss(loader, model, criterion):
    """Calculate loss over entire data loader.
    
    Args:
        loader: Data loader
        model: Model to evaluate
        criterion: Loss function
        
    Returns:
        torch.Tensor: Computed loss
    """
    predictions = []
    labels = []
    for field, label in loader:
        field = field.float()
        label = label.float()
        prediction = model(field)
        predictions.append(prediction)
        labels.append(label)
    predictions = torch.cat(predictions, dim=0)
    labels = torch.cat(labels, dim=0)
    return criterion(predictions, labels)


def predict(test_loader, model):
    """Generate predictions for test data.
    
    Args:
        test_loader: Test data loader
        model: Model to use for prediction
        
    Returns:
        numpy.ndarray: Array of predictions
    """
    predictions = []
    for field in test_loader:
        field = field.float()
        prediction = model(field)
        predictions.append(prediction)
    predictions = torch.cat(predictions, dim=0)
    return predictions.detach().numpy()

In [None]:
class ReadDataset(Dataset):
    """Dataset class for reading and preprocessing CSV data."""
    
    def __init__(self, path, mean=None, std=None, is_test=False, standardize=False):
        """Initialize the dataset.
        
        Args:
            path: Path to CSV file
            mean: Mean values for standardization (required for test set if standardize=True)
            std: Standard deviation values for standardization
            is_test: Whether this is a test dataset
            standardize: Whether to standardize the features
        """
        super().__init__()
        self.is_test = is_test
        self.field = pd.read_csv(path, index_col=0)
        self.field.dropna(axis=0, how='any', inplace=True)
        self.standardize = standardize
        assert not self.field.isnull().values.any()
        
        # For training set, get mean and std
        if not is_test:
            self.labels = self.field.values[:, -1]
            self.features = self.field.values[:, :-1]
            self.mean = np.mean(self.features, axis=0)
            self.std = np.std(self.features, axis=0)
        # For test set, use mean and std from training set to normalize
        else:
            self.features = self.field.values
            self.labels = None
            if standardize:
                if mean is None or std is None:
                    raise ValueError(
                        "Mean and std must be provided for test dataset when standardize=True."
                    )
                self.mean = mean
                self.std = std

    def __len__(self):
        """Return the number of samples in the dataset."""
        return len(self.features)

    def __getitem__(self, idx):
        """Get a sample from the dataset.
        
        Args:
            idx: Index of the sample to retrieve
            
        Returns:
            Tuple of (features, label) if not test set, otherwise just features
        """
        if self.standardize:
            normalized_features = (self.features[idx] - self.mean) / self.std
            if not self.is_test:
                return normalized_features, self.labels[idx]
            return normalized_features
        
        if not self.is_test:
            return self.features[idx], self.labels[idx]
        return self.features[idx]

## Define DNN by pytorch

In [None]:
class CustomNeuralNetwork(nn.Module):
    def __init__(self, input_size=40, output_size=1, hidden_layers=[64, 32], activation='relu', dropout_rate=0.5, leakyrelu_negative_slope=0.01, early_stopper_num_trials=20, save_path='./model.pt', scheduler=True):
        """
        Custom Neural Network for hyperparameter tuning.

        Args:
            input_size (int): Size of the input features (default: 40).
            output_size (int): Size of the output (default: 1).
            hidden_layers (list): List of integers representing the number of neurons in each hidden layer.
            activation (str): Activation function to use ('relu', 'sigmoid', 'tanh', or 'leakyrelu').
            dropout_rate (float): Dropout rate for regularization (default: 0.5).
            leakyrelu_negative_slope (float): Negative slope for LeakyReLU (default: 0.01).
        """
        super().__init__()
        
        self.early_stopper = EarlyStopper(early_stopper_num_trials, save_path)

        self.hidden_layers = nn.ModuleList()
        self.batchnorm_layers = nn.ModuleList()
        
        self.dropout_rate = dropout_rate
        
        if dropout_rate:
            self.dropout = nn.Dropout(dropout_rate)
            
        self.sig = nn.Sigmoid()

        # Input layer
        self.hidden_layers.append(nn.Linear(input_size, hidden_layers[0]))
        self.batchnorm_layers.append(nn.BatchNorm1d(input_size))
        

        # Hidden layers
        for i in range(1, len(hidden_layers)):
            self.hidden_layers.append(nn.Linear(hidden_layers[i - 1], hidden_layers[i]))
            self.batchnorm_layers.append(nn.BatchNorm1d(hidden_layers[i-1]))
            

        # Output layer
        self.output_layer = nn.Linear(hidden_layers[-1], output_size)

        # Activation function
        if activation == 'relu':
            self.activation = nn.ReLU()
        elif activation == 'sigmoid':
            self.activation = nn.Sigmoid()
        elif activation == 'tanh':
            self.activation = nn.Tanh()
        elif activation == 'leakyrelu':
            self.activation = nn.LeakyReLU(negative_slope=leakyrelu_negative_slope)
        elif activation == 'swish':
            self.activation = lambda x: x * self.sig(x)
        else:
            raise ValueError("Activation function not supported. Choose 'relu', 'sigmoid', 'tanh', or 'leakyrelu', 'swish'.")

    def forward(self, x):
        """
        Forward pass through the network.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, input_size).

        Returns:
            torch.Tensor: Output tensor of shape (batch_size, output_size).
        """
        
        
        for i, layer in enumerate(self.hidden_layers):
            x = self.batchnorm_layers[i](x)
            x = layer(x)
            x = self.activation(x)
            if self.dropout_rate:
                x = self.dropout(x)
        x = self.output_layer(x)
        x = self.sig(x).squeeze()
        return x

    def train_model(self, train_loader, val_loader, optimizer='adam', learning_rate=0.001, epochs=50, criterion=nn.BCELoss(), l2_lambda=0.0005, lr_switch=30):
        """
        Train the model.

        Args:
            train_loader (torch.utils.data.DataLoader): DataLoader for training data.
            val_loader (torch.utils.data.DataLoader): DataLoader for validation data.
            optimizer (str): Optimizer to use ('adam', 'adagrad', 'sgd', or 'rmsprop').
            learning_rate (float): Learning rate for the optimizer (default: 0.001).
            epochs (int): Number of epochs to train (default: 50).
            criterion (torch.nn.Module): Loss function (default: BCELoss for binary classification).
        """
        
        # Define optimizer
        if optimizer == 'adam':
            optimizer = torch.optim.Adam(self.parameters(), lr=learning_rate, weight_decay=l2_lambda)
        elif optimizer == 'adagrad':
            optimizer = torch.optim.Adagrad(self.parameters(), lr=learning_rate, weight_decay=l2_lambda)
        elif optimizer == 'sgd':
            optimizer = torch.optim.SGD(self.parameters(), lr=learning_rate, momentum=0.9, weight_decay=l2_lambda)
        elif optimizer == 'rmsprop':
            optimizer = torch.optim.RMSprop(self.parameters(), lr=learning_rate, weight_decay=l2_lambda)
        else:
            raise ValueError("Optimizer not supported. Choose 'adam', 'adagrad', 'sgd', or 'rmsprop'.")
        
        lr_lambda = lambda epoch: (epoch/lr_switch) if epoch < lr_switch else 0.5*(np.cos((epoch-lr_switch)/(learning_rate-lr_switch)*np.pi)+1)
        scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda = lr_lambda)


        for epoch in range(epochs):
            self.train()
            train_loss = 0
            for inputs, targets in train_loader:
                 # Convert inputs and targets to Float
                
                inputs = inputs.float()
                targets = targets.float()
                
                outputs = self(inputs)
                
                optimizer.zero_grad()
                
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()

            # Average training loss for the epoch
            train_loss /= len(train_loader)

            # Validation
            if len(val_loader) != 0:
                self.eval()
                val_loss = 0
                with torch.no_grad():
                    for inputs, targets in val_loader:
                        inputs = inputs.float()
                        targets = targets.float()
                        
                        outputs = self(inputs)
                        val_loss += criterion(outputs, targets).item()
                        
                val_loss /= len(val_loader)
                val_loss = f"{val_loss:.4f}"
            else:
                if epoch % 25 == 0:
                    val_loss = "Training on whole training set, no Val Data"
                else:
                    val_loss = "..."
            
            scheduler.step()
            if not self.early_stopper.is_continuable(self, loss):
                print("Early stop due to no further progress!")
                break
            
            
            print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss}")

# Selecting Appropriate Hyperparameters

In [None]:
# Loading dataset
standardize = True
split_ratio = 1
batch_size = 512

# model parameters
input_size = 40
output_size = 1
hidden_layers = [64, 32, 8]
activation = 'swish'

dropout_rate = 0
epochs = 50

optimizer = 'adagrad'
learning_rate = 0.01
l2_lambda = 0.0005
lr_switch = 30

early_stopper_num_trials = 365

pos_threshold = 0.5

save_path = "./model.pt"

sampler_on = True
class_weights = [1,3]

In [None]:

if not standardize:
    train_data = ReadDataset("./data/train.csv", standardize=standardize)
    test_data = ReadDataset("./data/test.csv", is_test=True, standardize=standardize)
else:
    train_data = ReadDataset("./data/train.csv", standardize=True)
    test_data = ReadDataset("./data/test.csv", mean=train_data.mean, std=train_data.std, is_test=True, standardize=True)


# Calculate total number of training samples
len_train = len(train_data)

# Split training data into training and validation sets
split_num = [
    int(len_train * split_ratio),  # Training portion size
    len_train - int(len_train * split_ratio)  # Validation portion size
]
train_data, val_data = random_split(
    dataset=train_data,
    lengths=split_num,
    generator=torch.Generator().manual_seed(myseed))  # Fixed random seed for reproducibility

# Class Weighting (Using Weighted Sampler)
if sampler_on: # Adjusting based on dataset imbalance
    sample_weights = [class_weights[int(label)] for _, label in train_data]
    # Create weighted sampler to balance class distribution during training
    sampler = torch.utils.data.WeightedRandomSampler(sample_weights, len(sample_weights), replacement=True) # Allows resampling of minority classes
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=False, sampler=sampler)
    val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)
    
else:
    # Create standard data loaders without class balancing
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)


print("Num of Samples: Train: {}, Validation: {}, Test: {}".format(len(train_data), len(val_data), len(test_data)))

Num of Samples: Train: 14971, Validation: 0, Test: 9982


In [8]:
# Initialize model
model = CustomNeuralNetwork(input_size=input_size, output_size=output_size,
                            hidden_layers=hidden_layers, activation=activation, 
                            dropout_rate=dropout_rate, early_stopper_num_trials=early_stopper_num_trials)

# Train the model
model.train_model(train_loader, val_loader, optimizer=optimizer, learning_rate=learning_rate, epochs=epochs, l2_lambda=l2_lambda, lr_switch=lr_switch)

Epoch 1/50, Train Loss: 0.6925, Val Loss: Training on whole training set, no Val Data
Epoch 2/50, Train Loss: 0.6864, Val Loss: ...
Epoch 3/50, Train Loss: 0.6740, Val Loss: ...
Epoch 4/50, Train Loss: 0.6595, Val Loss: ...
Epoch 5/50, Train Loss: 0.6471, Val Loss: ...
Epoch 6/50, Train Loss: 0.6309, Val Loss: ...
Epoch 7/50, Train Loss: 0.6155, Val Loss: ...
Epoch 8/50, Train Loss: 0.5996, Val Loss: ...
Epoch 9/50, Train Loss: 0.5824, Val Loss: ...
Epoch 10/50, Train Loss: 0.5589, Val Loss: ...
Epoch 11/50, Train Loss: 0.5412, Val Loss: ...
Epoch 12/50, Train Loss: 0.5210, Val Loss: ...
Epoch 13/50, Train Loss: 0.5003, Val Loss: ...
Epoch 14/50, Train Loss: 0.4822, Val Loss: ...
Epoch 15/50, Train Loss: 0.4566, Val Loss: ...
Epoch 16/50, Train Loss: 0.4436, Val Loss: ...
Epoch 17/50, Train Loss: 0.4253, Val Loss: ...
Epoch 18/50, Train Loss: 0.4045, Val Loss: ...
Epoch 19/50, Train Loss: 0.3838, Val Loss: ...
Epoch 20/50, Train Loss: 0.3677, Val Loss: ...
Epoch 21/50, Train Loss: 0.35

In [9]:
test_predict = predict(test_loader, model)>=0.5
print("Test predictions: ", ([int(x) for x in test_predict]))

Test predictions:  [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0,

In [None]:
from sklearn.metrics import confusion_matrix

def get_confusion_matrix(model, data_loader, positive_threshold=0.5):
    """Calculate confusion matrix for a binary classification model.
    
    Args:
        model: Trained PyTorch model to evaluate
        data_loader: DataLoader containing the dataset to evaluate
        positive_threshold: Decision threshold for positive class (default: 0.5)
        
    Returns:
        numpy.ndarray: 2x2 confusion matrix [[TN, FP], [FN, TP]]
    """
    predictions = []
    true_labels = []
    
    # Iterate through data loader to collect predictions and labels
    for features, labels in data_loader:
        # Ensure correct data types
        features = features.float()
        labels = labels.float()
        
        # Get model predictions
        batch_predictions = model(features)
        predictions.append(batch_predictions)
        true_labels.append(labels)
    
    # Concatenate all batch results and convert to numpy
    predictions = torch.cat(predictions, dim=0).cpu().detach().numpy()
    true_labels = torch.cat(true_labels, dim=0).cpu().detach().numpy()
    
    # Convert probabilities to binary predictions using threshold
    binary_predictions = (predictions > positive_threshold).astype(int)
    
    # Generate confusion matrix
    cm = confusion_matrix(true_labels, binary_predictions)
    return cm

In [11]:
get_confusion_matrix(model, train_loader, 0.5)

array([[12570,   253],
       [ 1039,  1109]], dtype=int64)

In [12]:
np.array(test_predict).sum() 

869

In [13]:
def save_pred(preds, file):
    print('Saving results to {}'.format(file))
    with open(file, 'w') as fp:
        writer = csv.writer(fp)
        writer.writerow(['id', 'tested_positive'])
        for i, p in enumerate(preds):
            writer.writerow([i, p])
save_pred([int(x) for x in test_predict], 'prediction_team11.csv')         # save prediction file to pred.csv

Saving results to prediction_team11.csv
