# Import libraries

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt  
from sklearn.model_selection import train_test_split
import math
import random

In [None]:
# Load data from Excel file
data = pd.read_excel('Lorenz Dataset.xlsx', header=None).values  
# data = pd.read_excel('Tehran Stock Exchange.xlsx', header=None).values
# data = pd.read_excel('ECG Datasets.xlsx', header=None).values

data_size = len(data) 

In [None]:
# Normalize the input data
for ii in range(len(data[0])): 
    data[:, ii] = data[:, ii] / np.max(data[:, ii])  

num_feature = len(data[0]) - 1 

In [None]:
# for row in data:
#     for elem in row:
#         if abs(elem) > 1:
#             print(row, elem) 

In [None]:
#Lorenz Dataset
np.random.seed(42)
epochs = 100 
regularization_strength = 0.005
learning_rate = 0.0001 

#Stock Dataset
# np.random.seed(41)
# epochs = 100 
# regularization_strength = 0.008
# learning_rate = 0.0001

In [None]:
class GMDHNeuron:
    def __init__(self, x1_index, x2_index):
        self.x1_index = x1_index
        self.x2_index = x2_index
        self.weights = np.random.uniform(-1, 1, 6) 
        self.train_mse_history = []
        self.validation_mse = None
        
    def train(self, x1_train, x2_train, actual_output, epochs,learning_rate, regularization_strength):
        self.train_mse_history = []
        for epoch in range(epochs):
            for i in range(len(x1_train)):
                predicted_output = self.predict(x1_train[i], x2_train[i])
                error = predicted_output - actual_output[i]
#                 print('index: ', i)
#                 print('error: ', error)
#                 print('x1_train[i]**2: ',x1_train[i]**2)
#                 print('x2_train[i]**2: ',x2_train[i]**2)
                
                gradient = np.array([
                    -1 * error * x1_train[i]**2,
                    -1 * error * x2_train[i]**2,
                    -1 * error * x1_train[i] * x2_train[i],
                    -1 * error * x1_train[i],
                    -1 * error * x2_train[i],
                    -1 * error
                ])
                
                # L2 regularization
                regularization_term = 2 * regularization_strength * self.weights  
                
                self.weights = self.weights - learning_rate * gradient - regularization_term
            
            train_mse = self.calculate_mse(x1_train, x2_train, actual_output)
#             print(train_mse)
            self.train_mse_history.append(train_mse)
        
        # calculate validation MSE
        self.validation_mse = self.calculate_mse(validation_data[:,self.x1_index],validation_data[:,self.x2_index],validation_output)
        
    def predict(self, x1, x2):
        return self.weights[0] * x1**2 + self.weights[1] * x2**2 + self.weights[2] * x1 * x2 + self.weights[3] * x1  + self.weights[4] * x2 + self.weights[5]

    def calculate_mse(self, x1, x2, target):
        predicted_output = self.predict(x1, x2)
        mse = np.mean((predicted_output - target) ** 2)
        return mse


    

In [None]:
# Split data into training, validation, and testing sets
train_size = int(0.7 * data_size)
validation_size = int(0.15 * data_size)

train_data, validation_data, test_data = data[:train_size,:num_feature], data[train_size:train_size + validation_size,:num_feature], data[train_size + validation_size:,:num_feature]
train_output, validation_output, test_output = data[:train_size,num_feature], data[train_size:train_size + validation_size,num_feature], data[train_size + validation_size:,num_feature]


In [None]:
first_gmdh_layer_neurons = []
for i in range(num_feature):
    for j in range(i + 1, num_feature):
        neuron = GMDHNeuron(i,j)
        neuron.train(train_data[:, i], train_data[:, j], train_output, epochs,learning_rate, regularization_strength)
        first_gmdh_layer_neurons.append(neuron)

In [None]:
for neuron in first_gmdh_layer_neurons:
    # Plotting the training data and output
    plt.figure(figsize=(20, 8)) 
    plt.semilogy(np.arange(1, epochs + 1 ), neuron.train_mse_history)
    plt.xlabel('Epoch')
    plt.ylabel('MSE Train') 
    plt.show()
    print('selected_neurons-indexs: ', neuron.x1_index,neuron.x2_index)
    print("Final Weights:", neuron.weights)
    print("Final Train MSE:", neuron.train_mse_history[-1])
    print("Final Validation MSE:", neuron.validation_mse)
    

In [None]:
first_gmdh_layer_train_output = [neuron.predict(train_data[:, neuron.x1_index], train_data[:, neuron.x2_index]) for neuron in first_gmdh_layer_neurons]
train_data = np.array(list(zip(*first_gmdh_layer_train_output)))

first_gmdh_layer_validation_output = [neuron.predict(validation_data[:, neuron.x1_index], validation_data[:, neuron.x2_index]) for neuron in first_gmdh_layer_neurons]
validation_data = np.array(list(zip(*first_gmdh_layer_validation_output))) 

first_gmdh_layer_test_output = [neuron.predict(test_data[:, neuron.x1_index], test_data[:, neuron.x2_index]) for neuron in first_gmdh_layer_neurons]
test_data  = np.array(list(zip(*first_gmdh_layer_test_output))) 

In [None]:
# # Normalize the train_data
for ii in range(len(train_data[0])): 
    train_data[:, ii] = train_data[:, ii] / np.max(train_data[:, ii])
    
# Normalize the validation_data
for ii in range(len(validation_data[0])): 
    validation_data[:, ii] = validation_data[:, ii] / np.max(validation_data[:, ii])

# Normalize the test_data
for ii in range(len(test_data[0])): 
    test_data[:, ii] = test_data[:, ii] / np.max(test_data[:, ii])

In [None]:
#Lorenz Dataset
np.random.seed(42)
epochs = 100 
regularization_strength = 0.005
learning_rate = 0.0015

#Stock Dataset
# np.random.seed(41)
# epochs = 100 
# regularization_strength = 0.005
# learning_rate = 0.00015

In [None]:
second_gmdh_layer_neurons = []
for i in range(len(train_data[0])):
    for j in range(i + 1, len(train_data[0])):
        neuron = GMDHNeuron(i,j)
        neuron.train(train_data[:, i], train_data[:, j], train_output, epochs,learning_rate, regularization_strength)
        second_gmdh_layer_neurons.append(neuron)

In [None]:
for neuron in second_gmdh_layer_neurons:
    # Plotting the training data and output
    plt.figure(figsize=(20, 8)) 
    plt.semilogy(np.arange(1, epochs + 1 ), neuron.train_mse_history)
    plt.xlabel('Epoch')
    plt.ylabel('MSE Train') 
    plt.show()
    print('selected_neurons-indexs: ', neuron.x1_index,neuron.x2_index)
    print("Final Weights:", neuron.weights)
    print("Final Train MSE:", neuron.train_mse_history[-1])
    print("Final Validation MSE:", neuron.validation_mse)
    

In [None]:
second_gmdh_layer_neurons = sorted(second_gmdh_layer_neurons, key=lambda x: x.validation_mse)
# Select the top two neurons with the lowest validation_mse
second_gmdh_layer_neurons = second_gmdh_layer_neurons[:2]

In [None]:
second_gmdh_layer_train_output = [neuron.predict(train_data[:, neuron.x1_index], train_data[:, neuron.x2_index]) for neuron in second_gmdh_layer_neurons]
train_data = np.array(list(zip(*second_gmdh_layer_train_output)))

second_gmdh_layer_validation_output = [neuron.predict(validation_data[:, neuron.x1_index], validation_data[:, neuron.x2_index]) for neuron in second_gmdh_layer_neurons]
validation_data = np.array(list(zip(*second_gmdh_layer_validation_output))) 

second_gmdh_layer_test_output = [neuron.predict(test_data[:, neuron.x1_index], test_data[:, neuron.x2_index]) for neuron in second_gmdh_layer_neurons]
test_data  = np.array(list(zip(*second_gmdh_layer_test_output))) 

In [None]:
# Normalize the input data
for ii in range(len(train_data[0])): 
    train_data[:, ii] = train_data[:, ii] / np.max(train_data[:, ii])
    
# Normalize the input data
for ii in range(len(validation_data[0])): 
    validation_data[:, ii] = validation_data[:, ii] / np.max(validation_data[:, ii])

# Normalize the input data
for ii in range(len(test_data[0])): 
    test_data[:, ii] = test_data[:, ii] / np.max(test_data[:, ii])

In [None]:
ACTIVATION_FUNC = 'leaky_relu'
leaky_relu_alpha = 0.01
def activation_function(x,fun_name=ACTIVATION_FUNC):
    if(fun_name == 'relu'): 
        return np.maximum(0, x)
    elif(fun_name == 'logsig'): 
        return  1 /( 1 + (math.e)**(-1 * x))
    elif(fun_name == 'tansig'):
        return 2/(1+ (math.e)**(-2*x))-1
    elif(fun_name == 'leaky_relu'): 
        return np.where(x > 0, x, leaky_relu_alpha * x) 

def activation_function_derivative(x,fun_name=ACTIVATION_FUNC):
    if(fun_name == 'relu'): 
        return np.where(x > 0, 1, 0)
    elif(fun_name == 'logsig'): 
        a = activation_function(x)
        a = np.reshape(a, (-1,1))
        b = 1 - activation_function(x)
        b = np.reshape(b, (-1,1))
        b = np.transpose(b)
        return np.diag(np.diag(np.matmul(a,b)))
    elif(fun_name == 'tansig'):
        tansig_x = activation_function(x)
        return 1 - tansig_x**2
    elif(fun_name == 'leaky_relu'):
        return np.where(x > 0, 1, leaky_relu_alpha)

# Lorenz parameters

In [None]:
# Define the number of input, hidden, and output neurons
input_neurons = train_data.shape[1]
l1_neurons = 15
output_neurons = 1  # Linear activation for regression 

# Initialize the weights with random values in range (-1,1)
np.random.seed(1)
w1 = 2 * np.random.random((input_neurons, l1_neurons)) - 1
w2_L = 2 * np.random.random((l1_neurons, output_neurons)) - 1
w2_U = 2 * np.random.random((l1_neurons, output_neurons)) - 1


# Initialize the biases with random values in range (-1,1) 
b1 = 2 * np.random.random(l1_neurons) - 1
b2_L = 2 * np.random.random(output_neurons) - 1
b2_U = 2 * np.random.random(output_neurons) - 1


# Training parameters
learning_rate = 0.005
epochs = 400  # Train sample by sample 

mse_train = np.zeros(epochs)
mse_test = np.zeros(epochs)

alpha = 0.5
beta = 0.5

lambda_reg = 0.00001

# Stock parameters

In [None]:
# # Define the number of input, hidden, and output neurons
# input_neurons = train_data.shape[1]
# l1_neurons = 10
# output_neurons = 1  # Linear activation for regression 

# # Initialize the weights with random values in range (-1,1)
# np.random.seed(1)
# w1 = 2 * np.random.random((input_neurons, l1_neurons)) - 1
# w2_L = 2 * np.random.random((l1_neurons, output_neurons)) - 1
# w2_U = 2 * np.random.random((l1_neurons, output_neurons)) - 1


# # Initialize the biases with random values in range (-1,1) 
# b1 = 2 * np.random.random(l1_neurons) - 1
# b2_L = 2 * np.random.random(output_neurons) - 1
# b2_U = 2 * np.random.random(output_neurons) - 1


# # Training parameters
# learning_rate = 0.003
# epochs = 400  # Train sample by sample 

# mse_train = np.zeros(epochs)
# mse_test = np.zeros(epochs)

# alpha = 0.5
# beta = 0.5

# lambda_reg = 0.000001

In [None]:
# Training the MLP for regression
for epoch in range(epochs):

    error_data_train = np.zeros(len(train_data))
    output_data_train = np.zeros(len(train_data))
    for i in range(len(train_data)):
        
        #-------------------------------- Feed Forward -------------------------------------
        input_layer = train_data[i:i+1] 
        
        net1 = np.dot(input_layer, w1) + b1  # net1 = x * w1 + b1
        o1   = activation_function(net1) #  o1 = f(net1)
        
        net2_L = np.dot(o1, w2_L) + b2_L # net2_L = o1 * w2 + b2_L
        net2_U = np.dot(o1, w2_U) + b2_U # net2_U = o1 * w2 + b2_U
        
        f2_L = activation_function(net2_L)
        f2_U = activation_function(net2_U)
        
        o2_L = min(f2_L,f2_U)
        o2_U = max(f2_L,f2_U)
        
        o2 = alpha * o2_U + beta * o2_L
        output_data_train[i] = o2      
        #-------------------------------- Backpropagation ----------------------------------- 
        output_layer_error = train_output[i:i+1] - o2
        if(f2_L <= f2_U): # => o2_L = f2_L(net2_L) = f2_L(w2_L.T * o1) , o2_U = f2_U(net2_U) = f2_U(w2_U.T * o1)
            
            # update w2_L
            # dE/dw2_L = dE/de * de/do2 * do2/do2_L * do2_L/dnet2_L * dnet2_L/dw2_L = e * -1 * beta * fprim_net2_L *  o1 
            w2_L_old = w2_L
            w2_L = w2_L +  learning_rate * beta * o1.T @ output_layer_error  - lambda_reg * w2_L 
            
            # update w2_U
            # dE/dw2_U = dE/de * de/do2 * do2/do2_U * do2_U/dnet2_U * dnet2_U/dw2_U = e * -1 * alpha * fprim_net2_U *  o1
            w2_U_old = w2_U
            w2_U = w2_U +  learning_rate * alpha * o1.T @ output_layer_error  - lambda_reg * w2_U
            
            # update b2_L
            # dE/db2_L = dE/de * de/do2 * do2/do2_L * do2_L/dnet2_L * dnet2_L/db2_L = e * -1 * beta * fprim_net2_L * 1
            b2_L = b2_L +  learning_rate * beta * output_layer_error  - lambda_reg * b2_L
        
            # update b2_U
            # dE/db2_U = dE/de * de/do2 * do2/do2_U * do2_U/dnet2_U * dnet2_U/db2_U = e * -1 * alpha * fprim_net2_U * 1
            b2_U = b2_U +  learning_rate * alpha * output_layer_error  - lambda_reg * b2_U
            
           
            # update w1
            # dE/dw1 = dE/de * de/do2 * (do2/do2_L * do2_L/dnet2_L * dnet2_L/do1 + do2/do2_U * do2_U/dnet2_U * dnet2_U/do1) * do1/dnet1 * dnet1/dw1
            # = e * -1 * (beta * fprim_net2_L * w2_L + alpha * fprim_net2_U * w2_U) * fprim_net1 * input_layer
            
            # fprim_net2_L = 1 , fprim_net2_U = 1
            # Create a diagonal matrix
            fprim_net1 = np.array(activation_function_derivative(net1))[0] 
            diag_matrix_fprim_net1 = np.diag(fprim_net1)
            w1 = w1 + learning_rate * input_layer.T @ output_layer_error * (beta * w2_L_old  + alpha * w2_U_old).T @ diag_matrix_fprim_net1  - lambda_reg * w1

            
            # update b1
            # dE/db1 = dE/de * de/do2 * (do2/do2_L * do2_L/dnet2_L * dnet2_L/do1 + do2/do2_U * do2_U/dnet2_U * dnet2_U/do1) * do1/dnet1 * dnet1/db1
            # = e * -1 * (beta * fprim_net2_L * w2_L + alpha * fprim_net2_U * w2_U) * fprim_net1 * 1
            b1 = b1 + learning_rate * output_layer_error @ (beta * w2_L_old  + alpha * w2_U_old).T @ diag_matrix_fprim_net1  - lambda_reg * b1

        
        
        
        elif(f2_L > f2_U): # => o2_L = f2_U(net2_U) = f2_U(w2_U.T * o1) , o2_U = f2_L(net2_L) = f2_L(w2_L.T * o1)
            
            # update w2_L
            # dE/dw2_L = dE/de * de/do2 * do2/do2_U * do2_U/dnet2_L * dnet2_L/dw2_L = e * -1 * alpha * fprim_net2_L *  o1 
            w2_L_old = w2_L
            w2_L = w2_L +  learning_rate * alpha * o1.T @ output_layer_error  - lambda_reg * w2_L
            
            # update w2_U
            # dE/dw2_U = dE/de * de/do2 * do2/do2_L * do2_L/dnet2_U * dnet2_U/dw2_U = e * -1 * beta * fprim_net2_U *  o1
            w2_U_old = w2_U
            w2_U = w2_U +  learning_rate * beta * o1.T @ output_layer_error  - lambda_reg * w2_U
            
            # update b2_L
            # dE/db2_L = dE/de * de/do2 * do2/do2_U * do2_U/dnet2_L * dnet2_L/db2_L = e * -1 * alpha * fprim_net2_L * 1
            b2_L = b2_L +  learning_rate * alpha * output_layer_error  - lambda_reg * b2_L
        
            # update b2_U
            # dE/db2_U = dE/de * de/do2 * do2/do2_L * do2_L/dnet2_U * dnet2_U/db2_U = e * -1 * beta * fprim_net2_U * 1
            b2_U = b2_U +  learning_rate * beta * output_layer_error - lambda_reg * b2_U
            
           
            # update w1
            # dE/dw1 = dE/de * de/do2 * (do2/do2_U * do2_U/dnet2_L * dnet2_L/do1 + do2/do2_L * do2_L/dnet2_U * dnet2_U/do1) * do1/dnet1 * dnet1/dw1
            # = e * -1 * (alpha * fprim_net2_L * w2_L + beta * fprim_net2_U * w2_U) * fprim_net1 * input_layer
            
            # fprim_net2_L = 1 , fprim_net2_U = 1
            # Create a diagonal matrix
            fprim_net1 = np.array(activation_function_derivative(net1))[0] 
            diag_matrix_fprim_net1 = np.diag(fprim_net1)
#             print('input_layer: ',input_layer.shape)
#             print('w2_L_old: ', w2_L_old.shape)
#             print('w2_U_old: ', w2_U_old.shape)
#             print('diag_matrix_fprim_net1: ', diag_matrix_fprim_net1.shape)
#             print('w1: ', w1.shape)
#             print(input_layer.T.shape,output_layer_error.shape,(w2_L_old + w2_U_old).T.shape,diag_matrix_fprim_net1.shape)
#             dw1 = -1 * input_layer.T * output_layer_error * (alpha * w2_L_old  + beta * w2_U_old).T @ diag_matrix_fprim_net1
            w1 = w1 + learning_rate * input_layer.T @ output_layer_error * (alpha * w2_L_old  + beta * w2_U_old).T @ diag_matrix_fprim_net1  - lambda_reg * w1

            
            # update b1
            # dE/db1 = dE/de * de/do2 * (do2/do2_U * do2_U/dnet2_L * dnet2_L/do1 + do2/do2_L * do2_L/dnet2_U * dnet2_U/do1) * do1/dnet1 * dnet1/db1
            # = e * -1 * (beta * fprim_net2_L * w2_L + alpha * fprim_net2_U * w2_U) * fprim_net1 * 1
            b1 = b1 + learning_rate * output_layer_error @ (alpha * w2_L_old  + beta * w2_U_old).T @ diag_matrix_fprim_net1  - lambda_reg * b1

              
        error_data_train[i] = output_layer_error
    
    mse_train[epoch] = np.mean(error_data_train ** 2)
    
    # Testing the trained MLP for regression 
    error_data_test = np.zeros(len(test_data))
    output_data_test = np.zeros(len(test_data))
    for i in range(len(test_data)):
        input_layer = test_data[i:i+1] 
        
        net1 = np.dot(input_layer, w1) + b1  # net1 = x * w1 + b1
        o1   = activation_function(net1) #  o1 = f(net1)
        
        net2_L = np.dot(o1, w2_L) + b2_L # net2_L = o1 * w2 + b2_L
        net2_U = np.dot(o1, w2_U) + b2_U # net2_U = o1 * w2 + b2_U
        
        f2_L = activation_function(net2_L)
        f2_U = activation_function(net2_U)
        
        o2_L = min(f2_L,f2_U)
        o2_U = max(f2_L,f2_U)
        
        o2 = alpha * o2_U + beta * o2_L
#         print(o2)
        output_data_test[i] = o2
        error_data_test[i] = test_output[i] - o2
    
    mse_test[epoch] = np.mean(error_data_test ** 2)
        
    # Plotting the training data and output
    plt.figure(figsize=(20, 8))
    plt.subplot(2, 2, 1)
    plt.plot(train_output)
    plt.plot(output_data_train, 'r', linewidth=0.5)
    plt.xlabel('Train Data')
    plt.ylabel('Output')
    plt.legend(['Actual', 'Predicted'])

    # Plotting the training MSE
    plt.subplot(2, 2, 2)
    plt.semilogy(np.arange(1, epoch + 1), mse_train[:epoch])
    plt.xlabel('Epoch')
    plt.ylabel('MSE Train') 
    
    print('Epoch: {} \t'.format(epoch+1)) 
    print('MSE_train: {:.4f}'.format(mse_train[epoch]))
    
    plt.tight_layout()
    plt.show()
      
    print("\n\033[1;m" + "*" * 125)
         
        
print("****************************** Training completed *******************************")


In [None]:
# Plotting the training data and output
plt.figure(figsize=(20, 8))
plt.subplot(2, 2, 1)
plt.plot(train_output)
plt.plot(output_data_train, 'r', linewidth=0.5)
plt.xlabel('Train Data')
plt.ylabel('Output')
plt.legend(['Actual', 'Predicted'])

# Plotting the training MSE 
plt.subplot(2, 2, 2)
plt.semilogy(np.arange(1, epoch + 1), mse_train[:epoch])
plt.xlabel('Epoch')
plt.ylabel('MSE Train') 


# Plotting the test data and output
plt.figure(figsize=(20, 8))
plt.subplot(2, 2, 1)
plt.plot(test_output)
plt.plot(output_data_test+0.1, 'r', linewidth=0.5)
plt.xlabel('Test Data')
plt.ylabel('Output')
plt.legend(['Actual', 'Predicted'])

# Plotting the test MSE
plt.subplot(2, 2, 2)
plt.semilogy(np.arange(1, epoch + 1), mse_test[:epoch])
plt.xlabel('Epoch')
plt.ylabel('MSE Test')  


print('MSE_train: ',mse_train[epoch])
print('MSE_test: ',mse_test[epoch]) 

In [None]:
plt.figure(2)
m_train , b_train = np.polyfit(train_output, output_data_train, 1)    
plt.scatter(train_output, output_data_train,facecolors='none',edgecolors='#104E8B')
plt.plot(train_output, m_train*train_output+b_train,'r') 
plt.title('Regression Train') 

plt.figure(3)
m_test , b_test = np.polyfit(test_output, output_data_test, 1)  
plt.scatter(test_output, output_data_test,facecolors='none',edgecolors='#104E8B')
plt.plot(test_output, m_test*test_output+b_test,'r')
plt.title('Regression Test')
 
plt.tight_layout()
plt.show()

mse_train_result = mse_train[-1]
mse_test_result = mse_test[-1]

print("Final MSE on Train Data:", mse_train_result)
print("Final MSE on Test Data:", mse_test_result)