In [22]:
import os 
import numpy as np
import pandas as pd 
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.datasets as datasets

from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader, random_split
from torch.utils.data import Dataset
from PIL import Image

# PATHS

In [80]:
cwd = os.getcwd()
data_path = os.path.join(cwd, "data")
trainset_path = os.path.join(data_path, "trainset")
testset_path = os.path.join(data_path, "testset")

# TOOLS - Custom classes and functions 

## Data augmentation 

In [28]:
# Define transformations for data augmentation
data_transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize image to 224x224
    transforms.RandomRotation(degrees=30),  # Random rotation by up to 30 degrees
    transforms.RandomHorizontalFlip(),      # Random horizontal flipping
    transforms.RandomVerticalFlip(),        # Random vertical flipping
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),  # Adjust color jitter
    # transforms.RandomGrayscale(p=0.1),     # Randomly convert images to grayscale with probability 0.1
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1)),  # Random affine transformation
    #transforms.ToTensor(),                  # Convert PIL image to tensor
])


def create_fake_neg_patients(nb_patients, neg_patients, size=(80, 120), path=trainset_path, transform=data_transform):
    """
    Data augmentation step.

    :params nb_patients: int
        Nb of fake patients to create
    :params neg_patient: 
        List of negative patients from which to sample.
    :params size:
        Range in which sample the nb of images per fake patient
    :params path: 
        Path to folders with patients 
    :params transform: 
        Transformation step for images
    """
    min_, max_ = size
    for i in range(nb_patients):

        # select an arbitrary nb of images for neg patient
        nb_img = np.random.randint(min_, max_+1)

        # create folder 
        fake_path = os.path.join(path, f"Fake_P{i}")
        os.mkdir(fake_path)
        
        for j in range(nb_img):
            # choose a random patient
            neg_patient = np.random.choice(neg_patients)
            imgs = os.listdir(os.path.join(path, neg_patient))
            img_path = os.path.join(path, neg_patient, np.random.choice(imgs))

            # lozd image
            img = Image.open(img_path).convert('RGB')

            # transform and save image
            tf_img = transform(img)  # .numpy().transpose((1, 2, 0)) 
            tf_img.save(os.path.join(fake_path, f"{j}.jpg"))

        print(f"Fake_P{i} created")

## Data loaders 

In [9]:
class PatientDataset(Dataset):
    def __init__(self, root_dir, labels, patients, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.patients = patients
        self.labels = labels

    def __len__(self):
        return len(self.patients)

    def __getitem__(self, idx):
        patient_folder = os.path.join(self.root_dir, self.patients[idx])
        images = []
        for filename in os.listdir(patient_folder):
            image_path = os.path.join(patient_folder, filename)
            image = Image.open(image_path).convert('RGB')
            if self.transform:
                image = self.transform(image)
            images.append(image)
        label = self.labels[self.patients[idx]]
        label_tensor = torch.tensor([label], dtype=torch.float32)
    
        return torch.stack(images), label_tensor, self.patients[idx]

In [100]:
class ImageLevelDataset(Dataset):
    def __init__(self, root_dir, list_images, labels, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.list_img = list_images
        self.labels = labels

    def __len__(self):
        return len(self.list_img)

    def __getitem__(self, idx):
        img_path, patient = self.list_img[idx]

        # get image and label
        label = self.labels[patient]
        image = Image.open(img_path).convert('RGB')
        
        if self.transform:  # transform image
                image = self.transform(image)

        # transfrom label to right format
        label_tensor = torch.tensor([label], dtype=torch.float32)
        return image, label_tensor

## Models

In [13]:
# Define your CNN model
class CNN(nn.Module):
    def __init__(self, num_classes=1):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(64 * 28 * 28, 64)
        self.fc2 = nn.Linear(64, num_classes)

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        x = self.pool(torch.relu(self.conv3(x)))
        x = x.view(-1, 64 * 28 * 28)
        x = torch.relu(self.fc1(x))
        x = torch.sigmoid(self.fc2(x))
        return x
    
    def embedding(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        x = self.pool(torch.relu(self.conv3(x)))
        x = x.view(-1, 64 * 28 * 28)
        x = self.fc1(x)
        return x

## Loss

In [110]:
class SymmetricCrossEntropyLoss(nn.Module):
    """ 
    Symmetric Cross entropy loss with weights is more robust to 
    non reliable 1 labels. 
    """
    def __init__(self, alpha=0.75, beta=1.0):
        """
        Define wieghts
        """
        super(SymmetricCrossEntropyLoss, self).__init__()
        self.alpha = alpha
        self.beta = beta
        self.cross_entropy = torch.nn.BCELoss()

    def forward(self, preds, targets):
        # preds = preds.squeeze()  # Squeeze the output to remove extra dimension
        loss = self.beta * (1 - targets) * self.cross_entropy(preds, targets) 
        loss += self.alpha * targets * self.cross_entropy(preds, targets)
        return torch.mean(loss)

# Introduction 

In [15]:
annotation_file = os.path.join(data_path, "clinical_annotation.csv")
df_ann = pd.read_csv(annotation_file)
df_ann.drop("Unnamed: 0", axis=1, inplace=True)

### RAPID PREPROCESSING

# compute age 
def compute_age(x):
    year = int(x[-4:])
    return 2024 - year
    
df_ann["age"] = df_ann.DOB.apply(compute_age)
df_ann.drop("DOB", axis=1, inplace=True)

#encode gender
df_ann["GENDER"] = df_ann["GENDER"].replace('f', "F")
label_encoder = LabelEncoder()
df_ann['GENDER'] = label_encoder.fit_transform(df_ann['GENDER'])

df_ann.set_index("ID", inplace=True)

In [16]:
df_ann.head()

Unnamed: 0_level_0,LABEL,GENDER,LYMPH_COUNT,age
ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
P26,1,1,11.2,91
P183,1,1,12.8,82
P89,1,1,9.6,89
P123,1,1,122.6,93
P61,1,0,11.6,93


In [17]:
sub_patients = list(df_ann.loc[df_ann.LABEL==-1].index)

## Train-test split 

Isolate as soon as possible our future test data

In [19]:
test_patients = list(df_ann[df_ann.LABEL==0].sample(15).index)
test_patients += list(df_ann[df_ann.LABEL==1].sample(15).index)
print("Test size: ", len(test_patients))

Test size:  30


Define submission, train and test set 

In [20]:
sub_df = df_ann[df_ann.index.isin(sub_patients)]  # submission patients
test_df = df_ann[df_ann.index.isin(test_patients)]  # test patients

# train patients are the others
train_patients = list(set(df_ann.index) - set(test_patients) - set(sub_patients))
train_df = df_ann[df_ann.index.isin(train_patients)]

# Data augmentation 

Due to imbalance between positive and negative class, create fake negative patients 

In [29]:
# define negative patients

create_fake_neg_patients(
    nb_patients=20, 
    neg_patients=list(train_df.index)
)

Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created
Fake_P{i} created


# Train Classifier

### train test split

In [111]:
train_clf_patients = list(train_df[train_df.LABEL == 1].sample(12).index)
train_clf_patients += list(train_df[train_df.LABEL == 0].sample(15).index)

test_clf_patients = list(train_df[~train_df.index.isin(train_clf_patients)].index)

## Data loader 

In [112]:
# preparation of data loader
labels = train_df[train_df.index.isin(train_clf_patients)].LABEL.to_dict()
transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ])

list_images = []
for pat in train_clf_patients:
    for img in os.listdir(os.path.join(trainset_path, pat)):
        list_images.append((os.path.join(trainset_path, pat, img), pat))
        
print(list_images)

[('d:\\MOI\\CentraleSupelec\\Cours CS\\3A\\SDI\\DLMI\\challenge\\code\\DLMI---Lymphocytosis-classification\\data\\trainset\\P67\\000000.jpg', 'P67'), ('d:\\MOI\\CentraleSupelec\\Cours CS\\3A\\SDI\\DLMI\\challenge\\code\\DLMI---Lymphocytosis-classification\\data\\trainset\\P67\\000001.jpg', 'P67'), ('d:\\MOI\\CentraleSupelec\\Cours CS\\3A\\SDI\\DLMI\\challenge\\code\\DLMI---Lymphocytosis-classification\\data\\trainset\\P67\\000002.jpg', 'P67'), ('d:\\MOI\\CentraleSupelec\\Cours CS\\3A\\SDI\\DLMI\\challenge\\code\\DLMI---Lymphocytosis-classification\\data\\trainset\\P67\\000003.jpg', 'P67'), ('d:\\MOI\\CentraleSupelec\\Cours CS\\3A\\SDI\\DLMI\\challenge\\code\\DLMI---Lymphocytosis-classification\\data\\trainset\\P67\\000004.jpg', 'P67'), ('d:\\MOI\\CentraleSupelec\\Cours CS\\3A\\SDI\\DLMI\\challenge\\code\\DLMI---Lymphocytosis-classification\\data\\trainset\\P67\\000005.jpg', 'P67'), ('d:\\MOI\\CentraleSupelec\\Cours CS\\3A\\SDI\\DLMI\\challenge\\code\\DLMI---Lymphocytosis-classification

In [113]:
dataset = ImageLevelDataset("./data/trainset/", list_images, labels, transform) 

## Training 

In [114]:
# Check if GPU is available and set device accordingly
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [117]:
# Assuming you have your dataset in a variable named 'dataset'
# Split dataset into train and test sets
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Initialize the model
model = CNN(num_classes=1).to(device)

# Define loss function (use SymmetricCrossEntropyLoss)
criterion = torch.nn.BCELoss()

# Define optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Train the model
num_epochs = 2
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}")

# Evaluate the model
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        outputs = model(images)
        predicted = (outputs >= 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Test Accuracy: {100 * correct / total}%")


Epoch 1/2, Loss: 0.6931398717256693
Epoch 2/2, Loss: 0.6731483592436864
Test Accuracy: 59.26829268292683%


In [118]:
overall_sum = 0
tot = 0
for _, labels in test_loader:
    overall_sum += labels.sum()
    tot += labels.size(0)

overall_sum / tot

tensor(0.5927)

In [59]:
torch.save(model.state_dict(), 'model.pt')

## Embedding

In [62]:
emb_labels = train_df[train_df.index.isin(test_clf_patients)].LABEL.to_dict()
list_images_emb = []
for pat in test_clf_patients:
    for img in os.listdir(os.path.join(trainset_path, pat)):
        list_images_emb.append((os.path.join(trainset_path, pat, img), emb_labels[pat]))
        
print(list_images_emb)

[('d:\\MOI\\CentraleSupelec\\Cours CS\\3A\\SDI\\DLMI\\challenge\\code\\DLMI---Lymphocytosis-classification\\data\\trainset\\P26\\000000.jpg', 1), ('d:\\MOI\\CentraleSupelec\\Cours CS\\3A\\SDI\\DLMI\\challenge\\code\\DLMI---Lymphocytosis-classification\\data\\trainset\\P26\\000001.jpg', 1), ('d:\\MOI\\CentraleSupelec\\Cours CS\\3A\\SDI\\DLMI\\challenge\\code\\DLMI---Lymphocytosis-classification\\data\\trainset\\P26\\000002.jpg', 1), ('d:\\MOI\\CentraleSupelec\\Cours CS\\3A\\SDI\\DLMI\\challenge\\code\\DLMI---Lymphocytosis-classification\\data\\trainset\\P26\\000003.jpg', 1), ('d:\\MOI\\CentraleSupelec\\Cours CS\\3A\\SDI\\DLMI\\challenge\\code\\DLMI---Lymphocytosis-classification\\data\\trainset\\P26\\000004.jpg', 1), ('d:\\MOI\\CentraleSupelec\\Cours CS\\3A\\SDI\\DLMI\\challenge\\code\\DLMI---Lymphocytosis-classification\\data\\trainset\\P26\\000005.jpg', 1), ('d:\\MOI\\CentraleSupelec\\Cours CS\\3A\\SDI\\DLMI\\challenge\\code\\DLMI---Lymphocytosis-classification\\data\\trainset\\P26\\0

In [65]:
emb_dataset = ImageLevelDataset("./data/trainset/", list_images_emb, emb_labels, transform) 
emb_loader = DataLoader(emb_dataset, batch_size=32, shuffle=False)

correct = 0
total = 0
with torch.no_grad():
    for images, labels in emb_loader:
        outputs = model(images)
        predicted = (outputs >= 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Test Accuracy: {100 * correct / total}%")

Test Accuracy: 100.0%


# For submission

In [73]:
sub_df.head()

Unnamed: 0_level_0,LABEL,GENDER,LYMPH_COUNT,age
ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
P71,-1,1,5.76,78
P16,-1,1,32.0,84
P114,-1,1,4.6,95
P170,-1,1,4.8,91
P98,-1,0,6.24,54


In [81]:
sub_labels = sub_df.LABEL.to_dict()
sub_dataset = PatientDataset(testset_path, sub_labels, patients=sub_patients, transform=transform)
sub_loader = DataLoader(sub_dataset)

In [86]:
dico_sub = {}

with torch.no_grad():
    for data in sub_loader:
        images, labels, pat = data
        images = torch.squeeze(images, dim=0)
        outputs = model(images)
        predicted = outputs.float().mean()
        dico_sub[pat] = predicted

In [87]:
dico_sub

{('P71',): tensor(0.),
 ('P16',): tensor(0.),
 ('P114',): tensor(0.),
 ('P170',): tensor(0.),
 ('P98',): tensor(0.),
 ('P69',): tensor(0.),
 ('P92',): tensor(0.),
 ('P132',): tensor(0.),
 ('P81',): tensor(0.),
 ('P73',): tensor(0.),
 ('P143',): tensor(0.),
 ('P175',): tensor(0.),
 ('P56',): tensor(0.),
 ('P139',): tensor(0.),
 ('P152',): tensor(0.),
 ('P203',): tensor(0.),
 ('P75',): tensor(0.),
 ('P9',): tensor(0.),
 ('P24',): tensor(0.),
 ('P4',): tensor(0.),
 ('P32',): tensor(0.),
 ('P120',): tensor(0.),
 ('P138',): tensor(0.),
 ('P172',): tensor(0.),
 ('P57',): tensor(0.),
 ('P195',): tensor(0.),
 ('P68',): tensor(0.),
 ('P133',): tensor(0.),
 ('P14',): tensor(0.),
 ('P119',): tensor(0.),
 ('P7',): tensor(0.),
 ('P49',): tensor(0.),
 ('P93',): tensor(0.),
 ('P178',): tensor(0.),
 ('P58',): tensor(0.),
 ('P108',): tensor(0.),
 ('P197',): tensor(0.),
 ('P196',): tensor(0.),
 ('P86',): tensor(0.),
 ('P18',): tensor(0.),
 ('P188',): tensor(0.),
 ('P148',): tensor(0.)}

# TEST -- to be removed 

In [3]:
cwd = os.getcwd()


In [6]:
df_ann = pd.read_csv(annotation_file)
df_ann.drop("Unnamed: 0", axis=1, inplace=True)
"""
# compute age 
def compute_age(x):
    year = int(x[-4:])
    return 2024 - year
    
df_ann["age"] = df_ann.DOB.apply(compute_age)
df_ann.drop("DOB", axis=1, inplace=True)

#encode gender
df_ann["GENDER"] = df_ann["GENDER"].replace('f', "F")
label_encoder = LabelEncoder()
df_ann['GENDER'] = label_encoder.fit_transform(df_ann['GENDER'])

df_ann.set_index("ID", inplace=True)"""

df_ann.loc[df_ann.LABEL >= 0].groupby("LABEL").count()

Unnamed: 0_level_0,ID,GENDER,DOB,LYMPH_COUNT
LABEL,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
0,50,50,50,50
1,113,113,113,113
