In [2]:
import os

import numpy as np
import pandas as pd
from PIL import Image

import torch
from torch import nn
from torchvision import models, transforms

## Load ResNet + Head

In [3]:
## Define head structure
class Lambda(nn.Module):
    """Adapter that lets an arbitrary callable act as an ``nn.Module`` layer."""

    def __init__(self, func):
        """Remember ``func``; it is invoked on every forward pass."""
        nn.Module.__init__(self)
        self.func = func

    def forward(self, x):
        """Apply the wrapped callable to ``x`` and return its result as-is."""
        wrapped = self.func
        return wrapped(x)

def _flatten_per_sample(x):
    """Collapse every dimension after the first one into a single axis.

    Returns a tensor of shape ``(len(x), -1)``.
    """
    # reshape (unlike view) also accepts non-contiguous inputs, copying when a
    # pure view is not possible; for contiguous inputs the result is the same.
    return x.reshape(len(x), -1)


class FlattenLayer(Lambda):
    """Layer that flattens ``(N, ...)`` input to ``(N, -1)``."""

    def __init__(self):
        # A named module-level function is used instead of an inline lambda so
        # that the layer (and any container holding it) can be pickled.
        super().__init__(func=_flatten_per_sample)

# Regression head: global average pool -> flatten -> one linear output unit.
# The layers stay positional (unnamed) so their state-dict keys are unchanged
# for the checkpoint that is loaded into `head` further down.
head = nn.Sequential(
    nn.AdaptiveAvgPool2d(output_size=(1, 1)),
    FlattenLayer(),
    nn.Linear(in_features=2048, out_features=1),
)

## Forward-hook plumbing used to capture ResNet feature maps
# Module-level buffer; every call of hook_fn adds one entry to it.
feature_maps = list()


def hook_fn(module, input, output):
    """Forward hook: record the hooked module's ``output`` in ``feature_maps``.

    ``module`` and ``input`` are part of the hook signature but are not used.
    """
    feature_maps.append(output)

## Load model
model_path = '/dhc/groups/mpws2022cl1/models/testModel_head.pt'
m_func = models.resnet50
# Build the bare architecture only. Requesting pretrained ImageNet weights would
# trigger a download whose values are all replaced by the checkpoint below
# (load_state_dict is strict by default, so every key must be present).
model = m_func()
# Replace the stem so it accepts 50 input channels instead of the stock 3,
# matching the 50-channel tensors this notebook feeds into the network.
model.conv1 = nn.Conv2d(50, 64, kernel_size=7, stride=2, padding=3, bias=False)
# map_location="cpu": inference in this notebook runs on CPU, so the checkpoint
# must load even if it was saved from a GPU and no GPU is available here.
# NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
checkpoint = torch.load(model_path, map_location="cpu")
model.load_state_dict(checkpoint['resnet_state_dict'])
head.load_state_dict(checkpoint['head_state_dict'])

<All keys matched successfully>

## Make predictions

In [4]:
## Scale each image (channel) of the multi-channel tensor and convert it to a torch tensor

def percentile_scaling_array(
    array, lower_percentile=0, upper_percentile=98, min_val=0, max_val=1
):
    """Linearly rescale ``array`` between two of its percentiles and return a tensor.

    The value at ``lower_percentile`` maps to ``min_val`` and the value at
    ``upper_percentile`` maps to ``max_val``. Values outside that percentile
    range are not clipped, so they land outside ``[min_val, max_val]``.

    Args:
        array: numpy array in a layout accepted by ``transforms.ToTensor``.
        lower_percentile: percentile mapped to ``min_val``.
        upper_percentile: percentile mapped to ``max_val``.
        min_val: lower end of the target range.
        max_val: upper end of the target range.

    Returns:
        float32 tensor produced by ``transforms.ToTensor`` from the scaled array.
    """
    lower_bound = np.percentile(array, lower_percentile)
    upper_bound = np.percentile(array, upper_percentile)
    value_range = upper_bound - lower_bound
    if value_range == 0:
        # Degenerate input (e.g. a constant image): dividing by zero would fill
        # the result with NaN/inf, so map everything to min_val instead.
        unit_scaled = np.zeros_like(array, dtype=np.float64)
    else:
        unit_scaled = (array - lower_bound) / value_range
    rescaled = unit_scaled * (max_val - min_val) + min_val
    return transforms.ToTensor()(rescaled).float()

In [24]:
# Directory with the stored tensors, and the CSV whose 'path' column names them.
dir_images = '/dhc/groups/mpws2022cl1/tensor/3000_GRAY/'
df_images = pd.read_csv(
    '/dhc/groups/mpws2022cl1/tensor/3000_GRAY.csv',
    header=0,
)
# Label table; the 'image' and 'Ejection Fraction' columns are read from it later.
df_labels = pd.read_csv(
    '/dhc/groups/mpws2022cl1/input/ejectionFraction_normalized_multiChannel_38591.csv'
)

In [12]:
# Add empty placeholder columns; the prediction loop fills them in row by row.
df_images = df_images.assign(**{"label": "", "prediction": ""})

In [None]:
### Predict EF for each entry in df_images and store it in the 'prediction' column.
### If a label exists for the entry, copy it into the 'label' column as well.


def _build_input_tensor(mc_tensor, n_channels=50):
    """Scale every channel of a stored tensor and rebuild a batched model input.

    Each channel is scaled on its own with ``percentile_scaling_array`` and the
    results are stacked back along dim 0; a leading batch dimension is added.

    NOTE(review): assumes ``mc_tensor`` is shaped (n_channels, 1, H, W), as the
    original notebook comments suggest (50 x 1 x 224 x 224) — TODO confirm.
    """
    # Split along axis 0 into n_channels pieces (np.split raises if the size is
    # not divisible), then drop all size-1 axes so each channel is 2-D (H x W).
    channels = np.squeeze(np.split(mc_tensor.numpy(), n_channels, axis=0))
    scaled_channels = [
        # ToTensor (inside percentile_scaling_array) expects H x W x C and
        # returns C x H x W: add the channel axis going in, strip it coming out.
        percentile_scaling_array(channel[:, :, np.newaxis], 0, 98, 0, 1).squeeze(0)
        for channel in channels
    ]
    # Stack back into one multi-channel tensor and add the batch dimension.
    return torch.stack(scaled_channels, dim=0).unsqueeze(0)


# Build the image -> label lookup once instead of scanning df_labels for every
# row. setdefault keeps the first occurrence, like the original first-match scan.
label_by_image = {}
for image_id, ef_label in zip(
    df_labels["image"].values, df_labels["Ejection Fraction"].values
):
    label_by_image.setdefault(image_id, ef_label)

# Switching to inference mode is loop-invariant, so do it once up front.
model.eval()
head.eval()

# Register the layer4 hook exactly once. Registering inside the loop would add
# another hook per row, so every forward pass would fire all of them.
layer4 = model.layer4
hook_handle = layer4.register_forward_hook(hook_fn)
try:
    for index, row in df_images.iterrows():
        # Load the stored tensor for this row; .numpy() needs it on the CPU.
        file_path = os.path.join(dir_images, row['path'])
        mcTensor = torch.load(file_path, map_location="cpu")
        input_tensor = _build_input_tensor(mcTensor)

        # Empty the shared buffer so index 0 is this row's layer4 output.
        feature_maps.clear()
        with torch.no_grad():
            # The model's own output is unused; the forward pass only has to
            # trigger the hook that captures the layer4 feature maps.
            model(input_tensor)
            layer4_feature_maps = feature_maps[0]
            output = head(layer4_feature_maps)

        ef_predicted = output.item()
        df_images.loc[index, 'prediction'] = ef_predicted

        # Add the label to that row: the lookup key is the path minus its last
        # three characters (presumably the file extension — TODO confirm).
        entry = row['path'][:-3]
        if entry in label_by_image:
            df_images.loc[index, 'label'] = label_by_image[entry]

        print(df_images.loc[index])
finally:
    # Always detach the hook so re-running this cell does not stack hooks.
    hook_handle.remove()

df_images.to_csv("predict_ef.csv", index=False)