<a href="https://colab.research.google.com/github/Dr-Carlos-Villasenor/Clase_Aprendizaje_Profundo/blob/main/L11_03_NARX.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Deep Learning
##Dr. Carlos Villaseñor
## NARX(p)


# MLP

In [None]:
"""
Activations functions
Dr. Carlos Villaseñor
"""

import numpy as np

# Activation functions for output layer ------------------------------

def linear(z, derivative=False):
    a = z
    if derivative:
        da = np.ones(z.shape)
        return a, da
    return a


def logistic(z, derivative=False):
    a = 1/(1 + np.exp(-z))
    if derivative:
        da = np.ones(z.shape)
        return a, da
    return a


def softmax(z, derivative=False):
    e = np.exp(z - np.max(z, axis=0))
    a = e / np.sum(e, axis=0)
    if derivative:
        da = np.ones(z.shape)
        return a, da
    return a

# Activation functions for hidden layers -----------------------------

def tanh(z, derivative=False):
    a = np.tanh(z)
    if derivative:
        da = (1 - a) * (1 + a)
        return a, da
    return a


def relu(z, derivative=False):
    a = z * (z >= 0)
    if derivative:
        da = np.array(z >= 0, dtype=float)
        return a, da
    return a

def logistic_hidden(z, derivative=False):
    a = 1/(1 + np.exp(-z))
    if derivative:
        da = a * (1 - a)
        return a, da
    return a

In [None]:
"""
Multilayer Perceptron
Dr. Carlos Villaseñor
"""

class MLP:

    def __init__(self, layers_dims,
                 hidden_activation=tanh,
                 output_activation=logistic):

        # Attributes
        self.L = len(layers_dims) - 1
        self.w = [None] * (self.L + 1)
        self.b = [None] * (self.L + 1)
        self.f = [None] * (self.L + 1)

        # Initialize weights
        for l in range(1, self.L + 1):
            self.w[l] = -1 + 2 * np.random.rand(layers_dims[l],
                                                layers_dims[l-1])
            self.b[l] = -1 + 2 * np.random.rand(layers_dims[l], 1)

            if l == self.L:
                self.f[l] = output_activation
            else:
                self.f[l] = hidden_activation


    def predict(self, X):
        a = X
        for l in range(1, self.L + 1):
            z = np.dot(self.w[l], a) + self.b[l]
            a = self.f[l](z)
        return a

    def train(self, X, Y, epochs=500, lr=0.1):
        p = X.shape[1]
        for _ in range(epochs):
            # initialize activations
            a = [None] * (self.L + 1)
            da = [None] * (self.L + 1)
            lg = [None] * (self.L + 1)

            # Propagation
            a[0] = X
            for l in range(1, self.L + 1):
                z = np.dot(self.w[l], a[l-1]) + self.b[l]
                a[l], da[l] = self.f[l](z, derivative=True)

            # Backpropagation
            for l in range(self.L, 0, -1):
                if l == self.L:
                    lg[l] = -(Y - a[l]) * da[l]
                else:
                    lg[l] = np.dot(self.w[l+1].T, lg[l + 1]) * da[l]

            # Gradient Descent
            for l in range(1, self.L + 1):
                self.w[l] -= (lr/p) * np.dot(lg[l], a[l - 1].T)
                self.b[l] -= (lr/p) * np.sum(lg[l])

# Delay block

In [None]:
'''
DelayBlock class

Implementa un buffer de retardos para una señal vectorial

Dr. Carlos Villaseñor
'''
import numpy as np

class DelayBlock:

    def __init__(self, n_inputs, n_delays):
        # Se crea la memoria del buffer como una matriz
        self.memory = np.zeros((n_inputs, n_delays))

    def add(self, x):
        # Creo una copia profunda para evitar conflictos
        # con la señal original
        x_new = x.copy().reshape(1,-1)

        # Implementacion del retardo
        self.memory[:,1:] = self.memory[:,:-1]
        self.memory[:,0] = x_new

    def get(self):
        # Retornar la memoria como un solo vector
        return self.memory.reshape(-1, 1, order='F')

    def add_and_get(self, x):
        # Unificación de las dos funcionalidades en una sola
        self.add(x)
        return self.memory.reshape(-1, 1, order='F')

# Test code ----------------------------------------
'''
A = [np.random.rand(2,1) for i in range(50)]
db = db = DelayBlock(2,5)
for i in range(6):
    db.add(A[i])
    print(db.get())
'''

'\nA = [np.random.rand(2,1) for i in range(50)]\ndb = db = DelayBlock(2,5)\nfor i in range(6):\n    db.add(A[i])\n    print(db.get())\n'

#NARX

In [None]:
class NARX:
    '''
    Non linear AutoRegressor with eXogeneous inputs.

    Con red neuronal densa multicapa como función no lineal.
    '''

    def __init__(self, n_inputs, n_outputs, n_delays,
                       dense_hidden_layers=(100,),
                       learning_rate = 0.01,
                       n_repeat_train = 5):
        self.net = MLP(((n_inputs+n_outputs)*n_delays,
                   *dense_hidden_layers, n_outputs),
                     output_activation=linear)
        self.dbx = DelayBlock(n_inputs, n_delays)
        self.dby = DelayBlock(n_outputs, n_delays)
        self.learning_rate = learning_rate
        self.n_repeat_train = n_repeat_train

    def predict(self, x):
        '''
        Predicción NARX.
        x es un vector de tamaño (n_inputs, 1)
        '''

        # Preparar entrada extendida en el tiempo
        X_block = self.dbx.add_and_get(x)
        Y_est_block = self.dby.get()
        net_input = np.vstack((X_block, Y_est_block))


        # Predicción de la red neuronal
        y_est = self.net.predict(net_input)

        # Guardar predicción en el bloque recurrente
        self.dby.add(y_est)

        # Retornar predicción
        return y_est

    def predict_and_train(self, x, y):
        '''
        Predicción y entranemiento en línea de NARX.
        x es la entrada un vector de tamaño (n_inputs, 1)
        y es la salida deseada un vector de tamaño (n_outputs, 1)

        La predicción se entrega antes del entrenamiento pero no se guarda
        hasta despues de entrenar.
        '''

        # Preparar entrada extendida en el tiempo
        X_block = self.dbx.add_and_get(x)
        Y_est_block = self.dby.get()
        net_input = np.vstack((X_block, Y_est_block))

        # Predicción de la red neuronal
        y_est = self.net.predict(net_input)

        # Entrenar red neuronal
        self.net.train(net_input, y,
                       epochs=self.n_repeat_train,
                       lr = self.learning_rate)

        # Guardar predicción en el bloque recurrente
        self.dby.add(y_est)

        # Retornar predicción
        return y_est

# Test code

In [None]:
from scipy.integrate import odeint
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

def lorenz(u,t):
    """
    Sistema de Lorenz
    """
    s=10; r=28; b=2.667
    ux_dot = s*(u[1] - u[0])
    uy_dot = r*u[0] - u[1] - u[0]*u[2]
    uz_dot = u[0]*u[1] - b*u[2]
    return [ux_dot, uy_dot, uz_dot]

#Condicion inicial
u0 = [0., 1., 1.05]

#Tiempo
t = np.linspace(0,100,10000)

#Soluciones
u = odeint(lorenz,u0,t)


# Test NARX a un paso
narx = NARX(3, 3, 10,
            dense_hidden_layers=(100,),
            learning_rate=0.01, n_repeat_train=1)
y_est = np.zeros((3, 10000))
u = u.T
for i in range(10000-1):
    x = u[:,i].reshape(-1,1)
    y = u[:,i+1].reshape(-1,1)
    y_est[:,i] = narx.predict_and_train(x, y).ravel()

#Gráfica
fig = plt.figure()
fig.add_subplot(1, 1, 1, projection='3d')
#ax = fig.gca(projection='3d')
ax = fig.gca()
ax.plot(u[0,:], u[1,:], u[2,:], lw=0.7)
ax.plot(y_est[0,:], y_est[1,:], y_est[2,:], lw=0.5)
ax.set_xlabel("Eje X")
ax.set_ylabel("Eje Y")
ax.set_zlabel("Eje Z")
ax.set_title("Atractor de Lorenz")
plt.show()