# PyTorch

In [None]:
## For installing pytorch, use:
!pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

In [1]:
import os
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.model_selection import train_test_split
from PIL import Image
import torch.nn as nn
import torch.optim as optim
from sklearn.utils.class_weight import compute_class_weight
import numpy as np
import torchvision.transforms as transforms
import torchvision.transforms.functional as TF
import random
import tifffile as tiff
import torchvision.models as models


In [2]:
# Define paths
csv_file = "../metadata.csv"
image_folder = "../IMC_images"

# Load the CSV
df = pd.read_csv(csv_file)

# Filter rows with NA in PDL1_score and convert to binary
df = df.dropna(subset=["PDL1_score"])
df["PDL1_score"] = df["PDL1_score"].astype(int)

device = "cuda"

# Train-test split
train_df, val_and_test_df = train_test_split(df, test_size=0.4, random_state=42, stratify=df["PDL1_score"])
test_df, val_df = train_test_split(val_and_test_df, test_size=0.5, random_state=42, stratify=val_and_test_df["PDL1_score"])

class IMCDataset(Dataset):
    def __init__(self, dataframe, image_folder, transform=None):
        self.dataframe = dataframe
        self.image_folder = image_folder
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        row = self.dataframe.iloc[idx]
        image_path = os.path.join(self.image_folder, f"{row['sample_id']}.tiff")
        
        # Load the multi-channel tiff image with tifffile
        image = tiff.imread(image_path)  # This will load all 46 channels

        # Convert to a PyTorch tensor
        image = torch.tensor(image, dtype=torch.float32)

        # Ensure the shape is (channels, height, width)
        if image.shape[0] != 46:
            raise ValueError(f"Expected 46 channels, but got {image.shape[0]} in {image_path}")

        # Apply transformations if defined
        if self.transform:
            image = self.transform(image)

        label = torch.tensor(row["PDL1_score"], dtype=torch.long)
        
        return image, label

In [3]:
# Initialize variables to accumulate sum and sum of squares
nr_images = 0
sum_images = torch.zeros((46, 224, 224))  # for 46 channels, assuming the image size is 224x224
sum_squared_images = torch.zeros((46, 224, 224))  # to accumulate squared pixel values

# Assuming unnormalized_train_dataset is an instance of IMCDataset
unnormalized_train_dataset = IMCDataset(train_df, image_folder)

# Loop through the dataset to accumulate the sum and sum of squared pixel values
for image, _ in unnormalized_train_dataset:
    nr_images += 1
    sum_images += image
    sum_squared_images += image ** 2

# Compute the mean and standard deviation
mean = sum_images / nr_images
std = torch.sqrt(sum_squared_images / nr_images - mean ** 2)

# Define transforms
transform = transforms.Compose([
    transforms.Normalize(mean=mean, std=std)
])

data_augmentation_transform = transforms.Compose([
    transforms.Normalize(mean=mean, std=std),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
])

# Create datasets and dataloaders
train_dataset = IMCDataset(train_df, image_folder, transform=transform)
train_data_augmentation_dataset = IMCDataset(train_df, image_folder, transform=data_augmentation_transform)
val_dataset = IMCDataset(val_df, image_folder, transform=transform)
test_dataset = IMCDataset(test_df, image_folder, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=False)
train_data_augmentation_loader = DataLoader(train_data_augmentation_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [4]:
# Old model:
# # Define the model
# class SimpleCNN(nn.Module):
#     def __init__(self, num_channels=46, num_classes=2):
#         super(SimpleCNN, self).__init__()
#         self.conv1a = nn.Conv2d(num_channels, 32, kernel_size=3, padding=1)
#         self.conv1b = nn.Conv2d(32, 32, kernel_size=3, padding=1)
#         self.bn1 = nn.BatchNorm2d(32)
#         self.conv1c = nn.Conv2d(32, 32, kernel_size=3, padding=1)
#         self.bn2 = nn.BatchNorm2d(32)
#         self.conv1d = nn.Conv2d(32, 32, kernel_size=3, padding=1)
#         self.pool = nn.MaxPool2d(2, 2)
#         self.conv2 = nn.Conv2d(32, 20, kernel_size=3, padding=1)
#         self.fc1 = nn.Linear(20 * 56 * 56, 50)
#         self.fc2 = nn.Linear(50, num_classes)
#         self.relu = nn.ReLU()
#         self.dropout = nn.Dropout(0.5)

#     def forward(self, x):
#         x = self.pool(self.relu(self.conv1a(x)))
#         x = self.bn1(x)
#         x = self.relu(self.conv1b(x))
#         x = self.relu(self.conv1c(x))
#         x = self.bn2(x)
#         x = self.relu(self.conv1d(x))
#         x = self.pool(self.relu(self.conv2(x)))
#         x = x.view(x.size(0), -1)  # Flatten
#         x = self.relu(self.fc1(x))
#         x = self.dropout(x)
#         x = self.fc2(x)
#         return x


In [5]:
class ModifiedResNet(nn.Module):
    def __init__(self, num_classes=2, input_channels=46):
        super(ModifiedResNet, self).__init__()

        # Load the pretrained ResNet18 model
        self.model = models.resnet18(pretrained=True)

        # Modify the first convolution layer to accept 46 input channels
        # The original model has 3 input channels (RGB), so we replace it with a layer that accepts 46 channels
        self.model.conv1 = nn.Conv2d(input_channels, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)

        # Modify the final fully connected layer to output 2 classes
        self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)

    def forward(self, x):
        return self.model(x)

# Create an instance of the modified model
model = ModifiedResNet(num_classes=2, input_channels=46).to(device)

# for param in model.model.parameters():
#     param.requires_grad = False

# # Unfreeze the layers you want to train
# model.model.conv1.requires_grad = True
# model.model.fc.requires_grad = True

# Initialize loss, optimizer
class_weights = compute_class_weight('balanced', classes=np.unique(train_df["PDL1_score"]), y=train_df["PDL1_score"])
class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights)

# Used for knowning when to save the model
best_validation_accuracy = 0.0



In [8]:
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Training loop
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs, max_patience=5):
    global best_validation_accuracy
    # Stop when patience reaches max_patience
    patience = 0
    
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            
            # Zero the parameter gradients
            optimizer.zero_grad()
            
            # Forward + backward + optimize
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            running_loss += loss.item()

        print(f"Epoch {epoch+1}, Loss: {running_loss/len(train_loader)}, accuracy: {100 * correct / total:.2f}%")

        # Validation step
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        val_accuracy = 100 * correct / total
        print(f"Validation Accuracy: {val_accuracy:.2f}%")
        
        if val_accuracy > best_validation_accuracy:
            best_validation_accuracy = val_accuracy
            torch.save(model, 'best_model.pth')
            print("New best accuracy found.")
            patience = 0
        else:
            patience += 1
            if patience > max_patience:
                print("Ran out of patience, stopping.")
                break

# Train the model
train_model(model, train_loader, val_loader, criterion, optimizer, epochs=30)


Epoch 1, Loss: 0.09168258290737867, accuracy: 97.97%
Validation Accuracy: 76.65%
New best accuracy found.
Epoch 2, Loss: 0.03615222326479852, accuracy: 99.15%
Validation Accuracy: 69.54%
Epoch 3, Loss: 0.013712524593574926, accuracy: 100.00%
Validation Accuracy: 78.17%
New best accuracy found.
Epoch 4, Loss: 0.0043061112199211495, accuracy: 100.00%
Validation Accuracy: 77.16%
Epoch 5, Loss: 0.0015587147732730954, accuracy: 100.00%
Validation Accuracy: 76.14%
Epoch 6, Loss: 0.0018720589810982346, accuracy: 100.00%
Validation Accuracy: 76.65%
Epoch 7, Loss: 0.011632919844123535, accuracy: 99.66%
Validation Accuracy: 75.63%
Epoch 8, Loss: 0.006649886607192457, accuracy: 99.83%
Validation Accuracy: 75.63%
Epoch 9, Loss: 0.004007341220858507, accuracy: 99.83%
Validation Accuracy: 74.62%
Ran out of patience, stopping.


In [9]:
# Load the best model
model = torch.load("best_model.pth")

model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Test accuracy: {100 * correct / total:.2f}%")

  model = torch.load("best_model.pth")


Test accuracy: 78.68%


## Trying with autoencoders

In [111]:
224 / 8

28.0

In [7]:

class Autoencoder(nn.Module):
    def __init__(self, num_channels=46):
        super(Autoencoder, self).__init__()
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(num_channels, 64, kernel_size=3, stride=2, padding=1),  # Conv layer with downsampling
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),  # More downsampling
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=3, stride=2, padding=1),  # Even more downsampling
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(128 * 28 * 28, 1024),  # Flatten and reduce to a bottleneck dimension
            nn.ReLU()
        )
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(1024, 128 * 28 * 28),
            nn.ReLU(),
            nn.Unflatten(1, (128, 28, 28)),  # Reshape to image format
            nn.ConvTranspose2d(128, 128, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, num_channels, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        x = self.encoder(x)
        x_reconstructed = self.decoder(x)
        return x_reconstructed

# Instantiate the autoencoder model
autoencoder = Autoencoder(num_channels=46)
autoencoder.to(device)

Autoencoder(
  (encoder): Sequential(
    (0): Conv2d(46, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (3): ReLU()
    (4): Conv2d(128, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (5): ReLU()
    (6): Flatten(start_dim=1, end_dim=-1)
    (7): Linear(in_features=100352, out_features=1024, bias=True)
    (8): ReLU()
  )
  (decoder): Sequential(
    (0): Linear(in_features=1024, out_features=100352, bias=True)
    (1): ReLU()
    (2): Unflatten(dim=1, unflattened_size=(128, 28, 28))
    (3): ConvTranspose2d(128, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
    (4): ReLU()
    (5): ConvTranspose2d(128, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
    (6): ReLU()
    (7): ConvTranspose2d(64, 46, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), output_padding=(1, 1))
    (8): Sigmoid()
  )
)

In [8]:
# Optimizer for the autoencoder
autoencoder_optimizer = optim.Adam(autoencoder.parameters(), lr=1e-4)

# Loss function for reconstruction (Mean Squared Error)
mse_loss = nn.MSELoss()

# Training the autoencoder
def train_autoencoder(model, dataloader, optimizer, criterion, epochs=20):
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for images, _ in dataloader:
            images = images.to(device)
            
            # Zero gradients
            optimizer.zero_grad()

            # Forward pass
            reconstructed = model(images)
            loss = criterion(reconstructed, images)

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Autoencoder Loss: {running_loss/len(dataloader)}")

# Train the autoencoder first
train_autoencoder(autoencoder, train_loader, autoencoder_optimizer, mse_loss, epochs=10)


Epoch 1/10, Autoencoder Loss: 237.12313725398138
Epoch 2/10, Autoencoder Loss: 235.27093857985275
Epoch 3/10, Autoencoder Loss: 234.76539377065805


KeyboardInterrupt: 

In [119]:
class Classifier(nn.Module):
    def __init__(self, bottleneck_size=1024, num_classes=2):
        super(Classifier, self).__init__()
        self.fc1 = nn.Linear(bottleneck_size, 512)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Freeze the autoencoder to use its encoder part for feature extraction
autoencoder.eval()  # Set to evaluation mode

# Create the classifier
classifier = Classifier(bottleneck_size=1024, num_classes=2)
classifier.to(device)

class_weights = compute_class_weight('balanced', classes=np.unique(train_df["PDL1_score"]), y=train_df["PDL1_score"])
class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights)


In [120]:

# Optimizer for the classifier
classifier_optimizer = optim.Adam(classifier.parameters(), lr=1e-4)

# Training loop for the classifier
def train_classifier(classifier, dataloader, val_loader, optimizer, criterion, epochs=10):
    for epoch in range(epochs):
        classifier.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)

            # Extract features from the autoencoder's encoder
            with torch.no_grad():
                features = autoencoder.encoder(images)

            # Classifier forward pass
            outputs = classifier(features)
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        print(f"Epoch {epoch+1}/{epochs}, Classifier Loss: {running_loss/len(dataloader)}, Accuracy: {100 * correct / total:.2f}%")

        
        # Validation step
        classifier.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                 # Extract features from the autoencoder's encoder
                with torch.no_grad():
                    features = autoencoder.encoder(images)

                # Classifier forward pass
                outputs = classifier(features)
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        print(f"Validation Accuracy: {100 * correct / total:.2f}%")

# Train the classifier
train_classifier(classifier, train_loader, val_loader, classifier_optimizer, criterion, epochs=10)


Epoch 1/10, Classifier Loss: 1.2155511746039758, Accuracy: 49.94%
Validation Accuracy: 57.36%
Epoch 2/10, Classifier Loss: 0.7878441031162555, Accuracy: 50.44%
Validation Accuracy: 57.36%
Epoch 3/10, Classifier Loss: 0.7239592671394348, Accuracy: 49.56%
Validation Accuracy: 57.36%
Epoch 4/10, Classifier Loss: 0.7189832283900335, Accuracy: 45.62%
Validation Accuracy: 57.36%
Epoch 5/10, Classifier Loss: 0.7058533384249761, Accuracy: 51.46%
Validation Accuracy: 57.36%
Epoch 6/10, Classifier Loss: 0.7180602917304406, Accuracy: 46.76%
Validation Accuracy: 57.36%
Epoch 7/10, Classifier Loss: 0.7160849158580487, Accuracy: 46.25%
Validation Accuracy: 57.36%
Epoch 8/10, Classifier Loss: 0.7228931876329275, Accuracy: 46.25%
Validation Accuracy: 57.36%
Epoch 9/10, Classifier Loss: 0.7259556834514325, Accuracy: 46.25%
Validation Accuracy: 57.36%
Epoch 10/10, Classifier Loss: 0.7279959550270667, Accuracy: 46.25%
Validation Accuracy: 57.36%
