In [1]:
import numpy as np
import plotly.express as ex
import plotly.graph_objects as go
from plotly.subplots import make_subplots


SEED = 2020

In [140]:
class RBF_Network:
    def __init__(self, n, inputs, variance=1, add_noise=False, useDelta=False, eta=0.1, useSqr=False):
        """
        weights : (n, 1)
        mean : (n, 1)
        variance : (n, 1)
        """
        
        np.random.seed(SEED)

        self.n = n
        self.add_noise = add_noise
        self.useDelta = useDelta
        self.eta = eta
        self.useSqr=useSqr

        # set position and width for each node
        self.mean = self.mu_random_choice(inputs)
        self.std = np.ones((n,inputs.shape[1]))*variance

        self.W = np.random.randn(self.n, inputs.shape[1])
        
        self.phi_of_x = np.reshape(np.array(self.mean), (1,-1))
        self.total_error = []
        self.residual_error = []
        self.f_of_x = []

    def mu_random_choice(self, inputs):
        """
        x = Input patterns
        n = number of nodes
        """
        return np.random.choice(x.ravel(), (self.n,inputs.shape[1]))

    def train_network(self, epochs, train_data, train_label, test_data=[], test_label=[]):
        """
        train_data, test_data : (N,1)
        train_label test_label : (N,1)
        """
        for e in range(0, epochs):
            if (self.add_noise):
                train_targets = self.add_noise_to_targets(train_label)
                test_targets = self.add_noise_to_targets(test_label)
            else:
                train_targets = train_label
                test_targets = test_label
                 
            if (self.useDelta):
                self.delta_learning(train_data, train_targets, test_data, test_targets)
            else:
                self.lsqr(train_data, train_targets)
            

    def add_noise_to_targets(self, x, std=0.1):
        t = np.copy(x)
        noise = np.random.normal(0,std,(x.shape))
        return t + noise
    
    def forward(self, x):
        self.phi_of_x = np.array([self.activation_fxn(x, i) for i in range(self.n)]).T
        self.f_of_x= self.phi_of_x @ self.W

    def activation_fxn(self, x, i):
        _x = x
        if (x.ndim > 1):
            if (x.shape[1] == 1):
                _x = x.ravel()

        _mu = self.mean[i].ravel()
        _std = self.std[i].ravel()

        return np.exp((-(_x-_mu)**2) / 2*(_std**2))

    def calc_total_error(self, label):
        """
        MSE
        """
        self.total_error.append(np.mean((self.f_of_x-label)**2))

    def update_weights(self, label):
        self.W, _,_,_ = np.linalg.lstsq(self.phi_of_x, label.ravel())
        self.W.reshape((-1,1))

    def lsqr(self, x, y):
        self.forward(x)
        self.update_weights(y)

    def delta_learning(self, x, y, tx, ty):
        errors = np.zeros(len(x))
        for i in range(len(x)):
            self.forward(x[i])
            error = y[i] - self.f_of_x
            self.W += (self.eta * error)[0] * self.phi_of_x.T

            if (len(tx) > 0):
                errors[i] = self.validate(tx[i], ty[i])
        
        self.residual_error.append(np.mean(errors))
         
    def validate(self, test_data, test_label, _plot=False):
        self.forward(test_data)
        if (self.useSqr):
            output = np.sign(self.f_of_x)
        else:
            output = self.f_of_x
        
        if _plot == True:
            fig = make_subplots(rows=1, cols=2, subplot_titles=("Validation with n={}".format(self.n), "Residual error"))
            fig.add_trace(go.Scatter(x=np.arange(len(self.residual_error)), y=self.residual_error, mode="lines", name="Residual error"), row=1, col=2)
            fig.add_trace(go.Scatter(x=test_data.ravel(), y=test_label.ravel(), mode="lines", name="Original"), row=1, col=1)
            fig.add_trace(go.Scatter(x=test_data.ravel(), y=output.ravel(), mode="lines", name="Approximated"), row=1, col=1)

            fig.show()

        return np.mean(np.abs(output-test_label)) 
        
    def init_cl(self, inputs, epochs, eta, useBias=False, bias_w=1e-2):
        """
        Run CL-algorithm for positioning the nodes
        Use bias to eliminate dead nodes
        """
        k = inputs.shape[1]
        b = np.ones((self.n,1)) * 1/self.n # Bias for each node
        wins = np.zeros((self.n,1)) # Counting the number of wins for each ndoe
        x = np.copy(inputs)

        for i in range(epochs):
            np.random.shuffle(x)
            for x_i in x:
                error = x_i - self.mean
                dist = np.sum(error*2,axis=1, keepdims=True)

                if (useBias):
                    dist -= bias_w * b

                winner = np.argmin(dist)
                self.mean[winner,:] += eta*error[winner,:]
                wins[winner]+=1       

                if (useBias):
                    b = 1/self.n - wins/(winner+1)   
             

        return wins
                







In [3]:
def data_gen(start, end, step=0.1):
    x = np.arange(start, end, step).reshape((-1,1))
    sin_data = np.sin(2*x).reshape((-1,1))
    sqr_data = np.sign(sin_data)
    return x, sin_data, sqr_data

def plot_fx_n_errror(patterns, targets, approximation, res_error, titles):
    fig = make_subplots(rows=1, cols=2, subplot_titles=(titles[0], titles[1]))
    fig.add_trace(go.Scatter(x=np.arange(len(res_error)), y=res_error, mode="lines", name="Residual error"), row=1, col=2)
    fig.add_trace(go.Scatter(x=x.ravel(), y=y.ravel(), mode="lines", name="Original"), row=1, col=1)
    fig.add_trace(go.Scatter(x=x.ravel(), y=approximation.ravel(), mode="lines", name="Approximated"), row=1, col=1)

    fig.show()


In [66]:
x, sin_data, sqr_data = data_gen(0, 2*np.pi)

fig = go.Figure().add_trace(go.Scatter(x=x.ravel(), y=sin_data.ravel(), mode="lines", name="sin(x)"))
fig.add_trace(go.Scatter(x=x.ravel(), y=sqr_data.ravel(), mode="lines", name="sqr(x)"))
fig.update_layout(title="Functions to approximate")
fig.show()

## Quick test of the model

Sin(x)

In [67]:
x, sin_data, sqr_data = data_gen(0, 2*np.pi)
t_x, t_sin_data, t_sqr_data = data_gen(0.05, 2*np.pi)

n = 10
epochs = 1 # Actually only one iteration is enough for least squre method

RBFNet = RBF_Network(n, x, add_noise=True)
# RBFNet.init_cl(x, 50, 1e-2, useBias=True) # Uncomment this to use CL initialization
RBFNet.train_network(epochs, x, sin_data, t_x, t_sin_data)
res_error = RBFNet.validate(t_x, t_sin_data, _plot=True)

print(res_error)

0.0343865793857196


Square(2x)

In [174]:
x, sin_data, sqr_data = data_gen(0, 2*np.pi)
t_x, t_sin_data, t_sqr_data = data_gen(0.05, 2*np.pi, step=0.05)

n = 10
epochs = 2

RBFNet = RBF_Network(n, x, useSqr=True)

# RBFNet.init_cl(x, 50, 1e-2) # Uncomment this to use CL initialization
RBFNet.train_network(epochs, x, sqr_data, t_x, t_sqr_data)
res_error = RBFNet.validate(t_x, t_sqr_data, _plot=True)

print(res_error)

0.0


Delta learining

In [6]:
x, sin_data, sqr_data = data_gen(0, 2*np.pi)
t_x, t_sin_data, t_sqr_data = data_gen(0.05, 2*np.pi)

n = 10
epochs = 100

RBFNet = RBF_Network(n, x, eta=1e-1, useDelta=True)
# RBFNet.init_cl(x, 100, 1e-1, useBias=True) # Uncomment this to use CL initialization
RBFNet.train_network(epochs, x, sin_data, t_x, t_sin_data)
res_error = RBFNet.validate(t_x, t_sin_data, _plot=True)

print(res_error)

0.8117172799116351


CL initialization

In [194]:
x, sin_data, sqr_data = data_gen(0, 2*np.pi)
t_x, t_sin_data, t_sqr_data = data_gen(0.05, 2*np.pi)

n = 10
epochs = 10

RBFNet = RBF_Network(n, x, eta=1e-1, useDelta=True)
RBFNet.init_cl(x, 100, 1e-1) # Uncomment this to use CL initialization
RBFNet.train_network(epochs, x, sin_data, t_x, t_sin_data)
res_error = RBFNet.validate(t_x, t_sin_data, _plot=True)

print(res_error)

0.8573057348528903


Testing different n for least square method

In [234]:
x, sin_data, sqr_data = data_gen(0, 2*np.pi)
t_x, t_sin_data, t_sqr_data = data_gen(0.05, 2*np.pi)

n_nodes = np.arange(1,10)
res_errors = []
outputs = []

epochs=100

for n in n_nodes:
    RBFNet = RBF_Network(n, x, useDelta=False, add_noise=False)
    RBFNet.train_network(epochs, x, sin_data)
    res_errors.append(RBFNet.validate(t_x, t_sin_data))
    outputs.append(RBFNet.f_of_x)

titles = ["Validation", "Residual error"]
fig = make_subplots(rows=1, cols=2, subplot_titles=(titles[0], titles[1]))

fig.add_trace(go.Scatter(x=np.arange(len(res_errors)), y=res_errors, mode="lines", name="error"), row=1, col=2)

fig.add_trace(go.Scatter(x=t_x.ravel(), y=sin_data.ravel(), mode="lines", name="Original"), row=1, col=1)

for i in range(1, len(outputs), 2):
    fig.add_trace(go.Scatter(x=t_x.ravel(), y=outputs[i].ravel(), mode="lines", name="n={}".format(i)), row=1, col=1)

fig.show()


In [7]:
x, sin_data, sqr_data = data_gen(0, 2*np.pi)
t_x, t_sin_data, t_sqr_data = data_gen(0.05, 2*np.pi)

n_nodes = np.arange(1,30)
res_errors = []
outputs = []

epochs=100

for n in n_nodes:
    RBFNet = RBF_Network(n, x, useDelta=True, add_noise=False)
    RBFNet.train_network(epochs, x, sin_data)
    res_errors.append(RBFNet.validate(t_x, t_sin_data))
    outputs.append(RBFNet.f_of_x)

titles = ["Validation", "Residual error"]
fig = make_subplots(rows=1, cols=2, subplot_titles=(titles[0], titles[1]))

fig.add_trace(go.Scatter(x=np.arange(len(res_errors)), y=res_errors, mode="lines", name="error"), row=1, col=2)

fig.add_trace(go.Scatter(x=t_x.ravel(), y=sin_data.ravel(), mode="lines", name="Original"), row=1, col=1)

for i in range(1, len(outputs), 3):
    fig.add_trace(go.Scatter(x=t_x.ravel(), y=outputs[i].ravel(), mode="lines", name="n={}".format(i)), row=1, col=1)

fig.show()


## 3.3.1 - Compare CL-based approach with earlier RBF network

N=10, Without noise

In [248]:
x, sin_data, sqr_data = data_gen(0, 2*np.pi)
t_x, t_sin_data, t_sqr_data = data_gen(0.05, 2*np.pi)

epochs = 100
n = 10

fig = make_subplots(rows=1, cols=2, subplot_titles=("Validation", "Residual error"))


fig.add_trace(go.Scatter(x=t_x.ravel(), y=sin_data.ravel(), mode="lines", name="Original"), row=1, col=1)

# Non-CL
RBFNet = RBF_Network(n, x, eta=1e-1, useDelta=True)
RBFNet.train_network(epochs, x, sin_data, t_x, t_sin_data)
RBFNet.validate(t_x, t_sin_data)

fig.add_trace(go.Scatter(x=np.arange(len(RBFNet.residual_error)), y=RBFNet.residual_error, mode="lines", name="Error Without CL"), row=1, col=2)
fig.add_trace(go.Scatter(x=t_x.ravel(), y=RBFNet.f_of_x.ravel(), mode="lines", name="Prediction Without CL"), row=1, col=1)

# Using CL
RBFNet_cl = RBF_Network(n, x, eta=1e-1,useDelta=True)
RBFNet_cl.init_cl(x, 100, 1e-1)
RBFNet_cl.train_network(epochs, x, sin_data, t_x, t_sin_data)
RBFNet_cl.validate(t_x, t_sin_data)

fig.add_trace(go.Scatter(x=np.arange(len(RBFNet_cl.residual_error)), y=RBFNet_cl.residual_error, mode="lines", name="Error With CL"), row=1, col=2)
fig.add_trace(go.Scatter(x=t_x.ravel(), y=RBFNet_cl.f_of_x.ravel(), mode="lines", name="Prediction With CL"), row=1, col=1)

fig.show()




N=10, With noise

In [249]:
x, sin_data, sqr_data = data_gen(0, 2*np.pi)
t_x, t_sin_data, t_sqr_data = data_gen(0.05, 2*np.pi)

epochs = 100
n = 10

fig = make_subplots(rows=1, cols=2, subplot_titles=("Validation", "Residual error"))


fig.add_trace(go.Scatter(x=t_x.ravel(), y=sin_data.ravel(), mode="lines", name="Original"), row=1, col=1)

# Non-CL
RBFNet = RBF_Network(n, x, eta=1e-1, useDelta=True, add_noise=True)
RBFNet.train_network(epochs, x, sin_data, t_x, t_sin_data)
RBFNet.validate(t_x, t_sin_data)

fig.add_trace(go.Scatter(x=np.arange(len(RBFNet.residual_error)), y=RBFNet.residual_error, mode="lines", name="Error Without CL"), row=1, col=2)
fig.add_trace(go.Scatter(x=t_x.ravel(), y=RBFNet.f_of_x.ravel(), mode="lines", name="Prediction Without CL"), row=1, col=1)

# Using CL
RBFNet_cl = RBF_Network(n, x, eta=1e-1,useDelta=True,add_noise=True)
RBFNet_cl.init_cl(x, 100, 1e-1)
RBFNet_cl.train_network(epochs, x, sin_data, t_x, t_sin_data)
RBFNet_cl.validate(t_x, t_sin_data)

fig.add_trace(go.Scatter(x=np.arange(len(RBFNet_cl.residual_error)), y=RBFNet_cl.residual_error, mode="lines", name="Error With CL"), row=1, col=2)
fig.add_trace(go.Scatter(x=t_x.ravel(), y=RBFNet_cl.f_of_x.ravel(), mode="lines", name="Prediction With CL"), row=1, col=1)

fig.show()




## 3.3.2 - Avoid dead nodes



In [280]:
x, sin_data, sqr_data = data_gen(0, 2*np.pi)
t_x, t_sin_data, t_sqr_data = data_gen(0.05, 2*np.pi)


epochs = 100
n = 10

# Without bias
fig = go.Figure()
RBFNet = RBF_Network(n, x, eta=1e-1, useDelta=True)
wins1 = RBFNet.init_cl(x, 100, 1e-1)
fig.add_trace(go.Bar(x=np.arange(len(wins1)), y=wins1.ravel(), name="Without bias"))

# With bias
RBFNet = RBF_Network(n, x, eta=1e-1, useDelta=True)
wins2 = RBFNet.init_cl(x, 100, 1e-1, useBias=True)
fig.add_trace(go.Bar(x=np.arange(len(wins2)), y=wins2.ravel(), name="With bias"))

fig.show()


## 3.3.3 - Approximate two dimensional function

In [32]:
def load():
    inputs, targets = load_data("./jakob/data/ballist.dat")
    test_inputs, test_targets = load_data("./jakob/data/balltest.dat")

    return inputs, targets, test_inputs, test_targets

def load_data(filepath):
    data = np.fromfile(filepath, dtype=np.float32, sep=" ")
    inputs = []
    targets = []
    for i in range(0, len(data), 4):
        inputs.append([data[i], data[i+1]])
        targets.append([data[i+2], data[i+3]])
    
    return np.asarray(inputs), np.asarray(targets)



In [158]:
x, y, tx, ty = load()

fig = go.Figure().add_trace(go.Scatter3d(x=x[:,0], y=x[:,1], z=y[:,0], mode="markers", name="feature 1"))
fig.add_trace(go.Scatter3d(x=x[:,0], y=x[:,1], z=y[:,1], mode="markers",name="feature 2"))
fig.show()

In [237]:
n=20
epochs=500

RBFNet = RBF_Network(n, x, eta=1e-3, useDelta=True)
wins = RBFNet.init_cl(x, 10000, 1e-4, useBias=True)

meanFig = ex.bar(x=np.arange(len(wins)), y=wins.ravel())
meanFig.show()

clsFig = ex.scatter(x=x[:, 0], y=x[:,1],title="Input space")
clsFig.add_trace(go.Scatter(x=RBFNet.mean[:,0], y=RBFNet.mean[:,1], name="Node positions", mode="markers", marker_size=10))
clsFig.show()

RBFNet.train_network(epochs, x, y, tx, ty)
RBFNet.validate(tx, ty, _plot=False)


0.18213414070053766

In [228]:
# Finding best CL initialization
# bset epoch = 10 000
# best eta = 1e-4
# epochs = [10, 50, 100, 200, 500, 1000, 5000]
epochs = [3000, 5000, 10000,]
eta = [1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e-7]

min_dist = np.inf
min_et = 0
min_ep = 0

for ep in epochs:
    for et in eta:
        RBFNet = RBF_Network(n, x)
        RBFNet.init_cl(x, ep, et, useBias=True)

        dist = []
        for mu in RBFNet.mean:
            err = np.sum(x - mu, axis=0)
            dist.append(err)
        dist = np.array(dist)
        s = dist.sum()
        if (s < min_dist):
            min_dist = s
            min_ep = ep
            min_et = et

print(min_dist)
print(min_ep)
print(min_et)    

0.0035079122
10000
0.0001


In [236]:
# Coarse search for best training settings
np.random.seed(SEED)
nodes=[5, 10, 15, 20, 25, 30]
epochs=[10, 50, 100, 200, 500, 1000]
eta=[1, 1e-1, 1e-2, 1e-3, 1e-4,1e-5]

min_error = np.inf
min_n = 0
min_epoch=0
min_eta = 0
best_positions=np.array([
 [0.5200957,  0.51857686],
 [0.52392626, 0.5201171 ],
 [0.52077186, 0.5181832 ],
 [0.52173585, 0.52055633],
 [0.5291993 , 0.5190004 ],
 [0.522605  , 0.5213044 ],
 [0.5266128 , 0.5188718 ],
 [0.5244044 , 0.521691  ],
 [0.52271897, 0.5175167 ],
 [0.5220817 , 0.51805735],
 [0.5202942 , 0.51773745],
 [0.5269549 , 0.52212995],
 [0.5213842 , 0.51788706],
 [0.52149975, 0.5142093 ],
 [0.52459115, 0.5162019 ],
 [0.52449733, 0.51473016],
 [0.52214   , 0.51917845],
 [0.52288896, 0.5150612 ],
 [0.5278722 , 0.5203877 ],
 [0.5241938 , 0.52292675]])

for n in nodes:
    for ep in epochs:
        for et in eta:
            RBFNet = RBF_Network(n, x, eta=et, useDelta=True)
            RBFNet.mean = best_positions
            RBFNet.train_network(ep, x, y, tx, ty)
            err = RBFNet.validate(tx, ty, _plot=False)

            if err < min_error:
                min_error = err
                min_n = n
                min_epoch = ep
                min_eta = et

print(min_error)
print(min_n)
print(min_ep)
print(min_et)


IndexError: index 20 is out of bounds for axis 0 with size 20

In [238]:
errorFig = ex.scatter(x=np.arange(len(RBFNet.residual_error)), y=RBFNet.residual_error, title="Residual error")
errorFig.show()

In [239]:
# outputs = np.sum(RBFNet.f_of_x, axis=0)
outputs = np.mean(RBFNet.f_of_x, axis=0)
# outputs = RBFNet.f_of_x[1]
# outputs = RBFNet.f_of_x[1] - RBFNet.f_of_x[0]
mae = np.mean(np.abs(ty-outputs), axis=1)
maefig = ex.scatter_3d(x=tx[:,0], y=tx[:,1], z=mae, title="MAE")

fig1 = ex.scatter_3d(x=tx[:,0], y=tx[:,1], z=ty[:,0], title="Feature 1")
fig2 = ex.scatter_3d(x=tx[:,0], y=tx[:,1], z=ty[:,1], title="Feature 2")

fig1.add_trace(go.Scatter3d(x=tx[:,0], y=tx[:,1], z=outputs[:,0], mode="markers",name="p1"))
fig2.add_trace(go.Scatter3d(x=tx[:,0], y=tx[:,1], z=outputs[:,1], mode="markers",name="p2"))


fig1.show()
fig2.show()
maefig.show()

In [240]:
print(RBFNet.mean)

[[0.5200957  0.51857686]
 [0.52392626 0.5201171 ]
 [0.52077186 0.5181832 ]
 [0.52173585 0.52055633]
 [0.5291993  0.5190004 ]
 [0.522605   0.5213044 ]
 [0.5266128  0.5188718 ]
 [0.5244044  0.521691  ]
 [0.52271897 0.5175167 ]
 [0.5220817  0.51805735]
 [0.5202942  0.51773745]
 [0.5269549  0.52212995]
 [0.5213842  0.51788706]
 [0.52149975 0.5142093 ]
 [0.52459115 0.5162019 ]
 [0.52449733 0.51473016]
 [0.52214    0.51917845]
 [0.52288896 0.5150612 ]
 [0.5278722  0.5203877 ]
 [0.5241938  0.52292675]]
