In [None]:
!pip install opendatasets timm lightning albumentations --upgrade --quiet

In [None]:
import opendatasets as od

# Kaggle URL of the chest X-ray pneumonia dataset; handed to opendatasets below.
# NOTE(review): od.download presumably needs Kaggle credentials and saves into
# the current working directory — confirm.
dataset = 'https://www.kaggle.com/datasets/paultimothymooney/chest-xray-pneumonia/data'
od.download(dataset)

In [None]:
# custom modules
%load_ext autoreload
%autoreload 2
import utilities
#import model_functions
import model_factory

#lightning modules and callbacks
import lightning_data
import lightning_model
import train_info
import learning_curves
import confusion_matrix

import os
import copy

# timm models
import timm

# torch modules (temporarily)
import torch.nn as nn
import torch
# pytorch lightning (for checkpointing callbacks)
import pytorch_lightning as pl
from pytorch_lightning.loggers import CSVLogger

In [None]:
# Mount Google Drive at /content/drive: the checkpoint paths used in this
# notebook all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# ResNet50 ensemble member: settings dict + checkpoint restore

resnet50_config = {
    'model_name': 'resnet50',      # pretrained backbone to build
    'classifier_name': 'linear',   # head variant: linear / nonlinear
    'classifier_type': None,       # placeholder, filled in right after the dict
    'layers': 'all',               # layers to train: first / second (counted from the last) or all
    'augmentation': 'strong',      # augmentation preset: normal or strong
    'classes_weight': None,        # optional per-class weights
    'batch_size': 128,
    'val_split': 0.1,
    'n_epochs': 20,
    'optimizer': 'Adam',
    'scheduler': '',               # empty string means no scheduling
    'ensemble': True,
    'image_size': None,
    'mean': None,
    'std': None,
}

# Resolve the classifier factory that corresponds to 'classifier_name'.
if resnet50_config['classifier_name'] == 'linear':
    resnet50_config['classifier_type'] = model_factory.get_linear_classifer
else:
    resnet50_config['classifier_type'] = model_factory.get_simple_non_linear_classifier

# Load the Lightning module from the checkpoint stored on Drive.
resnet50_ckpt = '/content/drive/MyDrive/models/resnet50/linear all strong Adam /epoch=4-step=180.ckpt'
l_resnet50_model = lightning_model.PneumoniaModel.load_from_checkpoint(resnet50_ckpt, h=resnet50_config)

In [None]:
# DenseNet121 ensemble member: settings dict + checkpoint restore

densenet121_config = {
    'model_name': 'densenet121',          # pretrained backbone to build
    'classifier_name': 'linear',          # head variant: linear / nonlinear
    'classifier_type': None,              # placeholder, filled in right after the dict
    'layers': 'all',                      # layers to train: first / second (counted from the last) or all
    'augmentation': 'strong',             # augmentation preset: normal or strong
    'classes_weight': None,               # optional per-class weights
    'batch_size': 64,
    'val_split': 0.1,
    'n_epochs': 20,
    'optimizer': 'SGD',
    'scheduler': 'CosineAnnealingLR10',   # empty string would mean no scheduling
    'ensemble': True,
    'image_size': None,
    'mean': None,
    'std': None,
}

# Resolve the classifier factory that corresponds to 'classifier_name'.
if densenet121_config['classifier_name'] == 'linear':
    densenet121_config['classifier_type'] = model_factory.get_linear_classifer
else:
    densenet121_config['classifier_type'] = model_factory.get_simple_non_linear_classifier

# Load the Lightning module from the checkpoint stored on Drive.
densenet121_ckpt = '/content/drive/MyDrive/models/densenet121/linear all strong SGD CosineAnnealingLR10/epoch=8-step=639.ckpt'
l_densenet121_model = lightning_model.PneumoniaModel.load_from_checkpoint(densenet121_ckpt, h=densenet121_config)

In [None]:
# EfficientNet_b0 ensemble member: settings dict + checkpoint restore

efficientnet_b0_config = {
    'model_name': 'efficientnet_b0',   # pretrained backbone to build
    'classifier_name': 'linear',       # head variant: linear / nonlinear
    'classifier_type': None,           # placeholder, filled in right after the dict
    'layers': 'all',                   # layers to train: first / second (counted from the last) or all
    'augmentation': 'strong',          # augmentation preset: normal or strong
    'classes_weight': None,            # optional per-class weights
    'batch_size': 64,
    'val_split': 0.1,
    'n_epochs': 20,
    'optimizer': 'SGD',
    'scheduler': '',                   # empty string means no scheduling
    'ensemble': True,
    'image_size': None,
    'mean': None,
    'std': None,
}

# Resolve the classifier factory that corresponds to 'classifier_name'.
if efficientnet_b0_config['classifier_name'] == 'linear':
    efficientnet_b0_config['classifier_type'] = model_factory.get_linear_classifer
else:
    efficientnet_b0_config['classifier_type'] = model_factory.get_simple_non_linear_classifier

# Load the Lightning module from the checkpoint stored on Drive.
efficientnet_b0_ckpt = '/content/drive/MyDrive/models/efficientnet_b0/linear all strong SGD /epoch=9-step=710.ckpt'
l_efficientnet_b0_model = lightning_model.PneumoniaModel.load_from_checkpoint(efficientnet_b0_ckpt, h=efficientnet_b0_config)

In [None]:
# Take independent deep copies of the network held by each Lightning wrapper,
# so later edits to the copies leave the loaded modules untouched.
resnet50_model, densenet121_model, efficientnet_b0_model = (
    copy.deepcopy(wrapper.model)
    for wrapper in (l_resnet50_model, l_densenet121_model, l_efficientnet_b0_model)
)

In [None]:
# Replace each classification head by the same head minus its last child module.
# The head is rebuilt from a deep copy of the untouched Lightning-wrapped model,
# so re-running this cell always yields the same result instead of stripping one
# more layer on every execution.
# NOTE(review): if a head has no child modules (e.g. a bare nn.Linear), the
# result is an empty nn.Sequential, which passes its input through unchanged —
# confirm what model_factory.get_linear_classifer actually builds.
resnet50_model.fc = nn.Sequential(
    *list(copy.deepcopy(l_resnet50_model.model.fc).children())[:-1]
)
densenet121_model.classifier = nn.Sequential(
    *list(copy.deepcopy(l_densenet121_model.model.classifier).children())[:-1]
)
efficientnet_b0_model.classifier = nn.Sequential(
    *list(copy.deepcopy(l_efficientnet_b0_model.model.classifier).children())[:-1]
)

In [None]:
# Build the project's data module from the ResNet50 config and run setup();
# the following cells read its `test_set` and `meta_set` attributes.
# NOTE(review): only the ResNet50 config drives data loading for all three
# models — presumably the preprocessing is identical across configs; confirm.
pneumonia_data_resnet50 = lightning_data.PneumoniaDataModule(resnet50_config)
pneumonia_data_resnet50.setup()

In [None]:
# `test_set` is later passed to create_meta_data to produce X_test / y_test;
# print its length as a quick sanity check.
test_set = pneumonia_data_resnet50.test_set
print(len(test_set))

In [None]:
# `meta_set` is later passed to create_meta_data to produce X_train / y_train
# for the meta-learners; print its length as a quick sanity check.
meta_train_set = pneumonia_data_resnet50.meta_set
print(len(meta_train_set))

In [None]:
# TODO(review): unresolved question from the author ("Can I do this?").
# The snippet below is wrapped in a string literal, so it is only a string
# expression and none of it is executed. It refers to `pneumonia_data`, a name
# that no visible cell defines.
'''
pneumonia_data.setup()
test_set = pneumonia_data.test_set
print(len(test_set))
'''


In [None]:
import torch
import numpy as np

def _label_to_numpy(label):
    """Return `label` in a NumPy-friendly form (tensors are detached and moved to CPU)."""
    if torch.is_tensor(label):
        return label.detach().cpu().numpy()
    return label


def create_meta_data(data_loader, model_list, seed=None):
    """Build a shuffled meta-dataset from the outputs of several base models.

    Each sample is passed through every model in ``model_list``; the outputs
    are concatenated along axis 1 into a single feature row for that sample.

    Args:
        data_loader: Iterable yielding ``(inputs, labels)`` pairs one sample at
            a time. A batch dimension is added to ``inputs`` here, so despite
            the name this must be an un-batched dataset, not a batching
            DataLoader.
        model_list: Iterable of ``torch.nn.Module`` objects. Each one is moved
            to the selected device and put in eval mode, and is left that way.
        seed: Optional seed for the final shuffle. ``None`` (the default) keeps
            the previous behaviour of drawing from NumPy's global random state.

    Returns:
        Tuple ``(meta_X, meta_Y)``. ``meta_X`` has one row per sample with the
        concatenated model outputs (columns follow the order of
        ``model_list``); ``meta_Y`` holds the matching labels. Both arrays are
        permuted with the same random order.

    Raises:
        ValueError: If ``data_loader`` yields no samples (or ``model_list`` is
            empty, in which case NumPy has nothing to concatenate).
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Device placement and eval mode do not depend on the sample, so do them
    # once up front instead of once per sample per model.
    models = list(model_list)
    for model in models:
        model.to(device)
        model.eval()

    meta_X = []
    meta_Y = []

    with torch.no_grad():
        for inputs, labels in data_loader:
            # Single sample -> batch of one.
            batch = inputs.unsqueeze(0).to(device)
            outputs = [model(batch).cpu().numpy() for model in models]

            meta_X.append(np.concatenate(outputs, axis=1))
            meta_Y.append(_label_to_numpy(labels))

    if not meta_X:
        raise ValueError("create_meta_data received no samples from data_loader")

    meta_X = np.concatenate(meta_X, axis=0)
    meta_Y = np.array(meta_Y)

    # One shared permutation keeps feature rows aligned with their labels.
    indices = np.arange(meta_X.shape[0])
    if seed is None:
        np.random.shuffle(indices)
    else:
        np.random.default_rng(seed).shuffle(indices)

    return meta_X[indices], meta_Y[indices]

In [None]:
# Base models whose outputs are stacked into the meta-features; their order
# fixes the column layout of X_train / X_test.
models = [resnet50_model, densenet121_model, efficientnet_b0_model]

# create_meta_data shuffles with NumPy's global random state. Seed it here so
# that running the notebook from a fresh kernel reproduces the same sample order.
META_SEED = 42
np.random.seed(META_SEED)

X_train, y_train = create_meta_data(meta_train_set, models)
X_test, y_test = create_meta_data(test_set, models)

In [None]:
# Show the shape, then the array itself, each on its own line.
print(X_train.shape, X_train, sep='\n')

In [None]:
# Show the shape, then the array itself, each on its own line.
print(y_train.shape, y_train, sep='\n')

In [None]:
# Show the shape, then the array itself, each on its own line.
print(X_test.shape, X_test, sep='\n')

In [None]:
# Show the shape, then the array itself, each on its own line.
print(y_test.shape, y_test, sep='\n')

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

# Meta-learner: logistic regression on the stacked base-model outputs.

# Create and fit in a single expression (fit() hands back the estimator).
model = LogisticRegression().fit(X_train, y_train)

# Hard class predictions for the test features.
y_pred = model.predict(X_test)

# The four label-based metrics share the (y_true, y_pred) call shape, so
# evaluate them in one pass, in the same order as before.
accuracy, precision, recall, f1 = (
    score_fn(y_test, y_pred)
    for score_fn in (accuracy_score, precision_score, recall_score, f1_score)
)
# AUC is computed from the predicted probability in column 1.
auc = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])

print(f"Accuracy on the test set: {accuracy}")
print(f"Precision on the test set: {precision}")
print(f"Recall on the test set: {recall}")
print(f"F1 score on the test set: {f1}")
print(f"AUC on the test set: {auc}")

In [None]:
from sklearn.svm import SVC

# Step 1: Initialize the SVM model
# probability=True enables predict_proba, which the AUC below needs.
# random_state pins the randomness used for those probability estimates so
# that re-running the cell reports the same numbers.
model = SVC(probability=True, random_state=42)

# Step 2: Train the model on the training set
model.fit(X_train, y_train)

# Step 3: Make predictions on the test set
y_pred = model.predict(X_test)

# Step 4: Evaluate the model's performance
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
# AUC is computed from the predicted probability in column 1.
auc = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])

print(f"Accuracy on the test set: {accuracy}")
print(f"Precision on the test set: {precision}")
print(f"Recall on the test set: {recall}")
print(f"F1 score on the test set: {f1}")
print(f"AUC on the test set: {auc}")

In [None]:
from sklearn.tree import DecisionTreeClassifier

# Step 1: Initialize the Decision Tree model
# random_state pins the estimator's internal randomness so that re-running the
# cell builds the same tree and reports the same numbers.
model = DecisionTreeClassifier(random_state=42)

# Step 2: Train the model on the training set
model.fit(X_train, y_train)

# Step 3: Make predictions on the test set
y_pred = model.predict(X_test)

# Step 4: Evaluate the model's performance
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
# AUC is computed from the predicted probability in column 1.
auc = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])

print(f"Accuracy on the test set: {accuracy}")
print(f"Precision on the test set: {precision}")
print(f"Recall on the test set: {recall}")
print(f"F1 score on the test set: {f1}")
print(f"AUC on the test set: {auc}")

In [None]:
from sklearn.neural_network import MLPClassifier

# Step 1: Initialize the MLP classifier model
# One hidden layer with 6 units, up to 1000 training iterations.
# random_state pins weight initialisation and sample shuffling so that
# re-running the cell reports the same numbers.
model = MLPClassifier(hidden_layer_sizes=(6,), max_iter=1000, random_state=42)

# Step 2: Train the model on the training set
model.fit(X_train, y_train)

# Step 3: Make predictions on the test set
y_pred = model.predict(X_test)

# Step 4: Evaluate the model's performance
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
# AUC is computed from the predicted probability in column 1.
auc = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])

print(f"Accuracy on the test set: {accuracy}")
print(f"Precision on the test set: {precision}")
print(f"Recall on the test set: {recall}")
print(f"F1 score on the test set: {f1}")
print(f"AUC on the test set: {auc}")