In [1]:
from keras.datasets import mnist  #Used to load the MNIST dataset.
import matplotlib.pyplot as plt   #Used to show graphs.
import numpy as np  
import time                       #Used to calculate elapsed time.
np.set_printoptions(suppress=True)  #Disables scientific notation when printing.

In [2]:
#Data loading using keras mnist.load_data() function
(X_train, y_train), (X_test, y_test) = mnist.load_data()

#Reduces the number of dimensions from 3 to 2
X_train=X_train.reshape(X_train.shape[0],X_train.shape[1]*X_train.shape[2])
X_test = X_test.reshape(X_test.shape[0],X_test.shape[1]*X_test.shape[2])
y_train = np.array(y_train, dtype=np.int8)
y_test = np.array(y_test, dtype=np.int8)

In [3]:
#Encoding our y data to 2 classes , for even and odd numbers.
def encode(A):
  xE= []
  for x in range(len(A)):
    if (A[x]%2==0):
      value = 1
    else:
      value=-1
    xE.append(value)
  return np.array(xE)

#Normalizing our data by dividing them all by 255
def normalize(X_array):
  X_array=X_array/255
  X_array=np.round(X_array,5)
  return X_array

In [4]:
#Applying the above functions to our data

y_train = encode(y_train)
y_test = encode(y_test)
X_train = normalize(X_train)
X_test = normalize(X_test)

In [5]:
#Τaking smaller samples of "size" data to use
index = np.random.choice(X_train.shape[0], size=8000, replace=False)
index2 = np.random.choice(X_test.shape[0], size=1000, replace=False)
X_train_sm=X_train[index]
y_train_sm=y_train[index]
X_test_sm=X_test[index2]
y_test_sm=y_test[index2]

In [6]:
#def getDistance(x,y):
 # distance = (sum((x-y)**2))**0.5
 # return distance

#Calculate the distance of element x from each center y all together
def groupGetDistance(x,y):
  np.tile(x,(len(y),1))
  d = np.sum((x-y)**2,axis=1)**0.5
  return d
  
#KMeans algorithm. Takes arguments : X: X dataset, cLen : number of centers, iterations: no of iterations of the algorithm
#Returns Centers,loss,standard_deviation
def KMeans(X,cLen,iterations):
  losses = [0]
  index = np.random.choice(X.shape[0], size=cLen, replace=False)
  centers=X[index]

  for it in range(iterations):
    distanceLoss = np.zeros(cLen)
    #Keeps count of data points close to each center
    itemsPerCenter = np.zeros(cLen)
    #Initializes centers from random data points of X set
    centerValues = np.zeros(shape = (cLen,len(X[0])))
    print("Iteration: ",it+1)

    for Xindex,Xvalue in enumerate(X):

      #for centerIndex,center in enumerate(centers):
        #distance[centerIndex] = getDistance(Xvalue,center)
        
    #Calculates distance of Xvalue from all centers and keeps the minimum distance to clIndex
      distance = groupGetDistance(Xvalue,centers)
      clIndex = np.argmin(distance)
    
    #Count for classified center is increased by one and 
      itemsPerCenter[clIndex]+=1
        
    #Keeps the sum of each point close to each center, which will later be divided with the count to get the new centers
      centerValues[clIndex]+=Xvalue
        
    #Keeps the distance of each point close to each center, which will later be divided with the count to get standard deviation
      distanceLoss[clIndex]+=distance[clIndex]
    #Calculate new centers and loss.
    centers = centerValues/itemsPerCenter[:,None]
    losses.append(sum(distanceLoss)/cLen)
    #If loss has not changed between iterations , bypass iteration number and end the process early.
    if (losses[it]==losses[it+1]):
      print("Done after " ,it+1," iterations.")
      return centers,losses,distanceLoss/itemsPerCenter
  return centers,losses,distanceLoss/itemsPerCenter

#takes random points of X dataset as centers. Basically KMeans for 1 iteration without calculating new centers.
#Takes X: X dataset, cLen: number of centers
#Returns centers,standard_deviation
def randomCenters(X,cLen):
  losses = [0]
  index = np.random.choice(X.shape[0], size=cLen, replace=False)
  centers=X[index]

  distanceLoss = np.zeros(cLen)
  itemsPerCenter = np.zeros(cLen)
  centerValues = np.zeros(shape = (cLen,len(X[0])))

  for Xvalue in X:
    distance = groupGetDistance(Xvalue,centers)
    clIndex = np.argmin(distance)
    itemsPerCenter[clIndex]+=1
    centerValues[clIndex]+=Xvalue
    distanceLoss[clIndex]+=distance[clIndex]
  return centers,distanceLoss/itemsPerCenter

#Gaussian Radial Basis Function
def RBFgaussian(X,centers,std):
  return  ( np.exp(-((np.linalg.norm(X-centers,axis=1)**2) / (2*(std**2)))) ) 

#Multiquadratic Radial Basis Function
def RBFquad(X,centers,std):
  return (  np.sqrt((np.linalg.norm(X-centers,axis=1)**2) + (std**2))  )

#Cauchy Radial Basis Function
def RBFcauchy(X,centers,std):
  return ( ( ( (np.linalg.norm(X-centers,axis=1)**2) + (std**2) )**-1 ) / std )

#Least Squares Error Loss function
def leastSquaresLoss(y,out):
  return 0.5*((y-out)**2)



In [7]:
#Get centers and standard deviation using KMeans algorithm
#Remove centers with no data points and plot loss for eacah iteration
centerT = time.time()
centers,losses,devs = KMeans(X_train,100,3)
std = devs[devs!=0]
centers = centers[devs!=0]


centerT2 = time.time()

#Execution time of the algorithm
centerCalcTime = centerT2 - centerT

Iteration:  1
Iteration:  2
Iteration:  3


In [12]:
featX = np.linalg.norm(X_train[0]-centers[0])

In [14]:
featX

8.513631078692818

In [None]:
plt.plot(losses[1:])
plt.xlabel('No of iterations')
plt.ylabel('Loss')
plt.title('Avg Distance from centers')

plt.show()
print("Time to find centers: ",int(centerCalcTime)," seconds.")

In [None]:
#Get centers and standard deviation using random Centers algorithm
#Remove centers with no data points.Commented out (Works)

#centers,devs = randomCenters(X_train,1000)
#std = devs[devs!=0]
#centers = centers[devs!=0]

In [None]:
#X: X dataset, y: y labels
#losses: loss of each epoch all in a list for plotting purposes
#bias, weights initialized to 0
#weight length equals the number of centers.
#learing_rate : lr hyperparameter , epochs: number of algorithm iterations.

#
start = time.time()
X = X_train
y = y_train
losses = []
loss = 0
bias = 0
learning_rate = 0.5
weights = np.zeros(len(centers[:,]))
epochs = 1

#For each element in X dataset (len = 784): 
#calculate featX using RBF (len = no of centers) -> forward pass -> calculate loss -> update weights and bias. 
#Repeat for no of epochs

for e in range(epochs):
    
  randomize = np.arange(len(X))
  np.random.shuffle(randomize)
  X = X[randomize]
  y = y[randomize]
    
  for index,Xvalue in enumerate(X):

    featX = RBFgaussian(Xvalue,centers,std)
    
    #Negative outputs become -1 and positive become +1
    output = np.sign( np.dot(weights,featX) +bias ) #wT.f(x) + b

    #Calculate loss with Least Squares function
    loss+= leastSquaresLoss(y[index],output)
    
    #Update weights and bias using delta rule
    weights += learning_rate* (y[index] - output)*featX
    bias += learning_rate* (y[index] - output)  
  losses.append(loss)
  print("Epoch ",e+1)
  print("Loss:",loss)
  loss =0 

In [None]:
#For each element in the training set,transfer it to the feature space, forward pass and check if it matches the data's label.
X = X_test
y = y_test
correct = 0

for index,Xvalue in enumerate(X):

  featX = RBFgaussian(Xvalue,centers,std)

  output = np.sign( np.dot(weights,featX) + bias )

  if (output == y[index]):
    correct+=1

print("Test Accuracy : ",(correct*100) / len(X))

#Same algorithm as above but for training set
X = X_train
y = y_train
correct = 0

for index,Xvalue in enumerate(X):

  featX = RBFgaussian(Xvalue,centers,std)

  output = np.sign( np.dot(weights,featX) + bias )

  if (output == y[index]):
    correct+=1
end = time.time()
print("Test Accuracy : ",(correct*100) / len(X))
print("Time: ",end-start)

In [None]:
plt.plot(losses)
plt.xlabel('No of epochs')
plt.ylabel('Loss')
plt.title('Training Loss Per Epoch')

plt.show()