In [3]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, random_split
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

In [4]:
# dataset class for preprocessing
class LossTriangleDataset(Dataset):
    """
    Torch dataset wrapping pre-processed loss triangles and their labels.

    The heavy lifting (standardization, zero-padding, train/val/test split) is
    done by the static helpers ``preprocess_data`` and ``create_datasets``;
    an instance merely serves ``(triangle, label)`` pairs as tensors.

    Parameters
    ----------
    data : numpy.ndarray
        The data of the dataset, indexed by triangle along the first axis.
    labels : numpy.ndarray
        The labels of the dataset, one per triangle.
    """
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        """
        Returns the length of the dataset. This is the number of triangles in the dataset.
        """
        return len(self.data)

    def __getitem__(self, index):
        """
        Returns the data and label of the triangle with the given index.

        No padding or normalization happens here; the stored array is returned
        as-is with a leading channel axis added.

        Parameters
        ----------
        index : int
            The index of the triangle.

        Returns
        -------
        data : torch.Tensor
            The triangle as a ``float32`` tensor with a leading channel axis.
        label : torch.Tensor
            The label of the triangle as a ``long`` tensor.
        """
        x = np.expand_dims(self.data[index], axis=0)  # Add channel dimension
        y = self.labels[index]
        return torch.tensor(x, dtype=torch.float32), torch.tensor(y, dtype=torch.long)

    @staticmethod
    def _validate_ratios(train_ratio, val_ratio, test_ratio):
        """Raise ``AssertionError`` unless the three ratios sum to 1 (up to float rounding)."""
        total = train_ratio + val_ratio + test_ratio
        # Exact equality rejects valid splits such as 0.7 + 0.2 + 0.1, which
        # evaluates to 0.9999999999999999 in floating point. An explicit raise
        # (rather than ``assert``) keeps the check alive under ``python -O``.
        if not np.isclose(total, 1.0):
            raise AssertionError("The sum of the ratios must be equal to 1")

    @staticmethod
    def _pad_triangle(triangle_data, max_rows):
        """Copy the upper-left triangle of ``triangle_data`` into a zero ``(max_rows, max_rows)`` array."""
        padded = np.zeros((max_rows, max_rows))
        for row_idx, row_data in enumerate(triangle_data[:max_rows]):
            # Row i keeps at most (max_rows - i) leading entries. Clamp to the
            # row's real length so triangles with fewer columns than max_rows
            # are padded instead of raising a broadcast error.
            width = min(max_rows - row_idx, len(row_data))
            padded[row_idx, :width] = row_data[:width]
        return padded

    @staticmethod
    def preprocess_data(dataframe):
        """
        Preprocess the data and create the data and labels.

        The ``'ay'`` column is dropped and infinities are treated as missing.
        Each triangle (rows sharing a ``'triangle_id'``) is standardized
        column-wise on its own with ``StandardScaler``; remaining missing
        values become 0. Every triangle is then zero-padded to a square whose
        side is the largest row count of any triangle. The caller's dataframe
        is not modified.

        Column layout is positional after dropping ``'ay'``: column 0 is read
        via the name ``'triangle_id'``, column 1 is used as the label (taken
        from the triangle's last row), columns 2+ are the triangle values.

        Parameters
        ----------
        dataframe : pandas.DataFrame
            The dataframe containing ``'ay'`` and ``'triangle_id'`` columns.

        Returns
        -------
        data : numpy.ndarray
            Array of shape ``(n_triangles, max_rows, max_rows)``.
        labels : numpy.ndarray
            One label per triangle.

        Raises
        ------
        ValueError
            If the dataframe has no rows.
        AssertionError
            If the resulting data or labels contain NaN or infinite values.
        """
        # Drop ay column (returns a new frame, so the caller's data is untouched)
        dataframe = dataframe.drop('ay', axis=1)
        if dataframe.empty:
            raise ValueError("dataframe contains no rows")

        # Replace infinity values with NaN
        dataframe = dataframe.replace([np.inf, -np.inf], np.nan)

        # Standardize each triangle independently and track the largest triangle
        scaler = StandardScaler()
        max_rows = 0
        for triangle_id in dataframe['triangle_id'].unique():
            mask = dataframe['triangle_id'] == triangle_id
            max_rows = max(max_rows, int(mask.sum()))

            df_triangle = dataframe.loc[mask].iloc[:, 2:]
            df_triangle = df_triangle.dropna(axis=1, how='all')
            if df_triangle.shape[1] == 0:
                # Nothing but missing values: the scaler cannot fit zero
                # features, and the fill below turns the triangle into zeros.
                continue
            dataframe.loc[mask, df_triangle.columns] = scaler.fit_transform(df_triangle)

        # Replace "nan" values with 0
        dataframe = dataframe.fillna(0)

        # Create data and labels
        data = []
        labels = []
        for triangle_id in dataframe['triangle_id'].unique():
            df_triangle = dataframe[dataframe['triangle_id'] == triangle_id].iloc[:, 1:]
            labels.append(df_triangle.iloc[-1, 0])
            triangle_data = df_triangle.iloc[:, 1:].values
            data.append(LossTriangleDataset._pad_triangle(triangle_data, max_rows))

        # Convert data and labels to numpy arrays
        data = np.stack(data, axis=0)
        labels = np.array(labels)

        # Check if there are any "nan" or infinite values in the data and labels
        assert not np.isnan(data).any(), "Data contains 'nan' values"
        assert not np.isinf(data).any(), "Data contains infinite values"
        assert not np.isnan(labels).any(), "Labels contain 'nan' values"
        assert not np.isinf(labels).any(), "Labels contain infinite values"

        return data, labels

    @staticmethod
    def create_datasets(dataframe,
                        train_ratio : float = 0.6,
                        val_ratio : float = 0.2,
                        test_ratio : float = 0.2
                        ) -> tuple:
        """
        Create the train, validation and test datasets.

        The dataframe is run through ``preprocess_data`` and the resulting
        triangles are split twice with ``train_test_split`` (fixed
        ``random_state=42``): first the test set is carved off, then the
        remainder is divided into train and validation.

        Parameters
        ----------
        dataframe : pandas.DataFrame
            The dataframe containing the data.
        train_ratio : float
            The ratio of the data to be used for training.
        val_ratio : float
            The ratio of the data to be used for validation.
        test_ratio : float
            The ratio of the data to be used for testing.

        Returns
        -------
        train_dataset : LossTriangleDataset
            The train dataset.
        val_dataset : LossTriangleDataset
            The validation dataset.
        test_dataset : LossTriangleDataset
            The test dataset.

        Raises
        ------
        AssertionError
            If the sum of the ratios is not equal to 1 (within floating-point tolerance).
        """
        LossTriangleDataset._validate_ratios(train_ratio, val_ratio, test_ratio)

        # Preprocess data
        data, labels = LossTriangleDataset.preprocess_data(dataframe)

        # Split data into train and test sets
        train_data, test_data, train_labels, test_labels = train_test_split(
            data, labels, test_size=test_ratio, random_state=42)

        # Split train data into train and validation sets; the validation share
        # is re-expressed relative to what is left after removing the test set.
        train_data, val_data, train_labels, val_labels = train_test_split(
            train_data, train_labels,
            test_size=val_ratio / (train_ratio + val_ratio), random_state=42)

        # Create datasets
        train_dataset = LossTriangleDataset(train_data, train_labels)
        val_dataset = LossTriangleDataset(val_data, val_labels)
        test_dataset = LossTriangleDataset(test_data, test_labels)

        return train_dataset, val_dataset, test_dataset


In [3]:
# cnn model class
class LossTriangleClassifier(nn.Module):
    """
    Convolutional loss triangle classifier.

    The feature extractor is three stages of
    ``Conv2d(kernel_size=3) -> BatchNorm2d -> leaky_relu -> MaxPool2d(2)``
    with 32, 64 and 128 channels. The flattened features go through a
    512-unit hidden layer with dropout and a final linear layer that emits
    one raw score (logit) per class.

    Parameters
    ----------
    input_shape : tuple
        Shape of a single sample without the batch axis, ``(1, height, width)``
        (the first convolution expects exactly one input channel).
    num_classes : int
        Number of output classes.
    dropout_rate : float
        Dropout probability applied after the hidden fully-connected layer.
    """
    def __init__(self,
                 input_shape : tuple,
                 num_classes : int = 2,
                 dropout_rate : float = 0.5
                 ) -> None:
        super(LossTriangleClassifier, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3)
        self.bn1 = nn.BatchNorm2d(32)
        self.pool1 = nn.MaxPool2d(2)

        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)
        self.bn2 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(2)

        self.conv3 = nn.Conv2d(64, 128, kernel_size=3)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool3 = nn.MaxPool2d(2)

        self.flatten = nn.Flatten()

        self.fc1 = nn.Linear(self._get_flattened_size(input_shape), 512)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.fc2 = nn.Linear(512, num_classes)
        # Kept only so code that inspects ``model.dropout2`` keeps working; it
        # is intentionally not applied in ``forward`` (dropping output logits
        # randomly zeroes class scores during training).
        self.dropout2 = nn.Dropout(dropout_rate)

    def _extract_features(self, x):
        """Run the three conv/batch-norm/pool stages and return the feature maps."""
        x = self.pool1(F.leaky_relu(self.bn1(self.conv1(x))))
        x = self.pool2(F.leaky_relu(self.bn2(self.conv2(x))))
        x = self.pool3(F.leaky_relu(self.bn3(self.conv3(x))))
        return x

    def forward(self, x):
        """
        Compute class scores for a batch of triangles.

        Parameters
        ----------
        x : torch.Tensor
            Batch of inputs with one channel, ``(batch, 1, height, width)``.

        Returns
        -------
        torch.Tensor
            Raw logits of shape ``(batch, num_classes)``. No sigmoid/softmax is
            applied: ``nn.CrossEntropyLoss`` expects unnormalized logits, and
            ``argmax`` over logits yields the same predicted class as over
            squashed scores. Apply ``softmax`` explicitly if probabilities are
            needed.
        """
        x = self.flatten(self._extract_features(x))
        x = self.dropout1(F.leaky_relu(self.fc1(x)))
        return self.fc2(x)

    def _get_flattened_size(self, input_shape):
        """
        Return the number of features produced by the conv stack for one sample.

        The probe runs in eval mode and without gradient tracking so the dummy
        all-zero input does not leak into the BatchNorm running statistics.
        The previous training/eval mode is restored afterwards.
        """
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                dummy_output = self._extract_features(torch.zeros(1, *input_shape))
        finally:
            self.train(was_training)
        return dummy_output.numel()


In [4]:
# train model function
def train_model(model, dataloader, criterion, optimizer, device):
    """
    Train ``model`` for one pass over ``dataloader`` and return the mean loss.

    Parameters
    ----------
    model : torch.nn.Module
        The model to train; switched to training mode.
    dataloader : iterable
        Yields ``(inputs, labels)`` batches of tensors.
    criterion : callable
        Loss function called as ``criterion(outputs, labels)``. The per-batch
        value is weighted by the batch size, which is only a true per-sample
        average if the criterion returns a batch mean — verify when using
        ``reduction='sum'``.
    optimizer : torch.optim.Optimizer
        Optimizer updating the model parameters.
    device : torch.device
        Device the batches are moved to.

    Returns
    -------
    float
        Loss averaged over the samples actually processed, or ``nan`` if the
        dataloader yielded no samples.
    """
    model.train()
    running_loss = 0.0
    num_samples = 0
    for inputs, labels in dataloader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_size = inputs.size(0)
        running_loss += loss.item() * batch_size
        num_samples += batch_size

    # Divide by the samples seen rather than len(dataloader.dataset): the two
    # differ with drop_last=True or a sub-sampling sampler, and a plain
    # iterable of batches has no .dataset attribute at all.
    if num_samples == 0:
        return float('nan')
    return running_loss / num_samples


In [5]:
# validate model function
def validate_model(model, dataloader, criterion, device):
    """
    Evaluate ``model`` on ``dataloader`` without gradient tracking and return the mean loss.

    Parameters
    ----------
    model : torch.nn.Module
        The model to evaluate; switched to eval mode.
    dataloader : iterable
        Yields ``(inputs, labels)`` batches of tensors.
    criterion : callable
        Loss function called as ``criterion(outputs, labels)``. The per-batch
        value is weighted by the batch size, which is only a true per-sample
        average if the criterion returns a batch mean — verify when using
        ``reduction='sum'``.
    device : torch.device
        Device the batches are moved to.

    Returns
    -------
    float
        Loss averaged over the samples actually processed, or ``nan`` if the
        dataloader yielded no samples.
    """
    model.eval()
    running_loss = 0.0
    num_samples = 0
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            batch_size = inputs.size(0)
            running_loss += loss.item() * batch_size
            num_samples += batch_size

    # Divide by the samples seen rather than len(dataloader.dataset): the two
    # differ with drop_last=True or a sub-sampling sampler, and a plain
    # iterable of batches has no .dataset attribute at all.
    if num_samples == 0:
        return float('nan')
    return running_loss / num_samples


In [6]:
# train the model:
def main(data_path='naic.feather', num_epochs=10, batch_size=32, learning_rate=0.001):
    """
    Load the triangles, train the classifier and print test-set predictions.

    Parameters
    ----------
    data_path : str
        Path of the feather file read with ``pandas.read_feather``.
    num_epochs : int
        Number of training epochs.
    batch_size : int
        Batch size used for the train, validation and test loaders.
    learning_rate : float
        Learning rate passed to the Adam optimizer.

    Notes
    -----
    The defaults reproduce the previously hard-coded configuration. Training
    and validation loss are printed once per epoch; the predicted class index
    of every test sample is printed at the end. Nothing is returned.
    """
    # Load data
    dataframe = pd.read_feather(data_path)
    train_dataset, val_dataset, test_dataset = LossTriangleDataset.create_datasets(dataframe)

    # Create DataLoaders (only the training data is shuffled)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Initialize model, criterion, and optimizer; the input shape is taken
    # from the first training sample (channel axis included).
    input_shape = train_dataset[0][0].shape
    model = LossTriangleClassifier(input_shape).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Train and validate model
    for epoch in range(num_epochs):
        train_loss = train_model(model, train_loader, criterion, optimizer, device)
        val_loss = validate_model(model, val_loader, criterion, device)
        print(f'Epoch {epoch+1}/{num_epochs} - Training Loss: {train_loss:.4f} - Validation Loss: {val_loss:.4f}')

    # Test the model
    model.eval()
    test_predictions = []
    with torch.no_grad():
        for inputs, _ in test_loader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            # tolist() yields plain Python ints, so the printed list does not
            # depend on how the installed NumPy renders its scalar types.
            test_predictions.extend(torch.argmax(outputs, dim=1).cpu().tolist())

    # Print the predictions
    print("Predictions on the test set:", test_predictions)