# Chaotic systems prediction using NN

## This notebook is developed to show how well Neural Networks perform when presented with the task of predicting the trajectories of **Chaotic Systems**, this notebook is part of the work presented in *New results for prediction of chaotic systems using Deep Recurrent Neural Networks* in the journal of **Neural Processing Letters**

### In this experiment RNN-LSTM, RNN-GRU and MLP neural networks are trained and tested to predict the trajectories of the chaotic systems of Lorenz, Rabinovich-Fabrikant and Rossler.

## Description of this Notebook

*   The initial conditions of the chaotic systems are defined in the *Chaos_Attractors* class
*   The *NN_Identifier* class is where the neural test and training takes place, the outputs are graphs that show the performance obtained by the trained model as well as the trajectories identified by the neural network
*  In the last cells the global parameters such as the number of epochs, layers, neurons and batch size are defined to train and predict the chaotic systems as well as the time series size. 



#Libraries

In [1]:
import tensorflow as tf
from keras.models import Sequential
from keras.layers import LSTM, GRU, Dense, Dropout, Masking, Embedding, Flatten
from sklearn.preprocessing import MinMaxScaler
from scipy.integrate import odeint
from google.colab import drive
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np 
import math
from mpl_toolkits.mplot3d import Axes3D
fig_size = plt.rcParams["figure.figsize"]

In [None]:
#Save images to you Google Drive Path 
drive.mount('/content/gdrive')
images_dir = '/content/gdrive/My Drive/Colab_Images'

#Lorenz, Rabinovich–Fabrikant and Rossler Chaotic Systems

In [2]:
class ChaosAttractors():
    """
    Initial conditions for the systems to display chaotic behaviour are 
    defined as follows:

    Lorenz 63 -> s = 10, r = 8/3, b = 28 and dt = 0.01
    Fabrikant-Rabinovich -> a = 0.14, g = 0.1 and dt = 0.01
    Rossler -> a = 0.2, b = 0.2, c = 6.3 and dt = 0.01 
    """
    def __init__(self, steps, lrz_s=10, lrz_r=28, lrz_b=8/3, lrz_dt = 0.01, 
                 rab_fab_a = 0.14, rab_fab_g = 0.1, rab_fab_dt = 0.01,
                 ros_a=0.2, ros_b=0.2, ros_c=6.3, ros_dt = 0.01):
        self.lrz_s = lrz_s
        self.lrz_b = lrz_b
        self.lrz_r = lrz_r
        self.lrz_dt = lrz_dt
        self.rab_fab_a = rab_fab_a
        self.rab_fab_g = rab_fab_g
        self.rab_fab_dt = rab_fab_dt
        self.ros_a = ros_a
        self.ros_b = ros_b
        self.ros_c = ros_c
        self.ros_dt = ros_dt
        self.steps = steps
        
    """Lorenz 63 System"""    
    def lorenz63(self):
        xs = np.empty((self.steps + 1,))
        ys = np.empty((self.steps + 1,))
        zs = np.empty((self.steps + 1,))
        
        xs[0], ys[0], zs[0] = (1.0, 1.0, 1.0)
        for i in range(self.steps):
            x_dot = self.lrz_s*(ys[i] - xs[i])
            y_dot = self.lrz_r*xs[i] - ys[i] - xs[i]*zs[i]
            z_dot = xs[i]*ys[i] - self.lrz_b*zs[i]
            xs[i + 1] = xs[i] + (x_dot * self.lrz_dt)
            ys[i + 1] = ys[i] + (y_dot * self.lrz_dt)
            zs[i + 1] = zs[i] + (z_dot * self.lrz_dt)
        return xs, ys, zs
    
    """Rabinovich–Fabrikant equations"""
    def rabinovich_fabrikant(self):
        xs = np.zeros((self.steps))
       	ys = np.zeros((self.steps))
       	zs = np.zeros((self.steps))
       	xs[0] ,ys[0] ,zs[0] = (-1,0,0.5)
       	
       	for i in range(1,self.steps):
       		x = xs[i-1]
       		y = ys[i-1]
       		z = zs[i-1]
       		dx = y*(z - 1 + x*x) + self.rab_fab_g*x
       		dy = x*(3*z + 1 - x*x) + self.rab_fab_g *y
       		dz = -2*z*(self.rab_fab_a  + x*y)
       		xs[i] = x+self.rab_fab_dt*dx
       		ys[i] = y+self.rab_fab_dt*dy
       		zs[i] = z+self.rab_fab_dt*dz
        return xs, ys, zs
    
    """Rossler Hyperchaotic System"""
    def rossler(self):
        xs = np.empty([self.steps + 1])
        ys = np.empty([self.steps + 1])
        zs = np.empty([self.steps + 1])
        xs[0], ys[0], zs[0] = (1.0, 1.0, 1.0)
        
        for i in range(self.steps):
            x_dot = -ys[i] - zs[i]
            y_dot = xs[i] + self.ros_a*ys[i]
            z_dot = self.ros_b + xs[i]*zs[i] - self.ros_c*zs[i]
            xs[i+1] = xs[i] + (x_dot * self.ros_dt)
            ys[i+1] = ys[i] + (y_dot * self.ros_dt)
            zs[i+1] = zs[i] + (z_dot * self.ros_dt)
        return xs, ys, zs

# Neural characterization models

In [3]:
class NNIdentifier():
  """
    Neural network models to predict chaotic systems
    The neural network models used are the RNN-LSTM, RNN-GRU and MLP
    ...

    Attributes
    ----------
    num_neurons : int
        Number of neurons used in each layer of the NN
    num_neurons : int
        Number of layers in the NN
    dataset : array[x ,y ,z]
        Dataset used to train and test the NN model
    training_epochs : int 
        Number of epochs for training the NN
    batch_size: int
        Size of the batch passed to the NN
    attractor_name: string
        Name of the chaotic system (Used for title of the trajectory graph)
    chaos_x_series: array[x]
        Time series of the chaotic system in the X variable
    chaos_y_series: array[x]
        Time series of the chaotic system in the Y variable
    chaos_z_series: array[x]
        Time series of the chaotic system in the Z variable
    """
    def __init__(self, num_neurons, num_layers, dataset, training_epochs, batch_size, attractor_name, chaos_x_series, chaos_y_series, chaos_z_series):
        self.num_neurons = num_neurons
        self.num_layers = num_layers
        self.dataset = dataset
        self.training_epochs = training_epochs
        self.batch_size = batch_size
        self.trainX = []
        self.trainY = []
        self.testX = []
        self.testY = []
        self.attractor_name = attractor_name
        self.look_back = 1
        self.chaos_x_series = chaos_x_series
        self.chaos_y_series = chaos_y_series
        self.chaos_z_series = chaos_z_series
        
    def predict_attractor(self):
        self.normalize_datase()
        self.train_eval_models()
    
    def create_dataset(self, dataset, look_back=1):
    	dataX, dataY = [], []
    	for i in range(len(dataset)-look_back-1):
    		a = dataset[i:(i+look_back)]
    		dataX.append(a)
    		dataY.append(dataset[i + look_back])
    	return np.array(dataX), np.array(dataY)
 
    def normalize_datase(self):
        # Normalize Uk 
        scaler = MinMaxScaler(feature_range=(0, 1))
        dataset = scaler.fit_transform(self.dataset)
        
        # split into train and test sets
        train_size = int(len(dataset) * 0.6)
        test_size = len(dataset) - train_size
        train, test = dataset[0:train_size,:], dataset[train_size:len(dataset),:]
        
        # reshape into X=t and Y=t+1
        look_back = 1
        self.trainX, self.trainY = self.create_dataset(train, look_back)
        self.testX, self.testY = self.create_dataset(test, look_back)
        
        # reshape input to be [samples, time steps, features]
        self.trainX = np.reshape(self.trainX, (self.trainX.shape[0], 1, self.trainX.shape[2]))
        self.testX = np.reshape(self.testX, (self.testX.shape[0], 1, self.testX.shape[2]))
        
    def gru_model(self):
        """GRU RNN"""
        gru_model = Sequential()
        if(self.num_layers>1):
            gru_model.add(GRU(self.num_neurons, input_shape=(1,3), return_sequences = True))
            for x in range(self.num_layers):
                gru_model.add(GRU(self.num_neurons, return_sequences = True))
                if(x == self.num_layers-1):
                    gru_model.add(GRU(self.num_neurons, return_sequences = False))
        else:
            gru_model.add(GRU(self.num_neurons, input_shape=(1,3), return_sequences = False))
        gru_model.add(Dense(3))
        gru_model.compile(optimizer='adam', loss='mean_squared_error', metrics =['mse','acc'])
        seq_gru_model = gru_model.fit(self.trainX, self.trainY, epochs=self.training_epochs, batch_size=self.batch_size)
        return gru_model, seq_gru_model
        
    def lstm_model(self):
        """LSTM RNN"""
        lstm_model = Sequential()
        if(self.num_layers>1):
            lstm_model.add(LSTM(self.num_neurons, input_shape=(1,3), return_sequences = True))
            for x in range(self.num_layers):
                lstm_model.add(LSTM(self.num_neurons, return_sequences = True))
                if(x == self.num_layers-1):
                    lstm_model.add(LSTM(self.num_neurons, return_sequences = False))
        else:
            lstm_model.add(LSTM(self.num_neurons, input_shape=(1,3), return_sequences = False))
        lstm_model.add(Dense(3))
        lstm_model.compile(optimizer='adam', loss='mean_squared_error', metrics =['mse','acc'])
        seq_lstm_model = lstm_model.fit(self.trainX, self.trainY, epochs=self.training_epochs, batch_size=self.batch_size)
        return lstm_model, seq_lstm_model
        
    def mlp_model(self):
        """MLP NN"""
        mlp_model = Sequential()
        mlp_model.add(Dense(self.num_neurons, input_shape=(1,3)))
        if(self.num_layers>1):
            for x in range(self.num_layers): 
                mlp_model.add(Dense(self.num_neurons))
        mlp_model.add(Flatten())
        mlp_model.add(Dense(3))
        mlp_model.compile(optimizer='adam', loss='mean_squared_error', metrics =['mse','acc'])
        seq_mlp_model = mlp_model.fit(self.trainX, self.trainY, epochs=self.training_epochs, batch_size=self.batch_size)
        return mlp_model, seq_mlp_model
        
        
    def predict_eval_model(self, model, label_nn):
        scaler = MinMaxScaler(feature_range=(0, 1))
        dataset = scaler.fit_transform(self.dataset)
        # make predictions
        trainPredict = model.predict(self.trainX)
        testPredict = model.predict(self.testX)
        # invert predictions
        trainPredict = scaler.inverse_transform(trainPredict)
        self.trainY = scaler.inverse_transform(self.trainY)
        testPredict = scaler.inverse_transform(testPredict)
        self.testY = scaler.inverse_transform(self.testY)
        # shift train predictions for plotting
        trainPredictPlot = np.empty_like(dataset)
        trainPredictPlot[:, :] = np.nan
        trainPredictPlot[self.look_back:len(trainPredict)+self.look_back, :] = trainPredict
        # shift test predictions for plotting
        testPredictPlot = np.empty_like(dataset)
        testPredictPlot[:, :] = np.nan
        testPredictPlot[len(trainPredict)+(self.look_back*2)+1:len(dataset)-1, :] = testPredict
        # get values to graph
        val_xtrain = []
        val_ytrain = []
        val_ztrain = []
        for x in range(len(trainPredictPlot)):
          val_xtrain.append(trainPredictPlot[x][0])
          val_ytrain.append(trainPredictPlot[x][1])
          val_ztrain.append(trainPredictPlot[x][2])
        
        val_xtest = []
        val_ytest = []
        val_ztest = []
        for x in range(len(testPredictPlot)):
          val_xtest.append(testPredictPlot[x][0])
          val_ytest.append(testPredictPlot[x][1])
          val_ztest.append(testPredictPlot[x][2])
        #Graph
        fig = plt.figure()
        ax = fig.gca(projection='3d')
        
        ax.plot(self.chaos_x_series, self.chaos_y_series, self.chaos_z_series, lw=0.8, label=self.attractor_name)
        ax.plot(val_xtrain, val_ytrain, val_ztrain, lw=0.5,label='Train Set')
        ax.plot(val_xtest, val_ytest, val_ztest, lw=0.5, label='Test Set')
        legend = plt.legend(loc='upper left', shadow=True, fontsize='xx-large')
        fig_size = plt.rcParams["figure.figsize"]
        ax.set_xlabel("X Axis", fontsize=20)
        ax.set_ylabel("Y Axis", fontsize=20)
        ax.set_zlabel("Z Axis", fontsize=20)
        ax.set_title(self.attractor_name + ' - '+label_nn)
        plt.rcParams["figure.figsize"] = (10,10)
        plt.savefig(f'{images_dir}/{self.attractor_name}_{label_nn}.eps', format='eps')
        plt.show()

      
    def graph_eval_model(self, gru_train_loss, lstm_train_loss, mlp_train_loss, gru_train_acc, lstm_train_acc, mlp_train_acc):
        """Loss evaluation and graph"""
        xc = range(self.training_epochs)
        plt.figure()
        plt.plot(xc, gru_train_loss, label='MSE - GRU')
        plt.plot(xc, lstm_train_loss, label='MSE - LSTM')
        plt.plot(xc, mlp_train_loss, label='MSE - MLP')
        plt.xlabel('Epochs')
        plt.ylabel('Error %')
        plt.yscale('log')
        plt.title('MSE for the '+ self.attractor_name)
        legend = plt.legend(loc='upper right', shadow=True, fontsize='x-large')
        plt.grid(True)
        plt.show()
        """Accuracy evaluation and graph"""
        plt.figure()
        plt.plot(xc, gru_train_acc, label ='Accuracy - GRU')
        plt.plot(xc, lstm_train_acc, label ='Accuracy - LSTM')
        plt.plot(xc, mlp_train_acc, label ='Accuracy - MLP')
        plt.xlabel('Epochs')
        plt.ylabel('Accuracy %')
        plt.title('Accuracy for the '+ self.attractor_name+' with '+str(self.num_layers)+' layers - '+str(self.num_neurons)+' neurons')
        legend = plt.legend(loc='lower right', shadow=True, fontsize='x-large')
        plt.grid(True)
        plt.show()
        
    def eval_model(self, model, seqModel):
        loss_and_metrics = model.evaluate(self.testX, self.testY, batch_size=self.batch_size)
        train_loss = seqModel.history['mse']
        train_acc  = seqModel.history['acc']
        return train_loss, train_acc
    
    def train_eval_models(self):
        """Train NN Models"""
        gru_model, seq_gru_model = self.gru_model()
        lstm_model, seq_lstm_model = self.lstm_model()
        mlp_model, seq_mlp_model = self.mlp_model()
        
        """Eval NN Models"""
        gru_train_loss, gru_train_acc = self.eval_model(gru_model, seq_gru_model)
        lstm_train_loss, lstm_train_acc = self.eval_model(lstm_model, seq_lstm_model)
        mlp_train_loss, mlp_train_acc = self.eval_model(mlp_model, seq_mlp_model)
        
        """Graph NN Eval Model"""
        self.graph_eval_model(gru_train_loss, lstm_train_loss, mlp_train_loss, gru_train_acc, lstm_train_acc, mlp_train_acc)
        
        """Graph NN Predict Model"""
        self.predict_eval_model(gru_model, 'GRU')
        self.predict_eval_model(lstm_model, 'LSTM')
        self.predict_eval_model(mlp_model, 'MLP')

In [4]:
# Format dataset to pass it into the NN
def create_dataset(x,y,z):
    dataset = []
    for i in range(len(x)):
        dataset.append([x[i], y[i], z[i]])
    return dataset

# Global parameters

In [5]:
# Number of neurons 
num_neurons = 128
# Number of layers
num_layers = 5
# Number of epochs
epochs = 10
# Batch size
batch_size = 32

# Predicting Lorenz, Rabinovich-Fabrikant and Rossler systems

In [None]:
# Define length of the chaotic time series
attractors_series  = ChaosAttractors(10000)
# Obtain the time series for the Lorenz systems
lorenz_x, lorenz_y, lorenz_z = attractors_series.lorenz63()
# Create dataset to pass it to the NN
dataset = create_dataset(lorenz_x, lorenz_y, lorenz_z)
# Instantiate the class to evaluate the prediction with the previously defined parameters
nn_identifier = NNIdentifier(num_neurons, num_layers, dataset,epochs,batch_size,'Lorenz Chaotic System',lorenz_x, lorenz_y, lorenz_z)
# Start evaluation
nn_identifier.predict_attractor()

In [None]:
# Define length of the chaotic time series
attractors_series  = ChaosAttractors(50000)
# Obtain the time series for the Lorenz systems
rab_x, rab_y, rab_z = attractors_series.rabinovich_fabrikant()
# Create dataset to pass it to the NN
dataset = create_dataset(rab_x, rab_y, rab_z)
# Instantiate the class to evaluate the prediction with the previously defined parameters
nn_identifier = NNIdentifier(num_neurons, num_layers, dataset,epochs,batch_size,'Rabinovich–Fabrikant Equations',rab_x, rab_y, rab_z)
# Start evaluation
nn_identifier.predict_attractor()

In [None]:
# Define length of the chaotic time series
attractors_series  = ChaosAttractors(50000)
# Obtain the time series for the Lorenz systems
ros_x, ros_y, ros_z = attractors_series.rossler()
# Create dataset to pass it to the NN
dataset = create_dataset(ros_x, ros_y, ros_z)
# Instantiate the class to evaluate the prediction with the previously defined parameters
nn_identifier = NNIdentifier(num_neurons, num_layers, dataset,epochs,batch_size,'Rossler System',ros_x, ros_y, ros_z)
# Start evaluation
nn_identifier.predict_attractor()