In [75]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt 
from sklearn.neighbors import NearestNeighbors
from numpy import linalg as LA
from tqdm import tqdm
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.model_selection import LeaveOneOut
from sklearn.ensemble import RandomForestRegressor

In [76]:
class RBML:
    def __init__(self, x, y, k=3, alpha=0.5, beta=2.):
        self.x = x
        self.len_ = len(x)
        self.y = y
        self.k = k
        self.alpha = alpha
        self.beta = beta
        self.labels = np.unique(self.y)
        self.neigh = NearestNeighbors(n_neighbors=4)
        self.neigh.fit(self.x)
        
    def find_target_neighbors(self, i):
        neighbors = self.neigh.kneighbors(self.x[i].reshape(1, -1), self.len_, return_distance=False)[:,1:][0]
        target_neighbors = []
        label = y[i]
        count_ = 0
        l = 0
        while count_ != self.k and l < self.len_-1:
            if self.y[neighbors[l]] == label:
                target_neighbors.append(neighbors[l])
                count_+=1
            l+=1
        return target_neighbors    
    
    def find_imposters(self, i):
        neighbors = self.neigh.kneighbors(self.x[i].reshape(1, -1), self.len_, return_distance=False)[:,1:][0]
        imposters = []
        label = y[i]
        k = 0
        while k < self.len_-1:
            if self.y[neighbors[k]] != label:
                imposters.append(neighbors[k])
            k+=1
        return imposters
    
    def delta_t(self, i, j):
        neighbors = self.find_target_neighbors(i)
        return 1 if j in neighbors else 0
    
    def Tv(self, i):
        sum_ = 0
        for j in range(self.len_):
            sum_ += (LA.norm(x[i] - x[j])**2)*self.delta_t(i, j)
        return sum_
    
    def TX(self):
        sum_ = 0
        for i in tqdm(range(self.len_)):
            sum_ += self.Tv(i)
        return sum_
    
    def m(self, i):
        neighbors = self.find_target_neighbors(i)
        # neighbors[-1] most distant target neighbor
        mi = self.beta*(LA.norm(x[i] - self.x[neighbors[-1]])**2)*self.delta_t(i, neighbors[-1])
        return mi
    
    def delta_i(self, i, j):
        #print("la",LA.norm(x[i] - x[j])**2)
        #print("mi",self.m(i))
        t1 = (LA.norm(x[i] - x[j])**2 < self.m(i))
        t2 = (y[i] != y[j])
        return 1 if t1 and t2 else 0
    
    def Hv(self, i):
        sum_ = 0
        for j in range(self.len_):
            sum_ += (self.m(i) - LA.norm(x[i] - x[j])**2)*self.delta_i(i,j)
        return sum_
    
    def HX(self):
        sum_ = 0
        for i in tqdm(range(self.len_)):
            sum_ += self.Hv(i)
        return sum_
    
    def C(self):
        return (1-self.alpha)*self.TX() + self.alpha*self.HX()
    
    def xiN(self, i):
        sum_ = 0
        sum_t = 0
        for j in range(len(self.x)):
            sum_ += self.x[j]*self.delta_t(i,j)
            sum_t += self.delta_t(i,j)
        
        return sum_/sum_t if sum_t != 0 else 0
    
    def xjI(self, i):
        sum_ = 0
        sum_t = 0
        for j in range(self.len_):
            delta_value = self.delta_i(i,j)
            sum_ += self.x[j]*delta_value
            sum_t += delta_value
        #  if sum_t != 0 else 0
        return sum_/sum_t if sum_t != 0 else 0
    
    # Updated hinge loss
    def Hv_(self, i):
        return max(0, (self.m(i) - LA.norm(x[i] - self.xjI(i))**2))
    
    def xiH(self, i):
        return self.x[i] + self.m(i)*((self.x[i]-self.xjI(i))/(LA.norm(x[i] - self.xjI(i))**2))  
    
    def xi_star(self, i):
        return (1-self.alpha)*self.xiN(i) + self.alpha*self.xiH(i)
        

In [77]:
# This method for Iris, Wine, Sonar datasets(a leave-one-out cross validation)
def convergence_method(x, y, splitter, k=3):
    accuracies = []
    for train_index, test_index in splitter:
        X_train, X_test = x[train_index], x[test_index]
        y_train, y_test = y[train_index], y[test_index]
        classifier = KNeighborsClassifier(n_neighbors=k)
        classifier.fit(X_train, y_train)
        y_pred = classifier.predict(X_test)
        acc = accuracy_score(y_test, y_pred)
        accuracies.append(acc)
    accuracies = np.array(accuracies)
    acc_mean = np.mean(accuracies)
    return acc_mean

# Build a K-NN classifier with X*'s (at first iteration with X's) then test the test dataset
# This method for Balance, pima, vowel datasets(With train and test(250 sample) splits)
def convergence_method_v2(X_train, y_train, X_test, y_test, k=3):
    classifier = KNeighborsClassifier(n_neighbors=k)
    classifier.fit(X_train, y_train)
    y_pred = classifier.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    return acc

In [78]:
# This method for Iris, Wine, Sonar datasets(a leave-one-out cross validation)
def rblm_process(x, y, splitter):
    rbml_ = RBML(x, y)
    flag = True
    old_acc = convergence_method(x, y, splitter.split(x))

    while flag:
        print(f"Accuracy before iteration = {old_acc}")
        x_stars = []
        for i in tqdm(range(rbml_.len_)):
            x_stars.append(rbml_.xi_star(i))
        x_stars = np.array(x_stars)

        rbml_.x = x_stars
        new_acc = convergence_method(rbml_.x, rbml_.y, splitter.split(rbml_.x))
        print(f"Accuracy after iteration = {new_acc}")
        # Iteration will continue until it converges
        # Converge conditions are accuracy will be 100% or accuracy won't increase from previous one 
        if new_acc <= old_acc or new_acc == 1:
            flag = False
        else:
            old_acc = new_acc
    return x_stars

# This method for Balance, pima, vowel datasets(With train and test(250 sample) splits)
def rblm_process_v2(X_train, y_train, X_test, y_test):
    rbml_ = RBML(X_train, y_train)
    flag = True
    old_acc = convergence_method_v2(rbml_.x, rbml_.y, X_test, y_test)

    while flag:
        print(f"Accuracy before iteration = {old_acc}")
        x_stars = []
        # Find the x stars according to algorithm
        for i in tqdm(range(rbml_.len_)):
            x_stars.append(rbml_.xi_star(i))
        x_stars = np.array(x_stars)

        rbml_.x = x_stars
        new_acc = convergence_method_v2(rbml_.x, rbml_.y, X_test, y_test)
        
        print(f"Accuracy after iteration = {new_acc}")
        # Iteration will continue until it converges
        # Converge conditions are accuracy will be 100% or accuracy won't increase from previous one 
        if new_acc <= old_acc or new_acc == 1:
            flag = False
        else:
            old_acc = new_acc
    return x_stars

In [111]:
def init_regressor(x, x_star):
    regressionSets = []
    predictors = []
    arr_x = np.array(x)
    arr_x_star = np.array(x_star)
    arr_x[0]
    featureSize = len(x[0])
    for i in range(featureSize):
        predictors.append(RandomForestRegressor(max_depth=2, random_state=0))
        predictors[i].fit(arr_x,arr_x_star[:,i])
    
    return predictors

def regression(predictor, x):
    result = []
    for i in range(len(x)):
        val = predictor[i].predict([x])
        result.append(val[0])
    return result

# Iris Dataset

In [5]:
loo = LeaveOneOut()

In [6]:
data = pd.read_csv("IRIS.csv")
x = data.iloc[:, [1, 2, 3, 4]].values
y = data.iloc[:, [5]].values.ravel()

In [7]:
x_starts = rblm_process(x,y, loo)

Accuracy before iteration = 0.96


100%|██████████| 150/150 [01:05<00:00,  2.29it/s]


Accuracy after iteration = 1.0


In [114]:
predictor = init_regressor(x, x_starts)
x_start_reg = []
for x_ in x:
    x_start_reg.append(regression(predictor, x_))

convergence_method_v2(x_start_reg,y, x,y)


0.9733333333333334

# Wine Dataset

In [None]:
from sklearn.datasets import load_wine

In [None]:
wine_dataset = load_wine()
x = wine_dataset['data']
y = wine_dataset['target'].ravel()

In [None]:
x_stars = rblm_process(x,y,loo)

# Sonar dataset

In [300]:
data = pd.read_csv("sonar_csv.csv")
x = data.iloc[:, :-1].values
y = data.iloc[:, -1].values.ravel()

In [297]:
x_stars = rblm_process(x,y,loo)

Accuracy before iteration = 0.8894230769230769


100%|█████████████████████████████████████████| 208/208 [01:28<00:00,  2.36it/s]


Accuracy after iteration = 0.9711538461538461
Accuracy before iteration = 0.9711538461538461


100%|█████████████████████████████████████████| 208/208 [01:40<00:00,  2.07it/s]


Accuracy after iteration = 0.9951923076923077
Accuracy before iteration = 0.9951923076923077


100%|█████████████████████████████████████████| 208/208 [01:26<00:00,  2.40it/s]

Accuracy after iteration = 0.9951923076923077





# Balance dataset

In [352]:
import pandas as pd
data =  pd.read_csv('balance-scale.data', sep=",")
x = data.iloc[:, 1:].values
y = data.iloc[:, 0].values.ravel()

In [353]:
# Split train and test data(250 test data according to paper)
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=250, random_state=42)

In [354]:
x_stars = rblm_process_v2(X_train, y_train, X_test, y_test)

Accuracy before iteration = 0.8


100%|█████████████████████████████████████████| 374/374 [05:45<00:00,  1.08it/s]

Accuracy after iteration = 0.788





# Pima Dataset

In [356]:
data = pd.read_csv("diabetes.csv")
x = data.iloc[:, :-1].values
y = data.iloc[:, -1].values.ravel()

In [362]:
# Split train and test data(250 test data according to paper)
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=250, random_state=42)

In [None]:
x_stars = rblm_process_v2(X_train, y_train, X_test, y_test)

# Vowel Dataset

In [408]:
data = pd.read_csv("vowel-context.data", header=None, sep=' ')
x = data.iloc[:, [3,4,5,6,7,8,9,10,11,12]].values
y = data.iloc[:, 14].values.ravel()
y = np.nan_to_num(y, nan=10)

In [409]:
# Split train and test data(250 test data according to paper)
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=250, random_state=42)

In [None]:
x_stars = rblm_process_v2(X_train, y_train, X_test, y_test)