# Apply RCNN on synthetic data

### Importing Requirements

In [None]:
%matplotlib inline

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
from torch.nn import init
from torch.autograd import Variable

from sklearn.externals import joblib
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error


from box_gen import get_array_with_box_at_pos

### Restoring data prepared in earlier notebook

In [None]:
# Load the frames (X) and their targets (y) saved by the earlier notebook
X, y = (
    np.load(path)
    for path in (
        "RCNN/data/sliding_square.npy",
        "RCNN/data/sliding_square_target.npy",
    )
)

### Making temporal dataset

In [None]:
# Pair every frame with its successor along a new axis 1:
# temporal_X[k, 0] is X[k] and temporal_X[k, 1] is X[k + 1].
# The cast keeps the float64 dtype that an np.zeros buffer would have.
temporal_X = np.stack((X[:-1], X[1:]), axis=1).astype(np.float64)

# Drop the first target so that y[k] lines up with the second frame of pair k.
# NOTE: re-running this cell without reloading the data shortens y again.
y = y[1:]

temporal_X.shape, y.shape

In [None]:
# Sanity check: display one consecutive frame pair next to its target
i = 18 # any random example
first_frame, second_frame = temporal_X[i]
target = y[i]

# three panels on the top row of a 3x3 grid
ax1, ax2, ax3 = (plt.subplot(3, 3, column) for column in (1, 2, 3))

ax1.imshow(first_frame)
ax1.set_title('X1')

ax2.imshow(second_frame)
ax2.set_title('X2')

# the target is rendered as an image through the box_gen helper
ax3.imshow(get_array_with_box_at_pos(target))
ax3.set_title('y'+ " = "+ str(round(target,2)))

plt.show()


In [None]:
# Display the shapes of the final dataset: paired input frames (temporal_X)
# and their targets (y)
temporal_X.shape, y.shape

In [None]:
# Split the dataset into train and test parts at the 70% mark.
# The slices are contiguous (no shuffling), so the frame order is preserved.
split_point = int(len(temporal_X) * 0.7)
temporal_x_train, temporal_x_test = temporal_X[:split_point], temporal_X[split_point:]
y_train, y_test = y[:split_point], y[split_point:]

# inputs and targets must stay aligned after the split
assert len(temporal_x_train) == len(y_train)
assert len(temporal_x_test) == len(y_test)

# Build RCNN Model

In [None]:
class ConvGRUCell(nn.Module):
    """
    Generate a convolutional GRU cell.

    The reset, update and out gates are each a `Conv2d` applied to the
    channel-wise concatenation of the input and the previous hidden state.
    Padding is `kernel_size // 2`, which keeps the spatial size unchanged
    for odd kernel sizes.

    Parameters
    ----------
    input_size : integer. depth (channel) dimension of the input tensor.
    hidden_size : integer. depth (channel) dimension of the hidden state.
    kernel_size : integer. size of the Conv2d gate kernels.
    """

    def __init__(self, input_size, hidden_size, kernel_size):
        super().__init__()
        padding = kernel_size // 2
        self.input_size = input_size
        self.hidden_size = hidden_size

        # every gate sees the input and the hidden state stacked along channels
        in_channels = input_size + hidden_size
        self.reset_gate = nn.Conv2d(in_channels, hidden_size, kernel_size, padding=padding)
        self.update_gate = nn.Conv2d(in_channels, hidden_size, kernel_size, padding=padding)
        self.out_gate = nn.Conv2d(in_channels, hidden_size, kernel_size, padding=padding)

        # orthogonal weights and zero biases (in-place initialisers; the
        # non-underscore variants are deprecated)
        for gate in (self.reset_gate, self.update_gate, self.out_gate):
            init.orthogonal_(gate.weight)
            init.constant_(gate.bias, 0.)

    def forward(self, input_, prev_state):
        """
        Parameters
        ----------
        input_ : 4D input tensor. (batch, channels, height, width).
        prev_state : 4D hidden state (batch, hidden_size, height, width),
            or None to start from an all-zero state.
        Returns
        -------
        new_state : 4D hidden state with the same shape as `prev_state`.
        """
        # generate empty prev_state, if None is provided
        if prev_state is None:
            state_size = (input_.shape[0], self.hidden_size) + tuple(input_.shape[2:])
            # allocate on the input's own device/dtype so the concatenation
            # below never mixes devices (e.g. a CPU model on a CUDA machine)
            prev_state = input_.new_zeros(state_size)

        # data size is [batch, channel, height, width]
        stacked_inputs = torch.cat([input_, prev_state], dim=1)
        update = torch.sigmoid(self.update_gate(stacked_inputs))
        reset = torch.sigmoid(self.reset_gate(stacked_inputs))
        # the candidate state only sees the part of the old state the reset gate lets through
        out_inputs = torch.tanh(self.out_gate(torch.cat([input_, prev_state * reset], dim=1)))
        # blend old state and candidate according to the update gate
        new_state = prev_state * (1 - update) + out_inputs * update

        return new_state
    
class ConvGRU(nn.Module):

    def __init__(self, input_size, hidden_sizes, kernel_sizes, n_layers, spatial_size=(5, 50)):
        '''
        Generates a multi-layer convolutional GRU followed by a two-layer
        dense head that reduces the hidden states to a single output value.
        Preserves spatial dimensions across cells, only altering depth.
        Parameters
        ----------
        input_size : integer. depth dimension of input tensors.
        hidden_sizes : integer or list. depth dimensions of hidden state.
            if integer, the same hidden size is used for all cells.
        kernel_sizes : integer or list. sizes of Conv2d gate kernels.
            if integer, the same kernel size is used for all cells.
        n_layers : integer. number of chained `ConvGRUCell`.
        spatial_size : (height, width) of the input frames. Used to size the
            first dense layer; the default (5, 50) with 32 hidden channels
            gives the previously hard-coded 32*5*50 input features.
        '''

        super(ConvGRU, self).__init__()
        self.input_size = input_size

        if not isinstance(hidden_sizes, (list, tuple)):
            self.hidden_sizes = [hidden_sizes]*n_layers
        else:
            assert len(hidden_sizes) == n_layers, '`hidden_sizes` must have the same length as n_layers'
            self.hidden_sizes = list(hidden_sizes)
        if not isinstance(kernel_sizes, (list, tuple)):
            self.kernel_sizes = [kernel_sizes]*n_layers
        else:
            assert len(kernel_sizes) == n_layers, '`kernel_sizes` must have the same length as n_layers'
            self.kernel_sizes = list(kernel_sizes)

        self.n_layers = n_layers
        cells = []
        # the first cell consumes the input, every later cell the previous cell's state
        input_dim = self.input_size
        for i, (hidden_dim, kernel_size) in enumerate(zip(self.hidden_sizes, self.kernel_sizes)):
            cell = ConvGRUCell(input_dim, hidden_dim, kernel_size)
            # setattr registers the cell as a sub-module, so its parameters
            # show up in `self.parameters()` under a stable name
            setattr(self, 'ConvGRUCell_' + str(i).zfill(2), cell)
            cells.append(cell)
            input_dim = hidden_dim

        self.cells = cells

        # dense1 and dense2 reduce the flattened hidden states of all layers
        # to an output of size 1
        height, width = spatial_size
        self.dense1 = nn.Linear(in_features=sum(self.hidden_sizes)*height*width, out_features=100)
        self.dense2 = nn.Linear(in_features=100, out_features=1)

    def forward(self, x, hidden=None):
        '''
        Parameters
        ----------
        x : 4D input tensor. (batch, channels, height, width).
        hidden : list of 4D hidden state representations. (batch, channels, height, width).
            One entry per layer; None (or an empty list) starts every layer
            from a zero state.
        Returns
        -------
        dense2_out : 2D tensor. (batch, 1). output of the dense head.
        '''
        if not hidden:
            hidden = [None]*self.n_layers

        input_ = x
        upd_hidden = []
        for cell, cell_hidden in zip(self.cells, hidden):
            # pass through layer
            upd_cell_hidden = cell(input_, cell_hidden)
            # keep the tensor itself (no detach / numpy round-trip) so that
            # gradients flow back into the GRU cells during training
            upd_hidden.append(upd_cell_hidden)
            # update input_ to the last updated hidden layer for next pass
            input_ = upd_cell_hidden

        # flatten each layer per sample, then join the layers along the
        # feature axis so rows never mix different samples
        batch_size = x.shape[0]
        reshaped_output = torch.cat(
            [layer_state.reshape(batch_size, -1) for layer_state in upd_hidden], dim=1)

        # passing through dense layers to get one value per sample
        dense1_out = self.dense1(reshaped_output)
        dense2_out = self.dense2(dense1_out)

        return dense2_out

In [None]:
# Single-layer ConvGRU: input_size=2 matches the two stacked frames per sample;
# the cell has 32 hidden channels and a gate kernel size of 3
model = ConvGRU(input_size=2, hidden_sizes=[32],kernel_sizes=[3], n_layers=1)

**Defining loss and optimizer**

In [None]:
# mean squared error between the predicted and the target value
criteria = nn.MSELoss()
# Adam over all model parameters with a learning rate of 0.01
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# Training 

In [None]:
epochs = 200
losses  = []

# Full-batch training: the data is identical in every epoch, so the float32
# tensors are built once instead of being re-created from numpy each iteration.
x_train_tensor = torch.as_tensor(temporal_x_train, dtype=torch.float32)
# targets as a column vector, matching the model's (batch, 1) output
y_train_tensor = torch.as_tensor(y_train.reshape(-1, 1), dtype=torch.float32)

for _ in range(epochs):
    optimizer.zero_grad()
    predicted = model(x_train_tensor)
    loss = criteria(predicted, y_train_tensor)
    # record the loss as a plain float for plotting
    losses.append(loss.item())
    loss.backward()
    optimizer.step()

In [None]:
# Plot the training loss recorded for every epoch
loss_axes = plt.gca()
loss_axes.plot(losses)
loss_axes.set_xlabel("Epochs")
loss_axes.set_ylabel("Losses")
loss_axes.set_title("Decrease in loss as the training progresses")

## Test in forward / backward direction

In [None]:
all_predictions = []
all_label = []

# Inference only: no_grad avoids building an autograd graph for every test sample.
with torch.no_grad():
    for frame, output in zip(temporal_x_test, y_test):
        # add a batch dimension of 1 in front of the (2, height, width) frame pair
        x = torch.as_tensor(frame, dtype=torch.float32).unsqueeze(0)
        predicted = model(x)
        # scalar prediction, extracted once and reused for the image and the title
        predicted_value = predicted.detach().numpy()[0][0]

        ax1 = plt.subplot(441)
        ax1.imshow(frame[0])
        ax1.set_title('X1')
        plt.yticks([])

        ax2 = plt.subplot(442)
        ax2.imshow(frame[1])
        ax2.set_title('X2')
        plt.yticks([])

        ax3 = plt.subplot(443)
        ax3.imshow(get_array_with_box_at_pos(predicted_value))
        ax3.set_title('y_pred'+ " = "+ str(round(predicted_value,2)))
        plt.yticks([])

        ax4 = plt.subplot(444)
        ax4.imshow(get_array_with_box_at_pos(output))
        ax4.set_title('y'+ " = "+ str(round(output,2)))
        plt.yticks([])
        plt.show()

        all_predictions.append(predicted.item())
        all_label.append(output)

# mean squared error over the whole test set (arguments are y_true, y_pred)
test_loss = mean_squared_error(all_label, all_predictions)

In [None]:
# Report the mean squared error computed over the test set
print("The final test loss : ", test_loss)