In [1]:
import sklearn
import numpy as np
import random
import math

from scipy import integrate
from scipy.stats import norm, uniform

In [2]:
class PNNClassifier:
    def __init__(self, isSilent=True):
        if(not isinstance(isSilent, bool)):
            raise TypeError("Expected bool, got " + str(type(isSilent)))
        
        self.c = 10 ** 6
        
        self.X = None
        self.Y = None
        
        self.tmpX = None
        self.tmpY = None
        
        self.n = None
        self.classes = None
        
        self.isSilent = isSilent
    
    def fit(self, X, Y):
        if(not isinstance(X, np.ndarray)):
            raise TypeError("Expected np.ndarray, got " + str(type(X)))
        if(not isinstance(Y, np.ndarray)):
            raise TypeError("Expected np.ndarray, got " + str(type(Y)))
        if(len(X.shape) != 2):
            raise ValueError("Wrong X shape")
        if(len(Y.shape) != 1):
            raise ValueError("Wrong Y shape")
        if(X.shape[0] != Y.shape[0]):
            raise ValueError("Shapes does not match")

        self.X = np.copy(X)
        self.Y = np.copy(Y)
        
        self.tmpX = np.copy(X)
        self.tmpY = np.copy(Y)
        
        self.n = X.shape[0]
        self.classes = sorted(np.unique(self.Y))
    
    def _rearrangeData(self, x):
        self.tmpX, self.tmpY = zip(*sorted(zip(self.tmpX, self.tmpY), key = lambda q: np.linalg.norm(x - q[0])))
    
    def _YgivenXBetaK(self, beta, k):
        pYgivenXBetaK = 1
        
        beta = float(beta)
        
        for i in range(self.n):
            self._rearrangeData(self.X[i])
            numerator = 0
            denominator = 0
            for j in range(1, k + 1):
                if(self.Y[i] == self.tmpY[j]):
                    numerator += 1
            numerator = math.e ** ((numerator * beta) / k)
            
            cnt = np.zeros(len(self.classes), dtype=int)
            
            for j in range(1, k + 1):
                cnt[self.tmpY[j]] += 1
            
            for item in cnt:
                denominator += math.e ** ((item * beta) / k)

            pYgivenXBetaK *= (numerator / denominator)
        
        return pYgivenXBetaK
    
    def pygivenYXBetaK(self, x, y, beta, k):
        self._rearrangeData(x)
        
        beta = float(beta)
        
        numerator = 0
        denominator = 0
        
        for j in range(k):
            if(self.tmpY[j] == y):
                numerator += 1
        
        numerator = math.e ** ((numerator * beta) / k)
        
        cnt = np.zeros(len(self.classes), dtype=int)
            
        for j in range(k):
            cnt[self.tmpY[j]] += 1
            
        for item in cnt:
            denominator += math.e ** ((item * beta) / k)
        
        return numerator / denominator
    
    def predict_proba(self, X):
        if(self.X is None or self.Y is None):
            raise sklearn.exceptions.NotFittedError("You should fit before predicting")
        if(not isinstance(X, np.ndarray)):
            raise TypeError("Expected np.ndarray, got " + str(type(X)))
        if(X.shape[1] != self.X.shape[1]):
            raise ValueError("Invalid number of features")
        
        k = 1
        beta = 0.01
        
        res = np.zeros((X.shape[0], len(self.classes)), dtype=float)
        
        for epoch in range(20):
            for i, x in enumerate(X):
                for ci, clas in enumerate(self.classes):
                    res[i][ci] += self.pygivenYXBetaK(x, clas, beta, k)
            
            #print("-------------")
            #print("-------------")
            #print("-------------")
            #print("K:", k)
            #print("Beta:", beta)
            print(res/res.sum(axis=1)[:,None])
            #print("-------------")
            #print("-------------")
            #print("-------------")
            
            newK = k + random.randint(-5, 5)
            newK = max(newK, 1)
            newK = min(self.n - 1, newK)
            newBeta = beta + norm.rvs(loc=0, scale=10)
            newBeta = max(newBeta, 0)
            
            j = float(self._YgivenXBetaK(newBeta, newK)) / self._YgivenXBetaK(beta, k)
            j = min(j, 1)
            
            # print("J",j)
            
            if(random.random() <= j):
                k = newK
                beta = newBeta
        

In [3]:
import pandas as pd

In [4]:
df = pd.read_csv("./dataset/synth/train.csv")
xTrain = np.array(df[["xs", "ys"]])
yTrain = np.array(df["yc"])

model = PNNClassifier()
model.fit(xTrain, yTrain)

In [5]:
model.predict_proba(np.array([[-1, 0.2]]))

[[0.50249998 0.49750002]]
[[0.50124999 0.49875001]]
[[0.50083333 0.49916667]]
[[0.50062499 0.49937501]]
[[0.5005 0.4995]]
[[0.50041666 0.49958334]]
[[0.57168345 0.42831655]]
[[0.62513353 0.37486647]]
[[0.66670582 0.33329418]]
[[0.69996365 0.30003635]]
[[0.72717461 0.27282539]]
[[0.7498504 0.2501496]]
[[0.76903761 0.23096239]]
[[0.78548379 0.21451621]]
[[0.79973715 0.20026285]]
[[0.81220884 0.18779116]]
[[0.82321327 0.17678673]]
[[0.83299498 0.16700502]]
[[0.82808279 0.17191721]]
[[0.82366181 0.17633819]]


In [6]:
model.predict_proba(np.array([[-0.5, 0.6]]))

[[0.50249998 0.49750002]]
[[0.50249998 0.49750002]]
[[0.50249998 0.49750002]]
[[0.50249998 0.49750002]]
[[0.50249998 0.49750002]]
[[0.50249998 0.49750002]]
[[0.50249998 0.49750002]]
[[0.50218748 0.49781252]]
[[0.50194443 0.49805557]]
[[0.50174999 0.49825001]]
[[0.5015909 0.4984091]]
[[0.46798684 0.53201316]]
[[0.43955264 0.56044736]]
[[0.41518047 0.58481953]]
[[0.39405792 0.60594208]]
[[0.37557569 0.62442431]]
[[0.35926784 0.64073216]]
[[0.35326242 0.64673758]]
[[0.34788914 0.65211086]]
[[0.3430532 0.6569468]]
