In [1]:
import cv2 as cv
import os
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import torch
from tqdm import tqdm
from torch import nn
import torch.nn.functional as F
import torch.nn.init as init
from torchvision.transforms import v2
import winsound

In [2]:
from Functions import one_hot, accuracy, randint_distinct, augmant_data
from models import BaseCNNClassifier, ResNet18, ResNet34

In [None]:
# Fix PyTorch's global RNG seed so that runs are repeatable
torch.manual_seed(42)

In [None]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
device

In [None]:
# Augmentation pipeline; the steps run in the order listed
transforms = v2.Compose(
    [
        v2.ToPILImage(),               # convert the input to a PIL image
        v2.RandomHorizontalFlip(0.5),  # mirror left/right with probability 0.5
        v2.RandomRotation(45),         # rotate by a random angle within +/- 45 degrees
        v2.ToTensor(),                 # convert the PIL image back to a tensor
    ]
)

In [None]:
# Class folders (a folder's position in this list is its class label) and the
# side length every image is resized to
files_names = ['benign', 'malignant', 'normal']
image_sz = 224

# Empty 1-D accumulators for the images (float32) and the labels (int16)
X = torch.empty(0, dtype=torch.float32)
y = torch.empty(0, dtype=torch.int16)

def collect_training_data(X, y):
    """
    Load every non-mask image from the class folders and append it to X / y.

    Each name in ``files_names`` is treated as a folder inside the current
    working directory; the folder's position in ``files_names`` is its class
    label. Images are read as grayscale and resized to
    ``image_sz`` x ``image_sz``.

    Args:
        X (torch.tensor): Tensor the new images are appended to (an empty
            1-D float32 tensor when starting from scratch)
        y (torch.tensor): Tensor the new labels are appended to (an empty
            1-D int16 tensor when starting from scratch)

    Returns:
        X (torch.tensor): Image data divided by 256, with a channel dimension
            inserted at position 1 -> (N, 1, image_sz, image_sz)
        y (torch.tensor): Class labels, one row per image -> (N, 1), int16
    """
    # Gather everything in Python lists and build the tensors once at the end;
    # concatenating inside the loop re-copies the whole dataset for every image
    images, labels = [], []

    for class_num, file in enumerate(files_names):
        path = os.path.join(os.getcwd(), file)
        print(f"Class: {file} | Class value: {class_num}")

        # Sorted so the sample order (and therefore the seeded train/test
        # split) does not depend on the order the filesystem lists files in
        for img in tqdm(sorted(os.listdir(path))):
            # Segmentation masks are not training images
            if "mask" in img:
                continue

            img_array = cv.imread(os.path.join(path, img), cv.IMREAD_GRAYSCALE)
            if img_array is None:
                # cv.imread returns None instead of raising for unreadable or
                # non-image files; skip them rather than crash in cv.resize
                tqdm.write(f"Skipping unreadable file: {os.path.join(path, img)}")
                continue

            new_array = cv.resize(img_array, (image_sz, image_sz))
            images.append(torch.tensor(new_array, dtype=torch.float32))
            labels.append(class_num)

    if images:
        X = torch.cat((X, torch.stack(images)), dim=0)
        y = torch.cat((y, torch.tensor(labels, dtype=torch.int16).unsqueeze(1)), dim=0)

    # Scale pixel values and add the channel dimension.
    # NOTE(review): 8-bit pixels top out at 255, so dividing by 256 gives
    # values in [0, 1) rather than [0, 1]; kept unchanged so the input scale
    # matches what earlier runs were trained on.
    X = X / 256
    X = X.unsqueeze(1)

    return X, y

# Fill X and y from the class folders; the function returns new tensors, so rebind both names
X, y = collect_training_data(X, y)

In [7]:
# Hold out 20% of the samples for testing (fixed random_state keeps the split repeatable)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Move all four splits onto the selected device
X_train, X_test, y_train, y_test = (split.to(device) for split in (X_train, X_test, y_train, y_test))

In [None]:
# Generate augmented samples from the training split and place them in front of the originals.
# NOTE(review): the meaning of the `3, True` arguments is defined by augmant_data in Functions.py - confirm there.
X_temp, y_temp = augmant_data(X_train, y_train, transforms, 3, True)
X_train = torch.cat([X_temp, X_train], dim=0)
y_train = torch.cat([y_temp, y_train], dim=0)
del X_temp, y_temp

In [9]:
# Build the baseline CNN classifier, then move it onto the selected device
model_0 = BaseCNNClassifier()
model_0 = model_0.to(device)

In [10]:
# Training hyperparameters
batch_size = 32        # samples requested per training step
learning_rate = 1e-4   # learning rate handed to the optimizer
num_epochs = 5000      # number of iterations of the training loop

# Cross-entropy objective, optimised with AdamW over all of model_0's parameters
loss_fn = nn.CrossEntropyLoss()
optimizer_0 = torch.optim.AdamW(params=model_0.parameters(), lr=learning_rate)

In [None]:
# Checkpoints go to ./saves; create the folder up front so the first save
# does not fail with FileNotFoundError on a fresh checkout
save_dir = os.path.join(os.getcwd(), "saves")
os.makedirs(save_dir, exist_ok=True)

# Define the training loop (each "epoch" here is one optimisation step on a single mini-batch)
for epoch in range(num_epochs):
    # Get random batches of data and one hot encode the labels
    # (randint_distinct presumably returns batch_size distinct indices - see Functions.py)
    batch_idxs = randint_distinct(0, X_train.size(0), batch_size)
    X_batch, y_batch = X_train[batch_idxs], y_train[batch_idxs]
    y_batch = one_hot(y_batch).to(device)

    # Forward pass, compute loss, compute accuracy
    y_pred = model_0(X_batch)
    loss = loss_fn(y_pred, y_batch)
    train_acc = accuracy(y_pred, y_batch)

    # Compute backpropagation and optimization
    optimizer_0.zero_grad()
    loss.backward()
    optimizer_0.step()

    # Evaluate the model on the test set every 10 epochs
    if (epoch+1) % 10 == 0:
        model_0.eval()
        test_loss = 0
        test_acc = 0
        with torch.inference_mode():
            # torch.chunk covers the whole test set in at most 6 pieces; the previous
            # `size // 6` slicing silently dropped the tail when the size was not a multiple of 6
            for X_batch, y_batch in zip(torch.chunk(X_test, 6), torch.chunk(y_test, 6)):
                y_batch = one_hot(y_batch).to(device)

                # Forward pass, then accumulate loss / accuracy weighted by chunk size
                # (both are per-batch means, so this yields the mean over all test samples)
                test_logits = model_0(X_batch)
                n_samples = X_batch.size(0)
                test_loss += loss_fn(test_logits, y_batch).item() * n_samples
                test_acc += accuracy(test_logits, y_batch).item() * n_samples
        test_loss /= X_test.size(0)
        test_acc /= X_test.size(0)
        model_0.train()

        # Print the loss and accuracy
        print(f"Epoch: {epoch+1} | Train loss: {loss.item():.4f} | Train Acc: {train_acc.item():.4f} | Test loss: {test_loss:.4f} | Test Acc: {test_acc:.4f}\n")

        # Save the model if the test accuracy is higher than 0.8 and higher than the previous best accuracy
        if test_acc > 0.8 and test_acc > model_0.best_acc:
            model_0.best_acc = test_acc

            # Next free checkpoint index: one past the largest "Idx<n>" already present in the save folder
            idxs = [int(file.split("Idx")[1][:-4]) for file in os.listdir(save_dir) if "Idx" in file]
            idx = max(idxs) + 1 if idxs else 0

            # Save the model
            torch.save(model_0.state_dict(), os.path.join(save_dir, f"BaseCNNAcc{test_acc:.5f}Idx{idx}.pth"))
            print(f"Model saved successfully with accuracy: {test_acc:.4f}")

# Print the best accuracy
print(f"Best Accuracy: {model_0.best_acc:.4f}")

In [12]:
# Build the project's ResNet18, then move it onto the selected device
model_1 = ResNet18()
model_1 = model_1.to(device)

In [13]:
# Training hyperparameters
batch_size = 32        # samples requested per training step
learning_rate = 1e-4   # learning rate handed to the optimizer
num_epochs = 5000      # number of iterations of the training loop

# Cross-entropy objective, optimised with AdamW over all of model_1's parameters
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(params=model_1.parameters(), lr=learning_rate)

In [None]:
# Checkpoints go to ./saves; create the folder up front so the first save
# does not fail with FileNotFoundError on a fresh checkout
save_dir = os.path.join(os.getcwd(), "saves")
os.makedirs(save_dir, exist_ok=True)

# Define the training loop (each "epoch" here is one optimisation step on a single mini-batch)
for epoch in range(num_epochs):
    # Get random batches of data and one hot encode the labels
    # (randint_distinct presumably returns batch_size distinct indices - see Functions.py)
    batch_idxs = randint_distinct(0, X_train.size(0), batch_size)
    X_batch, y_batch = X_train[batch_idxs], y_train[batch_idxs]
    y_batch = one_hot(y_batch).to(device)

    # Forward pass, compute loss, compute accuracy
    y_pred = model_1(X_batch)
    loss = loss_fn(y_pred, y_batch)
    train_acc = accuracy(y_pred, y_batch)

    # Compute backpropagation and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Evaluate the model on the test set every 10 epochs
    if (epoch+1) % 10 == 0:
        model_1.eval()
        test_loss = 0
        test_acc = 0
        with torch.inference_mode():
            # torch.chunk covers the whole test set in at most 6 pieces; the previous
            # `size // 6` slicing silently dropped the tail when the size was not a multiple of 6
            for X_batch, y_batch in zip(torch.chunk(X_test, 6), torch.chunk(y_test, 6)):
                y_batch = one_hot(y_batch).to(device)

                # Forward pass, then accumulate loss / accuracy weighted by chunk size
                # (both are per-batch means, so this yields the mean over all test samples)
                test_logits = model_1(X_batch)
                n_samples = X_batch.size(0)
                test_loss += loss_fn(test_logits, y_batch).item() * n_samples
                test_acc += accuracy(test_logits, y_batch).item() * n_samples
        test_loss /= X_test.size(0)
        test_acc /= X_test.size(0)
        model_1.train()

        # Print the loss and accuracy
        print(f"Epoch: {epoch+1} | Train loss: {loss.item():.4f} | Train Acc: {train_acc.item():.4f} | Test loss: {test_loss:.4f} | Test Acc: {test_acc:.4f}\n")

        # Save the model if the test accuracy is higher than 0.8 and higher than the previous best accuracy
        if test_acc > 0.8 and test_acc > model_1.best_acc:
            model_1.best_acc = test_acc

            # Next free checkpoint index: one past the largest "Idx<n>" already present in the save folder
            idxs = [int(file.split("Idx")[1][:-4]) for file in os.listdir(save_dir) if "Idx" in file]
            idx = max(idxs) + 1 if idxs else 0

            # Save the model
            torch.save(model_1.state_dict(), os.path.join(save_dir, f"ResNet18Acc{test_acc:.5f}Idx{idx}.pth"))
            print(f"Model saved successfully with accuracy: {test_acc:.4f}")

# Print the best accuracy
print(f"Best accuracy: {model_1.best_acc:.4f}")

In [41]:
# Build the project's ResNet34, then move it onto the selected device
model_2 = ResNet34()
model_2 = model_2.to(device)

In [42]:
# Training hyperparameters
batch_size = 32        # samples requested per training step
learning_rate = 1e-4   # learning rate handed to the optimizer
num_epochs = 5000      # number of iterations of the training loop

# Cross-entropy objective, optimised with AdamW over all of model_2's parameters
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(params=model_2.parameters(), lr=learning_rate)

In [None]:
# Checkpoints go to ./saves; create the folder up front so the first save
# does not fail with FileNotFoundError on a fresh checkout
save_dir = os.path.join(os.getcwd(), "saves")
os.makedirs(save_dir, exist_ok=True)

# Define the training loop (each "epoch" here is one optimisation step on a single mini-batch)
for epoch in range(num_epochs):
    # Get random batches of data and one hot encode the labels
    # (randint_distinct presumably returns batch_size distinct indices - see Functions.py)
    batch_idxs = randint_distinct(0, X_train.size(0), batch_size)
    X_batch, y_batch = X_train[batch_idxs], y_train[batch_idxs]
    y_batch = one_hot(y_batch).to(device)

    # Forward pass, compute loss, compute accuracy
    y_pred = model_2(X_batch)
    loss = loss_fn(y_pred, y_batch)
    train_acc = accuracy(y_pred, y_batch)

    # Compute backpropagation and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Evaluate the model on the test set every 10 epochs
    if (epoch+1) % 10 == 0:
        model_2.eval()
        test_loss = 0
        test_acc = 0
        with torch.inference_mode():
            # torch.chunk covers the whole test set in at most 6 pieces; the previous
            # `size // 6` slicing silently dropped the tail when the size was not a multiple of 6
            for X_batch, y_batch in zip(torch.chunk(X_test, 6), torch.chunk(y_test, 6)):
                y_batch = one_hot(y_batch).to(device)

                # Forward pass, then accumulate loss / accuracy weighted by chunk size
                # (both are per-batch means, so this yields the mean over all test samples)
                test_logits = model_2(X_batch)
                n_samples = X_batch.size(0)
                test_loss += loss_fn(test_logits, y_batch).item() * n_samples
                test_acc += accuracy(test_logits, y_batch).item() * n_samples
        test_loss /= X_test.size(0)
        test_acc /= X_test.size(0)
        model_2.train()

        # Print the loss and accuracy
        print(f"Epoch: {epoch+1} | Train loss: {loss.item():.4f} | Train Acc: {train_acc.item():.4f} | Test loss: {test_loss:.4f} | Test Acc: {test_acc:.4f}\n")

        # Save the model if the test accuracy is higher than 0.8 and higher than the previous best accuracy
        if test_acc > 0.8 and test_acc > model_2.best_acc:
            model_2.best_acc = test_acc

            # Next free checkpoint index: one past the largest "Idx<n>" already present in the save folder
            idxs = [int(file.split("Idx")[1][:-4]) for file in os.listdir(save_dir) if "Idx" in file]
            idx = max(idxs) + 1 if idxs else 0

            # Save the model
            torch.save(model_2.state_dict(), os.path.join(save_dir, f"ResNet34Acc{test_acc:.5f}Idx{idx}.pth"))
            print(f"Model saved successfully with accuracy: {test_acc:.4f}")

# Print the best accuracy
print(f"Best accuracy: {model_2.best_acc:.4f}")

In [7]:
# Import torchvision models for pre-trained models
from torchvision import models

In [None]:
# Initialize torchvision's ResNet34 with ImageNet weights.
# `pretrained=True` is deprecated in favour of the explicit `weights=` argument;
# IMAGENET1K_V1 is the weight set that `pretrained=True` used to load.
model_3 = models.resnet34(weights=models.ResNet34_Weights.IMAGENET1K_V1)

In [None]:
# Inspect the first convolution: it takes RGB (3-channel) input, whereas our images are
# grayscale, so the single channel is duplicated three times further down.
model_3.conv1

The dataset is recreated here because an overly large augmented dataset proved to be less effective.

In [10]:
# Hold out 20% of the samples for testing (fixed random_state keeps the split repeatable)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Move all four splits onto the selected device
X_train, X_test, y_train, y_test = (split.to(device) for split in (X_train, X_test, y_train, y_test))

In [None]:
# Generate augmented samples from the training split and place them in front of the originals.
# NOTE(review): the meaning of the `2, True` arguments is defined by augmant_data in Functions.py - confirm there.
X_temp, y_temp = augmant_data(X_train, y_train, transforms, 2, True)
X_train = torch.cat([X_temp, X_train], dim=0)
y_train = torch.cat([y_temp, y_train], dim=0)
del X_temp, y_temp

In [None]:
# The pretrained model expects 3-channel input, so replicate the single grayscale channel
# three times -> (N, 3, 224, 224).
# Guarded on the channel count so that re-running this cell does not turn 3 channels into 9.
if X_train.size(1) == 1:
    X_train = X_train.repeat(1, 3, 1, 1)
if X_test.size(1) == 1:
    X_test = X_test.repeat(1, 3, 1, 1)
X_train.shape, X_test.shape

In [50]:
# Replace the 1000-way output layer with dropout followed by a 3-class linear head
model_3.fc = nn.Sequential(nn.Dropout(p=0.4), nn.Linear(512, 3))

In [51]:
# Move the model onto the selected device
model_3 = model_3.to(device)
# Bookkeeping attributes: best test accuracy seen so far (starts at 0) and the device in use
model_3.best_acc, model_3.device = 0, device

In [52]:
# Training hyperparameters
batch_size = 32        # samples requested per training step
learning_rate = 1e-4   # learning rate handed to the optimizer
num_epochs = 1500      # number of iterations of the training loop

# Cross-entropy objective, optimised with Adam over all of model_3's parameters
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model_3.parameters(), lr=learning_rate)

In [None]:
# Checkpoints go to ./saves; create the folder up front so the first save
# does not fail with FileNotFoundError on a fresh checkout
save_dir = os.path.join(os.getcwd(), "saves")
os.makedirs(save_dir, exist_ok=True)

# Define the training loop (each "epoch" here is one optimisation step on a single mini-batch)
for epoch in range(num_epochs):
    # Get random batches of data and one hot encode the labels
    # (randint_distinct presumably returns batch_size distinct indices - see Functions.py)
    batch_idxs = randint_distinct(0, X_train.size(0), batch_size)
    X_batch, y_batch = X_train[batch_idxs], y_train[batch_idxs]
    y_batch = one_hot(y_batch).to(device)

    # Forward pass, compute loss, compute accuracy
    y_pred = model_3(X_batch)
    loss = loss_fn(y_pred, y_batch)
    train_acc = accuracy(y_pred, y_batch)

    # Compute backpropagation and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Evaluate the model on the test set every 10 epochs
    if (epoch+1) % 10 == 0:
        model_3.eval()
        test_loss = 0
        test_acc = 0
        with torch.inference_mode():
            # torch.chunk covers the whole test set in at most 6 pieces; the previous
            # `size // 6` slicing silently dropped the tail when the size was not a multiple of 6
            for X_batch, y_batch in zip(torch.chunk(X_test, 6), torch.chunk(y_test, 6)):
                y_batch = one_hot(y_batch).to(device)

                # Forward pass, then accumulate loss / accuracy weighted by chunk size
                # (both are per-batch means, so this yields the mean over all test samples)
                test_logits = model_3(X_batch)
                n_samples = X_batch.size(0)
                test_loss += loss_fn(test_logits, y_batch).item() * n_samples
                test_acc += accuracy(test_logits, y_batch).item() * n_samples
        test_loss /= X_test.size(0)
        test_acc /= X_test.size(0)
        model_3.train()

        # Print the loss and accuracy
        print(f"Epoch: {epoch+1} | Train loss: {loss.item():.4f} | Train Acc: {train_acc.item():.4f} | Test loss: {test_loss:.4f} | Test Acc: {test_acc:.4f}\n")

        # Save the model if the test accuracy is higher than 0.9 and higher than the previous best accuracy
        if test_acc > 0.9 and test_acc > model_3.best_acc:
            model_3.best_acc = test_acc

            # Next free checkpoint index: one past the largest "Idx<n>" already present in the save folder
            idxs = [int(file.split("Idx")[1][:-4]) for file in os.listdir(save_dir) if "Idx" in file]
            idx = max(idxs) + 1 if idxs else 0

            # Save the model
            # NOTE(review): this reuses the "ResNet34" file prefix that the model_2 training cell also
            # writes, so the two kinds of checkpoint can only be told apart by their Idx - confirm intended.
            torch.save(model_3.state_dict(), os.path.join(save_dir, f"ResNet34Acc{test_acc:.5f}Idx{idx}.pth"))
            print(f"Model saved successfully with accuracy: {test_acc:.4f}")

# Print the best accuracy
print(f"Best accuracy: {model_3.best_acc:.4f}")

In [None]:
# Initialize torchvision's ResNet50 with ImageNet weights.
# `pretrained=True` is deprecated in favour of the explicit `weights=` argument;
# IMAGENET1K_V1 is the weight set that `pretrained=True` used to load.
model_4 = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)

In [14]:
# Replace the 1000-way output layer with dropout followed by a 3-class linear head
model_4.fc = nn.Sequential(nn.Dropout(p=0.6), nn.Linear(2048, 3))

In [15]:
# Move the model onto the selected device
model_4 = model_4.to(device)
# Bookkeeping attributes: best test accuracy seen so far (starts at 0) and the device in use
model_4.best_acc, model_4.device = 0, device

In [16]:
# Training hyperparameters
batch_size = 32        # samples requested per training step
learning_rate = 1e-4   # learning rate handed to the optimizer
num_epochs = 1500      # number of iterations of the training loop

# Cross-entropy objective, optimised with Adam over all of model_4's parameters
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model_4.parameters(), lr=learning_rate)

In [18]:
# Checkpoints go to ./saves; create the folder up front so the first save
# does not fail with FileNotFoundError on a fresh checkout
save_dir = os.path.join(os.getcwd(), "saves")
os.makedirs(save_dir, exist_ok=True)

# Define the training loop (each "epoch" here is one optimisation step on a single mini-batch)
for epoch in range(num_epochs):
    # Get random batches of data and one hot encode the labels
    # (randint_distinct presumably returns batch_size distinct indices - see Functions.py)
    batch_idxs = randint_distinct(0, X_train.size(0), batch_size)
    X_batch, y_batch = X_train[batch_idxs], y_train[batch_idxs]
    y_batch = one_hot(y_batch).to(device)

    # Forward pass, compute loss, compute accuracy
    y_pred = model_4(X_batch)
    loss = loss_fn(y_pred, y_batch)
    train_acc = accuracy(y_pred, y_batch)

    # Compute backpropagation and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Evaluate the model on the test set every 10 epochs
    if (epoch+1) % 10 == 0:
        model_4.eval()
        test_loss = 0
        test_acc = 0
        with torch.inference_mode():
            # torch.chunk covers the whole test set in at most 6 pieces; the previous
            # `size // 6` slicing silently dropped the tail when the size was not a multiple of 6
            for X_batch, y_batch in zip(torch.chunk(X_test, 6), torch.chunk(y_test, 6)):
                y_batch = one_hot(y_batch).to(device)

                # Forward pass, then accumulate loss / accuracy weighted by chunk size
                # (both are per-batch means, so this yields the mean over all test samples)
                test_logits = model_4(X_batch)
                n_samples = X_batch.size(0)
                test_loss += loss_fn(test_logits, y_batch).item() * n_samples
                test_acc += accuracy(test_logits, y_batch).item() * n_samples
        test_loss /= X_test.size(0)
        test_acc /= X_test.size(0)
        model_4.train()

        # Print the loss and accuracy
        print(f"Epoch: {epoch+1} | Train loss: {loss.item():.4f} | Train Acc: {train_acc.item():.4f} | Test loss: {test_loss:.4f} | Test Acc: {test_acc:.4f}\n")

        # Save the model if the test accuracy is higher than 0.92 and higher than the previous best accuracy
        if test_acc > 0.92 and test_acc > model_4.best_acc:
            model_4.best_acc = test_acc

            # Next free checkpoint index: one past the largest "Idx<n>" already present in the save folder
            idxs = [int(file.split("Idx")[1][:-4]) for file in os.listdir(save_dir) if "Idx" in file]
            idx = max(idxs) + 1 if idxs else 0

            # Save the model
            torch.save(model_4.state_dict(), os.path.join(save_dir, f"ResNet50Acc{test_acc:.5f}Idx{idx}.pth"))
            print(f"Model saved successfully with accuracy: {test_acc:.4f}")

# Print the best accuracy
print(f"Best accuracy: {model_4.best_acc:.4f}")

Epoch: 10 | Train loss: 0.0072 | Train Acc: 1.0000 | Test loss: 0.5062 | Test Acc: 0.8718



KeyboardInterrupt: 