<a href="https://colab.research.google.com/github/elisa641995/po/blob/main/Breast_cancer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##imports

In [None]:
import os
from os import listdir
import pandas as pd
import numpy as np
from sklearn.metrics import f1_score
import os
import matplotlib.pyplot as plt
import plotly.express as px
import seaborn as sns
from PIL import Image
from sklearn.model_selection import train_test_split
import torch
import torchvision
from torchvision import models
from torch.utils.data import DataLoader
from torchvision import transforms as T
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
from tqdm.notebook import tqdm
import warnings
warnings.filterwarnings('ignore')
import cv2
from torchvision.ops import box_iou
from torchvision.transforms import v2
from google.colab.patches import cv2_imshow  # For displaying images in Colab

In [None]:
# Mount Google Drive at /content/drive so files stored there can be read by path.
from google.colab import drive
drive.mount('/content/drive')


In [None]:
# Pick the compute device: the GPU when CUDA is available, otherwise the CPU.
from torch import cuda
if cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
device

In [None]:
# Root directory of the dataset on the mounted Google Drive.
base_dir = '/content/drive/MyDrive/Dataset_BUSI_with_GT'  # Update with the path in your Google Drive

# Sub-folder name -> class label (the two are identical in this layout).
folders = {'normal': 'normal', 'malignant': 'malignant', 'benign': 'benign'}

# One record per image: its file path and its class label.
data = []

for folder_name, label in folders.items():
    folder_path = os.path.join(base_dir, folder_name)

    # os.listdir() returns entries in arbitrary, filesystem-dependent order.
    # Sorting keeps the row order stable, so seeded splits made from this
    # DataFrame are reproducible between sessions.
    for filename in sorted(os.listdir(folder_path)):
        if '_mask' not in filename:  # Ignore files with '_mask' in the name
            file_path = os.path.join(folder_path, filename)
            data.append({'file_path': file_path, 'label': label})

# DataFrame keeping track of file paths and labels
df = pd.DataFrame(data)

# Save the index to a CSV file so it can be inspected outside the notebook
df.to_csv('breast_images_dataset.csv', index=False)

# Preview the first rows; only the last expression of a cell is displayed,
# so this has to come after the CSV export.
df.head()


In [None]:
# Bar chart of the number of images per label.
# (The former bare `plt.subplot` line only referenced the function without
# calling it, so it had no effect and is dropped.)
plt.figure(figsize=(10, 10))
sns.countplot(x='label', data=df)
plt.show()

In [None]:
# Show a 4 x 5 grid of 20 randomly sampled images, each titled with its label.
columns = 5
sample_df = df.sample(n=20, random_state=42)
plt.figure(figsize=(15, 15))
for i, (file_path, label) in enumerate(zip(sample_df['file_path'], sample_df['label'])):
    # OpenCV reads images as BGR; convert to RGB before handing them to matplotlib.
    image = cv2.imread(file_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    plt.subplot(4, columns, i + 1)
    plt.imshow(image)
    plt.title(label)
    plt.axis('off')
plt.show()


##Building classes

In [None]:
class Dataset(torch.utils.data.Dataset):
    """Image-classification dataset backed by a list of file paths.

    Args:
        images: Sequence of image file paths.
        labels: Sequence of labels, aligned index-by-index with ``images``.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, images: list, labels: list, transform=None):
        super().__init__()
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of samples (one per label)."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``."""
        # Image.open() is lazy and keeps the file open. convert() inside the
        # context manager forces the pixels to load into a new image, after
        # which the file handle is closed instead of lingering until GC.
        with Image.open(self.images[idx]) as img:
            image = img.convert('RGB')
        if self.transform:
            image = self.transform(image)
        label = self.labels[idx]
        return image, label

#SE block

In [None]:
class SE_Block(nn.Module):
    """Squeeze-and-Excitation block: rescales each channel by a learned gate.

    Args:
        c: Number of channels of the input (and output) feature map.
        r: Reduction ratio of the bottleneck. The hidden width is ``c // r``
            but never less than 1.
    """

    def __init__(self, c, r=16):
        super(SE_Block, self).__init__()
        # Guard against c < r: c // r would be 0, the bottleneck would carry no
        # information and every gate would collapse to sigmoid(0) = 0.5.
        hidden = max(c // r, 1)
        self.squeeze = nn.AdaptiveAvgPool2d(1)
        self.excitation = nn.Sequential(
            nn.Linear(c, hidden, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, c, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return ``x`` (4-D: batch, channels, H, W) scaled per channel by its gate."""
        bs, c, _, _ = x.size()
        # Squeeze: global average pool to one value per channel.
        y = self.squeeze(x).view(bs, c)
        # Excite: per-channel gate in (0, 1), broadcast back over H and W.
        y = self.excitation(y).view(bs, c, 1, 1)
        return x * y.expand_as(x)

#Train test split and data loading

In [None]:


# Per-channel mean / std used for normalisation (the values torchvision
# documents for its ImageNet-pretrained models) and the common input size.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]
_INPUT_SIZE = [250, 250]

# Training pipeline: resize, random flips and a rotation of up to 10 degrees,
# then tensor conversion and normalisation.
img_train = T.Compose([
    T.Resize(_INPUT_SIZE),
    T.RandomHorizontalFlip(),
    T.RandomVerticalFlip(),
    T.RandomRotation(10),
    T.ToTensor(),
    T.Normalize(_NORM_MEAN, _NORM_STD),
])

# Validation pipeline: deterministic resize + normalisation only.
img_val = T.Compose([
    T.Resize(_INPUT_SIZE),
    T.ToTensor(),
    T.Normalize(_NORM_MEAN, _NORM_STD),
])

# Test pipeline: same steps as validation.
img_test = T.Compose([
    T.Resize(_INPUT_SIZE),
    T.ToTensor(),
    T.Normalize(_NORM_MEAN, _NORM_STD),
])

In [None]:
# Map the class names in df['label'] to integer class indices.
label_to_index = {'normal': 0, 'malignant': 1, 'benign': 2}
# Only map while the column still holds class names: running .map() a second
# time on already-encoded integers would turn every label into NaN.
if not pd.api.types.is_numeric_dtype(df['label']):
    df['label'] = df['label'].map(label_to_index)

In [None]:
# Stratified 80 / 10 / 10 split: hold out 20 %, then halve it into validation and test.
train_df, temp_df = train_test_split(
    df, test_size=0.2, stratify=df['label'], random_state=42
)
val_df, test_df = train_test_split(
    temp_df, test_size=0.5, stratify=temp_df['label'], random_state=42
)

# Discard the original row indices so every frame is indexed 0..n-1.
train_df, val_df, test_df = (
    frame.reset_index(drop=True) for frame in (train_df, val_df, test_df)
)

print("TRAIN Dataset: {}".format(train_df.shape))
print("VALIDATION Dataset: {}".format(val_df.shape))
print("TEST Dataset: {}".format(test_df.shape))

# Wrap each split in a Dataset with its transform pipeline.
train_df_2 = Dataset(
    images=train_df['file_path'].values,
    labels=train_df['label'].values,
    transform=img_train,
)
val_df_2 = Dataset(
    images=val_df['file_path'].values,
    labels=val_df['label'].values,
    transform=img_val,
)
test_df_2 = Dataset(
    images=test_df['file_path'].values,
    labels=test_df['label'].values,
    transform=img_test,
)


In [None]:
# Hyperparameters shared by the classification experiments
BATCH_SIZE = 2  # samples per mini-batch for the DataLoaders
epochs = 50  # maximum number of training epochs
LEARNING_RATE = 1e-3                 # lr passed to the Adam optimizers
gradient_clip = 0.1                 # gradient clipping value; NOTE(review): not referenced in the visible part of the notebook
weight_decay = 1e-4               # weight decay; NOTE(review): the visible optimizers pass a literal 1e-3 instead of this variable
patience = 3  # early-stopping patience handed to the training function

In [None]:

# One shared DataLoader configuration; each split gets its own copy of it.
_loader_settings = {
    'batch_size': BATCH_SIZE,
    'shuffle': True,
    'num_workers': 0,
    'pin_memory': True,
}
train_params = dict(_loader_settings)
test_params = dict(_loader_settings)
Validation_params = dict(_loader_settings)

training_loader = DataLoader(train_df_2, **train_params)
testing_loader = DataLoader(test_df_2, **test_params)
validation_loader = DataLoader(val_df_2, **Validation_params)

#CNN model


Using 4 convolutional layers strikes a balance between model complexity and data limitations. Medical images have subtle details that a shallow model might miss, while 4 layers allow for effective feature extraction without overfitting on a small dataset of 780 images.

In [None]:
class BreastUltrasoundCNN(nn.Module):
    """Four-stage CNN classifier for 3-channel 250x250 images and 3 classes.

    Each stage is Conv(3x3) -> BatchNorm -> LeakyReLU -> MaxPool(2). For a
    250x250 input the spatial size shrinks 250 -> 125 -> 62 -> 31 -> 15, so
    the head receives 128 * 15 * 15 = 28800 features.
    """

    def __init__(self):
        super(BreastUltrasoundCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.conv4 = nn.Conv2d(64,128, kernel_size=3, stride=1, padding=1)
        self.bn4 = nn.BatchNorm2d(128)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(28800, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 64)
        self.fc4 = nn.Linear(64, 3)
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        """Return raw class scores (logits) of shape ``(batch_size, 3)``.

        No softmax is applied: ``nn.CrossEntropyLoss`` applies log-softmax
        itself, so feeding it probabilities flattens the loss and its
        gradients. ``argmax`` over the logits selects the same class as over
        the probabilities; use ``F.softmax(out, dim=1)`` when probabilities
        are needed.
        """
        x = self.pool(F.leaky_relu(self.bn1(self.conv1(x))))
        x = self.pool(F.leaky_relu(self.bn2(self.conv2(x))))
        x = self.pool(F.leaky_relu(self.bn3(self.conv3(x))))
        x = self.pool(F.leaky_relu(self.bn4(self.conv4(x))))
        # flatten(1) keeps the batch dimension fixed, so an unexpected input
        # size fails loudly in fc1 instead of being silently reshaped into a
        # different batch size (which view(-1, 28800) allows).
        x = torch.flatten(x, 1)
        x = self.dropout(F.relu(self.fc1(x)))
        x = self.dropout(F.relu(self.fc2(x)))
        x = self.dropout(F.relu(self.fc3(x)))
        return self.fc4(x)


#CNN model with se block

In [None]:
class BreastUltrasoundCNN_SE(nn.Module):
    """Four-stage CNN with a Squeeze-and-Excitation block after every stage.

    Each stage is Conv(3x3) -> BatchNorm -> activation -> MaxPool(2) -> SE.
    Stages 1-2 use LeakyReLU and stages 3-4 use ReLU. For a 3-channel 250x250
    input the head receives 128 * 15 * 15 = 28800 features and emits 3 scores.
    """

    def __init__(self):
        super(BreastUltrasoundCNN_SE, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1)
        self.se=SE_Block(16)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)
        self.se2=SE_Block(32)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.se3=SE_Block(64)
        self.bn3 = nn.BatchNorm2d(64)
        self.conv4 = nn.Conv2d(64,128, kernel_size=3, stride=1, padding=1)
        self.se4=SE_Block(128)
        self.bn4 = nn.BatchNorm2d(128)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(28800, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 64)
        self.fc4 = nn.Linear(64, 3)
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        """Return raw class scores (logits) of shape ``(batch_size, 3)``.

        No softmax is applied: ``nn.CrossEntropyLoss`` applies log-softmax
        itself, so feeding it probabilities flattens the loss and its
        gradients. ``argmax`` over the logits selects the same class as over
        the probabilities.
        """
        x = self.pool(F.leaky_relu(self.bn1(self.conv1(x))))
        x = self.se(x)

        x = self.pool(F.leaky_relu(self.bn2(self.conv2(x))))
        x = self.se2(x)

        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        x = self.se3(x)

        x = self.pool(F.relu(self.bn4(self.conv4(x))))
        x = self.se4(x)

        # Keep the batch dimension fixed; a wrong input size then fails in fc1
        # instead of being silently reshaped into another batch size.
        x = torch.flatten(x, 1)
        x = self.dropout(F.relu(self.fc1(x)))
        x = self.dropout(F.relu(self.fc2(x)))
        x = self.dropout(F.relu(self.fc3(x)))
        return self.fc4(x)


In [None]:
# Instantiate the baseline CNN and move its parameters to the selected device.
my_model_cnn = BreastUltrasoundCNN()
my_model_cnn = my_model_cnn.to(device)
my_model_cnn

In [None]:
# Number of samples per value of df['label'].
# NOTE: value_counts() sorts by count (descending), so the positions in this
# Series follow class frequency, not the label order.
count_clases=df['label'].value_counts()
count_clases

In [None]:
# Per-class loss weights for CrossEntropyLoss, which reads weight[i] as the
# weight of class index i.
# 1) value_counts() orders by frequency, so re-order by label value first;
#    otherwise the weights are attached to the wrong classes.
# 2) Use inverse-frequency ("balanced") weights n_samples / (n_classes * count)
#    so that rare classes weigh more. Weighting each class by its share of the
#    data would amplify the majority class instead of compensating for it.
counts_by_label = count_clases.sort_index()
class_weights = np.array(
    counts_by_label.sum() / (len(counts_by_label) * counts_by_label),
    dtype=float,
)
class_weights

In [None]:
# as_tensor accepts a NumPy array as well as an already-converted tensor, so
# this cell can be re-run without copy-constructing a tensor from a tensor.
class_weights = torch.as_tensor(class_weights, dtype=torch.float, device=device)
# Class-weighted cross-entropy loss and Adam optimizer for the baseline CNN
loss_function = torch.nn.CrossEntropyLoss(weight=class_weights)
optimizer = torch.optim.Adam(params=my_model_cnn.parameters(), lr=LEARNING_RATE, weight_decay=1e-3)
# OneCycleLR is stepped once per batch and supports exactly
# epochs * steps_per_epoch steps. Its length is tied to `epochs` instead of a
# literal 50 so it cannot fall out of sync with the training call.
# NOTE: the scheduler sets the learning rate itself (peak 1e-4 here), so the
# optimizer's initial lr is overridden.
sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, 0.0001, epochs=epochs, steps_per_epoch=len(training_loader))



#Training base cnn

##Training function

In [None]:
def my_train(model, train_loader, val_loader, optimizer, loss_function, scheduler, epochs, patience=2, device="cuda"):
    """Train ``model`` with per-epoch validation, early stopping and a loss plot.

    Args:
        model: Network to train; called as ``model(images)``.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        optimizer: Optimizer updating the model parameters.
        loss_function: Callable ``loss_function(outputs, labels)``.
        scheduler: Learning-rate scheduler, stepped once per training batch.
        epochs: Maximum number of epochs.
        patience: Number of consecutive epochs without a new best validation
            loss after which training stops early.
        device: Device the batches are moved to.

    Returns:
        None. Metrics are printed and the loss curves are plotted.
    """
    best_val_loss = float('inf')
    no_improvement_count = 0
    training_losses = []
    validation_losses = []
    for epoch in range(epochs):
        # ---- training pass ----
        model.train()
        train_loss = 0
        step = 0  # number of batches processed; stays 0 for an empty loader
        all_preds, all_labels = [], []
        # Count batches from 1 so `step` is directly the number of batches seen.
        progress_bar = tqdm(enumerate(train_loader, start=1), total=len(train_loader), desc=f"Epoch {epoch+1}/{epochs}")
        for step, (images, labels) in progress_bar:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            prediction = torch.argmax(outputs, dim=1)
            all_preds.extend(prediction.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            loss = loss_function(outputs, labels)
            train_loss += loss.item()
            loss.backward()
            optimizer.step()
            scheduler.step()
            if step % 10 == 0:
                print(f"Epoch {epoch+1}, Step {step}, Loss: {train_loss/step:.4f},LR:{scheduler.get_last_lr()}")

        # Training F1 scores; f1_score expects (y_true, y_pred) in that order.
        unique_labels = np.unique(all_labels)
        tr_f1_scores = f1_score(all_labels, all_preds, average=None, labels=unique_labels)
        tr_f1_score_dict = {label: score for label, score in zip(unique_labels, tr_f1_scores)}
        tr_macro_f1 = f1_score(all_labels, all_preds, average='macro', labels=unique_labels)
        train_loss_epoch = train_loss / max(step, 1)  # avoid dividing by zero on an empty loader
        training_losses.append(train_loss_epoch)

        # ---- validation pass ----
        model.eval()
        val_loss = 0
        nb_val_steps = 0
        all_preds_v, all_labels_v = [], []
        with torch.no_grad():
            for val_images, val_labels in val_loader:
                val_images, val_labels = val_images.to(device), val_labels.to(device)
                outputs_v = model(val_images)
                loss_v = loss_function(outputs_v, val_labels)
                val_loss += loss_v.item()
                nb_val_steps += 1
                prediction_v = torch.argmax(outputs_v, dim=1)
                all_preds_v.extend(prediction_v.cpu().numpy())
                all_labels_v.extend(val_labels.cpu().numpy())

        # Validation loss and F1 scores
        val_loss_epoch = val_loss / max(nb_val_steps, 1)
        validation_losses.append(val_loss_epoch)
        unique_labels_v = np.unique(all_labels_v)
        val_f1_scores = f1_score(all_labels_v, all_preds_v, average=None, labels=unique_labels_v)
        val_f1_score_dict = {label: score for label, score in zip(unique_labels_v, val_f1_scores)}
        val_macro_f1 = f1_score(all_labels_v, all_preds_v, average='macro', labels=unique_labels_v)

        print(f"Epoch {epoch+1}/{epochs}, Training Loss: {train_loss_epoch:.4f}, Validation Loss: {val_loss_epoch:.4f}")
        print(f"Training F1 Scores: {tr_f1_score_dict}, Validation F1 Scores: {val_f1_score_dict}")
        print(f"Training Macro F1: {tr_macro_f1:.4f}, Validation Macro F1: {val_macro_f1:.4f}")

        # ---- early stopping ----
        if val_loss_epoch < best_val_loss:
            best_val_loss = val_loss_epoch
            # A new best resets the counter, so `patience` counts consecutive
            # stale epochs rather than all stale epochs since training began.
            no_improvement_count = 0
        else:
            no_improvement_count += 1
        if no_improvement_count >= patience:
            print("Early stopping triggered. No improvement in validation loss for {} epochs.".format(patience))
            break

    # Plot training and validation losses
    plt.figure(figsize=(10, 6))
    plt.plot(range(1, len(training_losses) + 1), training_losses, label="Training Loss")
    plt.plot(range(1, len(validation_losses) + 1), validation_losses, label="Validation Loss")
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss per Epoch')
    plt.legend()
    plt.grid()
    plt.show()
    print("Training and Validation completed!")

#my_train(my_model, training_loader, validation_loader, optimizer, loss_function, sched, epochs, patience=patience, device=device)


In [None]:
# Train the baseline CNN.
my_train(
    model=my_model_cnn,
    train_loader=training_loader,
    val_loader=validation_loader,
    optimizer=optimizer,
    loss_function=loss_function,
    scheduler=sched,
    epochs=epochs,
    patience=patience,
    device=device,
)


##Test function

In [None]:
# Test-set evaluation
def test_model(model, test_loader, device):
    """Evaluate ``model`` on ``test_loader`` and print per-class and macro F1.

    Args:
        model: Trained network; called as ``model(images)``.
        test_loader: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to.

    Returns:
        None. The scores are printed.
    """
    model.eval()
    all_preds_s, all_labels_s = [], []
    with torch.no_grad():
        for test_images, test_labels in test_loader:
            test_images, test_labels = test_images.to(device), test_labels.to(device)
            outputs_s = model(test_images)
            prediction_s = torch.argmax(outputs_s, dim=1)
            all_preds_s.extend(prediction_s.cpu().numpy())
            all_labels_s.extend(test_labels.cpu().numpy())
    # Score only the classes present in the ground truth; f1_score expects
    # (y_true, y_pred) in that order.
    unique_labels_s = np.unique(all_labels_s)
    test_f1_scores = f1_score(all_labels_s, all_preds_s, average=None, labels=unique_labels_s)
    test_f1_score_dict = {label: score for label, score in zip(unique_labels_s, test_f1_scores)}
    test_macro_f1 = f1_score(all_labels_s, all_preds_s, average='macro', labels=unique_labels_s)
    print(f"Test F1 Scores: {test_f1_score_dict}")
    print(f"Test Macro F1: {test_macro_f1:.4f}")


In [None]:
# Evaluate the baseline CNN on the held-out test split.
test_model(model=my_model_cnn, test_loader=testing_loader, device=device)

#Training CNN with SE block

In [None]:
# Build the SE variant of the CNN with its own loss, optimizer and scheduler.
cnn_se = BreastUltrasoundCNN_SE()
cnn_se.to(device)
# class_weights may already be a tensor when this cell runs; as_tensor handles
# both a NumPy array and a tensor without copy-constructing a tensor from a tensor.
class_weights = torch.as_tensor(class_weights, dtype=torch.float, device=device)
# Class-weighted cross-entropy loss and Adam optimizer
loss_function = torch.nn.CrossEntropyLoss(weight=class_weights)
optimizer = torch.optim.Adam(params=cnn_se.parameters(), lr=LEARNING_RATE, weight_decay=1e-3)
# OneCycleLR supports exactly epochs * steps_per_epoch steps; its length is
# tied to `epochs` instead of a literal 50 so it stays in sync with training.
sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, 0.0001, epochs=epochs, steps_per_epoch=len(training_loader))

In [None]:

# Train the CNN with SE blocks.
my_train(
    model=cnn_se,
    train_loader=training_loader,
    val_loader=validation_loader,
    optimizer=optimizer,
    loss_function=loss_function,
    scheduler=sched,
    epochs=epochs,
    patience=patience,
    device=device,
)

In [None]:
# Evaluate the CNN with SE blocks on the held-out test split.
test_model(model=cnn_se, test_loader=testing_loader, device=device)

#RESNET50

In [None]:
from torchvision.models import resnet50, ResNet50_Weights
# `pretrained=True` is deprecated in torchvision >= 0.13. The explicit enum
# below selects the same ImageNet weights that the flag used to load.
model_my = resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)


In [None]:
from torchvision.models import ResNet50_Weights

# Preprocessing pipeline bundled with the default ResNet-50 weights.
weights = ResNet50_Weights.DEFAULT
preprocess = weights.transforms()

# Training pipeline: random flips first, then the bundled preprocessing.
_train_augmentations = [T.RandomHorizontalFlip(), T.RandomVerticalFlip()]
img_train_res = T.Compose(_train_augmentations + [preprocess])

In [None]:
class ResNet50Model(nn.Module):
    """Pretrained ResNet-50 followed by ReLU, dropout and a 3-class linear head.

    The head is fed the backbone's 1000-dimensional output.
    """

    def __init__(self):
        super(ResNet50Model, self).__init__()
        # NOTE(review): `pretrained=True` is the legacy flag; confirm the
        # weights it loads match the preprocessing applied to the inputs.
        self.resnet = resnet50(pretrained=True)
        self.dropout = torch.nn.Dropout(0.3)
        self.fc1 = torch.nn.Linear(1000, 3)

    def forward(self, x):
        """Return raw class scores (logits) of shape ``(batch_size, 3)``.

        No softmax is applied: ``nn.CrossEntropyLoss`` applies log-softmax
        itself, so feeding it probabilities flattens the loss and its
        gradients. ``argmax`` over the logits selects the same class as over
        the probabilities.
        """
        x = self.resnet(x)
        x = F.relu(x)
        x = self.dropout(x)
        return self.fc1(x)


In [None]:
# Instantiate the ResNet-50 classifier and move it to the selected device.
my_resnet = ResNet50Model()
my_resnet = my_resnet.to(device)
my_resnet

In [None]:
# Resize to 256, centre-crop to 224, convert to tensor and normalise.
# NOTE(review): `transform_my` is not referenced in the visible part of the
# notebook.
_transform_my_steps = [
    T.Resize(256),
    T.CenterCrop(224),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
transform_my = T.Compose(_transform_my_steps)

In [None]:
# Same stratified 80 / 10 / 10 split (same seed), now wrapped with the
# ResNet preprocessing pipelines.
train_df, temp_df = train_test_split(
    df, test_size=0.2, stratify=df['label'], random_state=42
)
val_df, test_df = train_test_split(
    temp_df, test_size=0.5, stratify=temp_df['label'], random_state=42
)

# Discard the original row indices so every frame is indexed 0..n-1.
train_df, val_df, test_df = (
    frame.reset_index(drop=True) for frame in (train_df, val_df, test_df)
)

print("TRAIN Dataset: {}".format(train_df.shape))
print("VALIDATION Dataset: {}".format(val_df.shape))
print("TEST Dataset: {}".format(test_df.shape))

# Training data gets the augmenting pipeline; validation and test only the
# bundled preprocessing.
train_df_2 = Dataset(
    images=train_df['file_path'].values,
    labels=train_df['label'].values,
    transform=img_train_res,
)
val_df_2 = Dataset(
    images=val_df['file_path'].values,
    labels=val_df['label'].values,
    transform=preprocess,
)
test_df_2 = Dataset(
    images=test_df['file_path'].values,
    labels=test_df['label'].values,
    transform=preprocess,
)


In [None]:

# One shared DataLoader configuration; each split gets its own copy of it.
_loader_settings = {
    'batch_size': BATCH_SIZE,
    'shuffle': True,
    'num_workers': 0,
    'pin_memory': True,
}
train_params = dict(_loader_settings)
test_params = dict(_loader_settings)
Validation_params = dict(_loader_settings)

# Rebuild the loaders on top of the re-wrapped datasets.
training_loader = DataLoader(train_df_2, **train_params)
testing_loader = DataLoader(test_df_2, **test_params)
validation_loader = DataLoader(val_df_2, **Validation_params)

In [None]:
# Class-weighted cross-entropy loss and Adam optimizer for the ResNet-50 model
loss_function = torch.nn.CrossEntropyLoss(weight=class_weights)
optimizer = torch.optim.Adam(params=my_resnet.parameters(), lr=LEARNING_RATE, weight_decay=1e-3)
# OneCycleLR supports exactly epochs * steps_per_epoch steps; its length is
# tied to `epochs` instead of a literal 50 so it stays in sync with training.
sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.0001, epochs=epochs, steps_per_epoch=len(training_loader))

In [None]:
# Train the ResNet-50 model; early stopping uses a patience of 2 here.
my_train(
    model=my_resnet,
    train_loader=training_loader,
    val_loader=validation_loader,
    optimizer=optimizer,
    loss_function=loss_function,
    scheduler=sched,
    epochs=epochs,
    patience=2,
    device=device,
)

In [None]:
# Evaluate the ResNet-50 model on the held-out test split.
test_model(model=my_resnet, test_loader=testing_loader, device=device)

#VGG19

In [None]:
# Load VGG19 with pretrained weights; the bare `model` expression displays
# its layer structure.
model = models.vgg19(pretrained=True)
model

In [None]:
class my_vgg19(nn.Module):
    """VGG19 backbone followed by ReLU, dropout and a 3-class linear head.

    Args:
        backbone: Optional VGG19 module to wrap. When omitted, a pretrained
            VGG19 is loaded. Passing it explicitly removes the dependence on
            a global variable named ``model`` being bound to the VGG network
            at construction time.
    """

    def __init__(self, backbone=None):
        super(my_vgg19, self).__init__()
        self.vgg = backbone if backbone is not None else models.vgg19(pretrained=True)
        self.dropout = torch.nn.Dropout(0.3)
        self.fc1 = nn.Linear(1000, 3)

    def forward(self, x):
        """Return raw class scores (logits) of shape ``(batch_size, 3)``.

        No softmax is applied: ``nn.CrossEntropyLoss`` applies log-softmax
        itself, so feeding it probabilities flattens the loss and its
        gradients. ``argmax`` over the logits selects the same class as over
        the probabilities.
        """
        x = self.vgg(x)
        x = F.relu(x)
        x = self.dropout(x)
        return self.fc1(x)


# Reuse the VGG19 loaded in the previous cell rather than loading it again.
my_vgg = my_vgg19(backbone=model)
my_vgg.to(device)

In [None]:
# Class-weighted cross-entropy loss and Adam optimizer for the VGG19 model
loss_function = torch.nn.CrossEntropyLoss(weight=class_weights)
optimizer = torch.optim.Adam(params=my_vgg.parameters(), lr=LEARNING_RATE, weight_decay=1e-3)
# OneCycleLR supports exactly epochs * steps_per_epoch steps; its length is
# tied to `epochs` instead of a literal 50 so it stays in sync with training.
sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, 0.00001, epochs=epochs, steps_per_epoch=len(training_loader))

In [None]:
# Train the VGG19 model; early stopping uses a patience of 1 here.
my_train(
    model=my_vgg,
    train_loader=training_loader,
    val_loader=validation_loader,
    optimizer=optimizer,
    loss_function=loss_function,
    scheduler=sched,
    epochs=epochs,
    patience=1,
    device=device,
)

In [None]:
# Evaluate the VGG19 model on the held-out test split.
test_model(model=my_vgg, test_loader=testing_loader, device=device)

#Results of classifications

In [None]:
# Per-class and macro F1 scores, one column per model (values entered by hand).
# NOTE(review): for RESNET50 and VGG19 the macro value is not the mean of the
# three per-class scores in the same column - confirm that all four numbers
# of a column come from the same run.
index = ['normal_f1_score', 'malignant_f1_score', 'benign_f1_score', 'macro_f1_score']

data = {
    'CNN': [0, 0.5625, 0.7105263157894737, 0.4243],
    'CNN SE BLOCK': [0, 0.717948717948718, 0.7920792079207921, 0.5033],
    'RESNET50': [0.8, 0.5714285714285714, 0.5714285714285714, 0.7429],
    'VGG19': [0.6829268292682927, 0.7692307692307693, 0.8571428571428571, 0.7453],
}

# Build the table column-wise, then label the rows with the metric names.
df_results = pd.DataFrame.from_dict(data)
df_results.index = index

df_results

After experimenting with various models, we observe that simple architectures like a basic CNN and an upgraded CNN with an SE block fail to deliver satisfactory results. This is primarily due to the limited amount of training data, making it challenging for these models to learn effectively.

However, when transitioning to more complex and deeper models, particularly pretrained ones, there is a noticeable improvement in performance. Among these, VGG19 stands out as the best-performing model. While its results are not perfect, it demonstrates a significant improvement in classification ability, as reflected in the F1 scores for each class, including the under-represented classes "normal" and "malignant."

#Breast Cancer Image Segmentation | Attention UNet

In [None]:
import os
import re
import pandas as pd

# Dataset root on the mounted drive and the class sub-folders to scan
# (each folder name maps to an identical label string).
base_dir = '/content/drive/MyDrive/Dataset_BUSI_with_GT'
folders = {'normal': 'normal', 'malignant': 'malignant', 'benign': 'benign'}

def get_base_filename(filename):
    """Return the name of the image that a mask file belongs to.

    A trailing ``_mask.png`` or ``_mask_<digits>.png`` is replaced by
    ``.png``; any other name is returned unchanged.
    """
    mask_suffix = re.compile(r'_mask(_\d+)?\.png$')
    return mask_suffix.sub('.png', filename)

def process_images(delete_multi_mask_groups=False):
    """Pair every image with its single ground-truth mask.

    Files in each class folder are grouped by the image they belong to (see
    ``get_base_filename``). A group contributes one ``(image, mask)`` pair
    when it holds the image itself and exactly one mask. Groups without the
    image or without a mask are ignored. Groups with several masks are
    skipped.

    Args:
        delete_multi_mask_groups: When True, the files of groups with several
            masks are also deleted from disk. This is off by default because
            the deletion is irreversible and also removes those images for
            everything else that reads the same folders.

    Returns:
        Tuple ``(image_paths, mask_paths)`` of equally long lists of paths.
    """
    image_paths = []
    mask_paths = []

    for folder_name in folders:
        folder_path = os.path.join(base_dir, folder_name)

        # Group files by the image they belong to. sorted() makes the result
        # independent of the arbitrary order os.listdir() returns.
        file_groups = {}
        for file in sorted(os.listdir(folder_path)):
            file_groups.setdefault(get_base_filename(file), []).append(file)

        for base_name, group in file_groups.items():
            if base_name not in group:
                # Mask(s) without their image: nothing to pair.
                continue
            # Identify the image by name rather than by "does not end with
            # _mask.png", which would mistake a numbered mask for the image.
            masks = [f for f in group if f != base_name]

            if len(masks) == 1:  # One original image and one mask
                image_paths.append(os.path.join(folder_path, base_name))
                mask_paths.append(os.path.join(folder_path, masks[0]))
            elif len(masks) > 1:  # More than one mask file
                if not delete_multi_mask_groups:
                    print(f"Skipping group due to multiple masks: {group}")
                    continue
                print(f"Removing group due to multiple masks: {group}")
                for file in group:
                    file_path = os.path.join(folder_path, file)
                    try:
                        os.remove(file_path)
                        print(f"Deleted: {file_path}")
                    except OSError as e:
                        print(f"Error deleting {file_path}: {e}")

    return image_paths, mask_paths

# Collect the (image, mask) path pairs and keep them side by side in a DataFrame.
image_paths, mask_paths = process_images()

df_segmentation = pd.DataFrame(dict(image_path=image_paths, mask_path=mask_paths))

df_segmentation.head()

In [None]:
# (number of image/mask pairs, number of columns)
df_segmentation.shape

In [None]:
# Dataset class for the segmentation task
class Dataset_seg(torch.utils.data.Dataset):
    """Segmentation dataset yielding ``(image, mask)`` pairs from file paths.

    Args:
        images: Sequence of image file paths.
        masked_image: Sequence of mask file paths, aligned with ``images``.
        transform: Optional callable applied to both the image and the mask
            (each loaded as a single-channel 'L' PIL image).
    """

    def __init__(self, images: list, masked_image: list, transform=None):
        super().__init__()
        self.images = images
        self.masked_image = masked_image
        self.transform = transform

    def __len__(self):
        """Return the number of image/mask pairs."""
        return len(self.images)

    def __getitem__(self, idx):
        """Load pair ``idx`` and return ``(image, mask)``."""
        # convert() inside the context manager loads the pixels, after which
        # the file handles are closed instead of lingering until GC.
        with Image.open(self.images[idx]) as img:
            image = img.convert('L')
        with Image.open(self.masked_image[idx]) as msk:
            masked_img = msk.convert('L')
        if self.transform:
            # Replay the same torch RNG state for the mask so that random
            # transforms (torchvision's draw from torch's global RNG) make
            # identical choices for image and mask. With deterministic
            # transforms this changes nothing.
            rng_state = torch.get_rng_state()
            image = self.transform(image)
            torch.set_rng_state(rng_state)
            masked_img = self.transform(masked_img)
        # The former per-item debug print is removed: it flooded the output
        # and raised AttributeError on PIL images when transform was None.
        return image, masked_img

##The model

In [None]:
class conv_block(nn.Module):
    """Two consecutive Conv(3x3, padding 1) -> BatchNorm -> ReLU stages.

    The first stage maps ``in_c`` to ``out_c`` channels, the second keeps
    ``out_c``. Height and width are preserved.
    """

    def __init__(self, in_c, out_c):
        super().__init__()
        stages = []
        for stage_in in (in_c, out_c):
            stages += [
                nn.Conv2d(stage_in, out_c, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_c),
                nn.ReLU(inplace=True),
            ]
        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both stages to ``x``."""
        return self.conv(x)

In [None]:
class my_encoder(nn.Module):
    """Encoder stage: a conv_block followed by 2x2 max pooling."""

    def __init__(self, in_c, out_c):
        super().__init__()
        self.conv = conv_block(in_c, out_c)
        self.pool = nn.MaxPool2d((2, 2))

    def forward(self, x):
        """Return ``(skip, pooled)``: the conv features and their pooled version."""
        skip = self.conv(x)  # kept at full resolution for the skip connection
        return skip, self.pool(skip)

In [None]:
class my_attention_gate(nn.Module):
    """Attention gate: scales the skip features ``s`` by a learned mask.

    Args:
        in_c: Indexable pair; ``in_c[0]`` is the channel count of the gating
            signal ``g`` and ``in_c[1]`` that of the skip features ``s``.
        out_c: Channel count of both projections and of the mask.
    """

    def __init__(self, in_c, out_c):
        super().__init__()

        def project(channels):
            # 1x1 convolution to out_c channels, followed by batch norm.
            return nn.Sequential(
                nn.Conv2d(channels, out_c, kernel_size=1, padding=0),
                nn.BatchNorm2d(out_c),
            )

        self.Wg = project(in_c[0])
        self.Ws = project(in_c[1])
        self.relu = nn.ReLU(inplace=True)
        self.output = nn.Sequential(
            nn.Conv2d(out_c, out_c, kernel_size=1, padding=0),
            nn.Sigmoid(),
        )

    def forward(self, g, s):
        """Return ``s`` multiplied element-wise by the mask built from ``g`` and ``s``."""
        combined = self.relu(self.Wg(g) + self.Ws(s))
        mask = self.output(combined)  # values in (0, 1)
        return mask * s


In [None]:

class my_decoder(nn.Module):
    """Decoder stage: upsample, gate the skip features, concatenate, convolve.

    Args:
        in_c: Indexable pair; ``in_c[0]`` is the channel count of the incoming
            features and ``in_c[1]`` that of the skip connection.
        out_c: Channel count produced by this stage.
    """

    def __init__(self, in_c, out_c):
        super().__init__()

        self.up = nn.Upsample(scale_factor=2, mode="bilinear", align_corners=True)
        self.ag = my_attention_gate(in_c, out_c)
        self.c1 = conv_block(in_c[0]+out_c, out_c)

    def forward(self, x, s):
        """Fuse the incoming features ``x`` with the skip features ``s``."""
        upsampled = self.up(x)
        gated_skip = self.ag(upsampled, s)
        # Concatenate along the channel dimension.
        merged = torch.cat([upsampled, gated_skip], axis=1)
        return self.c1(merged)


In [None]:
class attention_unet(nn.Module):
    """Attention U-Net mapping a 1-channel image to a 1-channel sigmoid mask.

    Four encoder stages (64, 128, 256, 512 channels), a 1024-channel
    bottleneck and four attention-gated decoder stages. The input is pooled
    by 2 four times and upsampled by 2 four times, so heights and widths
    divisible by 16 (e.g. 256) keep the skip and upsampled maps aligned.
    """

    def __init__(self):
        super().__init__()
        # Encoder path
        self.encodeer_1 = my_encoder(1, 64)
        self.encodeer_2 = my_encoder(64, 128)
        self.encodeer_3 = my_encoder(128, 256)
        self.encodeer_4 = my_encoder(256, 512)
        # Bottleneck
        self.conv1 = conv_block(512, 1024)
        # Decoder path: [incoming channels, skip channels] -> output channels
        self.decodeer_1 = my_decoder([1024, 512], 512)
        self.decodeer_2 = my_decoder([512, 256], 256)
        self.decodeer_3 = my_decoder([256, 128], 128)
        self.decodeer_4 = my_decoder([128, 64], 64)
        # 1x1 convolution to one channel, then sigmoid
        self.conv2 = nn.Conv2d(64, 1, kernel_size=1, padding=0)
        self.seg = nn.Sigmoid()

    def forward(self, x):
        """Return the per-pixel sigmoid output for ``x``."""
        # Encoder: keep each stage's full-resolution features as a skip.
        skip1, x = self.encodeer_1(x)
        skip2, x = self.encodeer_2(x)
        skip3, x = self.encodeer_3(x)
        skip4, x = self.encodeer_4(x)

        x = self.conv1(x)

        # Decoder: consume the skips from deepest to shallowest.
        x = self.decodeer_1(x, skip4)
        x = self.decodeer_2(x, skip3)
        x = self.decodeer_3(x, skip2)
        x = self.decodeer_4(x, skip1)

        # Sigmoid (not softmax) over the single output channel.
        return self.seg(self.conv2(x))




In [None]:
# Build the Attention U-Net and move it to the selected device.
# NOTE: this rebinds the global name `model`.
model = attention_unet()
model = model.to(device)
model

In [None]:
# Data transforms
_SEG_INPUT_SIZE = [256, 256]

# Augmenting pipeline (random flips).
# NOTE(review): it is not passed to any dataset in this cell - all three
# splits below use `transform_test_val`.
transform_my_train = T.Compose([
    T.Resize(_SEG_INPUT_SIZE),
    T.RandomHorizontalFlip(),
    T.RandomVerticalFlip(),
    T.ToTensor(),
])

# Deterministic pipeline: resize and convert to tensor.
transform_test_val = T.Compose([
    T.Resize(_SEG_INPUT_SIZE),
    T.ToTensor(),
])

# 80 / 10 / 10 split (not stratified): hold out 20 %, then halve it.
train_df, temp_df = train_test_split(df_segmentation, test_size=0.2, random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42)

# Discard the original row indices so every frame is indexed 0..n-1.
train_df, val_df, test_df = (
    frame.reset_index(drop=True) for frame in (train_df, val_df, test_df)
)

print("TRAIN Dataset: {}".format(train_df.shape))
print("VALIDATION Dataset: {}".format(val_df.shape))
print("TEST Dataset: {}".format(test_df.shape))

train_df_2 = Dataset_seg(
    images=train_df['image_path'].values,
    masked_image=train_df['mask_path'].values,
    transform=transform_test_val,
)
val_df_2 = Dataset_seg(
    images=val_df['image_path'].values,
    masked_image=val_df['mask_path'].values,
    transform=transform_test_val,
)
test_df_2 = Dataset_seg(
    images=test_df['image_path'].values,
    masked_image=test_df['mask_path'].values,
    transform=transform_test_val,
)

In [None]:
# Fetch the (image, mask) pair at index 1 of the test split and show both shapes.
orginal_image,mask_image=test_df_2[1]
orginal_image.shape,mask_image.shape


In [None]:
# Drop singleton dimensions and convert the image tensor to a NumPy array for plotting.
image_np = orginal_image.squeeze().cpu().numpy()

# Plot the image
plt.imshow(image_np, cmap='gray')  # Use the 'gray' colormap for grayscale images
plt.axis('off')  # Optional: hide the axes
plt.show()

In [None]:
# DataLoaders for the segmentation splits
# One shared configuration (batch size 4); each split gets its own copy.
_seg_loader_settings = {
    'batch_size': 4,
    'shuffle': True,
    'num_workers': 0,
    'pin_memory': True,
}
train_params = dict(_seg_loader_settings)
test_params = dict(_seg_loader_settings)
Validation_params = dict(_seg_loader_settings)

training_loader = DataLoader(train_df_2, **train_params)
testing_loader = DataLoader(test_df_2, **test_params)
validation_loader = DataLoader(val_df_2, **Validation_params)

In [None]:
# Dice loss and combined BCE + Dice loss
def dice_loss(pred, target, smooth = 1e-6):
    """Return ``1 - Dice`` computed over all elements of ``pred`` and ``target``.

    ``smooth`` enters the numerator and the denominator twice each, so the
    effective smoothing term is ``2 * smooth``; two all-zero inputs give a
    loss of 0.
    """
    pred = pred.contiguous()
    target = target.contiguous()
    overlap = (pred * target).sum()
    numerator = 2 * overlap + smooth
    denominator = pred.sum() + target.sum() + smooth
    return 1 - ((numerator + smooth) / (denominator + smooth))


def bce_dice_loss(pred, target):
    """Return binary cross-entropy (mean reduction) plus the Dice loss."""
    bce_term = torch.nn.BCELoss()(pred, target)
    return bce_term + dice_loss(pred, target)

In [None]:
# IoU metric function
def iou_score(output, target):
    """Return the thresholded IoU between predicted and true masks, averaged over the batch.

    Args:
        output: Predicted masks with the batch as the first dimension, e.g.
            ``(B, H, W)`` or ``(B, 1, H, W)``. Boolean/integer tensors are
            used as-is (non-zero = foreground); floating-point tensors are
            binarised at 0.5.
        target: Ground-truth masks of the same shape, binarised the same way.

    Returns:
        0-dim tensor. Each sample's IoU is mapped to a score in steps of 0.1:
        0 for IoU <= 0.55, then 0.1 for every further 0.05 of IoU.
    """
    smooth = 1e-5
    # Binarise by thresholding. Casting a float mask to integers truncates,
    # which drops every pixel that is not exactly 1.0 (e.g. interpolated edges).
    pred_mask = output > 0.5 if torch.is_floating_point(output) else output.bool()
    true_mask = target > 0.5 if torch.is_floating_point(target) else target.bool()
    true_mask = true_mask.to(pred_mask.device)

    # Reduce over every non-batch dimension so there is one IoU per sample.
    # A fixed (1, 2) would sum a (B, 1, H, W) tensor over channel and height
    # only, yielding one IoU per image column.
    reduce_dims = tuple(range(1, pred_mask.dim()))
    intersection = (pred_mask & true_mask).float().sum(reduce_dims)
    union = (pred_mask | true_mask).float().sum(reduce_dims)
    iou = (intersection + smooth) / (union + smooth)
    # Equivalent to comparing the IoU against thresholds 0.55, 0.60, ...
    thresholded = torch.clamp(20 * (iou - 0.55), 0, 10).ceil() / 10
    return thresholded.mean()

In [None]:
# Optimizer and learning-rate schedule for the Attention U-Net
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.0001, weight_decay=1e-3)
# OneCycleLR supports exactly epochs * steps_per_epoch steps; its length is
# tied to `epochs` instead of a literal 50 so it stays in sync with training.
sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, 0.001, epochs=epochs, steps_per_epoch=len(training_loader))

##Training

In [None]:
#training function for attention unet
def _evaluate_attention_unet(model, loss_function, loader, device, threshold):
    """Return (mean loss, mean IoU) per batch over ``loader`` without gradients."""
    model.eval()
    loss_sum = 0
    iou_sum = 0
    n_batches = 0
    with torch.no_grad():
        for batch_images, batch_masks in loader:
            batch_images, batch_masks = batch_images.to(device), batch_masks.to(device)
            batch_outputs = model(batch_images)
            loss_sum += loss_function(batch_outputs, batch_masks).item()
            iou_sum += iou_score(batch_outputs > threshold, batch_masks).cpu().numpy()
            n_batches += 1
    if n_batches == 0:
        raise ValueError("val_loader yielded no batches")
    return loss_sum / n_batches, iou_sum / n_batches


def _plot_training_curves(training_losses, validation_losses, iou_scores_train, iou_scores_val):
    """Draw the per-epoch loss curves (left) and IoU curves (right)."""
    plt.figure(figsize=(20,8))
    plt.subplot(1,2,1)
    plt.title("Model Loss")
    plt.plot(training_losses, label="Training")
    plt.plot(validation_losses, label="Validtion")
    plt.legend()
    plt.grid()

    plt.subplot(1,2,2)
    plt.title("Model IoU")
    plt.plot(iou_scores_train, label="Training")
    plt.plot(iou_scores_val, label="Validtion")
    plt.legend()
    plt.grid()
    plt.show()


def train_attention_unet(model,optimizer,loss_function,train_loader,val_loader,epochs,device,patience,scheduler):
    """Train a segmentation model with early stopping and plot its history.

    Args:
        model: network called as ``model(images)``; its output is thresholded
            at 0.55 to obtain the binary mask used for the IoU metric.
        optimizer: optimizer stepped once per training batch.
        loss_function: callable ``loss_function(outputs, masks)``.
        train_loader: iterable of ``(images, masks)`` training batches.
        val_loader: iterable of ``(images, masks)`` validation batches.
        epochs: maximum number of epochs.
        device: device the batches are moved to.
        patience: number of consecutive epochs without a new best validation
            loss after which training stops.
        scheduler: learning-rate scheduler stepped once per training batch,
            or ``None`` to train without one.

    Raises:
        ValueError: if either loader yields no batches.

    Returns:
        None. Progress is printed and the loss / IoU curves are plotted.
    """
    threshold = 0.55
    best_val_loss = float('inf')
    no_improvement_count = 0
    training_losses = []
    validation_losses = []
    iou_scores_train = []
    iou_scores_val = []
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        iou_train = 0
        n_train_steps = 0
        progress_bar = tqdm(enumerate(train_loader, start=1), total=len(train_loader), desc=f"Epoch {epoch+1}/{epochs}")
        for step, (images, masked_image) in progress_bar:
            n_train_steps = step
            images, masked_image = images.to(device), masked_image.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = loss_function(outputs, masked_image)
            loss.backward()
            optimizer.step()
            # Step the scheduler that was passed in, not a module-level one.
            if scheduler is not None:
                scheduler.step()

            iou_train += iou_score(outputs > threshold, masked_image).cpu().numpy()
            train_loss += loss.item()
            if step % 20 == 0:
                if scheduler is not None:
                    current_lr = scheduler.get_last_lr()
                else:
                    current_lr = [group['lr'] for group in optimizer.param_groups]
                print(f"Epoch {epoch+1}, Step {step}, Loss: {train_loss/step:.4f}, IOU: {iou_train/step},LR:{current_lr}")
        if n_train_steps == 0:
            raise ValueError("train_loader yielded no batches")

        val_loss_epoch, iou_val_epoch = _evaluate_attention_unet(
            model, loss_function, val_loader, device, threshold)
        train_loss_epoch = train_loss / n_train_steps
        iou_train_epoch = iou_train / n_train_steps
        training_losses.append(train_loss_epoch)
        validation_losses.append(val_loss_epoch)
        iou_scores_train.append(iou_train_epoch)
        iou_scores_val.append(iou_val_epoch)
        print(f"Epoch {epoch+1}/{epochs}, Training Loss: {train_loss_epoch:.4f}, Validation Loss: {val_loss_epoch}")
        print(f"Epoch {epoch+1}/{epochs}, Training IOU: {iou_train_epoch:.4f}, Validation IOU: {iou_val_epoch}")
        if val_loss_epoch < best_val_loss:
            best_val_loss = val_loss_epoch
            # A new best resets the counter so patience measures consecutive
            # epochs without improvement.
            no_improvement_count = 0
            print(f"Best validation loss {best_val_loss}")
            #torch.save(model.state_dict(), model_save_path)
        else:
            no_improvement_count += 1
            if no_improvement_count >= patience:
                print("Early stopping triggered. No improvement in validation loss for {} epochs.".format(patience))
                break
    _plot_training_curves(training_losses, validation_losses, iou_scores_train, iou_scores_val)





In [None]:
#training
# Up to 50 epochs with BCE + Dice loss; stop once the patience of 3 is used up.
train_attention_unet(
    model,
    optimizer,
    bce_dice_loss,
    training_loader,
    validation_loader,
    epochs=50,
    device=device,
    patience=3,
    scheduler=sched,
)

The results on the validation data outperform those on the training data in terms of IoU. This suggests that the model has the potential to achieve much better performance if provided with additional training data. However, the loss behavior is not ideal, as it increases during the final epochs, ultimately triggering the early stopping rule.

##Testing

In [None]:
#test function
def test_model_attention_unet(model,loss_function, test_loader, device):
    """Evaluate ``model`` on ``test_loader`` and print the mean loss and IoU.

    Outputs are thresholded at 0.55 before being scored with ``iou_score``.
    Both metrics are summed per batch and divided by ``len(test_loader)``.
    Nothing is returned; the result is printed.
    """
    threshold = 0.55
    model.eval()
    test_loss = 0
    iou_scores_test = 0
    with torch.no_grad():
        for images, masks in test_loader:
            images = images.to(device)
            masks = masks.to(device)
            outputs_test = model(images)
            test_loss += loss_function(outputs_test, masks).item()
            batch_iou = iou_score(outputs_test > threshold, masks)
            iou_scores_test += batch_iou.cpu().numpy()
    n_batches = len(test_loader)
    iou_scores_test_final = iou_scores_test / n_batches
    test_loss_final = test_loss / n_batches
    print(f"Test Loss: {test_loss_final:.4f}, Test IOU: {iou_scores_test_final}")


In [None]:
# Score the trained segmentation model on the held-out test batches.
test_model_attention_unet(
    model=model,
    loss_function=bce_dice_loss,
    test_loader=testing_loader,
    device=device,
)

In [None]:
# Run the model on the first test batch and keep everything as NumPy arrays.
model.eval()
with torch.no_grad():
    # Only one batch is visualised, so fetch exactly one.
    images, masks = next(iter(testing_loader))
    images, masks = images.to(device), masks.to(device)
    outputs_test = model(images)
    threshold = 0.55
    prediction_test = (outputs_test > threshold)
    # Axis 1 (size 1) is dropped so every sample is a 2-D array for imshow.
    np_images_original = images.cpu().detach().numpy()
    np_images = np.squeeze(np_images_original, axis=1)
    np_mask=np.squeeze(masks.cpu().detach().numpy(),axis=1)
    np_prediction_test = np.squeeze(prediction_test.cpu().detach().numpy(), axis=1)
    outputs_np = np.squeeze(outputs_test.cpu().detach().numpy(), axis=1)

print(np_images.shape)
print(np_mask.shape)
print(np_prediction_test.shape)
print(outputs_np.shape)

#plotting the predicted mask , predicted binary mask and the mask on original images
# One row per sample actually present in the batch (it may hold fewer than 4);
# squeeze=False keeps `axes` two-dimensional even for a single row.
n_samples = np_images.shape[0]
fig, axes = plt.subplots(n_samples, 3, figsize=(15, 25 * n_samples / 4), squeeze=False)
fig.tight_layout(pad=3.0)

for i in range(n_samples):
    # Original image with predicted mask
    axes[i, 0].imshow(np_images[i], cmap='gray')
    axes[i, 0].imshow(outputs_np[i], alpha=0.5, cmap='copper')
    axes[i, 0].set_title("Original Image with Predicted Mask")
    axes[i, 0].axis('off')

    # Original image with original mask
    axes[i, 1].imshow(np_images[i], cmap='gray')
    axes[i, 1].imshow(np_mask[i], alpha=0.5, cmap='copper')
    axes[i, 1].set_title("Original Image with Original Mask")
    axes[i, 1].axis('off')

    # Original image with binary mask
    axes[i, 2].imshow(np_images[i], cmap='gray')
    axes[i, 2].imshow(np_prediction_test[i], alpha=0.5, cmap='copper')
    axes[i, 2].set_title("Original Image with Binary Mask")
    axes[i, 2].axis('off')

plt.show()

The results on the validation data outperform those on the training data in terms of IoU. This suggests that the model has the potential to achieve much better performance if provided with additional training data. However, the loss behavior is not ideal, as it increases during the final epochs, ultimately triggering the early stopping rule.

The results on the test data are reasonably good, as indicated by the IoU score. In the selected images, we observe some accurate "hits" in the predicted masks. While the predictions are not perfect, they are promising and demonstrate potential for further improvement.

# CGANs

In [None]:
# The conditional-GAN section runs on the CPU; this replaces the device
# chosen at the top of the notebook.
device = 'cpu'

In [None]:
# Debugging aid: ask CUDA to launch kernels synchronously so that errors are
# reported at the call that caused them.
os.environ.update({'CUDA_LAUNCH_BLOCKING': '1'})

In [None]:
class Dataset_gan(torch.utils.data.Dataset):
    """Dataset of (grayscale image, label) pairs for the conditional GAN.

    Args:
        images: sequence of image file paths.
        labels: sequence of labels, aligned with ``images``.
        transform: optional callable applied to the loaded PIL image.
    """

    def __init__(self, images: list, labels: list, transform=None):
        super().__init__()
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of labels, which defines the dataset size."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Load image ``idx`` in mode 'L' and return it with its label.

        The label is returned exactly as stored.
        """
        # The context manager closes the underlying file as soon as the
        # converted copy exists, instead of leaving it to garbage collection.
        with Image.open(self.images[idx]) as source:
            image = source.convert('L')
        if self.transform:
            image = self.transform(image)
        label = self.labels[idx]
        return image, label

In [None]:
class c_generator(nn.Module):
    """Conditional generator: (noise, class id) -> 1-channel 256x256 image.

    The class id is embedded and projected to a 1x4x4 map, the 100-d noise
    vector is projected to a 512x4x4 map, and their 513-channel concatenation
    is upsampled by six stride-2 transposed convolutions (4 -> 256 pixels).
    A final sigmoid keeps the output in (0, 1).
    """

    def __init__(self,n_classes):
        super().__init__()
        # Branch for the class input.
        self.label_conditioned_generator = nn.Sequential(
            nn.Embedding(n_classes, 100),
            nn.Linear(100, 16),
        )
        # Branch for the noise input.
        self.latent = nn.Sequential(
            nn.Linear(100, 4*4*512),
            nn.LeakyReLU(0.2, inplace=True),
        )
        # Channel widths of the upsampling path; every stage doubles the
        # spatial size and is followed by batch norm and ReLU.
        widths = [513, 64*16, 64*8, 64*4, 64*2, 64]
        stages = []
        for in_channels, out_channels in zip(widths[:-1], widths[1:]):
            stages.append(nn.ConvTranspose2d(in_channels, out_channels, 4, 2, 1, bias=False))
            stages.append(nn.BatchNorm2d(out_channels))
            stages.append(nn.ReLU(inplace=True))
        # Last stage maps to a single channel and has no normalisation.
        stages.append(nn.ConvTranspose2d(64, 1, 4, 2, 1, bias=False))
        self.upconv_block = nn.Sequential(*stages)
        self.seg = nn.Sigmoid()

    def forward(self, noise, class_my):
        """Generate one image per row of ``noise`` for the classes ``class_my``."""
        label_map = self.label_conditioned_generator(class_my).view(-1, 1, 4, 4)
        noise_map = self.latent(noise).view(-1, 512, 4, 4)
        stacked = torch.cat((noise_map, label_map), dim=1)
        return self.seg(self.upconv_block(stacked))

#my_generator=c_generator(3)
#n_classes = 3  # Assuming the range is 0 to 2
#tensor = torch.randint(0, n_classes, (4,))
#print(tensor)
#generate the random  noise and the class tensor if batch size 4
#noise=torch.randn(4,100)
#print(noise.shape)
#print(tensor.shape)
#output=my_generator(noise,tensor)
#print(output.shape)


In [None]:
class c_discriminator(nn.Module):
    """Conditional discriminator: (image, class id) -> probability in (0, 1).

    The class id is embedded and projected to a 256x256 plane that is stacked
    with the 1-channel 256x256 image. Four stride-2 convolutions reduce the
    2-channel input to 512x16x16, which is flattened and mapped to one
    sigmoid output per sample.
    """

    def __init__(self,n_classes):
        super().__init__()
        self.label_conditioned_discriminator = nn.Sequential(
            nn.Embedding(n_classes, 100),
            nn.Linear(100, 65536),
        )

        def downsample(in_channels, out_channels):
            # Stride-2 convolution that halves height and width.
            return nn.Conv2d(in_channels, out_channels, kernel_size=4, stride=2, padding=1, bias=False)

        # First stage has no batch norm.
        layers = [downsample(2, 64), nn.LeakyReLU(0.2, inplace=True)]
        for in_channels, out_channels in ((64, 128), (128, 256), (256, 512)):
            layers.append(downsample(in_channels, out_channels))
            # NOTE(review): eps=0.8 is far above the usual batch-norm epsilon;
            # confirm it is intended.
            layers.append(nn.BatchNorm2d(out_channels, momentum=0.1, eps=0.8))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
        layers.extend([
            nn.Dropout(0.4),
            nn.Flatten(),
            nn.Linear(131072, 1),
            nn.Sigmoid(),
        ])
        self.conv_block_1 = nn.Sequential(*layers)

    def forward(self,image_tesnsor,class_my):
        """Score ``image_tesnsor`` as real or fake given the classes ``class_my``."""
        label_plane = self.label_conditioned_discriminator(class_my).view(-1, 1, 256, 256)
        # Stacked input has shape [batch_size, 2, 256, 256].
        stacked = torch.cat((image_tesnsor, label_plane), dim=1)
        return self.conv_block_1(stacked)

#my_dis=c_discriminator(3)
#n_classes = 3  # Assuming the range is 0 to 2
#tensor = torch.randint(0, n_classes, (4,))
#print(tensor)
#generate the random  noise and the class tensor if batch size 4
#noise=torch.randn(4,1,256,256)
#print(noise.shape)
#print(tensor.shape)
#output=my_dis(noise,tensor)
#print(output)

In [None]:
# Build both networks for 3 classes (generator first), then move them to the
# selected device.
my_generator = c_generator(3)
my_discriminaor = c_discriminator(3)
my_generator.to(device)
my_discriminaor.to(device)

In [None]:
# Binary cross-entropy (mean over the batch) shared by both GAN losses.
bc_loss = nn.BCELoss(reduction='mean')

In [None]:
# Generator and discriminator get separate Adam optimizers with the same
# learning rate and weight decay.
G_optimizer = torch.optim.Adam(my_generator.parameters(), lr=0.0001, weight_decay=1e-3)
D_optimizer = torch.optim.Adam(my_discriminaor.parameters(), lr=0.0001, weight_decay=1e-3)
batch_size = 4

In [None]:
def discriminator_loss(label, output):
    """Return the binary cross-entropy of the discriminator ``output``.

    Args:
        label: target tensor (1 for real images, 0 for generated ones).
        output: discriminator probabilities, same shape as ``label``.
    """
    return bc_loss(output, label)

In [None]:
def generator_loss(label, fake_output):
    """Return the binary cross-entropy of the discriminator's verdict on fakes.

    Args:
        label: target tensor the generator wants the discriminator to output
            (the training loop passes all ones, i.e. "real").
        fake_output: discriminator probabilities for the generated images,
            same shape as ``label``.

    Returns:
        A scalar tensor; it shrinks as ``fake_output`` approaches ``label``.
    """
    # BCELoss takes (input, target): the discriminator output is the
    # prediction and the label tensor is the target.
    return bc_loss(fake_output, label)

In [None]:
# GAN preprocessing: resize to the 256x256 resolution the discriminator's
# label plane is reshaped to, then convert the PIL image to a tensor.
transform_gan = T.Compose([T.Resize([256,256]), T.ToTensor()])

In [None]:
# Stratified 80% / 20% train/test split with a fixed seed; both parts get a
# fresh 0-based index.
train_df, test_df = (
    part.reset_index(drop=True)
    for part in train_test_split(df, test_size=0.2, stratify=df['label'], random_state=42)
)
print(f"TRAIN Dataset: {train_df.shape}")
print(f"TEST Dataset: {test_df.shape}")

# NOTE(review): the GAN training loop moves the labels to a device and casts
# them to long, so `label` is presumably numeric at this point — confirm.
train_df_2 = Dataset_gan(
    images=train_df['file_path'].values,
    labels=train_df['label'].values,
    transform=transform_gan,
)
test_df_2 = Dataset_gan(
    images=test_df['file_path'].values,
    labels=test_df['label'].values,
    transform=transform_gan,
)


In [None]:
#check the image shape
# Shape of the image (first element of the pair) stored at index 1 of the
# training dataset; the bare name on the last line makes the notebook show it.
rand_img=train_df_2[1][0].shape
rand_img

In [None]:

# DataLoader settings for the GAN splits. Training batches are reshuffled
# every epoch.
train_params = {'batch_size': 4,
                'shuffle': True,
                'num_workers': 0,
                'pin_memory': True,
                }

# The held-out split keeps a fixed order so anything computed on it is
# reproducible from run to run.
test_params = {'batch_size': 4,
               'shuffle': False,
               'num_workers': 0,
               'pin_memory': True,
               }
training_loader = DataLoader(train_df_2, **train_params)
testing_loader = DataLoader(test_df_2, **test_params)


##Training

In [None]:
#Training
def _plot_gan_losses(G_loses, D_loses):
    """Plot the per-epoch generator (left) and discriminator (right) losses."""
    plt.figure(figsize=(20,8))
    plt.subplot(1,2,1)
    plt.title("Model Loss")
    plt.plot(G_loses, label="Generator")
    plt.legend()
    plt.grid()
    plt.subplot(1,2,2)
    plt.title("Model Loss")
    plt.plot(D_loses, label="Discriminator")
    plt.legend()
    plt.grid()
    plt.show()


def training_gan(generator,discriminator,batch_size,optimizer_g,optimizer_d,loss_function_G,loss_function_D,noise_dim,train_loader,epochs,device):
    """Train a conditional GAN and plot the per-epoch losses.

    Each batch first updates the discriminator on real images and on detached
    generated images (the two losses are averaged), then updates the
    generator so that the discriminator scores its images as real.

    Args:
        generator: module called as ``generator(noise, labels)``.
        discriminator: module called as ``discriminator(images, labels)``.
        batch_size: kept for backward compatibility. Noise and target tensors
            are sized from each batch, so a smaller final batch is handled.
        optimizer_g: optimizer of the generator.
        optimizer_d: optimizer of the discriminator.
        loss_function_G: callable ``(target, discriminator_output)``.
        loss_function_D: callable ``(target, discriminator_output)``.
        noise_dim: length of the noise vector fed to the generator.
        train_loader: iterable of ``(images, labels)`` batches.
        epochs: number of epochs.
        device: device the tensors are moved to.

    Raises:
        ValueError: if ``train_loader`` yields no batches.

    Returns:
        None. Progress is printed and the loss curves are plotted.
    """
    G_loses = []
    D_loses = []
    for epoch in range(epochs):
        generator.train()
        discriminator.train()
        train_loss_g = 0
        train_loss_d = 0
        n_steps = 0
        progress_bar = tqdm(enumerate(train_loader, start=1), total=len(train_loader), desc=f"Epoch {epoch+1}/{epochs}")
        for step, (images, labeles) in progress_bar:
            n_steps = step
            images = images.to(device)
            # .long() casts on the current device (no CPU round trip).
            labeles = labeles.to(device).long()
            # Size everything from the batch itself: the last batch of an
            # epoch can be smaller than the nominal batch size.
            current_batch = images.size(0)

            # Random noise of shape [current_batch, noise_dim].
            my_noise = torch.randn(current_batch, noise_dim).to(device)
            # Targets: 1 for "original" images, 0 for generated ones.
            real_labels = torch.ones(current_batch, 1).to(device)
            fake_labels = torch.zeros(current_batch, 1).to(device)

            optimizer_g.zero_grad()
            optimizer_d.zero_grad()

            # Discriminator update: real images, then detached fakes so that
            # no gradient reaches the generator here.
            real_outputs = discriminator(images, labeles)
            real_loss = loss_function_D(real_labels, real_outputs)
            fake_g_images = generator(my_noise, labeles)
            fake_outputs = discriminator(fake_g_images.detach(), labeles)
            fake_loss = loss_function_D(fake_labels, fake_outputs)
            total_loss_d = (fake_loss + real_loss) / 2
            train_loss_d += total_loss_d.item()
            total_loss_d.backward()
            optimizer_d.step()

            # Generator update: the fakes are scored again by the updated
            # discriminator, with "real" as the target.
            g_loss = loss_function_G(real_labels, discriminator(fake_g_images, labeles))
            train_loss_g += g_loss.item()
            g_loss.backward()
            optimizer_g.step()

            if step % 20 == 0:
                print(f"Epoch {epoch+1}, Step {step}, Loss G: {train_loss_g/step:.4f}, Loss D: {train_loss_d/step:.4f}")
        if n_steps == 0:
            raise ValueError("train_loader yielded no batches")
        train_loss_g_epoch = train_loss_g / n_steps
        train_loss_d_epoch = train_loss_d / n_steps
        G_loses.append(train_loss_g_epoch)
        D_loses.append(train_loss_d_epoch)
        print(f"Epoch {epoch+1}/{epochs}, Training Loss G: {train_loss_g_epoch:.4f}, Training Loss D: {train_loss_d_epoch}")
    _plot_gan_losses(G_loses, D_loses)



In [None]:
#lets train
# 50 epochs of conditional-GAN training with batches of 4 and 100-d noise.
training_gan(
    generator=my_generator,
    discriminator=my_discriminaor,
    batch_size=4,
    optimizer_g=G_optimizer,
    optimizer_d=D_optimizer,
    loss_function_G=generator_loss,
    loss_function_D=discriminator_loss,
    noise_dim=100,
    train_loader=training_loader,
    epochs=50,
    device=device,
)