In [None]:
import random
import warnings

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from lightgbm import LGBMClassifier
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.calibration import CalibratedClassifierCV
from sklearn.ensemble import VotingClassifier
from sklearn.metrics import (
    average_precision_score,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
    roc_curve,
)
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.utils.validation import check_X_y
from torch.utils.data import DataLoader, TensorDataset, WeightedRandomSampler
from tqdm import tqdm
from xgboost import XGBClassifier

warnings.filterwarnings("ignore")


In [None]:
# Load the cleaned dataset; the last column is popped off and used as the label.
data = pd.read_csv("./clean_data_dropped.csv")
targets = data.pop(data.columns[-1])

# First split: 70 % train / 30 % hold-out, stratified on the label.
X_train, X_test, y_train, y_test = train_test_split(
    data, targets, stratify=targets, test_size=0.3, random_state=121
)
# Second split: the hold-out is divided again (33 % validation / 67 % test),
# with the same seed and stratified on the hold-out labels.
X_val, X_test, y_val, y_test = train_test_split(
    X_test, y_test, stratify=y_test, test_size=0.67, random_state=121
)

# From here on `data` is a dict mapping split name -> [features, labels].
data = {
    "train": [X_train, y_train],
    "val": [X_val, y_val],
    "test": [X_test, y_test],
}

# "Balanced" class weights on the training labels:
#     weight(c) = n_samples / (n_classes * count(c))
unique, counts = np.unique(y_train, return_counts=True)
class_counts = dict(zip(unique, counts))
total_samples = counts.sum()
class_weights = {
    label: total_samples / (len(class_counts) * n_label)
    for label, n_label in class_counts.items()
}
# One weight per training row, looked up from that row's class.
sample_weights = np.array([class_weights[label] for label in y_train])


In [None]:
# Base learners; everything except SVC's probability flag is left at the
# library defaults.
xgb, lgbm, svc = XGBClassifier(), LGBMClassifier(), SVC(probability=True)

# Wrap each base learner in a sigmoid calibrator fitted with 5-fold CV.
xgb_calibrated, lgbm_calibrated, svc_calibrated = (
    CalibratedClassifierCV(base, method="sigmoid", cv=5) for base in (xgb, lgbm, svc)
)


In [None]:
class VotingClassifier(BaseEstimator, ClassifierMixin):
    """Small voting ensemble over a list of estimators.

    NOTE: this class deliberately re-uses the name ``VotingClassifier`` and
    therefore shadows ``sklearn.ensemble.VotingClassifier`` imported at the top
    of the notebook; after this cell runs the name refers to this class.

    Parameters
    ----------
    estimators : list
        Estimator objects exposing ``fit``, ``predict`` and ``predict_proba``.
        They are fitted in place by :meth:`fit` (no cloning).
    voting : {"hard", "soft"}, default="hard"
        ``"hard"`` takes a majority vote over the predicted labels, ``"soft"``
        averages the predicted probabilities and picks the most probable class.

    Attributes
    ----------
    classes_ : ndarray
        Sorted unique labels seen by :meth:`fit`.
    threshold : float
        Decision threshold chosen by :meth:`optimize_threshold`.
    """

    def __init__(self, estimators, voting="hard"):
        self.estimators = estimators
        self.voting = voting

    def fit(self, X, y):
        """Fit every estimator on ``(X, y)`` and return ``self``."""
        X, y = check_X_y(X, y)
        self.classes_ = np.unique(y)
        for estimator in self.estimators:
            estimator.fit(X, y)
        # Returning self follows the scikit-learn estimator convention and
        # allows call chaining.
        return self

    def predict(self, X):
        """Predict labels according to ``self.voting``.

        Raises
        ------
        ValueError
            If ``self.voting`` is neither ``"hard"`` nor ``"soft"``.
        """
        if self.voting == "hard":
            # Shape (n_estimators, n_samples).
            votes = np.asarray([estimator.predict(X) for estimator in self.estimators])
            # A real majority vote: count how many estimators chose each label.
            # (A median only coincides with the majority for binary labels and
            # an odd number of voters; on ties it yields e.g. 0.5.)
            labels, encoded = np.unique(votes, return_inverse=True)
            encoded = encoded.reshape(votes.shape)
            tallies = np.stack(
                [(encoded == k).sum(axis=0) for k in range(len(labels))]
            )
            # argmax returns the first maximum, so ties go to the smallest label.
            return labels[np.argmax(tallies, axis=0)]

        if self.voting == "soft":
            winners = np.argmax(self.predict_proba(X), axis=1)
            classes = getattr(self, "classes_", None)
            # Map column indices back to labels when fit() recorded them; this
            # assumes the estimators order probability columns by sorted label
            # (the scikit-learn convention) - TODO confirm for custom estimators.
            return winners if classes is None else classes[winners]

        raise ValueError(
            f"voting must be 'hard' or 'soft', got {self.voting!r}"
        )

    def predict_proba(self, X):
        """Return the unweighted mean of the estimators' ``predict_proba``."""
        probs = np.asarray(
            [estimator.predict_proba(X) for estimator in self.estimators]
        )
        return np.mean(probs, axis=0)

    def optimize_threshold(self, X, y_true):
        """Pick the threshold maximising Youden's J (TPR - FPR) on ``(X, y_true)``.

        The result is stored in ``self.threshold``; column 1 of
        :meth:`predict_proba` is used as the positive-class score.
        """
        y_true = np.array(y_true)
        y_prob = self.predict_proba(X)
        fprs, tprs, thresholds = roc_curve(y_true, y_prob[:, 1])
        youden_j = tprs - fprs
        if len(thresholds) > 1:
            # thresholds[0] is an artificial "predict nothing" point added by
            # roc_curve (J == 0 there); never select it.
            opt_idx = int(np.argmax(youden_j[1:])) + 1
        else:
            opt_idx = int(np.argmax(youden_j))
        self.threshold = thresholds[opt_idx]

    def predict_with_threshold(self, X):
        """Predict 1 where the positive-class probability reaches ``self.threshold``.

        Raises
        ------
        AttributeError
            If :meth:`optimize_threshold` has not been called yet.
        """
        if not hasattr(self, "threshold"):
            raise AttributeError(
                "You need to call optimize_threshold before using predict_with_threshold"
            )
        probs = self.predict_proba(X)
        # roc_curve evaluates each threshold as `score >= threshold`, so the
        # comparison must be inclusive to reproduce the operating point chosen.
        y_pred = np.where(probs[:, 1] >= self.threshold, 1, 0)
        return y_pred


def report_metrics(y_true, y_pred_prob, threshold=0.5):
    """Summarise binary-classification quality at one decision threshold.

    Scores strictly greater than ``threshold`` are turned into label 1, all
    others into 0. Precision, recall and F1 are computed from those hard
    labels; ROC AUC and average precision are computed from the raw scores.

    Parameters
    ----------
    y_true : array-like
        Ground-truth labels.
    y_pred_prob : ndarray
        Positive-class scores, one per sample.
    threshold : float, default=0.5
        Cut-off applied to ``y_pred_prob``.

    Returns
    -------
    dict
        Keys ``precision``, ``recall``, ``f1_score``, ``auc_roc``, ``auc_pr``
        and ``threshold`` (the cut-off that was used).
    """
    hard_labels = (y_pred_prob > threshold).astype(int)
    return {
        "precision": precision_score(y_true, hard_labels),
        "recall": recall_score(y_true, hard_labels),
        "f1_score": f1_score(y_true, hard_labels),
        "auc_roc": roc_auc_score(y_true, y_pred_prob),
        "auc_pr": average_precision_score(y_true, y_pred_prob),
        "threshold": threshold,
    }


In [None]:
# voting_clf = VotingClassifier(
#     estimators=[xgb_calibrated, lgbm_calibrated, svc_calibrated],
#     voting="hard",
# )
# voting_clf.fit(data["train"][0].values, data["train"][1].values)
# voting_clf.optimize_threshold(data["train"][0].values, data["train"][1].values)


In [None]:
# y_pred = voting_clf.predict_with_threshold(X=data["train"][0].values)
# y_pred_probs = voting_clf.predict_proba(X=data["train"][0].values)
# y_pred_probs = torch.softmax(
#     torch.Tensor(voting_clf.predict_proba(X=data["train"][0].values)), dim=1
# ).numpy()[:, 1]

# report_metrics(
#     y_true=data["train"][1].values,
#     y_pred_prob=y_pred_probs,
#     threshold=voting_clf.threshold,
# )


In [None]:
# y_pred = voting_clf.predict_with_threshold(X=data["val"][0].values)
# y_pred_probs = voting_clf.predict_proba(X=data["val"][0].values)
# y_pred_probs = torch.softmax(
#     torch.Tensor(voting_clf.predict_proba(X=data["val"][0].values)), dim=1
# ).numpy()[:, 1]

# report_metrics(
#     y_true=data["val"][1].values,
#     y_pred_prob=y_pred_probs,
#     threshold=voting_clf.threshold,
# )


### Neural networks

In [None]:
class DeepNetwork(nn.Module):
    """Fully connected classifier: ``Linear -> BatchNorm -> Dropout -> ReLU`` stacks.

    Parameters
    ----------
    input_size : int
        Number of input features.
    hidden_sizes : sequence of int
        Width of each hidden layer, in order. May be empty, in which case the
        network is a single linear layer.
    output_size : int
        Number of output units. With ``output_size == 1`` the network emits a
        sigmoid probability of the positive class (suitable for ``BCELoss``);
        otherwise it emits a softmax distribution over ``output_size`` classes.
    """

    def __init__(self, input_size, hidden_sizes, output_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_sizes = hidden_sizes
        self.output_size = output_size

        # Layer order (and therefore the state_dict keys) is kept stable:
        # Linear, BatchNorm1d, Dropout, ReLU per hidden layer, then the head.
        layer_sizes = [input_size, *hidden_sizes]
        self.fc_layers = nn.ModuleList()
        for in_features, out_features in zip(layer_sizes[:-1], layer_sizes[1:]):
            self.fc_layers.append(nn.Linear(in_features, out_features))
            self.fc_layers.append(nn.BatchNorm1d(out_features))
            self.fc_layers.append(nn.Dropout(0.5))
            self.fc_layers.append(nn.ReLU())

        # layer_sizes[-1] falls back to input_size when there are no hidden layers.
        self.fc_layers.append(nn.Linear(layer_sizes[-1], output_size))
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return class probabilities of shape ``(batch, output_size)``."""
        for layer in self.fc_layers:
            if isinstance(layer, nn.Linear):
                # Inputs may arrive as float64 (e.g. from pandas); cast them to
                # the parameter type before each linear layer.
                x = x.type_as(layer.weight)
            x = layer(x)
        if self.output_size == 1:
            # A softmax over a single unit is constantly 1.0; a lone output
            # unit needs a sigmoid to represent a probability.
            return torch.sigmoid(x)
        return nn.functional.softmax(x, dim=1)

    def predict_proba(self, X):
        """Return class probabilities for ``X`` as a NumPy array.

        Inference runs in eval mode (Dropout off, BatchNorm using running
        statistics) without gradients; the previous train/eval mode is restored
        afterwards.

        Returns
        -------
        ndarray
            Shape ``(n_samples, output_size)``; for ``output_size == 1`` the
            result is expanded to two columns ``[1 - p, p]`` so that column 1 is
            the positive-class probability, as the threshold helpers expect.
        """
        device = next(self.parameters()).device
        if isinstance(X, torch.Tensor):
            X = X.to(device=device, dtype=torch.float)
        else:
            X = torch.as_tensor(np.asarray(X), dtype=torch.float, device=device)

        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                probs = self.forward(X)
        finally:
            self.train(was_training)

        probs = probs.cpu().numpy()
        if probs.shape[1] == 1:
            probs = np.hstack([1.0 - probs, probs])
        return probs

    def optimize_threshold(self, X, y_true):
        """Pick the threshold maximising Youden's J (TPR - FPR) on ``(X, y_true)``.

        The result is stored in ``self.threshold``.
        """
        y_true = np.array(y_true)
        y_prob = self.predict_proba(X)[:, 1]
        fprs, tprs, thresholds = roc_curve(y_true, y_prob)
        youden_j = tprs - fprs
        if len(thresholds) > 1:
            # thresholds[0] is an artificial "predict nothing" point added by
            # roc_curve (J == 0 there); never select it.
            opt_idx = int(np.argmax(youden_j[1:])) + 1
        else:
            opt_idx = int(np.argmax(youden_j))
        self.threshold = thresholds[opt_idx]

    def predict_with_threshold(self, X):
        """Predict 1 where the positive-class probability reaches ``self.threshold``.

        Raises
        ------
        AttributeError
            If :meth:`optimize_threshold` has not been called yet.
        """
        if not hasattr(self, "threshold"):
            raise AttributeError(
                "You need to call optimize_threshold before using predict_with_threshold"
            )
        probs = self.predict_proba(X)[:, 1]
        # roc_curve evaluates each threshold as `score >= threshold`, so the
        # comparison must be inclusive to reproduce the operating point chosen.
        y_pred = np.where(probs >= self.threshold, 1, 0)
        return y_pred


# Number of input features = number of columns of the training feature frame.
input_size = data["train"][0].shape[1]

# Ten hidden-layer widths, each a random power of two between 2**3 and 2**10,
# ordered widest first. A dedicated seeded generator keeps the architecture
# identical across Restart-and-Run-All instead of changing on every execution.
_arch_rng = random.Random(121)
hidden_sizes = sorted(
    (2 ** _arch_rng.randint(3, 10) for _ in range(10)), reverse=True
)

# A single output unit.
output_size = 1


In [None]:
# Build a network sized to the training features, using the hidden widths
# chosen above and a single output unit.
network = DeepNetwork(data["train"][0].shape[1], hidden_sizes, 1)


In [None]:
# Inverse-frequency class weights, normalised to sum to 1. `label_index` maps
# every training row to the position of its label inside `unique_labels`.
unique_labels, label_index, label_counts = np.unique(
    y_train, return_inverse=True, return_counts=True
)
class_weights = 1.0 / label_counts
class_weights /= np.sum(class_weights)
batch_size = 64

train_set = TensorDataset(
    torch.from_numpy(data["train"][0].values), torch.from_numpy(data["train"][1].values)
)

# WeightedRandomSampler expects ONE weight PER SAMPLE (weights[i] is the
# relative probability of drawing dataset index i). Passing the per-class
# vector would restrict sampling to the first len(class_weights) indices, so
# expand the class weights to one entry per training row.
sampler_weights = class_weights[label_index.reshape(-1)]
assert len(sampler_weights) == len(train_set), "need one sampler weight per sample"
sampler = WeightedRandomSampler(
    weights=torch.as_tensor(sampler_weights, dtype=torch.double),
    num_samples=len(train_set),
    replacement=True,
)
train_loader = DataLoader(train_set, batch_size=batch_size, sampler=sampler)

# Validation data is iterated once, in order, without re-sampling.
val_set = TensorDataset(
    torch.from_numpy(data["val"][0].values), torch.from_numpy(data["val"][1].values)
)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)


In [None]:
# Peek at the last element (the label tensor) of the first batch drawn from
# train_loader.
next(iter(train_loader))[-1]

# Output pasted from an earlier run - every label in that batch was 0:
# tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
#         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
#         0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

In [None]:
def _class_weighted_bce(probs, targets, class_weight):
    """Mean binary cross-entropy where each sample is scaled by its class weight.

    ``probs`` are probabilities of shape ``(N,)`` or ``(N, 1)``; ``targets``
    are the matching labels; ``class_weight`` is a 1-D tensor indexed by label.
    Assumes labels are the integers 0 and 1 - TODO confirm against the dataset.
    """
    probs = probs.reshape(-1)
    labels = targets.reshape(-1)
    per_sample_weight = class_weight[labels.long()]
    return nn.functional.binary_cross_entropy(
        probs, labels.to(probs.dtype), weight=per_sample_weight
    )


def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader``; train when ``optimizer`` is given, else evaluate.

    Returns ``(mean_loss, accuracy)`` averaged over the samples actually seen.
    """
    is_training = optimizer is not None
    model.train(is_training)
    loss_sum, n_correct, n_seen = 0.0, 0, 0
    with torch.set_grad_enabled(is_training):
        for features, labels in loader:
            features, labels = features.to(device), labels.to(device)
            outputs = model(features)
            loss = criterion(outputs, labels)
            if is_training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            n_batch = features.size(0)
            loss_sum += loss.item() * n_batch
            # The model has a single output column, so argmax over dim=1 would
            # always be 0; threshold the probability at 0.5 instead.
            predicted = (outputs.reshape(-1) >= 0.5).long()
            n_correct += (predicted == labels.reshape(-1).long()).sum().item()
            n_seen += n_batch
    return loss_sum / max(n_seen, 1), n_correct / max(n_seen, 1)


# Seed torch so weight initialisation, dropout and sampling repeat across runs.
torch.manual_seed(121)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = DeepNetwork(input_size, hidden_sizes, output_size).to(device)

# BCELoss's `weight` argument rescales individual batch elements, not classes,
# so the per-class weights are looked up per sample inside the criterion.
# NOTE(review): train_loader already draws through a WeightedRandomSampler;
# weighting the loss as well may over-correct the imbalance - confirm intent.
_class_weight_tensor = torch.as_tensor(
    class_weights, dtype=torch.float32, device=device
)


def criterion(outputs, targets):
    """Class-weighted BCE between model probabilities and integer labels."""
    return _class_weighted_bce(outputs, targets, _class_weight_tensor)


optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
num_epochs = 1
for epoch in range(num_epochs):
    train_loss, train_acc = _run_epoch(
        model, train_loader, criterion, device, optimizer=optimizer
    )
    val_loss, val_acc = _run_epoch(model, val_loader, criterion, device)
    print(
        f"Epoch [{epoch+1}/{num_epochs}], "
        f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
        f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}"
    )
