In [None]:
%autosave 15
%matplotlib inline

import numpy as np
import scipy as sp
import pandas as pd
import math as math
import random
import os

from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
from IPython.display import display, HTML

In [None]:
# Substrings looked for in corpus file names; only SPAM is consulted by Email.
SPAM = "spmsg"
LEGIT = "legit"

class Email:
    """One message parsed from the lines of a tokenised corpus file.

    filename -- name of the source file; the message is treated as spam
                exactly when the SPAM marker occurs in it
    lines    -- raw text lines of the file:
                line 0: a leading label token (presumably "Subject:" -- not
                        checked here) followed by the integer ids of the title
                line 1: ignored
                line 2 and later: integer ids of the body words
    """

    def __init__(self, filename, lines):
        self.filename = filename
        self.isSpam = SPAM in filename

        # The first token of the header line is a label, not a word id.
        headerTokens = lines[0].split()
        self.title = [int(token) for token in headerTokens[1:]]
        # Body ids of all remaining lines, flattened into a single list.
        self.body = [int(token) for row in lines[2:] for token in row.split()]

    def __repr__(self):
        return str(self)

    def __str__(self):
        kind = "SPAM" if self.isSpam else "LEGIT"
        pieces = [kind, " EMAIL ", self.filename, ": ", str(self.title), ", ", str(self.body), "\n"]
        return "".join(pieces)

In [None]:
class DictValue:
    """Occurrence counters for a single word.

    sTitle / lTitle -- occurrences counted in titles of spam / legit emails
    sBody  / lBody  -- occurrences counted in bodies of spam / legit emails
    """

    def __init__(self):
        self.sTitle = self.lTitle = 0
        self.sBody = self.lBody = 0

    def foundInTitle(self, isSpam):
        """Count one title occurrence for the spam (truthy) or legit class."""
        if not isSpam:
            self.lTitle += 1
            return
        self.sTitle += 1

    def foundInBody(self, isSpam):
        """Count one body occurrence for the spam (truthy) or legit class."""
        if not isSpam:
            self.lBody += 1
            return
        self.sBody += 1

    def __repr__(self):
        return str(self)

    def __str__(self):
        titleCounts = (self.sTitle, self.lTitle)
        bodyCounts = (self.sBody, self.lBody)
        return "TITLE: {}, BODY: {}".format(titleCounts, bodyCounts)

In [None]:
PARTS_DIR = "./Bayes/pu1/"
PARTS_NUM = 10

def getData():
    """Load every email of the corpus, grouped by part directory.

    Reads the directories PARTS_DIR/part1 .. PARTS_DIR/part<PARTS_NUM> and
    parses each file found there into an Email.

    Returns:
        A list with PARTS_NUM entries; entry i is the list of Email objects
        read from directory "part<i + 1>".

    Raises:
        OSError: if a part directory cannot be listed or a file cannot be read.
    """
    parts = []
    for partNumber in range(1, PARTS_NUM + 1):
        partDir = os.path.join(PARTS_DIR, "part" + str(partNumber))
        emails = []
        # os.listdir returns names in arbitrary order; sorting keeps the load
        # order reproducible across runs and file systems.
        for filename in sorted(os.listdir(partDir)):
            # `with` closes the handle even when parsing the email raises.
            with open(os.path.join(partDir, filename), 'r') as file:
                emails.append(Email(filename, file.readlines()))
        parts.append(emails)
    return parts

parts = getData()

In [None]:
def learnBayes(emails):
    """Collect per-word occurrence statistics over the given emails.

    takes:   iterable of emails, each exposing title, body and isSpam
    returns: dict whose keys are words and whose values are DictValue
             objects holding that word's counts over all given emails
    """
    wordStats = {}
    for email in emails:
        for word in email.title:
            # Create the counter on first sight of the word.
            if word not in wordStats:
                wordStats[word] = DictValue()
            wordStats[word].foundInTitle(email.isSpam)
        for word in email.body:
            if word not in wordStats:
                wordStats[word] = DictValue()
            wordStats[word].foundInBody(email.isSpam)
    return wordStats
            
#print(learnBayes(parts[0]))

In [None]:
# spamWeight, titleWeight = (0..1)
def testBayes(spamDict, emails, spamWeight = 0.5, titleWeight = 0.5):
    TP = 0
    FP = 0
    TN = 0
    FN = 0
    for email in emails:
        p_spam = math.log(spamWeight)
        p_legit = math.log(1 - spamWeight)
        
        for word in email.title:
            wf = spamDict.get(word, DictValue())
            if (wf.sTitle + wf.lTitle > 0):
                x = titleWeight / (wf.sTitle + wf.lTitle)
                if (wf.sTitle > 0):
                    p_spam += math.log(wf.sTitle * x)
                if (wf.lTitle > 0):
                    p_legit += math.log(wf.lTitle * x)
                    
        for word in email.body:
            wf = spamDict.get(word, DictValue())
            if (wf.sBody + wf.lBody > 0):
                x = (1 - titleWeight) / (wf.sBody + wf.lBody)
                if (wf.sBody > 0):
                    p_spam += math.log(wf.sBody * x)
                if (wf.lBody > 0):
                    p_legit += math.log(wf.lBody * x)
                    
        #res.append((email, p_legit >= p_spam))
        #res.append((email.isSpam, p_spam >= p_legit))
        isCurrentSpam = p_spam >= p_legit
        if (email.isSpam != isCurrentSpam):
            if isCurrentSpam:
                FP += 1
            else:
                FN += 1
        else:
            if isCurrentSpam:
                TP += 1
            else:
                TN += 1
    if TP == 0:
        return 0
    precision = TP / (TP + FP)
    recall = TP / (TP + FN)

    return 2 * (precision * recall) / (precision + recall)

In [None]:
def cv(spamWeight, titleWeight):
    """Cross-validate the Bayes classifier over the module-level `parts`.

    Every part in turn serves as the testing set while the classifier learns
    from all the other parts. The F1 score of each fold is printed.

    spamWeight, titleWeight -- passed unchanged to testBayes

    Returns the average F1 score over all PARTS_NUM folds (0 if there are no
    folds).
    """
    scores = []
    for testIndex in range(PARTS_NUM):
        # Learn from every part except the one held out for testing.
        learningSet = []
        for partIndex in range(PARTS_NUM):
            if partIndex != testIndex:
                learningSet += parts[partIndex]
        testingSet = parts[testIndex]

        spamDict = learnBayes(learningSet)
        score = testBayes(spamDict, testingSet, spamWeight, titleWeight)
        print(score)
        scores.append(score)

    if not scores:
        return 0
    return sum(scores) / len(scores)

cv(1e-9, 1e-9)

In [None]:
def kNN(metric, data, classes, kernel, cv_params = (1, 10)):
    """Choose the neighbour count with the best cross-validated F1 score.

    metric, kernel -- passed through to predict_class
    data           -- sequence of points
    classes        -- lookup from a point to its class; the value is converted
                      with int() and used as index 0 or 1
    cv_params      -- (tfold, kfold): how many times the cross-validation is
                      repeated and the fold count given to k_fold_cv

    Candidate neighbour counts are 2 .. int(sqrt(len(data))) - 1. For each one
    the F1 measure of class 1 is averaged over tfold * kfold splits.

    Returns (best neighbour count, its average score); (0, 0) when no
    candidate scores above 0.
    """

    def compute_score(k, train_suit, test_suit):
        """F1 measure of class 1 for k neighbours on one train/test split."""
        hits = [0, 0]     # correct predictions, indexed by predicted class
        misses = [0, 0]   # wrong predictions, indexed by predicted class
        totals = [0, 0]   # test points, indexed by real class

        for point in test_suit:
            predicted = predict_class(k, metric, train_suit, classes, point, kernel)
            real_class = int(classes[point])

            tally = hits if predicted == real_class else misses
            tally[predicted] += 1
            totals[real_class] += 1

        if hits[1] <= 0:
            return 0

        recall = hits[1] / totals[1]
        precision = hits[1] / (hits[1] + misses[1])
        # F1 measure
        return 2 * (precision * recall) / (precision + recall)

    repeats, folds = cv_params
    best_neighbors = 0
    best_score = 0
    upper_bound = int(np.sqrt(len(data)))

    for neighbors in range(2, upper_bound):
        total_score = 0
        for _ in range(repeats):
            # k_fold_cv is used here as a source of (train indices, test indices) pairs.
            repeat_score = 0
            for train_idx, test_idx in k_fold_cv(folds, len(data)):
                training_suit = [data[j] for j in train_idx]
                testing_suit = [data[j] for j in test_idx]
                repeat_score += compute_score(neighbors, training_suit, testing_suit)
            total_score += repeat_score

        mean_score = total_score / (repeats * folds)

        if mean_score > best_score:
            best_neighbors = neighbors
            best_score = mean_score

    return (best_neighbors, best_score)

In [None]:
# Grid search over fold counts, kernels, metrics and data transformations.
# Every combination is scored with kNN; all scores are tabulated and the best
# combination is remembered in the best_* variables.
# NOTE: the score kNN returns (as defined in this notebook) is its best
# averaged F1 measure; it is stored under the label 'accuracy' here.

RESULT_COLUMNS = ['folds', 'k', 'metric', 'transformation name', 'accuracy', 'kernel']

best_accuracy = 0
best_kernel   = None
best_metric   = None
best_data     = None
best_neighbor = None
best_kfold    = None
best_string   = None  # stays None when no combination scores above 0

# 'display.height' is no longer a pandas option (setting it raises
# OptionError); 'display.max_rows' alone controls how many rows are shown.
pd.set_option('display.max_rows', 250)

# Rows are gathered in a plain list and converted to a DataFrame once:
# DataFrame.append was removed in pandas 2.0 and copied the frame on each call.
rows = []

for kfold in range(5, 11):
    for kernel in kernels:
        # kernel[0] is handed to kNN, kernel[1] is the label used for reporting.
        for metric in metrics:
            for input_data, classes, transform_name, transform in data:
                k, accuracy = kNN(metric, input_data, classes, kernel[0], (1, kfold))
                rows.append([kfold, k, metric_names[metric], transform_name, accuracy, kernel[1]])

                if accuracy > best_accuracy:
                    best_accuracy = accuracy
                    best_kernel = kernel[0]
                    best_metric = metric
                    best_data = (input_data, classes, transform)
                    best_neighbor = k
                    best_kfold = kfold

                    best_string = str(accuracy) + ', ' + str(k) + ', ' + kernel[1] + ', ' + metric_names[metric] + ", " + transform_name + " folds: " + str(best_kfold)

results = pd.DataFrame(rows, columns=RESULT_COLUMNS)

display(results)