In [1]:
#pip install ucimlrepo

# Packages
import pandas as pd
from ucimlrepo import fetch_ucirepo
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import random

URV                                                                            MESIIA

Neural and Evolutionary Computation (NEC)
Assignment 1: Prediction with Back-Propagation and Linear Regression

Teachers: Dr. Jordi Duch, Dr. Sergio Gomez
Student: Natzaret Gálvez Rísquez

Part 1: Selecting and analyzing the datasets

We perform the predictions on  three datasets: 

In [2]:
# We upload the datasets

# First dataset: File: A1-turbine.txt
    # 5 features: the first 4 are the input variables, the last one is the value to predict
    # 451 patterns: use the first 85% for training and validation, and the remaining 15% for test
df_turbine=pd.read_csv('C:/Users/Gari/Desktop/NEC/A1-turbine.txt', sep='\t', header=None)
header_vector_turbine = df_turbine.iloc[0, :].tolist() #header
df_turbine=df_turbine.iloc[1:,:]
df_turbine=pd.DataFrame(df_turbine)

# Second dataset: File: A1-synthetic.txt
    # 10 features: the first 9 are the input variables, the last one is the value to predict
    # 1000 patterns: use the first 80% for training and validation, and the remaining 20% for test
df_synthetic=pd.read_csv('C:/Users/Gari/Desktop/NEC/A1-synthetic.txt', sep='\t', header=None)
header_vector_synthetic = df_synthetic.iloc[0, :].tolist() #header
df_synthetic=df_synthetic.iloc[1:,:]
df_synthetic=pd.DataFrame(df_synthetic)

# Third dataset: from "https://archive.ics.uci.edu/dataset/186/wine+quality"
    # At least 6 features, one of them used for prediction
    # The prediction variable must take real (float or double) values; it should not represent a categorical value (that would correspond to a classification task)
    # At least 400 patterns
    # Select randomly 80% of the patterns for training and validation, and the remaining 20% for test; it is important to shuffle the original data, to destroy any kind of sorting it could have

# Wine Quality dataset [6496 rows x 11 columns]
# fetch dataset 
wine_quality = fetch_ucirepo(id=186) 
  
# data (as pandas dataframes) 
df_wineQuality = wine_quality.data.features 
y = wine_quality.data.targets #quality of wine, an integer
  
# metadata 
#print(wine_quality.metadata) 
# variable information 
#print(wine_quality.variables) 

header_vector_wineQuality = df_wineQuality.columns.tolist() #header

In [3]:
# As we can observe by the following header of the wine quality, alcohol level is the last feature
# We will use it as the value to predict
print(header_vector_wineQuality)

['fixed_acidity', 'volatile_acidity', 'citric_acid', 'residual_sugar', 'chlorides', 'free_sulfur_dioxide', 'total_sulfur_dioxide', 'density', 'pH', 'sulphates', 'alcohol']


Now, we will do the data preprocessing to later do the data splitting.

In [4]:
# Handling missing values, we check for and handle any missing values in our datasets
# Categorical values, if there are categorical variables, we encode them appropriately
# Outliers, we identify and handle the outliers in the data
# Normalization, in case is needed

# Data Preprocessing for Dataset 1 and 2
# - Normalize input and output variables
# - No need to preprocess (datasets already cleaned)

# Data Preprocessing for Dataset 3
# - Link to the source webpage to the documentation: "https://archive.ics.uci.edu/dataset/186/wine+quality"
# - Check for missing values, represent categorical values, look for outliers
# - Normalize input/output variables if needed

In [5]:
##Turbine dataset
X_turbine = df_turbine.iloc[:, :-1]  # Features (all columns except the last one)
y_turbine = df_turbine.iloc[:, -1]   # Target variable (last column)

scaler_turbine = MinMaxScaler()
X_turbine_normalized = scaler_turbine.fit_transform(X_turbine)
#y_turbine_normalized = scaler_turbine.fit_transform(y_turbine.values.reshape(-1, 1))
# Because the prediction column has all NaN values, it is not necessary to reshape

In [6]:
##Synthetic dataset
X_synthetic = df_synthetic.iloc[:, :-1]
y_synthetic = df_synthetic.iloc[:, -1]

# Normalize input and output variables
scaler_synthetic = MinMaxScaler()
X_synthetic_normalized = scaler_synthetic.fit_transform(X_synthetic)
y_synthetic_normalized = scaler_synthetic.fit_transform(y_synthetic.values.reshape(-1, 1))

In [7]:
##Wine Quality dataset
#By the owners we know that this dataset has not missing values, we can check by:
missing_values_count = df_wineQuality.isnull().sum().sum()
print(f"Number of missing values in Wine Quality dataset: {missing_values_count}")

Number of missing values in Wine Quality dataset: 0


In [8]:
##Wine Quality dataset
# No categorical variables in this dataset
# Identify and handle outliers using IQR method
def handle_outliers_iqr(data, threshold=1.5):
    data_copy = data.copy()  # Create a copy to avoid SettingWithCopyWarning
    Q1 = data_copy.quantile(0.25)
    Q3 = data_copy.quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - threshold * IQR
    upper_bound = Q3 + threshold * IQR
    data_copy[(data_copy < lower_bound) | (data_copy > upper_bound)] = np.nan
    return data_copy

# Handle outliers in all feature variables (columns) of df_wineQuality
df_wineQuality_no_outliers = handle_outliers_iqr(df_wineQuality)

#Shuffle
df_wineQuality_shuffled = df_wineQuality_no_outliers.sample(frac=1, random_state=42)

X_wineQuality = df_wineQuality_shuffled.iloc[:, :-1]
y_wineQuality = df_wineQuality_shuffled.iloc[:, -1]

# Normalize input and output variables
scaler_wineQuality = StandardScaler()
X_wineQuality_normalized_no_outliers = scaler_wineQuality.fit_transform(X_wineQuality)
y_wineQuality_normalized_no_outliers = scaler_wineQuality.fit_transform(y_wineQuality.values.reshape(-1, 1))

Now, we divide the datasets into validation & training and test.

In [9]:
#First dataset, turbine
# Split the data into validation-training and testing sets
# Extract the first 85% for training
# Extract the remaining 15% for testing
# Splitting Turbine dataset
X_train_turbine, X_test_turbine, y_train_turbine, y_test_turbine = train_test_split(
    X_turbine_normalized, y_turbine, test_size=0.15, random_state=42
)

#Second dataset, synthetic
X_train_synthetic, X_test_synthetic, y_train_synthetic, y_test_synthetic = train_test_split(
    X_synthetic_normalized, y_synthetic_normalized, test_size=0.2, random_state=42
)

#Third dataset, wineQuality
X_train_wineQuality, X_test_wineQuality, y_train_wineQuality, y_test_wineQuality = train_test_split(
    X_wineQuality_normalized_no_outliers,
    y_wineQuality_normalized_no_outliers,
    test_size=0.2,
    random_state=42,
)

# Print the sizes of the datasets
#print("Total data size:", len(df_wineQuality))
#print("Training data size:", len(df_wineQualityTrainingValidation))
#print("Test data size:", len(df_wineQualityTesting))

Part 2: Implementation of BP

In [30]:
class MyNeuralNetwork:
    def __init__(self, layers, epochs=100, learning_rate=0.01, momentum=0.9, fact='sigmoid', validation_split=0.2):
        self.L = len(layers)    # L: number of layers
        self.n = layers.copy()  # n: an array with the number of units in each layer (including the input and output layers) (number of neurons in each layer)
        self.h = [np.zeros(l) for l in self.n[1:]] # h: an array of arrays for the fields (h)
        self.xi = []            # xi: an array of arrays for the activations (ξ) (node values), an array of length as the number of layers (one component per layer) 
                                # and each position of the array of xi, for example xi[1], array of real numbers of length of the units in the first layer and so on
        for lay in range(self.L):
            self.xi.append(np.random.randn(layers[lay]))
        self.w = []             # w: an array of matrices for the weights (w) (edge weights)
                                # an array where w[1] is not used, and the array in w[2] will be (layer[2])x(layer[1]) and so on
        self.w.append(np.zeros((1, 1)))
        for lay in range(1, self.L):
            self.w.append(np.random.randn(layers[lay], layers[lay - 1]))

        self.theta = [np.zeros(l) for l in layers[1:]] # theta: an array of arrays for the thresholds (θ)

        self.delta = [np.zeros(l) for l in layers[1:]] # delta: an array of arrays for the propagation of errors (Δ)
        self.d_w = [np.zeros((layers[i], layers[i-1])) for i in range(1, self.L)] # d_w: an array of matrices for the changes of the weights (δw)
        self.d_theta = [np.zeros(l) for l in layers[1:]] # d_theta: an array of arrays for the changes of the weights (δθ)
        self.d_w_prev = [np.zeros((layers[i], layers[i-1])) for i in range(1, self.L)] # d_w_prev: an array of matrices for the previous changes of the weights, used for the momentum term (δw(prev))
        self.d_theta_prev = [np.zeros(l) for l in layers[1:]] # d_theta_prev: an array of arrays for the previous changes of the thresholds, used for the momentum term (δθ(prev))
        self.activation_function = fact # fact: the name of the activation function that it will be used. It can be one of these four: sigmoid, relu, linear, tanh.

        self.learning_rate = learning_rate
        self.momentum = momentum
        self.training_errors = []
        self.validation_errors = []

        self.epochs = epochs
        self.validation_split = validation_split

        # Initialize validation data to None
        self.validation_data = None

    def activation(self, x):
        if self.activation_function == 'sigmoid':
            return 1 / (1 + np.exp(-x))
        elif self.activation_function == 'relu':
            return np.maximum(0, x)
        elif self.activation_function == 'linear':
            return x
        elif self.activation_function == 'tanh':
            return np.tanh(x)
        else:
            raise ValueError("Invalid activation function")

    def activation_derivative(self, x):
        if self.activation_function == 'sigmoid':
            return x * (1 - x)
        elif self.activation_function == 'relu':
            return np.where(x > 0, 1, 0)
        elif self.activation_function == 'linear':
            return np.ones_like(x)
        elif self.activation_function == 'tanh':
            return 1 - np.tanh(x)**2
        else:
            raise ValueError("Invalid activation function")





    def feed_forward(self, x):
        self.xi[0] = x
        for layer in range(1, self.L):
            # Use np.dot with proper dimensions
            self.h[layer - 1] = np.dot(self.w[layer - 1], self.xi[layer - 1]) - self.theta[layer - 1]
            self.xi[layer] = self.activation(self.h[layer-1])

    def back_propagation(self, y):
        self.delta[-1] = self.activation_derivative(self.xi[-1]) * (y - self.xi[-1])
        for layer in range(self.L-2, 0, -1):
            self.delta[layer-1] = self.activation_derivative(self.xi[layer]) * np.dot(self.w[layer].T, self.delta[layer])

        for layer in range(self.L-1, 0, -1):
            # Access weights using w[layer][i, j]
            self.d_w[layer-1] = self.learning_rate * np.outer(self.delta[layer-1], self.xi[layer-1]) + self.momentum * self.d_w_prev[layer-1]
            self.d_theta[layer-1] = -self.learning_rate * self.delta[layer-1] + self.momentum * self.d_theta_prev[layer-1]

            self.w[layer-1] += self.d_w[layer-1]
            self.theta[layer-1] += self.d_theta[layer-1]

            self.d_w_prev[layer-1] = self.d_w[layer-1]
            self.d_theta_prev[layer-1] = self.d_theta[layer-1]

    def fit(self, X, y): # X of size (n_samples, n_features) which holds the training samples represented as floating point feature vectors; 
                         # and a vector y of size (n_samples), which holds the target values (class labels) for the training samples
        # First, we split the data into training and validation sets
        if self.validation_split > 0:
            X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=self.validation_split, random_state=42)
            self.validation_data = (X_val, y_val)
        else:
            self.validation_data = None
            X_train, y_train = X, y  # Use the full dataset for training
        
        # Implementation of fit function
        for epoch in range(self.epochs):
            for i in range(len(X_train)):
                self.feed_forward(X_train[i])
                self.back_propagation(y_train[i])

        # Calculate training error
        training_error = 0.5 * np.mean((y_train - self.predict(X_train)) ** 2)
        self.training_errors.append(training_error)

        # Calculate validation error if validation data is provided
        if self.validation_data: # Checks if self.validation_data is not None or empty
            X_val, y_val = self.validation_data
            validation_error = 0.5 * np.mean((y_val - self.predict(X_val)) ** 2)
            self.validation_errors.append(validation_error)
        else : # Give output in case that there is no validation data
            validation_error = None
            self.validation_errors.append(validation_error)

    def predict(self, X): # an array X of size (n_samples, n_features) that contains the samples
        # Implementation of predict function
        predictions = []
        for i in range(len(X)):
            self.feed_forward(X[i])
            predictions.append(self.xi[-1].copy())
        return np.array(predictions) #  vector with the predicted values for all the input samples

    def loss_epochs(self):
        return np.array(self.training_errors), np.array(self.validation_errors) # 2 arrays of size (n_epochs, 2) that contain the evolution of 
                                                                                # the training error and the validation error for each of the epochs of the system


In [31]:
# Example network with 3 layers: 10 input nodes, 9 hidden nodes, and 1 output node
layers = [10, 9, 5, 1]
neural_network = MyNeuralNetwork(layers=layers, epochs=100, learning_rate=0.01, momentum=0.9, fact='sigmoid', validation_split=0.2)
neural_network.fit(X_train_synthetic, y_train_synthetic)

ValueError: shapes (1,1) and (9,) not aligned: 1 (dim 1) != 9 (dim 0)

In [None]:
training_errors, validation_errors = neural_network.loss_epochs()
plt.plot(training_errors, label='Training Error')
plt.plot(validation_errors, label='Validation Error')
plt.legend()
plt.xlabel('Epochs')
plt.ylabel('Error')
plt.show()

In [None]:
# Make predictions on the test data
predictions_turbine_test = neural_network.predict(X_test_turbine)

Part 3: Obtaining and comparing predictions using the three models (BP, BP-F, MLR-F)

Part 3.1: Parameter comparison and selection

Part 3.2: Model result comparison