# Installations

In [None]:
!pip install -q datasets transformers accelerate fastervit evaluate

In [None]:
!pip install timm==0.9.12

In [None]:
import torch

# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
device_name = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using {}.".format(device_name))

# Imports

In [None]:
from datasets import load_dataset, DatasetDict, load_metric, Dataset
from transformers import AutoImageProcessor, AutoFeatureExtractor, AutoModelForImageClassification, TrainingArguments, Trainer, ViTForImageClassification, ViTFeatureExtractor, ViTImageProcessor, Swinv2Model
from torchvision import transforms
from sklearn.metrics import accuracy_score, roc_curve, roc_auc_score
from sklearn.utils import resample
from sklearn.model_selection import train_test_split
from fastai.vision.all import *
from torchvision.io import read_image
from fastervit import create_model
from torchvision.transforms import (
    CenterCrop,
    Compose,
    Normalize,
    RandomHorizontalFlip,
    RandomResizedCrop,
    Resize,
    ToTensor,
)
import timm
import evaluate
import datasets
import numpy as np
import torch
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import PIL

# Reusable Functions

In [None]:
# Class names, positioned by their integer class id (0 = "No Cancer", 1 = "Cancer").
labels = ["No Cancer", "Cancer"]
# Lookup tables in both directions; they are passed to the model constructors further down.
id2label = dict(enumerate(labels))
label2id = {name: idx for idx, name in id2label.items()}

In [None]:
def batch_sampler(examples):
    """Collate transformed dataset rows into a single batch dict.

    Args:
        examples: Sequence of row dicts. Each row is read for its
            ``"pixel_values"`` tensor and its ``"cancer"`` label.

    Returns:
        Dict with ``"pixel_values"`` (the per-row tensors stacked along a new
        leading dimension) and ``"labels"`` (a tensor built from the labels).
    """
    images, targets = [], []
    for row in examples:
        images.append(row["pixel_values"])
        targets.append(row["cancer"])
    return {"pixel_values": torch.stack(images), "labels": torch.tensor(targets)}

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def compute_metrics(pred):
    """Score a prediction object with accuracy and weighted precision/recall/F1.

    Args:
        pred: Object exposing ``label_ids`` (true class ids) and ``predictions``
            (per-class scores; the arg-max over the last axis is the predicted id).

    Returns:
        Dict with the keys ``accuracy``, ``precision``, ``recall`` and ``f1``.
        The last three use ``average='weighted'``.
    """
    y_true = pred.label_ids
    y_hat = pred.predictions.argmax(-1)

    # Same three scorers, same averaging mode; evaluated in the order the keys are reported.
    weighted_scores = {
        key: scorer(y_true, y_hat, average='weighted')
        for key, scorer in (
            ('precision', precision_score),
            ('recall', recall_score),
            ('f1', f1_score),
        )
    }
    return {'accuracy': accuracy_score(y_true, y_hat), **weighted_scores}

In [None]:
class ConvertToRGB(Transform):
    """Transform whose ``encodes`` converts a PIL image to RGB mode.

    NOTE(review): ``Transform`` is presumably the fastai/fastcore class brought in
    by the star import from ``fastai.vision.all`` - confirm.
    """

    def __init__(self):
        # No state to set up; note that the base-class initializer is not invoked here.
        pass

    def encodes(self, x: PIL.Image.Image) -> PIL.Image.Image:
        """Return ``x`` converted to RGB via its ``convert('RGB')`` method."""
        return x.convert('RGB')

In [None]:
def convert_to_dataset(data):
    """Wrap a pandas DataFrame in a ``Dataset`` using ``Dataset.from_pandas``."""
    return Dataset.from_pandas(data)

# Import Clean Data

In [None]:
# Load the "Nicole-M/Dataset1" dataset; later cells read its "train", "validate" and "test" splits.
dataset = load_dataset("Nicole-M/Dataset1")
# Bare expression so the notebook displays the object's summary.
dataset

In [None]:
# One DataFrame per split, built before any on-the-fly transform is attached to the dataset.
dfTrain, dfVal, dfTest = (
    pd.DataFrame(dataset[split]) for split in ("train", "validate", "test")
)

# Image Preprocessing

In [None]:
# Training-time pre-processing, applied to a whole batch of examples at once.
class imageTransform:
    """Resize, augment and normalise the images of an example batch.

    The target size and the normalisation statistics are read from the image
    processor passed to the constructor (``size["height"]``, ``size["width"]``,
    ``image_mean``, ``image_std``).
    """

    def __init__(self, featureExtractor):
        target_hw = (featureExtractor.size["height"], featureExtractor.size["width"])
        to_sized_tensor = [transforms.Resize(target_hw), transforms.ToTensor()]
        # Random augmentation: horizontal/vertical flips and a rotation of up to 180 degrees.
        augment = [
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.RandomVerticalFlip(p=0.5),
            transforms.RandomRotation(degrees=180),
        ]
        normalise = [
            transforms.Normalize(mean=featureExtractor.image_mean, std=featureExtractor.image_std),
        ]
        self.transforms = transforms.Compose(to_sized_tensor + augment + normalise)

    def __call__(self, example_batch):
        """Add a ``"pixel_values"`` list to ``example_batch`` and return the batch."""
        # Every image is converted to RGB first (the source images may be grayscale).
        rgb_images = (image.convert("RGB") for image in example_batch["image"])
        example_batch["pixel_values"] = [self.transforms(rgb) for rgb in rgb_images]
        return example_batch



In [None]:
class valImageTransform:
    """Deterministic evaluation pre-processing: resize, tensorise, normalise.

    The target size and the normalisation statistics are read from the image
    processor passed to the constructor. No random augmentation is applied.
    """

    def __init__(self, featureExtractor):
        target_hw = (featureExtractor.size["height"], featureExtractor.size["width"])
        steps = [
            transforms.Resize(target_hw),
            transforms.ToTensor(),
            transforms.Normalize(mean=featureExtractor.image_mean, std=featureExtractor.image_std),
        ]
        self.transforms = transforms.Compose(steps)

    def __call__(self, example_batch):
        """Add a ``"pixel_values"`` list to ``example_batch`` and return the batch."""
        # Every image is converted to RGB first (the source images may be grayscale).
        rgb_images = (image.convert("RGB") for image in example_batch["image"])
        example_batch["pixel_values"] = [self.transforms(rgb) for rgb in rgb_images]
        return example_batch

# VIT model

Load Model

In [None]:
# Checkpoint identifier used below to load both the ViT image processor and the ViT classifier.
VIT = "google/vit-base-patch16-224-in21k"

Setup

In [None]:
# Image processor for the checkpoint; its size / image_mean / image_std feed the transforms.
vitImageProcessor = ViTImageProcessor.from_pretrained(VIT)

In [None]:
# ViT classifier with a 2-class head, labelled through the label2id / id2label maps defined earlier.
vitModel = ViTForImageClassification.from_pretrained(
    VIT,
    num_labels=2,
    label2id=label2id,
    id2label=id2label,
    ignore_mismatched_sizes = True, # tolerate a checkpoint head whose shape differs, e.g. when starting from an already fine-tuned checkpoint
)

In [None]:
# vitModel

In [None]:
# vitModel.config

In [None]:
# Training split: augmenting transform.
dataset["train"].set_transform(imageTransform(vitImageProcessor))
# Validation and test splits: deterministic transform (a fresh instance for each split).
for eval_split in ("validate", "test"):
    dataset[eval_split].set_transform(valImageTransform(vitImageProcessor))

In [None]:
from huggingface_hub import notebook_login
notebook_login()

In [None]:
# Configuration of the ViT fine-tuning run.
args = TrainingArguments(
    # Output locations and logging cadence.
    output_dir="./results/Dataset1/vit2",
    logging_dir='./logs',
    logging_steps=10,
    # Optimisation.
    num_train_epochs=5,
    learning_rate=5e-5,
    lr_scheduler_type='linear',
    weight_decay=0.01,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    # Evaluate and checkpoint every epoch; keep one checkpoint and reload the best one by F1.
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=1,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    # Columns are left in place; the dataset transform reads the "image" column.
    remove_unused_columns=False,
    push_to_hub=True,
)

# Trainer wiring: model, data splits, collator and metric function defined earlier in the notebook.
trainer = Trainer(
    model=vitModel,
    args=args,
    data_collator=batch_sampler,
    compute_metrics=compute_metrics,
    train_dataset=dataset["train"],
    eval_dataset=dataset["validate"],
    tokenizer=vitImageProcessor,
)

Train

In [None]:
# Fine-tune the ViT model; the returned object is kept because later cells read its .metrics.
train_results = trainer.train()

In [None]:
# Metadata written into the generated model card.
kwargs = {
    # Identifier of the base checkpoint (the VIT variable), not the literal string "VIT":
    # the model card should name the model that was actually fine-tuned.
    "finetuned_from": VIT,
    "tasks": "image-classification",
    "dataset": 'Mammogram V1',
    "tags": ['image-classification', 'breast cancer'],
}

# Push when the run is configured for it; otherwise only write the model card.
if args.push_to_hub:
    trainer.push_to_hub('VIT-fineTuned', **kwargs)
else:
    trainer.create_model_card(**kwargs)



In [None]:
train_results.metrics

In [None]:
# Persist the fine-tuned model, then log and save the training metrics and the trainer state.
trainer.save_model()
trainer.log_metrics("train", train_results.metrics)
trainer.save_metrics("train", train_results.metrics)
trainer.save_state()

In [None]:
# trainer.state.log_history

In [None]:
# Tabulate the trainer's log history (trainer.state.log_history) for plotting and export.
history = pd.DataFrame(trainer.state.log_history)
# Preview the first five rows.
history.head(5)

In [None]:
# Export the ViT log history to a CSV file in the working directory.
history.to_csv('Dataset1-ViT.csv')

In [None]:
# Learning-rate schedule: epoch on the x-axis, learning rate on the y-axis.
# Rows without a learning rate are dropped so the line is drawn without NaN gaps.
history.dropna(subset=['learning_rate']).plot(kind='line', x='epoch', y='learning_rate')

In [None]:
from matplotlib import pyplot as plt
# Training loss plotted against the learning rate logged alongside it.
history.plot.line(x='learning_rate', y='loss')

In [None]:
# Training loss over epochs.
history.plot.line(x='epoch', y='loss')

In [None]:
# Evaluation loss per epoch. The log history interleaves training rows (where eval_loss is NaN)
# with evaluation rows, and a line plot breaks at every NaN, so only the evaluation rows are
# kept; a marker makes each per-epoch value visible.
history.dropna(subset=['eval_loss']).plot(kind='line', x='epoch', y='eval_loss', marker='o')

In [None]:
# Run the fine-tuned ViT on the test split; later cells read .predictions and .label_ids.
outputs = trainer.predict(dataset['test'])
print(outputs.metrics)

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

y_true = outputs.label_ids
y_pred = outputs.predictions.argmax(1)

# Tick labels must follow class-id order (0 = "No Cancer", 1 = "Cancer"), so they are taken
# from id2label instead of a hand-written list, which also keeps the global `labels` list
# untouched. Passing `labels=` pins the matrix to 2x2 even if one class is absent.
class_ids = sorted(id2label)
cm = confusion_matrix(y_true, y_pred, labels=class_ids)
disp = ConfusionMatrixDisplay(
    confusion_matrix=cm,
    display_labels=[id2label[class_id] for class_id in class_ids],
)
disp.plot()

In [None]:
# ROC analysis needs a continuous score per sample. Hard argmax labels (0/1) collapse the
# curve to a single operating point, so the positive-class probability is used instead.
y_true = outputs.label_ids
# Softmax over the logits, then the column of class id 1 ("Cancer").
y_scores = torch.softmax(torch.as_tensor(outputs.predictions).float(), dim=-1)[:, 1].numpy()

# Calculate the ROC curve
fpr, tpr, thresholds = roc_curve(y_true, y_scores)

# Calculate AUC
auc_score = roc_auc_score(y_true, y_scores)
print(f'AUC: {auc_score}')

# Plot the ROC curve (dashed diagonal = chance level)
plt.figure()
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {auc_score:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC)')
plt.legend(loc="lower right")
plt.show()

# SWIN-V2 model

Load Model

In [None]:
# Checkpoint identifier used below to load both the SwinV2 image processor and the classifier.
SwinV2 = "microsoft/swinv2-base-patch4-window8-256"

Setup

In [None]:
# Image processor for the checkpoint; its size / image_mean / image_std feed the transforms.
SwinV2ImageProcessor  = AutoImageProcessor.from_pretrained(SwinV2)

In [None]:
# SwinV2 classifier labelled through the two-entry label2id / id2label maps.
# ignore_mismatched_sizes lets loading proceed when the checkpoint's head shape differs.
swinV2Model = AutoModelForImageClassification.from_pretrained(
    SwinV2,
    label2id=label2id,
    id2label=id2label,
    ignore_mismatched_sizes = True,
)

In [None]:
# swinV2Model

In [None]:
# swinV2Model.config

In [None]:
# Training split: augmenting transform.
dataset['train'].set_transform(imageTransform(SwinV2ImageProcessor))
# Validation and test splits: deterministic transform (a fresh instance for each split).
for eval_split in ('validate', 'test'):
    dataset[eval_split].set_transform(valImageTransform(SwinV2ImageProcessor))

In [None]:
# Configuration of the SwinV2 fine-tuning run.
training_args = TrainingArguments(
    # Output locations and logging cadence.
    output_dir="./results/swinV2-Mammmogram-V1",
    logging_dir="./logs",
    logging_steps=10,
    # Optimisation.
    num_train_epochs=10,  # author's note: the ideal number is 3/4
    learning_rate=5e-5,
    lr_scheduler_type='linear',
    weight_decay=0.01,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    # Evaluate and checkpoint every epoch; keep one checkpoint and reload the best one by F1.
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=1,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    # Columns are left in place; the dataset transform reads the "image" column.
    remove_unused_columns=False,
    # push_to_hub=True,
)

# Trainer wiring: model, data splits, collator and metric function defined earlier in the notebook.
trainer = Trainer(
    model=swinV2Model,
    args=training_args,
    train_dataset=dataset['train'],
    eval_dataset=dataset['validate'],
    data_collator=batch_sampler,
    compute_metrics=compute_metrics,
    tokenizer=SwinV2ImageProcessor,
)

Train

In [None]:
# Fine-tune the SwinV2 model; the returned object is kept because later cells read its .metrics.
swinTrainResult = trainer.train()

In [None]:
# Metadata written into the generated model card.
kwargs = {
    # Identifier of the base checkpoint (the SwinV2 variable), not the placeholder "swinv2":
    # the model card should name the model that was actually fine-tuned.
    "finetuned_from": SwinV2,
    "tasks": "image-classification",
    "dataset": 'Mammogram V1',
    "tags": ['image-classification', 'breast cancer'],
}

# Pushed unconditionally: push_to_hub is commented out in training_args, so the guard
# used for the ViT run is left disabled here.
# if training_args.push_to_hub:
trainer.push_to_hub('SwinV2-finetuned', **kwargs)
# else:
#     trainer.create_model_card(**kwargs)

In [None]:
!cp -r '/content/results' /content/drive/MyDrive/Data/Sept

In [None]:
swinTrainResult.metrics

In [None]:
# Persist the fine-tuned model, then log and save the training metrics and the trainer state.
trainer.save_model()
trainer.log_metrics("train", swinTrainResult.metrics)
trainer.save_metrics("train", swinTrainResult.metrics)
trainer.save_state()

In [None]:
trainer.state.log_history

In [None]:
# Tabulate the SwinV2 trainer's log history; this rebinds `history`, replacing the ViT table.
history = pd.DataFrame(trainer.state.log_history)
# Preview the first five rows.
history.head(5)

In [None]:
# Export the SwinV2 log history to a CSV file in the working directory.
history.to_csv('Dataset1-SwinV2.csv')

In [None]:
# SwinMetrics = trainer.evaluate(val_data)
# trainer.log_metrics("eval", SwinMetrics)
# trainer.save_metrics("eval", SwinMetrics)
# print(f"Evaluation results: {SwinMetrics}")

In [None]:
# @title learning_rate vs epoch

# Epoch on the x-axis, learning rate on the y-axis, matching the title.
# Rows without a learning rate are dropped so the line is drawn without NaN gaps.
history.dropna(subset=['learning_rate']).plot(kind='line', x='epoch', y='learning_rate')

In [None]:
from matplotlib import pyplot as plt
# Training loss plotted against the learning rate logged alongside it.
history.plot.line(x='learning_rate', y='loss')

In [None]:
from matplotlib import pyplot as plt
# Training loss over epochs.
history.plot.line(x='epoch', y='loss')

In [None]:
# @title accuracy vs epoch

# Epoch on the x-axis, evaluation accuracy on the y-axis, matching the title.
# Only evaluation rows carry eval_accuracy; the NaN rows in between would break the line,
# so they are dropped, and a marker makes each per-epoch value visible.
history.dropna(subset=['eval_accuracy']).plot(kind='line', x='epoch', y='eval_accuracy', marker='o')

In [None]:
# Run the fine-tuned SwinV2 on the test split; later cells read .predictions and .label_ids.
swinOutputs = trainer.predict(dataset['test'])
print(swinOutputs.metrics)

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

y_true = swinOutputs.label_ids
y_pred = swinOutputs.predictions.argmax(1)

# Tick labels must follow class-id order (0 = "No Cancer", 1 = "Cancer"), so they are taken
# from id2label instead of a hand-written list, which also keeps the global `labels` list
# untouched. Passing `labels=` pins the matrix to 2x2 even if one class is absent.
class_ids = sorted(id2label)
cm = confusion_matrix(y_true, y_pred, labels=class_ids)
disp = ConfusionMatrixDisplay(
    confusion_matrix=cm,
    display_labels=[id2label[class_id] for class_id in class_ids],
)
disp.plot()

In [None]:
# ROC analysis needs a continuous score per sample. Hard argmax labels (0/1) collapse the
# curve to a single operating point, so the positive-class probability is used instead.
y_true = swinOutputs.label_ids
# Softmax over the logits, then the column of class id 1 ("Cancer").
y_scores = torch.softmax(torch.as_tensor(swinOutputs.predictions).float(), dim=-1)[:, 1].numpy()

# Calculate the ROC curve
fpr, tpr, thresholds = roc_curve(y_true, y_scores)

# Calculate AUC
auc_score = roc_auc_score(y_true, y_scores)
print(f'AUC: {auc_score}')

# Plot the ROC curve (dashed diagonal = chance level)
plt.figure()
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {auc_score:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC)')
plt.legend(loc="lower right")
plt.show()

# Fast VIT model

Load Model

In [None]:
# FastViT created through timm with a 2-class head.
# NOTE(review): only the next cell (which displays it) references this object; the fastai
# learner below is built from the architecture name - confirm whether this is still needed.
pretrained_fastVit = timm.create_model('fastvit_sa24.apple_in1k', pretrained=True, num_classes=2)

In [None]:
pretrained_fastVit

In [None]:
# Tensor conversion followed by per-channel normalisation with the listed mean / std values.
# NOTE(review): `transformed` is not referenced anywhere else in the visible notebook.
transformed = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])

In [None]:
# torchvision resize -> tensor -> normalise steps.
# NOTE(review): the only reference to `item_tfms` is the commented-out DataBlock argument below.
item_tfms = [transforms.Resize((256,256)), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])]
# batch_tfms = [*aug_transforms(size=224, max_warp=0), transforms.functional.normalize]

In [None]:
# Row accessors for the DataBlock: each receives one DataFrame row.
def get_x(row): return row['image']  # the row's 'image' value (presumably a PIL image - confirm)
def get_y(row): return row['cancer']  # the row's 'cancer' value, used as the class label

# DataBlock: image inputs, categorical targets, random 80/20 split with a fixed seed.
dblock = DataBlock(
    blocks=(ImageBlock(PILImage), CategoryBlock),  # image input block, categorical target block
    get_x=get_x,
    get_y=get_y,
    splitter=RandomSplitter(valid_pct=0.2, seed=999),  # 20% of dfTrain held out for validation
    # NOTE(review): no item/batch transforms are active; this assumes the source images can be
    # batched as-is (e.g. share one size) - confirm.
    # item_tfms=item_tfms,  # Example item transformation
    # batch_tfms=batch_tfms  # Example batch transformations
)

# Build the DataLoaders from the training DataFrame, 32 items per batch.
dls = dblock.dataloaders(dfTrain, bs=32)

# Visual sanity check: show up to 9 samples from a batch.
dls.show_batch(max_n=9, figsize=(6,6))

In [None]:
# Build the learner from the architecture name (not from `pretrained_fastVit`), tracking accuracy.
learner = vision_learner(dls, 'fastvit_sa24.apple_in1k', metrics=accuracy)
# Learning-rate finder, run before choosing the rate passed to fine_tune below.
learner.lr_find()

In [None]:
# Fine-tune with arguments (5, 5e-4) - presumably 5 epochs at a base learning rate of 5e-4; confirm against fastai's fine_tune signature.
learner.fine_tune(5, 5e-4)

In [None]:
learner.recorder.plot_loss()

In [None]:
# Interpretation object built from the trained learner; reused by the next two cells.
interp = ClassificationInterpretation.from_learner(learner)
interp.plot_confusion_matrix()

In [None]:
interp.most_confused(min_val=50)

In [None]:
interp.plot_top_losses(9, figsize= (16,16))

In [None]:
learner.save('/content/Mammogram_FastViT')
learner.export()
!cp -r '/content/Mammogram_FastViT.pth' /content/drive/MyDrive/Data/

In [None]:
from huggingface_hub import notebook_login
notebook_login()

In [None]:
from huggingface_hub import push_to_hub_fastai

repo_id = "Nicole-M/fastViT-Mammogram-V1"

push_to_hub_fastai(learner=learner, repo_id=repo_id)

# Faster ViT model

In [None]:
# Load the FasterViT model, reading the weights from the given file on Google Drive.
pretrainedFasterViT = create_model('faster_vit_3_224', pretrained=True, model_path="/content/drive/MyDrive/Pretrained-models/fastervit_3_224_1k.pth.tar")

# Print the model architecture (the model lives in `pretrainedFasterViT`; there is no `model` variable).
print(pretrainedFasterViT)

In [None]:
# Replace the classification head with a fresh linear layer that has 2 outputs,
# keeping the input width of the existing head.
num_ftrs = pretrainedFasterViT.head.in_features
pretrainedFasterViT.head = torch.nn.Linear(num_ftrs, 2)

In [None]:
# Move the model to the first CUDA device when available, otherwise keep it on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
pretrainedFasterViT = pretrainedFasterViT.to(device)

In [None]:
# Use PyTorch's Dataset / DataLoader explicitly. The bare name `Dataset` is bound by the
# notebook's top-level imports to the Hugging Face `datasets.Dataset`, and `DataLoader` is
# only imported from torch in a later cell. Aliases keep those global names untouched.
from torch.utils.data import DataLoader as TorchDataLoader, Dataset as TorchDataset


class CustomDataset(TorchDataset):
    """Map-style PyTorch dataset over a DataFrame of training images.

    Each item is read positionally: column 0 is taken as the image and column 1
    as the label (NOTE(review): assumes that column order - confirm against the
    dataset schema). Images are converted to RGB, turned into tensors, resized
    to 224x224, randomly rotated by up to 180 degrees and normalised.

    Args:
        dataframe: Data convertible to a pandas DataFrame.
    """

    def __init__(self, dataframe):
        self.dataframe = pd.DataFrame(dataframe)
        # Build the pipeline once instead of re-creating every transform for every item.
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Resize((224, 224)),
            transforms.RandomRotation(180),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ])

    def __getitem__(self, index):
        """Return ``(image_tensor, label_tensor)`` for row ``index``."""
        image = self.dataframe.iloc[index, 0]
        label = self.dataframe.iloc[index, 1]
        # Same RGB conversion as the Hugging Face transforms earlier in the notebook.
        image = self.transform(image.convert("RGB"))
        return image, torch.tensor(label)

    def __len__(self):
        """Number of rows in the wrapped DataFrame."""
        return len(self.dataframe)


data = CustomDataset(dataframe=dfTrain)
trainDataloader = TorchDataLoader(data, batch_size=32, shuffle=True)
# for sample in trainDataloader:
#     print(sample)
#     break
# for sample in trainDataloader:
#     print(sample)
#     break

In [None]:
# Use PyTorch's Dataset / DataLoader explicitly. The bare name `Dataset` is bound by the
# notebook's top-level imports to the Hugging Face `datasets.Dataset`, and `DataLoader` is
# only imported from torch in a later cell. Aliases keep those global names untouched.
from torch.utils.data import DataLoader as TorchDataLoader, Dataset as TorchDataset


class CustomTestDataset(TorchDataset):
    """Map-style PyTorch dataset over a DataFrame of evaluation images.

    Each item is read positionally: column 0 is taken as the image and column 1
    as the label (NOTE(review): assumes that column order - confirm against the
    dataset schema). Images are converted to RGB, turned into tensors, resized
    to 224x224 and normalised; no random augmentation is applied.

    Args:
        dataframe: Data convertible to a pandas DataFrame.
    """

    def __init__(self, dataframe):
        self.dataframe = pd.DataFrame(dataframe)
        # Report only the shape; printing the whole frame floods the cell output.
        print(self.dataframe.shape)
        # Build the pipeline once instead of re-creating every transform for every item.
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Resize((224, 224)),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ])

    def __getitem__(self, index):
        """Return ``(image_tensor, label_tensor)`` for row ``index``."""
        image = self.dataframe.iloc[index, 0]
        label = self.dataframe.iloc[index, 1]
        # Same RGB conversion as the Hugging Face transforms earlier in the notebook.
        image = self.transform(image.convert("RGB"))
        return image, torch.tensor(label)

    def __len__(self):
        """Number of rows in the wrapped DataFrame."""
        return len(self.dataframe)


data = CustomTestDataset(dataframe=dfTest)
# Evaluation data is iterated in a fixed order so results are reproducible.
testDataloader = TorchDataLoader(data, batch_size=32, shuffle=False)
# for sample in testDataloader:
#     print(sample)
#     break
# for sample in testDataloader:
#     print(sample)
#     break

In [None]:
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.utils.data import DataLoader

# Loss for the 2-logit classification head.
criterion = torch.nn.CrossEntropyLoss()

# Adam over all FasterViT parameters (the SGD alternative is kept below for reference).
# optimizer = optim.SGD(pretrainedFasterViT.parameters(), lr=0.001, momentum=0.9)
optimizer = optim.Adam(pretrainedFasterViT.parameters(), lr=0.0005)

# StepLR schedule configured with step_size=10 and gamma=0.1.
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

In [None]:
# Training loop
num_epochs = 4
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
pretrainedFasterViT.to(device)

train_losses = []  # Mean training loss of every epoch, for plotting
best_val_loss = float('inf')  # Lowest evaluation loss seen so far
y_pred = []  # Predicted class ids from the most recent evaluation pass
y_true = []  # Ground-truth class ids from the most recent evaluation pass

for epoch in range(num_epochs):

    # ---- Training pass ----
    pretrainedFasterViT.train()
    training_loss = 0.0

    # The loop variable is named `targets` so the global `labels` list is not overwritten.
    for image, targets in trainDataloader:
        image = image.to(device)
        # CrossEntropyLoss expects integer class indices.
        targets = targets.long().to(device)

        optimizer.zero_grad()
        outputs = pretrainedFasterViT(image)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        training_loss += loss.item()

    # Mean training loss over the batches of this epoch
    train_loss = training_loss / len(trainDataloader)
    train_losses.append(train_loss)

    # Advance the learning-rate schedule once per epoch.
    exp_lr_scheduler.step()

    # ---- Evaluation pass ----
    # NOTE(review): evaluation and best-checkpoint selection use `testDataloader`; consider a
    # loader over the validation split (dfVal) so the test split stays held out.
    pretrainedFasterViT.eval()
    val_loss = 0.0
    correct_preds = 0
    total_samples = 0
    epoch_pred, epoch_true = [], []
    with torch.no_grad():
        for image, targets in testDataloader:
            image = image.to(device)
            targets = targets.long().to(device)

            outputs = pretrainedFasterViT(image)
            val_loss += criterion(outputs, targets).item()

            # Predicted class = index of the largest logit. Rounding the logits and comparing
            # one column to the labels does not yield class predictions.
            predicted = outputs.argmax(dim=1)
            total_samples += targets.size(0)
            correct_preds += (predicted == targets).sum().item()

            # Collect every batch, not just the last one of the epoch.
            epoch_pred.extend(predicted.cpu().tolist())
            epoch_true.extend(targets.cpu().tolist())

    # Keep the predictions of the latest epoch for the confusion matrix cell.
    y_pred, y_true = epoch_pred, epoch_true

    # Mean evaluation loss over the batches, and accuracy in percent
    val_loss /= len(testDataloader)
    val_acc = correct_preds / total_samples * 100

    print(f"Epoch [{epoch + 1}/{num_epochs}] Train Loss: {train_loss:.4f}  Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.2f}%")

    # Checkpoint whenever the evaluation loss improves
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(pretrainedFasterViT.state_dict(), f'/content/best_model_epoch_{epoch + 1}.pth')

print('Finished Training')

# Plotting the evolution of loss
plt.plot(train_losses, label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Evolution of Training Loss')
plt.legend()
plt.show()

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Confusion matrix from the y_true / y_pred lists filled by the FasterViT training cell.
cm = confusion_matrix(y_true, y_pred)
ConfusionMatrixDisplay(cm).plot()

In [None]:
# Save the model
torch.save(pretrainedFasterViT.state_dict(), '/content/faster_vit_Dataset1.pth')

In [None]:
!cp -r '/content/faster_vit_Dataset1.pth' /content/drive/MyDrive/Data/results

# Save to Google Drive

In [None]:
# !cp -r '/content/best_model_epoch_2.pth' /content/drive/MyDrive/Data/results