In [3]:
from PIL import Image, ImageFilter
import tensorflow as tf
from tensorflow.keras import datasets, layers, models, optimizers
import matplotlib.pyplot as plt
from pathlib import Path
import random
import os
import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import LabelEncoder
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import torch.nn as nn
import torch.optim as optim

In [4]:
base_path = "data/100"

# Fixed seed so the per-class shuffles (and the train/test split that is later
# sliced from them) are reproducible after Restart Kernel -> Run All.
SHUFFLE_SEED = 42
shuffle_rng = random.Random(SHUFFLE_SEED)


def list_class_files(base_dir, dir_names, rng=None):
    """Collect and shuffle the file paths found in several sub-directories.

    Args:
        base_dir: Directory that contains the per-cell-type sub-directories.
        dir_names: Names of the sub-directories to scan. Repeated names are
            scanned only once so no file is listed twice.
        rng: Optional ``random.Random`` used for shuffling; the module-level
            ``random`` generator is used when omitted.

    Returns:
        A shuffled list of paths to the regular files found. Sub-directories
        that do not exist are skipped with a printed warning.
    """
    paths = []
    # dict.fromkeys drops repeated directory names while keeping their order.
    for dir_name in dict.fromkeys(dir_names):
        dir_path = os.path.join(base_dir, dir_name)
        if not os.path.isdir(dir_path):
            print(f"Warning: skipping missing directory {dir_path}")
            continue
        # os.listdir order is arbitrary; sort so the seeded shuffle starts
        # from the same sequence on every run.
        for file_name in sorted(os.listdir(dir_path)):
            file_path = os.path.join(dir_path, file_name)
            # Nested folders (e.g. checkpoint directories) are not images.
            if os.path.isfile(file_path):
                paths.append(file_path)
    if rng is None:
        random.shuffle(paths)
    else:
        rng.shuffle(paths)
    return paths


# Get tumour file paths and shuffle
tumour_dirs = [
    "Invasive_Tumor",
    "Prolif_Invasive_Tumor",
    "T_Cell_and_Tumor_Hybrid"
]
tumour_files = list_class_files(base_path, tumour_dirs, shuffle_rng)

# Get immune file paths and shuffle
immune_dirs = [
    "CD4+_T_Cells",
    "CD8+_T_Cells",
    "B_Cells",
    "Mast_Cells",
    "Macrophages_1",
    "Macrophages_2",
    "LAMP3+_DCs",
    "IRF7+_DCs"
]
immune_files = list_class_files(base_path, immune_dirs, shuffle_rng)

# Get stromal file paths and shuffle
stromal_dirs = [
    "Stromal",
    "Stromal_and_T_Cell_Hybrid",
    "Perivascular-Like"
]
stromal_files = list_class_files(base_path, stromal_dirs, shuffle_rng)

# Get other file paths and shuffle
other_dirs = [
    "Endothelial",
    "Myoepi_ACTA2+",
    "Myoepi_KRT15+",
    "DCIS_1",
    "DCIS_2",
    "Unlabeled"
]
# Scan other_dirs here: scanning stromal_dirs again would fill the "Other"
# class with stromal images.
other_files = list_class_files(base_path, other_dirs, shuffle_rng)



In [5]:
def load_resize(img_path, size=(224,224)):
    """Load an image file as an RGB array resized to ``size``.

    Args:
        img_path: Path of the image file to open.
        size: Target size passed to ``Image.resize`` (Pillow interprets it as
            ``(width, height)``).

    Returns:
        A ``numpy.ndarray`` built from the resized RGB image.
    """
    # Image.open is lazy and keeps the file open; the context manager releases
    # the handle as soon as the pixel data has been converted, which matters
    # when thousands of files are loaded in one comprehension.
    with Image.open(img_path) as img:
        resized = img.convert('RGB').resize(size)
    return np.array(resized)

In [None]:
tumour_imgs = [load_resize(f) for f in tumour_files]
print("tumour loaded")

immune_imgs = [load_resize(f) for f in immune_files]
print("immune loaded")

stromal_imgs = [load_resize(f) for f in stromal_files]
print("stromal loaded")

other_imgs = [load_resize(f) for f in other_files]
print("other loaded")

# Use a fixed, balanced number of images per class: the first *_train_ind
# images of each (already shuffled) class for training and the next
# *_test_ind images for testing.
tumour_train_ind = 2000
tumour_test_ind = 500

immune_train_ind = 2000
immune_test_ind = 500

stromal_train_ind = 2000
stromal_test_ind = 500

other_train_ind = 2000
other_test_ind = 500

# (label, images, number of training images, number of test images);
# the order fixes the class order inside the stacked arrays.
class_splits = [
    ('Immune', immune_imgs, immune_train_ind, immune_test_ind),
    ('Tumour', tumour_imgs, tumour_train_ind, tumour_test_ind),
    ('Stromal', stromal_imgs, stromal_train_ind, stromal_test_ind),
    ('Other', other_imgs, other_train_ind, other_test_ind),
]

imgs_train, imgs_test = [], []
y_train, y_test = [], []

for class_label, class_imgs, n_train, n_test in class_splits:
    train_part = class_imgs[:n_train]
    # Bound the test slice: an open-ended slice would take every remaining
    # image while the labels assumed exactly n_test of them.
    test_part = class_imgs[n_train:n_train + n_test]

    imgs_train += train_part
    imgs_test += test_part

    # Build labels from the actual slice lengths so images and labels stay
    # aligned even when a class has fewer images than requested.
    y_train += [class_label] * len(train_part)
    y_test += [class_label] * len(test_part)

Xmat_train = np.stack(imgs_train, axis=0)
Xmat_test = np.stack(imgs_test, axis=0)

# Every image must have exactly one label.
assert Xmat_train.shape[0] == len(y_train)
assert Xmat_test.shape[0] == len(y_test)

In [None]:

# Map the string class names to integer ids. The encoder is fitted on the
# training labels only and then reused unchanged for the test labels.
le = LabelEncoder()
le.fit(y_train)
y_train_enc = le.transform(y_train)
y_test_enc = le.transform(y_test)

In [None]:
class NumpyImageDataset(Dataset):
    """Dataset that serves (image, label) pairs from in-memory arrays.

    Args:
        images: Indexable collection of image arrays, one per sample.
        labels: Indexable collection of labels, aligned with ``images``.
        transform: Optional callable applied to the PIL image of each sample.
    """

    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of samples (the number of images)."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is converted to a uint8 PIL image and, when a transform was
        given, passed through it.
        """
        image = np.asarray(self.images[idx])

        if image.dtype != np.uint8:
            if np.issubdtype(image.dtype, np.floating):
                # Float images are assumed to be scaled to [0, 1].
                image = image * 255
            # Clip so out-of-range values saturate instead of wrapping.
            image = np.clip(image, 0, 255).astype('uint8')
        # uint8 input is already in 0..255: multiplying it by 255 would wrap
        # around modulo 256 and corrupt every pixel, so it is used as-is.

        image = Image.fromarray(image)  # Convert to PIL Image

        if self.transform:
            image = self.transform(image)

        label = self.labels[idx]
        return image, label

In [None]:
# Preprocessing applied to every sample: resize, convert to a tensor, then
# normalise each channel with the mean/std values given below.
transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Wrap the stacked arrays and encoded labels in datasets sharing the transform.
train_dataset = NumpyImageDataset(
    images=Xmat_train, labels=y_train_enc, transform=transform
)
test_dataset = NumpyImageDataset(
    images=Xmat_test, labels=y_test_enc, transform=transform
)

# Only the training batches are shuffled; the test order stays fixed.
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)

In [None]:
# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Start from a ResNet-50 with pretrained weights.
model = models.resnet50(pretrained=True)

# Freeze every existing parameter so only the new head below is trainable.
model.requires_grad_(False)

# Swap the final fully connected layer for a fresh 4-class output layer;
# its parameters are newly created and therefore require gradients.
num_ftrs = model.fc.in_features
model.fc = nn.Linear(in_features=num_ftrs, out_features=4)

model = model.to(device)

In [None]:
# Cross-entropy loss on the model outputs and the integer class labels.
criterion = nn.CrossEntropyLoss()
# Only the parameters of the replaced final layer are handed to the optimizer.
optimizer = optim.Adam(params=model.fc.parameters(), lr=1e-3)

In [None]:
num_epochs = 5

for epoch in range(num_epochs):
    # Put the model in training mode at the start of every epoch.
    model.train()
    running_loss = 0.0  # sum of the per-batch losses for this epoch

    for batch in train_loader:
        # Move the batch to the same device as the model.
        images = batch[0].to(device)
        labels = batch[1].to(device)

        # Clear gradients left over from the previous step, then run the
        # forward pass, the backward pass and the parameter update.
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Report the mean per-batch loss of the epoch.
    print(f"Epoch {epoch+1}, Loss: {running_loss/len(train_loader):.4f}")