In [13]:
import torch
import time
import os
import cv2
import random 
import torch.nn as nn
import numpy as np
import pandas as pd
import torch.backends.cudnn as cudnn
import torch.cuda.amp as amp 
import matplotlib.pyplot as plt
import seaborn as sns
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from pytorch_grad_cam import AblationCAM
from pytorch_grad_cam.ablation_layer import AblationLayerVit
from pytorch_grad_cam.utils.image import show_cam_on_image, preprocess_image
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import CosineAnnealingLR
from swin_transformer import SwinTransformer
from sklearn.metrics import accuracy_score, recall_score, f1_score, confusion_matrix, roc_curve
from torchsummary import summary    

OSError: /home/user/.conda/envs/sanket_swin/lib/python3.7/site-packages/nvidia/cublas/lib/libcublas.so.11: undefined symbol: cublasLtGetStatusString, version libcublasLt.so.11

In [8]:
def _to_model_input():
    """Build the deterministic tail shared by the training and validation pipelines.

    Returns:
        list: Freshly constructed Resize -> ToTensor -> Normalize transforms.
    """
    return [
        # Resize every image to the 224x224 input used throughout this notebook.
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        # Mean 0 / std 1: (x - 0) / 1 leaves the tensor values unchanged.
        transforms.Normalize(mean=(0.0, 0.0, 0.0), std=(1.0, 1.0, 1.0)),
    ]


# Random augmentations applied to training images only, in this exact order.
_train_augmentations = [
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(degrees=180),
    transforms.GaussianBlur(kernel_size=5, sigma=(1.0, 3.0)),
    transforms.RandomAffine(degrees=0, translate=(0.25, 0.25), scale=(0.9, 1.1)),
    transforms.RandomPerspective(distortion_scale=0.5, p=0.5),
]

# Training pipeline: augmentations first, then the shared deterministic tail.
train_transform = transforms.Compose(_train_augmentations + _to_model_input())

# Validation pipeline: only the deterministic tail, no augmentation.
val_transform = transforms.Compose(_to_model_input())

def mixup(data, targets, alpha=1.0):
    """
    Applies MixUp augmentation to the input data and targets.

    Each sample is blended with another sample of the same batch, chosen by a
    random permutation: ``mixed = lam * data + (1 - lam) * data[perm]`` with
    ``lam`` drawn from ``Beta(alpha, alpha)``.

    Args:
        data (torch.Tensor): Input data tensor; the first dimension is the batch.
        targets (torch.Tensor): Target labels tensor, indexed along the same
            first dimension as ``data``.
        alpha (float): MixUp hyperparameter controlling the mix ratio. Any
            value ``<= 0`` disables mixing.

    Returns:
        mixed_data (torch.Tensor): Mixed data tensor (``data`` itself when
            mixing is disabled).
        targets_a (torch.Tensor): Targets for the first data sample.
        targets_b (torch.Tensor): Targets for the second (permuted) data sample.
        lam (float): Lambda value representing the mix ratio (1.0 when mixing
            is disabled).
    """
    # A Beta distribution needs strictly positive parameters; a negative alpha
    # would make np.random.beta raise, so treat every non-positive alpha as
    # "MixUp off" instead of only the exact value 0.
    if alpha <= 0:
        return data, targets, targets, 1.0

    lam = np.random.beta(alpha, alpha)
    # One random partner per sample, drawn without replacement.
    index = torch.randperm(data.size(0))
    # Index along the batch dimension only so 1-D inputs are accepted as well.
    mixed_data = lam * data + (1 - lam) * data[index]
    return mixed_data, targets, targets[index], lam

NameError: name 'transforms' is not defined

In [4]:
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Seed every random number generator used in this notebook with the same value.
seed = 2
for _seed_rng in (torch.manual_seed, torch.cuda.manual_seed, np.random.seed, random.seed):
    _seed_rng(seed)

# Enable cuDNN benchmark mode.
# NOTE(review): benchmark mode may pick different kernels between runs, which
# works against the fixed seed above — confirm this trade-off is intended.
cudnn.benchmark = True

# Locations of the training and validation image folders (one sub-folder per class).
train_data_path = 'birads/Data/Train'
val_data_path = 'birads/Data/Test'

# Wrap each folder in a dataset that applies the matching transform pipeline.
train_dataset = ImageFolder(train_data_path, transform=train_transform)
val_dataset = ImageFolder(val_data_path, transform=val_transform)

# Both loaders share batch size and worker count; only training is shuffled.
batch_size = 8
_loader_options = dict(batch_size=batch_size, num_workers=8)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_options)

In [5]:
# Architecture hyperparameters for the Swin Transformer.
# NOTE(review): 'stochastic_depth_prob' is stored here but never passed to the
# model; the constructor below receives drop_path_rate=0.1 instead — confirm
# which value is intended.
# NOTE(review): num_classes is 5, while the validation metrics and the
# inference cell in this notebook treat the task as binary — confirm.
swin_config = {
    'img_size': 224,
    'patch_size': 4,
    'in_chans': 3,
    'num_classes': 5,
    'embed_dim': 96,
    'depths': [2, 2, 6, 2],
    'num_heads': [3, 6, 12, 24],
    'window_size': 7,
    'mlp_ratio': 4,
    'stochastic_depth_prob': 0.2,
}

# Keys of swin_config that are forwarded verbatim to the constructor.
_forwarded_keys = (
    'img_size', 'patch_size', 'in_chans', 'num_classes', 'embed_dim',
    'depths', 'num_heads', 'window_size', 'mlp_ratio',
)

# Build the model from the config plus the fixed options, then move it to `device`.
model = SwinTransformer(
    **{key: swin_config[key] for key in _forwarded_keys},
    qkv_bias=True,
    qk_scale=None,
    drop_rate=0.0,
    drop_path_rate=0.1,
    ape=False,
    patch_norm=True,
    use_checkpoint=False,
    fused_window_process=False,
)
model = model.to(device)

# Generate a summary of the model's architecture
#summary(model, (3, 224, 224)) 


  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


RuntimeError: CUDA error: out of memory
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [None]:

# Loss: cross entropy between the model outputs and the integer class labels.
criterion = nn.CrossEntropyLoss()

# Optimizer: AdamW over all model parameters.
_adamw_settings = {
    "lr": 7.8125e-06,
    "betas": (0.9, 0.999),
    "eps": 1.0e-08,
    "weight_decay": 0.05,
}
optimizer = torch.optim.AdamW(model.parameters(), **_adamw_settings)

# Learning-rate schedule: cosine annealing from the optimizer's lr down to eta_min.
# NOTE(review): the training loop in this notebook has its scheduler.step()
# call commented out, so this schedule currently has no effect.
scheduler = CosineAnnealingLR(optimizer, T_max=30, eta_min=7.8125e-08)

# Gradient scaler used together with amp.autocast() for mixed-precision training.
scaler = amp.GradScaler()

In [None]:
# Sanity check: report whether this kernel can see a CUDA device.
import torch

cuda_available = torch.cuda.is_available()
print(cuda_available)

True


In [1]:
# Initialize training parameters and lists to track metrics.
num_epochs = 100
val_interval = 2            # run validation every `val_interval` epochs
target_specificity = 0.95   # operating point used for checkpoint selection
best_accuracy = 0.0
best_recall = 0.0
best_f1_score = 0.0
best_epoch_accuracy = 0
best_sensitivity = 0
best_model_path = 'birads/save'
train_losses = []
val_losses = []
val_accuracies = []
val_epochs = []             # 1-based epoch numbers at which validation ran

# torch.save does not create missing directories, so make sure the checkpoint
# folder exists before any training time is spent.
os.makedirs(best_model_path, exist_ok=True)

# Main training loop over epochs.
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    start_time = time.time()

    # Training loop over batches.
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        # MixUp is currently disabled. To enable it, mix the batch here:
        #     images, targets_a, targets_b, lam = mixup(images, labels, alpha=1.0)
        # and replace the loss below with:
        #     loss = lam * criterion(outputs, targets_a) + (1 - lam) * criterion(outputs, targets_b)
        optimizer.zero_grad()

        # Forward pass and loss under mixed precision.
        with amp.autocast():
            outputs = model(images)
            loss = criterion(outputs, labels)

        # Scale the loss before backward, then let the scaler drive the
        # optimizer step and update its scale factor.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        # NOTE(review): scheduler.step() is intentionally left out here (it was
        # commented out in the original), so the learning rate stays constant.

        running_loss += loss.item()

    train_loss = running_loss / len(train_loader)
    train_losses.append(train_loss)
    epoch_time = time.time() - start_time
    print("############################################################")
    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss}, Time: {epoch_time:.2f} seconds")

    if (epoch + 1) % val_interval == 0:
        model.eval()
        correct = 0
        total = 0
        val_predictions = []
        val_labels = []
        val_predictions_prob = []
        running_val_loss = 0.0

        # Validation loop to evaluate model performance.
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
                val_predictions_prob.append(torch.nn.functional.softmax(outputs, dim=1).cpu().numpy())
                val_predictions.extend(predicted.cpu().numpy())
                val_labels.extend(labels.cpu().numpy())

                val_loss = criterion(outputs, labels)
                running_val_loss += val_loss.item()

        accuracy = 100 * correct / total
        mean_val_loss = running_val_loss / len(val_loader)
        val_losses.append(mean_val_loss)
        val_accuracies.append(accuracy)
        val_epochs.append(epoch + 1)
        val_predictions_prob = np.concatenate(val_predictions_prob, axis=0)

        # Print and save model performance metrics.
        # NOTE(review): recall_score / f1_score run with their default
        # averaging and the ROC below only reads probability column 1, so this
        # section assumes a two-class problem although swin_config sets
        # num_classes=5 — confirm.
        print(f"Val Loss: {mean_val_loss}")
        recall = recall_score(val_labels, val_predictions)
        f1 = f1_score(val_labels, val_predictions)

        # Sensitivity at 95% specificity: the highest TPR among ROC points
        # whose specificity (1 - FPR) is still at least the target. The first
        # ROC point has FPR 0, so the mask is never empty. The previous rule
        # (first point with FPR >= 0.05) selected an operating point at or
        # below the target specificity and therefore overstated sensitivity.
        # drop_intermediate=False keeps every threshold so no qualifying point
        # is pruned.
        fpr, tpr, _ = roc_curve(val_labels, val_predictions_prob[:, 1], drop_intermediate=False)
        meets_specificity = (1.0 - fpr) >= (target_specificity - np.finfo(float).eps)
        sens_at_95spec = float(tpr[meets_specificity][-1])

        print(f"Current Validation Accuracy: {accuracy}%")
        print(f"Current Validation Recall: {recall}")
        print(f"Current Validation F1 Score: {f1}")
        print(f"Sensitivity at 95% Specificity: {sens_at_95spec}")

        # Keep a checkpoint whenever the selection metric improves.
        if sens_at_95spec > best_sensitivity:
            best_sensitivity = sens_at_95spec
            # Save the model.
            model_path = os.path.join(best_model_path, f"model_classify_{epoch + 1}.pth")
            torch.save(model.state_dict(), model_path)
            print("Model Saved!")

# Plot training and validation metrics. The x-axes are derived from what was
# actually recorded, so the plots still work if training stopped early.
fig, axes = plt.subplots(1, 3, figsize=(12, 3))
train_epochs = range(1, len(train_losses) + 1)
panels = [
    (train_epochs, train_losses, 'Train Loss', 'blue', 'Loss'),
    (val_epochs, val_losses, 'Validation Loss', 'orange', 'Loss'),
    (val_epochs, val_accuracies, 'Validation Accuracy', 'green', 'Accuracy (%)'),
]
for ax, (x_values, y_values, label, color, y_label) in zip(axes, panels):
    ax.plot(x_values, y_values, label=label, color=color)
    ax.set_xlabel('Epoch')
    ax.set_ylabel(y_label)
    ax.xaxis.set_major_locator(plt.MaxNLocator(integer=True))
    ax.legend()
plt.show()


NameError: name 'model' is not defined

In [11]:
# Pre-processing for test images. It mirrors val_transform, including the
# resize to 224x224: the model is built with img_size=224, and without the
# resize any test image of a different size would reach the network unscaled.
# For images that are already 224x224 the resize changes nothing.
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.0, 0.0, 0.0), std=(1.0, 1.0, 1.0)),
])

# Paths for the test images, the model checkpoint, the CAM images, and the Excel file.
# NOTE(review): these are absolute, user-specific paths — adjust them for the
# machine the notebook runs on.
test_data_path = '/home/ee22s501/c2/data_classify/test'
model_path = '/home/ee22s501/c2/code/save/model_classify_72.pth'
cams_path = '/home/ee22s501/c2/code/save/cams/'
excel_file_path = '/home/ee22s501/c2/code/save/predictions_classify.xlsx'
# One image per batch: the inference loop calls predicted.item() and looks up
# one file name per iteration. This rebinds the `batch_size` used for training.
batch_size = 1

# Create a test dataset and data loader for inference. shuffle=False keeps the
# loader order aligned with test_dataset.samples, which the inference loop
# relies on to recover file names.
test_dataset = ImageFolder(test_data_path, transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=8)

In [12]:
# Define a function to reshape a tensor for Ablation-CAM function 
def reshape_transform(tensor, height=7, width=7):
    """
    Turn a flat token sequence into a channels-first spatial map.

    The second dimension of ``tensor`` is split into a ``height x width`` grid
    and the last dimension is moved in front of that grid.

    Args:
        tensor (torch.Tensor): Input of shape (batch, height * width, channels).
        height (int): Number of grid rows.
        width (int): Number of grid columns.

    Returns:
        result (torch.Tensor): Tensor of shape (batch, channels, height, width).
    """
    batch, channels = tensor.size(0), tensor.size(2)
    grid = tensor.reshape(batch, height, width, channels)
    # (batch, height, width, channels) -> (batch, channels, height, width)
    return grid.permute(0, 3, 1, 2)


# Load the trained checkpoint and set the model to evaluation mode.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# map_location lets a checkpoint saved during a GPU run load on a CPU-only
# machine; the explicit .to(device) keeps the weights and inputs on one device.
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)
model.eval()

# Define class names, fonts, and variables for predictions and image file names.
# NOTE(review): only two class names are listed while swin_config sets
# num_classes=5; a predicted index >= 2 raises IndexError below — confirm.
class_names = ['NRG', 'RG']
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 0.5
font_color = (255, 255, 255)
font_thickness = 1
predicted_labels = []
image_file_names = []

# Loop through test data and generate predictions. The loader yields one image
# per batch in dataset order, so `i` indexes test_loader.dataset.samples.
for i, (images, _) in enumerate(test_loader):
    images = images.to(device)
    # Plain inference: no autograd graph is needed for the prediction.
    with torch.no_grad():
        outputs = model(images)
    _, predicted = torch.max(outputs, 1)
    predicted_label = class_names[predicted.item()]

## Uncomment this code for generating Ablation-CAM plots.
    # target_layers = [model.layers[-1].blocks[-1].norm2]
    # cam = AblationCAM(model=model, target_layers=target_layers, reshape_transform=reshape_transform, ablation_layer=AblationLayerVit())
    # grayscale_cam = cam(input_tensor=images, aug_smooth=True, eigen_smooth=True, targets=None)
    # rgb_img = cv2.imread(test_loader.dataset.samples[i][0], 1)[:, :, ::-1]
    # rgb_img_n = np.float32(rgb_img) / 255
    # cam_image = show_cam_on_image(rgb_img_n, grayscale_cam[0, :])

    # combined_image = np.copy(cam_image)
    # text = f'Predicted: {predicted_label}'
    # cv2.putText(combined_image, text, (15, 15), font, font_scale, font_color, font_thickness)
    # cv2.imwrite(os.path.join(cams_path, os.path.basename(test_loader.dataset.samples[i][0])), combined_image)

    ## Append predicted labels and image file names (file name without extension).
    image_path = test_loader.dataset.samples[i][0]
    image_file_names.append(os.path.splitext(os.path.basename(image_path))[0])
    predicted_labels.append(predicted.item())

# Create a DataFrame and save predictions to an Excel file.
data = {
    'Image name': image_file_names,
    'Predicted Class label': predicted_labels,
}
df = pd.DataFrame(data)
# Translate class indices to names using the single class_names definition.
df['Predicted Class label'] = df['Predicted Class label'].map(dict(enumerate(class_names)))

# Make sure the output folder exists so the export cannot fail after inference.
excel_dir = os.path.dirname(excel_file_path)
if excel_dir:
    os.makedirs(excel_dir, exist_ok=True)
df.to_excel(excel_file_path, index=False)
print(f"Predictions saved to {excel_file_path}")


Predictions saved to /home/ee22s501/cvip/code/save/predictions_classify.xlsx
