In [None]:
import torch
import torch.nn as nn
from torch import optim
from torch.utils.data import Dataset, DataLoader

from torchvision import transforms
from torchvision.models import squeezenet1_1, resnet18
from torchvision.models.squeezenet import SqueezeNet1_1_Weights
from torchvision.models.resnet import ResNet18_Weights

import pandas as pd
import numpy as np
import glob
import cv2
from PIL import Image
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
import matplotlib.patches as mpatches
import seaborn as sns

from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.svm import SVC
from sklearn.manifold import TSNE

import itertools
from tqdm import tqdm
import pickle
import gc
import os

# Run on the first CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
class MIDmodel2(nn.Module):
    """ResNet-18 backbone followed by one linear layer with 4 outputs.

    The backbone's own final fully connected layer is replaced by
    ``nn.Identity``, so the backbone hands its 512 features straight to
    ``fc1``.
    """

    def __init__(self):
        super().__init__()
        backbone = resnet18(weights=ResNet18_Weights.DEFAULT)
        # Swap out the backbone's last fully connected layer for a pass-through.
        backbone.fc = nn.Identity()
        self.pretrained = backbone
        self.fc1 = nn.Linear(512, 4)

    def get_embeddings(self, x):
        """Return the backbone output for ``x``, i.e. the input that ``fc1`` receives."""
        return self.pretrained(x)

    def forward(self, x):
        """Return the ``fc1`` output computed from the backbone features of ``x``."""
        features = self.pretrained(x)
        return self.fc1(features)
    
class ImageDataset(Dataset):
    """Image dataset restricted to a given set of patients.

    Each row of ``patients_df`` describes one PNG stored in ``root_dir`` and
    named ``<patient_id>_<exam_id>_<spot>_<frame_number>_<score>.png``.

    Args:
        root_dir: Directory that contains the PNG files.
        patients_ids: Patient ids to keep; rows of other patients are dropped.
        patients_df: Frame with the string columns ``patient_id``, ``exam_id``,
            ``spot``, ``frame_number`` and ``score``.
        transform: Optional callable applied to the PIL image before it is
            converted to a tensor.
    """

    def __init__(self, root_dir, patients_ids, patients_df, transform=None):
        self.patients_df = patients_df[patients_df["patient_id"].isin(patients_ids)]
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows kept for the selected patients."""
        return len(self.patients_df)

    def __getitem__(self, idx):
        """Return ``(image, label, path)`` for the ``idx``-th kept row.

        ``image`` is the file's content as a tensor resized to 224 x 224 and
        normalized per channel, ``label`` is the row's score converted to
        ``int`` and ``path`` is the file the image was read from.
        """
        row = self.patients_df.iloc[idx]
        file_name = "_".join(
            [row["patient_id"], row["exam_id"], row["spot"], row["frame_number"], row["score"]]
        ) + ".png"
        path = os.path.join(self.root_dir, file_name)

        # Close the file as soon as the pixels are decoded, and force three
        # channels: Normalize below is given 3-channel statistics, so a
        # grayscale, palette or RGBA PNG would otherwise make it fail.
        with Image.open(path) as raw_image:
            image = raw_image.convert("RGB")

        if self.transform:
            image = self.transform(image)

        image = transforms.ToTensor()(image)
        image = transforms.Resize((224, 224))(image)
        image = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(image)

        return image, int(row["score"]), path

In [None]:
class MIDbinary(nn.Module):
    """Small head mapping a 4-value input to 2 sigmoid-activated outputs."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(4, 2)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Apply the linear layer, then squash each output with a sigmoid."""
        return self.sigmoid(self.fc1(x))
    
class BinaryDataset(Dataset):
    """Dataset over a frame holding ``sm0``..``sm3`` values and a ``correct`` flag.

    Args:
        preds: Frame with the columns ``sm0``, ``sm1``, ``sm2``, ``sm3`` and
            ``correct``; a ``path`` column is optional.
    """

    def __init__(self, preds):
        self.preds = preds

    def __len__(self):
        """Return the number of rows in ``preds``."""
        return len(self.preds)

    def __getitem__(self, idx):
        """Return ``(features, label, path)`` for the ``idx``-th row.

        ``features`` is a float32 tensor with the four ``sm*`` values,
        ``label`` is ``correct`` converted to ``int`` and ``path`` is the
        row's ``path`` value, or ``""`` when the frame has no such column.
        """
        row = self.preds.iloc[idx]  # one positional lookup instead of three
        sms = torch.tensor(
            [float(row[column]) for column in ("sm0", "sm1", "sm2", "sm3")],
            dtype=torch.float32,
        )
        # The notebook drops "path" from values_test before building this
        # dataset, which used to end in a KeyError here. Fall back to an empty
        # string, which the default DataLoader collate function accepts.
        path = row["path"] if "path" in row.index else ""
        return sms, int(row["correct"]), path

# TO TEST

ResNet confusion matrix

Binary-classifier confusion matrix

t-SNE behaviour confusion matrix

Final confusion matrix

In [None]:
data_dir = "images/"
N_TRAIN_PATIENTS = 8  # number of patients assigned to the training split

# File names follow <patient_id>_<exam_id>_<spot>_<frame_number>_<score>.png,
# so splitting the stem on "_" yields one value per column (all kept as strings).
images_paths = glob.glob(f"{data_dir}*.png", recursive=True)
images_df = pd.DataFrame(
    [path[len(data_dir):-4].split("_") for path in images_paths],
    columns=["patient_id", "exam_id", "spot", "frame_number", "score"],
).astype(str)

# Sort the ids: iterating a set of strings is not stable between interpreter
# runs, and the order decides which combination wins when several are tied,
# so an unsorted list could silently change the train/test split.
patients_ids = sorted(set(images_df["patient_id"]))

# select the training patients based on the most balanced distribution of the scores:
# the winning combination is the one whose per-score image counts have the smallest
# standard deviation (ties go to the combination generated first)
combs = list(itertools.combinations(patients_ids, N_TRAIN_PATIENTS))
if not combs:
    raise ValueError(
        f"need at least {N_TRAIN_PATIENTS} patients under {data_dir!r}, found {len(patients_ids)}"
    )
stds = [
    (images_df[images_df["patient_id"].isin(c)].groupby("score").count()["patient_id"].std(), i)
    for i, c in enumerate(combs)
]
train_patients = list(combs[min(stds)[1]])
print(train_patients)
print(images_df[images_df["patient_id"].isin(train_patients)].groupby("score").count()["patient_id"])
# every remaining patient goes to the test split
test_patients = [x for x in patients_ids if x not in train_patients]
print(test_patients)
print(images_df[images_df["patient_id"].isin(test_patients)].groupby("score").count()["patient_id"])

In [None]:
for patient in test_patients:
    # print the number of images for each score based on images_df
    print("Number of images for each score for patient", patient)
    result = images_df[images_df["patient_id"] == patient]["score"].value_counts()
    # value_counts orders by frequency; list the scores in ascending order instead
    result = result.sort_index()
    print(result)

max_number = 29  # cap on the images kept per (patient, score) pair
selected_parts = []
for patient in test_patients:
    for score in range(0, 4):
        # get the images for the patient and score
        patient_score_df = images_df[(images_df["patient_id"] == patient) & (images_df["score"] == f"{score}")]
        if len(patient_score_df) < max_number:
            # fewer images than the cap: keep all of them
            selected_parts.append(patient_score_df)
        else:
            # otherwise keep a reproducible random sample of max_number images
            selected_parts.append(patient_score_df.sample(n=max_number, random_state=42))
# Concatenate once: growing a frame inside the loop copies it on every iteration.
# With nothing selected, fall back to an empty frame that still has the columns.
selected_df_test = pd.concat(selected_parts) if selected_parts else images_df.iloc[0:0]
print(len(selected_df_test))
# print the number of images for each score based on selected_df_test
print("Number of images for each score for the selected images")
result = selected_df_test["score"].value_counts()
# sort the result by score
result = result.sort_index()
print(result)

# One path per selected image, built in a single pass. The "images/" prefix is
# kept literal so the strings stay identical to the ones getPath produces.
paths_test_df = pd.DataFrame({
    "path": [
        os.path.join(
            "images/",
            row["patient_id"] + "_" + row["exam_id"] + "_" + row["spot"] + "_" + row["frame_number"] + "_" + row["score"] + ".png",
        )
        for _, row in selected_df_test.iterrows()
    ]
})

In [None]:
def getPath(row, root_dir="images/"):
    """Build the PNG path described by one row of the images table.

    Args:
        row: Mapping or Series with the string fields ``patient_id``,
            ``exam_id``, ``spot``, ``frame_number`` and ``score``.
        root_dir: Directory the file lives in. Defaults to ``"images/"``, the
            value that used to be hard-coded, so ``df.apply(getPath, axis=1)``
            keeps returning the same paths.

    Returns:
        ``root_dir`` joined with
        ``<patient_id>_<exam_id>_<spot>_<frame_number>_<score>.png``.
    """
    file_name = "_".join(
        [row["patient_id"], row["exam_id"], row["spot"], row["frame_number"], row["score"]]
    ) + ".png"
    return os.path.join(root_dir, file_name)

In [None]:
# Paths of the images that were sampled into the test selection.
selected_paths = paths_test_df["path"]

# Rows of the held-out patients, re-indexed from 0 and tagged with their file path,
# then narrowed down to the sampled images.
test_images_tmp = images_df.loc[images_df["patient_id"].isin(test_patients)]
test_images_df = test_images_tmp.reset_index(drop=True)
test_images_df["path"] = test_images_df.apply(getPath, axis=1)
test_images_df = test_images_df.loc[test_images_df["path"].isin(selected_paths)]
test_dataset = ImageDataset(data_dir, test_patients, test_images_df)

# values_train is what the t-SNE embedding is built from.
values_train = pd.read_csv("data/values_train.csv")
values_test = pd.read_csv("data/values_test.csv")
values_test = values_test.loc[values_test["path"].isin(selected_paths)]
# Remove a fixed-seed random sample of 48 rows flagged correct == 1.
correct_rows = values_test.loc[values_test["correct"] == 1]
rows_to_drop = correct_rows.sample(n=48, random_state=42).index
# NOTE(review): "path" is removed here, yet BinaryDataset hands back a per-row
# path — confirm the dataset copes with the missing column.
values_test = values_test.drop(index=rows_to_drop).drop(columns=["path"])


In [None]:
# confusion matrix of the models/MIDmodel2.pt
model = MIDmodel2()
# map_location keeps the checkpoint loadable on a CPU-only machine even when it
# was saved from a GPU (device already falls back to the CPU).
# NOTE: torch.load unpickles the file; only load checkpoints from a trusted source.
model.load_state_dict(torch.load("models/MIDmodel2.pt", map_location=device))
model.to(device)
model.eval()
test_dataset = ImageDataset(data_dir, test_patients, test_images_df)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, num_workers=0)
y_true = []
y_pred = []
# Inference only: no_grad avoids building an autograd graph for every image.
with torch.no_grad():
    for image, label, path in tqdm(test_loader):
        output = model(image.to(device))
        predicted = output.argmax(dim=1)
        # extend/tolist works for any batch size, not just batch_size=1
        y_true.extend(label.tolist())
        y_pred.extend(predicted.tolist())

# Pin the label set so the matrix is always 4x4, even if a score never occurs.
cm = confusion_matrix(y_true, y_pred, labels=[0, 1, 2, 3])
fig, ax = plt.subplots()
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", ax=ax)
ax.set(xlabel="Predicted score", ylabel="True score", title="MIDmodel2 confusion matrix")
plt.show()

In [None]:
# confusion matrix of the models/binary_model.pt
model = MIDbinary()
# map_location keeps the checkpoint loadable on a CPU-only machine even when it
# was saved from a GPU (device already falls back to the CPU).
# NOTE: torch.load unpickles the file; only load checkpoints from a trusted source.
model.load_state_dict(torch.load("models/binary_model.pt", map_location=device))
model.to(device)
model.eval()
test_dataset = BinaryDataset(values_test)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, num_workers=0)
y_true = []
y_pred = []
# Inference only: no_grad avoids building an autograd graph for every sample.
with torch.no_grad():
    for features, label, path in tqdm(test_loader):
        output = model(features.to(device))
        predicted = output.argmax(dim=1)
        # extend/tolist works for any batch size, not just batch_size=1
        y_true.extend(label.tolist())
        y_pred.extend(predicted.tolist())

# Pin the label set so the matrix is always 2x2, even if one class never occurs.
cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
fig, ax = plt.subplots()
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", ax=ax)
ax.set(xlabel="Predicted 'correct' flag", ylabel="True 'correct' flag", title="MIDbinary confusion matrix")
plt.show()

In [None]:
# confusion matrix of t-SNE
SM_COLUMNS = ["sm0", "sm1", "sm2", "sm3"]
# The previous version took the 12 closest points and discarded the first one
# (the query itself), i.e. it voted over 11 neighbours.
N_NEIGHBOURS = 11

train_points = values_train[SM_COLUMNS].to_numpy(dtype=float)
train_scores = values_train["true"].to_numpy()

y_true = []
y_pred = []
for _, test_row in tqdm(values_test.iterrows(), total=len(values_test)):
    # Embed the training points together with this one test point, stacked last.
    # (Previously the row was concatenated onto values_test, so the query never
    # reached the embedding and the frame being iterated was overwritten.)
    test_point = test_row[SM_COLUMNS].to_numpy(dtype=float)
    points = np.vstack([train_points, test_point])
    # random_state makes the embedding, and therefore the matrix, repeatable.
    # NOTE(review): recent scikit-learn releases rename n_iter to max_iter —
    # adjust the keyword to the installed version.
    tsne = TSNE(n_components=2, verbose=0, perplexity=40, n_iter=400, random_state=42)
    tsne_results = tsne.fit_transform(points)
    # Distance from the embedded test point to every embedded training point;
    # leaving the query out keeps each index valid for train_scores.
    distances = np.linalg.norm(tsne_results[:-1] - tsne_results[-1], axis=1)
    nearest_points = np.argsort(distances)[:N_NEIGHBOURS]
    scores = train_scores[nearest_points].tolist()
    # Majority vote among the neighbours' true scores.
    mode_val = max(set(scores), key=scores.count)
    # iterrows yields (index, row) pairs, so the label is read from the row.
    y_true.append(int(test_row["true"]))
    y_pred.append(int(mode_val))

# Pin the label set so the matrix is always 4x4, even if a score never occurs.
cm = confusion_matrix(y_true, y_pred, labels=[0, 1, 2, 3])
fig, ax = plt.subplots()
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", ax=ax)
ax.set(xlabel="Predicted score", ylabel="True score", title="t-SNE nearest-neighbour confusion matrix")
plt.show()