<a href="https://colab.research.google.com/github/Ofiregev/Final_Project-FetalCns/blob/main/Fetal_Cns_Final_reg_v2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import os
import pandas as pd
import torch.nn.functional as F
from sklearn.model_selection import KFold
import numpy as np

# Mount Google Drive
drive.mount('/content/drive')

print("Part 1 has done")

Mounted at /content/drive
Part 1 has done


In [2]:
# Part 2: Define CustomDataset Class and Read CSV File
class CustomDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None):
        self.data_frame = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.data_frame)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, self.data_frame.iloc[idx, 0])
        image = Image.open(img_name)
        if self.transform:
            image = self.transform(image)

        label = self.data_frame.iloc[idx, 2]  # Assuming the third column is the label

        return image, label

In [3]:

# Determine the available hardware (CPU or GPU) and set the PyTorch device accordingly
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)


cuda


In [4]:
# Read the list of images from the CSV file
train_csv = "/content/drive/MyDrive/FinalProject/training_set_pixel_size_and_HC.csv"
train_csv_df = pd.read_csv(train_csv)


In [5]:
train_csv_df.head()
len(train_csv_df)

999

In [6]:
# Part 3: Define Transforms and Create Dataset

transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
])
train_dataset = CustomDataset(csv_file="/content/drive/MyDrive/FinalProject/training_set_pixel_size_and_HC.csv",
                              root_dir="/content/drive/MyDrive/FinalProject/Dataset/training_set/training_set/",
                              transform=transform)

In [7]:
# Part 4: Create DataLoader
batch_size = 16
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

In [8]:
class Model(nn.Module):
    def __init__(self, input_channels=1, input_size=64):
        super(Model, self).__init__()
        self.conv1 = nn.Conv2d(input_channels, 64, 3, padding=1)
        self.conv2 = nn.Conv2d(64, 128, 3, padding=1)
        self.conv3 = nn.Conv2d(128, 256, 3, padding=1)
        self.conv4 = nn.Conv2d(256, 512, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        conv_output_size = input_size // 2 // 2 // 2 // 2  # Adjusted for additional pooling layers
        self.fc1 = nn.Linear(512 * conv_output_size * conv_output_size, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 256)
        self.fc4 = nn.Linear(256, 1)
        self.dropout = nn.Dropout(0.3)
        self.batchnorm1 = nn.BatchNorm2d(64)
        self.batchnorm2 = nn.BatchNorm2d(128)
        self.batchnorm3 = nn.BatchNorm2d(256)
        self.batchnorm4 = nn.BatchNorm2d(512)

    def forward(self, x):
        x = self.pool(F.relu(self.batchnorm1(self.conv1(x))))
        x = self.pool(F.relu(self.batchnorm2(self.conv2(x))))
        x = self.pool(F.relu(self.batchnorm3(self.conv3(x))))
        x = self.pool(F.relu(self.batchnorm4(self.conv4(x))))
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = F.relu(self.fc3(x))
        x = self.dropout(x)
        x = self.fc4(x)
        return x

In [9]:
model = Model(input_channels=1, input_size=64)
model.to(device)

Model(
  (conv1): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv3): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv4): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=8192, out_features=1024, bias=True)
  (fc2): Linear(in_features=1024, out_features=512, bias=True)
  (fc3): Linear(in_features=512, out_features=256, bias=True)
  (fc4): Linear(in_features=256, out_features=1, bias=True)
  (dropout): Dropout(p=0.3, inplace=False)
  (batchnorm1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (batchnorm2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (batchnorm3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (batchnorm4)

In [10]:
# Part 6: Define Training Parameters and Optimizer
learning_rate = 0.001
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)


In [14]:
def train(model, train_loader, optimizer, criterion, epochs, device):
    model.to(device)
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0

        for inputs, labels in train_loader:
            inputs = inputs.float().to(device)
            labels = labels.float().unsqueeze(1).to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)  # Calculate the MSE loss
            loss.backward()  # Compute gradients
            optimizer.step()  # Update model parameters

            running_loss += loss.item()

        epoch_loss = running_loss / len(train_loader)  # Average loss per epoch
        print(f"Epoch {epoch + 1}, Loss: {epoch_loss:.4f}")


In [15]:
def evaluate(model, val_loader, criterion, device):
    model.eval()
    val_losses = []

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.float().to(device)
            labels = labels.float().unsqueeze(1).to(device)

            outputs = model(inputs)
            val_loss = criterion(outputs, labels)  # Calculate the MSE loss
            val_losses.append(val_loss.item())

    fold_avg_val_loss = np.mean(val_losses)  # Average validation loss
    return fold_avg_val_loss

In [17]:
from torch.utils.data import SubsetRandomSampler

k = 2
kf = KFold(n_splits=k, shuffle=True, random_state=42)
evaluation_metrics = []
epochs = 2

# Define the function to reinitialize the model
def reset_model(model_class, *args, **kwargs):
    model = model_class(*args, **kwargs)
    return model

for fold_idx, (train_idx, val_idx) in enumerate(kf.split(train_dataset)):
    print(f"Fold {fold_idx + 1}")
    print("=========")

    # Reset the model and optimizer
    model = reset_model(Model, input_channels=1, input_size=64).to(device)
    optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=1e-5)

    # Create data loaders using SubsetRandomSampler
    train_sampler = SubsetRandomSampler(train_idx)
    val_sampler = SubsetRandomSampler(val_idx)

    train_loader_fold = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler, num_workers=2)
    val_loader_fold = DataLoader(train_dataset, batch_size=batch_size, sampler=val_sampler, num_workers=4)

    # Train and evaluate
    train(model, train_loader_fold, optimizer, criterion, epochs, device)
    fold_avg_val_loss = evaluate(model, val_loader_fold, criterion, device)
    evaluation_metrics.append(fold_avg_val_loss)

    print(f"Fold {fold_idx + 1} Validation Loss: {fold_avg_val_loss:.4f}")
    print("=========")

average_metric = np.mean(evaluation_metrics)
print(f"Average Evaluation Metric: {average_metric:.4f}")


Fold 1
Epoch 1, Loss: 7352.7286
Epoch 2, Loss: 3054.5255
Fold 1 Validation Loss: 3130.8958
Fold 2
Epoch 1, Loss: 6802.7679
Epoch 2, Loss: 3051.3976
Fold 2 Validation Loss: 2167.4744
Average Evaluation Metric: 2649.1851


In [None]:
# Part 8: Save the trained model
model_path = "/content/drive/MyDrive/FinalProject/trained_model.pth"
torch.save(model.state_dict(), model_path)
print("Model saved successfully!")

In [None]:
# # Load the trained model
model_path = "/content/drive/MyDrive/FinalProject/trained_model.pth"
model = Model(input_channels=1, input_size=64)
model.load_state_dict(torch.load(model_path))
model.eval()

In [None]:
# Function to predict head circumference from a new image
def predict_head_circumference(image_path, model):
    # Open and preprocess the new image
    transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor()
])
    image = Image.open(image_path)
    image_tensor = transform(image).unsqueeze(0)  # Add batch dimension

    # Make prediction
    with torch.no_grad():
        output = model(image_tensor.float())

    return output.item()  # Return the predicted head circumference as a scalar

In [None]:
# Choose 10 random indices from the dataset
num_images_to_test = 500
random_indices = np.random.choice(len(train_csv_df), size=num_images_to_test, replace=False)

# Initialize a list to store the absolute errors
absolute_errors = []

# Loop through the selected images
for idx in random_indices:
    image_path = os.path.join("/content/drive/MyDrive/FinalProject/Dataset/training_set/training_set/", train_csv_df.iloc[idx, 0])
    ground_truth_circumference = train_csv_df.iloc[idx, 2]  # Assuming the third column contains the head circumference labels

    # Call the function to predict head circumference from the image
    predicted_circumference = predict_head_circumference(image_path, model)

    # Calculate the absolute error and add it to the list
    absolute_error = abs(predicted_circumference - ground_truth_circumference)
    absolute_errors.append(absolute_error)

    # Print the predicted and ground truth head circumferences
    print("Image:", image_path)
    print("Predicted Head Circumference:", predicted_circumference)
    print("Ground Truth Head Circumference:", ground_truth_circumference)
    print("Absolute Error:", absolute_error)
    print("==============================================")

# Calculate the average absolute error
average_absolute_error = sum(absolute_errors) / len(absolute_errors)
print("Average Absolute Error:", average_absolute_error)


In [None]:
# # Path for the new image
# new_image_path = "/content/drive/MyDrive/FinalProject/Dataset/training_set/training_set/001_HC.png"
# # Call the function to predict head circumference from the new image
# predicted_circumference = predict_head_circumference(new_image_path, model)
# print("Predicted Head Circumference:", predicted_circumference)
