<a href="https://colab.research.google.com/github/Harshyadv/Worked-up/blob/main/CNNWithRESNET(50)Integration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
import numpy as np
from medmnist import INFO, Evaluator
from medmnist.dataset import BreastMNIST
import torchvision.transforms as transforms
import torchvision.models as models
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from torch.optim.lr_scheduler import ReduceLROnPlateau
from PIL import Image
import os
from torchvision.models import resnet50, ResNet50_Weights

In [None]:
# Check if CUDA is available and set the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Define transformations (resizing to 224x224 and normalizing)
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[.5], std=[.5])  # Normalize to [-1, 1]
])

In [None]:
# Load the BreastMNIST dataset
dataset = BreastMNIST(split='train', transform=transform, download=True)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

test_dataset = BreastMNIST(split='test', transform=transform, download=True)

Using downloaded and verified file: /root/.medmnist/breastmnist.npz
Using downloaded and verified file: /root/.medmnist/breastmnist.npz


In [None]:
# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
# Define the ResNet-50 model
class ResNet50(nn.Module):
    def __init__(self):
        super(ResNet50, self).__init__()
        self.resnet50 = models.resnet50(weights=ResNet50_Weights.DEFAULT)
        # Modify ResNet-50 to accept 1-channel input instead of 3
        self.resnet50.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.resnet50.fc = nn.Linear(self.resnet50.fc.in_features, 128)  # Change output to 128 for concatenation

    def forward(self, x):
        x = self.resnet50(x)
        return x

In [None]:
# Define the CNN model mimicking QCNN
class QCNN(nn.Module):
    def __init__(self):
        super(QCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.fc1 = nn.Linear(128 * 28 * 28, 128)  # Adjust based on input size
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv3(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 128 * 28 * 28)  # Flatten for fully connected layer
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        return x

In [None]:
# Define the hybrid model that combines QCNN and ResNet-50
class HybridModel(nn.Module):
    def __init__(self):
        super(HybridModel, self).__init__()
        self.qcnn = QCNN()
        self.resnet50 = ResNet50()
        self.fc = nn.Linear(128 + 128, 2)  # Combine 128 from QCNN and 128 from ResNet-50

    def forward(self, x):
        qcnn_output = self.qcnn(x)
        resnet_output = self.resnet50(x)
        combined = torch.cat((qcnn_output, resnet_output), dim=1)
        output = self.fc(combined)
        return output

In [None]:
# Instantiate the hybrid model and move it to GPU if available
model = HybridModel().to(device)

Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to /root/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 156MB/s]


In [None]:
# Define, criterion, optimizer as before
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5)

In [None]:
# Training loop
def train(model, train_loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.squeeze().to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    return running_loss / len(train_loader)

In [None]:
# Validation loop
def validate(model, val_loader, criterion, device):
    model.eval()
    val_loss = 0.0
    correct = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.squeeze().to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            pred = outputs.argmax(dim=1, keepdim=True)
            correct += pred.eq(labels.view_as(pred)).sum().item()
    val_loss /= len(val_loader)
    accuracy = 100. * correct / len(val_loader.dataset)
    return val_loss, accuracy

In [None]:
# Early stopping variables
best_val_loss = float('inf')
patience = 10
early_stop_count = 0

num_epochs = 100  # Start with a higher number of epochs

for epoch in range(1, num_epochs + 1):
    train_loss = train(model, train_loader, criterion, optimizer, device)
    val_loss, val_accuracy = validate(model, val_loader, criterion, device)
    print(f'Epoch {epoch}/{num_epochs}, Training Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%')

    # Check for early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        early_stop_count = 0  # Reset the counter if we get a new best model
    else:
        early_stop_count += 1
        if early_stop_count >= patience:
            print("Early stopping triggered.")
            break

    # Adjust the learning rate based on the validation loss
    scheduler.step(val_loss)
    print(f'Learning rate: {scheduler.get_last_lr()}')

Epoch 1/100, Training Loss: 0.5784, Validation Loss: 0.5882, Validation Accuracy: 70.00%
Learning rate: [0.001]
Epoch 2/100, Training Loss: 0.4411, Validation Loss: 0.6459, Validation Accuracy: 58.18%
Learning rate: [0.001]
Epoch 3/100, Training Loss: 0.3185, Validation Loss: 0.6171, Validation Accuracy: 61.82%
Learning rate: [0.001]
Epoch 4/100, Training Loss: 0.2969, Validation Loss: 0.5568, Validation Accuracy: 70.00%
Learning rate: [0.001]
Epoch 5/100, Training Loss: 0.2388, Validation Loss: 1.0764, Validation Accuracy: 77.27%
Learning rate: [0.001]
Epoch 6/100, Training Loss: 0.1764, Validation Loss: 0.9605, Validation Accuracy: 73.64%
Learning rate: [0.001]
Epoch 7/100, Training Loss: 0.2023, Validation Loss: 0.7292, Validation Accuracy: 70.00%
Learning rate: [0.001]
Epoch 8/100, Training Loss: 0.1791, Validation Loss: 2.2042, Validation Accuracy: 70.00%
Learning rate: [0.001]
Epoch 9/100, Training Loss: 0.0725, Validation Loss: 0.6652, Validation Accuracy: 78.18%
Learning rate: 

In [None]:
# Test the model and calculate metrics
def test(model, test_loader, device):
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.squeeze().to(device)
            outputs = model(inputs)
            preds = outputs.argmax(dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    recall = recall_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    print(f'Test Accuracy: {accuracy:.2f}%')
    print(f'Precision: {precision:.2f}')
    print(f'Recall: {recall:.2f}')
    print(f'F1 Score: {f1:.2f}')

    return all_preds, all_labels

In [None]:
# Run the test and get predictions
preds, labels = test(model, test_loader, device)

Test Accuracy: 0.84%
Precision: 0.87
Recall: 0.92
F1 Score: 0.89


In [None]:
# Output images with predictions
output_dir = 'ImageFolder'
os.makedirs(output_dir, exist_ok=True)
for i, (pred, label) in enumerate(zip(preds, labels)):
    img, _ = test_dataset[i]
    img = transforms.ToPILImage()(img)
    # Include prediction and label in the filename
    img.save(os.path.join(output_dir, f'img_{i}_pred_{pred}_label_{label}.png'))

print(f"Images saved to '{output_dir}' directory.")

Images saved to 'ImageFolder' directory.
