In [1]:
# Access to google drive
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [2]:
# Import libraries
import torchvision
import torch
from PIL import Image
from sklearn.metrics import confusion_matrix, accuracy_score
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
import numpy as np
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms, datasets
from sklearn.model_selection import train_test_split
import os
import pandas as pd
import matplotlib.pyplot as plt
import re
import shutil
import random
from datetime import datetime
from imblearn.over_sampling import RandomOverSampler
import seaborn as sns
from sklearn.metrics import classification_report
from collections import OrderedDict

In [3]:
# #Use Hannah's code to filter "Patches" folder to keep only relevant images (filter out non-patch images and patches that are too small)
# #Saves resulting patches into "filtered_patches" folder - ONLY NEED TO RUN ONCE

# #Define filtering function
# def filter(input_folder, output_base_folder):
#     # Make sure input folder exists
#     if not os.path.isdir(input_folder):
#         print(f"Error: {input_folder} is not a valid directory.")
#         return

#     # Create the output base folder if it doesn't exist
#     os.makedirs(output_base_folder, exist_ok=True)

#     # Iterate over each item in the input folder
#     for item in os.listdir(input_folder):
#         item_path = os.path.join(input_folder, item)
#         if "patched_" in item_path or item_path[-3:] == 'tif' or os.path.isdir(item_path):
#             pass
#         else:
#             size = os.stat(item_path).st_size
#             if size > 2000 and bool(re.search("patch\d{1,3}", item_path)):
#                 shutil.copy(item_path, output_base_folder)
#                 # print(f"Copied '{item}' to '{output_base_folder}'")

# # call function - modify folder paths if needed !!!
# if __name__ == "__main__":
#     input_folder = '/content/drive/My Drive/STAT 390/CMIL_SP2025_Patches_Apr16'
#     output_base_folder = '/content/drive/My Drive/STAT 390/filtered_patches_2'

#     filter(input_folder, output_base_folder)

In [4]:
# Add labels to each patch and match by their case, randomly split patients into training, validation, and testing sets

labels = pd.read_csv('/content/drive/My Drive/STAT 390/case_grade_match.csv') # Replace with the path to file

# Define a function to group patches by case number
def group_patches(patch_dir):
    case_patches = {}
    for filename in os.listdir(patch_dir):
        match = re.search(r"case_(\d+)", filename)  # Using regex for case number extraction
        if match:
            case_num = int(match.group(1))
            if case_num not in case_patches:
                case_patches[case_num] = []
            case_patches[case_num].append(os.path.join(patch_dir, filename))
    return case_patches

# Define a custom dataset class for loading PNG images -- in order to be used in Dataloader in the future
class PNGDataset(Dataset):
    def __init__(self, case_patches, labels_df, transform=None):
        self.case_patches = case_patches
        self.labels_df = labels_df
        self.transform = transform
        self.image_paths = []
        self.labels = []

        # Label benign images 0, and high-grade CMIL images 1
        for case_num, patches in case_patches.items():
            label = labels_df.loc[labels_df['Case'] == case_num, 'Class'].values[0]
            label = 0 if label == 1 else 1
            for patch_path in patches:
                self.image_paths.append(patch_path)
                self.labels.append(label)

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        # Get patch images
        image = Image.open(image_path).convert('RGB')
        # Get the label information using the labels dataframe based on case number
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)
        return image, label

# Resize all patch images to 256x256 and convert data into tensors
# potentially consider image padding to preserve entire images
transform = transforms.Compose([
    #rescaling the image (also may lose fine details when resizing)
    transforms.Resize(256),
    #cropping the image from the center (potentially losing information, but achieving uniform image sizing)
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

image_folder_path = '/content/drive/My Drive/STAT 390/filtered_patches' # Replace with the path to filtered patches
patches = group_patches(image_folder_path)

# Get case numbers and their labels
case_nums = list(patches.keys())
dataset = labels.loc[[(int(x)-1) for x in case_nums]]

# Remove those that are equal to 2 (not classifying low-grade CMIL images currently)
noindex = dataset.Class != 2.0
X = dataset[noindex].Case
y = dataset[noindex].Class

# Ensure X and y have the same length before splitting
X = X.reset_index(drop=True)  # Reset index to avoid potential issues
y = y.reset_index(drop=True)  # Reset index to avoid potential issues

# Split the data into train, test, and validation sets
train_X, test_X, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=40)
train_X, val_X, y_train, y_val = train_test_split(train_X, y_train, test_size=0.2, stratify=y_train, random_state=40)

# Create the training, validation, and testing patches
train_patches = {case_num: patches[int(case_num)] for case_num in train_X}
val_patches = {case_num: patches[int(case_num)] for case_num in val_X}
test_patches = {case_num: patches[int(case_num)] for case_num in test_X}

# Create PNGDataset instances for train, validation, and test
train_dataset = PNGDataset(train_patches, labels, transform=transform)
val_dataset = PNGDataset(val_patches, labels, transform=transform)
test_dataset = PNGDataset(test_patches, labels, transform=transform)

In [5]:
labels_arr = np.array(train_dataset.labels)
n_high = np.sum(labels_arr == 1)
n_benign = np.sum(labels_arr == 0)

print(f"Total patches in training: {len(train_dataset)}")
print(f"High-grade patches: {n_high}")
print(f"Benign patches: {n_benign}")
print(f"Imbalance ratio (high:benign) = {n_high}:{n_benign} ≈ {n_high/n_benign:.2f}:1")

Total patches in training: 9070
High-grade patches: 6750
Benign patches: 2320
Imbalance ratio (high:benign) = 6750:2320 ≈ 2.91:1


In [6]:
# Create a mapping from case number to class label (1: benign, 3: high-grade)
case_label_map = dict(zip(labels['Case'], labels['Class']))

# Tally class counts by case
case_counts = {'Benign': 0, 'High-grade': 0}
for case_num in train_patches:
    label = case_label_map.get(case_num)
    if label == 1:
        case_counts['Benign'] += 1
    elif label == 3:
        case_counts['High-grade'] += 1

print(f"Number of training cases:")
print(f"  Benign: {case_counts['Benign']}")
print(f"  High-grade: {case_counts['High-grade']}")

Number of training cases:
  Benign: 17
  High-grade: 23


In [7]:
# Step 1: list cases with at least one patch in filtered_patches
filtered_patches_dir = '/content/drive/My Drive/STAT 390/filtered_patches'
filtered_patch_paths = os.listdir(filtered_patches_dir)
filtered_case_nums = set()

for p in filtered_patch_paths:
    match = re.search(r"case_(\d+)", p)
    if match:
        filtered_case_nums.add(int(match.group(1)))

# Step 2: compare against all cases in the CSV
all_cases = labels[labels['Class'].isin([1, 3])]['Case']
benign_cases = labels[labels['Class'] == 1]['Case']

dropped_benign = [c for c in benign_cases if c not in filtered_case_nums]

print(f"Dropped benign cases: {dropped_benign}")
print(f"Total dropped: {len(dropped_benign)}")

Dropped benign cases: []
Total dropped: 0


In [9]:
labels

Unnamed: 0,Case,Class
0,1,4.0
1,2,1.0
2,3,4.0
3,4,1.0
4,5,3.0
...,...,...
101,102,2.0
102,103,4.0
103,104,2.0
104,105,4.0


In [8]:
# Drop any rows where Class is NaN (just to be safe)
labels_clean = labels.dropna(subset=['Class'])

# Count how many cases are Class 1 and Class 3
class_counts = labels_clean['Class'].value_counts()

# Display the counts specifically for Class 1 and Class 3
num_class_1 = int(class_counts.get(1.0, 0))
num_class_3 = int(class_counts.get(3.0, 0))

print(f"Number of Class 1 cases (Benign): {num_class_1}")
print(f"Number of Class 3 cases (High-grade): {num_class_3}")


Number of Class 1 cases (Benign): 28
Number of Class 3 cases (High-grade): 37
