In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
from PIL import Image
from tifffile import imread

import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from PIL import Image
from sklearn.model_selection import train_test_split

from time import process_time

base_dir = '/content/drive/MyDrive/Honors Project'
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

Dataset Class

In [None]:
class PerlinNoiseDataset(Dataset):
    def __init__(self, x, y):
      self.x = torch.Tensor(x).float()
      self.y = torch.Tensor(y).long()

      # Take the logarithm of the tensor
      self.x = torch.log1p(self.x)

      # Normalize the tensor
      self.x = (self.x - self.x.mean()) / self.x.std()

    def __len__(self):
      """ Gets the number of images in the busses dataset.
          Returns:
            int : number of unique images in dataset
      """
      return len(self.x)

    def __getitem__(self, ix):
      """ Given an index, returns an image and its corresponding object bounding
          boxes with their associated labels.
          Args:
            idx (int) : index of busses data to retrieve
      """
      return self.x[ix].to(DEVICE), self.y[ix].to(DEVICE)

Evaluation Metric Function

In [None]:
def intersectionOverUnion(pred, label, num_classes=2):
    """ Computes intersection over union metric.
        Args:
          pred (tensor) : predicted segmentation
          label (tensor) : ground truth segmentation
          num_classes (int) : number of regions in the ground truth/prediction
        Returns:
          the average intersection over union for the batch
    """
    _, pred = torch.max(pred, 1)

    iou_list = []

    for cls in range(num_classes):
        pred_mask = (pred == cls)
        true_mask = (label == cls)

        intersection = (pred_mask & true_mask).float().sum((1, 2))
        union = (pred_mask | true_mask).float().sum((1, 2))

        iou = (intersection + 1e-6) / (union + 1e-6)
        iou_list.append(iou.mean().item())

    return np.mean(iou_list)

Loss Metric Function

In [None]:
def AveragePixelLoss(img, pred, target, eucdist_coef=0, ce_coef=1):
    """ Computes the Euclidean distance between the average pixel values of the
        perceived regions in the predicted segmentation. Adds this distance to
        the standard cross-entropy loss, with coefficients dictating the
        relative weight of each component into the final loss.
        Args:
          img (tensor) : original input image
          pred (tensor) : predicted segmentation
          target (tensor) : ground truth segmentation
          eucdis_coef (int) : weight of Euclidean distance component in final loss
          ce_coef (int) : weight of cross-entropy component in final loss
        Returns:
          loss value corresponding to the batch
    """
    # Regular cross-entropy loss
    cel = nn.CrossEntropyLoss()
    cross_entropy = cel(pred, target)

    # Using averaging
    sm = nn.Softmax(dim=1)
    pred_softmax = sm(pred)

    n_pixels = img.shape[2] * img.shape[3]

    # Calculate the average pixel loss
    pixel_losses = torch.empty(pred.shape[0])
    for batch in range(pred.shape[0]):
      m0 = torch.sum(pred_softmax[batch, 0, :, :] * img[batch, 0, :, :]) / n_pixels
      m1 = torch.sum(pred_softmax[batch, 1, :, :] * img[batch, 0, :, :]) / n_pixels
      pixel_losses[batch] = -((m0 - m1) ** 2)

    average_pixel_comp = pixel_losses.mean()

    return ce_coef * cross_entropy + euc_dist_coef * average_pixel_comp


Typical U-Net Blocks in U-Net

In [None]:
class conv_block(nn.Module):
    """ Helper block with convolutions to be used in U-Net.
    """
    def __init__(self, in_c, out_c, kernel_size=3):
        super().__init__()
        padding = (kernel_size - 1) // 2
        self.conv1 = nn.Conv2d(in_c, out_c, kernel_size=kernel_size, padding=padding)
        self.bn1 = nn.BatchNorm2d(out_c)
        self.conv2 = nn.Conv2d(out_c, out_c, kernel_size=kernel_size, padding=padding)
        self.bn2 = nn.BatchNorm2d(out_c)
        self.relu = nn.ReLU()

    def forward(self, inputs):
        x = self.conv1(inputs)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        return x


class encoder_block(nn.Module):
    """ Block with learnable convolutions and max pooling layers that is used
        to learn feature maps and reduce the spatial dimensions of the input.
        To be used in the encoder of a U-Net.
    """
    def __init__(self, in_c, out_c, kernel_size=3):
        super().__init__()
        self.conv = conv_block(in_c, out_c, kernel_size=kernel_size)
        self.pool = nn.MaxPool2d((2, 2))

    def forward(self, inputs):
        x = self.conv(inputs)
        p = self.pool(x)
        return x, p


class decoder_block(nn.Module):
    """ Block with learnable transpose convolutions and max pooling layers that
        is used to reconstruct the original data and increase the spatial
        dimensions of the input. To be used in the decoder of a U-Net.
    """
    def __init__(self, in_c, out_c):
        super().__init__()
        self.up = nn.ConvTranspose2d(in_c, out_c, kernel_size=2, stride=2, padding=0)
        self.conv = conv_block(out_c+out_c, out_c)
        self.conv_noskip = conv_block(out_c, out_c)

    def forward(self, inputs, skip):
        x = self.up(inputs)
        x = torch.cat([x, skip], axis=1)
        x = self.conv(x)
        return x

In [None]:
class UShape(nn.Module):
  """ Class that can be used for all U-shaped architectures.
      Pass in in_c, out_c, encoder block, and decoder block to customize.
  """
  def __init__(self, encoder=encoder_block, decoder=decoder_block, in_c=1, out_c=2, hid1=64, hid2=128, hid3=256, bot=512, drop_p=0.5):
    super().__init__()
    """ Encoder """
    self.e1 = encoder(in_c, hid1)
    self.e2 = encoder(hid1, hid2)
    self.e3 = encoder(hid2, hid3)
    """ Bottleneck """
    self.b = nn.Sequential(
            conv_block(hid3, bot),
            nn.Dropout(drop_p)
    )
    """ Decoder """
    self.d1 = decoder(bot, hid3)
    self.d2 = decoder(hid3, hid2)
    self.d3 = decoder(hid2, hid1)
    """ Classifier """
    self.outputs = nn.Conv2d(hid1, out_c, kernel_size=1, padding=0)

  def forward(self, inputs):
    """ Encoder """
    s1, p1 = self.e1(inputs)
    s2, p2 = self.e2(p1)
    s3, p3 = self.e3(p2)
    """ Bottleneck """
    b = self.b(p3)
    """ Decoder """
    d1 = self.d1(b, s3)
    d2 = self.d2(d1, s2)
    d3 = self.d3(d2, s1)
    """ Classifier """
    outputs = self.outputs(d3)
    return outputs


class UNet(nn.Module):
  def __init__(self):
    super().__init__()
    self.ushape = UShape()

  def forward(self, inputs):
    outputs = self.ushape(inputs)
    return outputs

Typical U-Net

In [None]:
class UNet(nn.Module):
  """ Standard U-Net
  """
  def __init__(self):
    super().__init__()
    self.ushape = UShape()

  def forward(self, inputs):
    outputs = self.ushape(inputs)
    return outputs

Helper Functions/Classes for Model Running

In [None]:
def plot_train_test(training_set, testing_set, plot_type, model_name=""):
  """
  Plots loss over epochs.
  """
  plt.plot(training_set, label = "Training " + plot_type)
  plt.plot(testing_set, label = "Testing " + plot_type)
  plt.xlabel("Epoch")
  plt.ylabel(plot_type)
  plt.legend()
  if model_name:
    plt.savefig(f'{base_dir}/Results/Plots/{model_name}_{plot_type}.png')
  plt.show()
  plt.clf()

In [None]:
class ModelRunner():
  """Class for training/testing model"""
  def __init__(self, model, train_dl, test_dl, epochs=70, eucdist_coef=0, ce_coef=1):
    """ Initialized the modelrunner.
        Args:
          model (Pytorch model) : the U-Net to be trained
          train_dl (dataloader) : allows acccess to the training dataset
          test_dl (dataloader) : allows acccess to the testing dataset
          epochs (int) : the number of times to send the training data through
                         the model
    """
    self.LR = 1e-4
    self.EPOCHS = epochs
    self.optimizer = Adam(model.parameters(), lr=self.LR)

    self.model = model.to(DEVICE)
    self.train_dl = train_dl
    self.test_dl = test_dl

    self.training_losses = []
    self.testing_losses = []
    self.training_ious = []
    self.testing_ious = []
    self.train_time = 0

    self.LOSS_FUNC = UnsupervisedAveragePixelLoss
    self.euc_dist_coef = eucdist_coef
    self.ce_coef = ce_coef

  def train(self, do_print):
    """ Trains the model on the training dataset.
    """
    training_loss_per_epoch = []
    training_iou_per_epoch = []

    self.model.train()
    start_time = process_time()
    for data, labels in self.train_dl:
      data = data.unsqueeze(1)
      labels = labels.clone().detach().long()

      self.optimizer.zero_grad()
      y_pred = self.model(data)
      loss = self.LOSS_FUNC(img, pred, target, eucdist_coef=self.eucdist_coef, ce_coef=self.ce_coef)
      iou = intersectionOverUnion(y_pred, labels)
      loss.backward()
      self.optimizer.step()

      training_loss_per_epoch.append(loss.item())
      training_iou_per_epoch.append(iou)

    end_time = process_time()
    self.train_time += (end_time - start_time)

    avg_loss = np.mean(training_loss_per_epoch)
    avg_iou = np.mean(training_iou_per_epoch)
    self.training_losses.append(avg_loss)
    self.training_ious.append(avg_iou)
    if do_print:
        print("avg train loss (Unsupervised Average Pixel Loss):", avg_loss)
        print("avg train IOU:", avg_iou)

  def test(self, do_print):
    """ Tests the model on the testing dataset.
    """
    testing_loss_per_epoch = []
    testing_iou_per_epoch = []
    self.model.eval()
    for data, labels in self.test_dl:
      data = data.unsqueeze(1)
      labels = labels.clone().detach().long()

      with torch.no_grad():
        y_pred = self.model(data)

        loss = self.LOSS_FUNC(img, pred, target, eucdist_coef=self.eucdist_coef, ce_coef=self.ce_coef)

        iou = intersectionOverUnion(y_pred, labels)
        testing_iou_per_epoch.append(iou)
        testing_loss_per_epoch.append(loss.item())

    avg_loss = np.mean(testing_loss_per_epoch)
    avg_iou = np.mean(testing_iou_per_epoch)
    self.testing_losses.append(avg_loss)
    self.testing_ious.append(avg_iou)
    if do_print:
        print("avg test loss (Unsupervised Average Pixel):", avg_loss)
        print("avg test IOU:", avg_iou)

  def display_results(self):
    """ Plots the training/testing losses and IOUs over the training cycle
        and prints the final epoch's average loss, IOU, and training time.
    """
    final_loss = self.testing_losses[-1]
    final_iou = self.testing_ious[-1]
    train_time = round(self.train_time, 2)

    plot_train_test(self.training_losses, self.testing_losses, "Cross Entropy Loss")
    plot_train_test(self.training_ious, self.testing_ious, "IOU")
    print(f"Final test loss (Unsupervised Average Pixel) is {final_loss}")
    print(f"Final test IOU is {final_iou}")
    print(f"\nTotal training time is {train_time} seconds\n")

  def get_trained_model(self):
    """ Returns the trained U-Net Model.
    """
    return self.model

  def run_loop(self):
    """ Trains and tests for a given number of epochs. Then, displays the results
        of the training.
    """
    for epoch in range(self.EPOCHS):
      do_print = epoch % 50 == 0
      if do_print:
        print(f"Running epoch {epoch + 1} out of {self.EPOCHS}")
      self.train(do_print)
      self.test(do_print)
    self.display_results()

In [None]:
# Load data
image_file_path = f'{base_dir}/Data/combined_alphas.npz'
image_data = np.load(image_file_path)

# Get the total number of data points
total_data_points = len(image_data['data'])

# extract data
loaded_data = image_data['data']
loaded_labels = image_data['labels']
alpha1s = image_data['alpha1s']
alpha2s = image_data['alpha2s']

print(f"Loaded {len(alpha1s)} data points")

# Split data into train and test sets
X_train, X_test, y_train, y_test, alpha1s_train, alpha1s_test, alpha2s_train, alpha2s_test = \
    train_test_split(loaded_data, loaded_labels, alpha1s, alpha2s, test_size=0.2, random_state=42)

BATCH_SIZE = 8
train_dataset = PerlinNoiseDataset(X_train, y_train)
train_dl = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataset = PerlinNoiseDataset(X_test, y_test)
test_dl = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

Loaded 825 data points


In [None]:
def run_model(model, model_name, euc_coef=0, ce_coef=1, save_model=False):
  print(model_name)
  model_runner = ModelRunner(model, train_dl, test_dl, epochs=70, eucdist_coef=euc_coef, ce_coef=ce_coef)
  model_runner.run_loop()
  trained_model = model_runner.get_trained_model()
  do_save = ""
  if save_model:
    torch.save(trained_model.state_dict(), f'{base_dir}/Models/{model_name}.pth')

    #write results to csv file
    final_loss = model_runner.testing_losses[-1]
    final_iou = model_runner.testing_ious[-1]
    train_time = round(model_runner.train_time, 2)
    file_path = f'{base_dir}/Results/quantitative_results_pixelaveraging.csv'
    df = pd.read_csv(file_path)
    mask = df['Model Name'] == model_name
    if mask.any():
        df.loc[mask, ['Final Test Cross Entropy Loss', 'Final Test IOU', 'Training Time (s)']] = [final_loss, final_iou, train_time]
    else:
        df.loc[len(df)] = [model_name, final_loss, final_iou, train_time]
    df.to_csv(file_path, index=False)
    do_save = model_name

  #plot losses/ious and save
  plot_train_test(model_runner.training_losses, model_runner.testing_losses, "Unsupervised Pixel Averaging Loss", do_save)
  plot_train_test(model_runner.training_ious, model_runner.testing_ious, "IOU", do_save)

Supervised Training

In [None]:
model = UNet()
run_model(model, "UNet_losscoef_1e-3", euc_coef=.001, save_model=True)
model = UNet()
run_model(model, "UNet_losscoef_1e-2", euc_coef=.01, save_model=True)
model = UNet()
run_model(model, "UNet_losscoef_1e-1", euc_coef=.1, save_model=True)
model = UNet()
run_model(model, "UNet_losscoef_1", euc_coef=1, save_model=True)
model = UNet()
run_model(model, "UNet_losscoef_1e1", euc_coef=10, save_model=True)
model = UNet()
run_model(model, "UNet_losscoef_1e2", euc_coef=100, save_model=True)

Unsupervised Training

In [None]:
model = UNet()
run_model(model, "UNet_unsupervised_averagepixel_loss", euc_coef=1, ce_coef=0, save_model=True)

UNet_unsupervised_averagepixel_loss


KeyboardInterrupt: 

Code to Test with Synthetic Data

In [None]:
def make_plots(data, labels, trained_model, alpha1s, alpha2s, counter=0, model_name=""):
    y_pred = trained_model(data.unsqueeze(1))
    #_, y_pred = torch.max(y_pred, 1) #turn into segmentation mask
    for i in range(len(data)):
        d = data[i]
        l = labels[i]
        y = y_pred[i]
        _, y = torch.max(y, 0)
        alpha1 = alpha1s[i]
        alpha2 = alpha2s[i]

        print(f"alpha1 = {alpha1}, alpha2 = {alpha2}")
        #print(d.min(), d.max())

        fig, axes = plt.subplots(1, 3, figsize=(12, 4))
        # Display the original image
        d = d.cpu().numpy()
        axes[0].imshow(d, cmap='gray')
        axes[0].set_title('Original Image')
        axes[0].axis('off')

        # Display prediction
        # y = (y >= 0.5).float()
        y = y.squeeze(0).cpu().detach().numpy()
        axes[1].imshow(y, cmap='gray')#, vmin=0, vmax=1)
        axes[1].set_title('Pred')
        axes[1].axis('off')

        # Display Label
        l = l.cpu().numpy()
        axes[2].imshow(l, cmap='gray')
        axes[2].set_title('Label')
        axes[2].axis('off')

        if model_name:
          plt.savefig(f'{base_dir}/Results/Segmentations/{model_name}_Validation_{counter}.png')
          plt.show()
          plt.clf()
        plt.show()

def show_results(trained_model, model_name=""):
    BATCH_SIZE = 8
    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
    n_images = 5

    train_dataset = PerlinNoiseDataset(X_train[:n_images], y_train[:n_images])
    train_dl = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False)
    test_dataset = PerlinNoiseDataset(X_test[:n_images], y_test[:n_images])
    test_dl = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

    # load pretrained model
    trained_model = trained_model.to(DEVICE)
    trained_model.eval()

    with torch.no_grad():
        # print("Training Examples:")
        # for data, labels in train_dl:
        #     make_plots(data, labels, trained_model, alpha1s_train[:n_images], alpha2s_train[:n_images])

        print("\n\nTesting Examples:")
        counter=1
        for data, labels in test_dl:
            make_plots(data, labels, trained_model, alpha1s_test[:n_images], alpha2s_test[:n_images], counter, model_name)
            counter += 1

In [None]:
def test_model(model, model_name, test_function):
  print(f'{model_name}:')
  model.load_state_dict(torch.load(f'{base_dir}/Models/{model_name}.pth', map_location=torch.device(DEVICE)))
  test_function(model, model_name)

model = UNet()
test_model(model, "UNet_losscoef_1e-3", show_results)
model = UNet()
test_model(model, "UNet_losscoef_1e-2", show_results)
model = UNet()
test_model(model, "UNet_losscoef_1e-1", show_results)
model = UNet()
test_model(model, "UNet_losscoef_1", show_results)
model = UNet()
test_model(model, "UNet_losscoef_1e1", show_results)
model = UNet()
test_model(model, "UNet_losscoef_1e2", show_results)


model = UNet()
test_model(model, "UNet_unsupervised_averagepixel_loss", show_results)