In [None]:
import os
import hashlib
from collections import Counter
from copy import deepcopy

import numpy as np
import matplotlib.pyplot as plt
import cv2
from PIL import Image
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

In [None]:
if torch.cuda.is_available():
    device = torch.device("cuda")
    print("Using GPU:", torch.cuda.get_device_name(0))
else:
    device = torch.device("cpu")
    print("GPU not available, using CPU instead.")

In [None]:
def file_hash(filepath):
    with open(filepath, 'rb') as f:
        return hashlib.md5(f.read()).hexdigest()

def check_duplicates(set1, set2):
    hashes = {}
    duplicates = []

    # Process all files in both sets and store their hashes
    for dataset_path in [set1, set2]:
        for root, _, files in os.walk(dataset_path):
            for filename in files:
                if filename.endswith('jpg'):  # Add other file types if needed
                    file_path = os.path.join(root, filename)
                    filehash = file_hash(file_path)
                    if filehash in hashes:
                        duplicates.append((hashes[filehash], file_path))
                    else:
                        hashes[filehash] = file_path
    return duplicates

# Check for duplicates
duplicates = check_duplicates('raw_data/Training', 'raw_data/Testing')
if duplicates:
    print("Duplicates found:", len(duplicates))
    # for dup in duplicates:
    #     print(f"Duplicate: {dup[0]} and {dup[1]}")
else:
    print("No duplicates found.")

In [None]:
label_map = {'notumor': 0, 'glioma': 1, 'meningioma': 2, 'pituitary': 3}
image_size = 150

def preprocess_data(image, image_size):
    image_np = np.array(image)
    image_np = cv2.bilateralFilter(image_np, 2, 50, 50)
    image_np = cv2.resize(image_np, (image_size, image_size))
    return Image.fromarray(image_np)

def load_unique_images(base_path):
    images = []
    labels = []
    hashes = set()
    
    for partition in ('Training', 'Testing'):
        for label in label_map.keys():
            path = os.path.join(base_path, partition, label)
            for file in tqdm(os.listdir(path)):
                file_path = os.path.join(path, file)

                img_hash = file_hash(file_path)
                if img_hash in hashes:
                    continue
                hashes.add(img_hash)

                # process image
                image = Image.open(file_path).convert('L')
                image = preprocess_data(image, image_size)
                images.append(image)
                labels.append(label_map[label])
    
    return (images, labels)

all_images, all_labels = load_unique_images('raw_data')

x_train, x_test, y_train, y_test = train_test_split(all_images, all_labels, test_size=0.2, random_state=69)

In [None]:
# process training sets (no data augment and data augment)
base_image_train = deepcopy(x_train)
rotate_image_train = deepcopy(x_train)

simple_transform = transforms.Compose([
    transforms.ToTensor() # already normalizes the image
])
base_image_train = [simple_transform(image) for image in base_image_train]

rotate_transform = transforms.Compose([
    transforms.RandomRotation(180), 
    transforms.ToTensor() 
])
rotate_image_train = [rotate_transform(image) for image in rotate_image_train]

# test on fixed rotated images
test_images = []
test_labels = []
test_rotate_info = []
rotation_degrees = [theta for theta in range(0, 360, 22.5)]
for image, label in zip(x_test, y_test):
    for angle in rotation_degrees:
        rotated_image = image.copy()
        rotated_image = rotated_image.rotate(angle)
        rotated_image = simple_transform(rotated_image) # converts to Tensor
        test_images.append(rotated_image)
        test_labels.append(label)
        test_rotate_info.append(angle)

# convert to TensorDatasets and DataLoaders
base_image_train = torch.stack(base_image_train)
rotate_image_train = torch.stack(rotate_image_train)
y_train = torch.tensor(y_train)

test_images = torch.stack(test_images)
test_labels = torch.tensor(test_labels)
test_rotate_info = torch.tensor(test_rotate_info)

base_train_loader = DataLoader(TensorDataset(base_image_train, y_train), batch_size=32,
                               shuffle=True,
                               pin_memory=True,
                               num_workers=3)
rotated_train_loader = DataLoader(TensorDataset(rotate_image_train, y_train), batch_size=32,
                               shuffle=True,
                               pin_memory=True,
                               num_workers=3)
test_loader = DataLoader(TensorDataset(test_images, test_labels, test_rotate_info), batch_size=32,
                         shuffle=False,
                         pin_memory=True,
                         num_workers=3)


In [None]:
class StandardCNN(nn.Module):
    def __init__(self):
        super(StandardCNN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(128 * 75 * 75, 128)  # Adjust if different size or pooling
        self.fc2 = nn.Linear(128, 4)  # Output layer for multiclass classification

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(F.relu(self.conv3(x)), 2)  # Pooling to reduce spatial dimensions
        x = x.view(x.size(0), -1)  # Flatten the output
        x = F.relu(self.fc1(x))
        x = self.fc2(x)  # No activation, raw logits for CrossEntropyLoss
        return x


In [None]:
class RIC_CNN(nn.Module): # TODO
    def __init__(self):
        super(RIC_CNN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(128 * 75 * 75, 128)  # Adjust if different size or pooling
        self.fc2 = nn.Linear(128, 4)  # Output layer for multiclass classification

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(F.relu(self.conv3(x)), 2)  # Pooling to reduce spatial dimensions
        x = x.view(x.size(0), -1)  # Flatten the output
        x = F.relu(self.fc1(x))
        x = self.fc2(x)  # No activation, raw logits for CrossEntropyLoss
        return x


In [None]:
def train_multiclass_model(model, optimizer, criterion, training_loader, evaluate_loader, num_epochs):
    model.train()  # Set the model to training mode
    
    for epoch in range(num_epochs):
        for images, labels in tqdm(training_loader):
            images, labels = images.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
        
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}')
        evaluate_multiclass_model(model, evaluate_loader)

def evaluate_multiclass_model(model, evaluate_loader):
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in evaluate_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)  # Get the index of the max log-probability
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    accuracy = 100 * correct / total
    print(f'Test Accuracy: {accuracy}%\n')


In [None]:
print("Training Standard, No Augment")
no_augment_model = StandardCNN().to(device)
no_augment_criterion = nn.CrossEntropyLoss()  # Suitable for multiclass
no_augment_optimizer = torch.optim.Adam(no_augment_model.parameters(), lr=0.001)
train_multiclass_model(no_augment_model, no_augment_optimizer, no_augment_criterion, 10)

In [None]:
print("Training Standard, No Augment")
augment_model = StandardCNN().to(device)
augment_criterion = nn.CrossEntropyLoss()  # Suitable for multiclass
augment_optimizer = torch.optim.Adam(augment_model.parameters(), lr=0.001)
train_multiclass_model(augment_model, augment_optimizer, augment_criterion, 10)

In [None]:
print("Training RIC-CNN, No Augment")
riccnn_model = RIC_CNN().to(device)
riccnn_criterion = nn.CrossEntropyLoss()  # Suitable for multiclass
riccnn_optimizer = torch.optim.Adam(riccnn_model.parameters(), lr=0.001)
train_multiclass_model(riccnn_model, riccnn_optimizer, riccnn_criterion, 10)