In [76]:
import numpy as np
from scipy.spatial import distance

def ComputeMaxDistance(X1, X2):
    
    max_dist = 0
    for x1 in X1:
        for x2 in X2:
            dist = distance.euclidean(x1, x2)
            max_dist = max(dist, max_dist)
    
    return max_dist

In [77]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import math
from sklearn.model_selection import train_test_split
from sklearn.cluster import KMeans
import tensorflow as tf 
from tensorflow import keras
from tensorflow.keras import Input
from tensorflow.keras import layers
from tensorflow.keras.optimizers import RMSprop

def RBF_kernel(x, centers, sigma):
  return [math.exp(-np.linalg.norm(x - c)**2) / (2 * sigma**2) for c in centers]

def RBF_layer_output(x, centers, sigma):
  return [RBF_kernel(x_, centers, sigma) for x_ in x]

class RBF(keras.layers.Layer):
  def __init__(self, units):
    super(RBF, self).__init__()
    self.units = units

  def build(self, input_shape):
    # w and b will be removed
    self.w = self.add_weight(
      shape=(input_shape[-1], self.units), initializer="ones", trainable=False
    )
    self.b = self.add_weight(shape=(self.units,), initializer="zeros", trainable=False) 

  def call(self, inputs):
    return [0 for i in range(37)]
    # return RBF_layer_output(inputs, self.centers, self.sigma)
    # return tf.matmul(float(inputs), float(self.w)) + float(self.b) # TODO rbf
    # return [0, 0, 0]

  def compute_params(self, inputs):
    kmeans = KMeans(n_clusters=self.units, random_state=0, copy_x=True).fit(inputs)
    self.centers = kmeans.cluster_centers_
    self.d_max = ComputeMaxDistance(self.centers, self.centers)
    self.sigma = self.d_max / (math.sqrt(2 * self.centers.shape[0]))

In [78]:
import numpy as np
import tensorflow as tf
from sklearn.cluster import KMeans
import utils
import math
import os

def test_params(d_max, sigma, err):
    if abs(d_max - 68.1778719396713) < err and abs(sigma - 27.83349966683499) < err:
        return True
    else:
        return False

# ---------------------------------------------------------------------------- #
#                                     test                                     #
# ---------------------------------------------------------------------------- #

num_of_samples = 6
np.random.seed(0)
x = np.random.randint(low=0, high=100, size=(num_of_samples, 2))
hidden_size = 3

rbf_layer_ = RBF(hidden_size)
rbf_layer_.compute_params(x)

if (test_params(rbf_layer_.d_max, rbf_layer_.sigma, 0.00001)):
    print("Test passed")
else:
    print("Test failed")


y = rbf_layer_(x)
print(y)




Test passed
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


In [79]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import math
from sklearn.model_selection import train_test_split
from sklearn.cluster import KMeans
import tensorflow as tf 
from tensorflow import keras
from tensorflow.keras import Input
from tensorflow.keras import layers
from tensorflow.keras.optimizers import RMSprop
import utils
import rbf_layer

# ---------------------------------------------------------------------------- #
#                                   read data                                  #
# ---------------------------------------------------------------------------- #

# data_url = "http://lib.stat.cmu.edu/datasets/boston"
data = "boston.csv"
raw_df = pd.read_csv(data, sep="\s+", skiprows=22, header=None)
X = np.hstack([raw_df.values[::2, :], raw_df.values[1::2, :2]])
y = raw_df.values[1::2, 2]

# ---------------------------------------------------------------------------- #
#                      split data to training and testing                      #
# ---------------------------------------------------------------------------- #

X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.75, test_size=0.25, random_state=0)


In [80]:
n_train = X_train.shape[0]
hidden_size = int(0.1 * n_train)
rbf_layer_ = rbf_layer.RBF(hidden_size)
rbf_layer_.compute_params(X_train)

In [81]:
model = keras.Sequential()
model.add(Input(shape=(13,)))

In [82]:
model.add(rbf_layer_)


OperatorNotAllowedInGraphError: Exception encountered when calling layer "rbf_17" (type RBF).

in user code:

    File "c:\Users\gitop\repos\computational-intelligence\rbf-reg\rbf_layer.py", line 33, in call  *
        return RBF_layer_output(inputs, self.centers, self.sigma)
    File "c:\Users\gitop\repos\computational-intelligence\rbf-reg\rbf_layer.py", line 18, in RBF_layer_output  *
        return [RBF_kernel(x_, centers, sigma) for x_ in x]

    OperatorNotAllowedInGraphError: iterating over `tf.Tensor` is not allowed in Graph execution. Use Eager execution or decorate this function with @tf.function.


Call arguments received:
  • inputs=tf.Tensor(shape=(None, 13), dtype=float32)

In [13]:
import tensorflow as tf

arr = [1., 2., 3., 4.]
arr2 = tf.constant(arr)
0.5*tf.math.exp(arr2)

<tf.Tensor: shape=(4,), dtype=float32, numpy=array([ 2.7182817,  7.389056 , 20.085537 , 54.59815  ], dtype=float32)>

In [32]:
# x = np.array([[1., 2.], [3., 4.], [5., 6.]])
# centers = np.array([[1., 2.], [3., 4.], [5., 6.]])
# sigma = 1

In [37]:
# modified kernel test

import numpy as np
import math


def RBF_kernel(x, centers, sigma):
  return [math.exp(-np.linalg.norm(x - c)/2) for c in centers]
  # return [math.exp(-np.linalg.norm(x - c)**2) / (2 * sigma**2) for c in centers]

def RBF_layer_output(x, centers, sigma):
  return [RBF_kernel(x_, centers, sigma) for x_ in x]

  
x = np.array([[0., 0.], [0., 1.], [1., 0.], [1., 1.]])
centers = x
sigma = 1

(RBF_layer_output(x, centers, sigma) == 
[[1.0, 0.6065306597126334, 0.6065306597126334, 0.4930686913952398],
 [0.6065306597126334, 1.0, 0.4930686913952398, 0.6065306597126334],
 [0.6065306597126334, 0.4930686913952398, 1.0, 0.6065306597126334],
 [0.4930686913952398, 0.6065306597126334, 0.6065306597126334, 1.0]])

True