<a href="https://colab.research.google.com/github/cluePrints/fastai-v3-notes/blob/master/fastai3_part2_02_fully_connected.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
%load_ext autoreload
%autoreload 2

%matplotlib inline

In [0]:
# 1.0.50 is the earliest version course materials supposed to be working with
pip install fastai>=1.0.50 -q

In [3]:
!pip freeze | grep fastai

fastai==1.0.50.post1


In [0]:
# comes from prev lesson
import operator

def test(a,b,cmp,cname=None):
    if cname is None: cname=cmp.__name__
    assert cmp(a,b),f"{cname}:\n{a}\n{b}"

def test_eq(a,b):
    """Assert that `a == b`; the failure message labels the comparison '=='."""
    test(a, b, operator.eq, cname='==')

from pathlib import Path
from IPython.core.debugger import set_trace
from fastai import datasets
import pickle, gzip, math, torch, matplotlib as mpl
import matplotlib.pyplot as plt
from torch import tensor

MNIST_URL='http://deeplearning.net/data/mnist/mnist.pkl'

def near(a,b):
    """Return whether tensors `a` and `b` are element-wise close.

    Delegates to `torch.allclose` with rtol=1e-3 and atol=1e-5.
    """
    tolerances = dict(rtol=1e-3, atol=1e-5)
    return torch.allclose(a, b, **tolerances)
def test_near(a,b):
    """Assert that `a` and `b` are close according to `near`."""
    test(a, b, cmp=near)

In [0]:
from fastai import datasets

In [0]:
# Number of training samples to keep; the full training set was exhausting memory.
N_TRAIN = 1000

path = datasets.download_data(MNIST_URL, ext='.gz')
# NOTE(review): pickle.load can execute arbitrary code and MNIST_URL is a plain-http
# address -- only load this file from a source you trust.
with gzip.open(path) as f:
  ((train_X, train_Y), (valid_X, valid_Y), (test_X, test_Y)) = pickle.load(f, encoding='latin')

# Truncate before converting so the full training arrays are never copied into tensors.
train_X = torch.tensor(train_X[:N_TRAIN])
train_Y = torch.tensor(train_Y[:N_TRAIN])
valid_X = torch.tensor(valid_X)
valid_Y = torch.tensor(valid_Y)

In [0]:
# Single-sample batch: the first training example and its label, each given a
# leading axis of size 1.
batch_X = train_X[0].unsqueeze(0)
batch_Y = train_Y[0].unsqueeze(0)

In [8]:
# Normalization statistics, taken over every element of the training inputs.
train_mean = train_X.mean()
train_std = train_X.std()
train_mean, train_std

(tensor(0.1277), tensor(0.3039))

In [0]:
def normalize(x, mean, std):
  """Standardize `x`: subtract `mean`, then divide by `std`."""
  centered = x - mean
  return centered / std

In [0]:
# Both splits are scaled with the *training* statistics.
# Note: this rebinds train_X / valid_X, so running the cell a second time would
# normalize data that is already normalized.
train_X, valid_X = (normalize(split, train_mean, train_std) for split in (train_X, valid_X))

In [11]:
train_X.mean(), train_X.std()

(tensor(1.0950e-06), tensor(1.))

In [12]:
valid_X.mean(), valid_X.std()

(tensor(0.0030), tensor(1.0033))

In [13]:
train_X.shape

torch.Size([1000, 784])

In [0]:
# Layer sizes: input features -> hidden units -> a single output.
n_inputs = train_X.shape[-1]
n_hidden = 100
n_outputs = 1
# Zero-filled placeholders with the shapes of a two-layer fully connected net.
w1 = torch.zeros(n_inputs, n_hidden)
b1 = torch.zeros(n_hidden)
w2 = torch.zeros(n_hidden, n_outputs)
# The second layer's bias has one entry per output unit (it was sized n_hidden before).
b2 = torch.zeros(n_outputs)

In [15]:
# train_X is already a tensor; wrapping it in torch.tensor() again only triggers a
# copy-construct warning.
(train_X @ w1).shape

  """Entry point for launching an IPython kernel.


torch.Size([1000, 100])

In [16]:
class Lin():
  """Fully connected layer computing `x @ w + b`.

  `w` has shape (n_inputs, n_outputs): standard-normal samples scaled by
  sqrt(2 / n_inputs). `b` starts as zeros of length n_outputs.
  """

  def __init__(self, n_inputs, n_outputs):
    self.n_inputs = n_inputs
    self.n_outputs = n_outputs
    noise = torch.randn(n_inputs, n_outputs)
    self.w = noise * math.sqrt(2/n_inputs)
    self.b = torch.zeros(n_outputs)

  def forward(self, x):
    """Apply the affine map to `x`."""
    projected = torch.matmul(x, self.w)
    return projected + self.b

  def __call__(self, x):
    """Calling the layer is the same as calling `forward`."""
    return self.forward(x)

class ReLU():
  """Rectified linear unit: entries below zero become zero."""

  def forward(self, x):
    """Return `x` clamped from below at 0."""
    return x.clamp_min(0)

  def __call__(self, x):
    """Calling the layer is the same as calling `forward`."""
    return self.forward(x)


# forward
class Model:
  """Two-layer network, forward pass only: lin1 -> relu -> lin2."""

  def __init__(self):
    # Layer sizes come from the notebook-level n_inputs / n_hidden / n_outputs.
    self.lin1 = Lin(n_inputs, n_hidden)
    self.lin2 = Lin(n_hidden, n_outputs)
    self.relu = ReLU()

  def __call__(self, x):
    """Feed `x` through the three stages in order and return the result."""
    for stage in (self.lin1, self.relu, self.lin2):
      x = stage(x)
    return x

# Forward pass over the training inputs, then display the mean and std of the output.
model = Model()
x = model(train_X)
  
x.mean(), x.std()

(tensor(0.6355), tensor(0.9962))

In [17]:
x.shape, train_Y.shape

(torch.Size([1000, 1]), torch.Size([1000]))

In [0]:
def mse(actual, expected):
  """Mean squared error: the mean over all elements of (expected - actual)**2."""
  residual = expected - actual
  return (residual ** 2).mean()

In [19]:
x.shape, train_Y.shape

(torch.Size([1000, 1]), torch.Size([1000]))

In [20]:
mse(x, train_Y[:,None].float())

tensor(24.4955)

In [0]:
import pdb
def mse_grad(activations, target):
  """Gradient of the mean squared error with respect to `activations`.

  The loss averages (activations - target)**2 over every element, so by the
  chain rule the gradient is 2 * (activations - target) / N, with N the total
  number of elements. Dividing by the batch length alone is only correct when
  each row holds a single output, which is why `numel()` is used here.

  Returns a tensor with the shape of `activations`.
  Raises AssertionError if the result has a different shape, e.g. because
  `target` broadcast to a larger shape.
  """
  grad = 2 * (activations - target) / activations.numel()
  assert grad.shape == activations.shape
  return grad

def lin_grad(prev_g, layer, layer_input):
  """Backward pass of a linear layer `out = layer_input @ layer.w + layer.b`.

  `prev_g` is the gradient of the loss with respect to the layer's output.
  Returns the tuple `(inp_grad, w_grad, b_grad)`: gradients with respect to the
  layer's input, weights and bias. Each one is asserted to have the shape of
  the tensor it is a gradient of.
  """
  weights, bias = layer.w, layer.b
  # d/d(input): send the upstream gradient back through the weights.
  d_input = torch.matmul(prev_g, weights.t())
  # d/d(w): one matmul sums the per-sample outer products over the batch.
  # (Building those outer products explicitly via broadcasting ran out of memory.)
  d_weights = torch.matmul(layer_input.t(), prev_g)
  # d/d(b): the bias is added to every row, so its gradient is the sum over rows.
  d_bias = prev_g.sum(dim=0)
  for got, ref in ((d_input, layer_input), (d_weights, weights), (d_bias, bias)):
    assert got.shape == ref.shape
  return (d_input, d_weights, d_bias)

def relu_grad(lin2_grad_part, relu_ins):
  """Backward pass of ReLU.

  The upstream gradient `lin2_grad_part` passes through wherever the ReLU
  input `relu_ins` was strictly positive and is zeroed everywhere else.
  """
  passthrough = relu_ins.gt(0).float()
  grad = lin2_grad_part * passthrough
  assert grad.shape == relu_ins.shape
  return grad

In [22]:
# Forward pass: every intermediate activation is kept because the backward pass
# below needs it.
lin1 = Lin(n_inputs, n_hidden)
lin2 = Lin(n_hidden, n_outputs)
relu = ReLU()
lin1_out = lin1(train_X)
relu_out = relu(lin1_out)
lin2_out = lin2(relu_out)

# Backward pass. train_Y gets a trailing axis ([:,None]) and is cast to float
# before being compared with lin2_out.
loss = mse(lin2_out, train_Y[:,None].float())

# Walk the chain rule from the loss back to the input; `grad` is rebound at each
# step to the gradient w.r.t. the input of the stage just processed.
grad = mse_grad(lin2_out, train_Y[:,None].float())
grad, w2_grad, b2_grad = lin_grad(grad, lin2, relu_out)
grad = relu_grad(grad, lin1_out)
grad, w1_grad, b1_grad = lin_grad(grad, lin1, train_X)
grad.shape

torch.Size([1000, 784])

In [0]:
class Lin():
  """Fully connected layer `x @ w + b` whose parameters autograd can track.

  `w` has shape (n_inputs, n_outputs): standard-normal samples scaled by
  sqrt(2 / n_inputs). `b` starts as zeros of length n_outputs. With
  `requires_grad=True` (the default) both are leaf tensors, so calling
  `.backward()` on a result computed from them fills in `w.grad` and `b.grad`.
  """

  def __init__(self, n_inputs, n_outputs, requires_grad=True):
    self.n_inputs, self.n_outputs = n_inputs, n_outputs
    # Scale first and switch gradient tracking on afterwards. Multiplying a tensor
    # that already requires grad produces a non-leaf result, and autograd only
    # populates `.grad` on leaves -- that is what made `self.w.retain_grad()` look
    # necessary.
    # Note to self: the other way to end up with no grads is feeding X instead of
    # lin2_out to the mse, which leaves the parameters out of the graph entirely.
    self.w = (torch.randn(n_inputs, n_outputs)*math.sqrt(2/n_inputs)).requires_grad_(requires_grad)
    self.b = torch.zeros(n_outputs, requires_grad=requires_grad)

  def __call__(self, x):
    """Calling the layer is the same as calling `forward`."""
    return self.forward(x)

  def forward(self, x):
    """Apply the affine map to `x`."""
    return x @ self.w + self.b

class ReLU():
  """Element-wise max(x, 0)."""

  def forward(self, x):
    """Clamp `x` from below at 0."""
    return x.clamp(min=0)

  def __call__(self, x):
    """Delegate to `forward`."""
    rectified = self.forward(x)
    return rectified

In [0]:
def verify_grads(lin1_param, lin2_param, w1_grad, b1_grad, w2_grad, b2_grad):
  """Check the hand-derived gradients against autograd.

  Rebuilds the lin1 -> relu -> lin2 -> mse pipeline on copies of the given
  layers' parameters with gradient tracking switched on, runs autograd's
  backward pass, and asserts via `test_near` that the activations, the loss and
  every parameter gradient match the manually computed ones.

  The data (train_X, train_Y) and the reference activations and loss
  (lin1_out, relu_out, lin2_out, loss) are read from notebook-level variables.
  """
  def tracked_copy(t):
    # Independent copy with gradient tracking switched on.
    return t.clone().requires_grad_(True)

  inputs = tracked_copy(train_X)
  l1 = Lin(n_inputs, n_hidden)
  l1.w = tracked_copy(lin1_param.w)
  l1.b = tracked_copy(lin1_param.b)
  l2 = Lin(n_hidden, n_outputs)
  l2.w = tracked_copy(lin2_param.w)
  l2.b = tracked_copy(lin2_param.b)

  activation = ReLU()
  l1_out = l1(inputs)
  r_out = activation(l1_out)
  l2_out = l2(r_out)
  tmp_loss = mse(l2_out, train_Y[:,None].float())
  tmp_loss.backward()

  # Forward activations and loss first, then the parameter gradients.
  comparisons = (
    (l1_out, lin1_out),
    (l2_out, lin2_out),
    (r_out, relu_out),
    (tmp_loss, loss),
    (b1_grad, l1.b.grad),
    (b2_grad, l2.b.grad),
    (w1_grad, l1.w.grad),
    (w2_grad, l2.w.grad),
  )
  for left, right in comparisons:
    test_near(left, right)

verify_grads(lin1, lin2, w1_grad, b1_grad, w2_grad, b2_grad)

# Refactor to a model


In [0]:
class Lin():
  """Fully connected layer `x @ w + b` that remembers its latest input and output.

  `w` has shape (n_inputs, n_outputs): standard-normal samples scaled by
  sqrt(2 / n_inputs). `b` starts as zeros of length n_outputs.
  """

  def __init__(self, n_inputs, n_outputs):
    self.n_inputs = n_inputs
    self.n_outputs = n_outputs
    noise = torch.randn(n_inputs, n_outputs)
    self.w = noise * math.sqrt(2/n_inputs)
    self.b = torch.zeros(n_outputs)

  def forward(self, x):
    """Apply the affine map to `x`."""
    return torch.matmul(x, self.w) + self.b

  def __call__(self, x):
    """Run `forward`, keeping the argument on `self.input` and the result on `self.output`."""
    self.input = x
    result = self.forward(x)
    self.output = result
    return result

class ReLU():
  """ReLU activation that remembers its latest input and output."""

  def forward(self, x):
    """Clamp `x` from below at 0."""
    return x.clamp_min(0)

  def __call__(self, x):
    """Run `forward`, keeping the argument on `self.input` and the result on `self.output`."""
    self.input = x
    rectified = self.forward(x)
    self.output = rectified
    return rectified


class Model:
  """Two-layer network (lin1 -> relu -> lin2) with a hand-written backward pass for an MSE loss."""

  def __init__(self):
    # Layer sizes come from the notebook-level n_inputs / n_hidden / n_outputs.
    self.lin1 = Lin(n_inputs, n_hidden)
    self.lin2 = Lin(n_hidden, n_outputs)
    self.relu = ReLU()
    
  def __call__(self, x):
    return self.forward(x)

  def forward(self, x):
    """Run `x` through lin1, relu and lin2; the input is kept for `verify_grads`."""
    self.input = x
    x = self.lin1(x)
    x = self.relu(x)
    x = self.lin2(x)
    return x

  def backward(self, y):
    """Compute the MSE loss against `y` and the gradients of all parameters.

    Must be called after `forward`. Results are stored on the model:
    `loss`, `grad_lin2`/`grad_w2`/`grad_b2` and `grad_lin1`/`grad_w1`/`grad_b1`.
    """
    # Remember the target so verify_grads checks against the very same one.
    self.target = y
    self.loss = mse(self.lin2.output, y)
    grad_mse = mse_grad(self.lin2.output, y)
    self.grad_lin2, self.grad_w2, self.grad_b2 = lin_grad(grad_mse, self.lin2, self.relu.output)
    grad_relu = relu_grad(self.grad_lin2, self.lin1.output)
    self.grad_lin1, self.grad_w1, self.grad_b1 = lin_grad(grad_relu, self.lin1, self.lin1.input)

  def verify_grads(self):
    """Compare the manual gradients with autograd's; call after `forward` and `backward`.

    The pipeline is rebuilt on copies of the parameters with gradient tracking
    switched on, using the input given to `forward` and the target given to
    `backward` (previously the global train_Y was used regardless of that target).
    """
    inputs = self.input.clone().requires_grad_(True)
    l1 = Lin(n_inputs, n_hidden)
    l1.w = self.lin1.w.clone().requires_grad_(True)
    l1.b = self.lin1.b.clone().requires_grad_(True)
    l2 = Lin(n_hidden, n_outputs)
    l2.w = self.lin2.w.clone().requires_grad_(True)
    l2.b = self.lin2.b.clone().requires_grad_(True)

    relu = ReLU()
    l1_out = l1(inputs)
    r_out = relu(l1_out)
    l2_out = l2(r_out)
    tmp_loss = mse(l2_out, self.target)
    tmp_loss.backward()

    test_near(self.grad_b1, l1.b.grad)
    test_near(self.grad_b2, l2.b.grad)
    test_near(self.grad_w1, l1.w.grad)
    test_near(self.grad_w2, l2.w.grad)
    
# Exercise the model once: forward pass, manual backward pass, then the autograd comparison.
model = Model()
model.forward(train_X)
model.backward(train_Y[:,None].float())
model.verify_grads()

In [0]:
# self.target added
# Module refactored out common stuff
# backward() extracted on lin2

class Module():
  """Base class for layers: calling an instance runs `forward` and records the call.

  Subclasses provide `forward(x)`. After a call, `self.input` holds the
  argument and `self.output` the value `forward` returned.
  """

  def __call__(self, x):
    self.input = x
    result = self.forward(x)
    self.output = result
    return result

class Lin(Module):
  """Fully connected layer `x @ w + b` with its own backward step.

  `w` has shape (n_inputs, n_outputs): standard-normal samples scaled by
  sqrt(2 / n_inputs). `b` starts as zeros of length n_outputs.
  """

  def __init__(self, n_inputs, n_outputs):
    self.n_inputs = n_inputs
    self.n_outputs = n_outputs
    noise = torch.randn(n_inputs, n_outputs)
    self.w = noise * math.sqrt(2/n_inputs)
    self.b = torch.zeros(n_outputs)

  def forward(self, x):
    """Apply the affine map to `x`."""
    return torch.matmul(x, self.w) + self.b

  def backward(self):
    """Derive gradients from `self.output.g`.

    The gradient w.r.t. the layer input is stored on `self.g`; the parameter
    gradients are stored on `self.w.g` and `self.b.g`.
    """
    self.g, self.w.g, self.b.g = lin_grad(self.output.g, self, self.input)

class ReLU(Module):
  """ReLU activation; the call bookkeeping is inherited from Module."""

  def forward(self, x):
    """Clamp `x` from below at 0."""
    return x.clamp_min(0)
  


class Model:
  """Two-layer network (lin1 -> relu -> lin2) that stores its target and lets lin2 run its own backward step."""

  def __init__(self):
    # Layer sizes come from the notebook-level n_inputs / n_hidden / n_outputs.
    self.lin1 = Lin(n_inputs, n_hidden)
    self.lin2 = Lin(n_hidden, n_outputs)
    self.relu = ReLU()

  def forward(self, x, y):
    """Run `x` through the network; `x` and the target `y` are kept for the backward pass."""
    self.input = x
    self.target = y
    x = self.lin1(x)
    x = self.relu(x)
    x = self.lin2(x)
    return x

  def backward(self):
    """Compute the MSE loss and all parameter gradients; call after `forward`.

    lin2's gradients end up on `self.lin2.w.g` / `self.lin2.b.g`; lin1's are
    stored on the model as `grad_w1` / `grad_b1`.
    """
    self.loss = mse(self.lin2.output, self.target)
    grad_mse = mse_grad(self.lin2.output, self.target)
    # lin2.backward() reads the upstream gradient from its output tensor's `.g`.
    self.lin2.output.g = grad_mse
    self.lin2.backward()
    grad_relu = relu_grad(self.lin2.g, self.lin1.output)
    self.grad_lin1, self.grad_w1, self.grad_b1 = lin_grad(grad_relu, self.lin1, self.lin1.input)

  def verify_grads(self):
    """Compare the manual gradients with autograd's; call after `forward` and `backward`.

    The pipeline is rebuilt on copies of the parameters with gradient tracking
    switched on, using the stored input and the stored target (previously the
    global train_Y was used regardless of the target given to `forward`).
    """
    inputs = self.input.clone().requires_grad_(True)
    l1 = Lin(n_inputs, n_hidden)
    l1.w = self.lin1.w.clone().requires_grad_(True)
    l1.b = self.lin1.b.clone().requires_grad_(True)
    l2 = Lin(n_hidden, n_outputs)
    l2.w = self.lin2.w.clone().requires_grad_(True)
    l2.b = self.lin2.b.clone().requires_grad_(True)

    relu = ReLU()
    l1_out = l1(inputs)
    r_out = relu(l1_out)
    l2_out = l2(r_out)
    tmp_loss = mse(l2_out, self.target)
    tmp_loss.backward()

    test_near(self.grad_b1, l1.b.grad)
    test_near(self.lin2.b.g, l2.b.grad)
    test_near(self.grad_w1, l1.w.grad)
    test_near(self.lin2.w.g, l2.w.grad)
    
# Exercise the model once: forward pass with the target, manual backward pass,
# then the autograd comparison.
model = Model()
model.forward(train_X, train_Y[:,None].float())
model.backward()
model.verify_grads()

In [0]:
# backward() extracted on lin1
# backward() extracted on relu
# backward(), forward() extracted on mse

class Module():
  """Common call protocol for layers.

  `layer(x)` stores `x` on `self.input`, runs the subclass's `forward(x)`,
  stores the result on `self.output` and returns it.
  """

  def __call__(self, x):
    self.input = x
    produced = self.forward(x)
    self.output = produced
    return produced

class Lin(Module):
  """Fully connected layer `x @ w + b` with its own backward step.

  `w` has shape (n_inputs, n_outputs): standard-normal samples scaled by
  sqrt(2 / n_inputs). `b` starts as zeros of length n_outputs.
  """

  def __init__(self, n_inputs, n_outputs):
    self.n_inputs = n_inputs
    self.n_outputs = n_outputs
    noise = torch.randn(n_inputs, n_outputs)
    self.w = noise * math.sqrt(2/n_inputs)
    self.b = torch.zeros(n_outputs)

  def forward(self, x):
    """Apply the affine map to `x`."""
    return torch.matmul(x, self.w) + self.b

  def backward(self):
    """Derive gradients from `self.output.g`.

    The gradient w.r.t. the layer input is attached to the input tensor as
    `self.input.g`; the parameter gradients go to `self.w.g` and `self.b.g`.
    """
    self.input.g, self.w.g, self.b.g = lin_grad(self.output.g, self, self.input)

class ReLU(Module):
  """ReLU activation with a manual backward step."""

  def forward(self, x):
    """Clamp `x` from below at 0."""
    return x.clamp_min(0)

  def backward(self):
    """Attach to the input tensor (`self.input.g`) the gradient derived from `self.output.g`."""
    upstream = self.output.g
    self.input.g = relu_grad(upstream, self.input)

class Mse(Module):
  """MSE loss stage.

  `target` must be assigned on the instance before it is called: both
  `forward` and `backward` read `self.target`.
  """

  def forward(self, x):
    """Record the loss on `self.loss_value` and hand `x` back unchanged."""
    loss = mse(x, self.target)
    self.loss_value = loss
    return x

  def backward(self):
    """Attach the loss gradient to the input tensor as `self.input.g`."""
    predictions = self.input
    self.input.g = mse_grad(predictions, self.target)

class Model:
  """Two-layer network (lin1 -> relu -> lin2 -> mse) where every stage runs its own backward step."""

  def __init__(self):
    # Layer sizes come from the notebook-level n_inputs / n_hidden / n_outputs.
    self.lin1 = Lin(n_inputs, n_hidden)
    self.lin2 = Lin(n_hidden, n_outputs)
    self.relu = ReLU()
    self.mse = Mse()

  def forward(self, x, y):
    """Run `x` through the network and the loss stage; `x` and the target `y` are kept."""
    self.input = x
    self.target = y
    # The loss stage reads its target from an attribute, so set it before the call.
    self.mse.target = y
    x = self.lin1(x)
    x = self.relu(x)
    x = self.lin2(x)
    x = self.mse(x)
    return x

  def backward(self):
    """Run the stages' backward steps in reverse order; call after `forward`."""
    self.mse.backward()
    self.lin2.backward()
    self.relu.backward()
    self.lin1.backward()
    
  def verify_grads(self):
    """Compare the manual gradients with autograd's; call after `forward` and `backward`.

    The pipeline is rebuilt on copies of the parameters with gradient tracking
    switched on, using the stored input and the stored target (previously the
    global train_Y was used regardless of the target given to `forward`).
    """
    inputs = self.input.clone().requires_grad_(True)
    l1 = Lin(n_inputs, n_hidden)
    l1.w = self.lin1.w.clone().requires_grad_(True)
    l1.b = self.lin1.b.clone().requires_grad_(True)
    l2 = Lin(n_hidden, n_outputs)
    l2.w = self.lin2.w.clone().requires_grad_(True)
    l2.b = self.lin2.b.clone().requires_grad_(True)

    relu = ReLU()
    l1_out = l1(inputs)
    r_out = relu(l1_out)
    l2_out = l2(r_out)
    tmp_loss = mse(l2_out, self.target)
    tmp_loss.backward()

    test_near(self.lin1.b.g, l1.b.grad)
    test_near(self.lin2.b.g, l2.b.grad)
    test_near(self.lin1.w.g, l1.w.grad)
    test_near(self.lin2.w.g, l2.w.grad)
    
# Exercise the model once: forward pass with the target, per-stage backward pass,
# then the autograd comparison.
model = Model()
model.forward(train_X, train_Y[:,None].float())
model.backward()
model.verify_grads()

In [0]:
# mse -> loss
# model.layers introduced
class Module():
  """Shared call behaviour for all stages.

  Calling a stage records the argument on `self.input`, evaluates the
  subclass's `forward(x)`, records that value on `self.output` and returns it.
  """

  def __call__(self, x):
    self.input = x
    value = self.forward(x)
    self.output = value
    return value

class Lin(Module):
  """Fully connected layer `x @ w + b` with its own backward step.

  `w` has shape (n_inputs, n_outputs): standard-normal samples scaled by
  sqrt(2 / n_inputs). `b` starts as zeros of length n_outputs.
  """

  def __init__(self, n_inputs, n_outputs):
    self.n_inputs = n_inputs
    self.n_outputs = n_outputs
    noise = torch.randn(n_inputs, n_outputs)
    self.w = noise * math.sqrt(2/n_inputs)
    self.b = torch.zeros(n_outputs)

  def forward(self, x):
    """Apply the affine map to `x`."""
    return torch.matmul(x, self.w) + self.b

  def backward(self):
    """Derive gradients from `self.output.g`.

    The gradient w.r.t. the layer input is attached to the input tensor as
    `self.input.g`; the parameter gradients go to `self.w.g` and `self.b.g`.
    """
    input_grad, weight_grad, bias_grad = lin_grad(self.output.g, self, self.input)
    self.input.g = input_grad
    self.w.g = weight_grad
    self.b.g = bias_grad

class ReLU(Module):
  """ReLU activation with a manual backward step."""

  def forward(self, x):
    """Clamp `x` from below at 0."""
    return x.clamp(min=0)

  def backward(self):
    """Attach to the input tensor (`self.input.g`) the gradient derived from `self.output.g`."""
    incoming = self.output.g
    self.input.g = relu_grad(incoming, self.input)

class Mse(Module):
  """MSE loss stage.

  `target` must be assigned on the instance before it is called: both
  `forward` and `backward` read `self.target`.
  """

  def forward(self, x):
    """Record the loss on `self.loss_value` and hand `x` back unchanged."""
    current_loss = mse(x, self.target)
    self.loss_value = current_loss
    return x

  def backward(self):
    """Attach the loss gradient to the input tensor as `self.input.g`."""
    preds = self.input
    self.input.g = mse_grad(preds, self.target)

class Model:
  """Two-layer network expressed as a list of layers plus a separate loss stage."""

  def __init__(self):
    # Layer sizes come from the notebook-level n_inputs / n_hidden / n_outputs.
    self.lin1 = Lin(n_inputs, n_hidden)
    self.lin2 = Lin(n_hidden, n_outputs)
    self.relu = ReLU()
    # Forward order; backward() walks this list in reverse.
    self.layers = [self.lin1, self.relu, self.lin2]
    self.loss = Mse()

  def forward(self, x, y):
    """Run `x` through every layer and then the loss stage; `x` and the target `y` are kept."""
    self.input = x
    self.target = y
    for layer in self.layers:
      x = layer(x)

    # The loss stage reads its target from an attribute, so set it before the call.
    self.loss.target = y
    return self.loss(x)

  def backward(self):
    """Run the loss's backward step, then each layer's in reverse order; call after `forward`."""
    self.loss.backward()
    for layer in reversed(self.layers):
      layer.backward()
    
  def verify_grads(self):
    """Compare the manual gradients with autograd's; call after `forward` and `backward`.

    The pipeline is rebuilt on copies of the parameters with gradient tracking
    switched on, using the stored input and the stored target (previously the
    global train_Y was used regardless of the target given to `forward`).
    """
    inputs = self.input.clone().requires_grad_(True)
    l1 = Lin(n_inputs, n_hidden)
    l1.w = self.lin1.w.clone().requires_grad_(True)
    l1.b = self.lin1.b.clone().requires_grad_(True)
    l2 = Lin(n_hidden, n_outputs)
    l2.w = self.lin2.w.clone().requires_grad_(True)
    l2.b = self.lin2.b.clone().requires_grad_(True)

    relu = ReLU()
    l1_out = l1(inputs)
    r_out = relu(l1_out)
    l2_out = l2(r_out)
    tmp_loss = mse(l2_out, self.target)
    tmp_loss.backward()

    test_near(self.lin1.b.g, l1.b.grad)
    test_near(self.lin2.b.g, l2.b.grad)
    test_near(self.lin1.w.g, l1.w.grad)
    test_near(self.lin2.w.g, l2.w.grad)
    
# Exercise the model once: forward pass with the target, backward pass over the
# layer list, then the autograd comparison.
model = Model()
model.forward(train_X, train_Y[:,None].float())
model.backward()
model.verify_grads()

In [0]:
# ...

* 1) normalize
* 2) initialize w & b
* 3) relu(lin(x))
* 4) model=lin(relu(lin(x)))
* 5) MSE loss func
* 6) gradients for all the layers, autograd check
* 7) refactor to model
