<h1>Redes Convolucionais</h1>

Neste trabalho você irá comparar uma RNA totalmente conectada com uma RNA com uma camada convolucional no problema de reconhecimento de dígitos MNIST. 

No trecho de código abaixo nós temos a implementação de uma RNA totalmente conectada utilizando a tangente hiperbólica como função de ativação dos neurônios da camada escondida e a função softmax na camada de saída. 

Como vimos em sala de aula, a arquitetura da RNA nesta implementação é ajustável através do arranjo <i>dims = [784, 100, 10]</i>. Nesse exemplo, a RNA possui 784 entradas (uma para cada pixel de uma imagem MNIST), 100 neurônios na camada escondida e 10 neurônios na camada de saída. 

In [None]:
from keras.datasets import mnist
import matplotlib.pyplot as plt
import numpy as np

class RNA:
    def __init__(self, dims):
        self.dims = dims
        self.weights = {}
        self.L = len(dims)

        for i in range(1, len(dims)):
            self.weights[f'W{i-1}'] = np.random.randn(dims[i], dims[i-1]) * (1/np.sqrt(dims[i-1]))
            self.weights[f'B{i-1}'] = np.zeros((dims[i], 1))
                        
    def derivative(self, A):
        return 1 - (A ** 2)
    
    def activation_function(self, Z):
        return np.tanh(Z)
    
    def softmax(self, x):
        x = x - np.max(x)
        temp = np.exp(x)
        return temp / np.sum(temp, axis=0, keepdims=True)

    def forward(self, X, Y=None):
        self.cache = {}
        self.cache['A0'] = X
        m = X.shape[1]
        for j in range(1, self.L):
            if j < self.L - 1:
                self.cache[f'Z{j}'] = np.dot(self.weights[f'W{j-1}'], self.cache[f'A{j-1}']) + self.weights[f'B{j-1}']
                self.cache[f'A{j}'] = self.activation_function(self.cache[f'Z{j}'])
            else:
                self.cache[f'Z{j}'] = np.dot(self.weights[f'W{j-1}'], self.cache[f'A{j-1}']) + self.weights[f'B{j-1}']
                self.cache[f'A{j}'] = self.softmax(self.cache[f'Z{j}'])
        return self.cache[f'A{self.L-1}']
    
    def backward(self, Y):
        self.cache[f'd{self.L-1}'] = self.cache[f'A{self.L-1}'] - Y
        self.cache[f'd{self.L-1}'] = np.multiply(self.cache[f'd{self.L-1}'], self.derivative(self.cache[f'A{self.L-1}']))
        
        for j in reversed(range(0, self.L-1)):
            if j > 0:
                self.cache[f'd{j}'] = np.dot(self.weights[f'W{j}'].T, self.cache[f'd{j+1}'])
                self.cache[f'd{j}'] = np.multiply(self.cache[f'd{j}'], self.derivative(self.cache[f'A{j}']))

            self.cache[f'dW{j}'] = (1/len(Y)) * (np.dot(self.cache[f'd{j+1}'], self.cache[f'A{j}'].T))
            self.cache[f'dB{j}'] = (1/len(Y)) * np.sum(self.cache[f'd{j+1}'], axis=1, keepdims=True)
    
    def update(self, alpha):
        for j in range(0, self.L-1):
            self.weights[f'W{j}'] = self.weights[f'W{j}'] - alpha * self.cache[f'dW{j}']
            self.weights[f'B{j}'] = self.weights[f'B{j}'] - alpha * self.cache[f'dB{j}']
                    
    def train(self, X, Y, X_test, Y_test, alpha, steps):
        percentage_train_list = []
        percentage_test_list = []
        
        Y_one_hot = np.zeros((10, X.shape[1]))        
        for index, value in enumerate(Y):
            Y_one_hot[value][index] = 1
        
        for i in range(0, steps):
            self.forward(X, Y_one_hot)
            self.backward(Y_one_hot)
            self.update(alpha)
            if i % 100 == 0:
                percentage_train = self.evaluate(X, Y)
                percentage_train_list.append(percentage_train)
                
                percentage_test = self.evaluate(X_test, Y_test)
                percentage_test_list.append(percentage_test)                
                
                print('train: %.3f, test: %.3f ' % (percentage_train, percentage_test))

    def evaluate(self, X, Y):
        Y_hat = self.predict(X)
        classified_correctly = test_correct = np.count_nonzero(np.argmax(Y_hat, axis=0) == Y)
        return classified_correctly / X.shape[1]
    
    def predict(self, X):
        return self.forward(X) 

Ao executar o código abaixo você irá treinar uma RNA com a estrutura 784, 100, 10 com 10000 instâncias de treinamento. Em cada iteração do gradiente descendente será exibido na tela a porcentagem de instâncias classificadas corretamente no conjunto de treinamento e no conjunto de teste. Nesse experimento utilizamos a taxa de aprendizagem de 0.0001 em 1000 iterações do gradiente descendente.  

Logo abaixo desse trecho de código você irá observer os valores de uma execução desse processo de treinamento. 

In [None]:
(x_train, y_train), (x_test, y_test) = mnist.load_data()

m = 10000
images, labels = (x_train[0:m].reshape(m, 28*28) / 255, y_train[0:m])
images = images.T

images_test, labels_test = (x_test.reshape(x_test.shape[0], 28*28) / 255, y_test) 
images_test = images_test.T

dims = [784, 100, 10]
rna = RNA(dims)
rna.train(images, labels, images_test, labels_test, 0.0001, 1000)

Abaixo você deverá completar o código para implementar uma RNA com uma camada convolucional, conforme vimos na aula 17 da disciplina (ver notas de aulas no PVANet). Isto é, a RNA terá uma camada de convolução com 10 filtros de tamanho 5x5; a função de ativação da camada de convolução será a tangente hiperbólica. A RNA terá mais uma camada de saída totalmente conectada implementando a função softmax. Diferente do código acima, que implementa o algoritmo de gradiente descendente, abaixo você irá implementar o gradiente descentende com mini-batches de tamanho 32. 

Não é necessário implementar uma classe genérica para a RNA abaixo, basta completar o código de forma a treinar o modelo e obter uma saída parecida com a que vimos em sala de aula.  

In [None]:
import numpy as np, sys
from keras.datasets import mnist
import matplotlib.pyplot as plt

def plot_digit(image):
    """
    Função para plotar uma imagem, utilizada abaixo para mostrar os filtros aprendidos. 
    """
    plt.imshow(image, cmap='gray')
    plt.show()

(x_train, y_train), (x_test, y_test) = mnist.load_data()

images, labels = (x_train[0:10000].reshape(10000, 28*28) / 255,
                  y_train[0:10000])


one_hot_labels = np.zeros((len(labels),10))
for i,l in enumerate(labels):
    one_hot_labels[i][l] = 1
labels = one_hot_labels

test_images = x_test.reshape(len(x_test),28*28) / 255
test_labels = np.zeros((len(y_test),10))
for i,l in enumerate(y_test):
    test_labels[i][l] = 1

def tanh(x):
    return np.tanh(x)

def tanh2deriv(output):
    return 1 - (output ** 2)

def softmax(x):
    x = x - np.max(x)
    temp = np.exp(x)
    return temp / np.sum(temp, axis=1, keepdims=True)

def get_image_section(layer, row_from, row_to, col_from, col_to):
    section = layer[:,row_from:row_to, col_from:col_to]
    return section.reshape(-1,1,row_to-row_from, col_to-col_from)

alpha, iterations = (0.015, 50)
pixels_per_image, num_labels = (784, 10)
batch_size = 32

input_rows = 28
input_cols = 28

kernel_rows = 5
kernel_cols = 5
num_kernels = 10

hidden_size = ((input_rows - kernel_rows + 1) * 
               (input_cols - kernel_cols + 1)) * num_kernels

kernels = (1/np.sqrt(kernel_rows*kernel_cols))*np.random.randn(kernel_rows*kernel_cols,
                                 num_kernels) 
b_k = np.zeros((1, num_kernels))
weights_1_2 = (1/np.sqrt(hidden_size))*np.random.randn(hidden_size, num_labels) 
b_1_2 = np.zeros((1, num_labels))

#faremos iterations + 1 operações de atualização do gradiente descendente com o mini-batch
for j in range(iterations + 1):

    #variável para somarmos o total de instâncias classificadas corretamente em cada batch
    correct_cnt = 0
    for i in range(int(len(images) / batch_size)):
        batch_start, batch_end=((i * batch_size),((i+1)*batch_size))
        layer_0 = images[batch_start:batch_end]
        layer_0 = layer_0.reshape(layer_0.shape[0], 28, 28)

        sects = list()
        for row_start in range(layer_0.shape[1] - kernel_rows+1):
            for col_start in range(layer_0.shape[2] - kernel_cols+1):
                sect = get_image_section(layer_0,
                                         row_start,
                                         row_start+kernel_rows,
                                         col_start,
                                         col_start+kernel_cols)
                sects.append(sect)

        expanded_input = np.concatenate(sects,axis=1)
        es = expanded_input.shape
        flattened_input = expanded_input.reshape(es[0]*es[1],-1)
        
        #complete aqui a implementação
        if row_start% 100 == 0:  
              percentage_train=(correct_cnt/float(len(test_images)))
              percentage_test=(correct_cnt/float(len(images)))
              print('train: %.3f, test: %.3f ' % (percentage_train, percentage_test))

        kernel_output = flattened_input.dot(kernels)
        layer_1 = tanh(kernel_output.reshape(es[0],-1))
        layer_2 = np.dot(layer_1,weights_1_2)
        correct_cnt += int(np.argmax(layer_2) ==
                                np.argmax(test_labels[i:i+1]))
        
       # if i % 100 == 0:  
        #      percentage_train=(correct_cnt/float(len(test_images)))
         #     percentage_test=(correct_cnt/float(len(images)))
          #    print('train: %.3f, test: %.3f ' % (percentage_train, percentage_test))

   
        

In [None]:
from keras.datasets import mnist
import matplotlib.pyplot as plt
import numpy as np

class RNA:
    def __init__(self, dims):
        self.dims = dims
        self.weights = {}
        self.L = len(dims)

        for i in range(1, len(dims)):
            self.weights[f'W{i-1}'] = np.random.randn(dims[i], dims[i-1]) * (1/np.sqrt(dims[i-1]))
            self.weights[f'B{i-1}'] = np.zeros((dims[i], 1))
                        
    def derivative(self, A):
        return 1 - (A ** 2)
    
    def activation_function(self, Z):
        return np.tanh(Z)
    
    def softmax(self, x):
        x = x - np.max(x)
        temp = np.exp(x)
        return temp / np.sum(temp, axis=0, keepdims=True)

    def forward(self, X, Y=None):
        self.cache = {}
        self.cache['A0'] = X
        m = X.shape[1]
        for j in range(1, self.L):
            if j < self.L - 1:
                self.cache[f'Z{j}'] = np.dot(self.weights[f'W{j-1}'], self.cache[f'A{j-1}']) + self.weights[f'B{j-1}']
                self.cache[f'A{j}'] = self.activation_function(self.cache[f'Z{j}'])
            else:
                self.cache[f'Z{j}'] = np.dot(self.weights[f'W{j-1}'], self.cache[f'A{j-1}']) + self.weights[f'B{j-1}']
                self.cache[f'A{j}'] = self.softmax(self.cache[f'Z{j}'])
        return self.cache[f'A{self.L-1}']
    
    def backward(self, Y):
        self.cache[f'd{self.L-1}'] = self.cache[f'A{self.L-1}'] - Y
        self.cache[f'd{self.L-1}'] = np.multiply(self.cache[f'd{self.L-1}'], self.derivative(self.cache[f'A{self.L-1}']))
        
        for j in reversed(range(0, self.L-1)):
            if j > 0:
                self.cache[f'd{j}'] = np.dot(self.weights[f'W{j}'].T, self.cache[f'd{j+1}'])
                self.cache[f'd{j}'] = np.multiply(self.cache[f'd{j}'], self.derivative(self.cache[f'A{j}']))

            self.cache[f'dW{j}'] = (1/len(Y)) * (np.dot(self.cache[f'd{j+1}'], self.cache[f'A{j}'].T))
            self.cache[f'dB{j}'] = (1/len(Y)) * np.sum(self.cache[f'd{j+1}'], axis=1, keepdims=True)
    
    def update(self, alpha):
        for j in range(0, self.L-1):
            self.weights[f'W{j}'] = self.weights[f'W{j}'] - alpha * self.cache[f'dW{j}']
            self.weights[f'B{j}'] = self.weights[f'B{j}'] - alpha * self.cache[f'dB{j}']
                    
    def train(self, X, Y, X_test, Y_test, alpha, steps):
        percentage_train_list = []
        percentage_test_list = []
        
        Y_one_hot = np.zeros((10, X.shape[1]))        
        for index, value in enumerate(Y):
            Y_one_hot[value][index] = 1
        
        for i in range(0, steps):
            self.forward(X, Y_one_hot)
            self.backward(Y_one_hot)
            self.update(alpha)
            if i % 100 == 0:
                percentage_train = self.evaluate(X, Y)
                percentage_train_list.append(percentage_train)
                
                percentage_test = self.evaluate(X_test, Y_test)
                percentage_test_list.append(percentage_test)                
                
                print('train: %.3f, test: %.3f ' % (percentage_train, percentage_test))

    def evaluate(self, X, Y):
        Y_hat = self.predict(X)
        classified_correctly = test_correct = np.count_nonzero(np.argmax(Y_hat, axis=0) == Y)
        return classified_correctly / X.shape[1]
    
    def predict(self, X):
        return self.forward(X) 

In [None]:
from keras.datasets import mnist
import matplotlib.pyplot as plt
import numpy as np

class RNA:
    def __init__(self, dims):
        self.dims = dims
        self.weights = {}
        self.L = len(dims)

        for i in range(1, len(dims)):
            self.weights[f'W{i-1}'] = np.random.randn(dims[i], dims[i-1]) * (1/np.sqrt(dims[i-1]))
            self.weights[f'B{i-1}'] = np.zeros((dims[i], 1))
                        
    def derivative(self, A):
        return 1 - (A ** 2)
    
    def activation_function(self, Z):
        return np.tanh(Z)
    
    def softmax(self, x):
        x = x - np.max(x)
        temp = np.exp(x)
        return temp / np.sum(temp, axis=0, keepdims=True)

    def forward(self, X, Y=None):
        self.cache = {}
        self.cache['A0'] = X
        m = X.shape[1]
        for j in range(1, self.L):
            if j < self.L - 1:
                self.cache[f'Z{j}'] = np.dot(self.weights[f'W{j-1}'], self.cache[f'A{j-1}']) + self.weights[f'B{j-1}']
                self.cache[f'A{j}'] = self.activation_function(self.cache[f'Z{j}'])
            else:
                self.cache[f'Z{j}'] = np.dot(self.weights[f'W{j-1}'], self.cache[f'A{j-1}']) + self.weights[f'B{j-1}']
                self.cache[f'A{j}'] = self.softmax(self.cache[f'Z{j}'])
        return self.cache[f'A{self.L-1}']
    
    def backward(self, Y):
        self.cache[f'd{self.L-1}'] = self.cache[f'A{self.L-1}'] - Y
        self.cache[f'd{self.L-1}'] = np.multiply(self.cache[f'd{self.L-1}'], self.derivative(self.cache[f'A{self.L-1}']))
        
        for j in reversed(range(0, self.L-1)):
            if j > 0:
                self.cache[f'd{j}'] = np.dot(self.weights[f'W{j}'].T, self.cache[f'd{j+1}'])
                self.cache[f'd{j}'] = np.multiply(self.cache[f'd{j}'], self.derivative(self.cache[f'A{j}']))

            self.cache[f'dW{j}'] = (1/len(Y)) * (np.dot(self.cache[f'd{j+1}'], self.cache[f'A{j}'].T))
            self.cache[f'dB{j}'] = (1/len(Y)) * np.sum(self.cache[f'd{j+1}'], axis=1, keepdims=True)
    
    def update(self, alpha):
        for j in range(0, self.L-1):
            self.weights[f'W{j}'] = self.weights[f'W{j}'] - alpha * self.cache[f'dW{j}']
            self.weights[f'B{j}'] = self.weights[f'B{j}'] - alpha * self.cache[f'dB{j}']
                    
    def train(self, X, Y, X_test, Y_test, alpha, steps):
        percentage_train_list = []
        percentage_test_list = []
        
        Y_one_hot = np.zeros((10, X.shape[1]))        
        for index, value in enumerate(Y):
            Y_one_hot[value][index] = 1
        
        for i in range(0, steps):
            self.forward(X, Y_one_hot)
            self.backward(Y_one_hot)
            self.update(alpha)
            if i % 100 == 0:
                percentage_train = self.evaluate(X, Y)
                percentage_train_list.append(percentage_train)
                
                percentage_test = self.evaluate(X_test, Y_test)
                percentage_test_list.append(percentage_test)                
                
                print('train: %.3f, test: %.3f ' % (percentage_train, percentage_test))

    def evaluate(self, X, Y):
        Y_hat = self.predict(X)
        classified_correctly = test_correct = np.count_nonzero(np.argmax(Y_hat, axis=0) == Y)
        return classified_correctly / X.shape[1]
    
    def predict(self, X):
        return self.forward(X) 

O trecho de código abaixo irá plotar o filtros treinados pela RNA. O que você observa nos filtros treinados pelo seu modelo? 

Escreva aqui a sua resposta: Os filtros treinados pelo modelo evidenciam regiões mais acentuadas, possivelmente características aprendidas de uma parte corespondente de um determinado algarismo.



In [None]:
for i in range(kernels.shape[1]):
    plot_digit(kernels[:,i].reshape(kernel_rows, kernel_cols))