Reference for the custom `Dataset` pattern used below: **https://github.com/aladdinpersson/Machine-Learning-Collection/tree/master/ML/Pytorch/Basics/custom_dataset**

In [None]:
import os
from pathlib import Path

# Locations of the label CSV and the image folder, both resolved against the
# dataset root.
# NOTE(review): `neelmbansal_emphysema_dataset_from_nih_chest_x_ray_path` is not
# defined in the visible cells — presumably set by a dataset-download cell that
# must run first; confirm.
data_entry_path = Path(neelmbansal_emphysema_dataset_from_nih_chest_x_ray_path, "Data_Entry_2024.csv")
dataset_path = Path(neelmbansal_emphysema_dataset_from_nih_chest_x_ray_path, "emphReform", "emphReform")


In [None]:
!pip install grad-cam
import os
import pandas as pd
import torch
from torch.utils.data import Dataset
from skimage import io
from PIL import Image
import matplotlib.pyplot as plt

class Emphysema_Dataset(Dataset):
    """Map-style dataset pairing image files with labels listed in a CSV.

    Column 0 of the CSV is used as the image file name (joined onto
    ``root_dir``) and column 1 as the label.
    """

    def __init__(self, csv_file, root_dir, transform=None):
        """Load the annotation table and remember where the images live.

        Args:
            csv_file: Path to the CSV read with ``pandas.read_csv``.
            root_dir: Directory the file names in column 0 are joined onto.
            transform: Optional callable applied to each RGB PIL image.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.annotations = pd.read_csv(csv_file)

    def __len__(self):
        """Return the number of rows in the annotation table."""
        return self.annotations.shape[0]

    def __getitem__(self, index):
        """Return ``(image, label)`` for the row at position ``index``."""
        file_name = self.annotations.iloc[index, 0]
        pixels = io.imread(os.path.join(self.root_dir, file_name))
        # Go through PIL so the transform always receives a 3-channel RGB image.
        picture = Image.fromarray(pixels).convert('RGB')
        target = torch.tensor(self.annotations.iloc[index, 1])

        if not self.transform:
            return picture, target
        return self.transform(picture), target

# Register the TensorBoard notebook extension, which provides the %tensorboard
# magic used at the end of the notebook.
%load_ext tensorboard

In [None]:
from google.colab import output

# JavaScript that is injected into the notebook page to prevent a timeout:
# it simulates a click on a toolbar button once every 60 seconds.
# NOTE(review): querySelector returns the FIRST <colab-toolbar-button> on the
# page, whichever button that is — confirm it targets the intended one.
code = """
function KeepClicking(){
    console.log("Clicking to prevent timeout");
    document.querySelector("colab-toolbar-button").click();
}
setInterval(KeepClicking, 60000);
"""

# Evaluate the script in the browser front end through Colab's output bridge.
output.eval_js(code)

In [None]:
#%tensorboard --logdir logs/resnet

# Import necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision import models
from pathlib import Path
from PIL import Image
#import tmm

from torch.utils.tensorboard import SummaryWriter
# TensorBoard writer; everything logged during training is written under this directory.
writer = SummaryWriter(log_dir='./logs/resnet')

# Set device (use GPU if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Preprocessing applied to every image of the Emphysema dataset (host domain).
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Size expected by ResNet50 (source images originally 1024 x 1024)
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),  # ImageNet channel statistics
])

dataset = Emphysema_Dataset(csv_file=data_entry_path, root_dir=dataset_path, transform=transform)

# 30% of the samples are used for training, the remaining 70% for testing.
# The split is seeded so the same images end up in each subset on every run;
# without a fixed generator the test set would change between kernel restarts.
SPLIT_SEED = 42
train_size = int(0.3 * len(dataset))
test_size = len(dataset) - train_size
train_set, test_set = torch.utils.data.random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

train_loader = torch.utils.data.DataLoader(train_set, batch_size=96, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=96, shuffle=False)

In [None]:
from pytorch_grad_cam import GradCAM, HiResCAM, ScoreCAM, GradCAMPlusPlus, AblationCAM, XGradCAM, EigenCAM, FullGrad
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from pytorch_grad_cam.utils.image import show_cam_on_image, deprocess_image, preprocess_image
import cv2
import numpy as np

def gradcam_PPI(img_pth, device):
    """Load an image from disk and prepare it as Grad-CAM input.

    Args:
        img_pth: Path of the image file (``str`` or path-like).
        device: Device the preprocessed tensor is moved to.

    Returns:
        A ``(rgb_img, input_tensor)`` tuple: ``rgb_img`` is the image as a
        float32 array in RGB channel order scaled to [0, 1]; ``input_tensor``
        is the output of ``preprocess_image`` (ImageNet mean/std) on ``device``.

    Raises:
        FileNotFoundError: If ``img_pth`` is not an existing file.
        ValueError: If the file exists but OpenCV cannot decode it.
    """
    # Check if the image file exists
    if not Path(img_pth).is_file():
        raise FileNotFoundError(f"Image file not found: {img_pth}")

    # cv2.imread signals a decode failure by returning None instead of raising,
    # so check explicitly rather than failing later with an opaque TypeError.
    bgr_img = cv2.imread(str(img_pth))
    if bgr_img is None:
        raise ValueError(f"Could not decode image file: {img_pth}")

    # OpenCV loads BGR; reverse the channel axis to get RGB, then scale to [0, 1].
    # NOTE(review): the image is used at its native resolution, whereas the
    # training transform resizes to 224x224 — confirm this is intended.
    rgb_img = np.float32(bgr_img[:, :, ::-1]) / 255
    input_tensor = preprocess_image(rgb_img, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]).to(device)

    return rgb_img, input_tensor

# Two fixed reference X-rays; train_model logs a Grad-CAM overlay for each of them after every epoch.
pic1_rgb_img, pic1_input_tensor = gradcam_PPI(str(dataset_path / "00000002_000.png"), device)
pic2_rgb_img, pic2_input_tensor = gradcam_PPI(str(dataset_path / "00000009_000.png"), device)


In [None]:
from sklearn.metrics import f1_score, precision_score, roc_auc_score

# Training function
def _run_training_epoch(model, train_loader, criterion, optimizer):
    """Optimise ``model`` over every batch of ``train_loader`` once; return the mean batch loss."""
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    return running_loss / len(train_loader)


def _evaluate(model, loader):
    """Run ``model`` over ``loader`` without gradients.

    Returns ``(accuracy_percent, labels, predictions, class1_probabilities)``.
    """
    correct = 0  # to calc accuracy
    total = 0
    all_labels = []
    all_predicted = []
    all_probabilities = []
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            # Softmax probability of class index 1, used as the score for AUC.
            all_probabilities.extend(torch.softmax(outputs, dim=1)[:, 1].cpu().numpy())
            all_predicted.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    return 100 * correct / total, all_labels, all_predicted, all_probabilities


def _log_gradcam(model, step):
    """Write Grad-CAM overlays (target class 1) for the two reference images to TensorBoard."""
    target_layers = [model.layer4[-1]]
    # We have to specify the target we want to generate the CAM for.
    targets = [ClassifierOutputTarget(1)]
    samples = (
        ("Image1", pic1_rgb_img, pic1_input_tensor),
        ("Image2", pic2_rgb_img, pic2_input_tensor),
    )
    # One CAM object is built per call and reused for both images; leaving the
    # `with` block exits the CAM context again before training resumes.
    with GradCAM(model=model, target_layers=target_layers) as cam:
        for name, rgb_img, input_tensor in samples:
            # Each input holds a single image, so take the first CAM of the batch.
            grayscale_cam = cam(input_tensor=input_tensor, targets=targets)[0, :]
            visualization = show_cam_on_image(rgb_img, grayscale_cam, use_rgb=True)
            writer.add_image(f"Grad-CAM/{name}/Epoch_{step}", visualization, step, dataformats="HWC")


# Training function
def train_model(model, train_loader, criterion, optimizer, num_epochs):
    """Train ``model`` and, after every epoch, evaluate it and log to TensorBoard.

    Per epoch this logs the mean training loss, then (in eval mode) the test
    accuracy, F1 score, precision and AUC, plus Grad-CAM overlays for the two
    reference images.

    Besides its arguments the function reads these module-level names:
    ``device``, ``writer``, ``test_loader``, ``pic1_rgb_img``,
    ``pic1_input_tensor``, ``pic2_rgb_img`` and ``pic2_input_tensor``.

    Args:
        model: Network to train; must expose ``layer4`` (used as Grad-CAM target layer).
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepping the model's parameters.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        None.
    """
    model.train()
    for epoch in range(num_epochs):
        step = epoch + 1
        print(f'Epoch {step}/{num_epochs}')
        print('-' * 10)

        epoch_loss = _run_training_epoch(model, train_loader, criterion, optimizer)
        print(f'Epoch [{step}/{num_epochs}], Loss: {epoch_loss:.4f}')
        # Log the training loss
        writer.add_scalar('Training Loss', epoch_loss, step)

        model.eval()
        try:
            accuracy, labels, predicted, probabilities = _evaluate(model, test_loader)
            print(f'Accuracy of the model on the test images: {accuracy:.2f}%')

            writer.add_scalar('F1 Score', f1_score(labels, predicted), step)
            writer.add_scalar('Precision', precision_score(labels, predicted), step)
            # ROC AUC is undefined (roc_auc_score raises) when only one class is
            # present in the labels; skip it rather than abort the training run.
            if len({int(label) for label in labels}) > 1:
                writer.add_scalar('AUC', roc_auc_score(labels, probabilities), step)
            else:
                print(f'AUC skipped for epoch {step}: evaluation labels contain a single class')

            _log_gradcam(model, step)

            # Log the test accuracy
            writer.add_scalar('Test Accuracy', accuracy, step)
        finally:
            # Always return to training mode, even if evaluation/logging failed.
            model.train()

In [None]:
def modelResnet():
    """Build an ImageNet-pretrained ResNet-50 with a 2-class head for fine-tuning.

    The network is moved to the module-level ``device``.

    Returns:
        ``(model, criterion, optimizer)``: the network, a ``CrossEntropyLoss``
        and an ``Adam`` optimizer (lr=0.001) over all model parameters.
    """
    # torchvision >= 0.13 selects pretrained weights through `weights=`; the
    # boolean `pretrained=` flag is deprecated there. IMAGENET1K_V1 is the
    # weight set `pretrained=True` maps to. Older versions lack the enum, so
    # fall back to the legacy flag.
    weights_enum = getattr(models, "ResNet50_Weights", None)
    if weights_enum is not None:
        model = models.resnet50(weights=weights_enum.IMAGENET1K_V1)
    else:
        model = models.resnet50(pretrained=True)

    # Replace the final fully connected layer with a 2-output layer.
    model.fc = nn.Linear(model.fc.in_features, 2)  # Emphysema_Dataset has 2 classes
    model = model.to(device)

    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    return model, criterion, optimizer

In [None]:
# Build the ResNet-50 classifier together with its loss function and optimizer.

model, criterion, optimizer = modelResnet()


# Ask for the number of training epochs. input() returns a string; it is
# converted with int() in the cell that calls train_model.
epochs = input("How Many Epochs?: ")

# TODO: also ask which type of model to run and which batch size to use.



In [None]:
# Run training; `epochs` is the raw string returned by input(), hence the int() conversion.
train_model(model, train_loader, criterion, optimizer, num_epochs=int(epochs))

Grad-CAM implementation used above (the `grad-cam` package): https://github.com/jacobgil/pytorch-grad-cam

In [None]:
# Close the TensorBoard writer now that training has finished.
writer.close()
# Save the trained model

In [None]:
# Only the state_dict (parameters and buffers) is saved to 'trained_model.pth',
# not the model object itself; loading it back requires building the same architecture first.
torch.save(model.state_dict(), 'trained_model.pth')


In [None]:
# Open TensorBoard inline on the log directory the SummaryWriter wrote to (./logs/resnet).
%tensorboard --logdir logs/resnet