LA with NN and measurement parameters

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
from collections import Counter
import os
from imblearn.over_sampling import RandomOverSampler
# Hyperparameters shared by the script below.
# eta: learning rate handed to Model (scales every weight/bias delta).
eta = 0.01
# max_epochs: number of full passes over the training samples in Model.train.
max_epochs = 1000
# la_size: number of candidate weight/bias sets kept per neuron in each Layer.
la_size = 2
# golbal_delta (sic, "global"): amount subtracted from a neuron's selection
# probabilities in Layer.update_weights. The misspelt name is referenced
# there, so it must be renamed in both places at once.
golbal_delta=0.5

def tanh(x):
    '''
    Hyperbolic tangent activation.

    x: scalar or numpy array of pre-activations.
    returns: tuple (a, b) where a = tanh(x) and b = 1 - a**2 is the
             derivative of tanh evaluated at x.
    '''
    # np.tanh saturates cleanly at +/-1; the explicit
    # (e^x - e^-x)/(e^x + e^-x) form overflows to inf/inf = nan for large |x|.
    a = np.tanh(x)
    b = 1 - np.power(a, 2)
    return (a, b)

def relu(x):
    '''
    Rectified linear activation.

    returns: tuple (activated, slope) where activated keeps the positive
             entries of x (others become 0) and slope is 1 where x > 0, else 0.
    '''
    positive = x > 0
    activated = np.where(positive, x, 0)
    slope = np.where(positive, 1, 0)
    return (activated, slope)

def softmax(x, y):
    '''
    Softmax over all entries of x, plus the error term against target y.

    returns: tuple (probs, err) where probs sums to 1 and err = -(probs - y).
    '''
    # subtracting the maximum keeps the exponentials from overflowing
    shifted = x - np.max(x)
    exponentials = np.exp(shifted)
    probs = exponentials / exponentials.sum()
    err = -(probs - y)
    return (probs, err)

def preprocess(data):
    '''
    Log-transform the data, then min-max scale every column.

    data: 2-D numpy array (rows = samples, columns = features).
    returns: float32 array of the same shape; entries whose log is undefined
             become 0 before scaling, and constant columns end up all zero.
    '''
    # masked log: invalid inputs (e.g. values <= 0) are masked and filled with 0
    data = np.ma.log(data.astype('float32')).filled(0)

    column_max = np.max(data, 0)
    column_min = np.amin(data, 0)

    # shift every column so that its minimum sits at 0
    data -= column_min

    # divide only the columns that actually vary (avoids division by zero)
    varying = column_max != column_min
    column_range = column_max - column_min
    data[:, varying] /= column_range[varying]
    return data

def change(x):
    '''
    Map a boolean-like label to an integer: 1 if x == True, 0 if x == False.
    Any other value yields None.
    '''
    for flag, label in ((True, 1), (False, 0)):
        if x == flag:
            return label
    return None

def get_data():
    '''
    Read cm1.csv and build the train/test sets for the network.

    Steps:
      1. split the frame into features X (all but last column) and labels Y
         (last column, mapped to 0/1 with change()).
      2. stratified 80/20 train/test split.
      3. random oversampling of the minority class on the TRAINING part only.
      4. preprocess (log + min-max) the features.

    returns: (tr_x, tr_y, ts_x, ytest)
      tr_x  - list of (n_features, 1) column vectors for training
      tr_y  - list of (2, 1) one-hot target vectors for training
      ts_x  - list of (n_features, 1) column vectors for testing
      ytest - integer test labels as returned by train_test_split
    '''
    df = pd.read_csv("cm1.csv")
    X = df.iloc[:, :-1]
    Y = df.iloc[:, -1].apply(change)

    # Split BEFORE resampling: oversampling the full data set first puts
    # copies of the same minority rows into both partitions, which leaks
    # training samples into the test set and inflates every test metric.
    xtrain, xtest, ytrain, ytest = train_test_split(
        X, Y, test_size=0.2, random_state=42, stratify=Y)

    # Balance the classes in the training partition only
    # (SMOTE(sampling_strategy='minority') is a drop-in alternative).
    oversample = RandomOverSampler(sampling_strategy='minority')
    xtrain, ytrain = oversample.fit_resample(xtrain, ytrain)
    print(Counter(ytrain))

    # NOTE(review): train and test are normalised independently, each with
    # its own column min/max; consider reusing the training statistics.
    xtrain = preprocess(np.array(xtrain))
    xtest = preprocess(np.array(xtest))

    # the network consumes one (n_features, 1) column vector per sample
    tr_x = [row.reshape(-1, 1) for row in xtrain]
    ts_x = [row.reshape(-1, 1) for row in xtest]

    # one-hot encode the training labels as (2, 1) column vectors
    tr_y = []
    for label in ytrain:
        onehot = np.zeros((2, 1))
        onehot[label] = 1.0
        tr_y.append(onehot)

    return tr_x, tr_y, ts_x, ytest

class Layer:
    '''
    One fully connected layer in which every output neuron keeps LA_size
    candidate weight rows / biases and a probability row over them.
    A candidate is sampled per neuron on every forward pass; the sampled
    parameters are trained and written back to their candidate slot.
    '''
    def __init__(self, inputsize, outputsize, activation, LA_size, is_outputlayer=False):
        '''
        initialize weights, biases, activation

        inputsize      - number of inputs to the layer
        outputsize     - number of neurons in the layer
        activation     - callable returning (value, derivative); for the
                         output layer it is called as activation(z, target)
        LA_size        - number of candidate parameter sets per neuron
        is_outputlayer - True for the last layer of the network
        '''
        self.inputsize = inputsize
        self.outputsize = outputsize
        self.activation = activation
        self.LA_size = LA_size

        # candidate weights: {LA_size, outputsize, inputsize};
        # active weights: {outputsize, inputsize}
        self.LA_weights = np.random.rand(self.LA_size,self.outputsize,self.inputsize)
        self.weights = np.random.rand(self.outputsize,self.inputsize)
        # candidate biases: {LA_size, outputsize, 1}; active biases: {outputsize, 1}
        self.LA_biases = np.random.rand(self.LA_size,self.outputsize,1)
        self.biases = np.random.rand(self.outputsize,1)

        # uniform initial probability over the candidates of every neuron
        self.LA_prob = np.full((self.outputsize,self.LA_size),1/self.LA_size)
        # index of the candidate currently selected for every neuron; stored
        # as integers so it can be used directly as an index
        self.LA_choice = np.zeros((self.outputsize,1),dtype=int)

        self.err = np.zeros((self.outputsize, 1))
        self.delta_weights = np.zeros((self.outputsize, self.inputsize))
        self.delta_biases = np.zeros((self.outputsize, 1))
        self.output = np.zeros((self.outputsize, 1))
        self.input = np.zeros((self.inputsize,1))
        self.is_outputlayer = is_outputlayer

    def selectNN(self):
        '''
        select the neurons in LCU: for every neuron sample one candidate
        according to LA_prob and copy its weights/bias into the active set
        '''
        candidates = np.arange(self.LA_size)
        for i in range(self.outputsize):
            choice = int(np.random.choice(candidates,p=self.LA_prob[i]))
            # index the scalar element [i, 0]; int() on the length-1 row
            # LA_choice[i] is deprecated since NumPy 1.25
            self.LA_choice[i, 0] = choice
            self.weights[i] = self.LA_weights[choice][i]
            self.biases[i] = self.LA_biases[choice][i]

    def feedforward(self, input):
        '''
        input=previous layers output
        Re-samples the candidates, stores the input and returns the
        activation of (weights . input + biases).
        '''
        self.selectNN()
        self.input = input
        pre_activation = np.dot(self.weights, self.input) + self.biases
        if self.is_outputlayer:
            # the output activation takes a target argument; 1 is a
            # placeholder because only the activation value [0] is used here
            self.output = self.activation(pre_activation,1)[0]
        else:
            self.output = self.activation(pre_activation)[0]
        return self.output

    def backpropagate(self, next_weights, next_delta, learning_rate, target):
        '''
        next_weights=next layers weights
        next_delta=next layers error term
        learning_rate=scale applied to the computed deltas
        target=expected output, only used by the output layer
        Stores delta_weights / delta_biases and returns this layers error term.
        '''
        pre_activation = np.dot(self.weights, self.input) + self.biases
        if self.is_outputlayer:
            activation_prime = self.activation(pre_activation, target)[1]
        else:
            activation_prime = self.activation(pre_activation)[1]
        self.err = activation_prime*np.dot(next_weights.T,next_delta)
        self.delta_weights = learning_rate*(np.dot(self.err,self.input.T))
        self.delta_biases = learning_rate*self.err
        return self.err

    def update_weights(self, flag, index):
        '''
        update the weights and biases
        flag  - when True the selection probabilities of neuron `index`
                are adjusted as well
        index - neuron whose probabilities are adjusted
        '''
        delta = golbal_delta
        # rebinding (not in-place) keeps references to the old arrays valid
        self.weights = self.weights + self.delta_weights
        self.biases = self.biases + self.delta_biases
        if flag:
            chosen = self.LA_choice[index, 0]
            # lower every candidate by delta (clipped at 0) ...
            self.LA_prob[index]=self.LA_prob[index]-delta
            self.LA_prob[index]=np.where(self.LA_prob[index]>0,self.LA_prob[index],0)
            # ... and give the remaining mass to the chosen candidate so the
            # row sums to 1 again
            temp_sum=np.sum(self.LA_prob[index])-self.LA_prob[index][chosen]
            self.LA_prob[index][chosen]=1-temp_sum
        # write the trained parameters back to the candidate they came from
        for i in range(self.outputsize):
            choice = self.LA_choice[i, 0]
            self.LA_weights[choice][i]=self.weights[i]
            self.LA_biases[choice][i]=self.biases[i]
    
class Model:
    '''
    It creates the layered model, has train, test functions
    '''
    def __init__(self,no_of_hidden_layers,list_of_activations,learning_rate,epochs,la_size):
        '''
        no_of_hidden_layers - layer sizes, input size first and output size
                              last (despite the name it lists every layer)
        list_of_activations - one activation per weight layer
        learning_rate       - step size passed to every Layer.backpropagate
        epochs              - number of passes over the training data
        la_size             - candidate count forwarded to every Layer
        '''
        self.no_of_hidden_layers = no_of_hidden_layers
        self.list_of_activations=list_of_activations
        self.learning_rate=learning_rate
        self.epochs=epochs
        last = len(self.no_of_hidden_layers)-1
        self.layers = [
            Layer(self.no_of_hidden_layers[i-1],self.no_of_hidden_layers[i],
                  self.list_of_activations[i-1],la_size,is_outputlayer=(i==last))
            for i in range(1,len(self.no_of_hidden_layers))
        ]

    @staticmethod
    def _safe_ratio(numerator, denominator):
        '''Return numerator/denominator as a float, or 0.0 when the ratio is undefined.'''
        if denominator == 0:
            return 0.0
        return float(numerator) / float(denominator)

    def train(self,input,output,verbose=True):
        '''
        Online training: every sample is fed forward, back-propagated and
        applied immediately.

        input   - sequence of column vectors
        output  - sequence of one-hot target column vectors
        verbose - print mean cross-entropy and accuracy after every epoch
        '''
        for each in range(self.epochs):
            cnt = 0
            error = 0
            for i, x in enumerate(input):
                # track the strongest activation seen in any layer:
                # its value, its layer index and its neuron index
                max_v,max_i,max_j=0,0,0
                for j, layer in enumerate(self.layers):
                    x=layer.feedforward(x)
                    peak = np.argmax(x)
                    if (max_v<x[peak]):
                        max_v = x[peak]
                        max_i = j
                        max_j = peak
                # cross-entropy; 1e-8 guards against log(0)
                error+=(-1)*(np.sum(output[i]*np.log(x+1e-8)))
                if (np.argmax(x)==np.argmax(output[i])):
                    cnt+=1

                # seed values so the output layer's error term is multiplied by 1
                next_layer_weights=np.array([1]).reshape(1,1)
                next_delta=np.array([1]).reshape(1,1)
                for j in range(len(self.layers)-1,-1,-1):
                    layer = self.layers[j]
                    next_delta=layer.backpropagate(next_layer_weights,
                                                   next_delta,self.learning_rate,output[i])
                    # grab the weights used in the forward pass before they are updated
                    next_layer_weights=layer.weights
                    # only the layer holding the strongest activation also
                    # adjusts its selection probabilities
                    layer.update_weights(j==max_i, max_j)
            error=error/len(input)
            acc=cnt/len(input)
            acc*=100
            if verbose:
                print("Epoch-",each,"-- error =",error,"-- accuracy =",acc)

    def test(self,input,output,verbose=True):
        '''
        Predict every sample and compute binary classification measures.

        input   - sequence of column vectors
        output  - sequence of integer class labels (0 or 1)
        verbose - print per-sample predictions and a full report
        returns: (accuracy, precision, recall, false_alarm); a measure whose
                 denominator is zero is reported as 0.0
        '''
        ypred=[]
        ytest=output
        for i, x in enumerate(input):
            for layer in self.layers:
                x=layer.feedforward(x)
            predicted=np.argmax(x)
            ypred.append(predicted)
            if verbose:
                print("Predicted =",x.reshape(1,-1),"  ",predicted," Actual =",output[i])

        # labels=[0, 1] forces a 2x2 matrix even if one class never occurs,
        # so the four-way unpacking below cannot fail
        matrix = confusion_matrix(ytest, ypred, labels=[0, 1])
        tn, fp, fn, tp = matrix.ravel()
        acc = self._safe_ratio(tp + tn, tp + fp + fn + tn)
        pre = self._safe_ratio(tp, tp + fp)
        recall = self._safe_ratio(tp, tp + fn)
        false_alarm = self._safe_ratio(fp, tn + fp)
        if verbose:
            print(matrix)
            print("Accuracy =", acc)
            print("Precision =", pre)
            print("Recall =", recall)
            print("False Alarm =", false_alarm)
            print(classification_report(ytest, ypred))
        return acc,pre,recall,false_alarm

def cross_validate(X,Y,k=10):
    '''
    Stratified k-fold cross validation of the LA network.

    X - list of input column vectors; Y - list of one-hot target vectors;
    k - number of folds.
    Prints accuracy, precision, recall and false alarm for every fold and
    their averages over all folds.
    '''
    print(k,"- Fold Cross Validation")
    splitter = StratifiedKFold(n_splits=k,shuffle=True)
    # one bucket per measure returned by Model.test
    measures = [[] for _ in range(4)]
    # the splitter only needs the class labels; the features are a placeholder
    labels = np.argmax(Y,axis=1)
    placeholder = np.zeros((len(X),))
    for train_index,test_index in splitter.split(placeholder,labels):
        xtrain = [X[i] for i in train_index]
        ytrain = [Y[i] for i in train_index]
        xtest = [X[i] for i in test_index]
        # Model.test expects integer labels rather than one-hot vectors
        ytest = np.argmax([Y[i] for i in test_index],axis=1)

        model=Model([xtrain[0].shape[0],20,10,2],[relu,relu,softmax],eta,max_epochs,la_size)
        model.train(xtrain,ytrain,verbose=False)
        temp_measures=model.test(xtest,ytest,verbose=False)
        for bucket, value in zip(measures, temp_measures):
            bucket.append(value)

        print("accuracy =",temp_measures[0],"precision =",temp_measures[1],
              "recall =",temp_measures[2],"false alarm =",temp_measures[3])
    print(k,"- fold cross validation Avg. Measures =",np.average(measures,axis=1))

if __name__ == "__main__":
    # single train/test run on the split produced by get_data
    xtrain,ytrain,xtest,ytest=get_data()
    layer_sizes = [xtrain[0].shape[0],20,10,2]
    model=Model(layer_sizes,[relu,relu,softmax],eta,max_epochs,la_size)
    model.train(xtrain,ytrain)
    model.test(xtest,np.array(ytest))

    # one-hot encode the test labels so train and test can be pooled
    class_labels = np.unique(ytest)
    test_y=[]
    for label in ytest:
        onehot = np.zeros((len(class_labels),1))
        onehot[label] = 1.0
        test_y.append(onehot)

    # cross validation over the pooled samples
    X = xtrain + xtest
    Y = ytrain + test_y
    cross_validate(X,Y,10)



Counter({0: 449, 1: 449})
Epoch- 0 -- error = 1.778306887464223 -- accuracy = 55.98885793871866
Epoch- 1 -- error = 0.672863005582176 -- accuracy = 62.116991643454035
Epoch- 2 -- error = 0.6628165610751805 -- accuracy = 62.116991643454035
Epoch- 3 -- error = 0.6646027806813407 -- accuracy = 62.67409470752089
Epoch- 4 -- error = 0.6581329384908579 -- accuracy = 64.06685236768801
Epoch- 5 -- error = 0.6559074694140021 -- accuracy = 65.18105849582173
Epoch- 6 -- error = 0.6502585841278102 -- accuracy = 64.20612813370474
Epoch- 7 -- error = 0.6540173397283995 -- accuracy = 64.06685236768801
Epoch- 8 -- error = 0.6393509291945118 -- accuracy = 65.59888579387186
Epoch- 9 -- error = 0.6408295430630838 -- accuracy = 65.18105849582173
Epoch- 10 -- error = 0.6415384728869146 -- accuracy = 65.18105849582173
Epoch- 11 -- error = 0.6403838517649986 -- accuracy = 65.87743732590529
Epoch- 12 -- error = 0.6326727021943589 -- accuracy = 66.15598885793872
Epoch- 13 -- error = 0.6345254539178106 -- accur

Classification Model using normal NN with kc2 dataset

In [None]:
import tensorflow as tf
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, StratifiedKFold
import random

def preprocess(data):
    '''
    Apply a log transformation followed by column-wise min-max scaling.

    data: 2-D numpy array (rows = samples, columns = features).
    returns: float32 array of the same shape; entries whose log is undefined
             are set to 0 before scaling, constant columns become all zero.
    '''
    # masked log: invalid inputs are masked and then filled with 0
    logged=np.ma.log(data.astype('float32')).filled(0)
    highest=np.max(logged,0)
    lowest=np.amin(logged,0)
    # move every column so that it starts at 0
    shifted=logged-lowest
    # scale only the columns with a non-zero range; the others keep the
    # shifted (all-zero) values
    return np.divide(shifted,highest-lowest,out=shifted,where=highest!=lowest)


def get_data():
    '''
    read data from kc2.csv file; split into X,Y
    preprocess X
    split X,Y into x_train,x_test,y_train,y_test

    The rows are shuffled (unseeded), the first 80% become the training set
    and the remaining 20% the test set.

    returns: (xtrain, ytrain, xtest, ytest)
      xtrain, xtest - float32 feature matrices after preprocess()
      ytrain, ytest - integer class indices with shape (n, 1); index k
                      refers to the k-th sorted unique label of the last column
    '''
    df=pd.read_csv("kc2.csv")
    data=np.array(df)
    np.random.shuffle(data)

    x=np.asarray(data[:,:-1]).astype('float32')
    # return_inverse yields, for every row, the position of its label in the
    # sorted unique labels - the same encoding as a per-row np.where lookup
    # but in a single vectorised call
    _,encoded=np.unique(data[:,-1],return_inverse=True)
    y=encoded.reshape(-1,1).astype('int')

    split=int(x.shape[0]*0.8)
    # NOTE(review): train and test are normalised independently, each with
    # its own column min/max.
    xtrain=preprocess(x[:split])
    xtest=preprocess(x[split:])
    ytrain=y[:split]
    ytest=y[split:]
    return xtrain,ytrain,xtest,ytest

def get_model(n):
    '''
    Build and compile the baseline classifier.

    n - number of input attributes
    Architecture: input(n) -> Dense(20, tanh) -> Dense(6, relu) -> Dense(2, softmax),
    compiled with the adam optimizer and sparse categorical cross-entropy.
    '''
    stack = (
        tf.keras.Input(shape=(n,)),                            # input layer
        tf.keras.layers.Dense(20,activation=tf.nn.tanh),      # first hidden layer
        tf.keras.layers.Dense(6,activation=tf.nn.relu),       # second hidden layer
        tf.keras.layers.Dense(2,activation=tf.nn.softmax),    # output layer
    )
    model = tf.keras.models.Sequential()
    for layer in stack:
        model.add(layer)
    model.compile(optimizer='adam',loss='sparse_categorical_crossentropy',metrics=['accuracy'])
    return model

def cross_validate(X,Y,k=10):
    '''
    Stratified k-fold cross validation of the baseline model.

    X - feature matrix; Y - target values; k - number of folds
    Prints the accuracy of every fold and the average over all folds.
    '''
    splitter = StratifiedKFold(n_splits=k,shuffle=True)
    fold_accuracies = []
    for train_index,test_index in splitter.split(X,Y):
        print(train_index.shape,test_index.shape)
        X_train,Y_train = X[train_index],Y[train_index]
        X_test,Y_test = X[test_index],Y[test_index]
        # a fresh model per fold so no weights carry over between folds
        model = get_model(X_train.shape[1])
        model.fit(X_train,Y_train,epochs=100,verbose=0)
        loss,acc = model.evaluate(X_test,Y_test,verbose=0)
        print("Accuracy =",acc)
        fold_accuracies.append(acc)
    Average = sum(fold_accuracies)/len(fold_accuracies)
    print(k,"- fold cross validation Avg. accuracy =",Average)

if __name__ == "__main__":
    xtrain,ytrain,xtest,ytest=get_data()
    print(xtrain.shape,ytrain.shape,xtest.shape,ytest.shape)

    # train one model on the 80/20 split; n = no. of attributes
    n=xtrain.shape[1]
    model = get_model(n)
    model.fit(xtrain, ytrain, epochs=100)

    # evaluate the model and list predicted vs. actual class per test row
    loss,acc=model.evaluate(xtest,ytest)
    ypred=model.predict(xtest)
    for probabilities, actual in zip(ypred, ytest):
        print(np.argmax(probabilities),actual)
    print("Accuracy = ",acc)

    # cross-validation over the pooled train and test rows
    X=np.concatenate((xtrain,xtest),axis=0)
    Y=np.concatenate((ytrain,ytest),axis=0)
    cross_validate(X,Y,10)