In [14]:
import os
import random
import shutil
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import ImageFolder
from PIL import ImageOps

torch.manual_seed(100)

<torch._C.Generator at 0x7f8f59354630>

In [None]:
old_names = ["NORMAL", "COVID", "PNEUMONIA"]
new_names = ["0", "1", "2"]

In [15]:
dataset_path = 'C:\\Users\\Asus\\Downloads\\SoftLab\\Assignment1\\Dataset\\'
train_path =   'C:\\Users\\Asus\\Downloads\\SoftLab\\Assignment1\\Final_Dataset\\train\\'
val_path =     'C:\\Users\\Asus\\Downloads\\SoftLab\\Assignment1\\Final_Dataset\\val\\'
test_path =    'C:\\Users\\Asus\\Downloads\\SoftLab\\Assignment1\\Final_Dataset\\test\\'

In [None]:
for old_name, new_name in zip(old_names, new_names):
    old_folder_path = os.path.join(dataset_path, old_name)
    new_folder_path = os.path.join(dataset_path, new_name)
    os.rename(old_folder_path, new_folder_path)

In [16]:
classes = ['0' ,'1' ,'2']
batch_size = 32
num_iters=3000

In [17]:
os.makedirs(train_path, exist_ok=True)
os.makedirs(val_path, exist_ok=True)
os.makedirs(test_path, exist_ok=True)

train_ratio = 0.7
val_ratio = 0.15
test_ratio = 0.15

In [18]:
for class_name in classes:
    print(class_name)
    class_folder_path = os.path.join(dataset_path, class_name)
    images = os.listdir(class_folder_path)
    random.shuffle(images)

    train_split = int(len(images) * train_ratio)
    val_split = int(len(images) * (train_ratio + val_ratio))

    train_images = images[:train_split]
    val_images = images[train_split:val_split]
    test_images = images[val_split:]

    for img_name in train_images:
        src = os.path.join(class_folder_path, img_name)
        dst = os.path.join(train_path, class_name, img_name)
        os.makedirs(os.path.join(train_path, class_name), exist_ok=True)
        shutil.move(src, dst)

    for img_name in val_images:
        src = os.path.join(class_folder_path, img_name)
        dst = os.path.join(val_path, class_name, img_name)
        os.makedirs(os.path.join(val_path, class_name), exist_ok=True)
        shutil.move(src, dst)

    for img_name in test_images:
        src = os.path.join(class_folder_path, img_name)
        dst = os.path.join(test_path, class_name, img_name)
        os.makedirs(os.path.join(test_path, class_name), exist_ok=True)
        shutil.move(src, dst)

In [19]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.Lambda(lambda img: ImageOps.equalize(img, mask=None)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

train_dataset = ImageFolder(train_path, transform=transform)
val_dataset = ImageFolder(val_path, transform=transform)
test_dataset = ImageFolder(test_path, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

In [20]:
print(len(train_dataset))
print(len(test_dataset))
print(len(val_dataset))

3659
785
784


In [21]:
# class CNN(nn.Module):
#     def __init__(self):
#         super(CNN, self).__init__()
#         self.conv_layers = nn.Sequential(
#             nn.Conv2d(3, 16, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2),
#             nn.Conv2d(16, 32, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2)
#         )
#         self.fc_layers = nn.Sequential(
#             nn.Linear(32 * 56 * 56, 256),
#             nn.ReLU(),
#             nn.Dropout(0.5),
#             nn.Linear(256, 128),
#             nn.ReLU(),
#             nn.Dropout(0.5),
#             nn.Linear(128, 3)
#         )

#     def forward(self, x):
#         x = self.conv_layers(x)
#         x = x.view(x.size(0), -1)
#         x = self.fc_layers(x)
#         return x


# class CNN(nn.Module):
#     def __init__(self):
#         super(CNN, self).__init__()
#         self.conv_layers = nn.Sequential(
#             nn.Conv2d(3, 16, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2),
#             nn.Conv2d(16, 32, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2),
#             nn.Conv2d(32, 64, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2),
#             nn.Conv2d(64, 128, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2),
#             nn.Conv2d(128, 256, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2),
#         )
#         self.fc_layers = nn.Sequential(
#             nn.Linear(256 * 7 * 7, 512),
#             nn.ReLU(),
#             nn.Dropout(0.2),
#             nn.Linear(512, 1024),
#             nn.ReLU(),
#             nn.Dropout(0.2),
#             nn.Linear(1024, 2048),
#             nn.ReLU(),
#             nn.Dropout(0.2),
#             nn.Linear(2048, 2048),
#             nn.ReLU(),
#             nn.Dropout(0.2),
#             nn.Linear(2048, 2048),
#             nn.ReLU(),
#             nn.Dropout(0.2),
#             nn.Linear(2048, 1024),
#             nn.ReLU(),
#             nn.Dropout(0.2),
#             nn.Linear(1024, 512),
#             nn.ReLU(),
#             nn.Dropout(0.2),
#             nn.Linear(512, 256),
#             nn.ReLU(),
#             nn.Dropout(0.2),
#             nn.Linear(256, 128),
#             nn.ReLU(),
#             nn.Dropout(0.2),
            
#             nn.Linear(128, 3)
#         )

#     def forward(self, x):
#         x = self.conv_layers(x)
#         x = x.view(x.size(0), -1)
#         x = self.fc_layers(x)
#         return x


# class CNN(nn.Module):
#     def __init__(self):
#         super(CNN, self).__init__()
#         self.conv_layers = nn.Sequential(
#             nn.Conv2d(3, 16, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2),
#             nn.Conv2d(16, 32, kernel_size=3, padding=1),
#             nn.LeakyReLU(0.1),
#             nn.Conv2d(32, 64, kernel_size=3, padding=1),
#             nn.Tanh(),
#             nn.Conv2d(64, 128, kernel_size=3, padding=1),
#             nn.Sigmoid(),
#             nn.MaxPool2d(kernel_size=2, stride=2),
#             nn.Conv2d(128, 256, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.Conv2d(256, 512, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool2d(kernel_size=2, stride=2),
#         )
#         self.fc_layers = nn.Sequential(
#             nn.Linear(512 * 28 * 28, 512),
#             nn.ReLU(),
#             nn.Dropout(0.5),
#             nn.Linear(512, 256),
#             nn.LeakyReLU(0.1),
#             nn.Dropout(0.5),
#             nn.Linear(256, 128),
#             nn.Tanh(),
#             nn.Dropout(0.5),
#             nn.Linear(128, 3)
#         )

#     def forward(self, x):
#         x = self.conv_layers(x)
#         x = x.view(x.size(0), -1)
#         x = self.fc_layers(x)
#         return x



class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.fc_layers = nn.Sequential(
            nn.Linear(128 * 14 * 14, 512),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 3)
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)
        x = self.fc_layers(x)
        return x

    
model = CNN()

In [22]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [23]:
patience = 7
best_val_loss = float('inf')
counter = 0

num_epochs = num_iters / (len(train_dataset) / batch_size)
num_epochs = int(num_epochs)

In [24]:
print(num_epochs)

26


In [25]:
model.to(device)
model.train()

for epoch in range(num_epochs):
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()

        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {running_loss / len(train_loader):.4f}, '
          f'Train Accuracy: {100. * correct / total:.2f}%')

    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)

            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            _, predicted = outputs.max(1)
            val_total += labels.size(0)
            val_correct += predicted.eq(labels).sum().item()

    print(f'Validation Loss: {val_loss / len(val_loader):.4f}, '
          f'Validation Accuracy: {100. * val_correct / val_total:.2f}%')
    
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        counter = 0
        torch.save(model.state_dict(), 'best_model.pt')
        print("Best Model Save")
        model.train()
        
    else:
        counter += 1
        if counter >= patience:
            print(f'Early stopping at epoch {epoch + 1}.')
            break

Epoch 1/26, Loss: 0.6207, Train Accuracy: 72.72%
Validation Loss: 0.4189, Validation Accuracy: 83.04%
Best Model Save
Epoch 2/26, Loss: 0.3050, Train Accuracy: 88.25%
Validation Loss: 0.2627, Validation Accuracy: 89.80%
Best Model Save
Epoch 3/26, Loss: 0.2253, Train Accuracy: 91.25%
Validation Loss: 0.2244, Validation Accuracy: 92.22%
Best Model Save
Epoch 4/26, Loss: 0.1996, Train Accuracy: 92.95%
Validation Loss: 0.1905, Validation Accuracy: 92.98%
Best Model Save
Epoch 5/26, Loss: 0.1626, Train Accuracy: 94.01%
Validation Loss: 0.1652, Validation Accuracy: 94.77%
Best Model Save
Epoch 6/26, Loss: 0.1451, Train Accuracy: 94.51%
Validation Loss: 0.1474, Validation Accuracy: 95.03%
Best Model Save
Epoch 7/26, Loss: 0.1243, Train Accuracy: 95.55%
Validation Loss: 0.1567, Validation Accuracy: 94.52%
Epoch 8/26, Loss: 0.1024, Train Accuracy: 96.17%
Validation Loss: 0.1412, Validation Accuracy: 95.54%
Best Model Save
Epoch 9/26, Loss: 0.1055, Train Accuracy: 96.58%
Validation Loss: 0.1396

In [26]:
model = CNN().to(device)
model.load_state_dict(torch.load('best_model.pt'))
print("Best Model Loaded")
model.eval()
test_correct = 0
test_total = 0

with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)

        outputs = model(images)
        _, predicted = outputs.max(1)
        test_total += labels.size(0)
        test_correct += predicted.eq(labels).sum().item()

test_accuracy = 100. * test_correct / test_total
print(f'Test Accuracy: {test_accuracy:.2f}%')

Best Model Loaded
Test Accuracy: 96.94%
