REMBRANDT Preprocessing

In [None]:
import os
import random
import torch
import pydicom
from PIL import Image
from torchvision import transforms

# Preprocessing pipeline applied to every slice: resize to 224x224, convert the
# PIL image to a float tensor, then normalize each of the three channels with
# the listed mean / standard deviation (presumably the ImageNet statistics that
# match the pretrained ResNet weights used later -- confirm).
TransformImagesREM = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)
def Collect_Dicoms(path, max_files=None, random_shuffle=True):
    """Gather the paths of all DICOM files found anywhere under ``path``.

    Args:
        path: Root directory that is walked recursively.
        max_files: Upper bound on the number of paths returned. ``None``
            means no limit; ``0`` yields an empty list.
        random_shuffle: When true, shuffle the paths with the module-level
            ``random`` state before the limit is applied.

    Returns:
        A list of file paths whose extension is ``.dcm`` (matched
        case-insensitively, so ``.DCM`` files are included).
    """
    # Sort first so the starting order does not depend on the order in which
    # the filesystem lists directory entries; with a seeded ``random`` the
    # shuffled selection is then reproducible.
    DCMFiles = sorted(
        os.path.join(root, file)
        for root, _, files in os.walk(path)
        for file in files
        if file.lower().endswith('.dcm')
    )
    if random_shuffle:
        random.shuffle(DCMFiles)
    # Compare against None rather than truthiness: a limit of 0 must return
    # nothing instead of silently meaning "no limit".
    if max_files is None:
        return DCMFiles
    return DCMFiles[:max_files]

def Convert_Dicoms(filepath):
    """Read one DICOM file and return its image tensor and patient ID.

    The pixel data is wrapped in a PIL image, converted to RGB and passed
    through ``TransformImagesREM``. The ID is the file's ``PatientID``
    rendered as a string with surrounding whitespace removed.

    Args:
        filepath: Path of the DICOM file to read.

    Returns:
        A ``(tensor, patient_id)`` tuple.
    """
    Scan = pydicom.dcmread(filepath)
    # NOTE(review): the pixel array is handed to PIL without any intensity
    # rescaling; if the scans store more than 8 bits per pixel the RGB
    # conversion may clip values -- confirm against a sample file. Any change
    # here must be mirrored wherever the trained model is used for inference.
    RGBImage = Image.fromarray(Scan.pixel_array).convert("RGB")
    return TransformImagesREM(RGBImage), str(Scan.PatientID).strip()

def Process_Batches(path, output_path, batch_size=1000, max_batches=10, random_shuffle=True):
    """Convert DICOM files under ``path`` to tensors and save them in batches.

    For every batch that yields at least one tensor, the stacked image tensors
    are written to ``batch_<n>.pt`` and the matching patient IDs to
    ``batch_<n>_ids.pt`` inside ``output_path``. Files that fail to convert
    are skipped and appended to ``error_log.txt`` in the current working
    directory. A summary line is printed at the end.

    Args:
        path: Root directory searched recursively for DICOM files.
        output_path: Directory receiving the batch files (created if missing).
        batch_size: Number of files per batch; must be positive.
        max_batches: Maximum number of batches to process; a falsy value
            removes the limit.
        random_shuffle: Forwarded to ``Collect_Dicoms``.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    os.makedirs(output_path, exist_ok=True)
    BatchLimit = batch_size * max_batches if max_batches else None
    DicomPaths = Collect_Dicoms(path, max_files=BatchLimit, random_shuffle=random_shuffle)
    # SavedBatches is tracked separately from the loop index so the summary is
    # correct (and defined) when no files were found or a batch saved nothing.
    Total, TotalErrors, SavedBatches = 0, 0, 0
    for BatchIndex, i in enumerate(range(0, len(DicomPaths), batch_size), start=1):
        BatchPaths = DicomPaths[i:i+batch_size]
        FinalTensors = []
        FinalIDs = []
        print(f"Processing batch {BatchIndex} ({len(BatchPaths)} files)")
        # A distinct loop name avoids overwriting the ``path`` parameter.
        for DicomPath in BatchPaths:
            try:
                TensorImage, PathID = Convert_Dicoms(DicomPath)
                FinalTensors.append(TensorImage)
                FinalIDs.append(PathID)
            except Exception as e:
                # Best effort: record the failure and continue with the next file.
                TotalErrors += 1
                with open("error_log.txt", "a", encoding="utf-8") as w:
                    w.write(f"{DicomPath} - {str(e)}\n")
        # Tensors and IDs are saved as parallel sequences in two files.
        if FinalTensors:
            torch.save(torch.stack(FinalTensors), os.path.join(output_path, f"batch_{BatchIndex}.pt"))
            torch.save(FinalIDs, os.path.join(output_path, f"batch_{BatchIndex}_ids.pt"))
            Total += len(FinalTensors)
            SavedBatches += 1
    print(f"Done. Saved {Total} tensors in {SavedBatches} batches. Total Errors: {TotalErrors}")
    
# Dataset locations: the raw REMBRANDT DICOM tree and the folder that receives
# the saved tensor batches.
REMDirectory = r"C:\Users\hgood\OneDrive\REMBRANDT"
ProcessedTensorDir = r"C:\Users\hgood\OneDrive\Processed_REMBRANDT"
# Run preprocessing with 1000 files per batch; the other options keep their defaults.
Process_Batches(path=REMDirectory, output_path=ProcessedTensorDir, batch_size=1000)

Processing batch 1 (1000 files)
Processing batch 2 (1000 files)
Processing batch 3 (1000 files)
Processing batch 4 (1000 files)
Processing batch 5 (1000 files)
Processing batch 6 (1000 files)
Processing batch 7 (1000 files)
Processing batch 8 (1000 files)
Processing batch 9 (1000 files)
Processing batch 10 (1000 files)
Done. Saved 10000 tensors in 10 batches. Total Errors: 0


Create Custom Data Class

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models
import pandas as pd
import os

class REMDataset(Dataset):
    """Preprocessed image tensors paired with grade labels from a CSV file.

    The constructor reads every ``<name>.pt`` tensor file in
    ``batch_directory`` together with its companion ``<name>_ids.pt`` ID list
    and keeps, in memory, each tensor whose ID appears in the label file.

    Args:
        batch_directory: Folder containing the tensor / ID file pairs.
        label_file: CSV with a ``Sample`` column (the ID) and a ``Grade``
            column; rows without a grade are ignored.
        transform: Stored on the instance but never applied by this class;
            the tensors are returned exactly as they were loaded.
    """

    def __init__(self, batch_directory, label_file, transform=None):
        self.transform = transform
        self.all_data = []
        # Drop rows with a missing grade, then map each stripped sample ID to
        # its integer grade (a later duplicate ID overrides an earlier one).
        LabelData = pd.read_csv(label_file).dropna(subset=['Grade'])
        Samples = LabelData['Sample'].astype(str).str.strip()
        Grades = LabelData['Grade'].astype(int).tolist()
        self.label_map = dict(zip(Samples, Grades))
        # NOTE(review): labels are used as class indices downstream, so the
        # grades are presumably already encoded as 0..num_classes-1 -- confirm.

        IDSuffix = "_ids.pt"
        # Sorted so the sample order does not depend on directory listing order.
        for file in sorted(os.listdir(batch_directory)):
            if not file.endswith('.pt') or file.endswith(IDSuffix):
                continue
            TensorPath = os.path.join(batch_directory, file)
            # Derive the companion path from the file stem only; a plain string
            # replace would also rewrite ".pt" occurring in a directory name.
            IDPath = os.path.splitext(TensorPath)[0] + IDSuffix
            if not os.path.exists(IDPath):
                continue
            # torch.load unpickles file contents; only load batch files from a
            # trusted source.
            FinalImages = torch.load(TensorPath)
            IDFinal = torch.load(IDPath)
            # Keep an image only when its ID has a label.
            for tensor, SampleID in zip(FinalImages, IDFinal):
                IDString = str(SampleID).strip()
                if IDString in self.label_map:
                    self.all_data.append((tensor, self.label_map[IDString]))

    def __len__(self):
        """Return the number of labelled images held in memory."""
        return len(self.all_data)

    def __getitem__(self, index):
        """Return the ``(image_tensor, label)`` pair stored at ``index``."""
        imgREM, Label = self.all_data[index]
        return imgREM, Label

Build ResNet Model

In [None]:
import torchvision.models as models
import torch.nn as nn
import torchvision.models.resnet as resnet

def build_resnet(num_classes=3):
    """Create a ResNet-18 with the default pretrained weights and a new head.

    Args:
        num_classes: Output size of the replacement fully connected layer.

    Returns:
        The torchvision ResNet-18 whose ``fc`` layer maps to ``num_classes``
        outputs.
    """
    Network = models.resnet18(weights=resnet.ResNet18_Weights.DEFAULT)
    # Swap the classifier head, keeping the width of its input features.
    InFeatures = Network.fc.in_features
    Network.fc = nn.Linear(InFeatures, num_classes)
    return Network

# Instantiate the three-class ResNet-18. The training cell assigns CNNModel
# again after moving a fresh model to the selected device.
CNNModel = build_resnet(num_classes=3)

Train Model

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader, random_split
from sklearn.utils.class_weight import compute_class_weight

# Hyperparameters and file locations.
ImageDir = r"C:\Users\hgood\OneDrive\Processed_REMBRANDT"
CSVAcess = r"C:\Users\hgood\Downloads\clinical_cleaned_v2.csv"
BatchSize = 32
EpochCount = 10
LearningRate = 0.001
Classes = 3
BestModelPathCNN = 'best_model.pth'
# Fixed seed for the train/validation/test split. Without it every run (and
# every resumed session) draws a different split, so samples held out in one
# session end up in the training set of the next.
SplitSeed = 42

# Build the dataset and the balanced class weights for the loss.
# REMDataset stores `transform` but does not apply it to the loaded tensors.
datasetREM = REMDataset(ImageDir, CSVAcess, transform=TransformImagesREM)
REMlabels = [int(label) for _, label in datasetREM]
# NOTE(review): the weight vector has one entry per distinct label present;
# it only lines up with the model outputs if the labels are exactly
# 0..Classes-1 -- confirm against the label file.
Weights = compute_class_weight(class_weight='balanced', classes=np.unique(REMlabels), y=REMlabels)
TensorWeights = torch.tensor(Weights, dtype=torch.float)

# Split the dataset 80/10/10 into train/validation/test with a seeded generator.
# NOTE(review): the split is per image, so images sharing a patient ID can land
# in different subsets -- consider splitting by patient.
TrainSplit = int(0.8 * len(datasetREM))
ValSplit = int(0.1 * len(datasetREM))
TestSplit = len(datasetREM) - TrainSplit - ValSplit
TrainData, ValData, TestData = random_split(
    datasetREM,
    [TrainSplit, ValSplit, TestSplit],
    generator=torch.Generator().manual_seed(SplitSeed),
)
# Only the training loader shuffles; evaluation order stays fixed.
TrainingLoader = DataLoader(TrainData, batch_size=BatchSize, shuffle=True)
ValidationLoader = DataLoader(ValData, batch_size=BatchSize, shuffle=False)
TestingLoader = DataLoader(TestData, batch_size=BatchSize, shuffle=False)

# Model, class-weighted cross-entropy loss, Adam optimizer, and a step
# scheduler that multiplies the learning rate by 0.1 every 5 scheduler steps.
TorchDevice = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
CNNModel = build_resnet(num_classes=Classes).to(TorchDevice)
EntropyREM = nn.CrossEntropyLoss(weight=TensorWeights.to(TorchDevice))
OptimizerREM = optim.Adam(CNNModel.parameters(), lr=LearningRate)
SchedulerREM = optim.lr_scheduler.StepLR(OptimizerREM, step_size=5, gamma=0.1)

# Resume from the best saved model if one exists; otherwise start from scratch.
# A checkpoint produced with a different (unseeded) split has already seen
# samples that are now held out, so its metrics are not comparable; delete the
# file to retrain cleanly.
start_epoch = 0
best_val_accuracy = 0.0
if os.path.exists(BestModelPathCNN):
    # map_location lets a checkpoint saved on a GPU be restored on a CPU-only machine.
    checkpoint = torch.load(BestModelPathCNN, map_location=TorchDevice)
    CNNModel.load_state_dict(checkpoint['model_state_dict'])
    OptimizerREM.load_state_dict(checkpoint['optimizer_state_dict'])
    # Older checkpoints carry no scheduler state; restore it only when present.
    if 'scheduler_state_dict' in checkpoint:
        SchedulerREM.load_state_dict(checkpoint['scheduler_state_dict'])
    start_epoch = checkpoint['epoch'] + 1
    best_val_accuracy = checkpoint['val_accuracy']
    print(f"Resuming training at epoch {start_epoch} with Validation Accuracy: {best_val_accuracy:.2f}%")
else:
    print("Starting training from scratch")

# Each session trains EpochCount further epochs beyond the restored one.
TotalEpochs = start_epoch + EpochCount

def evaluate_accuracy_metrics(loader):
    """Evaluate the module-level ``CNNModel`` on ``loader``.

    The model is switched to evaluation mode and left in it; gradients are
    disabled for the duration of the pass. The loss is ``EntropyREM`` averaged
    over batches.

    Args:
        loader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        ``(mean_batch_loss, accuracy_percent)``. When the loader yields no
        samples the result is ``(nan, 0.0)`` instead of a division error.
    """
    CNNModel.eval()
    CorrectPredictions = 0
    TotalPredictions = 0
    TotalLoss = 0.0
    BatchCount = 0
    with torch.no_grad():
        for inputs, targets in loader:
            inputs, targets = inputs.to(TorchDevice), targets.to(TorchDevice)
            outputs = CNNModel(inputs)
            TotalLoss += EntropyREM(outputs, targets).item()
            # The index of the largest output per row is the predicted class.
            _, predicted = torch.max(outputs, 1)
            CorrectPredictions += (predicted == targets).sum().item()
            TotalPredictions += targets.size(0)
            BatchCount += 1
    # An empty split (e.g. a very small dataset) has no meaningful metrics.
    if TotalPredictions == 0:
        return float('nan'), 0.0
    return TotalLoss / BatchCount, 100 * CorrectPredictions / TotalPredictions

# Training loop: one optimisation pass over the training data per epoch,
# followed by evaluation, a scheduler step and checkpointing.
for epoch in range(start_epoch, TotalEpochs):
    CNNModel.train()
    TotalLoss = 0.0
    CorrectPredictions = 0
    TotalPredictions = 0
    # The batch labels get their own name so the module-level REMlabels list
    # is not overwritten by a tensor.
    for inputs, targets in TrainingLoader:
        inputs, targets = inputs.to(TorchDevice), targets.to(TorchDevice)
        OptimizerREM.zero_grad()
        outputs = CNNModel(inputs)
        loss = EntropyREM(outputs, targets)
        loss.backward()
        OptimizerREM.step()
        TotalLoss += loss.item()
        _, predicted = torch.max(outputs, 1)
        CorrectPredictions += (predicted == targets).sum().item()
        TotalPredictions += targets.size(0)

    # Epoch metrics on the training, validation and test sets.
    # NOTE(review): test metrics are computed every epoch; model selection
    # below uses validation accuracy only.
    AvgTrainingLoss = TotalLoss/len(TrainingLoader)
    TrainingAccuracy = 100 * CorrectPredictions/TotalPredictions
    ValidationLoss, ValidationAccuracy = evaluate_accuracy_metrics(ValidationLoader)
    TestingLoss, TestingAccuracy = evaluate_accuracy_metrics(TestingLoader)

    print(f"Epoch [{epoch + 1}/{TotalEpochs}], Train Loss: {AvgTrainingLoss:.4f}, Train Accuracy: {TrainingAccuracy:.2f}%, "
          f"Val Loss: {ValidationLoss:.4f}, Val Accuracy: {ValidationAccuracy:.2f}%, "
          f"Test Loss: {TestingLoss:.4f}, Test Accuracy: {TestingAccuracy:.2f}%")

    SchedulerREM.step()

    # One snapshot serves both files. It includes the scheduler state so the
    # learning-rate schedule can be restored, and the metrics so either file
    # can be used to resume.
    CheckpointState = {
        'model_state_dict': CNNModel.state_dict(),
        'optimizer_state_dict': OptimizerREM.state_dict(),
        'scheduler_state_dict': SchedulerREM.state_dict(),
        'epoch': epoch,
        'val_accuracy': ValidationAccuracy,
        'test_accuracy': TestingAccuracy
    }
    # Always overwrite the latest checkpoint.
    torch.save(CheckpointState, 'model_checkpoint.pth')

    # Additionally keep the snapshot as the best model when validation accuracy improves.
    if ValidationAccuracy > best_val_accuracy:
        best_val_accuracy = ValidationAccuracy
        torch.save(CheckpointState, BestModelPathCNN)
        print(f"Best model saved at epoch {epoch + 1} with Val Accuracy: {ValidationAccuracy:.2f}% and Test Accuracy: {TestingAccuracy:.2f}%")

Resuming training at epoch 24 with Validation Accuracy: 83.20%
Epoch [25/34], Train Loss: 0.4583, Train Accuracy: 83.51%, Val Loss: 0.4650, Val Accuracy: 83.70%, Test Loss: 0.4160, Test Accuracy: 85.40%
Best model saved at epoch 25 with Val Accuracy: 83.70% and Test Accuracy: 85.40%
Epoch [26/34], Train Loss: 0.4639, Train Accuracy: 83.44%, Val Loss: 0.4593, Val Accuracy: 84.10%, Test Loss: 0.4114, Test Accuracy: 86.30%
Best model saved at epoch 26 with Val Accuracy: 84.10% and Test Accuracy: 86.30%
Epoch [27/34], Train Loss: 0.4628, Train Accuracy: 83.51%, Val Loss: 0.4599, Val Accuracy: 83.80%, Test Loss: 0.4171, Test Accuracy: 85.70%
Epoch [28/34], Train Loss: 0.4585, Train Accuracy: 83.89%, Val Loss: 0.4515, Val Accuracy: 84.00%, Test Loss: 0.4128, Test Accuracy: 85.40%
Epoch [29/34], Train Loss: 0.4566, Train Accuracy: 83.91%, Val Loss: 0.4532, Val Accuracy: 83.90%, Test Loss: 0.4121, Test Accuracy: 85.70%
Epoch [30/34], Train Loss: 0.4442, Train Accuracy: 83.94%, Val Loss: 0.4525

Assign Grades to BTP Images

In [None]:
import os
import torch
import pydicom
import pandas as pd
from PIL import Image
from collections import Counter
from torchvision import transforms

# Locations of the Brain-Tumor-Progression scans, the CSV that will receive the
# predictions, and the saved best-model checkpoint.
BTPDataset = r"C:\Brain-Tumor-Progression"
BTPGrades = r"C:\Users\hgood\OneDrive\Documents\Brain-Tumor-ProgressionCNNAssignments.csv"
BestModelPath = "best_model.pth"
# Same resize / to-tensor / normalize steps that were used for the REMBRANDT images.
transformREM = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)
# Rebuild the three-class network, restore the saved weights onto the available
# device, and put the model in evaluation mode for inference.
Device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BuildResnet = build_resnet(num_classes=3)
BuildResnet.load_state_dict(torch.load(BestModelPath, map_location=Device)['model_state_dict'])
BuildResnet.to(Device).eval()

def Folder_Grades(scan_folder):
    """Predict one grade for a scan folder by majority vote over its DICOM files.

    Every ``.dcm`` file (extension matched case-insensitively) under
    ``scan_folder`` is converted to an RGB image, transformed with
    ``transformREM`` and classified by ``BuildResnet``. Files that raise during
    reading or inference are reported with ``print`` and skipped.

    Args:
        scan_folder: Directory walked recursively for DICOM files.

    Returns:
        The most frequent predicted class index, or ``None`` when no file
        could be classified. Ties go to the class encountered first in the
        sorted traversal, so the result does not depend on directory listing
        order.
    """
    GradePrediction = []
    for root, dirs, files in os.walk(scan_folder):
        # Sorting in place fixes the order in which os.walk descends.
        dirs.sort()
        for file in sorted(files):
            if not file.lower().endswith('.dcm'):
                continue
            try:
                # DICOM -> RGB PIL image -> tensor with a leading batch
                # dimension -> index of the largest model output.
                DCMBTP = pydicom.dcmread(os.path.join(root, file))
                ConvertDCMBTP = Image.fromarray(DCMBTP.pixel_array).convert("RGB")
                TensorImageBTP = transformREM(ConvertDCMBTP).unsqueeze(0).to(Device)
                with torch.no_grad():
                    ArgMax = torch.argmax(BuildResnet(TensorImageBTP), dim=1).item()
                GradePrediction.append(ArgMax)
            except Exception as e:
                # Best effort: one unreadable file should not discard the folder.
                print(f"Error processing {file}: {e}")
    if not GradePrediction:
        return None
    return Counter(GradePrediction).most_common(1)[0][0]

# Walk patient -> scan-date -> series folders and record one predicted grade
# per scan date. Listings are sorted so the series chosen for each date does
# not depend on the order in which the filesystem returns entries.
FolderPredictions = []
for patientid in sorted(os.listdir(BTPDataset)):
    Patient = os.path.join(BTPDataset, patientid)
    if not os.path.isdir(Patient):
        continue
    for folderdate in sorted(os.listdir(Patient)):
        Dates = os.path.join(Patient, folderdate)
        if not os.path.isdir(Dates):
            continue
        # Only series folders whose name contains "T1post" or "FLAIR" are considered.
        for checkfolders in sorted(os.listdir(Dates)):
            if not any(tag in checkfolders for tag in ["T1post", "FLAIR"]):
                continue
            Scans = os.path.join(Dates, checkfolders)
            FinalGrades = Folder_Grades(Scans)
            if FinalGrades is not None:
                FolderPredictions.append({
                    "PatientID": patientid,
                    "ScanDateFolder": folderdate,
                    "PredictedGrade": FinalGrades
                })
                # Stop at the first series that yields a prediction, so each
                # scan date contributes at most one row.
                break
# Write the predictions to CSV. Explicit columns keep the header row even when
# no predictions were produced.
pd.DataFrame(
    FolderPredictions,
    columns=["PatientID", "ScanDateFolder", "PredictedGrade"],
).to_csv(BTPGrades, index=False)
print(f"Predictions saved to {BTPGrades}")

Predictions saved to C:\Users\hgood\OneDrive\Documents\Brain-Tumor-ProgressionCNNAssignments.csv
