<a href="https://colab.research.google.com/github/billsioros/thesis/blob/master/Nanorough_surface_Super_resolution.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Prerequisites

## Pip Modules

In [None]:
!pip install torch numpy sympy scipy plotly pandas install matplotlib

Collecting install
  Downloading https://files.pythonhosted.org/packages/f0/a5/fd2eb807a9a593869ee8b7a6bcb4ad84a6eb31cef5c24d1bfbf7c938c13f/install-1.3.4-py3-none-any.whl
Installing collected packages: install
Successfully installed install-1.3.4


## RNG Initialization

We are going to be seed the random number generator engines, so that our results are reproducible accross different set ups

In [None]:
import torch
import numpy as np
import random
import os

SEED = 1234

if SEED is not None:
  np.random.seed(SEED)
  random.seed(SEED)
  torch.manual_seed(SEED)
  torch.cuda.manual_seed(SEED)
  torch.backends.cudnn.deterministic = True
  os.environ['PYTHONHASHSEED'] = str(SEED)

## Determining the Current Working Directory

In [None]:
from pathlib import Path

BASE_DIR = Path.cwd()

## Mounting Google Drive

In [None]:
GDRIVE_DIR = BASE_DIR / 'drive'

In [None]:
from google.colab import drive

drive.mount(f'{GDRIVE_DIR}')

Mounted at /content/drive


## Determining available backend

In [None]:
device = "cpu"
if torch.cuda.is_available():
  device = "cuda:0"

In [None]:
device = torch.device(device)

# General Utilities

In [None]:
from time import time
from functools import wraps

## A debugging decorator

In [None]:
import inspect

def debug(method):
    signature = inspect.signature(method)

    defaults = {
      k: v.default
      for k, v in signature.parameters.items()
      if v.default is not inspect.Parameter.empty
    }

    @wraps(method)
    def wrapper(*args, **kwargs):
        called_with = ''
        if args:
            called_with += ', '.join(str(x) for x in args)
            called_with += ', '

        called_with += ', '.join(f"{x}={kwargs.get(x, defaults[x])}" for x in defaults.keys())

        try:
          rv = method(*args, **kwargs)
        except Exception as e:
          print(f"{method.__name__}({called_with}) raised {e}")
          raise

        print(f"{method.__name__}({called_with}) returned {rv}")

        return rv

    return wrapper

## A benchmarking decorator

In [None]:
def benchmark(method):
  @wraps(method)
  def wrapper(*args, **kwargs):
    beg = time()
    rv = method(*args, **kwargs)
    end = time()

    print("%s returned after %7.3f seconds" % (method.__name__, (end - beg)))

    return rv

  return wrapper

# Metrics

## Correlation

In [None]:
def correlation(z_ngs):
  N = z_ngs.shape[0]
  
  rdif, hhcf1d = np.arange(N // 2), np.zeros(N // 2)

  for ndif in range(N // 2):
    surf1 = z_ngs[:N, : (N - ndif)]
    surf2 = z_ngs[:N, ndif:N]
    difsur2 = (surf1 - surf2) ** 2
    hhcf1d[ndif] = np.sqrt(np.mean(np.mean(difsur2)))
  
  return rdif, hhcf1d

# Plotting Utilities


## Plotting the correlation

In [None]:
import plotly.express as px

def plot_correlation(array):
  x, y = correlation(array)
  
  fig = px.line(
    # title="1-D height-height correlation function",
    # x="r(nm)", y="G(r) (nm)",
    x=x, y=y,
    log_x=True, log_y=True
  )
  
  fig.update_layout(
    # title=title,
    autosize=True,
    width=500, height=500,
    # margin=dict(l=65, r=50, b=65, t=90)
  )

  fig.show()

## Plotting a surface as a 3D surface

In [None]:
import plotly.graph_objects as go

def as_3d_surface(array, autosize=False):
  fig = go.Figure(data=[go.Surface(z=array)])

  fig.update_layout(
    # title=title,
    autosize=True,
    width=500, height=500,
    # margin=dict(l=65, r=50, b=65, t=90)
  )

  fig.show()

## Plotting a surface as a grayscale image

In [None]:
import plotly.express as px

def as_grayscale_image(array):
  fig = px.imshow(array, color_continuous_scale='gray')
  fig.update_layout(coloraxis_showscale=False)
  fig.update_xaxes(showticklabels=False)
  fig.update_yaxes(showticklabels=False)

  fig.update_layout(
    # title=title,
    autosize=True,
    width=500, height=500,
    # margin=dict(l=65, r=50, b=65, t=90)
  )

  fig.show()

## Plotting a model's Learning Curves

In [None]:
import matplotlib.pyplot as plt

def train_vs_test(train_losses, test_losses):
  x = list(range(max(len(train_losses), len(test_losses))))

  plt.plot(x, train_losses, label='train')
  plt.plot(x, test_losses, label='test')

  plt.grid()
  plt.title('Average MSE Loss / Epoch')
  plt.xlabel('Average MSE Loss')
  plt.ylabel('Epoch')

  plt.xlim([min(x), max(x)])

  plt.legend()

# Surface Generation

## The base class `SurfaceGenerator`

In [None]:
from abc import ABC, abstractmethod

import numpy as np
import sympy
from scipy import stats

class SurfaceGenerator(ABC):
    def __init__(self, n_points, rms, skewness, kurtosis, corlength_x, corlength_y, alpha):
        self.n_points = n_points
        self.rms = rms
        self.skewness = skewness
        self.kurtosis = kurtosis
        self.corlength_x = corlength_x
        self.corlength_y = corlength_y
        self.alpha = alpha

        self._mean = 0
        self._length = 0

    def __str__(self):
        return f"{self.__class__.__name__}({self.n_points}, {self.rms}, {self.skewness}, {self.kurtosis}, {self.corlength_x}, {self.corlength_y}, {self.alpha})"

    def __repr__(self):
        return f'<{self}>'

    def __call__(self, length):
        self._length = length

        return self

    def __len__(self):
      return self._length

    def __iter__(self):
        for _ in range(self._length):
            yield self.generate_surface()

    def sort(self, elements):
        indices = np.argsort(elements, axis=0)

        return elements[indices], indices

    @abstractmethod
    def autocorrelation(self, tx, ty):
        raise NotImplementedError

    def generate_surface(self):
        # 1st step: Generation of a Gaussian surface

        # Determine the autocorrelation function R(tx,ty)
        R = np.zeros((self.n_points, self.n_points))

        txmin = -self.n_points // 2
        txmax = self.n_points // 2

        tymin = -self.n_points // 2
        tymax = self.n_points // 2

        dtx = (txmax - txmin) // self.n_points
        dty = (tymax-tymin) // self.n_points

        for tx in range(txmin, txmax, dtx):
            for ty in range(tymin, tymax, dty):
                R[tx + txmax, ty + tymax] = self.autocorrelation(tx, ty)

        # According to the Wiener-Khinchine theorem FR is the power spectrum of the desired profile
        FR = np.fft.fft2(R, (self.n_points, self.n_points))
        AMPR = np.sqrt(dtx ** 2 + dty ** 2) * abs(FR)

        # 2nd step: Generate a white noise, normalize it and take its Fourier transform
        X = np.random.rand(self.n_points, self.n_points)
        aveX = np.mean(np.mean((X)))

        dif2X = (X - aveX) ** 2
        stdX = np.sqrt(np.mean(np.mean(dif2X)))
        X = X / stdX
        XF = np.fft.fft2(X, s=(self.n_points, self.n_points))

        # 3nd step: Multiply the two Fourier transforms
        YF = XF * np.sqrt(AMPR)

        # 4th step: Perform the inverse Fourier transform of YF and get the desired surface
        zaf = np.fft.ifft2(YF, s=(self.n_points, self.n_points))
        z = np.real(zaf)

        avez = np.mean(np.mean(z))
        dif2z = (z-avez) ** 2
        stdz = np.sqrt(np.mean(np.mean(dif2z)))
        z = ((z - avez) * self.rms) / stdz

        # Define the fraction of the surface to be analysed
        xmin = 0
        xmax = self.n_points
        ymin = 0
        ymax = self.n_points
        z_gs = z[xmin:xmax, ymin:ymax]

        # 2nd step: Generation of a non-Gaussian noise NxN
        z_ngn = stats.pearson3.rvs(
            self.skewness,
            loc=self._mean, scale=self.rms, size=(self.n_points, self.n_points)
        )

        # 3rd step: Combination of z_gs with z_ngn to output a z_ms
        v_gs = z_gs.flatten()
        v_ngn = z_ngn.flatten()

        _, Igs = self.sort(v_gs)

        vs_ngn, _ = self.sort(v_ngn)

        v_ngs = vs_ngn[Igs]

        return v_ngs.reshape(self.n_points, self.n_points)

## A simple non-gaussian surface generator

In [None]:
class NonGaussianSurfaceGenerator(SurfaceGenerator):
    def __init__(self, n_points=500, rms=1, skewness=0, kurtosis=3, corlength_x=20, corlength_y=20, alpha=1):
        super().__init__(n_points, rms, skewness, kurtosis, corlength_x, corlength_y, alpha)

    def autocorrelation(self, tx, ty):
        return ((self.rms ** 2) * np.exp(-(abs(np.sqrt((tx / self.corlength_x) ** 2 + (ty / self.corlength_y) ** 2))) ** (2 * self.alpha)))

### Example

In [None]:
g = NonGaussianSurfaceGenerator()

In [None]:
# for surface in g(1):
#   as_grayscale_image(surface)
#   as_3d_surface(surface)
#   plot_correlation(surface)

## A Besel function based non-gaussian surface generator

In [None]:
class BeselNonGaussianSurfaceGenerator(NonGaussianSurfaceGenerator):
    def __init__(self, n_points=500, rms=1, skewness=0, kurtosis=3, corlength_x=20, corlength_y=20, alpha=1, beta_x=1, beta_y=1):
        super().__init__(n_points, rms, skewness, kurtosis, corlength_x, corlength_y, alpha)

        self.beta_x, self.beta_y = beta_x, beta_y

    def autocorrelation(self, tx, ty):
        return super().autocorrelation(tx, ty) * sympy.besselj(0, (2 * np.pi * np.sqrt((tx / self.beta_x) ** 2 + (ty / self.beta_y) **2)))

### Example

In [None]:
g = BeselNonGaussianSurfaceGenerator()

In [None]:
# for surface in g(1):
#   as_grayscale_image(surface)
#   as_3d_surface(surface)
#   plot_correlation(surface)

# Dataset Loading and Preprocessing

## Defining the preprocessing pipeline

In [None]:
from abc import ABC, abstractmethod

class Transform(ABC):
  def __init__(self, *args, **kwargs):
    pass
  
  @abstractmethod
  def __call__(self, *args, **kwargs):
    raise NotImplementedError

In [None]:
from torch import flatten

class Flatten(Transform):
  def __call__(self, tensor):
    return flatten(tensor)

In [None]:
class To(Transform):
  def __init__(self, device):
    self.device = device

  def __call__(self, tensor):
    return tensor.to(self.device)

## Defining the `Dataset` loading procedure

## The base `NanoroughSurfaceDataset` class

In [None]:
from torch.utils.data.dataset import  Dataset
import torch
import numpy as np

class NanoroughSurfaceDataset(Dataset):
  """A dataset of pre-generated nanorough surfaces"""
  def __init__(self, surfaces, subsampling_factor=4, transforms=[]):
    self.surfaces = np.array(surfaces)
    self.surfaces = torch.from_numpy(self.surfaces)

    self.subsampling_factor = subsampling_factor
    self.subsampling_value = int(surfaces[0].shape[1] / subsampling_factor)

    self.transforms = transforms

  def __len__(self):
    return len(self.surfaces)
  
  def __getitem__(self, idx):
    surface = self.surfaces[idx]

    for transform in self.transforms:
      surface = transform(surface)

    return surface

## A dataset of pre-generated nanorough surfaces in `.mat` format

In [None]:
import scipy.io as sio

class NanoroughSurfaceMatLabDataset(NanoroughSurfaceDataset):
  """A dataset of pre-generated nanorough surfaces in `.mat` format"""
  def __init__(self, surface_dir, subsampling_factor=4, variable_name='data', transforms=[]):
    assert surface_dir.is_dir(), "%s does not exist or is not a dictionary" % (surface_dir,)

    surfaces = []
    for file in surface_dir.iterdir():
      if file.is_dir() or file.suffix != '.mat':
        continue

      surfaces.append(self.from_matlab(file, variable_name))

    super().__init__(surfaces, subsampling_factor=subsampling_factor, transforms=transforms)

  @classmethod
  def from_matlab(cls, path_to_mat, variable_name):
    matlab_array = sio.loadmat(str(path_to_mat))
    numpy_array = matlab_array[variable_name]
    
    return numpy_array

# Training and Testing Utilities

## Performing a single training epoch

In [None]:
def train_epoch(generator, discriminator, dataloader, optimizer_generator, optimizer_discriminator, criterion, log_every_n=None, device=None, debug=False):
  generator.train()

  generator_loss, discriminator_loss, discriminator_output_real, discriminator_output_fake = 0, 0, 0, 0
  for train_iteration, X_batch in enumerate(dataloader):
    if log_every_n is not None and not train_iteration % log_every_n:
      print(f"Training Iteration #{train_iteration:04d}")

    # (1) Update D network: maximize log(D(x)) + log(1 - D(G(z)))
    ## Train with all-real batch
    discriminator.zero_grad()
    # Format batch
    label = torch.full((X_batch.size(0),), 1, dtype=torch.float64, device=device)
    # Forward pass real batch through D
    output = discriminator(X_batch, debug=debug).view(-1)
    # Calculate loss on all-real batch
    discriminator_error_real = criterion(output, label)
    # Calculate gradients for D in backward pass
    discriminator_error_real.backward()
    discriminator_output_real_batch = output.mean().item()

    ## Train with all-fake batch
    # Generate batch of latent vectors
    noise = torch.randn(X_batch.size(0), *generator.feature_dims, device=device)
    # Generate fake image batch with G
    fake = generator(noise, debug=debug)
    label.fill_(0)
    # Classify all fake batch with D
    output = discriminator(fake.detach(), debug=debug).view(-1)
    # Calculate D's loss on the all-fake batch
    discriminator_error_fake = criterion(output, label)
    # Calculate the gradients for this batch
    discriminator_error_fake.backward()
    # Add the gradients from the all-real and all-fake batches
    discriminator_error_total = discriminator_error_real + discriminator_error_fake
    # Update D
    optimizer_discriminator.step()

    # (2) Update G network: maximize log(D(G(z)))
    generator.zero_grad()
    label.fill_(1)  # fake labels are real for generator cost
    # Since we just updated D, perform another forward pass of all-fake batch through D
    output = discriminator(fake, debug=debug).view(-1)
    # Calculate G's loss based on this output
    discriminator_error_fake = criterion(output, label)
    # Calculate gradients for G, which propagate through the discriminator
    discriminator_error_fake.backward()
    discriminator_output_fake_batch = output.mean().item()
    # Update G
    optimizer_generator.step()

    generator_loss += discriminator_error_fake / len(dataloader)
    discriminator_loss += discriminator_error_total / len(dataloader)
    discriminator_output_real += discriminator_output_real_batch / len(dataloader)
    discriminator_output_fake += discriminator_output_fake_batch / len(dataloader)
    
  return generator_loss, discriminator_loss, discriminator_output_real, discriminator_output_fake

## Spliting the original dataset into Training and Testing subsets

In [None]:
from torch.utils.data import random_split

def train_test_split(dataset, train_ratio=0.8):
  train_size = int(len(dataset) * train_ratio)
  test_size = len(dataset) - train_size

  train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

  return train_dataset, test_dataset

## Configuring the Training and Testing `DataLoader`s

In [None]:
from torch.utils.data import DataLoader

def train_test_dataloaders(dataset, train_ratio=0.8, **kwargs):
  train_dataset, test_dataset = train_test_split(dataset, train_ratio=train_ratio)

  train_dataloader = DataLoader(train_dataset, **kwargs)
  test_dataloader = DataLoader(test_dataset, **kwargs)

  return train_dataloader, test_dataloader

## The Training Manager

In [None]:
class Configuration:
  def __init__(self, **kwargs):
    for key, value in kwargs.items():
      if isinstance(value, dict):
        value = Configuration(**value)

      setattr(self, key, value)
  
  def to_dict(self):
    rv = {}
    for key, value in self.__dict__.items():
      if isinstance(value, Configuration):
        value = value.to_dict()
      
      rv[key] = value
    
    return rv

In [None]:
from torch.optim import Adam
from functools import partial

class TrainingManager(Configuration):
  def __init__(self, **kwargs):
    super().__init__(**kwargs)

    if not hasattr(self, "debug"):
      self.debug = False

    if self.debug is True:
      self.verbose = True

    if not hasattr(self, "verbose"):
      self.verbose = False
    
    if self.verbose is True:
      self.benchmark = True

    if not hasattr(self, "benchmark"):
      self.benchmark = False

    if not hasattr(self, "log_every_n") or self.verbose is False:
      self.log_every_n = None
    
    if not hasattr(self, "checkpoint_dir"):
      self.checkpoint_dir = None
  
  def __call__(self, generator, discriminator, dataset):
    train_dataloader, test_dataloader = train_test_dataloaders(dataset, train_ratio=self.train_ratio, **self.dataloader.to_dict())

    optimizer_generator = Adam(generator.parameters(), **self.optimizer.to_dict())
    optimizer_discriminator = Adam(discriminator.parameters(), **self.optimizer.to_dict())

    train_epoch_f = self.train_epoch

    if self.benchmark is True:
      train_epoch_f = benchmark(train_epoch_f)

    if self.debug is True:
      train_epoch_f = debug(partial(train_epoch_f, debug=True))

    generator_losses, discriminator_losses, discriminator_output_reals, discriminator_output_fakes = [], [], [], []
    for epoch in range(self.n_epochs):
      generator_loss, discriminator_loss, discriminator_output_real, discriminator_output_fake = train_epoch_f(generator, discriminator, train_dataloader, optimizer_generator, optimizer_discriminator, self.criterion, self.log_every_n)
      
      if self.checkpoint_dir is not None and (not generator_losses or train_loss < min(generator_losses)):
        torch.save(generator.state_dict(), self.checkpoint_dir / f'{epoch:03d}.mt')

      generator_losses.append(generator_loss)
      discriminator_losses.append(discriminator_loss)
      discriminator_output_reals.append(discriminator_output_real)
      discriminator_output_fakes.append(discriminator_output_fake)
      
      if self.verbose is True:
        print("Epoch: %02d, Generator Loss: %7.3f, Discriminator Loss: %7.3f" % (epoch, generator_loss, discriminator_loss))
        print("Epoch: %02d, Discriminator Output: [Real: %7.3f, Fake: %7.3f]" % (epoch, discriminator_output_real, discriminator_output_fake))
    
    return generator_losses, discriminator_losses, discriminator_output_reals, discriminator_output_fakes

# A naive-approach

In [None]:
import torch
from torch import nn

class PerceptronGenerator(nn.Module):
  def __init__(self, in_features, out_features, dtype=torch.float64):
    super().__init__()

    self.in_features, self.out_features = in_features, out_features

    self.feature_dims = (in_features,)

    self.linear = nn.Linear(in_features, out_features).type(dtype)
    self.activation = nn.ReLU()
  
  def forward(self, batch, debug=False):
    if debug is True:
      print(batch)
      
    return self.activation(self.linear(batch))
  
  @classmethod
  def from_dataset(cls, dataset, dtype=torch.float64, device=None):
    in_features = dataset.subsampling_value ** 2
    out_features = (dataset.subsampling_factor * dataset.subsampling_value) ** 2

    model =  cls(in_features, out_features, dtype=dtype)

    if torch.cuda.is_available() and torch.cuda.device_count() > 1:
      model = nn.DataParallel(model)

    model = model.to(device)

    return model

In [None]:
import torch
from torch import nn

class PerceptronDiscriminator(nn.Module):
  def __init__(self, in_features, dtype=torch.float64):
    super().__init__()

    self.feature_dims = (in_features,)

    self.linear = nn.Linear(in_features, 1).type(dtype)
    self.activation = nn.Sigmoid()
  
  def forward(self, batch, debug=False):
    if debug is True:
      print(batch)

    return self.activation(self.linear(batch))
  
  @classmethod
  def from_generator(cls, generator, dtype=torch.float64, device=None):
    model =  cls(generator.out_features, dtype=dtype)

    if torch.cuda.is_available() and torch.cuda.device_count() > 1:
      model = nn.DataParallel(model)

    model = model.to(device)

    return model

## Loading the Dataset

In [None]:
if GDRIVE_DIR.is_dir():
  DATASET_PATH = GDRIVE_DIR / 'MyDrive' / 'Thesis' / 'Datasets' / 'surfaces.zip'

In [None]:
SURFACES_DIR = BASE_DIR / 'surfaces'
SURFACES_DIR.mkdir(parents=True, exist_ok=True)

In [None]:
from zipfile import ZipFile

if DATASET_PATH.is_file():
  with ZipFile(DATASET_PATH, 'r') as zip:
    zip.extractall(SURFACES_DIR)

In [None]:
if SURFACES_DIR.is_dir():
  dataset = NanoroughSurfaceMatLabDataset(SURFACES_DIR, transforms=[Flatten(), To(device)])
else:
  # Here we might automatic surface generation
  # raise a `NotImplementedError` for now
  raise NotImplementedError

## Instantiating the **Generator** and the **Discriminator** Networks

In [None]:
generator = PerceptronGenerator.from_dataset(dataset, device=device)

In [None]:
discriminator = PerceptronDiscriminator.from_generator(generator, device=device)

## Training

In [None]:
from torch.nn import BCELoss

criterion = BCELoss().to(device)

In [None]:
from functools import partial

training_manager = TrainingManager(
    benchmark=True,
    verbose=True,
    debug=True,
    checkpoint_dir=None,
    train_epoch=partial(train_epoch, device=device),
    log_every_n=10,
    criterion=criterion,
    n_epochs=10,
    train_ratio=0.8,
    optimizer={
      'lr': 0.001,
      'weight_decay': 0
    },
    dataloader={
      'batch_size': 128,
      'shuffle': True,
      'num_workers': 0,
    }
)

In [None]:
model, train_losses, test_losses = training_manager(generator, discriminator, dataset)

Training Iteration #0000
tensor([[ 0.9899,  0.8543,  0.4182,  ...,  1.0619,  1.5418,  1.0455],
        [ 0.3082,  0.1400,  0.0048,  ...,  0.3592,  1.1984,  1.2893],
        [-0.6707, -0.4744, -0.2135,  ...,  1.8230,  0.3966, -0.1570],
        ...,
        [-0.2438, -0.2079, -0.4279,  ..., -0.4433, -0.3043,  0.0804],
        [-0.8506, -0.5990, -0.0849,  ..., -1.3179, -0.4964, -0.0228],
        [-0.7159,  0.0222,  0.0817,  ...,  0.7098,  0.0684, -1.0482]],
       device='cuda:0', dtype=torch.float64)
tensor([[ 0.3482, -0.9704, -1.4304,  ..., -1.0486,  0.4644, -0.8822],
        [-0.8711,  0.3937, -0.2729,  ...,  0.6861,  0.3185, -0.8307],
        [-0.4645, -2.4283,  0.2489,  ..., -0.9476, -0.5412, -0.2385],
        ...,
        [-1.1282, -1.4746,  1.1907,  ...,  0.0145,  1.0656,  0.3169],
        [ 0.3159,  0.3231,  0.4856,  ...,  0.3573,  1.8360, -0.4365],
        [-0.7868, -0.2171, -0.6506,  ..., -0.1228,  1.4908,  1.7980]],
       device='cuda:0')


AttributeError: ignored

In [None]:
train_vs_test(train_losses, test_losses)