In [36]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler,StandardScaler  # for normalization
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score,f1_score,precision_score,recall_score,roc_curve,auc
from sklearn.utils import shuffle

import random
from sklearn.linear_model import SGDClassifier #imported for comparison
from sklearn.metrics import confusion_matrix
from sklearn.metrics import plot_confusion_matrix
random.seed(12)
np.random.seed(123)


In [46]:
#For the given file name and target label name, data is extracted from the file and converted into 
#features and labels, X and Y, respectively
def readingToDF(filename,Target_label):
    """Read a whitespace-separated text file and split it into features and labels.

    The first non-blank line is taken as the header (column names); every
    following non-blank line is one record. Values are split on any run of
    whitespace, so tab- and space-separated files are both accepted.

    Parameters
    ----------
    filename : str
        Path of the file to read.
    Target_label : str
        Name of the column that holds the class label as 'yes' / 'no'.

    Returns
    -------
    X : pandas.DataFrame
        Every column except ``Target_label``, in file order. Values are kept
        exactly as parsed from the text (no numeric conversion is done here).
    Y : pandas.Series
        The target column encoded as 1.0 for 'yes' and -1.0 for 'no'.

    Raises
    ------
    ValueError
        If the file contains no header line, or the target column holds a
        value other than 'yes' / 'no'.
    KeyError
        If ``Target_label`` is not one of the header names.
    """
    with open(filename) as f:
        # Skip blank lines (e.g. a trailing newline) so they do not become
        # empty, ragged records.
        rows = [line.split() for line in f if line.strip()]
    if not rows:
        raise ValueError("'{}' contains no header line".format(filename))
    header, records = rows[0], rows[1:]
    #converting to dataframe
    train_data = pd.DataFrame(records, columns=header)
    #encoding the data into -1 and 1. -1 represents fire as 'no' and 1 with 'yes'
    Y = train_data[Target_label].map({'yes': 1.0, 'no': -1.0})
    if Y.isna().any():
        # map() turns unknown labels into NaN; fail here instead of letting
        # NaN labels reach training and metrics.
        unknown = sorted(set(train_data.loc[Y.isna(), Target_label]))
        raise ValueError(
            "Unexpected values in target column '{}': {}".format(Target_label, unknown))
    # Remove the target by name so it can never leak into the features,
    # whichever column position it occupies in the file.
    X = train_data.drop(columns=[Target_label])
    return X,Y

As described in the assignment details, test and train data should be split in 1:2 ratio 10 times to introduce randomness and perform better evaluations.

In [48]:
# element of randomness is added and the data is normalized using the standard scaler
def CreateTrainAndTest(randomseed,X,Y):
    """Split the data into train and test parts and standardise the features.

    Parameters
    ----------
    randomseed : int
        Passed to ``train_test_split`` as ``random_state`` so that each seed
        gives a different but repeatable split.
    X : features (DataFrame or array-like)
    Y : labels aligned with ``X``

    Returns
    -------
    X_train, X_test : pandas.DataFrame
        Standardised features. When ``X`` is a DataFrame the row index and
        column names of each split are preserved, so the frames stay aligned
        with ``y_train`` / ``y_test``.
    y_train, y_test
        The label splits exactly as returned by ``train_test_split``.
    """
    X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.33, random_state=randomseed)
    scaler = StandardScaler()
    # normalize data for better convergence and to prevent overflow.
    # The scaler is fitted on the training part only and then applied to the
    # test part, so no test statistics leak into training.
    X_train_normalized = scaler.fit_transform(X_train)
    X_test_normalized = scaler.transform(X_test)
    # Re-attach the original row index / column names (None for plain arrays,
    # which gives the pandas default) instead of a fresh RangeIndex that would
    # no longer match the index of the label splits.
    X_train = pd.DataFrame(X_train_normalized,
                           index=getattr(X_train, "index", None),
                           columns=getattr(X_train, "columns", None))
    X_test = pd.DataFrame(X_test_normalized,
                          index=getattr(X_test, "index", None),
                          columns=getattr(X_test, "columns", None))
    return X_train, X_test, y_train, y_test


Algorithm:
1. fit:
        a. initialize the weights and biases
        b. repeat the steps below once per epoch, or until the loss function steadies
        c. for each data point (X,Y), calculate the derivative of the loss function. This gives the values of dw and db
        d. update the current W and b by subtracting dw and db multiplied by the learning rate.
2. predict:
        a. use the value of W to calculate f(x)=Xi*W and take the sign of the value.
        b. if the sign is +ve, fire is 'yes'; otherwise it is 'no'


In [57]:
class SVM(object):
    """Linear soft-margin SVM trained with per-sample sub-gradient descent.

    Labels are expected to be encoded as -1 / +1. The per-sample objective is
    ``C * max(0, 1 - y * (w.x + b)) + ||w||^2 / 2``.

    Attributes
    ----------
    weights : weight vector learned by ``fit`` (empty list before fitting)
    biases : bias term; starts at 1 and is replaced by the learned value in ``fit``
    epochs, lr, C : number of passes over the data, learning rate and
        hinge-loss weight
    loss_values : mean loss of each epoch of the most recent ``fit`` call
    precision, recall, f1, accuracy : empty lists, not used inside the class
    """

    def __init__(self,featuresize,epochs=1000,lr=0.0001,C=2):
        """Create an untrained classifier.

        ``featuresize`` is accepted for backward compatibility but not used:
        the weight vector is sized from the training data in ``fit``.
        ``epochs``, ``lr`` and ``C`` default to the values that were
        previously hard-coded.
        """
        self.biases = 1
        self.epochs=epochs
        self.loss_values=[]
        self.lr=lr
        self.C=C
        self.weights=[]
        self.precision=[]
        self.recall=[]
        self.f1=[]
        self.accuracy=[]

    def fit(self,X_train, y_train):
        """Learn weights and bias from the training data.

        Parameters
        ----------
        X_train : array-like of shape (n_samples, n_features)
        y_train : array-like of n_samples labels in {-1, +1}

        Returns
        -------
        numpy.ndarray
            The learned weight vector (also stored in ``self.weights``; the
            learned bias is stored in ``self.biases``).

        Raises
        ------
        ValueError
            If the inputs are empty, not two-dimensional, or of different length.
        """
        # Convert up front so DataFrames / Series with a non-default index are
        # read positionally rather than by label.
        X = np.asarray(X_train, dtype=float)
        Y = np.asarray(y_train, dtype=float)
        if X.ndim != 2 or X.shape[0] == 0:
            raise ValueError("X_train must be a non-empty 2-D array")
        if X.shape[0] != Y.shape[0]:
            raise ValueError("X_train and y_train must have the same number of rows")

        #initializing weights
        weights = np.zeros(X.shape[1])
        b = 1.0
        # Start a fresh loss history so repeated fit() calls do not append to
        # the history of an earlier run.
        self.loss_values = []
        for _ in range(self.epochs):
            # Samples are visited in the given order every epoch (no shuffling).
            epoch_loss = 0.0
            for x, y in zip(X, Y):
                # loss is only tracked so that it can be plotted
                epoch_loss += self.calculate_loss(weights, x, y, b)
                dw, db = self.calculate_cost_gradient(weights, x, y, b)
                weights = weights - (self.lr * dw)  # update weights
                b = b - (self.lr * db)              # update bias
            self.loss_values.append(epoch_loss / len(X))
        self.weights = weights
        # Keep the learned bias: predict() needs it to reproduce the decision
        # function that was optimised above.
        self.biases = b
        return weights

    def calculate_cost_gradient(self,W, x, y,b):
        """Return (dw, db), the sub-gradient of the per-sample loss.

        When the sample is outside the margin (``1 - y*(W.x + b) < 0``) only
        the regularisation term contributes; otherwise the hinge term adds
        ``-C*y*x`` to dw and db is ``-y``.
        """
        distance = 1 - (y * (np.dot(W.T, x)+b))
        if distance<0:
            dw = W
            db=0
        else:
            dw =W - (self.C * y * x)
            db=-y
        return dw,db

    def calculate_loss(self,W, x, y,b):
        """Return ``C * max(0, 1 - y*(x.W + b)) + W.W / 2`` for one sample.

        Only used to record the loss curve; the updates in ``fit`` use
        ``calculate_cost_gradient``.
        """
        hinge = max(0, 1 - (y * (np.dot(x, W)+b)))
        return self.C*hinge+(np.dot(W.T,W)/2)

    def predict(self,X):
        """Predict a label for every row of ``X``.

        Parameters
        ----------
        X : DataFrame or array-like of shape (n_samples, n_features)

        Returns
        -------
        numpy.ndarray
            Float array of n_samples values, +1.0 where ``x.w + b >= 0`` and
            -1.0 otherwise.

        Raises
        ------
        ValueError
            If no weights are available (``fit`` has not been called).
        """
        weights = np.asarray(self.weights, dtype=float)
        if weights.size == 0:
            raise ValueError("predict() called before fit(): no weights available")
        scores = np.dot(np.asarray(X, dtype=float), weights) + self.biases
        # A score of exactly 0 is assigned to the positive class so the output
        # only ever contains the two labels -1 and +1 (np.sign would give 0).
        return np.where(scores >= 0, 1.0, -1.0)

    def plot_loss(self):
        """Plot the mean loss per epoch recorded by the last ``fit`` call."""
        plt.plot(self.loss_values)
        plt.title('Variation in loss values with epoch')
        plt.xlabel('Number of epochs')
        plt.ylabel('Loss Value')
        plt.show()
        

In [50]:
# to plot the Roc Curve
def plot_roc(tr_false_pos,tr_true_pos, tt_false_pos,tt_true_pos,auc_tr,auc_tt):
    """Plot the train and test ROC curves on one figure.

    Parameters
    ----------
    tr_false_pos, tr_true_pos : false / true positive rates of the train data
    tt_false_pos, tt_true_pos : false / true positive rates of the test data
    auc_tr, auc_tt : area under the train / test curve, shown in the legend
    """
    plt.figure(figsize=(5,5))
    # The labels need a placeholder, otherwise .format() silently drops the AUC.
    plt.plot(tr_false_pos, tr_true_pos,
             label='ROC Curve for Train Data (AUC = {:.3f})'.format(auc_tr))
    plt.plot(tt_false_pos, tt_true_pos,
             label='ROC Curve for Test Data (AUC = {:.3f})'.format(auc_tt))
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend()
    plt.show()


In [63]:
#Print relevant data for each iteration
def evaluate_metrics(y,y_test,pred_train,pred_test):
    """Print train/test metrics for one run, plot the ROC curves and return the test scores.

    Parameters
    ----------
    y : true labels of the training data
    y_test : true labels of the test data
    pred_train : predicted labels for the training data
    pred_test : predicted labels for the test data

    Returns
    -------
    tuple
        (accuracy, precision, recall, f1) of the test data, each rounded to
        four decimal places.
    """
    scorers = (accuracy_score, precision_score, recall_score, f1_score)
    rounded_test_scores = tuple(
        round(scorer(y_test, pred_test), 4) for scorer in scorers
    )

    print("Evaluation of the Current Run of the Model")
    # Train data is reported next to the test data so that under- and
    # over-fitting can be judged from the gap between the two.
    report_lines = (
        ('Accuracy of test data: %.3f', accuracy_score, y_test, pred_test),
        ('Accuracy of train data: %.3f', accuracy_score, y, pred_train),
        ('Precision of test data: %.3f', precision_score, y_test, pred_test),
        ('Precision of train data: %.3f', precision_score, y, pred_train),
        ('Recall of test data: %.3f', recall_score, y_test, pred_test),
        ('Recall of train data: %.3f', recall_score, y, pred_train),
        ('f1_score of test data: %.3f', f1_score, y_test, pred_test),
        ('f1_score of train data: %.3f', f1_score, y, pred_train),
    )
    for template, scorer, truth, predicted in report_lines:
        print(template % scorer(truth, predicted))

    # ROC points are computed from the predicted labels passed in.
    train_fpr, train_tpr, _ = roc_curve(y, pred_train)
    test_fpr, test_tpr, _ = roc_curve(y_test, pred_test)

    train_auc = auc(train_fpr, train_tpr)
    print('auc of train data: %.3f' % train_auc)

    test_auc = auc(test_fpr, test_tpr)
    print('auc of test data: %.3f' % test_auc)

    plot_roc(train_fpr, train_tpr, test_fpr, test_tpr, train_auc, test_auc)
    return rounded_test_scores

In [27]:
def confusion_matrix_plot_new(cm,cm_count,ModelType):
    """Draw the average confusion matrix as an annotated heatmap.

    Parameters
    ----------
    cm : 2x2 array-like
        Sum of the confusion matrices collected over the runs.
    cm_count : int
        Number of matrices that were summed into ``cm``.
    ModelType : int
        Selects the title: 0 = MySVM test data, 1 = SkLearn test data,
        2 = MySVM train data, anything else = SkLearn train data.

    Raises
    ------
    ValueError
        If ``cm_count`` is not positive (there is nothing to average).
    """
    if cm_count <= 0:
        # Dividing by zero would silently fill the heatmap with nan/inf.
        raise ValueError("cm_count must be positive to average the confusion matrix")
    average_cm = np.asarray(cm, dtype=float) / cm_count
    df_cm = pd.DataFrame(average_cm,
                         index=["True label -1", "True label 1"],
                         columns=["Predicted label -1", "Predicted label 1"])
    titles = {
        1: "Confusion Matrix for SkLearn Test Data",
        0: "Confusion Matrix MySVM Test Data",
        2: "Confusion Matrix MySVM Train Data",
    }
    plt.figure(figsize = (5,4))
    plt.title(titles.get(ModelType, "Confusion Matrix for SkLearn Train Data"))
    sns.heatmap(df_cm, cmap="Blues",annot=True)

In [51]:
#to find the average value of the 10 confusion matrix 
def calculate_confusion_matrix(cm,cm_count,y_true,y_pred,labels=(-1.0, 1.0)):
    """Add the confusion matrix of one run to a running total.

    Parameters
    ----------
    cm : array
        Running sum of confusion matrices.
    cm_count : int
        Number of matrices already summed into ``cm``.
    y_true, y_pred : true and predicted labels of the current run
    labels : sequence, optional
        Class labels, in row/column order, passed to ``confusion_matrix``.
        Defaults to the -1 / +1 encoding used in this notebook.

    Returns
    -------
    (cm, cm_count)
        The updated sum and the incremented count.
    """
    # Fixing the label set keeps every per-run matrix the same shape, even
    # when a split or a prediction happens to contain only one class;
    # otherwise a smaller matrix would be broadcast into the sum.
    cm_new = confusion_matrix(y_true, y_pred, labels=list(labels))
    return cm + cm_new, cm_count + 1
   

SGD classifier with hinge loss function is used to compare my implementation as this is similar to the approach taken for my SVM implementation

In [62]:

def SKlearnSVM(cm1,cm1_count,cm1_t,cm1_ct_t,i):
    """Train and evaluate scikit-learn's SGDClassifier on the current split.

    Reads the module-level ``X_train``, ``X_test``, ``y_train`` and ``y_test``
    and appends the test scores to the module-level ``*_skLearn`` lists.

    Parameters
    ----------
    cm1, cm1_count : running test confusion-matrix sum and its count
    cm1_t, cm1_ct_t : running train confusion-matrix sum and its count
    i : run number, only used in the printed banner

    Returns
    -------
    tuple
        (cm1, cm1_count, cm1_t, cm1_ct_t) updated with this run.
    """
    sgd_clf = SGDClassifier(eta0=0.0001, learning_rate='constant')

    print("SK LEARN Linear SVM CLASSIFIER Test Run ",i,"------------------------------------")
    sgd_clf.fit(X_train, y_train)
    test_predictions = sgd_clf.predict(X_test)
    train_predictions = sgd_clf.predict(X_train)
    print('weights of sklearn Classifier = ', sgd_clf.coef_)

    # Keep the per-run test scores so they can be averaged later.
    scores = evaluate_metrics(y_train, y_test, train_predictions, test_predictions)
    score_stores = (Accuracy_skLearn, Precision_skLearn, Recall_skLearn, f1_skLearn)
    for store, score in zip(score_stores, scores):
        store.append(score)

    # Accumulate the confusion matrices for the final plots.
    test_totals = calculate_confusion_matrix(cm1, cm1_count, y_test, test_predictions)
    train_totals = calculate_confusion_matrix(cm1_t, cm1_ct_t, y_train, train_predictions)
    print(" ----------------------------------------------------------------------------------- ")
    return test_totals[0], test_totals[1], train_totals[0], train_totals[1]

In [60]:
def MySVM(cm,cm_count,cm_t,cm_ct_t,n ):
    """Train and evaluate the custom SVM on the current split.

    Reads the module-level ``X_train``, ``X_test``, ``y_train`` and ``y_test``,
    appends the test scores to the module-level ``*_New_impl`` lists, writes
    the test predictions to ``result_<n>.csv`` and plots the loss curve.

    Parameters
    ----------
    cm, cm_count : running test confusion-matrix sum and its count
    cm_t, cm_ct_t : running train confusion-matrix sum and its count
    n : run number, used in the printed banner and the output file name

    Returns
    -------
    tuple
        (cm, cm_count, cm_t, cm_ct_t) updated with this run.
    """
    classifier = SVM(X_train.shape[1])
    print("Linear SVM CLASSIFIER Custom Implementation Test run ",n,"----------------------------------")
    # Training works on plain numpy arrays.
    learned_weights = classifier.fit(X_train.to_numpy(), y_train.to_numpy())
    print("weights of My SVM Classifier {} ".format(learned_weights))

    test_predictions = classifier.predict(X_test)
    train_predictions = classifier.predict(X_train)

    # Print the metrics and keep the test scores for the final averages.
    scores = evaluate_metrics(y_train, y_test, train_predictions, test_predictions)
    score_stores = (Accuracy_New_impl, Precision_New_impl, Recall_New_impl, f1_New_impl)
    for store, score in zip(score_stores, scores):
        store.append(score)

    # Accumulate the confusion matrices for the final plots.
    test_totals = calculate_confusion_matrix(cm, cm_count, y_test, test_predictions)
    train_totals = calculate_confusion_matrix(cm_t, cm_ct_t, y_train, train_predictions)

    # Writing the results of each prediction to a file as required in the assignment
    results = pd.DataFrame({'Test: Predicted': test_predictions, 'Test: Actual': y_test})
    results.to_csv('result_{}.csv'.format(n), index=False)

    # Loss per epoch recorded by the classifier during fit.
    classifier.plot_loss()
    print(" ----------------------------------------------------------------------------------------")
    return test_totals[0], test_totals[1], train_totals[0], train_totals[1]


In [61]:
#Printing the result of overall test done over 10 iterations with different split of test and train
def calculate_test_results():
    """Print the average test metrics of both classifiers over all runs.

    Reads the module-level score lists (``Accuracy_New_impl``,
    ``Accuracy_skLearn`` and their precision / recall / f1 counterparts).
    Each list is averaged over its own length; an empty list is reported as
    ``nan`` instead of raising ZeroDivisionError.
    """
    def _mean(values):
        # Average of one score list; nan when nothing has been recorded yet.
        return sum(values) / len(values) if len(values) else float('nan')

    print('Evaluation of from Scratch Implementation of SVM:')
    print('Average Accuracy  %.3f' % _mean(Accuracy_New_impl))
    print('Average recall=  %.3f' % _mean(Recall_New_impl))
    print('Average precision=  %.3f' % _mean(Precision_New_impl))
    print('Average F1_score=  %.3f' % _mean(f1_New_impl))

    # The sklearn lists are averaged over their own length rather than the
    # length of the custom-implementation list.
    print('Evaluation of from SK LEARN Implementation of SVM:')
    print('Average Accuracy  %.3f' % _mean(Accuracy_skLearn))
    print('Average recall=  %.3f' % _mean(Recall_skLearn))
    print('Average precision=  %.3f' % _mean(Precision_skLearn))
    print('Average F1_score=  %.3f' % _mean(f1_skLearn))

Run the cell below to run the machine learning algorithm. The expected input is a txt or CSV file whose values are tab or space separated. The features should be listed in the first row, and the name of the target variable should be passed along with the name of the file to readingToDF(). Additionally, only binary classification problems can be handled using this implementation.

In [None]:
# Per-run test scores of the custom SVM (*_New_impl) and of sklearn (*_skLearn).
Accuracy_New_impl=[]
Accuracy_skLearn=[]
Precision_New_impl=[]
Precision_skLearn=[]
Recall_New_impl=[]
Recall_skLearn=[]
f1_New_impl=[]
f1_skLearn=[]
# Running sums of the confusion matrices and the number of matrices summed:
# cm / cm_train belong to the custom SVM, cm1 / cm1_train to sklearn.
# Each accumulator gets its own array.
cm=np.array([[0, 0],[0 ,0]])
cm1=np.array([[0, 0],[0 ,0]])
cm_train=np.array([[0, 0],[0 ,0]])
cm1_train=np.array([[0, 0],[0 ,0]])
cm_count=cm1_count=cm_ct_train=cm1_ct_train=0
X,Y=readingToDF('wildfires.txt','fire')#Make changes to this function to try a different dataset
for i in range(10):
    X_train, X_test, y_train, y_test=CreateTrainAndTest(i,X,Y)#Creates a different train test split every time
    cm,cm_count,cm_train,cm_ct_train=MySVM(cm,cm_count,cm_train,cm_ct_train,i)#creates an instance of SVM
    # sklearn must be given its own train accumulators (cm1_train / cm1_ct_train);
    # passing the custom SVM's would mix both models into one matrix and
    # inflate the count used for averaging.
    cm1,cm1_count,cm1_train,cm1_ct_train=SKlearnSVM(cm1,cm1_count,cm1_train,cm1_ct_train,i)#creates an instance of SK Learn SVM

calculate_test_results()#Print the average Data of the 10 iterations for comparison
confusion_matrix_plot_new(cm,cm_count,0) # outputs the Test data confusion matrix for custom implementation
confusion_matrix_plot_new(cm1,cm1_count,1) # outputs the Test data confusion matrix for SGD Classifier in SK Learn
confusion_matrix_plot_new(cm_train,cm_ct_train,2)# outputs the Train data confusion matrix for custom implementation
confusion_matrix_plot_new(cm1_train,cm1_ct_train,3)# outputs the Train data confusion matrix for SGD Classifier in SK Learn

