# Import packages


In [None]:
from pathlib import Path
from typing import Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import skimage
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from joblib import Parallel, delayed
from PIL import Image
from skimage import transform
from skimage.color import rgb2hed, rgba2rgb
from skimage.io import imread
from sklearn.decomposition import PCA
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, root_mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.svm import SVR
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torch.utils.tensorboard import SummaryWriter
from torchsummary import summary
from torchvision import datasets
from torchvision.transforms import functional as F

In [None]:
# TensorBoard writer shared by the training and validation routines below;
# everything logged through it is stored under runs/experiment_1.
writer = SummaryWriter("runs/experiment_1")

# Utility functions:


Define some utility functions for working with images.


In [None]:
def read_image(
    image_id: str, image_folder: Path = Path("data/patches_256")
) -> np.ndarray:
    """Reads an image from the dataset

    Args:
        image_id (str): The id of the image to be read; the file
            ``<image_folder>/<image_id>.png`` is loaded.
        image_folder (Path): Folder holding the PNG patches. Defaults to
            ``data/patches_256``.

    Returns:
        np.ndarray: The image as a numpy array. A four-channel image is
            alpha-blended down to three channels. When the file was stored
            as uint8 the blended result is converted back to uint8, so RGB
            and RGBA files come back on the same 0-255 scale.
    """
    image_path = Path(image_folder) / f"{image_id}.png"

    image = imread(image_path)

    # if the image has an alpha channel, remove it
    # (ndim check: a 2-D grayscale image that is 4 pixels wide is not RGBA)
    if image.ndim == 3 and image.shape[-1] == 4:
        was_uint8 = image.dtype == np.uint8

        # rgba2rgb returns floats in [0, 1] regardless of the input dtype
        image = rgba2rgb(image)

        if was_uint8:
            from skimage.util import img_as_ubyte

            # restore the dtype so downstream statistics are not a mix of
            # 0-255 values (RGB files) and 0-1 values (RGBA files)
            image = img_as_ubyte(image)

    return image

In [None]:
def convert_rgb_to_hed(input_rgb_image: np.array) -> np.array:
    """
    Map an RGB image into the HED color space.

    Thin wrapper that delegates to ``rgb2hed``.

    Parameters:
        input_rgb_image (np.array): The RGB image to convert.

    Returns:
        np.array: The result of ``rgb2hed`` for the given image.
    """
    return rgb2hed(input_rgb_image)

In [None]:
def calculate_intensity_avg(input_image: np.array, channel: int) -> float:
    """
    Mean intensity of one channel of a 3-D image array.

    Parameters:
        input_image (np.array): The input image (RGB or HED), indexed as
            ``[row, column, channel]``.
        channel (int): Index of the channel to average.

    Returns:
        float: Mean of all values in the selected channel.
    """
    channel_plane = input_image[:, :, channel]
    return np.mean(channel_plane)

In [None]:
def calculate_intensity_std(input_image: np.array, channel: int) -> float:
    """
    Standard deviation of the intensity of one channel of a 3-D image array.

    Parameters:
        input_image (np.array): The input image (RGB or HED), indexed as
            ``[row, column, channel]``.
        channel (int): Index of the channel to summarise.

    Returns:
        float: Standard deviation of all values in the selected channel.
    """
    channel_plane = input_image[:, :, channel]
    return np.std(channel_plane)

In [None]:
def calculate_avg_h_intensity(image_id: str) -> dict:
    """
    Compute the mean of HED channel 0 (the H channel) for one image.

    Parameters:
        image_id (str): The ID of the image.

    Returns:
        dict: ``{"image_id": ..., "avg_h_intensity": ...}`` for the image.
    """
    hed_image = convert_rgb_to_hed(read_image(image_id))

    record = {"image_id": image_id}
    # channel 0 of the HED image is the H channel
    record["avg_h_intensity"] = calculate_intensity_avg(hed_image, 0)
    return record

In [None]:
def load_data(
    data_url: str = "https://warwick.ac.uk/fac/sci/dcs/teaching/material/cs909/protein_expression_data.csv",
    training_specimens: Tuple[str, ...] = ("A1", "B1", "D1"),
    testing_specimens: Tuple[str, ...] = ("C1",),
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Loads protein expression data from a CSV file and splits it into training and testing datasets.

    The CSV must provide a ``VisSpot`` column, whose third ``-``-separated
    field is used as the specimen id, and an ``id`` column holding strings.
    Rows are indexed by ``"<specimen_id>_<id>"`` and sorted by that index.

    Parameters:
        data_url (str): Path or URL of the CSV file, passed to ``pd.read_csv``.
        training_specimens (tuple): Specimen ids that go into the training set.
        testing_specimens (tuple): Specimen ids that go into the testing set.

    Returns:
        training_data (pandas.DataFrame): Rows whose specimen is in ``training_specimens`` (by default A1, B1, and D1).
        testing_data (pandas.DataFrame): Rows whose specimen is in ``testing_specimens`` (by default C1).
    """
    df = pd.read_csv(data_url)

    # create specimen id field
    df["specimen_id"] = df["VisSpot"].str.split("-").str[2]

    # create image id field (reuses the specimen id instead of re-parsing VisSpot)
    df["image_id"] = df["specimen_id"] + "_" + df["id"]

    df = df.set_index("image_id").sort_index()

    # .copy() hands back independent frames, so later column assignments on
    # either split do not raise SettingWithCopyWarning
    training_data = df.loc[df["specimen_id"].isin(list(training_specimens))].copy()
    testing_data = df.loc[df["specimen_id"].isin(list(testing_specimens))].copy()

    return training_data, testing_data

# Load data


Load the protein expression data, already split into training and testing sets:


In [None]:
training_data, testing_data = load_data()

In [None]:
print("Number of training samples:", len(training_data))
print("Number of testing samples:", len(testing_data))

In [None]:
# Hold out 20% of the training rows as a validation set; the fixed
# random_state makes the split reproducible across runs.
training_data, validation_data = train_test_split(
    training_data, test_size=0.2, random_state=42
)

In [None]:
print("Number of training samples:", len(training_data))
print("Number of validation samples:", len(validation_data))
print("Number of testing samples:", len(testing_data))

In [None]:
def process_image(image_id, output_shape=(64, 64)):
    """
    Load one image and produce its down-sized RGB and HED versions.

    Parameters:
        image_id (str): The ID of the image.
        output_shape (tuple): Target shape handed to ``transform.resize``.
            Defaults to ``(64, 64)``.

    Returns:
        tuple: ``(rgb_image_resized, hed_image)``; the HED image is computed
            from the resized RGB image, so both share the same spatial size.
    """
    rgb_image = read_image(image_id)
    rgb_image_resized = transform.resize(rgb_image, output_shape, anti_aliasing=True)
    hed_image = convert_rgb_to_hed(rgb_image_resized)

    return rgb_image_resized, hed_image


# Results come back in the same order as the index they were built from,
# so position i in every list below matches row i of the corresponding frame.
training_images_list = Parallel(n_jobs=-1, verbose=10)(
    delayed(process_image)(image_id) for image_id in training_data.index
)

testing_images_list = Parallel(n_jobs=-1, verbose=10)(
    delayed(process_image)(image_id) for image_id in testing_data.index
)

training_rgb_images_list = [result[0] for result in training_images_list]
training_hed_images_list = [result[1] for result in training_images_list]

testing_rgb_images_list = [result[0] for result in testing_images_list]
testing_hed_images_list = [result[1] for result in testing_images_list]

In [None]:
training_rgb_images_list[0].shape

In [None]:
testing_hed_images_list[0].shape

# Question No. 1: (Data Analysis)


For the following questions, we will use only the `training_data`


## Counting Examples:


In [None]:
# Number of training rows per specimen, largest group first.
(
    training_data.groupby("specimen_id", as_index=False)["id"]
    .count()
    .rename(columns={"id": "n_sample"})
    .sort_values("n_sample", ascending=False)
)

## Protein Expression Histograms


In [None]:
# One histogram facet per specimen; `ax` is the FacetGrid returned by displot.
ax = sns.displot(data=training_data, x="NESTIN", col="specimen_id", hue="specimen_id")

# {col_name} is filled in per facet with that facet's specimen id, so each
# panel stays identifiable (a constant title would erase that information).
ax.set_titles("NESTIN expression in specimen {col_name}")
ax.set_xlabels("Protein expression")
ax.set_ylabels("Frequency")

In [None]:
# One histogram facet per specimen; `ax` is the FacetGrid returned by displot.
ax = sns.displot(data=training_data, x="cMYC", col="specimen_id", hue="specimen_id")

# {col_name} is filled in per facet with that facet's specimen id, so each
# panel stays identifiable (a constant title would erase that information).
ax.set_titles("cMYC expression in specimen {col_name}")
ax.set_xlabels("Protein expression")
ax.set_ylabels("Frequency")

In [None]:
# One histogram facet per specimen; `ax` is the FacetGrid returned by displot.
ax = sns.displot(data=training_data, x="MET", col="specimen_id", hue="specimen_id")

# {col_name} is filled in per facet with that facet's specimen id, so each
# panel stays identifiable (a constant title would erase that information).
ax.set_titles("MET expression in specimen {col_name}")
ax.set_xlabels("Protein expression")
ax.set_ylabels("Frequency")

From the above plots, we notice the following:

1. Different proteins have different ranges. `NESTIN` has values in the range `[-7, 1]`, `cMYC` has values in the range `[-10.5, 3.2]`, and `MET` has values in the range `[-10.7, 1.58]`.

2. For all three proteins, most expression values across the different specimens are centered around 0, with fewer values spread toward the extremes.


## Image Pre-processing


In [None]:
# Show ten randomly chosen training images next to their H channel.
np.random.seed(42)

# replace=False: without it np.random.choice samples with replacement and
# the same image can be displayed more than once.
random_image_ids = np.random.choice(training_data.index, size=10, replace=False)

for image_id in random_image_ids:
    rgb_image = read_image(image_id)

    hed_image = convert_rgb_to_hed(rgb_image)

    fig, ax = plt.subplots(1, 2, figsize=(10, 5))

    ax[0].imshow(rgb_image)
    ax[0].set_title("RGB Image")
    ax[0].axis("off")

    # channel 0 of the HED image is the H channel
    ax[1].imshow(hed_image[:, :, 0], cmap="gray")
    ax[1].set_title("H Channel")
    ax[1].axis("off")

    plt.show()

## H-channel Analysis


In [None]:
avg_h_intensity_list = Parallel(n_jobs=-1, verbose=10)(
    delayed(calculate_avg_h_intensity)(image_id) for image_id in training_data.index
)

In [None]:
avg_h_intensity_df = pd.DataFrame(avg_h_intensity_list).set_index("image_id")

In [None]:
avg_h_intensity_df = avg_h_intensity_df.join(training_data[["NESTIN", "specimen_id"]])

In [None]:
avg_h_intensity_df.head()

In [None]:
ax = sns.scatterplot(
    data=avg_h_intensity_df,
    x="avg_h_intensity",
    y="NESTIN",
    hue="specimen_id",
    alpha=0.2,
)

ax.set_title("Average H intensity vs NESTIN expression")
ax.set_xlabel("Average H intensity")
ax.set_ylabel("NESTIN expression")

In [None]:
correlation = avg_h_intensity_df["avg_h_intensity"].corr(avg_h_intensity_df["NESTIN"])

In [None]:
print(
    f"The correlation between average H intensity and NESTIN expression is {correlation:.2f}"
)

From the scatter plot and the correlation value we can see that there is a positive relation between the average intensity value of the `H` channel and the expression levels of `NESTIN`.

However, this correlation is weak and won't capture the true relation of the target variable.


## Performance Metrics for Prediction


# Question No. 2: (Feature Extraction and Classical Regression)


In [None]:
def calculate_image_channel_stats(image_id: str):
    """
    Summarise an image by the mean and standard deviation of four channels.

    The H channel is taken from the HED conversion of the image (channel 0);
    R, G and B are channels 0, 1 and 2 of the image as returned by
    ``read_image``.

    Args:
        image_id (str): The ID of the image.

    Returns:
        dict: The image ID plus one ``*_intensity_avg`` and one
            ``*_intensity_std`` entry per channel, in this order:
            - "image_id"
            - "h_intensity_avg", "h_intensity_std"
            - "r_intensity_avg", "r_intensity_std"
            - "g_intensity_avg", "g_intensity_std"
            - "b_intensity_avg", "b_intensity_std"
    """
    rgb = read_image(image_id)
    hed = convert_rgb_to_hed(rgb)

    return {
        "image_id": image_id,
        "h_intensity_avg": calculate_intensity_avg(hed, 0),
        "h_intensity_std": calculate_intensity_std(hed, 0),
        "r_intensity_avg": calculate_intensity_avg(rgb, 0),
        "r_intensity_std": calculate_intensity_std(rgb, 0),
        "g_intensity_avg": calculate_intensity_avg(rgb, 1),
        "g_intensity_std": calculate_intensity_std(rgb, 1),
        "b_intensity_avg": calculate_intensity_avg(rgb, 2),
        "b_intensity_std": calculate_intensity_std(rgb, 2),
    }

In [None]:
image_channels_stats_list = Parallel(n_jobs=-1, verbose=10)(
    delayed(calculate_image_channel_stats)(image_id) for image_id in training_data.index
)

In [None]:
image_channels_stats_df = pd.DataFrame(image_channels_stats_list).set_index("image_id")

In [None]:
image_channels_stats_df = image_channels_stats_df.join(
    training_data[["NESTIN", "specimen_id"]]
)

In [None]:
image_channels_stats_df.head()

In [None]:
# rgb_images_list = []
# hed_images_list = []

# for image_id in training_data.index:
#     rgb_image = read_image(image_id)
#     rgb_image_resized = transform.resize(rgb_image, (64, 64), anti_aliasing=True)
#     hed_image = convert_rgb_to_hed(rgb_image_resized)

#     rgb_images_list.append(rgb_image_resized)
#     hed_images_list.append(hed_image)

In [None]:
# test_rgb_images_list = []
# test_hed_images_list = []

# for image_id in testing_data.index:
#     rgb_image = read_image(image_id)
#     rgb_image_resized = transform.resize(rgb_image, (64, 64), anti_aliasing=True)
#     hed_image = convert_rgb_to_hed(rgb_image_resized)

#     test_rgb_images_list.append(rgb_image_resized)
#     test_hed_images_list.append(hed_image)

## PCA

In [None]:
training_hed_images_list[0].shape, len(training_hed_images_list)

In [None]:
X_hed_train = np.stack([hed_image[:, :, 0].flatten() for hed_image in training_hed_images_list], axis=0)

In [None]:
X_hed_test = np.stack([hed_image[:, :, 0].flatten() for hed_image in testing_hed_images_list], axis=0)

In [None]:
print(f"X_train shape: {X_hed_train.shape}")
print(f"X_test shape: {X_hed_test.shape}")

In [None]:
pca = PCA(svd_solver="randomized")

In [None]:
# fit the PCA model on the training data
pca.fit(X_hed_train)

In [None]:
# Smallest number of leading components whose cumulative explained-variance
# ratio reaches 95%.
cumulative_variance = np.cumsum(pca.explained_variance_ratio_)
reaches_threshold = cumulative_variance >= 0.95
# argmax on a boolean array gives the index of the first True; +1 turns that
# zero-based index into a component count
n_components = np.argmax(reaches_threshold) + 1
print(f"Number of components explaining 95% of variance: {n_components}")

In [None]:
X_hed_train_pca = pca.transform(X_hed_train)
X_hed_test_pca = pca.transform(X_hed_test)

In [None]:
X_hed_train_reduced = X_hed_train_pca[:, :n_components]
X_hed_test_reduced = X_hed_test_pca[:, :n_components]

In [None]:
print("X_train_reduced shape:", X_hed_train_reduced.shape)
print("X_test_reduced shape:", X_hed_test_reduced.shape)

## SVR

In [None]:
SVR_model = SVR()

In [None]:
SVR_model.fit(X_hed_train_reduced, training_data["NESTIN"])

In [None]:
y_pred = SVR_model.predict(X_hed_test_reduced)

In [None]:
root_mean_squared_error(testing_data["NESTIN"], y_pred)

In [None]:
r2_score(testing_data["NESTIN"], y_pred)

In [None]:
sns.scatterplot(x=testing_data["NESTIN"], y=y_pred, alpha=0.5)

# Question No. 3 (Using Convolutional Neural Networks)


In [None]:
# Pick the compute backend: CUDA first, then Apple MPS, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

In [None]:
print(f"Using {device} device")

In [None]:
# Hyper parameters
num_epochs = 5  # intended number of passes over the training set
batch_size = 100  # samples per mini-batch produced by the DataLoaders
learning_rate = 0.001  # step size handed to the optimizer

In [None]:
class ConvNet(nn.Module):
    """Two-stage convolutional regressor with a single scalar output.

    The network takes single-channel images. The linear head expects
    ``32 * 17 * 17`` features, which is what the two stages produce for a
    64x64 input: stage 1 keeps 64x64 and pools to 32x32; stage 2
    (3x3 kernel, padding 2) grows that to 34x34 and pools to 17x17.
    """

    def __init__(self):
        super().__init__()

        # stage 1: 1 -> 16 channels, 5x5 kernel, spatial size halved by pooling
        self.conv_layer_1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.ReLU(),
        )

        # stage 2: 16 -> 32 channels, 3x3 kernel, spatial size halved by pooling
        self.conv_layer_2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=2),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.ReLU(),
        )

        # regression head: flattened feature map -> one value per sample
        self.fc = nn.Linear(32 * 17 * 17, 1)

    def forward(self, x):
        """Return a ``(batch, 1)`` tensor of predictions for the batch ``x``."""
        features = self.conv_layer_2(self.conv_layer_1(x))
        flattened = features.view(features.size(0), -1)
        return self.fc(flattened)

In [None]:
model = ConvNet().to(device)

In [None]:
# print the number of parameters in the model
n_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Number of parameters in the model: {n_params}")

In [None]:
# training data
training_image_ids = training_data.index.to_numpy()
training_labels = training_data["NESTIN"].to_numpy()

# validation data
validation_image_ids = validation_data.index.to_numpy()
validation_labels = validation_data["NESTIN"].to_numpy()

# testing data
testing_image_ids = testing_data.index.to_numpy()
testing_labels = testing_data["NESTIN"].to_numpy()

In [None]:
class RGBToHEDTransform:
    """Transform that converts an RGB image to HED and keeps only the H channel."""

    def __call__(self, pic):
        """
        Convert a PIL Image, numpy.ndarray or Tensor from RGB to the H channel of HED.

        Parameters:
            pic (PIL Image, numpy.ndarray or Tensor): Image to be converted.
                PIL images and arrays are taken as channel-last (H, W, C);
                tensors are taken as channel-first (C, H, W).

        Returns:
            Tensor: float32 tensor of shape (1, H, W) holding the H channel.

        Raises:
            TypeError: If ``pic`` is none of the supported types.
        """
        # Bring every supported input to a channel-last numpy array
        if isinstance(pic, Image.Image):
            img_array = np.array(pic)
        elif isinstance(pic, np.ndarray):
            img_array = pic
        elif torch.is_tensor(pic):
            # (C, H, W) -> (H, W, C). permute(1, 2, 0) keeps rows and columns
            # where they are; swapping axes 0 and 2 would yield (W, H, C),
            # i.e. a transposed image.
            img_array = pic.detach().cpu().permute(1, 2, 0).numpy()
        else:
            raise TypeError(
                "pic should be PIL Image, ndarray or Tensor. Got {}".format(type(pic))
            )

        # Convert RGB to HED. The output array from rgb2hed can have negative values;
        # they are passed on as-is (no rescaling into a fixed range is done here).
        hed_img = rgb2hed(img_array)

        # return the H channel; to_tensor adds the leading channel dimension.
        # Cast to float32 because that is the dtype of the model weights.
        return F.to_tensor(hed_img[:, :, 0]).to(torch.float32)

In [None]:
class CustomDataset(Dataset):
    """Dataset that loads an image by id on access and pairs it with its label.

    Parameters:
        image_ids (np.ndarray): Ids understood by ``read_image``.
        labels (np.ndarray): One label per image id, in the same order.
        transform (callable, optional): Applied to the image returned by
            ``read_image``. When omitted the raw image is returned.

    Raises:
        ValueError: If ``image_ids`` and ``labels`` differ in length.
    """

    def __init__(self, image_ids: np.ndarray, labels: np.ndarray, transform=None):
        if len(image_ids) != len(labels):
            raise ValueError(
                f"image_ids and labels must have the same length, "
                f"got {len(image_ids)} and {len(labels)}"
            )

        self.image_ids = image_ids
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_ids)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx``."""
        # images are read from disk on every access, nothing is cached
        image = read_image(self.image_ids[idx])

        # fall back to the untransformed image when no transform was given
        # (previously this path raised UnboundLocalError)
        if self.transform is not None:
            image = self.transform(image)

        return image, self.labels[idx]

In [None]:
# Per-image preprocessing, applied in this order:
#   1. ToTensor           - image array -> channel-first tensor
#   2. Resize((64, 64))   - rescale to 64x64
#   3. RGBToHEDTransform  - convert to HED and keep only the H channel
transformations = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Resize((64, 64)),
        RGBToHEDTransform(),
    ]
)

In [None]:
training_dataset = CustomDataset(training_image_ids, training_labels, transformations)
validation_dataset = CustomDataset(
    validation_image_ids, validation_labels, transformations
)
testing_dataset = CustomDataset(testing_image_ids, testing_labels, transformations)

In [None]:
train_dataloader = DataLoader(training_dataset, batch_size=batch_size, shuffle=True)
validation_dataloader = DataLoader(
    validation_dataset, batch_size=batch_size, shuffle=False
)
test_dataloader = DataLoader(testing_dataset, batch_size=batch_size, shuffle=False)

In [None]:
loss_fn = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

In [None]:
def train(
    dataloader: DataLoader,
    model: nn.Module,
    loss_fn: nn.Module,
    optimizer: torch.optim.Optimizer,
    epoch: int,
    summary_writer=None,
) -> float:
    """
    Run one training epoch and log the mean training loss.

    Parameters:
        dataloader (DataLoader): Yields ``(X, y)`` batches.
        model (nn.Module): Model to optimise; batches are moved to the device
            of its parameters.
        loss_fn (nn.Module): Loss called as ``loss_fn(pred, y)``.
        optimizer (torch.optim.Optimizer): Optimizer over the model parameters.
        epoch (int): Epoch number used as the step of the logged scalar.
        summary_writer: Object with an ``add_scalar(tag, value, step)`` method.
            Defaults to the module-level ``writer``.

    Returns:
        float: Per-sample average of the batch losses for this epoch (exact
            when ``loss_fn`` averages over the batch, as ``nn.MSELoss()`` does).
    """
    if summary_writer is None:
        summary_writer = writer

    first_param = next(model.parameters(), None)
    target_device = (
        first_param.device if first_param is not None else torch.device("cpu")
    )

    size = len(dataloader.dataset)
    model.train()

    weighted_loss_sum = 0.0

    for batch, (X, y) in enumerate(dataloader):
        # Cast before moving: some backends (e.g. MPS) cannot hold float64
        X = X.to(torch.float32).to(target_device)
        y = y.to(torch.float32).to(target_device)

        # Compute prediction error
        pred = model(X)
        # Give the targets the shape of the predictions. A (B,) target
        # against a (B, 1) prediction would broadcast to (B, B) and the
        # loss would compare every prediction with every target.
        loss = loss_fn(pred, y.reshape(pred.shape))

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        # weight by batch size so a short final batch does not skew the mean
        weighted_loss_sum += batch_loss * len(X)

        if batch % 5 == 0:
            current = (batch + 1) * len(X)
            print(f"loss: {batch_loss:>7f}  [{current:>5d}/{size:>5d}]")

    avg_train_loss = weighted_loss_sum / max(size, 1)
    summary_writer.add_scalar("Loss/Train", avg_train_loss, epoch)

    return avg_train_loss

In [None]:
def test(dataloader, model, loss_fn, epoch, summary_writer=None):
    """
    Evaluate the model on a dataloader and log the mean loss.

    Parameters:
        dataloader (DataLoader): Yields ``(X, y)`` batches.
        model (nn.Module): Model to evaluate; batches are moved to the device
            of its parameters.
        loss_fn (nn.Module): Loss called as ``loss_fn(pred, y)``.
        epoch (int): Epoch number used as the step of the logged scalar.
        summary_writer: Object with an ``add_scalar(tag, value, step)`` method.
            Defaults to the module-level ``writer``.

    Returns:
        float: Per-sample average of the batch losses (exact when ``loss_fn``
            averages over the batch, as ``nn.MSELoss()`` does).
    """
    if summary_writer is None:
        summary_writer = writer

    first_param = next(model.parameters(), None)
    target_device = (
        first_param.device if first_param is not None else torch.device("cpu")
    )

    size = len(dataloader.dataset)

    model.eval()

    weighted_loss_sum = 0.0

    with torch.no_grad():
        for X, y in dataloader:
            # Cast before moving: some backends (e.g. MPS) cannot hold float64
            X = X.to(torch.float32).to(target_device)
            y = y.to(torch.float32).to(target_device)

            pred = model(X)

            # Align the target shape with the prediction shape so the loss
            # is not computed over a broadcast (B, B) grid.
            batch_loss = loss_fn(pred, y.reshape(pred.shape)).item()
            weighted_loss_sum += batch_loss * len(X)

    avg_val_loss = weighted_loss_sum / max(size, 1)
    summary_writer.add_scalar("Loss/Validation", avg_val_loss, epoch)

    # report the average, matching the "Avg loss" label
    print(f"Test Error: \n Avg loss: {avg_val_loss:>8f} \n")

    return avg_val_loss

In [None]:
# Use the num_epochs hyper-parameter rather than a second hard-coded value,
# so changing it in the hyper-parameter cell actually changes the training.
epochs = num_epochs
for t in range(epochs):

    print(f"Epoch {t+1}\n-------------------------------")

    # epochs are logged 1-based
    train(train_dataloader, model, loss_fn, optimizer, epoch=t + 1)
    test(validation_dataloader, model, loss_fn, epoch=t + 1)

print("Done!")

# make sure every logged scalar is written to disk
writer.flush()

In [None]:
# Final evaluation on the held-out test specimen: collect the predictions
# batch by batch and compute the mean loss over all test samples.
model.eval()

test_loss = 0

predictions = []

with torch.no_grad():
    for X, y in test_dataloader:
        # Cast before moving: some backends (e.g. MPS) cannot hold float64
        X = X.to(torch.float32).to(device)
        y = y.to(torch.float32).to(device)

        pred = model(X)

        predictions.append(pred)

        # Align the target shape with the prediction shape so the loss is
        # not computed over a broadcast (B, B) grid; weight by batch size so
        # the final value is a per-sample mean.
        test_loss += loss_fn(pred, y.reshape(pred.shape)).item() * len(X)

test_loss /= max(len(test_dataloader.dataset), 1)
print(f"Test loss: {test_loss:>8f}")

In [None]:
predictions = torch.cat(predictions).cpu().numpy()

In [None]:
sns.scatterplot(x=testing_data["NESTIN"], y=predictions.squeeze(), alpha=0.5)