In [161]:
import pandas as pd
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.svm import LinearSVC
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import classification_report, roc_curve, auc
from scipy.sparse import hstack
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from datasets import load_dataset, Image, concatenate_datasets
from torch.utils.data import DataLoader, Dataset, TensorDataset

In [182]:
class MLP(nn.Module):
    def __init__(self):
        super(MLP, self).__init__()
        self.flattening = nn.Flatten()
        self.fc1 = nn.Linear(4, 8)
        self.fc2 = nn.Linear(8, 4)
        self.dropout = nn.Dropout(p=0.5)
        self.fc3 = nn.Linear(4, 2)

    def forward(self, x):
        x = self.flattening(x)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = F.softmax(self.fc3(x), dim=1)
        return x
    
def fuse_proba(arr1, arr2):
    assert arr1.size == arr2.size
    return np.hstack([arr1, arr2])

def train(X, Y, batch_size = 64, num_epochs = 100):
    # input_dim =  X.shape[1]
    model = MLP()
    criterion = nn.BCELoss()  # binary cross-entropy loss
    optimizer = torch.optim.Adam(model.parameters())
    
    train_dataset = TensorDataset(torch.Tensor(X), torch.Tensor(Y))
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
    
    # Training loop:
    for epoch in range(num_epochs):
        running_loss = 0.0
        for inputs, labels in train_loader: 
            optimizer.zero_grad()
            outputs = model(inputs)
            labels = labels.view(-1, 1)  # reshape labels to match output shape
            loss = criterion(torch.unsqueeze(outputs[:, 1], dim=1), labels)  # calculate the loss using binary cross-entropy with the positive class probability and the binary label
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
    return model

def evaluate(model, X, Y):
    test_dataset = TensorDataset(torch.Tensor(X), torch.Tensor(Y))
    test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():  # Disable gradient calculation
        for inputs, labels in test_loader:
            test_outputs = model(inputs)  # Get the model's predictions
            _, predicted = torch.max(test_outputs.data, 1)  # Get the predicted class by choosing the class with highest probability
            predicted_probabilities = test_outputs[:, 1]  # Get the probability for the positive class

    # Apply a threshold to the predicted probabilities to obtain binary predictions
    threshold = 0.5
    binary_predictions = (predicted_probabilities > threshold).float()

    return binary_predictions

def performance(preds, labels):
    results = []
    report = classification_report(labels, preds, output_dict=True)
    fpr, tpr, thresholds = roc_curve(labels, preds)
    AUROC = auc(fpr, tpr)
    results = {'f1_score': report['macro avg']['f1-score'], 
               'precision': report['macro avg']['precision'], 
               'recall': report['macro avg']['recall'], 
               'accuracy': report['accuracy'], 
               'AUROC': AUROC
              }
    df_results = pd.DataFrame(results, index=[0]) 
    df_results = df_results.sort_values(by='AUROC', ascending=False)
    return df_results

def late_fuse_MLP(X_train, Y_train, X_test, Y_test):
    # print('training ...')
    model = train(X_train, Y_train)
    # print('predicting')
    pred = evaluate(model, X_test, Y_test)
    results_df = performance(pred, Y_test)
    return results_df

In [179]:
def late_fuse_MLP(X_train, Y_train, X_test, Y_test):
    # print('training ...')
    model = train(X_train, Y_train)
    # print('predicting')
    pred = evaluate(model, X_test, Y_test)
    results_df = performance(pred, Y_test)
    return results_df

In [180]:
arr1 = np.random.rand(1000,2)
arr2 = np.random.rand(1000,2)
Y = np.random.randint(2, size=1000)
arr = fuse_proba(arr1, arr2)

test_arr1 = np.random.rand(500,2)
test_arr2 = np.random.rand(500,2)
test_arr = fuse_proba(test_arr1, test_arr2)
Y_test = np.random.randint(2, size=500)

In [181]:
late_fuse_MLP(arr, Y, test_arr, Y_test)

Unnamed: 0,f1_score,precision,recall,accuracy,AUROC
0,0.531741,0.555883,0.547323,0.552,0.547323
