### Utils

In [29]:
from typing import Any
from torch.utils.data import Dataset
from PIL import Image
import os
import pandas as pd
import yaml
from torchvision import transforms

import torch
from sklearn.model_selection import StratifiedKFold

default_path = "/fhome/vlia/HelicoDataSet"
config_path = "config.yml"

def ensure_dataset_path_yaml() -> int:
	"""
	Ensures that config.yml contains the dataset path.

	If the config file is missing, or exists without a "dataset_path" key, the
	key is written with the module-level default path and 1 is returned. Keys
	already present in the file are kept when it is rewritten.

	:return: 0 if path is valid, 1 if path is not present, 2 if path is invalid
	:raises ValueError: if config.yml cannot be parsed or its top level is not a mapping
	"""
	def write_config(config: dict) -> None:
		# Single place that persists the configuration back to disk
		with open(config_path, "w") as file:
			yaml.safe_dump(config, file)

	# No config file yet: create one pointing at the default location
	if not os.path.exists(config_path):
		write_config({"dataset_path": default_path})
		return 1

	# Load existing config (an empty file parses to None and is treated as {})
	try:
		with open(config_path, "r") as file:
			config = yaml.safe_load(file) or {}
	except yaml.YAMLError as e:
		raise ValueError(f"Error parsing {config_path}: {e}") from e

	# A list or scalar at the top level cannot hold the "dataset_path" key;
	# fail with a clear message instead of a TypeError on item assignment.
	if not isinstance(config, dict):
		raise ValueError(
			f"Error parsing {config_path}: expected a mapping at the top level, got {type(config).__name__}"
		)

	# Config exists but does not name a dataset path: add the default
	if "dataset_path" not in config:
		config["dataset_path"] = default_path
		write_config(config)
		return 1

	# Check if the dataset path exists on the filesystem
	if not os.path.exists(config["dataset_path"]):
		return 2

	return 0

def listdir(path: str, filter: str = None, extension: str = None) -> list:
	"""
	Lists the entries of a directory, optionally narrowed down by name.

	:param path: Path to the directory to list
	:param filter: If given, keep only entries whose name contains this substring
	:param extension: If given, keep only entries whose name ends with this suffix
	:return: List of entry names (not full paths) as strings
	"""
	def keep(name: str) -> bool:
		# An entry is dropped as soon as it fails one of the active criteria
		if filter is not None and filter not in name:
			return False
		if extension is not None and not name.endswith(extension):
			return False
		return True

	return [name for name in os.listdir(path) if keep(name)]

def transform_image(image: Image.Image, size: tuple) -> torch.Tensor:
	"""
	Converts a PIL image into a normalized tensor ready to be fed to a model.

	The image is resized to `size`, converted to a tensor and normalized
	channel-wise.

	:param image: PIL image to transform (callers in this file pass RGB images)
	:param size: Target size passed to transforms.Resize
	:return: The transformed image as a tensor
	"""
	transformations = transforms.Compose([
		transforms.Resize(size),
		transforms.ToTensor(),
		# Per-channel mean/std: the widely used ImageNet statistics
		transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
	])
	return transformations(image)

class HelicoDatasetAnomalyDetection(Dataset):
	"""
	Dataset of cropped patches that belong to patients with a negative diagnosis.

	Each item is a single patch loaded from disk, converted to RGB and passed
	through transform_image at 256x256. No label is returned.
	"""

	def __init__(self) -> None:
		"""
		Resolves the dataset root from config.yml and indexes every .png patch
		found in the cropped directories of negative-diagnosis patients.

		:raises FileNotFoundError: if the configured (or default) dataset path does not exist
		"""
		super().__init__()
		# Initialize paths
		self.dataset_path = self._load_dataset_path()
		self.csv_file_path = os.path.join(self.dataset_path, "PatientDiagnosis.csv")
		self.cropped_path = os.path.join(self.dataset_path, "CrossValidation", "Cropped")

		# Find all the negative diagnosis directories
		paths_negatives = self.get_negative_diagnosis_directories(self.csv_file_path)

		# Folder names on disk are matched to a patient code by dropping their
		# last two characters (presumably a "_<digit>" suffix - TODO confirm).
		# setdefault keeps only the first folder listed for each code.
		first_dir_by_code = {}
		for filename in listdir(self.cropped_path):
			path = os.path.join(self.cropped_path, filename)
			first_dir_by_code.setdefault(path[:-2], path)
		actual_paths = [
			first_dir_by_code[path_negative]
			for path_negative in paths_negatives
			if path_negative in first_dir_by_code
		]

		# Retrieve all the patches from the directories
		self.paths_patches = [
			os.path.join(directory, patch_name)
			for directory in actual_paths
			for patch_name in listdir(directory, extension=".png")
		]

	@staticmethod
	def _load_dataset_path() -> str:
		"""
		Validates config.yml and returns the dataset path stored in it.

		:return: The value of "dataset_path" in config.yml
		:raises FileNotFoundError: if the configured (or default) dataset path does not exist
		"""
		path_error = ensure_dataset_path_yaml()
		if path_error == 1:
			print(f"Dataset path not found in config.yml. Defaulting to {default_path}")
			if not os.path.exists(default_path):
				raise FileNotFoundError(f"Default path {default_path} does not exist. Specify a valid path in config.yml.")

		# Read through a context manager so the file handle is always closed
		with open(config_path, "r") as file:
			current_path = yaml.safe_load(file)["dataset_path"]

		if path_error == 2:
			raise FileNotFoundError(f"Dataset path {current_path} does not exist. Specify a valid path in config.yml.")
		return current_path

	def get_negative_diagnosis_directories(self, csv_path: str) -> list:
		"""
		Given a CSV file path, returns a list of directories for the NEGATIVE Diagnosis.
		Each directory is the cropped path joined with the value of the CODI column,
		e.g. "/fhome/vlia/helicoDataSet/CrossValidation/Cropped/patientCODI".

		:param csv_path: Path to the CSV file
		:return: List of directory paths as strings
		"""
		data = pd.read_csv(csv_path)

		# Filter rows where the DENSITAT is "NEGATIVA"
		negative_diagnosis = data[data["DENSITAT"] == "NEGATIVA"]

		# Create directory paths based on the CODI values
		return [
			os.path.join(self.cropped_path, codi)
			for codi in negative_diagnosis["CODI"]
		]

	def __getitem__(self, index) -> Any:
		"""
		Loads the patch at `index` and returns it transformed to 256x256.
		"""
		# convert() produces an independent image, so the file can be closed right away
		with Image.open(self.paths_patches[index]) as image:
			rgb_image = image.convert("RGB")
		return transform_image(rgb_image, (256, 256))

	def __len__(self) -> int:
		"""
		Returns the number of indexed patches.
		"""
		return len(self.paths_patches)


class HelicoDatasetClassification(Dataset):
	"""
	Dataset of annotated patches with a binary label.

	Each item is a tuple (image, label): the patch converted to RGB and passed
	through transform_image at 256x256, and 0 or 1 derived from the "Presence"
	column of the annotation workbook.
	"""

	def __init__(self) -> None:
		"""
		Resolves the dataset root from config.yml and builds the (path, label) index.

		:raises FileNotFoundError: if the configured (or default) dataset path does not exist
		"""
		super().__init__()
		# Initialize paths
		self.dataset_path = self._load_dataset_path()
		self.annotated_path = os.path.join(self.dataset_path, "CrossValidation", "Annotated")
		self.excel_file_path = os.path.join(self.dataset_path, "HP_WSI-CoordAnnotatedPatches.xlsx")

		self.paths_labels = self.get_paths_and_labels(self.annotated_path, self.excel_file_path)

	@staticmethod
	def _load_dataset_path() -> str:
		"""
		Validates config.yml and returns the dataset path stored in it.

		:return: The value of "dataset_path" in config.yml
		:raises FileNotFoundError: if the configured (or default) dataset path does not exist
		"""
		path_error = ensure_dataset_path_yaml()
		if path_error == 1:
			print(f"Dataset path not found in config.yml. Defaulting to {default_path}")
			if not os.path.exists(default_path):
				raise FileNotFoundError(f"Default path {default_path} does not exist. Specify a valid path in config.yml.")

		# Read through a context manager so the file handle is always closed
		with open(config_path, "r") as file:
			current_path = yaml.safe_load(file)["dataset_path"]

		if path_error == 2:
			raise FileNotFoundError(f"Dataset path {current_path} does not exist. Specify a valid path in config.yml.")
		return current_path

	def get_paths_and_labels(self, annotated_path: str, excel_path: str) -> list:
		"""
		Given the annotated path and an Excel file path with columns "Pat_ID", "Window_ID", and "Presence" (which ranges -1, 0, 1),
		returns a list of tuples (path, label). The label is 0 if the presence is -1, and 1 if the presence is 1.
		Samples with "Presence" 0 are ignored.

		A row contributes one tuple per file in the patient's folder whose name starts
		with the Window_ID left-padded with zeros to five characters. Rows whose
		patient folder is not found on disk contribute nothing.

		:param annotated_path: Directory that contains one folder per annotated patient
		:param excel_path: Path to the Excel workbook with the annotations
		:return: List of (file path, label) tuples
		"""
		# Load the Excel file
		data = pd.read_excel(excel_path)

		# Filter the rows with "Presence" -1 or 1
		data = data[(data["Presence"] == -1) | (data["Presence"] == 1)]

		# Folder names on disk are matched to a patient ID by dropping their
		# last two characters (presumably a "_<digit>" suffix - TODO confirm).
		# setdefault keeps only the first folder listed for each ID.
		first_dir_by_code = {}
		for filename in listdir(annotated_path):
			path = os.path.join(annotated_path, filename)
			first_dir_by_code.setdefault(path[:-2], path)

		# Each patient folder is listed once and reused for all of its rows
		listing_by_dir = {}
		actual_paths = []
		for _, row in data.iterrows():
			patient_dir = first_dir_by_code.get(os.path.join(annotated_path, f"{row['Pat_ID']}"))
			if patient_dir is None:
				continue
			if patient_dir not in listing_by_dir:
				listing_by_dir[patient_dir] = listdir(patient_dir)

			basename = f"{row['Window_ID']}".zfill(5)
			label = 0 if row["Presence"] == -1 else 1
			# Prefix match: several files may share the same window ID prefix
			for listed_basename in listing_by_dir[patient_dir]:
				if listed_basename.startswith(basename):
					actual_paths.append((os.path.join(patient_dir, listed_basename), label))

		return actual_paths

	def __getitem__(self, index) -> Any:
		"""
		Loads the patch at `index` and returns (transformed image, label).
		"""
		path, label = self.paths_labels[index]
		# convert() produces an independent image, so the file can be closed right away
		with Image.open(path) as image:
			rgb_image = image.convert("RGB")
		return transform_image(rgb_image, (256, 256)), label

	def __len__(self) -> int:
		"""
		Returns the number of indexed (path, label) samples.
		"""
		return len(self.paths_labels)

if __name__ == "__main__":
	# Smoke test: build the dataset, show one sample and walk the CV folds.
	# dataset = HelicoDatasetAnomalyDetection()
	dataset = HelicoDatasetClassification()
	print(len(dataset))
	# Load the first sample once instead of once per printed field
	first_image, first_label = dataset[0]
	print(first_image.shape, first_label)
	# Split the dataset into training and testing sets
	train_size = int(0.8 * len(dataset))
	test_size = len(dataset) - train_size
	train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])
	k_folds = 5

	# Extract labels for StratifiedKFold.
	# They are read from the dataset's (path, label) index through the subset's
	# indices; iterating train_dataset would open and transform every image
	# only to throw it away.
	labels = [dataset.paths_labels[i][1] for i in train_dataset.indices]

	stratified_kfold = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=42)

	for fold, (train_idx, val_idx) in enumerate(stratified_kfold.split(train_dataset, labels)):
		print(f"FOLD {fold}")
		print("--------------------------------")

		# Sample elements randomly from a given list of indices, no replacement.
		train_subsampler = torch.utils.data.SubsetRandomSampler(train_idx)
		val_subsampler = torch.utils.data.SubsetRandomSampler(val_idx)
        



2676
torch.Size([3, 256, 256]) 0
FOLD 0
--------------------------------
FOLD 1
--------------------------------
FOLD 2
--------------------------------
FOLD 3
--------------------------------
FOLD 4
--------------------------------


In [11]:
import os

# Annotation workbook whose presence on disk is being verified
path_to_check = "C:\\Users\\Usuario\\Documents\\GitHub\\MED-GIA\\data\\HelicoDataSet\\HP_WSI-CoordAnnotatedPatches.xlsx"
# Reference location the checked path is compared against
gt = "C:\\Users\\Usuario\\Documents\\GitHub\\MED-GIA\\data\\HelicoDataSet\\HP_WSI-CoordAllAnnotatedPatches.xlsx"

# Report whether the checked path is present on this machine
path_found = os.path.exists(path_to_check)
print(f"The path exists: {path_to_check}" if path_found else f"The path does not exist: {path_to_check}")

# Report a mismatch between the checked path and the reference one
paths_match = path_to_check == gt
if not paths_match:
    print(f"The paths are different:\nPath to check: {path_to_check}\nGround truth: {gt}")

The path exists: C:\Users\Usuario\Documents\GitHub\MED-GIA\data\HelicoDataSet\HP_WSI-CoordAnnotatedPatches.xlsx
The paths are different:
Path to check: C:\Users\Usuario\Documents\GitHub\MED-GIA\data\HelicoDataSet\HP_WSI-CoordAnnotatedPatches.xlsx
Ground truth: C:\Users\Usuario\Documents\GitHub\MED-GIA\data\HelicoDataSet\HP_WSI-CoordAllAnnotatedPatches.xlsx


### Model

In [12]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class HelicobacterClassifier(nn.Module):
    """
    Small CNN classifier: three (3x3 convolution -> ReLU -> 2x2 max-pool)
    stages followed by two fully connected layers that output one logit per class.

    :param input_size: Side length in pixels of the square input images (default 256)
    :param num_classes: Number of output logits (default 2)
    :raises ValueError: if input_size is smaller than 8, which would leave no
        spatial positions after the three poolings
    """
    def __init__(self, input_size: int = 256, num_classes: int = 2):
        super().__init__()
        if input_size < 8:
            raise ValueError(f"input_size must be at least 8, got {input_size}")

        # The convolutions keep H and W unchanged (3x3 kernel, stride 1, padding 1);
        # only the pooling applied after each of them in forward() halves them.
        # (B, 3, H, W) -> (B, 32, H, W)
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        # (B, 32, H/2, W/2) -> (B, 64, H/2, W/2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        # (B, 64, H/4, W/4) -> (B, 128, H/4, W/4)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        # Shared pooling layer, halves H and W each time it is applied
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)

        # Three poolings leave input_size // 8 positions per side
        # (32 for the default 256x256 input, i.e. 128 * 32 * 32 features).
        pooled_side = input_size // 8
        self.fc1 = nn.Linear(128 * pooled_side * pooled_side, 512)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        """
        Computes the class logits for a batch of images.

        :param x: Batch of images shaped (B, 3, input_size, input_size)
        :return: Logits shaped (B, num_classes)
        """
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        # Flatten everything but the batch dimension. Unlike view(-1, N), an
        # input of unexpected size makes fc1 raise instead of silently folding
        # the extra features into the batch dimension.
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

### Train Loop

In [None]:
import torch
import torch.nn as nn
import wandb
from torch.utils.data import DataLoader
from model import HelicobacterClassifier
from utils import HelicoDatasetClassification
from sklearn.model_selection import StratifiedKFold


def train(model, loss_function, optimizer, train_loader, val_loader, device, num_epochs=10):
    """
    Run the optimisation loop for a fixed number of epochs, logging to wandb.

    After every epoch the mean training loss per batch is printed and logged.
    When a validation loader is given, the mean validation loss per batch and
    the accuracy (in percent) are printed and logged as well.

    :param model: The model to train; it is moved to `device` first
    :param loss_function: Callable taking (model output, labels) and returning the loss
    :param optimizer: The optimizer that updates the model parameters
    :param train_loader: Iterable of (images, labels) training batches
    :param val_loader: Iterable of (images, labels) validation batches, or None to skip validation
    :param device: Device the model and every batch are moved to
    :param num_epochs: The number of epochs to train for
    """
    model = model.to(device)
    print("Starting training")
    for epoch in range(num_epochs):
        # ---- Training pass ----
        model.train()
        running_loss = 0
        for images, targets in train_loader:  # each batch is ([B, C, H, W], [B])
            images = images.to(device)
            targets = targets.to(device)
            optimizer.zero_grad()
            batch_loss = loss_function(model(images), targets)
            batch_loss.backward()
            optimizer.step()
            running_loss += batch_loss.item()
        avg_loss = running_loss / len(train_loader)
        print(f"Epoch {epoch + 1}, Loss: {avg_loss}")
        wandb.log({"epoch": epoch + 1, "loss": avg_loss})

        # ---- Validation pass (optional) ----
        if val_loader is None:
            continue

        model.eval()
        running_val_loss = 0
        hits = 0
        seen = 0
        with torch.no_grad():
            for images, targets in val_loader:
                images = images.to(device)
                targets = targets.to(device)
                logits = model(images)
                running_val_loss += loss_function(logits, targets).item()
                # Index of the highest score along dim 1 is the predicted class
                predicted = torch.max(logits.data, 1)[1]
                seen += targets.size(0)
                hits += (predicted == targets).sum().item()
        avg_val_loss = running_val_loss / len(val_loader)
        accuracy = 100 * hits / seen
        print(f"Epoch {epoch + 1}, Validation Loss: {avg_val_loss}, Accuracy: {accuracy}%")
        wandb.log({"epoch": epoch + 1, "val_loss": avg_val_loss, "accuracy": accuracy})

if __name__ == "__main__":
    # Authenticate with Weights & Biases.
    # The API key is deliberately NOT hard-coded: wandb.login() takes it from
    # the WANDB_API_KEY environment variable or from a previous `wandb login`.
    # NOTE(review): the key that used to be committed on this line is exposed
    # in the file history and should be revoked.
    wandb.login()

    # Set hyperparameters
    config = {
        "learning_rate": 0.001,
        "epochs": 8,
        "batch_size": 256,
        "optimizer" : "adam",
        "k_folds": 5
    }
    # Pass the hyperparameters to init so they are recorded with the run;
    # assigning a dict to wandb.config would only replace the attribute locally.
    wandb.init(project="MED-GIA", config=config)

    print("num_epochs: ", config["epochs"])
    print("batch_size: ", config["batch_size"])
    print("learning_rate: ", config["learning_rate"])
    print("k_folds: ", config["k_folds"])

    # Load the dataset
    dataset = HelicoDatasetClassification()

    # Split the dataset into training and testing sets
    train_size = int(0.8 * len(dataset))
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

    k_folds = config["k_folds"]

    # Select the device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    stratified_kfold = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=42)

    # Labels for stratification, read from the dataset's (path, label) index
    # through the subset's indices. Iterating train_dataset instead would open
    # and transform every image only to discard it.
    # (assumes utils.HelicoDatasetClassification exposes `paths_labels` as a
    # list of (path, label) tuples - verify against utils.py)
    train_labels = [dataset.paths_labels[i][1] for i in train_dataset.indices]

    for fold, (train_idx, val_idx) in enumerate(stratified_kfold.split(train_dataset, train_labels)):
        print(f"FOLD {fold}")
        print("--------------------------------")

        # Sample elements randomly from a given list of indices, no replacement.
        train_subsampler = torch.utils.data.SubsetRandomSampler(train_idx)
        val_subsampler = torch.utils.data.SubsetRandomSampler(val_idx)

        # Define data loaders for training and validation
        train_loader = DataLoader(train_dataset, batch_size=config["batch_size"], sampler=train_subsampler)
        val_loader = DataLoader(train_dataset, batch_size=config["batch_size"], sampler=val_subsampler)

        # Initialize a fresh model for this fold
        model = HelicobacterClassifier()
        loss_function = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=config["learning_rate"])

        # Train the model
        train(model, loss_function, optimizer, train_loader, val_loader, device, num_epochs=config["epochs"])

        # Save the model for each fold
        torch.save(model.state_dict(), f"HelicobacterClassifier_fold{fold}.pth")
        wandb.save(f"HelicobacterClassifier_fold{fold}.pth")

    # Final training on the entire training dataset
    final_train_loader = DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True)
    final_model = HelicobacterClassifier().to(device)
    final_loss_function = nn.CrossEntropyLoss()
    final_optimizer = torch.optim.Adam(final_model.parameters(), lr=config["learning_rate"])

    train(final_model, final_loss_function, final_optimizer, final_train_loader, None, device, num_epochs=config["epochs"])

    # Save the final model
    torch.save(final_model.state_dict(), "HelicobacterClassifier_final.pth")
    wandb.save("HelicobacterClassifier_final.pth")




num_epochs:  8
batch_size:  256
learning_rate:  0.001
k_folds:  5


AttributeError: 'HelicoDatasetClassification' object has no attribute 'label'