<a href="https://colab.research.google.com/github/billsioros/thesis/blob/master/Nanorough_surface_Super_resolution.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ✔️ Prerequisites

First of all we need to take care of a few **prerequisites**, most notably:

- Install the various pip modules that we will be using.
- Install some linux specific dependencies of our [content loss](#content-loss).
- Initialize the Random Number Generator(s), so that our experiments can be replicated.
- Determine:
  - The current working directory, as it's going to be used to reference various files such as the dataset, our model checkpoints e.t.c
  - The available hardware backend. GPU utilization is preferable, as it results in higher complition time.
- `(Optionally)` Mount Google Drive, where we can load our dataset from.

## Installing [graphviz](https://graphviz.org/) & [libgraphviz-dev](https://packages.debian.org/jessie/libgraphviz-dev)

The aforementioned packages are required by [PyINSECT](https://github.com/billsioros/PyINSECT/tree/implementing-HPGs) and more specifically its graph plotting methods.

In [1]:
!sudo apt-get install graphviz libgraphviz-dev

Reading package lists... Done
Building dependency tree       
Reading state information... Done
graphviz is already the newest version (2.40.1-2).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'sudo apt autoremove' to remove it.
The following additional packages will be installed:
  libgail-common libgail18 libgtk2.0-0 libgtk2.0-bin libgtk2.0-common
  libgvc6-plugins-gtk libxdot4
Suggested packages:
  gvfs
The following NEW packages will be installed:
  libgail-common libgail18 libgraphviz-dev libgtk2.0-0 libgtk2.0-bin
  libgtk2.0-common libgvc6-plugins-gtk libxdot4
0 upgraded, 8 newly installed, 0 to remove and 34 not upgraded.
Need to get 2,120 kB of archives.
After this operation, 7,128 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu bionic/main amd64 libgtk2.0-common all 2.24.32-1ubuntu1 [125 kB]
Get:2 http://archive.ubuntu.com/ubuntu bionic/main amd64 libgtk2.0-0 amd64 2.24.32-1ubuntu1 [1,76

## Installing the required `pip` modules

- [torch](https://pytorch.org/) is our machine learning framework of choice.
- [numpy](https://numpy.org/), [sympy](https://www.sympy.org/en/index.html) and [scipy](https://www.scipy.org/) are used to in the context of nanorough surface generation.
- [plotly](https://plotly.com/) (which requires [pandas](https://pandas.pydata.org/)) as well as [matplotlib](https://matplotlib.org/) are used in order to plot various graphs.

In [2]:
!pip install torch numpy sympy scipy plotly pandas sklearn matplotlib==3.1.1 git+https://github.com/billsioros/PyINSECT.git@implementing-HPGs

Collecting git+https://github.com/billsioros/PyINSECT.git@implementing-HPGs
  Cloning https://github.com/billsioros/PyINSECT.git (to revision implementing-HPGs) to /tmp/pip-req-build-vdly_9fu
  Running command git clone -q https://github.com/billsioros/PyINSECT.git /tmp/pip-req-build-vdly_9fu
  Running command git checkout -b implementing-HPGs --track origin/implementing-HPGs
  Switched to a new branch 'implementing-HPGs'
  Branch 'implementing-HPGs' set up to track remote branch 'implementing-HPGs' from 'origin'.
Collecting matplotlib==3.1.1
[?25l  Downloading https://files.pythonhosted.org/packages/19/7a/60bd79c5d79559150f8bba866dd7d434f0a170312e4d15e8aefa5faba294/matplotlib-3.1.1-cp37-cp37m-manylinux1_x86_64.whl (13.1MB)
[K     |████████████████████████████████| 13.1MB 219kB/s 
Collecting pydot>=1.4.2
  Downloading https://files.pythonhosted.org/packages/ea/76/75b1bb82e9bad3e3d656556eaa353d8cd17c4254393b08ec9786ac8ed273/pydot-1.4.2-py2.py3-none-any.whl
Collecting pygraphviz>=1.7


## Initializing (a.k.a `Seeding`) the Random Number Generator(s)

We are required to seed various random number generation engines, so that our experiments can be replicated on a later date.

In [3]:
SEED = 1234

In [4]:
import torch
import numpy as np
import random
import os

if SEED is not None:
  np.random.seed(SEED)
  random.seed(SEED)
  torch.manual_seed(SEED)
  torch.cuda.manual_seed(SEED)
  torch.backends.cudnn.deterministic = True
  os.environ['PYTHONHASHSEED'] = str(SEED)

## Determining the Current Working Directory

In [5]:
from pathlib import Path

BASE_DIR = Path.cwd()

## Mounting Google Drive

In [6]:
GDRIVE_DIR = BASE_DIR / 'drive'

In [7]:
from google.colab import drive

drive.mount(f'{GDRIVE_DIR}')

Mounted at /content/drive


## Determining available backend

By default, we are going to be utilizing the available CPU backend, if no GPU is available.

In [8]:
device = "cpu"
if torch.cuda.is_available():
  device = "cuda:0"

In [9]:
device = torch.device(device)

# 🚙 General Purpose Utilities

Here we will be defining any *general purpose* functions and classes, that we are going to be using in the following sections. At the moment, we have only defined two [decorator](https://realpython.com/primer-on-python-decorators/) functions:

## A debugging decorator

The following decorator serves at emitting details regarding the decorated function's calls. In more detai, the information emitted is:
- The function's name.
- Its positional and keyword arguements for the function call at hand.
- Any exception that the function `raises`.

In addition to that, the `debug` decorator passes a special boolean keyword arguement by the name `debug`, if and only if it is included in the function signature. You can then utilize this arguement inside the decorated function and emit additional information.

In [10]:
from functools import wraps
import inspect

def debug(method):
    signature = inspect.signature(method)

    defaults = {
      k: v.default
      for k, v in signature.parameters.items()
      if v.default is not inspect.Parameter.empty
    }

    @wraps(method)
    def wrapper(*args, **kwargs):
        called_with = ''
        if args:
            called_with += ', '.join(str(x) for x in args)
            called_with += ', '

        called_with += ', '.join(f"{x}={kwargs.get(x, defaults[x])}" for x in defaults.keys())

        if 'debug' in defaults and 'debug' not in kwargs:
          kwargs['debug'] = True

        try:
          rv = method(*args, **kwargs)
        except Exception as e:
          print(f"{method.__name__}({called_with}) raised {e}")
          raise

        print(f"{method.__name__}({called_with}) returned {rv}")

        return rv

    return wrapper

## A benchmarking decorator

The following decorator aims at calculating the decorated function's execution time and is used to benchmark our various approaches and assist us in coming up with a comprehensive comparison of their efficiency.

In [11]:
from functools import wraps
from time import time

def benchmark(method):
  @wraps(method)
  def wrapper(*args, **kwargs):
    beg = time()
    rv = method(*args, **kwargs)
    end = time()

    print("%s returned after %7.3f seconds" % (method.__name__, (end - beg)))

    return rv

  return wrapper

# 🔢 Metrics

Here we define various metrics that are going to be used throughout our implementation.

## Correlation

In [12]:
def correlation(z_ngs):
  N = z_ngs.shape[0]
  
  rdif, hhcf1d = np.arange(N // 2), np.zeros(N // 2)

  for ndif in range(N // 2):
    surf1 = z_ngs[:N, :(N - ndif)]
    surf2 = z_ngs[:N, ndif:N]
    difsur2 = (surf1 - surf2) ** 2
    hhcf1d[ndif] = np.sqrt(np.mean(np.mean(difsur2)))
  
  return rdif, hhcf1d

# 📈 Plotting Utilities


Here we define our plotting mechanisms. We are going to be using both [plotly](https://plotly.com/) as well as [matplotlib](https://matplotlib.org/), depending on the situation. `plotly`'s greatest benefit over `matplotlib` is that it produces interactible graphs, where you can zoom in and pan around by default.

## Plotting the correlation

In [13]:
import plotly.express as px

def plot_correlation(array):
  x, y = correlation(array)
  
  fig = px.line(
    # title="1-D height-height correlation function",
    # x="r(nm)", y="G(r) (nm)",
    x=x, y=y,
    log_x=True, log_y=True
  )
  
  fig.update_layout(
    # title=title,
    autosize=True,
    width=500, height=500,
    # margin=dict(l=65, r=50, b=65, t=90)
  )

  fig.show()

## Plotting a surface as a 3D surface

In [14]:
import plotly.graph_objects as go

def as_3d_surface(array, autosize=False):
  fig = go.Figure(data=[go.Surface(z=array)])

  fig.update_layout(
    # title=title,
    autosize=True,
    width=500, height=500,
    # margin=dict(l=65, r=50, b=65, t=90)
  )

  fig.show()

## Plotting a surface as a grayscale image

In [15]:
import plotly.express as px

def as_grayscale_image(array):
  fig = px.imshow(array, color_continuous_scale='gray')
  fig.update_layout(coloraxis_showscale=False)
  fig.update_xaxes(showticklabels=False)
  fig.update_yaxes(showticklabels=False)

  fig.update_layout(
    # title=title,
    autosize=True,
    width=500, height=500,
    # margin=dict(l=65, r=50, b=65, t=90)
  )

  fig.show()

## Plotting two distributions against each other

In [16]:
import matplotlib.pyplot as plt

def plot_against(first, second, title="", xlabel="", ylabel="", labels=("", "")):
  x = list(range(max(len(first), len(second))))

  plt.plot(x, first, label=labels[0])
  plt.plot(x, second, label=labels[1])

  plt.grid()
  plt.title(title)
  plt.xlabel(xlabel)
  plt.ylabel(ylabel)

  plt.xlim([min(x), max(x)])

  plt.legend()

  plt.show()

# ⛰️ Surface Generation

## The base class `SurfaceGenerator`

In [17]:
from abc import ABC, abstractmethod

import numpy as np
import sympy
from scipy import stats

class SurfaceGenerator(ABC):
    def __init__(
        self, n_points, rms, skewness, kurtosis, corlength_x, corlength_y, alpha
    ):
        self.n_points = n_points
        self.rms = rms
        self.skewness = skewness
        self.kurtosis = kurtosis
        self.corlength_x = corlength_x
        self.corlength_y = corlength_y
        self.alpha = alpha

        self._mean = 0
        self._length = 0

    def __str__(self):
        return f"{self.__class__.__name__}({self.n_points}, {self.rms}, {self.skewness}, {self.kurtosis}, {self.corlength_x}, {self.corlength_y}, {self.alpha})"

    def __repr__(self):
        return f"<{self}>"

    def __call__(self, length):
        self._length = length

        return self

    def __len__(self):
        return self._length

    def __iter__(self):
        for _ in range(self._length):
            yield self.generate_surface()

    def sort(self, elements):
        indices = np.argsort(elements, axis=0)

        return elements[indices], indices

    @abstractmethod
    def autocorrelation(self, tx, ty):
        raise NotImplementedError

    def generate_surface(self):
        # 1st step: Generation of a Gaussian surface

        # Determine the autocorrelation function R(tx,ty)
        R = np.zeros((self.n_points, self.n_points))

        txmin = -self.n_points // 2
        txmax = self.n_points // 2

        tymin = -self.n_points // 2
        tymax = self.n_points // 2

        dtx = (txmax - txmin) // self.n_points
        dty = (tymax - tymin) // self.n_points

        for tx in range(txmin, txmax, dtx):
            for ty in range(tymin, tymax, dty):
                R[tx + txmax, ty + tymax] = self.autocorrelation(tx, ty)

        # According to the Wiener-Khinchine theorem FR is the power spectrum of the desired profile
        FR = np.fft.fft2(R, s=(self.n_points, self.n_points))
        AMPR = np.sqrt(dtx ** 2 + dty ** 2) * abs(FR)

        # 2nd step: Generate a white noise, normalize it and take its Fourier transform
        X = np.random.rand(self.n_points, self.n_points)
        aveX = np.mean(np.mean(X))

        dif2X = (X - aveX) ** 2
        stdX = np.sqrt(np.mean(np.mean(dif2X)))
        X = X / stdX

        XF = np.fft.fft2(X, s=(self.n_points, self.n_points))

        # 3nd step: Multiply the two Fourier transforms
        YF = XF * np.sqrt(AMPR)

        # 4th step: Perform the inverse Fourier transform of YF and get the desired surface
        zaf = np.fft.ifft2(YF, s=(self.n_points, self.n_points))
        z = np.real(zaf)

        avez = np.mean(np.mean(z))
        dif2z = (z - avez) ** 2
        stdz = np.sqrt(np.mean(np.mean(dif2z)))
        z = ((z - avez) * self.rms) / stdz

        # Define the fraction of the surface to be analysed
        xmin = 0
        xmax = self.n_points
        ymin = 0
        ymax = self.n_points
        z_gs = z[xmin:xmax, ymin:ymax]

        # 2nd step: Generation of a non-Gaussian noise NxN
        z_ngn = stats.pearson3.rvs(
            self.skewness,
            loc=self._mean,
            scale=self.rms,
            size=(self.n_points, self.n_points),
        )

        # as_grayscale_image(z_ngn)
        # 3rd step: Combination of z_gs with z_ngn to output a z_ms
        v_gs = z_gs.flatten(order="F")
        v_ngn = z_ngn.flatten(order="F")

        Igs = np.argsort(v_gs)

        vs_ngn = np.sort(v_ngn)

        v_ngs = np.zeros_like(vs_ngn)
        v_ngs[Igs] = vs_ngn

        z_ngs = np.asmatrix(v_ngs.reshape(self.n_points, self.n_points, order="F")).H

        return z_ngs

## A simple non-gaussian surface generator

In [18]:
class NonGaussianSurfaceGenerator(SurfaceGenerator):
    def __init__(self, n_points=128, rms=1, skewness=0, kurtosis=3, corlength_x=4, corlength_y=4, alpha=1):
        super().__init__(n_points, rms, skewness, kurtosis, corlength_x, corlength_y, alpha)

    def autocorrelation(self, tx, ty):
        return ((self.rms ** 2) * np.exp(-(abs(np.sqrt((tx / self.corlength_x) ** 2 + (ty / self.corlength_y) ** 2))) ** (2 * self.alpha)))

### Example

In [19]:
generate = NonGaussianSurfaceGenerator()

In [20]:
for surface in generate(1):
  as_grayscale_image(surface)
#   as_3d_surface(surface)
#   plot_correlation(surface)

## A Besel function based non-gaussian surface generator

In [21]:
class BeselNonGaussianSurfaceGenerator(NonGaussianSurfaceGenerator):
    def __init__(self, n_points=128, rms=1, skewness=0, kurtosis=3, corlength_x=4, corlength_y=4, alpha=1, beta_x=1, beta_y=1):
        super().__init__(n_points, rms, skewness, kurtosis, corlength_x, corlength_y, alpha)

        self.beta_x, self.beta_y = beta_x, beta_y

    def autocorrelation(self, tx, ty):
        return super().autocorrelation(tx, ty) * sympy.besselj(0, (2 * np.pi * np.sqrt((tx / self.beta_x) ** 2 + (ty / self.beta_y) **2)))

### Example

In [22]:
besel_generate = BeselNonGaussianSurfaceGenerator(128, 1, 0, 3, 16, 16, 0.5, 4000, 4000)

In [23]:
for surface in besel_generate(1):
  as_grayscale_image(surface)
#   as_3d_surface(surface)
#   plot_correlation(surface)

# 🔄 Dataset Loading and Preprocessing

## Defining the preprocessing pipeline

In [24]:
from abc import ABC, abstractmethod

class Transform(ABC):
  def __init__(self, *args, **kwargs):
    pass
  
  @abstractmethod
  def __call__(self, *args, **kwargs):
    raise NotImplementedError

In [25]:
from torch import flatten

class Flatten(Transform):
  def __call__(self, tensor):
    return flatten(tensor)

In [26]:
class To(Transform):
  def __init__(self, device):
    self.device = device

  def __call__(self, tensor):
    return tensor.to(self.device)

In [27]:
class Normalize(Transform):
  def callback(self, dataset):
    self.min = torch.min(dataset.surfaces).item()
    self.max = torch.max(dataset.surfaces).item()

  def __call__(self, tensor):
    if self.max - self.min > 0:
      return (tensor - self.min) / (self.max - self.min)

    return torch.zeros(tensor.size())

In [28]:
class View(Transform):
  def __init__(self, *args):
    self.args = args
  
  def __call__(self, tensor):
    return tensor.view(*self.args)

## Defining the `Dataset` loading procedure

## The base `NanoroughSurfaceDataset` class

In [29]:
from torch.utils.data.dataset import  Dataset
import torch
import numpy as np

class NanoroughSurfaceDataset(Dataset):
  """A dataset of pre-generated nanorough surfaces"""
  def __init__(self, surfaces, subsampling_factor=4, transforms=[]):
    self.surfaces = np.array(surfaces)
    self.surfaces = torch.from_numpy(self.surfaces)

    self.subsampling_factor = subsampling_factor
    self.subsampling_value = int(surfaces[0].shape[1] / subsampling_factor)

    self.transforms = transforms

    for transform in self.transforms:
      if hasattr(transform, 'callback'):
        transform.callback(self)

  def __len__(self):
    return len(self.surfaces)
  
  def __getitem__(self, idx):
    surface = self.surfaces[idx]

    for transform in self.transforms:
      surface = transform(surface)

    return surface

## A dataset of pre-generated nanorough surfaces in `.mat` format

In [30]:
import scipy.io as sio

class NanoroughSurfaceMatLabDataset(NanoroughSurfaceDataset):
  """A dataset of pre-generated nanorough surfaces in `.mat` format"""
  def __init__(self, surface_dir, subsampling_factor=4, variable_name='data', transforms=[]):
    assert surface_dir.is_dir(), "%s does not exist or is not a dictionary" % (surface_dir,)

    surfaces = []
    for file in surface_dir.iterdir():
      if file.is_dir() or file.suffix != '.mat':
        continue

      surfaces.append(self.from_matlab(file, variable_name))

    super().__init__(surfaces, subsampling_factor=subsampling_factor, transforms=transforms)

  @classmethod
  def from_matlab(cls, path_to_mat, variable_name):
    matlab_array = sio.loadmat(str(path_to_mat))
    numpy_array = matlab_array[variable_name]
    
    return numpy_array

# 🏋️ Training and Testing Utilities

## Performing a single training epoch

In [31]:
def per_epoch(generator, discriminator, dataloader, optimizer_generator, optimizer_discriminator, criterion, content_loss=None, loss_weights=None, log_every_n=None, debug=False):
  generator.train()

  if content_loss is None:
    content_loss_weight, criterion_weight = 0, 1
  else:
    content_loss_weight, criterion_weight = loss_weights

  generator_loss, discriminator_loss, discriminator_output_real, discriminator_output_fake = 0, 0, 0, 0
  for train_iteration, X_batch in enumerate(dataloader):
    if log_every_n is not None and not train_iteration % log_every_n:
      print(f"Training Iteration #{train_iteration:04d}")

    # (1) Update D network: maximize log(D(x)) + log(1 - D(G(z)))
    ## Train with all-real batch
    discriminator.zero_grad()
    # Format batch
    label = torch.full((X_batch.size(0),), 1, dtype=X_batch.dtype, device=X_batch.device)
    # Forward pass real batch through D
    output = discriminator(X_batch, debug=debug).view(-1)
    # Calculate loss on all-real batch
    discriminator_error_real = criterion(output, label)
    # Calculate gradients for D in backward pass
    discriminator_error_real.backward()
    discriminator_output_real_batch = output.mean().item()

    ## Train with all-fake batch
    # Generate batch of latent vectors
    noise = torch.randn(X_batch.size(0), *generator.feature_dims, dtype=X_batch.dtype, device=X_batch.device)
    # Generate fake image batch with G
    fake = generator(noise, debug=debug)
    label.fill_(0)
    # Classify all fake batch with D
    output = discriminator(fake.detach(), debug=debug).view(-1)
    # Calculate D's loss on the all-fake batch
    discriminator_error_fake = criterion(output, label)
    # Calculate the gradients for this batch
    discriminator_error_fake.backward()
    # Add the gradients from the all-real and all-fake batches
    discriminator_error_total = discriminator_error_real + discriminator_error_fake
    # Update D
    optimizer_discriminator.step()

    # (2) Update G network: maximize log(D(G(z)))
    generator.zero_grad()
    label.fill_(1)  # fake labels are real for generator cost
    # Since we just updated D, perform another forward pass of all-fake batch through D
    output = discriminator(fake, debug=debug).view(-1)
    # Calculate G's loss based on this output
    if content_loss_weight <= 0:
      discriminator_error_fake = criterion(output, label)
    else:
      discriminator_error_fake = content_loss_weight * content_loss(fake.cpu().detach().numpy()) + criterion_weight * criterion(output, label)
    # Calculate gradients for G, which propagate through the discriminator
    discriminator_error_fake.backward()
    discriminator_output_fake_batch = output.mean().item()
    # Update G
    optimizer_generator.step()

    generator_loss += discriminator_error_fake / len(dataloader)
    discriminator_loss += discriminator_error_total / len(dataloader)
    discriminator_output_real += discriminator_output_real_batch / len(dataloader)
    discriminator_output_fake += discriminator_output_fake_batch / len(dataloader)
    
  return generator_loss, discriminator_loss, discriminator_output_real, discriminator_output_fake

## Spliting the original dataset into Training and Testing subsets

In [32]:
from torch.utils.data import random_split

def train_test_split(dataset, train_ratio=0.8):
  train_size = int(len(dataset) * train_ratio)
  test_size = len(dataset) - train_size

  train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

  return train_dataset, test_dataset

## Configuring the Training and Testing `DataLoader`s

In [33]:
from torch.utils.data import DataLoader

def train_test_dataloaders(dataset, train_ratio=0.8, **kwargs):
  train_dataset, test_dataset = train_test_split(dataset, train_ratio=train_ratio)

  train_dataloader = DataLoader(train_dataset, **kwargs)
  test_dataloader = DataLoader(test_dataset, **kwargs)

  return train_dataloader, test_dataloader

## The Training Manager

In [34]:
class Configuration:
  def __init__(self, **kwargs):
    for key, value in kwargs.items():
      if isinstance(value, dict):
        value = Configuration(**value)

      setattr(self, key, value)
  
  def to_dict(self):
    rv = {}
    for key, value in self.__dict__.items():
      if isinstance(value, Configuration):
        value = value.to_dict()
      
      rv[key] = value
    
    return rv
  
  def __str__(self):
    return str(self.to_dict())

  def __repr__(self):
    return f"<{self.__class__.__name__} '{str(self)}'>"

In [35]:
from torch.optim import Adam
from functools import partial

class TrainingManager(Configuration):
  def __init__(self, **kwargs):
    super().__init__(**kwargs)

    if not hasattr(self, "debug"):
      self.debug = False

    if not hasattr(self, "verbose"):
      self.verbose = False

    if not hasattr(self, "benchmark"):
      self.benchmark = False

    if not hasattr(self, "log_every_n"):
      self.log_every_n = None
    
    if not hasattr(self, "checkpoint_dir"):
      self.checkpoint_dir = None
    
    if self.checkpoint_dir is not None:
      if not hasattr(self, "checkpoint_multiple"):
        self.checkpoint_multiple = False
    
    if not hasattr(self, "content_loss"):
      self.content_loss = None

    if isinstance(self.criterion, tuple):
      self.criterion, self.criterion_weight = self.criterion
    else:
      self.criterion_weight = 0.5
  
    if isinstance(self.content_loss, tuple):
      self.content_loss, self.content_loss_weight = self.content_loss
    else:
      self.content_loss_weight = 0.5

  def __call__(self, generator, discriminator, dataset):
    train_dataloader, test_dataloader = train_test_dataloaders(dataset, train_ratio=self.train_ratio, **self.dataloader.to_dict())

    optimizer_generator = Adam(generator.parameters(), **self.optimizer.to_dict())
    optimizer_discriminator = Adam(discriminator.parameters(), **self.optimizer.to_dict())

    train_epoch_f = self.train_epoch

    if self.benchmark is True:
      train_epoch_f = benchmark(train_epoch_f)

    if self.debug is True:
      tmp = train_epoch_f
      if self.verbose is True:
        train_epoch_f = debug(tmp)
      else:
        train_epoch_f = lambda *args, **kwargs: tmp(*args, **{**kwargs, "debug": True})

    generator_losses, discriminator_losses, discriminator_output_reals, discriminator_output_fakes = [], [], [], []
    for epoch in range(self.n_epochs):
      generator_loss, discriminator_loss, discriminator_output_real, discriminator_output_fake = train_epoch_f(
        generator, discriminator,
        train_dataloader,
        optimizer_generator, optimizer_discriminator,
        self.criterion,
        content_loss=self.content_loss, loss_weights=(self.content_loss_weight, self.criterion_weight),
        log_every_n=self.log_every_n
      )
      
      if self.checkpoint_dir is not None and (not generator_losses or generator_loss < min(generator_losses)):
        generator_mt, discriminator_mt = f'{generator.__class__.__name__}', f'{discriminator.__class__.__name__}'
        
        if self.checkpoint_multiple is True:
          generator_mt += f'_{epoch:03d}'
          discriminator_mt += f'_{epoch:03d}'

        generator_mt += '.mt'
        discriminator_mt += '.mt'

        torch.save(generator.state_dict(), self.checkpoint_dir / generator_mt)
        torch.save(discriminator.state_dict(), self.checkpoint_dir / discriminator_mt)

      generator_losses.append(generator_loss)
      discriminator_losses.append(discriminator_loss)
      discriminator_output_reals.append(discriminator_output_real)
      discriminator_output_fakes.append(discriminator_output_fake)
      
      if self.verbose is True:
        print("Epoch: %02d, Generator Loss: %7.3f, Discriminator Loss: %7.3f" % (epoch, generator_loss, discriminator_loss))
        print("Epoch: %02d, Discriminator Output: [Real: %7.3f, Fake: %7.3f]" % (epoch, discriminator_output_real, discriminator_output_fake))
    
    return generator_losses, discriminator_losses, discriminator_output_reals, discriminator_output_fakes

<a name="content-loss"></a>
# 💸 Designing a content loss function

## An **n-gram graph** implementation targeting floating point data

In [36]:
from pyinsect.documentModel.representations.DocumentNGramGraph import DocumentNGramGraph

class FloatDocumentNGramGraph(DocumentNGramGraph):
  def addEdgeInc(self,a,b,w=1):
    #A = repr(a)#str(a)
    #B = repr(b)#str(b)
    #merging can also be done in other ways
    #add an extra class variable
    A = tuple(a)
    B = tuple(b)
    if (A,B) in self._edges:
        edata = self._Graph.get_edge_data(A, B)
        # DEBUG LINES
        # print "updating edge between (",A,B,")"
        # print "to weight",(edata['weight']+1)

        r = edata['weight']+w
    else:
        # DEBUG LINES
        # print "adding edge between (",A,B,")"

        r = w
    # update/add edge weight
    self.setEdge(A, B, r)

### Monkey Patching `NGramGraphCollector` #FIXME

In [37]:
DocumentNGramGraph.addEdgeInc = FloatDocumentNGramGraph.addEdgeInc

## Quantizing our input data

The `Quantizer` sub-classes are responsible for quantizing our input data consisting of floating point values. These floating point values are going to serve as symbols for the **n-gram graph representation** and having an infinite amount of symbols will do us no good.

In [38]:
from abc import ABC, abstractstaticmethod

class Quantizer(Configuration):
  def __init__(self, **kwargs):
    super().__init__(**kwargs)
  
  @abstractstaticmethod
  def __call__(self, tensor):
    raise NotImplementedError

In [39]:
from sklearn.preprocessing import KBinsDiscretizer

class KBinsDiscretizerQuantizer(Configuration):
  def __init__(self, surfaces, **kwargs):
    if 'encode' not in kwargs:
      kwargs['encode'] = 'ordinal'

    self.underlying = KBinsDiscretizer(**kwargs)
    self.surfaces = self.underlying.fit_transform(surfaces)

  def __call__(self, tensor):
    return self.underlying.transform(tensor)
  
  def __str__(self):
    return str({
        'underlying': self.underlying,
        'shape': self.surfaces.shape
    })

### Example Usage

In [40]:
tensors = torch.rand(10, 4, 4).reshape(10, -1)

In [41]:
tensors

tensor([[0.0290, 0.4019, 0.2598, 0.3666, 0.0583, 0.7006, 0.0518, 0.4681, 0.6738,
         0.3315, 0.7837, 0.5631, 0.7749, 0.8208, 0.2793, 0.6817],
        [0.2837, 0.6567, 0.2388, 0.7313, 0.6012, 0.3043, 0.2548, 0.6294, 0.9665,
         0.7399, 0.4517, 0.4757, 0.7842, 0.1525, 0.6662, 0.3343],
        [0.7893, 0.3216, 0.5247, 0.6688, 0.8436, 0.4265, 0.9561, 0.0770, 0.4108,
         0.0014, 0.5414, 0.6419, 0.2976, 0.7077, 0.4189, 0.0655],
        [0.8839, 0.8083, 0.7528, 0.8988, 0.6839, 0.7658, 0.9149, 0.3993, 0.1100,
         0.2541, 0.4333, 0.4451, 0.4966, 0.7865, 0.6604, 0.1303],
        [0.3498, 0.3824, 0.8043, 0.3186, 0.2908, 0.4196, 0.3728, 0.3769, 0.0108,
         0.9455, 0.7661, 0.2634, 0.1880, 0.5174, 0.7849, 0.1412],
        [0.3112, 0.7091, 0.1775, 0.4443, 0.1230, 0.9638, 0.7695, 0.0378, 0.2239,
         0.6772, 0.5274, 0.6325, 0.0910, 0.2323, 0.7269, 0.1187],
        [0.3951, 0.7199, 0.7595, 0.5311, 0.6449, 0.7224, 0.4416, 0.3634, 0.8818,
         0.9874, 0.7316, 0.2814, 0.06

In [42]:
quantizer = KBinsDiscretizerQuantizer(tensors)

In [43]:
quantizer

<KBinsDiscretizerQuantizer '{'underlying': KBinsDiscretizer(encode='ordinal', n_bins=5, strategy='quantile'), 'shape': (10, 16)}'>

In [44]:
tensor = quantizer(tensors[0].reshape(1, -1)).reshape(4, 4)

In [45]:
tensor

array([[0., 2., 1., 0.],
       [0., 3., 0., 3.],
       [2., 1., 4., 2.],
       [4., 4., 0., 4.]])

## Implementing **n-gram Graph** based content loss quantizing content loss

In [46]:
from abc import ABC, abstractstaticmethod

class ContentLoss(Configuration):
  def __init__(self, **kwargs):
    super().__init__(**kwargs)

  @abstractstaticmethod
  def __call__(self, surface, metric):
    raise NotImplementedError

In [47]:
from pyinsect.collector.NGramGraphCollector import NGramGraphCollector

class NGramGraphContentLoss(ContentLoss):
  def __init__(self, **kwargs):
    super().__init__(**kwargs)

    collector = NGramGraphCollector()
    
    for surface in self.surfaces:
      collector.addText(surface)
  
    self._representative = collector.getRepresentativeGraph()

  def __len__(self):
    return len(self.surfaces)
  
  def __call__(self, surface, metric):
    graph = FloatDocumentNGramGraph(Data=surface.reshape(-1))

    try:
      return metric.getSimilarityDouble(self._representative, graph)
    except ZeroDivisionError:
      return float('inf')

  def __str__(self):
    return str({'shape': self.surfaces.shape})

In [85]:
class KBinsDiscretizerQuantizedNGramGraphContentLoss(NGramGraphContentLoss):
  def __init__(self, surfaces, **kwargs):
    self.quantizer = KBinsDiscretizerQuantizer(surfaces, **kwargs)

    super().__init__(surfaces=self.quantizer.surfaces, **kwargs)

  def __call__(self, surface, metric):
    surface = self.quantizer(surface)

    return super().__call__(surface, metric)

### Example Usage

In [49]:
tensors = torch.rand(10, 4, 4).reshape(10, -1)

In [50]:
tensors

tensor([[8.4465e-01, 7.1955e-01, 7.1014e-01, 4.1303e-01, 5.8708e-02, 7.7635e-02,
         1.8859e-01, 1.7628e-01, 7.3538e-01, 6.5478e-01, 3.5743e-01, 1.1912e-01,
         9.9223e-01, 8.7573e-01, 7.3779e-01, 9.9492e-01],
        [2.3380e-01, 2.1526e-01, 2.0733e-01, 4.7579e-01, 5.8631e-02, 8.9582e-01,
         5.1286e-01, 7.4900e-01, 2.2541e-01, 4.4849e-01, 5.6576e-01, 3.6315e-01,
         9.7195e-01, 2.7156e-01, 6.5525e-01, 1.6075e-01],
        [4.4082e-01, 3.6735e-01, 7.1573e-01, 8.4927e-01, 6.2160e-01, 4.5460e-01,
         3.7197e-01, 8.9201e-01, 3.8186e-01, 8.6096e-01, 2.7753e-01, 9.1208e-01,
         9.9800e-01, 8.4274e-01, 7.8677e-01, 6.8610e-01],
        [2.5415e-01, 5.0597e-01, 6.7217e-01, 3.6935e-01, 5.8963e-01, 7.9284e-01,
         3.6734e-02, 4.4899e-02, 2.0082e-01, 5.6099e-01, 4.0732e-01, 6.0775e-01,
         3.8110e-01, 1.7268e-01, 5.6638e-01, 8.3864e-01],
        [1.9978e-01, 9.3829e-01, 6.2306e-01, 9.9302e-01, 4.1706e-02, 2.1394e-02,
         6.6963e-01, 3.3555e-02, 9.0252

In [51]:
content_loss = KBinsDiscretizerQuantizedNGramGraphContentLoss(tensors)

In [52]:
content_loss

<KBinsDiscretizerQuantizedNGramGraphContentLoss '{'shape': (10, 16)}'>

In [53]:
from pyinsect.documentModel.comparators.NGramGraphSimilarity import SimilarityVS

content_loss(tensors[2].reshape(1, -1), SimilarityVS())

0.008968609865470852

# 🙃 A naive-approach

## Our naive `Generator` and `Discriminator` networks

In [54]:
import torch
from torch import nn

class PerceptronGenerator(nn.Module):
  def __init__(self, in_features, out_features, dtype=torch.float64):
    super().__init__()

    self.in_features, self.out_features = in_features, out_features

    self.feature_dims = (in_features,)

    self.linear = nn.Linear(in_features, out_features)
    self.activation = nn.ReLU()

    self.to(dtype=dtype)
  
  def forward(self, batch, debug=False):
    if debug is True:
      print(f"[DEBUG]: {self.__class__.__name__}: Input {batch.shape}")
      
    batch = self.activation(self.linear(batch))

    if debug is True:
      print(f"[DEBUG]: {self.__class__.__name__}: Output {batch.shape}")
    
    return batch
  
  @classmethod
  def from_dataset(cls, dataset, dtype=torch.float64, device=None):
    in_features = dataset.subsampling_value ** 2
    out_features = (dataset.subsampling_factor * dataset.subsampling_value) ** 2

    model =  cls(in_features, out_features, dtype=dtype)

    if device.type == 'cuda' and torch.cuda.device_count() > 1:
      model = nn.DataParallel(model)

    model = model.to(device)

    return model

In [55]:
import torch
from torch import nn

class PerceptronDiscriminator(nn.Module):
  def __init__(self, in_features, dtype=torch.float64):
    super().__init__()

    self.feature_dims = (in_features,)

    self.linear = nn.Linear(in_features, 1)
    self.activation = nn.Sigmoid()

    self.to(dtype=dtype)
  
  def forward(self, batch, debug=False):
    if debug is True:
      print(f"[DEBUG]: {self.__class__.__name__}: Input {batch.shape}")

    batch = self.activation(self.linear(batch))

    if debug is True:
      print(f"[DEBUG]: {self.__class__.__name__}: Output {batch.shape}")
    
    return batch
  
  @classmethod
  def from_generator(cls, generator, dtype=torch.float64, device=None):
    model =  cls(generator.out_features, dtype=dtype)

    if device.type == 'cuda' and torch.cuda.device_count() > 1:
      model = nn.DataParallel(model)

    model = model.to(device)

    return model

## Loading the Dataset

In [86]:
DATASET_PATH = GDRIVE_DIR / 'MyDrive' / 'Thesis' / 'Datasets' / 'surfaces.zip'

In [87]:
from zipfile import ZipFile

if DATASET_PATH.is_file():
  SURFACES_DIR = BASE_DIR / 'surfaces'
  SURFACES_DIR.mkdir(parents=True, exist_ok=True)
  
  with ZipFile(DATASET_PATH, 'r') as zip_file:
    zip_file.extractall(SURFACES_DIR)

In [88]:
if SURFACES_DIR.is_dir():
  dataset = NanoroughSurfaceMatLabDataset(SURFACES_DIR, transforms=[Flatten(), Normalize(), To(device)])
else:
  generate = NonGaussianSurfaceGenerator()
  dataset = NanoroughSurfaceDataset(list(generate(1000)), transforms=[Flatten(), Normalize(), To(device)])

## Instantiating the content-loss

In [89]:
content_loss = KBinsDiscretizerQuantizedNGramGraphContentLoss(dataset.surfaces.view(dataset.surfaces.shape[0], -1))

## Instantiating the **Generator** and the **Discriminator** Networks

In [90]:
generator = PerceptronGenerator.from_dataset(dataset, device=device)

In [91]:
print(generator)

PerceptronGenerator(
  (linear): Linear(in_features=1024, out_features=16384, bias=True)
  (activation): ReLU()
)


In [92]:
discriminator = PerceptronDiscriminator.from_generator(generator, device=device)

In [93]:
print(discriminator)

PerceptronDiscriminator(
  (linear): Linear(in_features=16384, out_features=1, bias=True)
  (activation): Sigmoid()
)


## Training

In [94]:
from torch.nn import BCELoss

criterion = BCELoss().to(device)

In [95]:
from pathlib import Path

CHECKPOINT_DIR = BASE_DIR / 'checkpoint'
CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)

In [98]:
from functools import partial
from pyinsect.documentModel.comparators.NGramGraphSimilarity import SimilarityVS

training_manager = TrainingManager(
    benchmark=True,
    verbose=True,
    debug=False,
    checkpoint_dir=CHECKPOINT_DIR,
    checkpoint_multiple=False,
    train_epoch=per_epoch,
    log_every_n=10,
    criterion=criterion,
    content_loss=lambda surface: content_loss(surface, SimilarityVS()),
    n_epochs=10,
    train_ratio=0.8,
    optimizer={
      'lr': 0.0005,
      'weight_decay': 0
    },
    dataloader={
      'batch_size': 256,
      'shuffle': True,
      'num_workers': 0,
    }
)

In [None]:
generator_losses, discriminator_losses, discriminator_output_reals, discriminator_output_fakes = training_manager(generator, discriminator, dataset)

Training Iteration #0000
per_epoch returned after 193.232 seconds
Epoch: 00, Generator Loss:   0.494, Discriminator Loss:   1.434
Epoch: 00, Discriminator Output: [Real:   0.718, Fake:   0.474]
Training Iteration #0000
per_epoch returned after 193.308 seconds
Epoch: 01, Generator Loss:   0.339, Discriminator Loss:   1.344
Epoch: 01, Discriminator Output: [Real:   0.795, Fake:   0.595]
Training Iteration #0000
per_epoch returned after 193.629 seconds
Epoch: 02, Generator Loss:   0.549, Discriminator Loss:   1.290
Epoch: 02, Discriminator Output: [Real:   0.746, Fake:   0.441]
Training Iteration #0000
per_epoch returned after 194.298 seconds
Epoch: 03, Generator Loss:   0.413, Discriminator Loss:   1.323
Epoch: 03, Discriminator Output: [Real:   0.712, Fake:   0.541]
Training Iteration #0000


In [None]:
plot_against(generator_losses, discriminator_losses, title="Mean Generator vs Discriminator loss per epoch", xlabel="Epoch", ylabel="Loss", labels=("Generator", "Discriminator"))

In [None]:
plot_against(discriminator_output_reals, discriminator_output_fakes, title="Mean Discriminator Output per epoch", xlabel="Epoch", ylabel="Discriminator Output", labels=("Real Data", "Generator Data"))

# 😎 A CNN based approach

## Our CNN based `Generator` and `Discriminator` networks

In [None]:
from torch import nn

class CNNGenerator(nn.Module):
  def __init__(self, in_channels=100, out_channels=128, training_channels=1, dtype=torch.float64):
    super().__init__()

    self.in_channels = in_channels
    self.out_channels = out_channels
    self.training_channels = training_channels

    self.feature_dims = (in_channels, 1, 1)

    self.module_list = nn.ModuleList([
      nn.Sequential(
        nn.ConvTranspose2d(in_channels, out_channels * 16, 4, 1, 0, bias=False),
        nn.BatchNorm2d(out_channels * 16),
        nn.ReLU(True)
      ),
      nn.Sequential(
        nn.ConvTranspose2d(out_channels * 16, out_channels * 8, 4, 2, 1, bias=False),
        nn.BatchNorm2d(out_channels * 8),
        nn.ReLU(True)
      ),
      nn.Sequential(
        nn.ConvTranspose2d(out_channels * 8, out_channels * 4, 4, 2, 1, bias=False),
        nn.BatchNorm2d(out_channels * 4),
        nn.ReLU(True)
      ),
      nn.Sequential(
        nn.ConvTranspose2d(out_channels * 4, out_channels * 2, 4, 2, 1, bias=False),
        nn.BatchNorm2d(out_channels * 2),
        nn.ReLU(True)
      ),
      nn.Sequential(
        nn.ConvTranspose2d(out_channels * 2, out_channels, 4, 2, 1, bias=False),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(True)
      ),
      nn.Sequential(
        nn.ConvTranspose2d(out_channels, training_channels, 4, 2, 1, bias=False),
        nn.ReLU()
      )                              
    ])

    self.to(dtype)

  @classmethod
  def from_device(cls, device):
    model =  cls()

    if device.type == 'cuda' and torch.cuda.device_count() > 1:
      model = nn.DataParallel(model)

    model = model.to(device)

    return model

  def forward(self, batch, debug=False):
    if debug is True:
        print(f"[DEBUG]: {self.__class__.__name__}: Input {batch.shape}")

    for i, module in enumerate(self.module_list):
      batch = module(batch)

      if debug is True:
        print(f"[DEBUG]: {self.__class__.__name__}: {'%02d' % (i + 1,) if not i == len(self.module_list) - 1 else 'Output'} {batch.shape}")

    return batch

In [None]:
from torch import nn

class CNNDiscriminator(nn.Module):
  def __init__(self, in_channels=1, out_channels=128, dtype=torch.float64):
    super().__init__()

    self.in_channels = in_channels
    self.out_channels = out_channels

    self.module_list = nn.ModuleList([
      nn.Sequential(
        nn.Conv2d(in_channels, out_channels, 4, 2, 1, bias=False),
        nn.LeakyReLU(0.2, inplace=True)
      ),
      nn.Sequential(
        nn.Conv2d(out_channels, out_channels * 2, 4, 2, 1, bias=False),
        nn.BatchNorm2d(out_channels * 2),
        nn.LeakyReLU(0.2, inplace=True)
      ),
      nn.Sequential(
        nn.Conv2d(out_channels * 2, out_channels * 4, 4, 2, 1, bias=False),
        nn.BatchNorm2d(out_channels * 4),
        nn.LeakyReLU(0.2, inplace=True)
      ),
      nn.Sequential(
        nn.Conv2d(out_channels * 4, out_channels * 8, 4, 2, 1, bias=False),
        nn.BatchNorm2d(out_channels * 8),
        nn.LeakyReLU(0.2, inplace=True)
      ),
      nn.Sequential(
        nn.Conv2d(out_channels * 8, 1, 8, 1, 0, bias=False),
        nn.Sigmoid()
      )
    ])

    self.to(dtype=dtype)

  @classmethod
  def from_device(cls, device):
    model =  cls()

    if device.type == 'cuda' and torch.cuda.device_count() > 1:
      model = nn.DataParallel(model)

    model = model.to(device)

    return model

  def forward(self, batch, debug=False):
    if debug is True:
        print(f"[DEBUG]: {self.__class__.__name__}: Input {batch.shape}")

    for i, module in enumerate(self.module_list):
      batch = module(batch)

      if debug is True:
        print(f"[DEBUG]: {self.__class__.__name__}: {'%02d' % (i + 1,) if not i == len(self.module_list) - 1 else 'Output'} {batch.shape}")

    return batch

## Loading the Dataset

In [None]:
DATASET_PATH = GDRIVE_DIR / 'MyDrive' / 'Thesis' / 'Datasets' / 'surfaces.zip'

In [None]:
from zipfile import ZipFile

if DATASET_PATH.is_file():
  SURFACES_DIR = BASE_DIR / 'surfaces'
  SURFACES_DIR.mkdir(parents=True, exist_ok=True)
  
  with ZipFile(DATASET_PATH, 'r') as zip_file:
    zip_file.extractall(SURFACES_DIR)

In [None]:
if SURFACES_DIR.is_dir():
  dataset = NanoroughSurfaceMatLabDataset(SURFACES_DIR, transforms=[Normalize(), To(device), View(1, 128, 128)])
else:
  generate = NonGaussianSurfaceGenerator()
  dataset = NanoroughSurfaceDataset(list(generate(1000)), transforms=[Normalize(), To(device), View(1, 128, 128)])

## Instantiating the content-loss

In [None]:
content_loss = KBinsDiscretizerQuantizedNGramGraphContentLoss(dataset.surfaces.view(dataset.surfaces.shape[0], -1))

## Instantiating the **Generator** and the **Discriminator** Networks

In [None]:
generator = CNNGenerator.from_device(device)

In [None]:
print(generator)

In [None]:
discriminator = CNNDiscriminator.from_device(device)

In [None]:
print(discriminator)

## Training

In [None]:
from torch.nn import BCELoss

criterion = BCELoss().to(device)

In [None]:
from functools import partial
from pyinsect.documentModel.comparators.NGramGraphSimilarity import SimilarityVS

training_manager = TrainingManager(
    benchmark=True,
    verbose=True,
    debug=False,
    checkpoint_dir=None,
    train_epoch=per_epoch,
    log_every_n=10,
    criterion=criterion,
    content_loss=lambda surface: content_loss(surface, SimilarityVS()),
    n_epochs=10,
    train_ratio=0.8,
    optimizer={
      'lr': 0.0002,
      'betas': (0.5, 0.999)
    },
    dataloader={
      'batch_size': 256,
      'shuffle': True,
      'num_workers': 0,
    }
)

In [None]:
generator_losses, discriminator_losses, discriminator_output_reals, discriminator_output_fakes = training_manager(generator, discriminator, dataset)

In [None]:
plot_against(generator_losses, discriminator_losses, title="Mean Generator vs Discriminator loss per epoch", xlabel="Epoch", ylabel="Loss", labels=("Generator", "Discriminator"))

In [None]:
plot_against(discriminator_output_reals, discriminator_output_fakes, title="Mean Discriminator Output per epoch", xlabel="Epoch", ylabel="Discriminator Output", labels=("Real Data", "Generator Data"))