<a href="https://colab.research.google.com/github/haishan-shi/A1/blob/master/NB03_NeuralNets.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1 Basic Neural Networks

## 1.1 Prepare Environment

### 1.1.1 Library Import

In [0]:
import numpy as np
import math
import torch
import plotly.express as px
import pandas as pd
import random

### 1.1.2 Visualisation Functions

#### A Simple visualiser

In [0]:
def simple_visualise(mod):
    xx, yy = np.meshgrid(np.arange(-5, 5.01, 0.05), 
                         np.arange(-5, 5.01, 0.05))
    xx = xx.flatten()
    yy = yy.flatten()
    d = pd.DataFrame(data=dict(x=xx, y=yy, 
                               z=[mod.forward((x, y)) 
                                  for x, y in zip(xx, yy)]))
    fig = px.scatter(d, x='x', y='y', color='z')
    fig.show()

## 1.2 Neural Networks Model Definition

### 1.2.1 Definition and Naive Implementation

Let us literally translate the definition of a neural network into computer implementation:
Neural network: Multiple _layers_ of _perceptron(s)_.
```python
def compute_neural_network(x):
    # 0. prepare the input for the first layer
    layer_input = x
    for layer_idx in [0, 1, 2]:
        # 1. fill output of this layer by executing each
        #    perceptron in this layer
        layer_output = []
        for perceptron in net_layers[layer_idx]:
            perceptron.compute_output(layer_input) 
            # Note all perceptrons in this layer share the same
            # `layer_input`
        #!!----------------------------------    
        # 2. pass the output of THIS layer
        #    to the NEXT layer as the input
        #------------------------------------
        layer_input = layer_output
        # END OF LOOP OVER `layer_idx`
```
Recall that a perception is to get the weighted sum of all attributes in an input, followed by some _activation_. See below:
```python
def compute_perceptron(x):
    weighted_sum = sum([xi * wi for xi, wi in zip(x, weights)])
    return activation_function(weighted_sum)
```
Of course, we will need to get the weights and the activation function setup. So we will use an object class to represent both the perceptrons and the networks.

NB: If you don't understand the construction `[t for t in list_of_t_values]`, please checkout tutorials about Python list.

In [0]:
def sigmoid_func(a):
    return 1 / (1 + math.exp(-a))



class Perceptron2D:
    """Perceptron model: linearly combine data attributes followed by a non-linear activation
    This is a simplified implementation and deals with data with 2 attributes. 
    You can also refer to the more complete implementation in the note of Week 3.
    """
    def __init__(self, w0=1, w1=0, activation_func=sigmoid_func):
        self.w0 = w0
        self.w1 = w1
        self.act = activation_func
    
    def forward(self, x):
        wsum = x[0] * self.w0 + x[1] * self.w1
        sigmoid_wsum = self.act(wsum) # sigmoid for activation
        return sigmoid_wsum
        

In [0]:
# Let's have a look at how the perceptron worked on 2D data
p = Perceptron2D(0.5, -2.5)
simple_visualise(p)
# Please note the effect of activation by examining the z-value.

__EXERCISE__

In the code cell above, adjust the model parameters and observe the change of the model behaviour.

__EXERCISE__

Use a different activation function. Such as
$$
\begin{align}
y(h) = \left\{ \begin{array}{c}
0, \textrm{ if } h \leq 0 \\
h, \textrm{ if } h > 0
\end{array} \right.
\end{align}
$$

Add implement your activation like:
```python
def relu_func(h):
    # compute y
    # HINT: consider using `max`
    return y
```
Then use your function to construct a Perceptron check the behaviour of the perceptron.

In [0]:
def my_activation_func(h):
    return max(0, h)
p = Perceptron2D(0.1, -0.5, activation_func=my_activation_func)
simple_visualise(p)

In [0]:
def k2(x):
    return [xi**2 for xi in x]

class Perceptron2DX:
    """Perceptron model wrapped. We transform the input before
    processing them using the perceptron.
    """
    def __init__(self, percep, xtransform=k2):
        self.perceptron = percep
        self.xtransform = xtransform
    
    def forward(self, x):
        return self.perceptron.forward(self.xtransform(x))

In [0]:
pp = Perceptron2DX(p)
simple_visualise(pp)

__EXERCISE__

__1__:
Use a different transform function. Such as
$$
\begin{align}
x'_1 = \sin(\omega_1 \cdot x_1) \\
x'_2 = \cos(\omega_2 \cdot x_2)
\end{align}
$$

__2__:
Using your transform function on a _different perceptron core_, e.g.
```python
vanilla_perceptron_2 = Perceptron2D(
        -1, 1, 
        activation_func=my_activation_func)
pp2a = Perceptron2DX(vanilla_perceptron_2, 
                     xtransform=new_transform)
```


In [0]:
def new_transform(x):
    tx0 = math.sin(x[0] * 5)
    tx1 = math.cos(x[1])
    return [tx0, tx1]

pp2 = Perceptron2DX(p, xtransform=new_transform)
simple_visualise(pp2)

In [0]:
ly0_p0 = Perceptron2D(+1, -1, activation_func=my_activation_func)
ly0_p1 = Perceptron2D(-1, -3, activation_func=my_activation_func)
ly1_p = Perceptron2D(-1, 0.5, activation_func=math.tanh)
def new_transform(x):
    tx0 = ly0_p0.forward(x)
    tx1 = ly0_p1.forward(x)
    return [tx0, tx1]

pp3 = Perceptron2DX(ly1_p, xtransform=new_transform)
simple_visualise(pp3)

In [0]:
# Try yourself: can you change the parameters so 
ly0_p0 = Perceptron2D(+1, -1, activation_func=lambda x:x)
ly0_p1 = Perceptron2D(-1, -3, activation_func=lambda x:x)
ly1_p = Perceptron2D(-1, 0.5, activation_func=math.tanh)
def new_transform(x):
    tx0 = ly0_p0.forward(x)
    tx1 = ly0_p1.forward(x)
    return [tx0, tx1]

pp3 = Perceptron2DX(ly1_p, xtransform=new_transform)
simple_visualise(pp3)

### 1.2.2 Multiple Layer Perceptron

Alternatively (to the nested perceptrons above), we can define an NeuralNet class to hold all perceptrons in all layers. The advantage is that now we can easily extend the network to have more layers. 

#### Naive Implementation

In [0]:
# Define a class, so our network can manage its "perceptrons" easily
class NeuralNet:
    """NeuralNet represents a simple neural network object class.
    As an example, it consists of 2 layers of perceptrons. 
    The first layer has 2 perceptrons and the second one has 1.
    
    The perceptrons deal with data of 2 attributes.
    """
    def __init__(self, perc0_w=(-1, 1), perc1_w=(2, -1), perc2_w=(0.5, 0.5)):
        self.layers = [[Perceptron2D(*perc0_w), Perceptron2D(*perc1_w)], 
                       [Perceptron2D(*perc2_w)]] # *(w0,w1) expand the values in tuple
        
    def forward(self, x):
        """
        Compute the network output layer by layer. "forward" is a conventional
        term for execute computing of a net.
        """
        # use layer-0 to process x and get what's to feed to layer-1
        layer1_input = [p.forward(x) for p in self.layers[0]]
        # get the final output from layer-1
        final_output = [p.forward(layer1_input) for p in self.layers[1]]
        # note we have only two layers, so I didn't use a loop over the layers
        return final_output[0]
        

In [0]:
net0 = NeuralNet()
simple_visualise(net0)

__EXERCISE__ (optional, similar to one above)

Adjust parameters to show how the network behaviour changes.

#### Computation using Matrix Operations

In [0]:
def matmul(A, B):
    """
    matmul computs A x B for two matrices
    :param A: a collective object contains rows of a matrix
        A[i], i-th row, another collective object contains the elements
        A[i][j], the element
    :param B: similar to A
    """
    # figure out the size of A and B and the result
    rows = len(A)
    elements_inner = len(A[0])
    assert elements_inner == len(B), "Rows of B must be the same as Cols of A"
    cols = len(B[0])
    # Initialise C to the appropriate size
    C = [[0.0 for ci in range(cols)] for ri in range(rows)]
    
    # Fill C: 2 outer loops are for each element
    for r in range(rows):
        for c in range(cols):
            # Compute element [i][j]
            for k in range(elements_inner):
                C[r][c] += A[r][k] * B[k][c]
    return C

HINT: read the code and try it while watching the accompany video.

In [0]:
# Define a class, so our network can manage its "perceptrons" easily
class NeuralNetV2:
    """NeuralNetV2 represents a simple neural network object class.
    This object will manage all the neurons in the network. 
    """
    def __init__(self, neuron_numbers_in_layers=[2, 2, 1],
                 weights=None):
        """
        :param neuron_numbers_in_layers: first/last -- inputs and outputs
        :param weights: a dict, weights["in0out1"] represents the weights
          for computing layer1 from layer0, if layer0 has 3 inputs and layer1
          has 2 outputs, then the weight will be a matrix of 3 x 2, i.e.
          weights["in0out1"][i][j] is the weight for computing element-j in layer1
          by using element-i in layer0.
        """
        # Weights between 
        # Layer1 and 2, Layer 2 and 3, ...
        
        self.weights = dict()
        self.activ_fn = dict()
        self.neuron_nums = neuron_numbers_in_layers
        self.layer_num = len(neuron_numbers_in_layers) - 1
        
        # DEFINE (!not DO!, we dont have input X now) computation between layers
        for l_in in range(self.layer_num):
            l_out = l_in + 1
            pkey = f"in{l_in}out{l_out}"
            try:
                # try to use provided weight
                W = weights[pkey] # NOTE: should copy
            except:
                # if not provided ...
                n_in = neuron_numbers_in_layers[l_in]
                n_out = neuron_numbers_in_layers[l_out]
                W = [[random.gauss(0, 0.1) for j in range(n_out)] 
                     for i in range(n_in)] # see init above
            self.weights[pkey] = W
            self.activ_fn[pkey] = math.tanh # you may want to try your own

        
    def forward(self, x):
        """
        DO computations:
        
        Compute the network output layer by layer. "forward" is a conventional
        term for execute computing of a net.
        :param x: a list of list: x[i], sample-i, having a number of input attributes
          if you have only one sample, input it as 
              [[0, 1, 0]], 
              NOT [0, 1, 0]
        """
        layer_input = x
        for l_in in range(self.layer_num):
            # Use layer-in to process x and get what's to feed to layer-out
            # Setup 
            l_out = l_in + 1
            pkey = f"in{l_in}out{l_out}"
            # Compute the weighted sum 
            layer_pre_activation = matmul(layer_input, self.weights[pkey])
            # Perform activation(the construction below is equivalent to nested loops)
            layer_out = list(map(lambda x_:list(map(self.activ_fn[pkey], x_)), 
                            layer_pre_activation))
            layer_input = layer_out # feed the output of this layer to the next layer
        return layer_out
    
    
class VisWrap:
    "Wrap I/O for the new network object for visualisation"
    def __init__(self, nn):
        self.nn = nn
        
    def forward(self, x):
        return self.nn.forward([x])[0][0]

In [0]:
nn2 = NeuralNetV2()
nn2_wrap = VisWrap(nn2)
simple_visualise(nn2_wrap)

In [0]:
# Let's try to adjust the weights. Carefully keep the number of weights 
# consistent with the number of neurons you had set to the layers.
nn2 = NeuralNetV2(
    neuron_numbers_in_layers=[2, 3, 1],
    weights={"in0out1":[[0, 0.5, 1], [-1, -5, 2]],
             "in1out2":[[-1], [+1], [0.5]]})
nn2.activ_fn["in1out2"] = lambda x:max(0, x)
nn2_wrap = VisWrap(nn2)
simple_visualise(nn2_wrap)

## 1.3 Training Neural Nets via Backprop

It is not trivial to come up with a simple rule to adjust all the parameters in the entire neural network stucture (recall when we consider a single perceptron, we did propose intuitive scheme to improve the fitness of the model to data). 

The idea is to take a divide-and-conquer scheme. Let's take a careful look at the computation in one perceptron. Throw the machine learning terminology in wind and focus on the computation steps only.
$$
\begin{align}
a &\leftarrow w_0 \cdot x_0 + w_1 \cdot x_1 \\
h &\leftarrow g(a) \\
Loss &\leftarrow Compare(h, y)
\end{align}
$$

Let us think about the statement "to make the loss smaller" with a bit care: which specific means we could possibily take to "make" the final loss change? During training, we change the model parameters, including $w_{i,j}$. So we need to know the influence on the final loss of each model parameter. In this section we will examine an example of so-called "backpropagation" process.

### 1.3.1 Adjust Parameters to Modify Model Behaviour

Given training data $\{(x_1, y_1), (x_2, y_2), \dots\}$, we would like the net to predict for each $x_i$ the target value $y_i$. If this is the case, then our mission has completed. Of course, this is generally NOT the case if we start from a random set of model parameters.

For example, if we have one training sample $(x=(2.5, 2), y=1.0)$, let us compare the prediction given by the network above and the target value $y=1.0$: 

In [0]:
# First, check the current output of the net
def sigm(x):
    return 1/(1+math.exp(-x))

nn2 = NeuralNetV2(
    neuron_numbers_in_layers=[2, 3, 1],
    weights={"in0out1":[[0, 0.5, 1], [-1, -5, 2]],
             "in1out2":[[-1], [+1], [0.5]]})
nn2.activ_fn["in0out1"] = sigm
nn2.activ_fn["in1out2"] = sigm

nn2_wrap = VisWrap(nn2)
simple_visualise(nn2_wrap)

Let's check the nets behaviour at one data:

In [0]:
nn2.forward([(2.5, 2)])

This is smaller than the desired output 1.0. So we want the output to increase at this $x$. Let's learn how to adjust the network using gradients computed through backprop.

### 1.3.2 Manual Backprop Through Layers

Let us implement the backpropagation for a three layer simple net. 

#### Helper Functions

In [0]:
##############################################################
# HELPER FUNCTIONS
# You do NOT need to learn those to USE modern neural networks
# Those functions provide basic array functions in LOW efficiency
# but clear manner. You may want to check them if you want to
# UNDERSTAND the technical details of NN.
# --------
# First define sigmoid gradient
import math
def sigm(x): # redefine here for reference
    return 1/(1+math.exp(-x))

def gsigmoid(x):
    return math.exp(-x)/(1+math.exp(-x))**2

def elementwise_apply(f, nested_list):
    return list(map(lambda x_:list(map(f, x_)), nested_list))

def elementwise_times(nested_list1, nested_list2):
    return [[a * b for a, b in zip(r1, r2)] 
            for r1, r2 in zip(nested_list1, nested_list2)]

def mat_tr(nested_list):
    return [c for c in zip(*nested_list)]

def shape(nested_list):
    return len(nested_list), len(nested_list[0])
##############################################################

In [0]:
# UNIT Test: gsigmoid -- Test the others.
test_x = [-3, -1, 0, 1, 3, 5]
test_eps = 1e-4
for x_ in test_x:
    numerical_diff = (sigm(x_ + test_eps) - sigm(x_)) / test_eps
    analytic_diff = gsigmoid(x_)
    print(f"NumDiff: {numerical_diff:.3f} ~ AnaDiff: {analytic_diff:.3f}")

#### Backprop

Below I explicily write out the forward method followed by a backward computation.

In [0]:
def special_forward(nn, x):
    """
    This is a special version for manual implementing and testing the backpropagation algorithm. 
    We only use the network 
    We don't use the computation and activation of network `nn` 
    """
    
    # Copy-and-paste and simplify forward computation here:
    layer1_input = x
    layer1_pre_activation = matmul(layer1_input, nn.weights["in0out1"])
    layer1_out = elementwise_apply(sigm, layer1_pre_activation)
    
    layer2_input = layer1_out # feed the output of this layer to the next layer
    
    layer2_pre_activation = matmul(layer2_input, nn.weights["in1out2"])
    layer2_out = elementwise_apply(sigm, layer2_pre_activation)

    layer2_pre_activation_g = elementwise_apply(gsigmoid, layer2_pre_activation)
    w12_g = matmul(mat_tr(layer2_input), layer2_pre_activation_g)
    layer1_out_g = matmul(layer2_pre_activation_g, mat_tr(nn.weights["in1out2"]))
    layer1_pre_activation_g = elementwise_times(
        elementwise_apply(gsigmoid, layer1_pre_activation),
        layer1_out_g)
    w01_g = matmul(mat_tr(layer1_input), layer1_pre_activation_g)

    return layer2_out, w01_g, w12_g
    
# TODO: Mark this somewhere else: this Les Mis is FUNNY! https://www.youtube.com/watch?v=dF495ERjRUo

In [0]:
out, w01_g, w12_g = special_forward(nn2, [[2.5, 2]])

#### Numerical verification

Next, let us check element by element how does our backprop work. The plan is to adjust each adjustable parameter a bit and check the change of the final output.

In [0]:
# test adjusting weights in0out1
wkey = "in0out1"
w_rows, w_cols = shape(nn2.weights[wkey])
numerical_g = [[0 for c in range(w_cols)] for r in range(w_rows)]
test_eps = 1e-4
test_x = [[2.5, 2]]
old_out = nn2.forward(test_x)

for r in range(w_rows):
    for c in range(w_cols):
        old_value = nn2.weights[wkey][r][c] # save the old value to put back after test        
        nn2.weights[wkey][r][c] += test_eps
        new_out = nn2.forward(test_x)
        diff = new_out[0][0] - old_out[0][0] # check the effect of adjusting the corresponding parameter
        numerical_g[r][c] = diff / test_eps
        # put the old value back
        nn2.weights[wkey][r][c] = old_value
        

In [0]:
print("numerical differential")
print(numerical_g)
print("analytical differential")
print(w01_g)

__EXECISE__ [Optional]

1. Read the code and Explain what you had observed.
2. Check the computation for weights transforms data from layer 1 to 2.
3. Integrate the backpropagation function into the network class.



### 1.3.3 Using Computational Framework

Modern framework allows us to easily perform all the steps above. The example above can be reformulated as

In [0]:
import torch
import torch.nn as nn

In [0]:
class MyNN(nn.Module):
    def __init__(self, neuron_numbers_in_layers=[2, 3, 1]):
        super(MyNN, self).__init__()
        
        self.layers = nn.ModuleList(
            [nn.Linear(in_features=nin, out_features=nout)
             for nin, nout in zip(neuron_numbers_in_layers[:-1], 
                                  neuron_numbers_in_layers[1:])])
        
    def forward(self, x):
        h = x
        for l in self.layers:
            h = torch.tanh(l(h))
        return h
    
class TorchVisWrap:
    def __init__(self, nn):
        self.nn = nn
    def forward(self, x):
        y = self.nn(torch.Tensor(x).unsqueeze(dim=0))
        return y.item()
        

In [0]:
torch.manual_seed(42)
nn3 = MyNN([2, 6, 1])
nn3_wrap = TorchVisWrap(nn3)
simple_visualise(nn3_wrap)

Let us perform training, call it changing a neural network behaviour or searching in the hypotheses space of neural networks, it is up to your viewpoint. Eg. We want the net to generate
    + for (4, -4)
    - for (4, 4)
    + for (-4, 4)
    - for (-4, -4)


In [0]:
from torch.optim import Adam
optim = Adam(nn3.parameters(), lr=0.01) # manager: adjust params according to grads

In [0]:
train_steps = 50
visualise_every_n_steps = 10
trn_X = torch.Tensor([[4, -4], [4, 4], [-4, 4], [-4, -4]])
trn_y = torch.Tensor([[1.0], [-1.0], [1.0], [-1.0]])
for it in range(train_steps):
    loss = ((trn_y - nn3(trn_X))**2).sum()
    optim.zero_grad() # reset gradients (to clear computed gradients from previous steps)
    loss.backward() # In one stroke, all gradients are computed!
    optim.step()  # apply the gradients to the parameters
    if it % visualise_every_n_steps == 0:
        # Check the effect
        simple_visualise(nn3_wrap)

# 2 Deep NN

## 2.0 Prepare Environment

### Prepare LaTeX and other rendering software.

In [0]:
!wget -qO- "https://yihui.name/gh/tinytex/tools/install-unx.sh" | sh
!/root/bin/tlmgr install standalone preview babel-english setspace doublestroke relsize jknapltx rsfs fundus-calligra wasysym xcolor ragged2e physics microtype platex-tools ms
!/root/bin/tlmgr install dvisvgm
!ln -s /root/.TinyTeX/bin/x86_64-linux/dvisvgm /usr/bin/
!ln -s /root/bin/latex /usr/bin/

In [0]:
try:
    import cairo
except ImportError:
    !apt-get install libcairo2-dev
    !pip3 install pycairo
try:
    from manimlib.scene.scene import Scene
except ImportError:    
    !pip install manimlib

import math
import random
import torch
import torchvision
import torch.nn as nn

Renderable video in notebook.

Use

https://stackoverflow.com/questions/57377185/how-play-mp4-video-in-google-colab

to make

https://github.com/krassowski/jupyter-manim/blob/master/jupyter_manim/__init__.py

work in colab

In [0]:
#@title
import os
import sys
from contextlib import ExitStack, suppress, redirect_stdout, redirect_stderr
from io import StringIO
from pathlib import Path
from tempfile import NamedTemporaryFile
from unittest.mock import patch
from warnings import warn

import manimlib
from IPython import get_ipython
from IPython.core.magic import Magics, magics_class, cell_magic
from IPython.display import HTML
from base64 import b64encode

import json


_renderer_code = """
from manimlib.imports import *
import copy
import json
with open("tmpscript.json", "r") as f:
    vscript = json.load(f)

def add_list_to_dict(d, k, l):
    if d.get(k) is None:
        d[k] = l
    else:
        d[k] += l
        
class RendererMAnimV2(Scene):
    CONFIG = {
        "x_min": -4,
        "x_max": 4,
        "y_min": -3,
        "y_max": 3,
    }

    def construct(self):
        ######Code######
        #Making shapes
        gobjs = {}
        anims = []
        # first pass, analysis graph links and arrange the graph

        
        node_create_anim = {}
        value_ready_anim = {}
        forward_anim = {}
        message_ready_anim = {}
        backward_anim = {}

        for a in vscript:
            if a['graph_object_type'] in ["DataNode", "OpNode"]:
                a['x'] = -4 + 8 * a['x']
                a['y'] = -3 + 6 * a['y']


        for a in vscript:
            if a['graph_object_type'] == "DataNode":
                # Create a data node
                g = Circle(radius=0.2, color=WHITE)\
                    .shift(np.array([a['x'], a['y'], 0]))
                data_val = a['data']
                g_data = TextMobject(f"{data_val:.3g}")\
                    .scale(0.5)\
                    .shift(g.get_corner(UR))
                gobjs[a['nodeid']] = g
                gobjs[a['nodeid']+'_data'] = g_data
                

                if a['text'] == a['nodeid']: # no-name
                    g_t = TextMobject(a['nodeid'])\
                        .next_to(g, UP).scale(0.3)\
                        .set_color("#00ff00")
                else:
                    g_t = TextMobject(a['text'])\
                        .shift(g.get_center()).scale(0.4)
                create_animations_1 = [
                    ShowCreation(g), 
                    Write(g_t)]

                from_id = a['from_ids'][0]
                g_from = gobjs.get(from_id)
                if g_from is None:
                    create_animations_2 = [ShowCreation(g_data)]
                else:
                    g_from_resu = gobjs[from_id+'_resu']
                    g_link = Line(g_from.get_edge_center(RIGHT), 
                        g.get_edge_center(LEFT))
                    create_animations_2 = [
                        FadeOutAndShift(g_from_resu, 
                            g.get_center() - g_from_resu.get_center()),
                        ShowCreation(g_link),
                        ShowCreation(g_data),
                    ]

                if a['animation_type'] == "Create":
                    anims += [create_animations_1, create_animations_2]
                    agrp = a['depend_ord']
                    add_list_to_dict(node_create_anim, agrp, create_animations_1)
                    add_list_to_dict(value_ready_anim, agrp, create_animations_2)
                    

            elif a['graph_object_type'] == "OpNode":
                g = Square(side_length=0.2, color=GREEN)\
                    .shift(np.array([a['x'], a['y'], 0]))
                g_t = TextMobject(a['text'])\
                    .shift(g.get_center()).scale(0.4)
                gobjs[a['nodeid']] = g

                op_create_animations = [ShowCreation(g), Write(g_t)]
                for from_id in a['from_ids']:
                    g_from = gobjs[from_id]
                    g_link = Line(
                        g_from.get_edge_center(RIGHT), 
                        g.get_edge_center(LEFT))
                    op_create_animations.append(ShowCreation(g_link))

                if a['animation_type'] == "Create":
                    anims.append(op_create_animations)
                    agrp = a['depend_ord']
                    add_list_to_dict(node_create_anim, agrp, op_create_animations)

            elif a['graph_object_type'] == "Link":
                g_from = gobjs[a['from_id']] #type: Mobject
                g_to = gobjs[a['to_id']]
                g = Line(g_from.get_edge_center(RIGHT), g_to.get_edge_center(LEFT))
                gobjs[(a['from_id'], a['to_id'])] = g
                if a['animation_type'] == "Create":
                    anims[-1] += [ShowCreation(g)]

            elif a['graph_object_type'] == "Message":
                # animate inputs
                input_message_animations = []
                output_data_animations = []
                for va_out, to_id in zip(a['out_values'], a['to_ids']):
                    for va_in, from_id in zip(a['in_values'], a['from_ids']):
                        g_to = gobjs[to_id]
                        g_input_data_cp = copy.deepcopy(gobjs[from_id+'_data'])
                        
                        input_message_animations.append(
                            FadeOutAndShift(g_input_data_cp, 
                                g_to.get_center() - g_input_data_cp.get_center())
                        )
                        
                    g_output_resu = TextMobject(f"{va_out:.3g}")\
                        .scale(0.5)\
                        .shift(g_to.get_corner(UR))

                    output_data_animations.append(
                        ShowCreation(g_output_resu)
                    )
                    gobjs[to_id+'_resu'] = g_output_resu 
                
                if a['animation_type'] == "Forward":
                    anims.append(input_message_animations)
                    anims.append(output_data_animations)
                    agrp = a['depend_ord']
                    add_list_to_dict(forward_anim, agrp, input_message_animations)
                    add_list_to_dict(value_ready_anim, agrp, output_data_animations)


            elif a['graph_object_type'] == "GradNode":
                g_node_id = a['nodeid']
                g_grad_id = g_node_id + '_grad'
                g_grad_old = gobjs.get(g_grad_id)

                grad_val = a['grad']
                g_node = gobjs[g_node_id] # type: Mobject
                g_grad_new = TextMobject(f"{grad_val:.3g}")\
                    .scale(0.5)\
                    .shift(g_node.get_corner(UL))
                gobjs[g_grad_id] = g_grad_new

                grad_node_anim = ShowCreation(g_grad_new) if g_grad_old is None \
                    else Transform(g_grad_old, g_grad_new)

                if a['animation_type'] == "Create":
                    anims.append([grad_node_anim])
                    agrp = a['depend_ord']
                    add_list_to_dict(message_ready_anim, agrp, [grad_node_anim])

            elif a['graph_object_type'] == "GMessage":
                bprop_animations = []
                g_from = gobjs[a['from_ids'][0]]
                for va, to_id in zip(a['values'], a['to_ids']):
                    g_to = gobjs[to_id]
                    g_back_message = TextMobject(f"{va:.3g}")\
                        .scale(0.5)\
                        .shift(g_from.get_corner(UL))
                    bprop_animations.append(
                        FadeOutAndShift(g_back_message,
                            g_to.get_corner(UR) - g_back_message.get_center())
                    )
                anims.append(bprop_animations)
                agrp = a['depend_ord']
                add_list_to_dict(backward_anim, agrp, bprop_animations)
            else:
                pass



        #Showing shapes
        play_short = True
        if play_short:
            agroups = sorted(np.unique([_['depend_ord'] for _ in vscript]))
            for agrp in agroups:
                for anim_type in \
                    [node_create_anim, forward_anim, value_ready_anim]:
                    if anim_type.get(agrp) is not None:
                        self.play(*anim_type[agrp])
            for agrp in reversed(agroups):
                for anim_type in [backward_anim, message_ready_anim]:
                    if anim_type.get(agrp) is not None:
                        self.play(*anim_type[agrp])
        else:
            for a in anims:
                self.play(*a)
"""

__version__ = 0.11

std_out = sys.stdout


def video(path, width=854, height=480, controls=True, autoplay=True):
    mp4 = open(path,'rb').read()
    data_url = "data:video/mp4;base64," + b64encode(mp4).decode()

    return HTML(f"""
    <video
      width="{width}"
      height="{height}"
      autoplay="{'autoplay' if autoplay else ''}"
      {'controls' if controls else ''}
    >
        <source src="{data_url}" type="video/mp4">
    </video>
    """)


class StringIOWithCallback(StringIO):

    def __init__(self, callback, **kwargs):
        super().__init__(**kwargs)
        self.callback = callback

    def write(self, s):
        super().write(s)
        self.callback(s)


@magics_class
class ManimMagics(Magics):
    path_line_start = 'File ready at '

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.defaults = {
            'autoplay': True,
            'controls': True,
            'silent': True,
            'width': 854,
            'height': 480
        }

    video_settings = {'width', 'height', 'controls', 'autoplay'}
    magic_off_switches = {
        'verbose': 'silent',
        'no-controls': 'controls',
        'no-autoplay': 'autoplay'
    }

    @cell_magic
    def manim(self, line, cell):
        # execute the code - won't generate any video, however it will introduce
        # the variables into the notebook's namespace (enabling autocompletion etc);
        # this also allows users to get feedback on some code errors early on
        get_ipython().ex(cell)
        g_animator.arrange_nodes()
        with open("tmpscript.json", "w") as f:
            json.dump(g_animator.plot_script, f, indent=2)

        line = "RendererMAnimV2 " + line
        user_args = line.split(' ')
#         script_code = f"""import json
# vscript = json.loads('''""" + \
# json.dumps(g_animator.plot_script, indent=2) + """''')
# """
#         print("**** line")
#         print(line)
#         print("**** animation script")
#         print(script_code + _renderer_code)
        
        # path of the output video
        path = None

        settings = self.defaults.copy()

        # disable the switches as indicated by the user
        for key, arg in self.magic_off_switches.items():
            if '--' + key in user_args:
                user_args.remove('--' + key)
                settings[arg] = False

        resolution_index = (
            user_args.index('-r') if '-r' in user_args else
            user_args.index('--resolution') if '--resolution' in user_args else
            None
        )
        if resolution_index is not None:
            # the resolution is passed as "height,width"
            try:
                h, w = user_args[resolution_index + 1].split(',')
                settings['height'] = h
                settings['width'] = w
            except (IndexError, KeyError):
                warn('Unable to retrieve dimensions from your resolution setting, falling back to the defaults')

        silent = settings['silent']

        def catch_path_and_forward(lines):
            nonlocal path
            for line in lines.split('\n'):
                if not silent:
                    print(line, file=std_out)

                if line.startswith(self.path_line_start):
                    path = line[len(self.path_line_start):].strip()

        # Using a workaround for Windows permissions issue as in this questions:
        # https://stackoverflow.com/q/15169101
        #f = NamedTemporaryFile('w', suffix='.py', delete=False)
        #

        with open("tmprender.py", "w") as f:
            f.write(_renderer_code)

    
        try:
            # f.write(cell)
            # f.write(_renderer_code)
            # f.close()
            # args = ['manim', f.name, *user_args]
            args = ["manim", "tmprender.py", *user_args]
            

            stdout = StringIOWithCallback(catch_path_and_forward)

            with ExitStack() as stack:

                enter = stack.enter_context

                enter(patch.object(sys, 'argv', args))
                enter(suppress(SystemExit))
                enter(redirect_stdout(stdout))

                if silent:
                    stderr = StringIO()
                    enter(redirect_stderr(stderr))

                manimlib.main()
        finally:
            os.remove(f.name)

        if path:
            path = Path(path)
            assert path.exists()

            # To display a video in Jupyter, we need to have access to it
            # so it has to be within the working tree. The absolute paths
            # are usually outside of the accessible range.
            relative_path = path.relative_to(Path.cwd())

            video_settings = {
                k: v
                for k, v in settings.items()
                if k in self.video_settings
            }

            return video(relative_path, **video_settings)
        else:
            just_show_help = '-h' in user_args or '--help' in user_args

            if not just_show_help:
                warn('Could not find path in the manim output')

            # If we were silent, some errors could have been silenced too.
            if silent:
                # Let's break the silence:
                print(stdout.getvalue())
                print(stderr.getvalue(), file=sys.stderr)


ip = get_ipython()
ip.register_magics(ManimMagics)


In [0]:
#@title
########################################
# Advanced logger
import numpy as np
class AnimatorSceneV2:
    def __init__(self):
        self.nodes = {}
        self.edges = {}
        self.x_alloc = 0
        self.plot_script = []
        
    def make_animation_create(self, obj):
        new_node = None
        if isinstance(obj, NeuData):
            from_id = None
            depend_ord = 0
            if obj.from_op is not None:
                from_id = obj.from_op.id_
                depend_ord = self.nodes[from_id]['depend_ord'] + 1
            self.nodes[obj.id_] = dict(depend_ord=depend_ord)
            self.plot_script.append(dict(
                graph_object_type="DataNode",
                nodeid=obj.id_,
                depend_ord=depend_ord,
                from_ids=[from_id,],
                text=obj.name,
                data=obj.data,
                animation_type="Create"))
        elif isinstance(obj, UnaryNeuOp):
            oprand_key = obj.oprand.id_
            oprand_node = self.nodes.get(oprand_key)
            assert oprand_node is not None, \
                ValueError("Oprand node missing in animation")
            
            # make node
            depend_ord = oprand_node['depend_ord'] + 1
            self.nodes[obj.id_] = dict(depend_ord=depend_ord)
            self.plot_script.append(dict(
                graph_object_type="OpNode",
                nodeid=obj.id_,
                from_ids=[oprand_key,],
                depend_ord=depend_ord,
                text=obj.name,
                animation_type="Create"))
            
        elif isinstance(obj, BinaryNeuOp):
            oprand_key1 = obj.oprand1.id_
            oprand_node1 = self.nodes.get(oprand_key1)
            assert oprand_node1 is not None, \
                ValueError("Oprand node missing in animation")
            oprand_key2 = obj.oprand2.id_
            oprand_node2 = self.nodes.get(oprand_key2)
            assert oprand_node2 is not None, \
                ValueError("Oprand node missing in animation")
            
            # make node
            depend_ord = max(oprand_node1['depend_ord'], oprand_node2['depend_ord']) + 1
            self.nodes[obj.id_] = dict(depend_ord=depend_ord)
            self.plot_script.append(dict(
                graph_object_type="OpNode",
                nodeid=obj.id_,
                from_ids=[oprand_key1, oprand_key2],
                depend_ord=depend_ord,
                text=obj.name,
                animation_type="Create"))
            
    def make_animation_forward(self, op_obj, resu):
        if isinstance(op_obj, UnaryNeuOp):
            oprand_key = op_obj.oprand.id_
            oprand_node = self.nodes.get(oprand_key)
            assert oprand_node is not None, \
                ValueError("Oprand nodes missing in animation")
            self.plot_script.append(dict(
                graph_object_type="Message",
                from_ids=[oprand_key],
                to_ids=[op_obj.id_],
                depend_ord = self.nodes[op_obj.id_]['depend_ord'],
                in_values=[op_obj.oprand.data],
                out_values=[resu],
                animation_type="Forward"))
            
        elif isinstance(op_obj, BinaryNeuOp):
            oprand1_key = op_obj.oprand1.id_
            oprand2_key = op_obj.oprand2.id_
            assert (self.nodes.get(oprand1_key) is not None) and\
                (self.nodes.get(oprand2_key) is not None), \
                ValueError("Oprand nodes missing in animation")
            self.plot_script.append(dict(
                graph_object_type="Message",
                from_ids=[oprand1_key, oprand2_key],
                to_ids=[op_obj.id_],
                depend_ord = self.nodes[op_obj.id_]['depend_ord'],
                in_values=[op_obj.oprand1.data, op_obj.oprand2.data],
                out_values=[resu],
                animation_type="Forward"))
            
    def make_animation_backward(self, obj, msgs):
        if isinstance(obj, NeuData):
            self.plot_script.append(dict(
                graph_object_type="GradNode",
                nodeid=obj.id_, # == msgs[0]['node_id']
                depend_ord = self.nodes[obj.id_]['depend_ord'],
                grad=msgs[0]['msg'],
                animation_type="Create"))
        elif isinstance(obj, NeuOp):
            self.plot_script.append(dict(
                graph_object_type="GMessage",
                from_ids=[obj.id_,],
                to_ids=[m['back_node_id'] for m in msgs],
                depend_ord = self.nodes[obj.id_]['depend_ord'],
                values=[m['msg'] for m in msgs],
                animation_type="Backward"))
            
    def arrange_nodes(self):
        nodes = [_ for _ in self.plot_script
            if _['graph_object_type'] in ["DataNode", "OpNode"]]
        group_ids = [_["depend_ord"] for _ in nodes]
        group_num = np.unique(group_ids).size
        
        # adjust nodes dependency order: so to bring-forward some nodes 
        # close to where they are referred to
        for nd in nodes:
            nd_sub = [_ for _ in nodes if nd["nodeid"] in _["from_ids"]]
            if len(nd_sub) > 0:
                new_ord = max(nd["depend_ord"], max([_["depend_ord"]-1 for _ in nd_sub]))
                if new_ord > nd["depend_ord"]:
                    print(f'{nd["nodeid"]}:{nd["depend_ord"]}->{new_ord}')
                nd["depend_ord"] = new_ord

        # allocate screen areas for each group in computational dependency
        x_ticks = np.linspace(0, 1, group_num+1)
        group_areas = [dict(bottom=0, top=1, left=x0, right=x1) 
                       for x0, x1 in zip(x_ticks[:-1], x_ticks[1:])]

        # in each area arrange the group members
        for nd in nodes:
            nd['sub_group'] = []
        for gid, a in zip(reversed(sorted(np.unique(group_ids))), reversed(group_areas)):
            group_nodes = [_ for _ in nodes if _["depend_ord"] == gid]
            y_locs = np.linspace(0, 1, len(group_nodes) + 2)[1:-1]

            # sort nodes in a group according how they will be used to produce next group of nodes
            for i, nd in enumerate(sorted(group_nodes,  
                                          key=lambda nd:sum(nd['sub_group']) \
                                          / max(len(nd['sub_group']), 1e-6))):
                # affect the order of nodes in proceeding group
                for nd_ in nodes:
                    if nd_['nodeid'] in nd['from_ids']:
                        nd_['sub_group'].append(i)

                # set up self position
                nd['x'] = (a['left'] + a['right']) / 2
                nd['y'] = y_locs[i]
        return self.plot_script

### Libraries and Helpers

In [0]:
import torch
import torch.nn as nn
import torch.nn.functional as fn
import PIL
from PIL.Image import Image
from torchvision.datasets import MNIST, CIFAR10
from torch.utils.data.dataloader import DataLoader
from torch.utils.data.dataset import Subset
from torchvision.transforms import Compose, ToTensor, Normalize, ToPILImage
from torch.optim import Adam, SGD
mnist_dataset_trn = MNIST(root='./data', download=True, train=True,
                          transform=ToTensor())
mnist_dataset_tst = MNIST(root='./data', download=True, train=False,
                          transform=ToTensor())
mnist_train_loader = DataLoader(mnist_dataset_trn, batch_size=4)
mnist_test_loader = DataLoader(mnist_dataset_tst, batch_size=4, shuffle=True)
mnist_examples = mnist_dataset_trn[0]
device = torch.device("cuda:0") if torch.cuda.is_available \
    else torch.device("cpu")

def evaluate_model(model, tst_dataloader, max_iter=None):
    total_corr = 0
    total_num = 0
    model.eval()
    for i, (x, y) in enumerate(tst_dataloader):
        with torch.no_grad():
            pred_class_p = model(x.to(device))
        corr_num = (torch.argmax(pred_class_p, dim=1).cpu() == y).sum()
        total_corr += corr_num.item()
        total_num += len(y)
        if max_iter is not None:
            if i >= max_iter:
                break
    accu = total_corr / total_num
    return accu

def simple_logger_factory(tst_dataloader=None):
    def logger(model, info):
        print(f"Epoch {info['epoch']}, Iter {info['iter']}, " +
            f"Train Loss {info['loss']}")
    def logger_eval(model, info):
        test_accu = evaluate_model(model, tst_dataloader)
        print(f"Epoch {info['epoch']}, Iter {info['iter']}, " +
            f"Train Loss {info['loss']} Test Accuracy {test_accu}")
    return logger if tst_dataloader is None else logger_eval

simple_mnist_logger = simple_logger_factory(mnist_test_loader)
    
def train_model(model, trn_dataloader, start_iter=0,
                max_iters=None, max_epoches=1, 
                trainer_class=Adam,
                evaluate_every_n_iters=1000,
                save_check_point_every_n_iters=10000,
                evaluate_callback_fn=simple_logger_factory(),
                **kwargs):
    optim = trainer_class(model.parameters(), **kwargs)
    model.train()
    epoches = 0
    iters = start_iter
    running_loss = 0
    while True:
        for i, (x, y) in enumerate(trn_dataloader):
            optim.zero_grad()
            pred_class_p = model(x.to(device))
            loss = fn.nll_loss(pred_class_p, y.to(device))
            loss.backward()
            optim.step()

            iters += 1
            running_loss += loss.item() 
            if iters % evaluate_every_n_iters == 0:
                evaluate_callback_fn(
                    model, dict(epoch=epoches, iter=iters, 
                                loss=running_loss / evaluate_every_n_iters))
                running_loss = 0
            if max_iters is not None and iters >= max_iters:
                break

        epoches += 1
        if epoches >= max_epoches:
            break
        
    return

def count_param_num(model):
    n = 0
    for p in model.parameters():
        n += p.numel()
    return n

## 2.1 Elements of Deep Neural Networks

In [0]:
# Example of classifying hand-written digits.
x, y = mnist_dataset_trn[42]
print(f"An example of digit {y}")
ToPILImage()(x)

### 2.1.1 Shallow Network

In [0]:
# A "shallow" network
class Net1(nn.Module):
    def __init__(self):
        super(Net1, self).__init__()
        self.lin1 = nn.Linear(in_features=28*28, out_features=64)
        self.lin2 = nn.Linear(in_features=64, out_features=10)
    
    def forward(self, x):
        n = x.shape[0]
        h = fn.relu(self.lin1(x.view(n, -1)))
        y = fn.log_softmax(self.lin2(h), dim=1)
        return y
nn1 = Net1().to(device)
train_model(nn1, mnist_train_loader, evaluate_callback_fn=simple_mnist_logger)

### 2.1.2 Go Deeper

In [0]:
# A naive deep network
class NetD(nn.Module):
    def __init__(self):
        super(NetD, self).__init__()
        self.lin1 = nn.Linear(in_features=28*28, out_features=64)
        self.lin2 = nn.Linear(in_features=64, out_features=64)
        self.lin3 = nn.Linear(in_features=64, out_features=64)
        self.lin4 = nn.Linear(in_features=64, out_features=64)
        self.lin5 = nn.Linear(in_features=64, out_features=10)
    
    def forward(self, x):
        n = x.shape[0]
        h = fn.relu(self.lin1(x.view(n, -1)))
        h = fn.relu(self.lin2(h))
        h = fn.relu(self.lin3(h))
        h = fn.relu(self.lin4(h))
        h = self.lin5(h)
        y = fn.log_softmax(h, dim=1)
        return y
dnn1 = NetD().to(device)
train_model(dnn1, mnist_train_loader, evaluate_callback_fn=simple_mnist_logger)

### 2.1.3 Essentials: Convolutional Net

Paper: [link](http://vision.stanford.edu/cs598_spring07/papers/Lecun98.pdf); [link-2](http://yann.lecun.com/exdb/publis/pdf/lecun-99.pdf)

In [0]:
# A naive deep network
class NetCD(nn.Module):
    def __init__(self):
        super(NetCD, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3)
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3)
        self.lin4 = nn.Linear(in_features=32*3*3, out_features=64)
        self.lin5 = nn.Linear(in_features=64, out_features=10)
        
    
    def forward(self, x):
        n = x.shape[0]
        h = fn.max_pool2d(fn.relu(self.conv1(x)), kernel_size=2)
        h = fn.max_pool2d(fn.relu(self.conv2(h)), kernel_size=2)
        h = fn.relu(self.conv3(h)).view(n, -1)
        h = fn.relu(self.lin4(h))
        h = self.lin5(h)
        y = fn.log_softmax(h, dim=1)
        return y

dnn2 = NetCD().to(device)
train_model(dnn2, mnist_train_loader, evaluate_callback_fn=simple_mnist_logger)

In [0]:
count_param_num(dnn2), count_param_num(dnn1), count_param_num(nn1)

### 2.1.4 Essentials: Direct Link

ResNet Paper: [link](https://arxiv.org/abs/1512.03385)

Tutorial Video:

In [0]:
#@title
from IPython.display import YouTubeVideo
YouTubeVideo('XPpmzulEmjA', width=600, height=400)

In [0]:
# A naive deep network
class NetCDR(nn.Module):
    def __init__(self):
        super(NetCDR, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=16, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(in_channels=16, out_channels=16, kernel_size=3, padding=1)
        self.lin4 = nn.Linear(in_features=576, out_features=64)
        self.lin5 = nn.Linear(in_features=64, out_features=10)
        
    
    def forward(self, x):
        n = x.shape[0]
        h = fn.max_pool2d(fn.relu(self.conv1(x)), kernel_size=2)
        h = fn.max_pool2d(fn.relu(self.conv2(h)) + h , kernel_size=2)
        h = (fn.relu(self.conv3(h)) + h).view(n, -1)
        h = fn.relu(self.lin4(h))
        h = self.lin5(h)
        y = fn.log_softmax(h, dim=1)
        return y

dnn3 = NetCDR().to(device)
train_model(dnn3, mnist_train_loader, evaluate_callback_fn=simple_mnist_logger)

### 2.1.5 Essentials: Optimiser

Adam Paper: [link](https://arxiv.org/abs/1412.6980)

Overview tutorial: [link](http://ruder.io/optimizing-gradient-descent/)

Tutorial Video:

In [0]:
#@title
from IPython.display import YouTubeVideo
YouTubeVideo('JXQT_vxqwIs', width=600, height=400)

In [0]:
nn1a = Net1().to(device)
train_model(nn1a, mnist_train_loader, evaluate_every_n_iters=100, 
            max_iters=2000,
            trainer_class=Adam, lr=0.0001)

nn1b = Net1().to(device)
train_model(nn1b, mnist_train_loader, evaluate_every_n_iters=100, 
            max_iters=2000,
            trainer_class=SGD, lr=0.0001) 

### 2.1.6 Essentials: Mini-batches

In [0]:
nn1a = Net1().to(device)
mnist_train_loader128 = DataLoader(mnist_dataset_trn, batch_size=128)
train_model(nn1a, mnist_train_loader128, evaluate_every_n_iters=10, 
            max_iters=100, trainer_class=Adam, lr=0.0001)

nn1b = Net1().to(device)
mnist_train_loader4 = DataLoader(mnist_dataset_trn, batch_size=4)
train_model(nn1b, mnist_train_loader128, evaluate_every_n_iters=80, 
            max_iters=800, trainer_class=Adam, lr=0.0001)


### 2.1.7 Essentials: Batch Normalisation

BatchNorm Paper: [link](https://arxiv.org/abs/1502.03167)

In [0]:
# A naive deep network
class NetCDRB(nn.Module):
    def __init__(self):
        super(NetCDRB, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3)
        self.bn12 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=16, kernel_size=3, padding=1)
        self.bn23 = nn.BatchNorm2d(16)
        self.conv3 = nn.Conv2d(in_channels=16, out_channels=16, kernel_size=3, padding=1)
        self.bn34 = nn.BatchNorm2d(16)
        self.lin4 = nn.Linear(in_features=576, out_features=64)
        self.lin5 = nn.Linear(in_features=64, out_features=10)
        
    
    def forward(self, x):
        n = x.shape[0]
        h = fn.max_pool2d(fn.relu(self.conv1(x)), kernel_size=2)
        h = self.bn12(h)
        h = fn.max_pool2d(fn.relu(self.conv2(h)) + h , kernel_size=2)
        h = self.bn23(h)
        h = self.bn34(fn.relu(self.conv3(h)) + h).view(n, -1)
        h = fn.relu(self.lin4(h))
        h = self.lin5(h)
        y = fn.log_softmax(h, dim=1)
        return y

dnn4 = NetCDRB().to(device)
train_model(dnn4, mnist_train_loader, evaluate_callback_fn=simple_mnist_logger)

### 2.1.7 Essentials: Regularisation by Dropout

Paper [link](http://jmlr.org/papers/volume15/srivastava14a.old/srivastava14a.pdf)


In [0]:
#@title
from IPython.display import YouTubeVideo
YouTubeVideo('vAVOY8frLlQ', width=600, height=400)

In [0]:
# A naive deep network
class NetCDRD(nn.Module):
    def __init__(self):
        super(NetCDRD, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3)
        self.dropout1 = nn.Dropout2d(inplace=True)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=16, kernel_size=3, padding=1)
        self.dropout2 = nn.Dropout2d(inplace=True)
        self.conv3 = nn.Conv2d(in_channels=16, out_channels=16, kernel_size=3, padding=1)
        self.lin4 = nn.Linear(in_features=576, out_features=64)
        self.lin5 = nn.Linear(in_features=64, out_features=10)
        
    
    def forward(self, x):
        n = x.shape[0]
        h = fn.max_pool2d(fn.relu(self.conv1(x)), kernel_size=2)
        h = self.dropout1(h)
        h = fn.max_pool2d(fn.relu(self.conv2(h)) + h , kernel_size=2)
        h = self.dropout2(h)
        h = (fn.relu(self.conv3(h)) + h).view(n, -1)
        h = fn.relu(self.lin4(h))
        h = self.lin5(h)
        y = fn.log_softmax(h, dim=1)
        return y
dnn5_dp = NetCDRD().to(device)

small_train = Subset(mnist_dataset_trn, torch.arange(600))
train_loader_sm = DataLoader(small_train, batch_size=4)
train_model(dnn5_dp, train_loader_sm, evaluate_callback_fn=simple_mnist_logger,
            evaluate_every_n_iters=150, max_epoches=10)


In [0]:
dnn5_nodp = NetCDR().to(device)
train_model(dnn5_nodp, train_loader_sm, evaluate_callback_fn=simple_mnist_logger,
            evaluate_every_n_iters=150, max_epoches=10)

## 2.2 Automatic Differential Architecture Builder

### 2.2.1 Managed variable and operator

Modern architectures allow the user to  describe the computations in a data model (e.g. neural networks) naturally, and automatically configure necessary steps and the data structures for back propagation.  To be concrete, in the simple “processing step”:
```
Y = X1 + X2
```
The user has actually describe a relationship that “the value of a variable Y is the sum of the values of variables X1 and X2”.  Put it into details, 
> `Y`  is the result of an `operation of adding` two numbers

> The `operation` takes its input 1 from `X1`;  takes its input 2 from `X2`

Describing this seemingly simple process in details seems to be tautology. The advantage is that now it is possible to automatically construct the backpropagation chain to compute how the changes in the input X1 and X2 will affect the output Y. 

Let us represent the _variables_ and _operators_ we will use in _tractable_ computations. 

__Variables__: 
- _contain_ the following information:
    - `data`: the content value
    - `grad`: its effect on one target value, say, $\frac{\partial L}{\partial x}$
    - `from_op`: "ancester" of this piece of information, `None` or some operator. E.g. in $y = x_1 + x_2$, $x_1$ and $x_2$ have none ancester, $y$ is `from_op`erator "+".
    - `id_`: and unique global ID
- _perform_ the following functions:
    - applying operator to `self` (and `another` `variable` object), see `__add__` function below.
    - backpropating information of `grad` to the `from_op` (if it exists)

__Operators__:
- _contain_:
    - a
- _perform_ the following functions:
    - applyi
    
    
__EXERCISE__: Examine the operation of `Y = X1 + X2`

__REMARK__
You may think one target is too limited, but ...

In [0]:
class NeuData:
    d_count = 0 # A class-wise *global* counter
    
    def __init__(self, data=0, grad=0, from_op=None, name=None):
        # Essential information
        # 1. the "content" of this variable
        self.data = data 
        # 2. how this variable affects the "total objective"
        # of a system. For machine learning problems, the
        # system is determined by a global evaluation, often
        # called a "loss"
        self.grad = grad
        # 3. the operation from which this variable has been computed
        # if this variable is given (no computation is needed)
        # this is None
        self.from_op = from_op 
        
        # Comp. Graph maintainance (and diagnosis and visualisation)
        self.id_ = f"D{NeuData.d_count}"
        NeuData.d_count += 1 # a global serial number

        # Further information for diagnosis and visualisation
        self.name = self.id_ if name is None else name
        
        # If configured, output log information
        # Optionally, we can output some report at this point.
        # E.g.
        print(f"Variable {self.name} has been created" + 
              ("." if self.from_op is None else 
              f" from the result of {self.from_op.name}"))
        
    def __add__(self, another):
        """
        This function builds a "computational structure" performing
        ADDING between two Variables. 
        
        The method __add__ will be called when the following piece
        of code gets executed:
            `variable_1 + variable_2`
        then invocation would be
            `variable_1.__add__(another=variable_2)` # self=variable_1
            
        To have the entire "chain of information processing" trackable, 
        we take the computation and management in our own hands:
            1. create a *managed* operator to do the addition
            2. create a new variable to contain the result -- do NOT
              forget to tell the new variable where it comes from:
              the operation we just created above.
        """
        add_op = NeuOpAdd(self, another) # create an add-op, using self and 
            # another as its inputs.
        return NeuData(add_op.forward(), from_op=add_op) # create a new 
            # variable, maintain the computation structure by 
            # - storing the value 
            #   - returned by the newly created op, which is 
            #   - which is commanded to perform its duty --
            #   - adding input-1 and input-2; 
            # - establishing new var's dependency on the `from_op`
            
    def backward(self, g=1.0):
        """
        This function creates a ring in the chain of back propagation.
        When  informed that the change of my value will cause a change
        in the final objective `L` by a certain amount `g`, the variable
            1. accumulates `g` to `self.grad`, as one variable can have
              multiple ways of affecting L, e.g. 
              L = X1 + Y; 
              Y = X1 + X2
              Then a chenage X1 will affect L in two ways.
            2. propagates the information to 
        """
        # accumulate grad
        self.grad += g
        # report
        print(f"{self.name} update grad: {self.grad-g:.3g} "
              f"-> {self.grad:.3g}")
        # backprop grad
        if self.from_op is not None:
            self.from_op.backward(g)
            
            
class NeuOpAdd:
    op_count = 0 # global operator counter
    
    def __init__(self, oprand1, oprand2, name=None):
        """
        Init of a PLUS operator, allocate an ID, ensure a name and
        link the operator to the oprands.
        """
        self.id_ = f"Op+{NeuOpAdd.op_count}"
        NeuOpAdd.op_count += 1
        self.name = self.id_ if name is None else name
        # establish link to oprands
        self.oprand1 = oprand1
        self.oprand2 = oprand2
        # report its creation
        print(f"Operator {self.name} has been created, getting inputs"
              f" from {self.oprand1.name} and {self.oprand2.name}")
        
    def forward(self):
        """
        The computation. It is trivial for this operator.
        """
        resu = self.oprand1.data + self.oprand2.data
        # report
        print(f"{self.name} forwarding... \n"
              f"\t input-1 {self.oprand1.name}: value {self.oprand1.data:.3g}\n"
              f"\t input-2 {self.oprand2.name}: value {self.oprand2.data:.3g}\n"
              f"\t ouput: {resu:.3g}")
        return resu      
    
    def backward(self, g):
        """
        As a simple +, both inputs has equal effect on its output, i.e.
        if the output of THIS + OPERATOR affects the global L by a factor 
        of g, so does the input-1 as well as input-2
        """
        # backprop for op-1
        op1_g = g
        # report
        print(f"{self.name} transforming grad {g:.3g} to {op1_g:.3g}"
              f" and pass to {self.oprand1.name}")
        self.oprand1.backward(op1_g)
        
        # backprop for op-2
        op2_g = g
        print(f"{self.name} transforming grad {g:.3g} to {op2_g:.3g}"
              f" and pass to {self.oprand2.name}")
        self.oprand2.backward(op2_g)
        
        
              

__EXERCISE__

Run the following code blocks and explain the outputs.

In [0]:
x0 = NeuData(4, name="x0")
x1 = NeuData(3, name="x1")
L = x0 + x1
print("================\nComputation completed. \n"
      f"L name:{L.name}, id:{L.id_}, value:{L.data}")

In [0]:
L.backward()
del x0, x1, L # explicitly remove those variables lest confusion

In [0]:
x0 = NeuData(4, name="x0")
x1 = NeuData(3, name="x1")
x2 = NeuData(10, name="x2")
x3 = x0 + x1
L = x3 + (x2 + x3)
L.backward(1.0)
del x0, x1, x2, x3, L

### 2.2.2 Full Version of Managed Variable/Op/Func/NeuralNets

We provide a primary but full-fledged computation framework of constructing NN models that can perform automatic backprop PROGRAMMING.

In [0]:
########################################
# Ops
VERBOSE = True
class NeuOp(object):
    """
    Now we define a system of operators, this is the root class
    defining the basic bookkeeping behaviour of operators.
    """
    op_count = 0
    
    def __init__(self, name=None):
        super(NeuOp, self).__init__()
        self.id_ = f"Op{NeuOp.op_count}"
        NeuOp.op_count += 1
        if name is None:
            name = self.id_
        self.name = name
    
    def backward(self, g):
        raise NotImplementedError
        
    def forward(self):
        raise NotImplementedError
        
        
    def make_animation_forward_(self, resu):
        try:
            g_animator.make_animation_forward(self, resu)
        except Exception as e:
            pass
            
    def make_animation_backward_(self, msgs):
        try:
            g_animator.make_animation_backward(self, msgs)
        except Exception as e:
            pass
        
class UnaryNeuOp(NeuOp):
    """
    Operators who has only one oprand (input).
    """
    def __init__(self, oprand, name=None):
        super(UnaryNeuOp, self).__init__(name)
        self.oprand = oprand
        try:
            g_animator.make_animation_create(self)
        except:
            pass
        
    def backward(self, g):
        self.make_animation_backward_([
            dict(back_node_id=self.oprand.id_, msg=g)])
        self.oprand.backward(g)
                
class NeuOpNeg(UnaryNeuOp):
    """
    Negation, the "-" operator.
    """
    def __init__(self, oprand, name="-"):
        super(NeuOpNeg, self).__init__(oprand, name)
    
    def backward(self, g):
        bg = -g
        super(NeuOpNeg, self).backward(bg)

        
    def forward(self):
        resu = -self.oprand.data
        self.make_animation_forward_(resu)
        return resu
        
class BinaryNeuOp(NeuOp):
    """
    Binary Operators
    """
    def __init__(self, oprand1, oprand2, name=None):
        super(BinaryNeuOp, self).__init__(name)
        self.oprand1 = oprand1
        self.oprand2 = oprand2
        try:
            g_animator.make_animation_create(self)
        except:
            pass
    
class NeuOpAdd(BinaryNeuOp):
    """
    Our old friend "+"
    """
    def __init__(self, oprand1, oprand2, name="+"):
        super(NeuOpAdd, self).__init__(oprand1, oprand2, name)
    
    def backward(self, g):
        self.make_animation_backward_([
            dict(back_node_id=self.oprand1.id_, msg=g),
            dict(back_node_id=self.oprand2.id_, msg=g)])
        
        self.oprand1.backward(g)
        self.oprand2.backward(g)
        
    def forward(self):
        resu = self.oprand1.data + self.oprand2.data
        self.make_animation_forward_(resu)
        return resu
    
class NeuOpMul(BinaryNeuOp):
    """* operator, note the backprop"""
    def __init__(self, oprand1, oprand2, name="*"):
        super(NeuOpMul, self).__init__(oprand1, oprand2, name)
    
    def backward(self, g):
        # output = in1 * in2, so dL/d_output * in2 = dL/d_in1, similarly for d_in2
        self.make_animation_backward_([
            dict(back_node_id=self.oprand1.id_, msg=g * self.oprand2.data),
            dict(back_node_id=self.oprand2.id_, msg=g * self.oprand1.data)])
        
        self.oprand1.backward(g * self.oprand2.data)
        self.oprand2.backward(g * self.oprand1.data)
        
    def forward(self):
        resu = self.oprand1.data * self.oprand2.data
        self.make_animation_forward_(resu)
        return resu

########################################
# Data
class NeuData:
    d_count = 0
    
    def __init__(self, data=0, grad=None, from_op=None, name=None):
        # essential information
        self.data = data
        self.grad = grad
        self.from_op = from_op
        
        # comp. graph maintainance and visualisation
        self.id_ = f"D{NeuData.d_count}"
        NeuData.d_count += 1
        if name is None:
            name = self.id_
        self.name = name
                
        try:
            g_animator.make_animation_create(self)
        except:
            pass
        
    def __neg__(self):
        neg_op = NeuOpNeg(self)
        return NeuData(neg_op.forward(), from_op=neg_op)
        
    def __add__(self, another):
        add_op = NeuOpAdd(self, another)
        return NeuData(add_op.forward(), from_op=add_op)
    
    def __sub__(self, another):
        add_op = NeuOpAdd(self, -another)
        return NeuData(add_op.forward(), from_op=add_op)
    
    def __mul__(self, another):
        mul_op = NeuOpMul(self, another)
        return NeuData(mul_op.forward(), from_op=mul_op)
    
    def backward(self, g=1.0):
        if self.grad is None:
            self.grad = g
        else:
            self.grad += g
            
        if VERBOSE:
            print(f"{self.name} grad: {self.grad:.3f}")
        if self.from_op is not None:
            self.from_op.backward(g)
            
    def __setattr__(self, name, value):
        
        if name == "grad" and value is not None:
            try:
                g_animator.make_animation_backward(
                    self, 
                    [dict(node_id=self.id_, msg=value)])
            except Exception as e:
                pass
        
        super(NeuData, self).__setattr__(name, value)
            
        
    def __str__(self):
        return f"{self.data}"
    
    def __repr__(self):
        return f"{self.name}:{self.data:.3f}" if self.grad is None \
            else f"{self.name}:{self.data:.3f}:{self.grad:.3f}"
    
########################################
# Elementwise Functions
import math
class NeuFunction(object):
    """
    Functions are customised Uniary operators.
    The barrier for understanding is the dealing with backprop.
    We create a unary op, and hack the object's back-prop function.
    """
    def __init__(self, name=None):
        super(NeuFunction, self).__init__()
        self.name = name
        
    def forward(self, x):
        import types
        op = UnaryNeuOp(x, name=self.name)
        # op will be our placeholder in the differentiable computational graph.
        # we will never use op's forward computation (not defined anyway)
        # but we will take advantage of the *automatic* invocation of op's
        # `backward` method, which will be called when the output data pass
        # gradient messages through. 
        

        # We hijack the op's function and let it point to the `backward` function
        # of THIS object. NOTE, not THIS CLASS, `self` here would represent an instance
        # of a SUBCLASS, which will implement appropriate gradient transform.
        # See the Sigmoid function example below.
        op_original_back = op.backward
        def _back(op_instance, g):
            g_back = self.backward(op_instance, g) # self is subclass instance
            op_original_back(g_back)
        op.backward = types.MethodType(_back, op) # wrap it as a class-method function.
            # see https://stackoverflow.com/questions/10374527/dynamically-assigning-function-implementation-in-python
            # Amber's answer.
        return op
        
    def backward(self, g):
        raise NotImplementedError
        
    def __call__(self, x):
        """
        This will make the object "callable", i.e.
        f(x) === f.forward(x)
        """
        return self.forward(x)

def _sigmoid(x):
    return 1 / (1 + math.exp(-x))

class FuncSigmoid(NeuFunction):
    def __init__(self, name=None):
        super(FuncSigmoid, self).__init__(name)
    
    def forward(self, x):
        op = super(FuncSigmoid, self).forward(x)
        resu = _sigmoid(x.data)
        op.make_animation_forward_(resu)
        out = NeuData(resu, from_op=op)
        return out
    
    def backward(self, op, g):
        x = op.oprand
        y = _sigmoid(x.data)
        g = g * y * (1 - y)
        if VERBOSE:
            print(f"{self.name} pass-grad: {g:.3f}")
        return g

########################################
# Neural network components

def flatten_data(dlist):
    """Yield items from any nested iterable; see Reference."""
    from collections.abc import Iterable
    for x in dlist:
        if isinstance(x, Iterable):
            for sub_x in flatten_data(x):
                yield sub_x
        else:
            assert isinstance(x, NeuData), ValueError("Must be neu data")
            yield x
            
class NeuParameterisedModule(object):
    module_count = 0
    def __init__(self, name=None):
        if name is None:
            name = f"nn{NeuParameterisedModule.module_count}"
        self.name = name
        NeuParameterisedModule.module_count += 1
        self.parameters_ = []
        
    def __setattr__(self, name, value):
        """
        We hack the setting attribute to do bookkeeping of learnable parameters:
        """
        if name == "parameters_": # parameters are directly set
            print("set param")
            value = list(flatten_data(value))
            
        if isinstance(value, NeuParameterisedModule): # member sub-modules added
            print("add sub module")
            self.parameters_ += value.parameters_
            
        super(NeuParameterisedModule, self).__setattr__(name, value)
    
        
    def forward(self, x):
        raise NotImplementedError
        
    def __call__(self, x):
        return self.forward(x)

import random    
class Linear(NeuParameterisedModule):
    """
    Linear layer
    """
    def __init__(self, in_features, out_features, name=None):
        super(Linear, self).__init__(name)
            
        self.weights = \
            [[NeuData(random.gauss(0, 1), name=f"{self.name}:w{_o,_i}") 
              for _i in range(in_features)]
             for _o in range(out_features)]
        self.bias = \
            [NeuData(0, name=f"{self.name}:b{_o}") 
             for _o in range(out_features)]
        
        self.parameters_ = self.weights + self.bias
        
    def forward(self, x):
        from functools import reduce
        out = []
        for w, b in zip(self.weights, self.bias):
            weighted = [wj * xj for wj, xj in zip(w, x)]
            out.append(reduce(lambda a1, a2:a1 + a2, weighted + [b,]))
            
        return out
    



In [0]:
# Unit test of Functions
f = FuncSigmoid()
x1 = NeuData(0.2, name="x1")
w1 = NeuData(3, name="w1")
h = x1 * (w1 * w1)
y = f(h) + f(w1)
y.backward()
print(f"Auto-back w1.grad: {w1.grad}")

# Verify numerically
f = lambda x: 1 / (1 + math.exp(-x))
x1 = 0.2
w1 = 3
h = x1 * (w1 * w1)
y = f(h) + f(w1)

t_eps = 1e-3
w1 += t_eps
h = x1 * (w1 * w1)
y_new = f(h) + f(w1)
print(f"Numerical w1.grad: {(y_new - y) / t_eps}")


In [0]:
# Test using our module to create a neural net
class MyNN(NeuParameterisedModule):
    def __init__(self, name="nn"):
        super(MyNN, self).__init__(name)
        self.lin1 = Linear(in_features=2, out_features=3, name=f"{self.name}:l1")
        self.lin2 = Linear(in_features=3, out_features=1, name=f"{self.name}:l2")
        
    def forward(self, x):
        f = FuncSigmoid()
        pre_h = self.lin1(x)
        h = [ f(hi) for hi in pre_h ]
        pre_out = self.lin2(h)
        out = [ f(yi) for yi in pre_out ]
        return out
    
random.seed(42)    
mynn = MyNN()
print(mynn.parameters_)
y = mynn([NeuData(1), NeuData(2)])
print(y[0])
y[0].backward()

In [0]:
g_animator = AnimatorSceneV2()
x0 = NeuData(30, name="x0")
x1 = NeuData(5, name="x1")
w1 = NeuData(2, name="w1")
L = x0 + x1 * w1
L.backward(1.0)

In [0]:
%%manim -m
print("Animation")

## 2.3 Deep Architecture Applied

### 2.3.1 Prepare Data

We copy the library, data and environment preparation below for quick and show reference.

In [0]:
# The example is adopted from pytorch document:
# see https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html#sphx-glr-beginner-blitz-cifar10-tutorial-py
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
from torch.nn import functional as fn
import matplotlib.pyplot as plt
import numpy as np

# Change the type of the virtual machine in "Runtime/Runtime Type"
device = torch.device("cuda:0") if torch.cuda.is_available() \
    else torch.device("cpu")

transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=4,
                                          shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=4,
                                         shuffle=False, num_workers=2)

classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')



# functions to show an image
def imshow(img):
    img = img / 2 + 0.5     # unnormalize
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()


# get some random training images
dataiter = iter(trainloader)
images, labels = dataiter.next()

# show images
imshow(torchvision.utils.make_grid(images))

# print labels
print(' '.join(classes[l] for l in labels))

### 2.3.2 Define and Train Network.

In [0]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        batch_size = x.shape[0]
        h = fn.max_pool2d(fn.relu(self.conv1(x)), kernel_size=2)
        h = fn.max_pool2d(fn.relu(self.conv2(h)), kernel_size=2)
        h = h.view(batch_size, -1)
        h = fn.relu(self.fc1(h))
        h = fn.relu(self.fc2(h))
        y = self.fc3(h)
        return y

net = Net()
net = net.to(device)

In [0]:
import torch.optim as optim
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

for epoch in range(2):  # loop over the dataset multiple times

    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data
        inputs = inputs.to(device)
        labels = labels.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = fn.cross_entropy(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        if i % 2000 == 1999:    # print every 2000 mini-batches
            print('[%d, %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / 2000))
            running_loss = 0.0

print('Finished Training')

criterion = nn.CrossEntropyLoss()

# 3 Generative Adversarial Nets

## 3.0 Helpers

### Env Prepare

In [0]:
import numpy as np
import scipy.stats as ss
import plotly.graph_objects as go

# Neural network preparation
import torch
import torch.nn as nn
import torch.nn.functional as fn
import torch.optim

### Utilities

In [0]:
def get_gmm_samples(sample_num, norm_params, 
                    weights=None, rng=None):
    """
    Generate samples from a Gaussian distribution
    """
    # take samples in two steps:
    # 1. a stream of indices from which to choose the component
    norm_params = np.atleast_2d(np.array(norm_params))
    component_num = norm_params.shape[0]
    if weights is None:
        weights = np.ones(component_num) / component_num
    if rng is None:
        rng = np.random.RandomState(42)
    component_num = norm_params.shape[0]
    mixture_idx = rng.choice(component_num, size=sample_num, 
                             replace=True, p=weights)
    # 2. sample the corresponding gaussian

    x_samples = np.array([
        (rng.randn() + norm_params[i][0]) * norm_params[i][1]
        for i in mixture_idx])
    return x_samples

def get_gmm_pdf(norm_params, weights=None):
    norm_params = np.atleast_2d(np.array(norm_params))
    component_num = norm_params.shape[0]
    if weights is None:
        weights = np.ones(component_num) / component_num
    
    # define the PDF for mixture model.
    def gmm_pdf(x):
        p = np.zeros_like(x)
        for (l, s), w in zip(norm_params, weights):
            p += ss.norm.pdf(x, loc=l, scale=s) * w
        return p
            
    min_val = (norm_params[:, 0] - 3 * norm_params[:, 1]).min()
    max_val = (norm_params[:, 0] + 3 * norm_params[:, 1]).max()
    x_grid = np.linspace(min_val, max_val, 200)
    mixture_pdf_on_grid = gmm_pdf(x_grid)
        
    return gmm_pdf, x_grid, mixture_pdf_on_grid


def get_gmm_logpdf_tensor(norm_params, weights=None):
    norm_params = np.atleast_2d(np.array(norm_params))
    component_num = norm_params.shape[0]
    if weights is None:
        weights = np.ones(component_num) / component_num
        
    norm_params_t = torch.Tensor(norm_params)
    weights_t = torch.Tensor(weights)
    
    components = [torch.distributions.normal.Normal(l, s)
                  for l, s in norm_params_t]
    
    
    # define the PDF for mixture model.
    def gmm_logpdf(x):
        return torch.stack([
            nd.log_prob(torch.Tensor(x)) * w
            for nd, w in zip(components, weights_t)]).sum(dim=0)
            
    return gmm_logpdf
       

### Visualiser Functions

In [0]:
def get_xaxis(xx):
    xaxis=go.layout.XAxis(
            range=[xx.min(), xx.max()],
            showgrid=False,
            zeroline=False,
            showline=True,
            gridcolor='#bdbdbd',
            gridwidth=1,
            zerolinecolor='#969696',
            zerolinewidth=2,
            linecolor='#636363',
            linewidth=2,
            mirror=True,
            showspikes=True
        )
    return xaxis


def get_yaxis(max_y):
    yaxis=go.layout.YAxis(
            range=[0, max_y],
            title="Probability",
            showgrid=False,
            zeroline=False,
            showline=True,
            gridcolor='#bdbdbd',
            gridwidth=1,
            linecolor='#636363',
            linewidth=2,
            mirror=True,
        )
    return yaxis



def get_layout(xx, max_y=1.0):
    layout = go.Layout(
        hovermode="closest",
        xaxis=get_xaxis(xx),
        yaxis=get_yaxis(max_y),
        yaxis2=go.layout.YAxis(
            range=[0, 10],
            title="Samples",
            titlefont=dict(color="#ff7f0e"),
            tickfont=dict(color="#ff7f0e"),
            anchor="free",
            overlaying="y",
            side="right",
            position=1.00
        ),
        height=400,
        width=600,)
    return layout

def get_training_frames(x, preds, color="red"):
    
    frames=[go.Frame(
        data=[go.Scatter(
            x=x,
            y=y,
            mode="markers",
            marker=dict(
            line_width=1,
            color=color,
            opacity=0.8))])
        for y in preds]
    return frames
    

def visualise_pdf(x_values, prob_values=None, show_pdf_text=False, show=True,
    color="blue", fill=True, name="PDF", fig=None,
                 samples=None):
    if show_pdf_text:
        pdf_text = [f"Prob(x={x:.2f}) = {y:.2f}"
                    for x, y in zip(x_values, prob_values)]
        pdf_hoverinfo = "text"
    else:
        pdf_text = ""
        pdf_hoverinfo = "none"
    fig_data = []
    
    if prob_values is not None:
        pdf_scatter = go.Scatter(
            x=x_values, y=prob_values, 
            marker=dict(
                line_width=1,
                color=color,
                colorscale="Cividis",
                symbol="square",
                opacity=0.5
            ),
            mode="lines",
            fill="tozeroy" if fill else "none",
            name=name,
            text=pdf_text,
            hoverinfo=pdf_hoverinfo)
        new_y_max = np.max(prob_values) * 1.05
        fig_data.append(pdf_scatter)
    else:
        new_y_max = None

    
    
    if samples is not None:
        sample_histogram = go.Histogram(
            x=samples, 
            xbins=dict(start=x_values.min(), 
                       end=x_values.max(), 
                       size=0.1),
            yaxis="y2", 
            opacity=0.8,
            marker=dict(color="orange"),
            name="Samples")
        
        fig_data.append(sample_histogram)
    
    if fig is None:
        layout = get_layout(x_grid, np.max(prob_values) * 1.05)
        fig = go.Figure(
                data=fig_data,
                layout=layout,
                layout_title_text="Probabilistic Density",
            )
    else:
        for tr_ in fig_data:
            fig.add_trace(tr_)
        if new_y_max is not None:
            orig_y_max = fig.layout.yaxis.range[1]
            fig.update_layout({"yaxis":{"range":[0, max(new_y_max, orig_y_max)]}})
        
    if show:
        fig.show()
    return fig

        
def ___samples(
    norm_params,
    weights=None,
    bootstrap_times_to_display=3,
    show_sample=False,
    estimator=np.median,
    rng=None,
    sample_num=None):
    pdf, x_grid, mixture_pdf = get_gmm_pdf(norm_params, weights)
   
    if show_sample:
        x_samples = get_gmm_samples(sample_num, norm_params, rng)
        sample_histogram = go.Histogram(
            x=x_samples, 
            xbins=dict(start=min_val, end=max_val, size=0.05),
            yaxis="y2", 
            opacity=0.8,
            marker=dict(color="orange"),
            name="Samples")
        
        fig_data.append(sample_histogram)
            

    

## 3.1 Generative Model Basics

What are GAN models? What the tutorial video below.

In [0]:
#@GAN Introduction
from IPython.display import YouTubeVideo
YouTubeVideo('PhCM3qoRZHE', width=600, height=400)

### 3.1.1 To Model Data by Approximating the PDF

To begin with, let us have a data distribution. A distribution is fully characterised by the _probility density function_.

Check the pdf below. We will use this function as our running example.

In [0]:
ex_gmm_parameters = [[0, 1], [3, 0.5]]
pdf_fn, x_grid, _ = get_gmm_pdf(ex_gmm_parameters)
pdf_values = pdf_fn(x_grid)
fig = visualise_pdf(x_grid, pdf_values, show=True, show_pdf_text=True)

Then we create a model for the distribution. Let's use a neural network.

In [0]:
class PDFNet1D(nn.Module):
    def __init__(self, hidden_units=2):
        super(PDFNet1D, self).__init__()
        self.lin1 = nn.Linear(in_features=1, out_features=hidden_units)
        self.lin2 = nn.Linear(in_features=hidden_units, out_features=1)
        
    def forward(self, x):
        h = torch.tanh(self.lin1(x)) 
        p = torch.sigmoid(self.lin2(h)) # as PDF only takes positive values
        return p

class NumpyWrapper:
    def __init__(self, net):
        self.net = net
    
    def __call__(self, x):
        x_in = np.atleast_2d(x).T if x.ndim < 2 else x
            
        with torch.no_grad():
            y = self.net(torch.Tensor(x_in)).cpu().numpy()
        return y.reshape(x.shape)

In [0]:
torch.manual_seed(42)
pdf_net_ = PDFNet1D()
pdf_approx = NumpyWrapper(pdf_net_)
# x_grid is the range of x values, created by the "true" pdf builder
fig2 = visualise_pdf(x_grid, pdf_approx(x_grid), show=True, show_pdf_text=True, fig=fig,
                     name="Approx", color="red", fill=False)

So we notice our function is not fitting so well with the true PDF. Let's do a bit training.

In [0]:
from torch.optim import Adam
import matplotlib.pyplot as plt
cmap=plt.cm.get_cmap('Blues')

NEURON_NUM = 20
MAX_TRAIN = 10000
EVERY_REPORT = 2000

# Define a net
torch.manual_seed(42)
pdf_net_ = PDFNet1D(NEURON_NUM)
pdf_approx = NumpyWrapper(pdf_net_)
fig = visualise_pdf(x_grid, pdf_fn(x_grid), show=False, show_pdf_text=True)

# Perform Training
optim = Adam(pdf_net_.parameters())
x_trn = torch.Tensor(x_grid[:, np.newaxis])
y_trn = torch.Tensor(pdf_fn(x_grid)[:, np.newaxis])
print(pdf_net_(x_trn).shape)

preds = []
for i in range(MAX_TRAIN):
    optim.zero_grad()
    pred = pdf_net_(x_trn)
    loss = fn.mse_loss(pred, y_trn)
    loss.backward()
    optim.step()
    if i % EVERY_REPORT == 0: 
        print(loss.item())
        preds.append(pred.detach().cpu().numpy().squeeze())

        fig = visualise_pdf(x_grid, pdf_approx(x_grid), show=False, show_pdf_text=True, 
                             fig=fig,
                             name=f"Approx @ {i}", 
                             color=cmap(i/EVERY_REPORT), fill=False)  

In [0]:
fig.show() 

### 3.1.2 Generative Model 

It seems that we can achieve learning a PDF in such a simple way. There are two issues:

1. The loss is computed over the entire data space, so we must scan entire data space, which is not practically feasilbe.
2. After training, the model can tell if an X belongs to the data, which is not entirely useful if explicit generation is needed.

#### Generator

We adopt the idea of a generator: it takes a _random number_ as input, and produces a data sample.

In [0]:
class Generator(nn.Module):
    def __init__(self, hidden_units=4):
        super(Generator, self).__init__()
        self.lin1 = nn.Linear(in_features=1, out_features=hidden_units)
        self.bn1 = nn.BatchNorm1d(hidden_units)
        self.lin2 = nn.Linear(in_features=hidden_units, 
                              out_features=hidden_units)
        self.bn2 = nn.BatchNorm1d(hidden_units)
        self.lin3 = nn.Linear(in_features=hidden_units, out_features=1)
        
    def forward(self, x):
        h = self.bn1(torch.tanh(self.lin1(x)))
        h = self.bn2(torch.tanh(self.lin2(h)))
        out = self.lin3(h)
        return out

The idea is to model the _transform_ from a uniform box into the true data distribution. Let's see an example. The `model_g` tries to map $[0, 1]$ to the distribution represented by our PDF.

In [0]:
to_numpy_1d = lambda x:x.detach().cpu().numpy().squeeze()
torch.manual_seed(42)
model_g = Generator(16)
random_input = torch.rand(100, 1)
fake_x = model_g(random_input)
fake_x_np = to_numpy_1d(fake_x)

# Visualise the samples
fig = visualise_pdf(x_grid, pdf_fn(x_grid), samples=fake_x_np)


#### Naive Improvement

Apparently this is not very successful.  so now let's consider how to improve it.   A natural idea is to adjust the parameters in the generator network so that the “fake-x” it generates has a higher probability according to the PDF function.

In [0]:
# NOTE: If you want to start from scratch, e.g. to reset the
# visualisation figure. Re-run the cell above.

MAX_TRAIN = 10000
EVERY_REPORT = 2000

# We need a pdf that handles tensor data.
logpdf_fn_t = get_gmm_logpdf_tensor(ex_gmm_parameters)

# Allocate an optimiser for model_g
optim_g = Adam(model_g.parameters(), lr=1e-4)

fig = visualise_pdf(x_grid, pdf_fn(x_grid), show=False)
for i in range(MAX_TRAIN):
    optim_g.zero_grad()
    # take a new set of random inputs
    random_input = torch.rand(100, 1)
    # map random to the fake x
    fake_x = model_g(random_input)
    # evaluate the current fake x w.r.t. PDF
    fake_prob = logpdf_fn_t(fake_x.squeeze())
    # we want the fake_prob as large as possible
    # so negate the sum as loss
    loss = - fake_prob.mean()
    loss.backward()
    optim_g.step()
    if i % EVERY_REPORT == 0: 
        print(loss.item())

        fig = visualise_pdf(x_grid, show=False, 
                            fig=fig,
                            name=f"Samples @ {i}", 
                            color=cmap(i/EVERY_REPORT), 
                            samples=to_numpy_1d(fake_x))

In [0]:
fig.show()

#### Discriminator

There is an obvious problem in the above solution:

> The high-probability strategy drives the generator to push all the samples into the area with highest probability density,  which makes sense only from a local view. Globally the samples generated are very different from the true population of the data.

A natural solution is to introduce our counter mechanism,  we do not want the samples produced by the generator to be easily distinguished from the true population.

To put it formally, the current problem is that, when the "counterfeiter" puts all effort in optimising the $p(x)$ it generates, we will have different
$p(x|TrueData)$ and $p(x|FakeData)$. The formal way of stating "true and false data are easily distinguished" is that the _posterior probabilities_
$$
p(\{True,Fake\}|x)
$$
are different.

So in the next step we will try to make the posteriors similar.

In [0]:
# Data management
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import WeightedRandomSampler

class FakeSampleInXSpaceDataset(Dataset):
    """Face Landmarks dataset."""

    def __init__(self, fake_samples, x_values):
        """
        :param fake_samples: samples for which the probability is to be modelled
        :param x_values: non-sample common x
        """
        self.x_samples_ = torch.cat((fake_samples, x_values))
        self.y_samples_ = torch.cat((torch.ones_like(fake_samples), 
                                    torch.zeros_like(x_values)))
            # as training data, we would like the model fit to this data
            # to generate 1.0 for fake samples and 0.0 for normal x-values

    def __len__(self):
        return len(self.landmarks_frame)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        sample = self.x_samples_[idx], self.y_samples_[idx]

        return sample
    


In [0]:
# Fake data density estimator

class FakeDensityEstimator(nn.Module):
    """
    Input X, Output Prob(x) According to Fake Data Population
    """
    
    def __init__(self, hidden_units=4):
        super(FakeDensityEstimator, self).__init__()
        self.lin1 = nn.Linear(in_features=1, out_features=hidden_units)
        #self.bn1 = nn.BatchNorm1d(hidden_units)
        self.lin2 = nn.Linear(in_features=hidden_units, 
                              out_features=hidden_units)
        self.lin3 = nn.Linear(in_features=hidden_units, out_features=1)
        
    def forward(self, x):
        h = torch.tanh(self.lin1(x)) # h = self.bn1(h)
        h = torch.tanh(self.lin2(h))
        out = self.lin3(h)
        return out
    


def fit_fake_density(fake_density_model, fake_samples, x_values,
                     max_iter=1000):
    fake_samples = torch.Tensor(fake_samples).squeeze()
    x_values = torch.Tensor(x_values).squeeze()
    optim = Adam(fake_density_model.parameters(), lr=1e-4)
    train_dataset = FakeSampleInXSpaceDataset(fake_samples, x_values)
    samples_weight = torch.cat((
        torch.ones_like(fake_samples) / len(fake_samples) / 2,
        torch.ones_like(x_values) / len(x_values) / 2,))
    sampler = WeightedRandomSampler(samples_weight, len(samples_weight),
                                   replacement=True)
    data_loader = DataLoader(train_dataset, batch_size=16,
                            sampler=sampler)
    it = 0
    criterion = nn.BCEWithLogitsLoss()
    while it < max_iter:
        # take random samples from fake_samples and x_values respectively
        for x, y in data_loader:
            optim.zero_grad()
            pred = fake_density_model(x.unsqueeze(dim=1))
            loss =  criterion(pred.squeeze(), y) #- (pred * y).mean()
            loss.backward()
            optim.step()
            
        if it % (max_iter / 5) == 0:
            print("Fake PDF Fitting Loss", loss.item())
        it += 1

        
        

Let's test how the fake density estimator fits to the fake samples.

In [0]:
torch.manual_seed(42)

# Let us re-train the generator G, then Fit our PDF to G

# 0. Define G
model_g = Generator(16)
random_input = torch.rand(100, 1)
fake_x = model_g(random_input)

# 1. Train G for a while
MAX_TRAIN_GEN = 10
EVERY_REPORT_GEN = 5

# We need a pdf that handles tensor data.
logpdf_fn_t = get_gmm_logpdf_tensor(ex_gmm_parameters)

# Allocate an optimiser for model_g
optim_g = Adam(model_g.parameters(), lr=1e-4)


for i in range(MAX_TRAIN_GEN):
    optim_g.zero_grad()
    random_input = torch.rand(100, 1)
    fake_x = model_g(random_input)
    fake_prob = logpdf_fn_t(fake_x.squeeze())
    loss = - fake_prob.mean()
    loss.backward()
    optim_g.step()
    if i % EVERY_REPORT_GEN == 0: 
        print("G Loss", loss.item())

# 2. Fit a PDF to this G-generated samples
random_input = torch.rand(200, 1)
fake_x = model_g(random_input).detach()

fake_density_model = FakeDensityEstimator(16)
fit_fake_density(fake_density_model, fake_x.squeeze(), x_grid, max_iter=300)

In [0]:
fake_density_pdf = fn.sigmoid(
    fake_density_model(torch.Tensor(x_grid).unsqueeze(1)))

fig = visualise_pdf(x_grid, to_numpy_1d(fake_density_pdf), 
                    samples=to_numpy_1d(fake_x),
                    name="PDF Fake, Estimated",
                    color="#646432",
                    show=False)
fig = visualise_pdf(x_grid, pdf_values, 
                    name="PDF True",
                    fig=fig,
                    show=True)

#### Breakthrough: Use Discriminator to supervise generator

Now, let us adopt the important idea, the models $X_{Fake}$ a generator produces should make the probability density estimated from $X_{Fake}$ very similar to the original density. So here we will have a GAME:

In [0]:
torch.manual_seed(42)

# Let us re-train the generator G, then Fit our PDF to G

# 0. Define G and Fake density estimator
model_g = Generator(16)
fake_density_model = FakeDensityEstimator(16)
# We need a pdf that handles tensor data.
logpdf_fn_t = get_gmm_logpdf_tensor(ex_gmm_parameters)
# Allocate an optimiser for model_g
optim_g = Adam(model_g.parameters(), lr=1e-4)

MAX_GAMES = 100
EVERY_REPORT_GAME = 20
MAX_TRAIN_GEN = 10
EVERY_REPORT_GEN = 5
MAX_ADJUST_ITERS = 5


#### GAME STARTS ####
game = 0
while game < MAX_GAMES:
    # 1. Train G for a while
    for i in range(MAX_TRAIN_GEN):
        optim_g.zero_grad()
        random_input = torch.rand(100, 1)
        fake_x = model_g(random_input)
        fake_prob = logpdf_fn_t(fake_x.squeeze())
        loss = - fake_prob.mean()
        loss.backward()
        optim_g.step()
        if i % EVERY_REPORT_GEN == 0: 
            print("G Loss", loss.item())

    # 2. Fit a PDF to this G-generated samples
    random_input = torch.rand(200, 1)
    fake_x = model_g(random_input).detach()
    fit_fake_density(fake_density_model, fake_x.squeeze(), 
                     x_grid, max_iter=100)
    
    # 3. !! NEW !!
    # Use G to map another group of random numbers for some fake samples
    # Let Density estimator to check how likely those fake samples belong
    # to G-induced distribution versus the original distribution.
    for i in range(MAX_ADJUST_ITERS):
        random_input = torch.rand(200, 1)
        fake_x = model_g(random_input) # No detach! as we will use the info to 
            # adjust parameters in G.
        gen_sample_fake_likelihood = fake_density_model(fake_x).squeeze()
        gen_sample_true_likelihood = logpdf_fn_t(fake_x).squeeze()

        # The goal is to make the gen_models more like true ones
        optim_g.zero_grad()
        loss = (gen_sample_fake_likelihood - gen_sample_true_likelihood).mean()
        loss.backward()
        optim_g.step()
    print(f"{game} LikeTrueLoss", loss.item())
    if game % EVERY_REPORT_GAME == 0:
        fake_density_pdf = fn.sigmoid(
            fake_density_model(torch.Tensor(x_grid).unsqueeze(1)))

        fig = visualise_pdf(x_grid, to_numpy_1d(fake_density_pdf), 
                            samples=to_numpy_1d(fake_x),
                            name="PDF Fake, Estimated",
                            color="#646432",
                            show=False)
        fig = visualise_pdf(x_grid, pdf_values, 
                            name="PDF True",
                            fig=fig,
                            show=True)
    game += 1

In [0]:
fake_density_pdf = fn.sigmoid(
    fake_density_model(torch.Tensor(x_grid).unsqueeze(1)))

fig = visualise_pdf(x_grid, to_numpy_1d(fake_density_pdf), 
                    samples=to_numpy_1d(fake_x),
                    name="PDF Fake, Estimated",
                    color="#646432",
                    show=False)
fig = visualise_pdf(x_grid, pdf_values, 
                    name="PDF True",
                    fig=fig,
                    show=True)

### 3.1.3 GAN Framework

The final piece completing the picture of the generative model framework arises from the following comtemplation. 

> In the above scheme, we 1) estimated PDF produced by the generator $p(x|fake)$ using generated samples and 2) compared the likelihood of the generated samples under $p(x|fake)$ and $p(x|true)$. This is implemented in the following logic

```python
gen_sample_fake_likelihood = fake_density_model(fake_x) # estimate fake.prob
gen_sample_true_likelihood = logpdf_fn_t(fake_x)        # estimate true.prob
...
loss = (gen_sample_fake_likelihood - gen_sample_true_likelihood).mean() 
# diff: make the produced sample look as true as possible
```

The challenges are
- Estimating $p(x|fake)$ is slow and inaccurate.
- We don't have $p(x|true)$ in practical scenarios anyway.

The essential thoughts are
> As far as comparison is concerned, we can employ a *Discriminator* to classify whether a sample is from the true data distribution (a training sample) or a generated one.

This makes it no longer necessary 
- to *explicitly* estimate the PDF $p(x|fake)$ produced by the generator from the fake samples.
- to have access to the ground-truth $p(x|true)$.

The new framework is as follows.


In [0]:
class DiscriminatorVanilla(nn.Module):
    def __init__(self, hidden_units=2):
        super(DiscriminatorVanilla, self).__init__()
        self.lin1 = nn.Linear(in_features=1, out_features=hidden_units)
        self.lin2 = nn.Linear(in_features=hidden_units, out_features=1)
        
    def forward(self, x):
        h = fn.relu(self.lin1(x))
        p = torch.sigmoid(self.lin2(h))
        return p
    
class GeneratorVanilla(nn.Module):
    def __init__(self, hidden_units):
        super(GeneratorVanilla, self).__init__()
        self.lin1 = nn.Linear(in_features=1, out_features=hidden_units)
        self.lin2 = nn.Linear(in_features=hidden_units, out_features=1)
        
    def forward(self, x):
        h = torch.tanh(self.lin1(x))
        out = self.lin2(h)
        return out
    

# W-GAN    
class Discriminator(nn.Module):
    def __init__(self, hidden_units=4):
        super(Discriminator, self).__init__()
        self.lin1 = nn.Linear(in_features=1, out_features=hidden_units)
        self.lin2 = nn.Linear(in_features=hidden_units, 
                              out_features=hidden_units)
        self.lin3 = nn.Linear(in_features=hidden_units, out_features=1)
        
    def forward(self, x):
        h = torch.tanh(self.lin1(x))
        h = torch.tanh(self.lin2(h))
        out = self.lin3(h)
        return out
    
class Generator(nn.Module):
    def __init__(self, hidden_units=4):
        super(Generator, self).__init__()
        self.lin1 = nn.Linear(in_features=1, out_features=hidden_units)
        self.bn1 = nn.BatchNorm1d(hidden_units)
        self.lin2 = nn.Linear(in_features=hidden_units, 
                              out_features=hidden_units)
        self.bn2 = nn.BatchNorm1d(hidden_units)
        self.lin3 = nn.Linear(in_features=hidden_units, out_features=1)
        
    def forward(self, x):
        h = self.bn1(torch.tanh(self.lin1(x)))
        h = self.bn2(torch.tanh(self.lin2(h)))
        out = self.lin3(h)
        return out
    
class GAN:
    """
    Please complete the GAN framework using the example training procedure
    provided below.
    """
    def __init__():
        self.discrim = Discriminator()
        self.gen = Generator()
        
    
    def fit(self, x):
        optim_discrim = torch.optim.Adam(self.discrim)
        optim_gen = torch.optim.Adam(self.gen)
        
        for i in range(1000):
            # generate fake samples
            fake_x = 0
            # train discriminator
            for j in range(5):
                pass

Note that we adopted the tricks proposed in Weirstein GAN to stablise and speed up. Read more:
[[paper-link](https://arxiv.org/abs/1701.07875)]
[[tutorial](https://wiseodd.github.io/techblog/2017/02/04/wasserstein-gan/)]
[[tutorial-more-background](https://paper.dropbox.com/doc/Wasserstein-GAN-GvU0p2V9ThzdwY3BbhoP7)]

Below are two more videos, one from the original GAN inventor, and the other talks about other generative models.


In [0]:
#@title
from IPython.display import YouTubeVideo
YouTubeVideo('9JpdAg6uMXs', width=600, height=400)

In [0]:
#@title
from IPython.display import YouTubeVideo
YouTubeVideo('9zKuYvjFFS8', width=600, height=400)

We don't use true PDF anymore, instead, we take training samples as in practical machine learning tasks. However, we do not need labels to train generative models.

In [0]:
train_sample_num = 5000
train_samples_np = get_gmm_samples(sample_num=train_sample_num, 
                                   norm_params=ex_gmm_parameters)

train_samples = torch.Tensor(train_samples_np).unsqueeze(dim=1)

In [0]:
import tqdm
# Create the function approximators
discrim = Discriminator(16)
generator = Generator(16)

optim_discrim = torch.optim.Adam(discrim.parameters(), lr=1e-4)
optim_gen = torch.optim.Adam(generator.parameters(), lr=1e-4)

fake_sample_num = train_sample_num

max_train_iters = 20000
train_discrim_iters = 5 # W-GAN (more discrim iters)

fig = go.Figure([go.Histogram(x=train_samples.numpy().squeeze(), nbinsx=100, 
                              marker=dict(color="red"))])
fig.show()
hist_scatters = []
for i in range(max_train_iters): #tqdm.tqdm_notebook(range(max_train_iters)):
    # generate fake samples
    random_inputs = torch.randn(fake_sample_num).unsqueeze(dim=1)
    with torch.no_grad():
        fake_samples = generator(random_inputs)
    
    x = torch.cat((train_samples, fake_samples), dim=0)
    gnd = torch.cat((torch.ones(train_sample_num),
                     torch.zeros(fake_sample_num)))

    # train discriminator
    for j in range(2):
        pred = discrim(x).squeeze()
        optim_discrim.zero_grad()
        # discriminator loss
        loss_discrim = -(pred * (gnd - 0.5) * 2).sum() # (gnd - 0.5) * 2
            # gets +1 for true and -1 for fake, so we want
            # discriminator generate BIG values for true
            # SMALL values for fake samples
        loss_discrim.backward()
        optim_discrim.step()
        # Clip discriminator parameters
        for p in discrim.parameters():
            p.data.clamp_(-0.01, 0.01)
    
    gnd = torch.ones(fake_sample_num)
    random_inputs = torch.randn(fake_sample_num).unsqueeze(dim=1)
    fake_samples = generator(random_inputs)
    output = discrim(fake_samples).squeeze()
    # generator loss
    loss_gen = - output.mean() # We want the generator to make samples
        # ~to make discriminator to make BIG values.
    optim_gen.zero_grad()
    loss_gen.backward()
    optim_gen.step()
    
    
    
    if i % 1000 == 0:
        hist_scatters.append(
            go.Histogram(x=fake_samples.detach().numpy().squeeze(), nbinsx=100,
                         marker=dict(color="blue"))
        )
        fig = go.Figure([go.Histogram(x=train_samples.numpy().squeeze(), nbinsx=100, marker=dict(color="red")),
                         go.Histogram(x=fake_samples.detach().numpy().squeeze(), nbinsx=100, marker=dict(color="blue"))])
        fig.show()
        # fig.update()
        
        
        str1 = "Discriminator loss {:.2f}".format(loss_discrim.item())
        str2 = "Generator loss {:.2f}".format(loss_gen.item())

## 3.2 Practical Data Generator

### 3.2.0 Preparing Dataset

In [0]:
from torch.optim import Adam
import torchvision
from torchvision import transforms
import numpy as np
import matplotlib.pyplot as plt
import os
%matplotlib inline
def show(img):
    npimg = img.detach().to("cpu").numpy()
    npimg -= npimg.min()
    npimg /= npimg.max()
    if npimg.shape[0] in [3,4]:
        plt.imshow(np.transpose(npimg, (1,2,0)), interpolation='nearest')
    else:
        plt.imshow(npimg.squeeze(), interpolation='nearest')
torch.random.manual_seed(1984)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
g_noise_dim=64 # see def of the networks
g_basic_channels=64


if not os.path.exists("./checkpoints"):
    os.makedirs("./checkpoints")

In [0]:
transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

batch_size = 32

trainset = torchvision.datasets.CIFAR10(
    root='./data', train=True,
    download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=batch_size,
    shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(
    root='./data', train=False,
    download=True, transform=transform)
testloader = torch.utils.data.DataLoader(
    testset, batch_size=batch_size,
    shuffle=False, num_workers=2)

classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

# Check the data
for x, y in trainloader:
    break
print(x.shape)
print(y.shape)
show(x[0])
print(classes[y[0]])

Let check the generator on practical image data.

### 3.2.1 GAN Framework Definition

#### Image Generator

Note we can control the model complexity by using different numbers of intermediate channels (`basic_channels`). The `TransposedConv` renders information from internal code to image-like outputs. See the [intuitive illustration](https://github.com/vdumoulin/conv_arithmetic).

We adopt the following practical measures:

- batch normalisation
- using `relu` for activation in generator (later will use leaky-relu in discriminator)
- use `tanh` as output in generator
- kernel size = 4

The architecture has been proposed by [DCGAN](https://arxiv.org/abs/1511.06434) and had been shown practically useful.

In [0]:
class GNet_CIFAR(nn.Module):
    def __init__(self, noise_dim=64, basic_channels=64):
        super(GNet_CIFAR, self).__init__()
        self.convtr1 = nn.ConvTranspose2d(
            in_channels=noise_dim, 
            out_channels=basic_channels*4,
            kernel_size=4,
            stride=1,
            padding=0,
            bias=False)
        self.convtr2 = nn.ConvTranspose2d(
            basic_channels*4, basic_channels*2, 
            kernel_size=4, stride=2, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(basic_channels*2)
        self.convtr3 = nn.ConvTranspose2d(
            basic_channels*2, basic_channels, 
            kernel_size=4, stride=2, padding=1, bias=False)
        self.convtr4 = nn.ConvTranspose2d(
            basic_channels, 3, 
            kernel_size=4, stride=2, padding=1, bias=False)
        
        self.bn1 = nn.BatchNorm2d(basic_channels*4)
        self.bn2 = nn.BatchNorm2d(basic_channels*2)
        self.bn3 = nn.BatchNorm2d(basic_channels)
        
        
    def forward(self, x):
        h = self.convtr1(x)
        h = self.bn1(h)
        h = fn.relu(h)
        h = self.convtr2(h)
        h = self.bn2(h)
        h = fn.relu(h)
        h = self.convtr3(h)
        h = self.bn3(h)
        h = fn.relu(h)
        h = self.convtr4(h)
        h = torch.tanh(h)
        
        return h

#### Discriminator

In [0]:
class DNet_CIFAR(nn.Module):
    def __init__(self, basic_channels=64):
        super(DNet_CIFAR, self).__init__()
        self.conv1 = nn.Conv2d(
            in_channels=3, 
            out_channels=basic_channels,
            kernel_size=4)
        
        self.conv2 = nn.Conv2d(
            basic_channels, basic_channels*2, 
            kernel_size=4)
        self.conv3 = nn.Conv2d(
            basic_channels*2, basic_channels*4, 
            kernel_size=4)
        self.linear = nn.Linear(1024, 1)
        
        
        self.bn1 = nn.BatchNorm2d(basic_channels*2)
        
        
    def forward(self, x):
        h = self.conv1(x)
        h = fn.leaky_relu(h, 0.2, inplace=True)
        h = fn.max_pool2d(h, 2)
        h = self.conv2(h)
        h = self.bn1(h)
        h = fn.leaky_relu(h, 0.2, inplace=True)
        h = fn.max_pool2d(h, 2)
        h = self.conv3(h)
        h = self.linear(h.view(h.shape[0], -1))
        # h = F.sigmoid(h)
        return h

#### GAN Training

In [0]:
dnet = DNet_CIFAR(basic_channels=g_basic_channels).to(device) # device detected in preparation above
gnet = GNet_CIFAR(basic_channels=g_basic_channels,
                  noise_dim=g_noise_dim).to(device)
learning_rate = 1e-4
optim_discrim = Adam(dnet.parameters(), lr=learning_rate)
optim_gen = Adam(gnet.parameters(), lr=learning_rate)

Discriminator trainer.

We separate true and fake samples in two batches.

In [0]:
y_ones = torch.ones(batch_size).to(device)
y_zeros = torch.zeros(batch_size).to(device)

def train_discrim(discrim_train_iters=5):
    """
    Since we will make a GAN class eventually, we will use some
    global objects available, including
    - trainloader
    - opt
    - gnet
    - dnet
    - optim_discrim
    - criterion
    """
    for i in range(discrim_train_iters):
        optim_discrim.zero_grad()
        
        x_true, _ = next(iter(trainloader))
        pred_1 = dnet(x_true.to(device)).squeeze()
        loss_dis_1 = - pred_1.mean() # criterion(pred_1, y_ones)
        
        
        z = torch.randn(batch_size, g_noise_dim, 1, 1).to(device)
        x_fake = gnet(z).detach()
        pred_2 = dnet(x_fake).squeeze()
        loss_dis_2 =  pred_2.mean() # criterion(pred_2, y_zeros)
        
        loss = loss_dis_1 + loss_dis_2
        loss.backward()
        optim_discrim.step()
        for p in dnet.parameters():
                p.data.clamp_(-0.01, 0.01)
    
    return loss

Generator Trainer

In [0]:
def train_gen():
    """
    Global variables
    - optim_gen
    - gnet
    - dnet
    - opt
    """
    optim_gen.zero_grad()
    z = torch.randn(batch_size*2, g_noise_dim, 1, 1).to(device)
    x_fake = gnet(z) #! You cannot detach it NOW!
    gen_plausib = dnet(x_fake)
    loss_gen = (1.0 - gen_plausib).mean()
    loss_gen.backward()
    optim_gen.step()
    return loss_gen, x_fake

Perform training.

In [0]:
from IPython.display import Image, display
iters = 0
while iters<15000:
    loss_dis = train_discrim()
    loss_gen, x_fake = train_gen()
    
    if iters % 100==0:
        print("{:d} steps: Loss D: {:.3f} G: {:.3f} +:{:.3f}".format(
            iters, loss_dis, loss_gen, loss_dis+loss_gen
        ))

    if iters % 1000 == 0:
        torch.save(dnet.state_dict(), f"./checkpoints/D{iters:06d}.pt")
        torch.save(dnet.state_dict(), f"./checkpoints/G{iters:06d}.pt")
        torchvision.utils.save_image(x_fake, 
                                     f"./checkpoints/gen{iters:06d}.png", 
                                     normalize=True)
        display(Image(filename=f"./checkpoints/gen{iters:06d}.png"))
        print(f"Iter {iters}, models saved.")
        # show(x_fake[0])
    iters+=1

## 3.3 Image Transformation GAN

In [0]:
import torch
import torch.nn as nn
import functools
import os
import numpy as np
import torch
import torch.nn as nn
from PIL import Image
import torchvision.transforms as transforms
from torchvision.utils import make_grid

image_transform = transforms.Compose([
            transforms.Resize((256, 256), Image.BICUBIC),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5),
                                 (0.5, 0.5, 0.5))])

to_pil = transforms.Compose([
    transforms.Normalize((-1.0, -1.0, -1.0), (2.0, 2.0, 2.0)),
    transforms.ToPILImage()])


def load_image(image_path):
    A_img = Image.open(image_path).convert('RGB')
    return image_transform(A_img).unsqueeze(0)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Defines the generator that consists of Resnet blocks between a few
# downsampling/upsampling operations.
# Code and idea originally from Justin Johnson's architecture.
# https://github.com/jcjohnson/fast-neural-style/
class ResnetGenerator(nn.Module):
    def __init__(self, input_nc, output_nc, ngf=64, norm_layer=nn.BatchNorm2d,
                 use_dropout=False, n_blocks=6, padding_type='reflect'):
        assert(n_blocks >= 0)
        super(ResnetGenerator, self).__init__()
        self.input_nc = input_nc
        self.output_nc = output_nc
        self.ngf = ngf
        if type(norm_layer) == functools.partial:
            use_bias = norm_layer.func == nn.InstanceNorm2d
        else:
            use_bias = norm_layer == nn.InstanceNorm2d

        model = [nn.ReflectionPad2d(3),
                 nn.Conv2d(input_nc, ngf, kernel_size=7, padding=0,
                           bias=use_bias),
                 norm_layer(ngf),
                 nn.ReLU(True)]

        n_downsampling = 2
        for i in range(n_downsampling):
            mult = 2**i
            model += [nn.Conv2d(ngf * mult, ngf * mult * 2, kernel_size=3,
                                stride=2, padding=1, bias=use_bias),
                      norm_layer(ngf * mult * 2),
                      nn.ReLU(True)]

        mult = 2**n_downsampling
        for i in range(n_blocks):
            model += [ResnetBlock(ngf * mult, padding_type=padding_type, norm_layer=norm_layer, use_dropout=use_dropout, use_bias=use_bias)]

        for i in range(n_downsampling):
            mult = 2**(n_downsampling - i)
            model += [nn.ConvTranspose2d(ngf * mult, int(ngf * mult / 2),
                                         kernel_size=3, stride=2,
                                         padding=1, output_padding=1,
                                         bias=use_bias),
                      norm_layer(int(ngf * mult / 2)),
                      nn.ReLU(True)]
        model += [nn.ReflectionPad2d(3)]
        model += [nn.Conv2d(ngf, output_nc, kernel_size=7, padding=0)]
        model += [nn.Tanh()]

        self.model = nn.Sequential(*model)

    def forward(self, input):
        return self.model(input)


# Define a resnet block
class ResnetBlock(nn.Module):
    def __init__(self, dim, padding_type, norm_layer, use_dropout, use_bias):
        super(ResnetBlock, self).__init__()
        self.conv_block = self.build_conv_block(dim, padding_type, norm_layer, use_dropout, use_bias)

    def build_conv_block(self, dim, padding_type, norm_layer, use_dropout, use_bias):
        conv_block = []
        p = 0
        if padding_type == 'reflect':
            conv_block += [nn.ReflectionPad2d(1)]
        elif padding_type == 'replicate':
            conv_block += [nn.ReplicationPad2d(1)]
        elif padding_type == 'zero':
            p = 1
        else:
            raise NotImplementedError('padding [%s] is not implemented' % padding_type)

        conv_block += [nn.Conv2d(dim, dim, kernel_size=3, padding=p, bias=use_bias),
                       norm_layer(dim),
                       nn.ReLU(True)]
        if use_dropout:
            conv_block += [nn.Dropout(0.5)]

        p = 0
        if padding_type == 'reflect':
            conv_block += [nn.ReflectionPad2d(1)]
        elif padding_type == 'replicate':
            conv_block += [nn.ReplicationPad2d(1)]
        elif padding_type == 'zero':
            p = 1
        else:
            raise NotImplementedError('padding [%s] is not implemented' % padding_type)
        conv_block += [nn.Conv2d(dim, dim, kernel_size=3, padding=p, bias=use_bias),
                       norm_layer(dim)]

        return nn.Sequential(*conv_block)

    def forward(self, x):
        out = x + self.conv_block(x)
        return out

def create_generator():
    norm_layer = functools.partial(nn.InstanceNorm2d, affine=False,
                                   track_running_stats=True)
    netG = ResnetGenerator(3, 3, 64, norm_layer=norm_layer, use_dropout=False,
                           n_blocks=9)
    return netG

def get_trained_model_parameters(style):
    import urllib
    if not os.path.exists("checkpoints/"):
        os.makedirs("checkpoints")
    model_path = f"checkpoints/style_{style}.pth"
    if not os.path.exists(model_path):
        urllib.request.urlretrieve(
            "http://efrosgans.eecs.berkeley.edu/cyclegan/pretrained_models/"
            + 'style_' + style + ".pth",
            model_path)
    return model_path

def __patch_instance_norm_state_dict(state_dict, module, keys, i=0):
    key = keys[i]
    if i + 1 == len(keys):  # at the end, pointing to a parameter/buffer
        if module.__class__.__name__.startswith('InstanceNorm') and \
                (key == 'running_mean' or key == 'running_var'):
            if getattr(module, key) is None:
                state_dict.pop('.'.join(keys))
        if module.__class__.__name__.startswith('InstanceNorm') and \
           (key == 'num_batches_tracked'):
            state_dict.pop('.'.join(keys))
    else:
        __patch_instance_norm_state_dict(
            state_dict, getattr(module, key), keys, i + 1)

def load_generator_from(netG, model_path):
    state_dict = torch.load(model_path)
    for key in list(
            state_dict.keys()):  # need to copy keys because we mutate in loop
        __patch_instance_norm_state_dict(state_dict, netG, key.split('.'))
    netG.load_state_dict(state_dict)
    return netG    

In [0]:
#@title Parameter setting
#@markdown Choose an artist's style to play with
style = "cezanne" #@param ["cezanne", "monet", "ukiyoe", "vangogh"] {allow-input: false}
#@markdown ---
model_path = get_trained_model_parameters(style)
image_transf_net = create_generator()
image_transf_net = load_generator_from(image_transf_net, model_path).to(device)

In [0]:
input_image_fname = "/content/rose-blue-flower-rose-blooms-67636.jpeg"
assert os.path.exists(input_image_fname), \
    ("Image file does not exist, upload one and copy "
    "the path using the panel on left.")

In [0]:
# Alternatively, you can make a selfy here 👩🏻🤳 :P

In [0]:
im0 = load_image(input_image_fname).to(device)
im1 = image_transf_net(im0)
im_display = make_grid(torch.cat((im0, im1)).cpu())
to_pil(im_display)