# VGG Loss Comparison and Efficiency Test

In [1]:

import numpy as np
import torch
import time
from torchvision.models import vgg19
from torchvision.transforms import Compose, ToTensor, CenterCrop, Normalize, Lambda
from pathlib import Path
import pathlib
import h5py
from fastmri.data import transforms

  from .autonotebook import tqdm as notebook_tqdm


## Original VGG Loss Function

In [4]:

preprocess_orig = Compose([
    ToTensor(),
    CenterCrop((224, 224)),
    Lambda(lambda x: x.repeat(3, 1, 1) if x.size(0)==1 else x),
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

def vgg_loss_orig(gt: np.ndarray, pred: np.ndarray) -> np.ndarray:
    # Load the pre-trained VGG19 model
    vgg = vgg19(pretrained=True).features

    # Remove the last max pooling layer to get the feature maps
    vgg = torch.nn.Sequential(*list(vgg.children())[:-1])
    
    # mathijs did not use eval() here, but it is generally a good practice
    # vgg.eval()  # Set the model to evaluation mode

    gt = gt * 255
    pred = pred * 255

    losses = []

    for gt_img, pred_img in zip(gt, pred):
        gt_tensor = preprocess_orig(gt_img).unsqueeze(0)
        pred_tensor = preprocess_orig(pred_img).unsqueeze(0)

        gt_feat = vgg(gt_tensor)
        pred_feat = vgg(pred_tensor)

        loss = torch.nn.functional.mse_loss(gt_feat, pred_feat)
        losses.append(loss)

    # Average the losses across all images in the batch
    avg_loss = torch.mean(torch.stack(losses))

    return avg_loss.detach().cpu().numpy()


## Optimized VGG Loss Function

In [5]:
import os
# Set the default GPU to GPU # 
os.environ["CUDA_VISIBLE_DEVICES"] = "0"  # Force GPU number
print(f"Using GPU: {os.environ['CUDA_VISIBLE_DEVICES']}")


Using GPU: 0


In [11]:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
preprocess_opt = Compose([
    ToTensor(),
    CenterCrop((224, 224)),
    Lambda(lambda x: x.repeat(3, 1, 1) if x.size(0) == 1 else x),
    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

vgg_model = vgg19(pretrained=True).features[:36].to(device).eval()

@torch.no_grad()
def vgg_loss_opt(gt: np.ndarray, pred: np.ndarray) -> np.ndarray:
    gt = gt * 255
    pred = pred * 255

    losses = []

    for gt_img, pred_img in zip(gt, pred):
        gt_tensor = preprocess_opt(gt_img).unsqueeze(0).to(device)
        pred_tensor = preprocess_opt(pred_img).unsqueeze(0).to(device)

        gt_feat = vgg_model(gt_tensor)
        pred_feat = vgg_model(pred_tensor)

        loss = torch.nn.functional.mse_loss(gt_feat, pred_feat)
        losses.append(loss)

    return torch.stack(losses).mean().item()
    #return torch.stack(losses).mean().cpu().numpy()


Using device: cuda


## Generate Dummy Test Data

In [8]:

def determine_and_apply_mask(target, recons, tgt_file):
    """
    processes two reconstruction files and applies a mask to 
    the target and reconstructed images based on the intersection of 
    non-zero values of 2 != reconstructions (sense and CS).
    => goal: only evaluate where they have meaningful values 
    Args:
        target (np.ndarray): ground truth image
        recons (np.ndarray): reconstructed image
        tgt_file (pathlib.Path): path to the target file
    """
    # define the base paths for sense + CS reconstructions
    reconstruction_sense_path_string = '/DATASERVER/MIC/GENERAL/STUDENTS/aslock2/Results/Sense/'
    reconstruction_CS_path_string = '/DATASERVER/MIC/GENERAL/STUDENTS/aslock2/Results/CS/'
    # Construct full pahts by appending target file name
    reconstruction_sense_path = pathlib.Path(reconstruction_sense_path_string) / tgt_file.name
    reconstruction_CS_path = pathlib.Path(reconstruction_CS_path_string) / tgt_file.name
    # Read reconstruction files
    reconstruction_sense = h5py.File(reconstruction_sense_path, 'r')
    reconstruction_CS = h5py.File(reconstruction_CS_path, 'r')
    reconstruction_sense = reconstruction_sense['reconstruction']
    reconstruction_CS = reconstruction_CS['reconstruction']
    # Convert to numpy arrays
    reconstruction_sense = np.array(reconstruction_sense)
    reconstruction_CS = np.array(reconstruction_CS)
    # Crop the reconstructions to the same size as the target
    reconstruction_sense = transforms.center_crop(reconstruction_sense, (target.shape[-1], target.shape[-1]))
    reconstruction_CS = transforms.center_crop(reconstruction_CS, (target.shape[-1], target.shape[-1]))
    # Create bitmasks where non-zero values in the reconstructions are marked as 1, and zero values are marked as 0.
    sense_bitmask = np.ones_like(reconstruction_sense)
    sense_bitmask = np.where(reconstruction_sense != 0, sense_bitmask, 0).astype(int)
    CS_bitmask = np.ones_like(reconstruction_CS)
    CS_bitmask = np.where(reconstruction_CS != 0, CS_bitmask, 0).astype(int)
    # Create an intersection mask where the non-zero values in the sense and CS reconstructions overlap
    intersection_mask = CS_bitmask & sense_bitmask
    # Apply the intersection mask to the target and reconstructed images
    gt = np.where(intersection_mask == 1, target, 0)
    pred = np.where(intersection_mask == 1, recons, 0)
        # If the value in intersection_mask is 1, the corresponding value from target is retained.
        # If the value in intersection_mask is 0, the corresponding value in gt is set to 0.
    return gt, pred

In [9]:


# # Dummy grayscale test data (batch of 3 images)
# gt = np.random.rand(3, 256, 256).astype(np.float32)
# pred = gt + np.random.normal(0, 0.01, size=gt.shape).astype(np.float32)

# Generate real data:
target_paths = [Path('/DATASERVER/MIC/SHARED/NYU_FastMRI/Preprocessed/multicoil_test_full/'), 
Path('/DATASERVER/MIC/SHARED/NYU_FastMRI/Knee/multicoil_val/')]
predictions_path = Path('/DATASERVER/MIC/GENERAL/STUDENTS/aslock2/Results/CSUNet/reconstructions/')

for pred_file in predictions_path.iterdir():
  ###  find matching target file (knee or brain)
  tgt_file = None
  for target_dir in target_paths:
      candidate = target_dir / pred_file.name
      if candidate.exists():
          tgt_file = candidate
          break
  assert tgt_file is not None, f"Target file not found for {pred_file.name}"
  break
print(f"Using target file: {tgt_file}")
print(f"Using prediction file: {pred_file}")

with h5py.File(tgt_file, "r") as target, h5py.File(pred_file, "r") as recons:
   
  # select target and reconstruction
  target = target["reconstruction_rss"][()] # "reconstruction_rss" of target files exists in multicoil_test_full set!
  recons = recons["reconstruction"][()]

  # center crop the images to the size of the target
  target = transforms.center_crop(
      target, (target.shape[-1], target.shape[-1])
  )
  recons = transforms.center_crop(
      recons, (target.shape[-1], target.shape[-1])
  )
  # apply non-zero mask to target and reconstruction
  gt, pred = determine_and_apply_mask(target, recons, tgt_file)

Using target file: /DATASERVER/MIC/SHARED/NYU_FastMRI/Preprocessed/multicoil_test_full/file_brain_AXT2_202_2020356.h5
Using prediction file: /DATASERVER/MIC/GENERAL/STUDENTS/aslock2/Results/CSUNet/reconstructions/file_brain_AXT2_202_2020356.h5


## Compare Results and Timing

In [12]:

start_orig = time.time()
loss_orig = vgg_loss_orig(gt, pred)
time_orig = time.time() - start_orig

start_opt = time.time()
loss_opt = vgg_loss_opt(gt, pred)
time_opt = time.time() - start_opt

print(f"Original VGG Loss:   {loss_orig:.12f}")
print(f"Optimized VGG Loss: {loss_opt:.12f}")
print(f"Difference:          {abs(loss_orig - loss_opt):.12f}")
print(f"Original Time:       {time_orig:.4f} seconds")
print(f"Optimized Time:      {time_opt:.4f} seconds")
print(f"Speedup:             {time_orig / time_opt:.2f}x")


Original VGG Loss:   0.048915944993
Optimized VGG Loss: 0.048915915191
Difference:          0.000000029802
Original Time:       18.4493 seconds
Optimized Time:      0.1753 seconds
Speedup:             105.24x
