# **RADIAL BASIS FUNCTION ANN IMPLEMENTATION**
**Dataset:** Airfoil Self-Noise Data Set (UCI)
--

***Spyrakis Angelos***, *ECE AUTh (9352)*

---
Table of Contents:

**1. K-Nearest-Neighbors**

**2. Radial Basis Function ANN**
>
>2.1. Fine-tuning RBF model using Keras tuner
>
>2.2. Fine-tuned RBF model
>

**3. RBF ANN With Perceptron Layer**
>
>3.1. Fine-tuning RBF+P model using Keras tuner
>
>3.2. Fine-tuned RBF+P model
>

---
Execute the following cells to download the data set and the keras tuner.

In [None]:
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/00291/airfoil_self_noise.dat

In [None]:
pip install -q -U keras-tuner

# 1.K-Nearest-Neighbors

In [None]:
import pandas as pd
import numpy as np
import time
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsRegressor, NearestCentroid


def load_airfoil_dataset():
    """Load the Airfoil Self-Noise dataset from the current directory
    """
    data = pd.read_csv('./airfoil_self_noise.dat', sep='\t', header=None)

    X = data.loc[:,0:4]
    y = data.loc[:,5]

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=10)

    # Feature normalization
    mean = X_train.mean(axis=0)
    X_train -= mean
    X_test -= mean

    std = X_train.std(axis=0)
    X_train /= std
    X_test /= std

    return X_train, X_test, y_train, y_test


def knn(k, X_train, X_test, y_train, y_test):
    """K-nearest neighbor implementation
    """
    print('Executing KNN with k = %d...\n' % (k))
    start_time = time.time()

    knn_regressor = KNeighborsRegressor(n_neighbors=k, weights='uniform', algorithm='auto')
    knn_regressor.fit(X_train, y_train)

    y_predicted = knn_regressor.predict(X_test)
    elapsed_time = time.time() - start_time

    rmse = mean_squared_error(y_test, y_predicted, squared=False)
    r2 = r2_score(y_test, y_predicted)
    
    print('KNN RMSE = %f' % rmse)
    print('KNN R2 = %f' % r2)
    print('Elapsed time: %.2f seconds.\n' % elapsed_time)


def main():
    X_train, X_test, y_train, y_test = load_airfoil_dataset()

    #KNN implementation
    knn(2, X_train, X_test, y_train, y_test)


if __name__ == "__main__":
    main()

# 2.Radial Basis Function ANN

##2.1. Fine-tuning RBF model using Keras tuner

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import keras_tuner as kt
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.models import Sequential
from keras import backend as K
from tensorflow.keras.layers import Layer
from keras.initializers import Initializer, Constant 
from sklearn.cluster import KMeans


def load_airfoil_dataset():
    """Load the Airfoil Self-Noise dataset from the current directory
    """
    data = pd.read_csv('./airfoil_self_noise.dat', sep='\t', header=None)

    X = data.loc[:,0:4]
    y = data.loc[:,5]

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=10)

    # Feature normalization
    mean = X_train.mean(axis=0)
    X_train -= mean
    X_test -= mean

    std = X_train.std(axis=0)
    X_train /= std
    X_test /= std

    return X_train, X_test, y_train, y_test


class RBFLayer(Layer):

    def __init__(self, output_dim, centers_initializer, betas_trainable=False, **kwargs):
        self.output_dim = output_dim    # number of neurons in hidden layer
        self.init_betas = np.ones(self.output_dim)
        self.betas_trainable = betas_trainable
        self.centers_initializer = centers_initializer
        super(RBFLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        self.centers = self.add_weight(name='centers',
                                       shape=(self.output_dim, input_shape[1]),
                                       initializer=self.centers_initializer,
                                       trainable=False)
        
        if self.betas_trainable==True:
            self.betas = self.add_weight(name='betas',
                                        shape=(self.output_dim,),
                                        initializer=Constant(value=self.init_betas),
                                        trainable=True)
        else:
            # find max distance between any two centroids
            dmax = 0
            for i in range(0, self.output_dim):
                for j in range(0, self.output_dim):
                    if tf.math.sqrt(tf.math.reduce_sum(tf.math.square(self.centers[i]-self.centers[j]))) > dmax:
                        dmax = tf.math.sqrt(tf.math.reduce_sum(tf.math.square(self.centers[i]-self.centers[j])))
            
            beta = self.output_dim/(dmax**2)
            self.betas = self.init_betas*beta

        super(RBFLayer, self).build(input_shape)

    def call(self, x):
        C = self.centers[np.newaxis, :, :]
        X = x[:, np.newaxis, :]

        diffnorm = K.sum((C-X)**2, axis=-1)
        ret = K.exp( - self.betas * diffnorm)  # this is the RB function
        return ret

    def compute_output_shape(self, input_shape):
        return (input_shape[0], self.output_dim)


class KMeansInit(Initializer):

    def __init__(self, X, max_iter=400):
        self.X = X
        self.max_iter = max_iter

    def __call__(self, shape, dtype=None):
        assert shape[1] == self.X.shape[1]

        n_centers = shape[0]
        km = KMeans(n_clusters=n_centers, max_iter=self.max_iter, verbose=0)
        km.fit(self.X)
        return km.cluster_centers_


def RMSE(y_true, y_pred):
    return K.sqrt(K.mean(K.square(y_pred - y_true)))


def MSE(y_true, y_pred):
    return K.mean(K.square(y_pred - y_true))


def R_squared(y_true, y_pred):
    SS_res =  K.sum(K.square(y_true - y_pred))
    SS_tot = K.sum(K.square(y_true - K.mean(y_true)))
    return ( 1 - SS_res/(SS_tot + K.epsilon()) ) # K.epsilon()=1E-8 avoids division with zero


class myRBFModel(kt.HyperModel):

    def __init__(self, X_train):
        self.X_train = X_train
    
    def build(self, hp):
        model = Sequential()

        hp_rbf_neurons = hp.Choice('rbf_neurons', values=[int(0.1*self.X_train.shape[0]), int(0.2*self.X_train.shape[0]), 
                                    int(0.3*self.X_train.shape[0]), int(0.5*self.X_train.shape[0]), int(0.9*self.X_train.shape[0])])
        model.add(RBFLayer(hp_rbf_neurons, centers_initializer=KMeansInit(self.X_train), input_shape=(5,)))

        model.add(Dense(1))

        hp_lr = hp.Choice('learning_rate', values=[0.1, 0.01, 0.001])
        opt = tf.keras.optimizers.SGD(learning_rate=hp_lr)

        model.compile(loss = MSE, metrics=[RMSE, R_squared], optimizer=opt)

        return model


def main():
    X_train, X_test, y_train, y_test = load_airfoil_dataset()
    
    rbf_model = myRBFModel(X_train = X_train)

    # Set up tuner
    tuner = kt.RandomSearch(rbf_model, objective=kt.Objective('val_R_squared', direction='max'), max_trials=15, executions_per_trial=5, overwrite=True)
    tuner.search(X_train, y_train, epochs=100, validation_split=0.2, batch_size=15)

    # Get the optimal hyperparameters
    best_hps=tuner.get_best_hyperparameters(1)[0]

    print(f"""
    The hyperparameter search is complete. The optimal number of neurons for the RBF layer is {best_hps.get('rbf_neurons')}, 
    and the optimal learning rate is {best_hps.get('learning_rate')}.
    """)


if __name__ == "__main__":
    main()

##2.2. Fine-tuned RBF model

In [None]:
import numpy as np
import time
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.models import Sequential
from keras import backend as K
from tensorflow.keras.layers import Layer
from keras.initializers import Initializer, Constant 
from sklearn.cluster import KMeans


def load_airfoil_dataset():
    """Load the Airfoil Self-Noise dataset from the current directory
    """
    data = pd.read_csv('./airfoil_self_noise.dat', sep='\t', header=None)

    X = data.loc[:,0:4]
    y = data.loc[:,5]

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=10)

    # Feature normalization
    mean = X_train.mean(axis=0)
    X_train -= mean
    X_test -= mean

    std = X_train.std(axis=0)
    X_train /= std
    X_test /= std

    return X_train, X_test, y_train, y_test


class RBFLayer(Layer):

    def __init__(self, output_dim, centers_initializer, betas_trainable=False, **kwargs):
        self.output_dim = output_dim    # number of neurons in hidden layer
        self.init_betas = np.ones(self.output_dim)
        self.betas_trainable = betas_trainable
        self.centers_initializer = centers_initializer
        super(RBFLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        self.centers = self.add_weight(name='centers',
                                       shape=(self.output_dim, input_shape[1]),
                                       initializer=self.centers_initializer,
                                       trainable=False)
        
        if self.betas_trainable==True:
            self.betas = self.add_weight(name='betas',
                                        shape=(self.output_dim,),
                                        initializer=Constant(value=self.init_betas),
                                        trainable=True)
        else:
            # find max distance between any two centroids
            dmax = 0
            for i in range(0, self.output_dim):
                for j in range(0, self.output_dim):
                    if tf.math.sqrt(tf.math.reduce_sum(tf.math.square(self.centers[i]-self.centers[j]))) > dmax:
                        dmax = tf.math.sqrt(tf.math.reduce_sum(tf.math.square(self.centers[i]-self.centers[j])))
            
            beta = self.output_dim/(dmax**2)
            self.betas = self.init_betas*beta

        super(RBFLayer, self).build(input_shape)

    def call(self, x):
        C = self.centers[np.newaxis, :, :]
        X = x[:, np.newaxis, :]

        diffnorm = K.sum((C-X)**2, axis=-1)
        ret = K.exp( - self.betas * diffnorm)  # this is the RB function
        return ret

    def compute_output_shape(self, input_shape):
        return (input_shape[0], self.output_dim)


class KMeansInit(Initializer):

    def __init__(self, X, max_iter=400):
        self.X = X
        self.max_iter = max_iter

    def __call__(self, shape, dtype=None):
        assert shape[1] == self.X.shape[1]

        n_centers = shape[0]
        km = KMeans(n_clusters=n_centers, max_iter=self.max_iter, verbose=0)
        km.fit(self.X)
        return km.cluster_centers_


def RMSE(y_true, y_pred):
    return K.sqrt(K.mean(K.square(y_pred - y_true)))


def MSE(y_true, y_pred):
    return K.mean(K.square(y_pred - y_true))


def R_squared(y_true, y_pred):
    SS_res =  K.sum(K.square(y_true - y_pred))
    SS_tot = K.sum(K.square(y_true - K.mean(y_true)))
    return ( 1 - SS_res/(SS_tot + K.epsilon()) ) # K.epsilon()=1E-8 avoids division with zero


def build_model(X_train, y_train):
    model = Sequential()

    n_neurons = int(0.5*X_train.shape[0])
    model.add(RBFLayer(n_neurons, centers_initializer=KMeansInit(X_train), input_shape=(5,)))

    model.add(Dense(1))

    opt = tf.keras.optimizers.SGD(learning_rate=0.1)
    model.compile(loss = MSE, metrics=[RMSE, R_squared], optimizer=opt)
    history = model.fit(X_train, y_train, batch_size=15, epochs=100, verbose=1, validation_split=0.2)

    return model, history


def plot_curves(history):
    # Plot loss

    plt.plot(history.history['loss'][2:history.params['epochs']])
    plt.plot(history.history['val_loss'][2:history.params['epochs']])
    plt.title('Model Loss')
    plt.ylabel('MSE')
    plt.xlabel('epoch')
    plt.legend(['train', 'val'], loc='upper left')
    plt.show()


def main():
    X_train, X_test, y_train, y_test = load_airfoil_dataset()

    start_time = time.time()
    model, history = build_model(X_train, y_train)
    print(f'Training time = {(time.time()-start_time):.2f} seconds.')

    start_time = time.time()
    model.evaluate(X_test, y_test)
    print(f'Testing time = {(time.time()-start_time):.2f} seconds.')

    plot_curves(history)
    

if __name__ == "__main__":
    main()

# 3.RBF ANN With Perceptron Layer

##3.1. Fine-tuning RBF+P model using Keras tuner

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import keras_tuner as kt
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential
from keras import backend as K
from tensorflow.keras.layers import Layer
from keras.initializers import Initializer, Constant
from sklearn.cluster import KMeans


def load_airfoil_dataset():
    """Load the Airfoil Self-Noise dataset from the current directory
    """
    data = pd.read_csv('./airfoil_self_noise.dat', sep='\t', header=None)

    X = data.loc[:, 0:4]
    y = data.loc[:, 5]

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=10)

    # Feature normalization
    mean = X_train.mean(axis=0)
    X_train -= mean
    X_test -= mean

    std = X_train.std(axis=0)
    X_train /= std
    X_test /= std

    return X_train, X_test, y_train, y_test


class RBFLayer(Layer):

    def __init__(self, output_dim, centers_initializer, betas_trainable=False, **kwargs):
        self.output_dim = output_dim  # number of neurons in hidden layer
        self.init_betas = np.ones(self.output_dim)
        self.betas_trainable = betas_trainable
        self.centers_initializer = centers_initializer
        super(RBFLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        self.centers = self.add_weight(name='centers',
                                       shape=(self.output_dim, input_shape[1]),
                                       initializer=self.centers_initializer,
                                       trainable=False)

        if self.betas_trainable == True:
            self.betas = self.add_weight(name='betas',
                                         shape=(self.output_dim,),
                                         initializer=Constant(value=self.init_betas),
                                         trainable=True)
        else:
            # find max distance between any two centroids
            dmax = 0
            for i in range(0, self.output_dim):
                for j in range(0, self.output_dim):
                    if tf.math.sqrt(tf.math.reduce_sum(tf.math.square(self.centers[i] - self.centers[j]))) > dmax:
                        dmax = tf.math.sqrt(tf.math.reduce_sum(tf.math.square(self.centers[i] - self.centers[j])))

            beta = self.output_dim / (dmax ** 2)
            self.betas = self.init_betas * beta

        super(RBFLayer, self).build(input_shape)

    def call(self, x):
        C = self.centers[np.newaxis, :, :]
        X = x[:, np.newaxis, :]

        diffnorm = K.sum((C - X) ** 2, axis=-1)
        ret = K.exp(- self.betas * diffnorm)  # this is the RB function
        return ret

    def compute_output_shape(self, input_shape):
        return (input_shape[0], self.output_dim)


class KMeansInit(Initializer):

    def __init__(self, X, max_iter=400):
        self.X = X
        self.max_iter = max_iter

    def __call__(self, shape, dtype=None):
        assert shape[1] == self.X.shape[1]

        n_centers = shape[0]
        km = KMeans(n_clusters=n_centers, max_iter=self.max_iter, verbose=0)
        km.fit(self.X)
        return km.cluster_centers_


def RMSE(y_true, y_pred):
    return K.sqrt(K.mean(K.square(y_pred - y_true)))


def MSE(y_true, y_pred):
    return K.mean(K.square(y_pred - y_true))


def R_squared(y_true, y_pred):
    SS_res = K.sum(K.square(y_true - y_pred))
    SS_tot = K.sum(K.square(y_true - K.mean(y_true)))

    # Avoid Keras-Tuner selecting NaN value as best
    r2_val = 1 - SS_res / (SS_tot + K.epsilon())
    if tf.math.is_nan(r2_val):
        r2_val = tf.constant(0, dtype=tf.float32)

    return r2_val  # K.epsilon()=1E-8 avoids division with zero


class myRBFModel(kt.HyperModel):

    def __init__(self, X_train):
        self.X_train = X_train

    def build(self, hp):
        model = Sequential()

        hp_rbf_neurons = hp.Choice('rbf_neurons',
                                   values=[int(0.2 * self.X_train.shape[0]), int(0.3 * self.X_train.shape[0]),
                                            int(0.4 * self.X_train.shape[0]), int(0.5 * self.X_train.shape[0]),
                                            int(0.6 * self.X_train.shape[0])])
        model.add(RBFLayer(hp_rbf_neurons, centers_initializer=KMeansInit(self.X_train), input_shape=(5,)))

        hp_perceptron_n = hp.Choice('perceptron_neurons', values=[128, 256, 512])
        model.add(Dense(hp_perceptron_n))

        model.add(Dense(1))

        hp_lr = hp.Choice('learning_rate', values=[0.1, 0.01, 0.001])

        hp_opt = hp.Choice('optimizer', values=['adam', 'sgd'])

        if hp_opt == 'adam':
            with hp.conditional_scope('optimizer', ['adam']):
                opt = tf.keras.optimizers.Adam(learning_rate=hp_lr)
                model.compile(loss=MSE, metrics=[RMSE, R_squared], optimizer=opt)
        else:
            with hp.conditional_scope('optimizer', ['sgd']):
                opt = tf.keras.optimizers.SGD(learning_rate=hp_lr)
                model.compile(loss=MSE, metrics=[RMSE, R_squared], optimizer=opt)

        return model


def main():
    X_train, X_test, y_train, y_test = load_airfoil_dataset()

    rbf_model = myRBFModel(X_train=X_train)

    # Set up tuner
    tuner = kt.RandomSearch(rbf_model, objective=kt.Objective('val_R_squared', direction='max'), max_trials=90,
                            executions_per_trial=5, overwrite=True)

    stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_R_squared', mode='max', patience=100)
    nan_stop = tf.keras.callbacks.TerminateOnNaN()
    tuner.search(X_train, y_train, epochs=200, validation_split=0.2, batch_size=15, callbacks=[nan_stop, stop_early])

    # Get the optimal hyperparameters
    best_hps = tuner.get_best_hyperparameters(1)[0]

    print(f"""
    The hyperparameter search is complete. The optimal number of neurons for the RBF layer is {best_hps.get('rbf_neurons')}, 
    for the perceptron layer {best_hps.get('perceptron_neurons')} and the optimal learning rate is {best_hps.get('learning_rate')}.
    The optimizer selected is {best_hps.get('optimizer')}.
    """)


if __name__ == "__main__":
    main()

##3.2. Fine-tuned RBF+P model

In [None]:
import numpy as np
import pandas as pd
import time
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential
from keras import backend as K
from tensorflow.keras.layers import Layer
from keras.initializers import Initializer, Constant 
from sklearn.cluster import KMeans


def load_airfoil_dataset():
    """Load the Airfoil Self-Noise dataset from the current directory
    """
    data = pd.read_csv('./airfoil_self_noise.dat', sep='\t', header=None)

    X = data.loc[:,0:4]
    y = data.loc[:,5]

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=10)

    # Feature normalization
    mean = X_train.mean(axis=0)
    X_train -= mean
    X_test -= mean

    std = X_train.std(axis=0)
    X_train /= std
    X_test /= std

    return X_train, X_test, y_train, y_test


class RBFLayer(Layer):

    def __init__(self, output_dim, centers_initializer, betas_trainable=False, **kwargs):
        self.output_dim = output_dim    # number of neurons in hidden layer
        self.init_betas = np.ones(self.output_dim)
        self.betas_trainable = betas_trainable
        self.centers_initializer = centers_initializer
        super(RBFLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        self.centers = self.add_weight(name='centers',
                                       shape=(self.output_dim, input_shape[1]),
                                       initializer=self.centers_initializer,
                                       trainable=False)
        
        if self.betas_trainable==True:
            self.betas = self.add_weight(name='betas',
                                        shape=(self.output_dim,),
                                        initializer=Constant(value=self.init_betas),
                                        trainable=True)
        else:
            # find max distance between any two centroids
            dmax = 0
            for i in range(0, self.output_dim):
                for j in range(0, self.output_dim):
                    if tf.math.sqrt(tf.math.reduce_sum(tf.math.square(self.centers[i]-self.centers[j]))) > dmax:
                        dmax = tf.math.sqrt(tf.math.reduce_sum(tf.math.square(self.centers[i]-self.centers[j])))
            
            beta = self.output_dim/(dmax**2)
            self.betas = self.init_betas*beta

        super(RBFLayer, self).build(input_shape)

    def call(self, x):
        C = self.centers[np.newaxis, :, :]
        X = x[:, np.newaxis, :]

        diffnorm = K.sum((C-X)**2, axis=-1)
        ret = K.exp( - self.betas * diffnorm)  # this is the RB function
        return ret

    def compute_output_shape(self, input_shape):
        return (input_shape[0], self.output_dim)


class KMeansInit(Initializer):

    def __init__(self, X, max_iter=400):
        self.X = X
        self.max_iter = max_iter

    def __call__(self, shape, dtype=None):
        assert shape[1] == self.X.shape[1]

        n_centers = shape[0]
        km = KMeans(n_clusters=n_centers, max_iter=self.max_iter, verbose=0)
        km.fit(self.X)
        return km.cluster_centers_


def RMSE(y_true, y_pred):
    return K.sqrt(K.mean(K.square(y_pred - y_true)))


def MSE(y_true, y_pred):
    return K.mean(K.square(y_pred - y_true))


def R_squared(y_true, y_pred):
    SS_res =  K.sum(K.square(y_true - y_pred))
    SS_tot = K.sum(K.square(y_true - K.mean(y_true)))
    return ( 1 - SS_res/(SS_tot + K.epsilon()) ) # K.epsilon()=1E-8 avoids division with zero


def build_model(X_train, y_train):
    model = Sequential()

    n_neurons = int(0.5*X_train.shape[0])
    model.add(RBFLayer(n_neurons, centers_initializer=KMeansInit(X_train), input_shape=(5,)))
    model.add(Dense(256))
    model.add(Dense(1))

    opt = tf.keras.optimizers.SGD(learning_rate=0.001)
    model.compile(loss = MSE, metrics=[RMSE, R_squared], optimizer=opt)
    history = model.fit(X_train, y_train, batch_size=15, epochs=100, verbose=1, validation_split=0.2)

    return model, history


def plot_curves(history):
    # Plot loss

    plt.plot(history.history['loss'][2:history.params['epochs']])
    plt.plot(history.history['val_loss'][2:history.params['epochs']])
    plt.title('Model Loss')
    plt.ylabel('MSE')
    plt.xlabel('epoch')
    plt.legend(['train', 'val'], loc='upper left')
    plt.show()


def main():
    X_train, X_test, y_train, y_test = load_airfoil_dataset()

    start_time = time.time()
    model, history = build_model(X_train, y_train)
    print(f'Training time = {(time.time()-start_time):.2f} seconds.')

    start_time = time.time()
    model.evaluate(X_test, y_test)
    print(f'Testing time = {(time.time()-start_time):.2f} seconds.')

    plot_curves(history)


if __name__ == "__main__":
    main()