In [None]:
import warnings
import numpy as np
from typing import List, Tuple
import matplotlib.pyplot as plt
from scipy.integrate import odeint
from sklearn.utils import resample
from src.optimizers import SGD, Adam
from src.utils import generate_batches
from src.losses import mean_squared_error
from src.activations import tanh, d_tanh
from src.callbacks import ReduceLROnPlateau
from sklearn.model_selection import train_test_split
from ode_generators import generate_damped_pendulum_solution

warnings.filterwarnings('ignore')
%config InlineBackend.figure_format = 'retina'
plt.rc('xtick',labelsize=16)
plt.rc('ytick',labelsize=16)
plt.style.use('seaborn-whitegrid')

In [None]:
t, x, dxdt = generate_damped_pendulum_solution(one_step=False, α=0.1, β=8.91, t_end=20, noise=0.1)

In [None]:
plt.figure(figsize=(16,7))

plt.subplot(1,2,1)
plt.title("State Trajectory", fontsize=22)
plt.plot(t, x, "-", label="x")
plt.plot(t, dxdt, "-", label="dxdt")
plt.xlabel("t", fontsize=20)
plt.legend()

plt.subplot(1,2,2)
plt.title("Phase Space", fontsize=22)
plt.plot(x, dxdt, "-")
plt.xlabel("x", fontsize=20)
plt.ylabel("dxdt", fontsize=20)
plt.tight_layout()
plt.show()

In [None]:
X, Y = generate_damped_pendulum_solution(N_SAMPLES=25000, α=0.1, β=8.91, Δ=0.1, one_step=True, noise=0.05)
X_train, X_valid, y_train, y_valid = train_test_split(X, Y, test_size=0.2, random_state=42)

In [None]:
class OneStepResNet:
    
    def __init__(self, nn_architecture, loss_function, optimizer, learning_schedule=None, track_gradients=False):
        self.nn_architecture = nn_architecture
        self.loss_function = loss_function
        self.optimizer = optimizer
        self.learning_schedule = learning_schedule

        # initiation of lists storing the history of metrics calculated during the learning process 
        self.train_cost_history = []
        self.valid_cost_history = []
        self.track_gradients = track_gradients
        self.grads_history = {}
        # variables for learning rate scheduling
        self.performance_metric = 1e12
        self.wait = 0
        
        
    def init_layers(self, seed=99):
        # random seed initiation
        np.random.seed(seed)
        # number of layers in our neural network
        number_of_layers = len(self.nn_architecture)
        # parameters storage initiation
        params_values = {}
        # iteration over network layers
        for idx, layer in enumerate(self.nn_architecture):
            # we number network layers from 1
            layer_idx = idx + 1
            # extracting the number of units in layers
            layer_input_size = layer["input_dim"]
            layer_output_size = layer["output_dim"]
            # initiating the values of the W matrix and vector b for subsequent layers
            params_values['W'+str(layer_idx)] = np.random.randn(layer_output_size, layer_input_size) * 1
            params_values['b'+str(layer_idx)] = np.zeros((layer_output_size, 1))

        return params_values
    
    
    def init_grads_history(self):
        
        for layer_idx_prev, layer in reversed(list(enumerate(self.nn_architecture))):
            layer_idx_curr = layer_idx_prev + 1
            
            self.grads_history["dW" + str(layer_idx_curr)] = []
            self.grads_history["db" + str(layer_idx_curr)] = []

    
    def forward_propagation(self, X, params_values):
        # creating a temporary memory to store the information needed for a backward step
        memory = {}
        # X vector is the activation for layer 0 
        A_curr = X

        # iteration over network layers
        for idx, layer in enumerate(self.nn_architecture):
            # we number network layers from 1
            layer_idx = idx + 1
            
            # transfer the activation from the previous iteration or add the residual if "skip"
            if layer["activation"] is "skip":
                A_prev = A_curr + X
            else:
                A_prev = A_curr
                
            # extraction of W for the current layer
            W_curr = params_values["W" + str(layer_idx)]
            # extraction of b for the current layer
            b_curr = params_values["b" + str(layer_idx)]
            # calculation of the input value for the activation function
            Z_curr = np.dot(W_curr, A_prev) + b_curr
            
            # calculation of activation for the current layer ()
            if layer["activation"] is not "skip":
                A_curr = tanh(Z_curr)
            else:
                A_curr = Z_curr
            
            # saving calculated values in the memory
            if layer["activation"] is not "skip":
                memory["A" + str(idx)] = A_prev
                
            memory["Z" + str(layer_idx)] = Z_curr

        # return of prediction vector and a dictionary containing intermediate values
        return A_curr, memory
    
    
    def backward_propagation(self, Y_hat, Y, memory, params_values):
        grads_values = {}

        # number of examples
        m = Y.shape[1]
        # a hack ensuring the same shape of the prediction vector and labels vector
        Y = Y.reshape(Y_hat.shape)

        # initiation of gradient descent algorithm (Derivative of mse loss function)
        dCost = 2*(Y_hat - Y)
        
        dA_prev = dCost

        for layer_idx_prev, layer in reversed(list(enumerate(self.nn_architecture))):
            # we number network layers from 1
            layer_idx_curr = layer_idx_prev + 1
            
            dA_curr = dA_prev

            # extraction of the activation function for the current layer
            if layer["activation"] is not "skip":
                A_prev = memory["A" + str(layer_idx_prev)]
                
            Z_curr = memory["Z" + str(layer_idx_curr)]

            W_curr = params_values["W" + str(layer_idx_curr)]
            b_curr = params_values["b" + str(layer_idx_curr)]
            
            if layer["activation"] == "skip":
                # Last layer has no activation function, so slightly different back prop
                dZ_curr = dCost 
                # derivative of the matrix W
                dW_curr = np.dot(dCost, Z_curr.T) / m
                # derivative of the vector b
                db_curr = np.sum(dCost, axis=1, keepdims=True) / m
                # derivative of the matrix A_prev
                dA_prev = np.dot(W_curr.T, dZ_curr)
            else:
                # calculation of the activation function derivative 
                dZ_curr = dA_curr * d_tanh(Z_curr)
                # derivative of the matrix W
                dW_curr = np.dot(dZ_curr, A_prev.T) / m
                # derivative of the vector b
                db_curr = np.sum(dZ_curr, axis=1, keepdims=True) / m
                # derivative of the matrix A_prev
                dA_prev = np.dot(W_curr.T, dZ_curr)
                
            grads_values["dW" + str(layer_idx_curr)] = dW_curr.astype(np.float32)
            grads_values["db" + str(layer_idx_curr)] = db_curr.astype(np.float32)

        return grads_values
    
    
    def log_gradients(self, grads_values):
        
        for layer_idx_prev, layer in reversed(list(enumerate(self.nn_architecture))):
            layer_idx_curr = layer_idx_prev + 1
            
            dW = grads_values["dW" + str(layer_idx_curr)]
            db = grads_values["db" + str(layer_idx_curr)]
            
            self.grads_history["dW" + str(layer_idx_curr)].append(np.mean(dW, axis=0))
            self.grads_history["db" + str(layer_idx_curr)].append(np.mean(db, axis=0))
                            
            
    def train(self, X_train, Y_train, epochs, X_valid, Y_valid, batch_size=10):
        
        # initiation of neural net parameters
        params_values = self.init_layers()
        self.init_grads_history()
        
        X_valid = np.transpose(X_valid)
        Y_valid = np.transpose(Y_valid)
            
        # performing calculations for subsequent iterations
        for i in range(epochs):
            # Reshuffle the data 
            X_train_, Y_train_ = resample(X_train, Y_train, n_samples=None)
            # then transpose back into "correct" shape 
            X_train_ = np.transpose(X_train_)
            Y_train_ = np.transpose(Y_train_)
            
            epoch_train_cost = []
            epoch_valid_cost = []

            for idx, (X_batch, Y_batch) in enumerate(generate_batches(X_train_, Y_train_, batch_size)):
                
                # step forward
                Y_hat, cache = self.forward_propagation(X_batch, params_values)

                # calculating metrics and saving them in history
                cost = self.loss_function(Y_hat, Y_batch)
                epoch_train_cost.append(cost)

                # step backward - calculating gradient
                grads_values = self.backward_propagation(Y_hat, Y_batch, cache, params_values)
                
                if self.track_gradients:
                    self.log_gradients(grads_values)
            
                # updating model state
                params_values = self.optimizer.update(params_values, grads_values, self.nn_architecture)
                
            # step forward
            Y_hat, cache = self.forward_propagation(X_valid, params_values)

            # calculating metrics and saving them in history
            epoch_valid_cost = self.loss_function(Y_hat, Y_valid)               
            epoch_mean_train_cost = np.mean(epoch_train_cost)
            self.train_cost_history.append(epoch_mean_train_cost)
            self.valid_cost_history.append(epoch_valid_cost)
            
            if self.learning_schedule is not None:
                self.learning_schedule.update(epoch_valid_cost, self.optimizer)

            print("\r Epoch: {:05} - Train cost: {:.7f} - Valid cost: {:.7f}".format(i, 
                                                    epoch_mean_train_cost, epoch_valid_cost), end='')

        return params_values
    
    
    def predict(self, X, params_values):
        y , _ = self.forward_propagation(np.expand_dims(X, axis=0).T, params_values)
        return y

## Testing NumPy model

In [None]:
NN_ARCHITECTURE = [
    {"input_dim": 2, "output_dim": 30,"activation": "tanh"},
    {"input_dim": 30,"output_dim": 30,"activation": "tanh"},
    {"input_dim": 30,"output_dim": 2, "activation": "tanh"},
    {"input_dim": 2, "output_dim": 2, "activation": "skip"},
]

In [None]:
model = OneStepResNet(
    nn_architecture = NN_ARCHITECTURE,
    loss_function = mean_squared_error,
    optimizer = Adam(lr = 0.001),
    learning_schedule = ReduceLROnPlateau(patience=100, factor=0.5, min_lr=1e-5),
    track_gradients = False
)

In [None]:
params_values = model.train(
    X_train = X_train,
    Y_train = y_train, 
    epochs = 500, 
    X_valid = X_valid,
    Y_valid = y_valid,
    batch_size = 20,
)

In [None]:
def plot_history(model): 
    train_loss = model.train_cost_history
    valid_loss = model.valid_cost_history
    epochs = range(len(train_loss))
    plt.figure(figsize=(15,7))
    plt.plot(epochs, train_loss, 'b', alpha=0.7, label="Training")
    plt.plot(epochs, valid_loss, 'r', alpha=0.7, label="Validation")
    plt.title('Training and Validation Loss',fontsize=22)
    plt.yscale("log")
    plt.ylim([0,0.1])
    plt.xlabel("Epoch", fontsize=20)
    plt.legend(prop={"size":20})
    plt.show()

In [None]:
plot_history(model)

### Visualize

Todo: get cool animation by calling `predict_solution` many times during training and clearning output

In [None]:
def predict_solution(Δ=0.1, t_end=20):

    t_steps = np.arange(0,t_end, Δ)
    X_pred = np.zeros((len(t_steps), 2))

    x_0 = np.array([-1.193, -3.876])
    x_Δ = np.array([-1.193, -3.876])
    
    for i in range(0, len(t_steps)):
        x_0 = x_Δ
        x_Δ = model.predict(x_0, params_values)
        x_Δ = np.squeeze(x_Δ.T)
        X_pred[i] = x_Δ   
        
    t_steps = t_steps + Δ
    return t_steps, X_pred

In [None]:
def get_error(t_actual, x_actual, t_pred, x_pred, t_end):
    Nt = t_actual.shape[0]
    errors = []
    for i in range(0,len(t_pred)):
        t_idx = int((Nt/t_end)*t_pred[i] - 1)
        x_true = x_actual[t_idx]
        x_est = x_pred[i]
        err = np.abs((x_true - x_est)/x_true)*100
        #print("x true = {:.4f}  x est = {:.4f}  error = {:.4f}".format(x_true, x_est, err))
        errors.append(err)
    return errors

In [None]:
Δ=0.1
t_end=20

In [None]:
t_steps, X_pred = predict_solution(Δ=Δ, t_end=t_end)  
t, x, dxdt = generate_damped_pendulum_solution(one_step=False, α=0.1, β=8.91, Δ=Δ, t_end=t_end)

In [None]:
error = get_error(t_actual=t, x_actual=x, t_pred=t_steps, x_pred=X_pred[:,0], t_end=t_end)

In [None]:
COLOUR1 = (0.99, 0.72, 0.01, 1.0)

In [None]:
plt.figure(figsize=(16,8))
plt.title("Position", fontsize=22)
plt.plot(t, x, "-", c="r", linewidth=3, label="x(t)")
plt.plot(t_steps, X_pred[:,0], ".", c="b", markersize=10, label="x(t) Predicted")
t_idx = [int(tx) for tx in (len(t)/t_end)*t_steps-1]
plt.fill_between(t_steps, X_pred[:,0], x[t_idx], color=COLOUR1, alpha=0.5)
plt.legend(prop={"size":16})
plt.show()

In [None]:
plt.figure(figsize=(16,8))
plt.title("Velocity", fontsize=22)
plt.plot(t, dxdt, "-", c="r", linewidth=3, label="dx/dt (t)")
plt.plot(t_steps, X_pred[:,1],".", c="b", markersize=10, label="dx/dt (t) Predicted")
t_idx = [int(tx) for tx in (len(t)/t_end)*t_steps-1]
plt.fill_between(t_steps, X_pred[:,1], dxdt[t_idx], color=COLOUR1, alpha=0.5)
plt.legend(prop={"size":16})
plt.show()

In [None]:
plt.figure(figsize=(16,8))
plt.title("Phase Space", fontsize=22)
plt.plot(x, dxdt, "-", c="r", linewidth=2, label="Reference")
plt.plot(X_pred[:,0], X_pred[:,1], ":.", c="b", markersize=10, label="Approximation")
plt.legend(prop={"size":16})
plt.show()

In [None]:
plt.figure(figsize=(16,8))
plt.title("Percent Error", fontsize=22)
plt.plot(t_steps, error, ":.", linewidth=2, c="r", markerfacecolor='blue', markersize=12)
plt.ylabel("% Error", fontsize=20)
plt.yscale("log")
plt.show()