*Colormap of the notebook:*

* <span style="color:red">assignment problem</span>. The red color indicates the task that should be done
* <span style="color:green">debugging</span>. The green color tells you what the expected outcome is. Its primary goal is to help you get the correct answer
* <span style="color:blue">hints</span>.

Assignment 3 (Two layer network)
======================



##### Preliminaries

In [None]:
# for compatibility issues
from __future__ import print_function

In [None]:
import numpy as np
import os
from PIL import Image
import matplotlib.pyplot as plt

In [None]:
import torch
import torchvision
import torch.nn as nn
from torch.autograd import Variable

In [None]:
# render matplotlib figures inline, directly below the cell that creates them
%matplotlib inline
# load the autoreload extension and reload imported modules before every cell execution,
# so that edits to external .py files are picked up without restarting the kernel
%load_ext autoreload
%autoreload 2

In [None]:
# use a larger serif font so that the figure labels stay readable
plt.rcParams.update({'font.family': 'serif', 'font.size': 18})

In [None]:
# random seed settings (fixed seeds make the random train/test split and the
# random weight initialization below reproducible)
torch.manual_seed(42)
np.random.seed(42)

# data type (useful to have in pytorch)
dtype_np = np.float64
dtype_torch = torch.FloatTensor
# dtype_torch = torch.cuda.FloatTensor # to run on GPU

##### Data (playground)

In [None]:
# read the toy data set: columns 0 and 1 hold the two features, column 2 the class label
data = np.loadtxt('data/toy_data/data_class_train.txt')
dataX, dataY = data[:, :2], data[:, 2]

In [None]:
n_samples = len(data)
dim_in, dim_out = 2, 3  # two features, three classes

# 70% of the samples are used for training, the remainder for testing
n_train = int(0.7 * n_samples)
n_test = n_samples - n_train

In [None]:
# shuffle the sample indices once, then cut the permutation into a train and a test part
perm = np.random.permutation(n_samples)
train_indx, test_indx = perm[:n_train], perm[n_train:]

dataX_train, dataX_test = dataX[train_indx], dataX[test_indx]
dataY_train, dataY_test = dataY[train_indx], dataY[test_indx]

In [None]:
# visualize data: training samples colored by class, test samples as black crosses
plt.figure(figsize=(10,5))
for class_id, marker in enumerate(('ob', 'og', 'or')):
    in_class = dataY_train == class_id
    plt.plot(dataX_train[in_class, 0], dataX_train[in_class, 1], marker,
             label="class%d" % (class_id + 1))

plt.plot(dataX_test[:,0], dataX_test[:,1],'xk', label="test")

plt.xlabel('feature #1')
plt.ylabel('feature #2')
plt.legend()
plt.xlim(-9, 9)
plt.ylim(-5, 12);

##### Two-layer Network (by hand)

$$x_{hidden} = RELU(x  \cdot W_1 + b_1)$$
$$y_{pred} = x_{hidden} \cdot W_2 + b_2$$

In [None]:
dim_hidden = 100 # hidden dimension: number of units in the hidden layer (second dimension of w1, length of b1)

In [None]:
# input: training features converted to dtype_torch and class labels converted to LongTensor;
# requires_grad=False because only the weights and biases are optimized
x = Variable(torch.from_numpy(dataX_train).type(dtype_torch), requires_grad=False)
y = Variable(torch.from_numpy(dataY_train).type(torch.LongTensor), requires_grad=False)

* Weights and biases

<span style="color:red"> **[PROBLEM I]**: </span>   
<span style="color:red"> Fill the missing part (weights and biases for the output layer) </span>  

In [None]:
# Randomly initialize weights (standard normal entries); w1_value has shape (dim_in, dim_hidden)
w1_value = np.random.randn(dim_in, dim_hidden)
w2_value = # YOUR CODE HERE

# Randomly initialize biases (standard normal entries); b1_value has shape (dim_hidden,)
b1_value = np.random.randn(dim_hidden)
b2_value = # YOUR CODE HERE

In [None]:
# wrap the initial values as trainable parameters: requires_grad=True lets loss.backward()
# fill the .grad field that the manual gradient-descent update in the training loop reads
w1 = Variable(torch.from_numpy(w1_value).type(dtype_torch), requires_grad=True)
w2 = Variable(torch.from_numpy(w2_value).type(dtype_torch), requires_grad=True)

b1 = Variable(torch.from_numpy(b1_value).type(dtype_torch), requires_grad=True)
b2 = Variable(torch.from_numpy(b2_value).type(dtype_torch), requires_grad=True)

* Loss (we will use cross-entropy loss), see documentation for details http://pytorch.org/docs/master/nn.html#torch.nn.CrossEntropyLoss

In [None]:
# cross-entropy loss; applied in the training loop to the network output y_pred and the class labels y
criterion = nn.CrossEntropyLoss()

* learning parameters

In [None]:
learning_rate = 1e-3  # step size of the manual gradient-descent update
n_iteration = 500  # number of gradient steps; every step uses the whole training set

<span style="color:red"> **[PROBLEM II]**: </span>   
<span style="color:red"> Fill the missing part (last operation in forward pass to calculate *y_pred*) </span>  

In [None]:
logger = {}
logger['iteration'] = []
logger['loss_iteration'] = []

for t in range(n_iteration):  
    
    # forward pass
    x_hidden = x.mm(w1) + b1.expand(n_train, dim_hidden)
    x_hidden_act = x_hidden.clamp(min=0) # apply RELU
    y_pred = # YOUR CODE HERE
    
    # compute loss
    loss = criterion(y_pred, y)

    # backprop
    loss.backward()

    # update weights using gradient descent  
    w1.data -= learning_rate * w1.grad.data
    w2.data -= learning_rate * w2.grad.data
    b1.data -= learning_rate * b1.grad.data
    b2.data -= learning_rate * b2.grad.data
    
    # manually zero the gradients
    w1.grad.data.zero_()
    w2.grad.data.zero_()
    b1.grad.data.zero_()
    b2.grad.data.zero_()  
    
    # reporting & logging       
    if t % 100 == 0:
        print(t, loss.data[0])
        
    logger['iteration'] += [t]
    logger['loss_iteration'] += [loss.data[0]]

<span style="color:green"> After visualizing the loss (cell below) you should see something like this </span>

<img src="fig/loss_toy.png" style="height:128px;" />

In [None]:
# visualize loss: one marker per logged iteration
fig, ax = plt.subplots(figsize=(10,5))
ax.plot(logger['iteration'], logger['loss_iteration'], 'ob', label="loss")
ax.set_xlabel('iteration')
ax.set_ylabel('loss');

<span style="color:red"> **[PROBLEM III]**: </span>   
<span style="color:red"> Implement the function that takes x and predicts its class </span>  

In [None]:
def predict(x, w1, b1, w2, b2, dtype_torch=torch.FloatTensor):
    """
    Prediction based on two-layer model (by hand)
    
    Args:
        x (numpy.array): feature vector of one sample (the cells below pass a single row of the data at a time)
        w1, b1 (torch.Tensor): weights and biases of the hidden layer
        w2, b2 (torch.Tensor): weights and biases of the output layer
        dtype_torch: torch tensor type; defaults to torch.FloatTensor
    Returns:
        scalar: predicted class (the cells below compare it with the class labels 0, 1, 2)
    """
    #YOUR CODE HERE

<span style="color:green"> By running the following command you should get a number bigger than 8 </span>

In [None]:
# number of matches between the predictions for the first ten training samples and the expected classes.
# The loop variable is deliberately not called `x`: under Python 2 a comprehension variable leaks
# into the enclosing scope and would overwrite the training tensor `x`.
expected_classes = [0, 2, 0, 1, 0, 2, 0, 0, 1, 0]
np.sum(np.equal([predict(sample, w1, b1, w2, b2) for sample in dataX_train[:10]], expected_classes))

* Calculate accuracy

In [None]:
def get_accuracy(y, y_pred):
    """
    Calculate accuracy given y and y_predicted
    
    Args:
        y (numpy.array): ground truth
        y_pred (numpy.array): predicted values
         
    Returns:
        scalar: accuracy in percent (between 0 and 100); NaN if there are no samples

    Raises:
        ValueError: if y and y_pred do not have the same shape
    """
    y = np.asarray(y)
    y_pred = np.asarray(y_pred)
    if y.shape != y_pred.shape:
        # without this check numpy would broadcast (or compare to a single False) and
        # silently report a meaningless number
        raise ValueError("y and y_pred must have the same shape, got %s and %s"
                         % (y.shape, y_pred.shape))
    if y.size == 0:
        return float('nan')
    # np.mean always performs a floating-point division, so the result does not collapse
    # to 0 or 100 under Python 2 integer division
    return float(np.mean(y == y_pred)) * 100

In [None]:
# classify the training samples one at a time
y_train_predict = np.zeros(n_train)
for i in range(n_train):
    y_train_predict[i] = predict(dataX_train[i], w1, b1, w2, b2)

# get_accuracy(y, y_pred) takes the ground truth first and the predictions second
print("Train accuracy: %f" % get_accuracy(dataY_train, y_train_predict))

<span style="color:red"> **[PROBLEM IV]**: </span>   
<span style="color:red"> Calculate accuracy on the test set </span>

In [None]:
#YOUR CODE HERE

* Visualize decision boundary

In [None]:
# evaluate the classifier on a regular Nspace x Nspace grid that covers the plotted feature range
Nspace = 100
x1space = np.linspace(-9, 9, Nspace)
x2space = np.linspace(-5, 12, Nspace)
X,Y = np.meshgrid(x1space, x2space)

Z = np.zeros((Nspace,Nspace))
for i in range(Nspace):
    for j in range(Nspace):
        # X[i, j] == x1space[j] and Y[i, j] == x2space[i] (default 'xy' indexing of meshgrid).
        # The grid point gets its own name: rebinding the global `x` here would replace the
        # training tensor and break any re-run of the training cell above.
        grid_point = np.array([X[i, j], Y[i, j]])
        Z[i,j] = predict(grid_point, w1, b1, w2, b2)

# color every grid cell by its predicted class
plt.figure(figsize=(10,5))
plt.pcolor(X, Y, Z, vmin=abs(Z).min(), vmax=abs(Z).max())

plt.xlabel('feature #1')
plt.ylabel('feature #2')
plt.xlim(-9, 9)
plt.ylim(-5, 12);

##### Two-layer Network, again ...  (with nn package) 

$$x_{hidden} = RELU(x  \cdot W_1 + b_1)$$
$$y_{pred} = x_{hidden} \cdot W_2 + b_2$$

After some hard work we will use torch with all its power and elegance.  

* Model

In [None]:
# The same two-layer network, assembled from nn building blocks.
# nn.Sequential is a Module that holds other Modules and applies them one after
# another to produce its output; every Linear Module computes a linear function
# of its input and keeps its own weight and bias.
model = nn.Sequential(
    nn.Linear(dim_in, dim_hidden),   # hidden layer
    nn.ReLU(),
    nn.Linear(dim_hidden, dim_out),  # output layer
)

In [None]:
# display the layers of the model as the cell output
model

* Loss (we will use cross-entropy loss)

In [None]:
# cross-entropy loss; applied in the training loop to the model output and the class labels y
criterion = nn.CrossEntropyLoss()

In [None]:
# input: rebuilt from the numpy training data, because the decision-boundary cell above
# reuses the name `x` for a numpy array
x = Variable(torch.from_numpy(dataX_train).type(dtype_torch), requires_grad=False)
y = Variable(torch.from_numpy(dataY_train).type(torch.LongTensor), requires_grad=False)

In [None]:
learning_rate = 1e-2  # step size of the manual parameter update in the loop below
n_iteration = 1000  # number of gradient steps; every step uses the whole training set

In [None]:
# record the loss of every iteration so that it can be plotted afterwards
logger = {'iteration': [], 'loss_iteration': []}

for t in range(n_iteration):

    # forward pass
    y_pred = model(x)

    # compute loss
    loss = criterion(y_pred, y)

    # backprop
    loss.backward()

    # update weights using gradient descent
    for param in model.parameters():
        param.data -= learning_rate * param.grad.data

    # manually zero the gradients
    model.zero_grad()

    # reporting & logging
    # loss.item() extracts the Python number; the older loss.data[0] raises an error
    # for 0-dim tensors in PyTorch >= 0.5
    loss_value = loss.item()
    if t % 100 == 0:
        print(t, loss_value)

    logger['iteration'].append(t)
    logger['loss_iteration'].append(loss_value)

In [None]:
# visualize loss: one marker per logged iteration
fig, ax = plt.subplots(figsize=(10,5))
ax.plot(logger['iteration'], logger['loss_iteration'], 'ob', label="loss")
ax.set_xlabel('iteration')
ax.set_ylabel('loss');

In [None]:
def predict(x, model, dtype_torch=torch.FloatTensor):
    """
    Prediction based on two-layer model
    
    Args:
        x (numpy.array): either the feature vector of one sample (1-D) or a
            matrix with one sample per row (2-D)
        model (torch.nn.Module): model that maps the features to one score per class
        dtype_torch: torch tensor type the features are converted to;
            defaults to torch.FloatTensor
         
    Returns:
        int for a single sample, numpy.array of class indices (one per row) for a matrix
    """
    features = np.asarray(x)
    single_sample = features.ndim == 1
    # the argmax below runs over axis 1, so a single sample is treated as a batch of one row
    batch = np.atleast_2d(features)
    t = Variable(torch.from_numpy(batch).type(dtype_torch))
    forward_pass = model(t)
    classes = np.argmax(forward_pass.data.numpy(), axis=1)
    return int(classes[0]) if single_sample else classes

In [None]:
# get_accuracy(y, y_pred) takes the ground truth first and the predictions second
y_train_predict = predict(dataX_train, model)
print("Train accuracy: %f" % get_accuracy(dataY_train, y_train_predict))

y_test_predict = predict(dataX_test, model)
print("Test accuracy: %f" % get_accuracy(dataY_test, y_test_predict))

##### Two-layer Network, again and again ...  (with nn package + optim ) 

$$x_{hidden} = RELU(x  \cdot W_1 + b_1)$$
$$y_{pred} = x_{hidden} \cdot W_2 + b_2$$

torch.optim is a package implementing various optimization algorithms.  
see official documentation: http://pytorch.org/docs/master/optim.html

Already familiar stuff ...

In [None]:
# a fresh copy of the two-layer model, together with the loss and the training data
model = nn.Sequential(
    nn.Linear(dim_in, dim_hidden),   # hidden layer
    nn.ReLU(),
    nn.Linear(dim_hidden, dim_out),  # output layer
)

criterion = nn.CrossEntropyLoss()

# input
x = Variable(torch.from_numpy(dataX_train).type(dtype_torch), requires_grad=False)
y = Variable(torch.from_numpy(dataY_train).type(torch.LongTensor), requires_grad=False)

Instead of doing gradient descent manually, we will ask the optim package to do the job.

In [None]:
# learning parameters are set before the optimizer is built: the optimizer reads
# learning_rate at construction time, so defining it afterwards would silently
# reuse whatever value an earlier section left behind
learning_rate = 1e-2
n_iteration = 1000

# plain stochastic gradient descent over all parameters of the model
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

In [None]:
# record the loss of every iteration so that it can be plotted afterwards
logger = {'iteration': [], 'loss_iteration': []}

for t in range(n_iteration):

    # forward + backward + optimize
    optimizer.zero_grad()
    y_pred = model(x)

    loss = criterion(y_pred, y)
    loss.backward()
    optimizer.step()

    # reporting & logging
    # loss.item() extracts the Python number; the older loss.data[0] raises an error
    # for 0-dim tensors in PyTorch >= 0.5
    loss_value = loss.item()
    if t % 100 == 0:
        print(t, loss_value)

    logger['iteration'].append(t)
    logger['loss_iteration'].append(loss_value)

In [None]:
# visualize loss: one marker per logged iteration
fig, ax = plt.subplots(figsize=(10,5))
ax.plot(logger['iteration'], logger['loss_iteration'], 'ob', label="loss")
ax.set_xlabel('iteration')
ax.set_ylabel('loss');

### Two-layer Network for our dataset 

In [None]:
from src.data_set import DataSetCifar10, DataSetDTD

In [None]:
# do it for sanity check (that you have a dataset): runs the shell script of the dataset you plan to use
# NOTE(review): the scripts presumably download the data when it is missing - confirm in the script itself
!./data/get_cifar10_dataset.sh # for cifar10
#!./data/get_dtd_dataset.sh # for dtd

In [None]:
# directory handed to the dataset class below (relative to the notebook's working directory)
path_data = 'data' 

* Hyper-parameters

In [None]:
dim_hidden = 500      # number of units in the hidden layer
num_epochs = 10       # number of passes over the training loader
learning_rate = 1e-3  # step size handed to the optimizer
batch_size = 100      # number of samples per mini-batch

You can choose one of the course's datasets (cifar10 or DTD)

In [None]:
# dataset wrapper from src.data_set; the cells below read data_set.loader['train'],
# data_set.loader['test'] and data_set.dataset['train']
# NOTE(review): `num_dunkeys` is presumably the number of data-loading workers - confirm in src/data_set.py
data_set = DataSetCifar10(path_data, num_dunkeys=4, batch_size=batch_size)
#data_set = DataSetDTD(path_data, num_dunkeys=4, batch_size=100, fin_scale=32)

To make more clear what comes next:

Our image data (obtained from dataloader) has the following shape N x C x H x W, where:
* N is the number of datapoints
* C is the number of channels
* H is the height of the intermediate feature map in pixels
* W is the width of the intermediate feature map in pixels

In [None]:
# useful function 
def make_test(data_loader, model_current, train_test):
    """
    Given a dataset and a model, calculate and print out the accuracy
    
    Args:
        data_loader (dict of DataLoader): maps the split name to an iterable of
            (images, labels) batches, where images has the shape N x C x H x W
        model_current (model): the current model; it is switched to evaluation
            mode and left in that mode
        train_test (string): either 'train' or 'test' to define on which of these datasets we calculate accuracy
         
    Returns:
        scalar: accuracy value in percent
    """    
    # evaluate the model that was passed in, not whatever global `model` happens to exist
    model_current.eval()
    correct = 0
    total = 0
    for images_, labels_ in data_loader[train_test]:
        N, C, H, W = images_.size() # read in N, C, H, W
        images_ = Variable(images_.view(N, -1)) # flatten every image into one vector
        outputs_ = model_current(images_)
        _, predicted = torch.max(outputs_.data, 1) # index of the highest score = predicted class
        total += labels_.size(0)
        # int() keeps the counter a plain Python integer no matter whether .sum()
        # returns a number or a 0-dim tensor
        correct += int((predicted == labels_).sum())
    # 100.0 forces a floating-point division, so the percentage is never truncated
    accuracy = 100.0 * correct / total
    print('accuracy[' + train_test + '] : %f %%' % accuracy)
    return accuracy

<span style="color:red"> **[PROBLEM V]**: </span>   
<span style="color:red"> Correctly specify the dimensions of the model </span>

In [None]:
# two-layer network for the image data; the training loop below reshapes every batch with
# images.view(N, -1), so each image reaches the first layer as one flat vector
model = torch.nn.Sequential(
            torch.nn.Linear(# YOUR CODE HERE),
            torch.nn.ReLU(),
            torch.nn.Linear(# YOUR CODE HERE),
        )

* define optimizer (we will use 'Adam' here)

In [None]:
# Adam optimizer over all parameters of the model, using the learning rate from the hyper-parameter cell
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# train
t = 0  # step counter that keeps running across epochs
logger = {'iteration': [], 'loss_iteration': []}

for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(data_set.loader['train']):
        # get data to train
        N, C, H, W = images.size() # read in N, C, H, W
        images = Variable(images.view(N, -1)) # flatten every image into one vector
        labels = Variable(labels)

        # forward + backward + optimize
        optimizer.zero_grad()
        # call the module itself instead of .forward(), which is the documented way to run
        # a module (registered hooks are only executed on this path)
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # reporting & logging
        # loss.item() extracts the Python number; the older loss.data[0] raises an error
        # for 0-dim tensors in PyTorch >= 0.5
        loss_value = loss.item()
        logger['iteration'].append(t)
        logger['loss_iteration'].append(loss_value)
        t += 1
        if t % 100 == 0:
            print('epoch: [%d/%d], step: [%d/%d], loss: %.4f' %
                   (epoch + 1, num_epochs, i+1, len(data_set.dataset['train'])//batch_size, loss_value))
        
    print('--- epoch: [%d, %d]' % (epoch + 1, num_epochs))
    #make_test(data_set.loader, model, 'train')
    make_test(data_set.loader, model, 'test')

    # make_test leaves the model in evaluation mode; switch back to the training mode
    model.train()

In [None]:
# visualize loss: one marker per logged training step
fig, ax = plt.subplots(figsize=(10,5))
ax.plot(logger['iteration'], logger['loss_iteration'], 'ob', label="loss")
ax.set_xlabel('iteration')
ax.set_ylabel('loss');