## preprocessing

In [3]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.decomposition import PCA
from sklearn.metrics import confusion_matrix

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset, TensorDataset

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(0)  # ensure reproducibility
np.random.seed(0)
BATCH_SIZE = 32

In [4]:
from sklearn.metrics import confusion_matrix, make_scorer

def evaluate_error(preds, gt):
    # Define the cost matrix
    cost_matrix = np.array([[0, 1, 2], 
                            [1, 0, 1], 
                            [2, 1, 0]])
    
    # Calculate the confusion matrix
    conf_matrix = confusion_matrix(gt, preds)
    
    # Calculate the error value
    err = np.sum(conf_matrix * cost_matrix) / len(gt)
    
    return err


def get_scorer():
    return make_scorer(evaluate_error,greater_is_better=False)

In [7]:
class DataTransformer():
    def __init__(self, val_size=0.25,
                 encodeTarget=False,
                 dummies=False,
                 composite_features=False,
                 scaler="standard",
                 imputer="simple",
                 only_use_compounds=False):
        
        # self.group_dictionary = pd.read_csv('data/group_dictionary.csv', sep=';')
        self.test_data_no_target = pd.read_csv('../data/test_data_no_target.csv', sep=';', decimal=',')
        self.training_data = pd.read_csv('../data/training_data.csv', sep=';', decimal=',')
        # self.column_names_dictionary = pd.read_csv('data/column_names_dictionary.csv', sep=';')

        if encodeTarget:
            self.labelEncodeTarget()

        if dummies:
            self.makeDummies()
        
        self.X = self.training_data.drop(columns=['Class','Perform'])  
        self.y = self.training_data[['Class']]

        
        # collect cat_cols, bin_cols, num_cols
        # num_cols are then used for scaling
        # cat_cols are gonna be encoded
        self.cat_cols = []
        self.bin_cols = []
        self.num_cols = []
        for col in self.X.columns.tolist():
            if len(self.X[col].value_counts()) > 2 and len(self.X[col].value_counts())<10:
                self.cat_cols.append(col)
            elif len(self.X[col].value_counts()) == 2:
                self.bin_cols.append(col)
            else:
                self.num_cols.append(col)

    
        self.X_train, self.X_val, self.y_train, self.y_val = train_test_split(self.X, self.y, test_size=val_size, random_state=42)

        if imputer == "simple":
            self.simpleImpute(0)

        if composite_features == True:
            self.create_composite_features()
            for col in self.X.columns.tolist():
                if len(self.X[col].value_counts())>10:
                    self.num_cols.append(col)
        
        if scaler == "standard":
            self.standardScale()
        
        elif scaler == "minmax":
            self.minMaxScale()

        
    
    def get_X_train(self):
        return self.X_train

    
    def get_y_train(self):
        return self.y_train

    
    def get_X_val(self):
        return self.X_val

    
    def get_y_val(self):
        return self.y_val

    
    def makeDummies(self):
        self.training_data = pd.concat((self.training_data.drop(columns=['Group']), pd.get_dummies(self.training_data['Group'])), axis=1)
        self.test_data_no_target = pd.concat((self.test_data_no_target.drop(columns=['Group']), pd.get_dummies(self.test_data_no_target['Group'])), axis=1)
        

    def knnImpute(self):
        self.knnImputer = KNNImputer(n_neighbors=4)
        self.X_train[self.num_cols] = self.knnImputer.fit_transform(self.X_train[self.num_cols])
        self.X_val[self.num_cols] = self.knnImputer.transform(self.X_val[self.num_cols])
        self.test_data_no_target[self.num_cols] = self.knnImputer.transform(self.test_data_no_target[self.num_cols])


    def simpleImpute(self, value):
        self.simpleImputer = SimpleImputer(strategy='mean')
        self.X_train[self.num_cols] = self.simpleImputer.fit_transform(self.X_train[self.num_cols])
        self.X_val[self.num_cols] = self.simpleImputer.transform(self.X_val[self.num_cols])
        self.test_data_no_target[self.num_cols] = self.simpleImputer.transform(self.test_data_no_target[self.num_cols])


    def standardScale(self):
        self.stdScaler = StandardScaler()
        self.X_train[self.num_cols] = self.stdScaler.fit_transform(self.X_train[self.num_cols])
        self.X_val[self.num_cols] = self.stdScaler.transform(self.X_val[self.num_cols])
        self.test_data_no_target[self.num_cols] = self.stdScaler.transform(self.test_data_no_target[self.num_cols])
        

    def minMaxScale(self):
        self.minMaxScale = MinMaxScaler()
        self.X_train[self.num_cols] = self.minMaxScale.fit_transform(self.X_train[self.num_cols])
        self.X_val[self.num_cols] = self.minMaxScale.transform(self.X_val[self.num_cols])
        self.test_data_no_target[self.num_cols] = self.minMaxScale.transform(self.test_data_no_target[self.num_cols])

    
    def PCASelection(self):
        pass

    
    def labelEncodeTarget(self):
        # label encodes target to be 0, 1, 2
        self.target_encoder = LabelEncoder()
        self.training_data['Class'] = self.training_data[['Class']].apply(self.target_encoder.fit_transform)
        

    def labelDecodeTarget(self, data):
        # returns python list decoding back to -1, 0, 1
        return self.target_encoder.inverse_transform(data.ravel()).tolist()


    def getCompundFeatures(self):
        if create_composite_features:
            self.create_composite_features()
    



    def create_composite_features(self):
        # Liquidity Ratios
        self.X_train['Liquidity Ratio'] = self.X_train[['I49', 'I50', 'I52']].mean(axis=1)
        self.X_val['Liquidity Ratio'] = self.X_val[['I49', 'I50', 'I52']].mean(axis=1)
        self.test_data_no_target['Liquidity Ratio'] = self.test_data_no_target[['I49', 'I50', 'I52']].mean(axis=1)
        
        # Efficiency Ratios
        self.X_train['Efficiency Ratio'] = self.X_train[['I21', 'I22', 'I24', 'I25', 'I28']].mean(axis=1)
        self.X_val['Efficiency Ratio'] = self.X_val[['I21', 'I22', 'I24', 'I25', 'I28']].mean(axis=1)
        self.test_data_no_target['Efficiency Ratio'] = self.test_data_no_target[['I21', 'I22', 'I24', 'I25', 'I28']].mean(axis=1)
        
        # Calculate Total Equity
        self.X_train['Total Equity'] = self.X_train['I19'] / self.X_train['I53']
        self.X_val['Total Equity'] = self.X_val['I19'] / self.X_val['I53']
        self.test_data_no_target['Total Equity'] = self.test_data_no_target['I19'] / self.test_data_no_target['I53']

        # Calculate Total Assets
        self.X_train['Total Assets'] = self.X_train['Total Equity'] / (1 - self.X_train['I54'])
        self.X_val['Total Assets'] = self.X_val['Total Equity'] / (1 - self.X_val['I54'])
        self.test_data_no_target['Total Assets'] = self.test_data_no_target['Total Equity'] / (1 - self.test_data_no_target['I54'])

        # Calculate Equity Multiplier
        self.X_train['Equity Multiplier'] = self.X_train['Total Assets'] / self.X_train['Total Equity']
        self.X_val['Equity Multiplier'] = self.X_val['Total Assets'] / self.X_val['Total Equity']
        self.test_data_no_target['Equity Multiplier'] = self.test_data_no_target['Total Assets'] / self.test_data_no_target['Total Equity']

        # Leverage Ratios
        self.X_train['Leverage Ratio'] = self.X_train[['I17', 'I19', 'I55', 'I54', 'Equity Multiplier']].mean(axis=1)
        self.X_val['Leverage Ratio'] = self.X_val[['I17', 'I19', 'I55', 'I54', 'Equity Multiplier']].mean(axis=1)
        self.test_data_no_target['Leverage Ratio'] = self.test_data_no_target[['I17', 'I19', 'I55', 'I54', 'Equity Multiplier']].mean(axis=1)
        
        # Profitability Ratios
        self.X_train['Profitability Ratio'] = self.X_train[['I1', 'I2', 'I6', 'I11', 'I34', 'I35', 'I37', 'I32', 'I33', 'I38']].mean(axis=1)
        self.X_val['Profitability Ratio'] = self.X_val[['I1', 'I2', 'I6', 'I11', 'I34', 'I35', 'I37', 'I32', 'I33', 'I38']].mean(axis=1)
        self.test_data_no_target['Profitability Ratio'] = self.test_data_no_target[['I1', 'I2', 'I6', 'I11', 'I34', 'I35', 'I37', 'I32', 'I33', 'I38']].mean(axis=1)

        # Price Ratios
        self.X_train['Price Ratios'] = self.X_train[['I41', 'I42', 'I43', 'I56', 'I58', 'I44']].mean(axis=1)
        self.X_val['Price Ratios'] = self.X_val[['I41', 'I42', 'I43', 'I56', 'I58', 'I44']].mean(axis=1)
        self.test_data_no_target['Price Ratios'] = self.test_data_no_target[['I41', 'I42', 'I43', 'I56', 'I58', 'I44']].mean(axis=1)


        
        

In [9]:
data = DataTransformer(encodeTarget=True, dummies=True, composite_features=False)
X_train = data.get_X_train()
X_val = data.get_X_val()
y_train = data.get_y_train()
y_val = data.get_y_val()


## modelling

In [12]:
# final approach will be neural network with compounds

import torch
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim

# Assuming you have preprocessed X_train and y_train as numpy arrays or torch tensors

# Convert your data into PyTorch tensors
X_train_tensor = torch.tensor(X_train.values, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32)  # Assuming classification, adjust dtype if needed

# Create a TensorDataset
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)

# Set batch size
batch_size = 64

# Create a DataLoader
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Define your neural network model
class MyModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(MyModel, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, hidden_size)
        self.fc4 = nn.Linear(hidden_size, num_classes)
    
    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        x = self.relu(x)
        x = self.fc4(x)
        return x

# Define your model, loss function, and optimizer
input_size = X_train.shape[1]  # Assuming X_train is a 2D array
hidden_size = 128  # You can adjust this as needed
num_classes = len(set(y_train))  # Assuming y_train contains class labels
model = MyModel(input_size, hidden_size, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10  # You can adjust this as needed
for epoch in range(num_epochs):
    running_loss = 0.0
    custom_loss = 0.0
    for inputs, labels in train_loader:
        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # custom_loss += evaluate_error(out)
    
    epoch_loss = running_loss / len(train_loader.dataset)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}')


Epoch [1/10], Loss: 0.0000
Epoch [2/10], Loss: 0.0000
Epoch [3/10], Loss: 0.0000
Epoch [4/10], Loss: 0.0000
Epoch [5/10], Loss: 0.0000
Epoch [6/10], Loss: 0.0000
Epoch [7/10], Loss: 0.0000
Epoch [8/10], Loss: 0.0000
Epoch [9/10], Loss: 0.0000
Epoch [10/10], Loss: 0.0000
