In [1]:
import re
import time
import numpy as np
import pandas as pd
from pathlib import Path

from sklearn.model_selection import train_test_split, StratifiedShuffleSplit
from sklearn.metrics import accuracy_score, roc_auc_score, roc_curve, precision_score, recall_score
from scipy.special import expit
from scipy.stats import norm

from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier

import plotly.offline as py
import plotly.graph_objs as go
py.init_notebook_mode(connected=True)


# Directory that holds the CSV files read by the load_*_dataset helpers.
# It is a relative path, so it is resolved against the kernel's current
# working directory.
DATASETS_DIR = Path('Datasets')


def load_cancer_dataset():
    """Read ``cancer.csv`` from ``DATASETS_DIR`` and split it into features and labels.

    Returns:
        tuple: ``(X, y)`` where ``X`` is every column except ``label`` as a
        NumPy array and ``y`` holds the ``label`` column re-encoded as integer
        indices into its sorted unique values.
    """
    frame = pd.read_csv(DATASETS_DIR / 'cancer.csv')
    features = frame.drop(columns='label').to_numpy()
    # np.unique(..., return_inverse=True)[1] maps each label to its index
    # among the sorted distinct labels.
    encoded_labels = np.unique(frame['label'].to_numpy(), return_inverse=True)[1]
    return features, encoded_labels


def load_blobs2_dataset():
    """Read ``blobs2.csv`` from ``DATASETS_DIR`` and split it into features and labels.

    Returns:
        tuple: ``(X, y)`` where ``X`` is every column except ``label`` as a
        NumPy array and ``y`` holds the ``label`` column re-encoded as integer
        indices into its sorted unique values.
    """
    frame = pd.read_csv(DATASETS_DIR / 'blobs2.csv')
    features = frame.drop(columns='label').to_numpy()
    # The inverse index of np.unique is the integer encoding of the labels.
    encoded_labels = np.unique(frame['label'].to_numpy(), return_inverse=True)[1]
    return features, encoded_labels


def load_spam_dataset():
    """Read ``spam.csv`` from ``DATASETS_DIR`` and split it into features and labels.

    Returns:
        tuple: ``(X, y)`` where ``X`` is every column except ``label`` as a
        NumPy array and ``y`` holds the ``label`` column re-encoded as integer
        indices into its sorted unique values.
    """
    frame = pd.read_csv(DATASETS_DIR / 'spam.csv')
    features = frame.drop(columns='label').to_numpy()
    # The inverse index of np.unique is the integer encoding of the labels.
    encoded_labels = np.unique(frame['label'].to_numpy(), return_inverse=True)[1]
    return features, encoded_labels


def load_smsspam_dataset():
    """Read ``smsspam.csv`` from ``DATASETS_DIR`` as raw texts plus binary labels.

    Returns:
        tuple: ``(X, y)`` where ``X`` is the ``text`` column as a NumPy array
        and ``y`` is an ``int32`` array with 1 for ``'ham'`` and 0 for
        ``'spam'``.

    Raises:
        ValueError: if the ``label`` column contains anything other than
            ``'ham'`` or ``'spam'`` (including missing values).
    """
    df = pd.read_csv(DATASETS_DIR / 'smsspam.csv')

    # Encode through an explicit mapping instead of assigning into the array
    # returned by ``to_numpy()``: that array may share memory with the
    # DataFrame, and is read-only when pandas copy-on-write is active, so
    # in-place writes either corrupt ``df`` or raise.
    encoded = df['label'].map({'ham': 1, 'spam': 0})
    unmapped = encoded.isna()
    if unmapped.any():
        unknown = sorted(set(df['label'][unmapped].astype(str)))
        raise ValueError(f'unexpected labels in smsspam.csv: {unknown}')

    y = encoded.to_numpy(dtype=np.int32)
    X = df['text'].to_numpy()
    return X, y

# SVM

### Blobs2

In [8]:
def test_svm_blobs2(X, y, params, plt_title):
    """Fit an SVC on 2-D data and plot its decision regions and support vectors.

    Args:
        X: 2-D feature array; only columns 0 and 1 are plotted.
        y: labels; samples with ``y == 1`` are drawn as 'positive' and
            ``y == 0`` as 'negative'.
        params: keyword arguments forwarded to ``SVC``.
        plt_title: prefix of the figure title; the accuracy measured on the
            same data the model was fitted on is appended to it.
    """
    model = SVC(**params)
    model.fit(X, y)
    train_score = accuracy_score(y, model.predict(X))

    # Evaluate the decision function on a 50x50 grid spanning the bounding
    # box of the data; the sign of the result selects the region colour.
    xx = np.linspace(X[:, 0].min(), X[:, 0].max(), 50)
    yy = np.linspace(X[:, 1].min(), X[:, 1].max(), 50)
    grid_x, grid_y = np.meshgrid(xx, yy)
    grid = np.column_stack((grid_x.ravel(), grid_y.ravel()))
    z = model.decision_function(grid)
    positive_side = grid[z > 0]
    negative_side = grid[z < 0]

    traces = [
        go.Scatter(x=positive_side[:, 0], y=positive_side[:, 1],
                   mode='markers', name='positive hyperplane',
                   marker=dict(color='red', opacity=0.05, size=20), showlegend=False),
        # This trace covers z < 0; it used to be labelled 'positive hyperplane'
        # as well, which mislabelled the region in hover text.
        go.Scatter(x=negative_side[:, 0], y=negative_side[:, 1],
                   mode='markers', name='negative hyperplane',
                   marker=dict(color='green', opacity=0.05, size=20), showlegend=False),
        go.Scatter(x=model.support_vectors_[:, 0], y=model.support_vectors_[:, 1],
                   mode='markers', name='support vectors',
                   marker=dict(color='white', line=dict(color='black', width=2))),
        go.Scatter(x=X[y == 1][:, 0], y=X[y == 1][:, 1], mode='markers', name='positive',
                   marker=dict(color='#1f77b4')),
        go.Scatter(x=X[y == 0][:, 0], y=X[y == 0][:, 1], mode='markers', name='negative',
                   marker=dict(color='orange')),
    ]
    layout = go.Layout(title=f'{plt_title}, score={train_score}')
    figure = go.Figure(data=traces, layout=layout)
    py.iplot(figure)

In [9]:
X, y = load_blobs2_dataset()

# One (SVC keyword arguments, figure title) pair per kernel to visualise.
for kernel_params, plot_title in [
    ({'C': 10, 'kernel': 'linear'}, 'Linear'),
    ({'C': 10, 'kernel': 'rbf', 'gamma': 'scale'}, 'RBF'),
    ({'C': 10, 'kernel': 'poly', 'gamma': 'scale', 'degree': 2}, 'Poly 2 deg'),
    ({'C': 10, 'kernel': 'poly', 'gamma': 'scale', 'degree': 3}, 'Poly 3 deg'),
    ({'C': 10, 'kernel': 'poly', 'gamma': 'scale', 'degree': 5}, 'Poly 5 deg'),
]:
    test_svm_blobs2(X, y, kernel_params, plot_title)

In [10]:
def get_execution_time(void_func):
    """Call ``void_func`` once and return the CPU time it consumed.

    The measurement uses ``time.process_time``, i.e. process CPU time rather
    than wall-clock time. The callable's return value is discarded.

    Args:
        void_func: zero-argument callable to measure.

    Returns:
        float: ``process_time`` after the call minus ``process_time`` before it.
    """
    started_at = time.process_time()
    void_func()
    return time.process_time() - started_at


def filter_results(time, accuracy):
    """Keep only the points where accuracy strictly improves.

    Walks the paired sequences in order and keeps a point when its accuracy
    exceeds the last kept accuracy by more than ``1e-5``. The first point is
    always kept.

    Args:
        time: sequence of timings, paired index-by-index with ``accuracy``.
            (The name shadows the ``time`` module inside this function only.)
        accuracy: sequence of accuracy values.

    Returns:
        tuple: ``(filtered_time, filtered_accuracy)`` as two lists. Both are
        empty when ``time`` is empty.
    """
    # Empty input used to raise IndexError on ``time[0]``.
    if len(time) == 0:
        return [], []

    filtered_time, filtered_accuracy = [time[0]], [accuracy[0]]
    for i in range(1, len(time)):
        if accuracy[i] > filtered_accuracy[-1] + 1e-5:
            filtered_time.append(time[i])
            filtered_accuracy.append(accuracy[i])
    return filtered_time, filtered_accuracy


def test_svm(X_train, y_train, X_val, y_val):
    """Time SVC fits over a small kernel/C grid and score each on validation data.

    For every ``C`` in ``[0.1, 0.5, 1]`` and every kernel configuration, an
    ``SVC`` is fitted on the training data; the fit is timed with
    ``get_execution_time`` and the model is scored with ``accuracy_score`` on
    the validation data.

    Returns:
        The result of ``filter_results`` applied to the (time, accuracy)
        pairs after sorting them by fit time.
    """
    kernel_configs = [
        {'kernel': 'linear', 'gamma': 'scale', 'random_state': 13},
        {'kernel': 'rbf', 'gamma': 'scale', 'random_state': 13},
        {'kernel': 'poly', 'degree': 2, 'gamma': 'scale', 'random_state': 13},
        {'kernel': 'poly', 'degree': 3, 'gamma': 'scale', 'random_state': 13}
    ]

    fit_times, val_scores = [], []
    for C in [0.1, 0.5, 1]:
        for base_config in kernel_configs:
            # 'C' goes last so the printed dict keeps the same key order.
            param = {**base_config, 'C': C}
            svm = SVC(**param)

            print(f'fitting SVM on {param}')
            fit_times.append(get_execution_time(lambda: svm.fit(X_train, y_train)))
            val_scores.append(accuracy_score(y_val, svm.predict(X_val)))

    # Order the runs from fastest to slowest before filtering.
    order = np.argsort(fit_times)
    return filter_results(np.array(fit_times)[order], np.array(val_scores)[order])


def test_random_forest(X_train, y_train, X_val, y_val):
    """Time incremental random-forest fits and score each step on validation data.

    For each ``max_depth`` from 1 to 6, a ``RandomForestClassifier`` is
    created with ``warm_start=True`` and ``n_estimators=0``. ``n_estimators``
    is then raised by one before each of 300 timed ``fit`` calls. The recorded
    time for a step is the running total of fit times for that depth.

    Returns:
        The result of ``filter_results`` applied to the (cumulative time,
        accuracy) pairs of all depths after sorting them by time.
    """
    cumulative_times, val_scores = [], []

    for max_depth in range(1, 7):
        print(f'fitting RF with max_depth={max_depth}')
        forest = RandomForestClassifier(
            criterion='entropy',
            n_estimators=0,
            max_depth=max_depth,
            warm_start=True,
            random_state=13,
        )
        # Running total of fit time for this depth; reset for every depth.
        elapsed_total = 0.0
        for _ in range(300):
            forest.n_estimators += 1
            elapsed_total += get_execution_time(lambda: forest.fit(X_train, y_train))
            cumulative_times.append(elapsed_total)
            val_scores.append(accuracy_score(y_val, forest.predict(X_val)))

    # Merge the runs of all depths into one timeline, fastest first.
    order = np.argsort(cumulative_times)
    return filter_results(np.array(cumulative_times)[order], np.array(val_scores)[order])
    
    
def compare_svm_and_random_forest(X, y, dataset_name):
    """Plot fit time against validation accuracy for SVM and random forest.

    The data is split 80/20 with ``train_test_split`` (``random_state=13``).
    ``test_svm`` runs first, then ``test_random_forest``. Their filtered
    results are drawn as two lines with accuracy on the x axis and time on
    the y axis.

    Args:
        X: feature array.
        y: label array.
        dataset_name: dataset name inserted into the figure title.
    """
    split = train_test_split(X, y, test_size=0.2, random_state=13)
    X_train, X_val, y_train, y_val = split

    svm_times, svm_scores = test_svm(X_train, y_train, X_val, y_val)
    rf_times, rf_scores = test_random_forest(X_train, y_train, X_val, y_val)

    figure = go.Figure(
        data=[
            go.Scatter(x=rf_scores, y=rf_times, mode='lines', name='Random Forest'),
            go.Scatter(x=svm_scores, y=svm_times, mode='lines', name='SVM'),
        ],
        layout=go.Layout(
            title=f'Comparison of Random Forest and SVM classifiers on {dataset_name} dataset',
            xaxis=dict(title='accuracy'),
            yaxis=dict(title='time (sec.)'),
        ),
    )
    py.iplot(figure)

In [11]:
# Run the SVM vs. random forest comparison on each tabular dataset in turn.
for dataset_loader, dataset_title in [
    (load_cancer_dataset, 'Cancer'),
    (load_spam_dataset, 'Spam'),
]:
    compare_svm_and_random_forest(*dataset_loader(), dataset_title)

fitting SVM on {'kernel': 'linear', 'gamma': 'scale', 'random_state': 13, 'C': 0.1}
fitting SVM on {'kernel': 'rbf', 'gamma': 'scale', 'random_state': 13, 'C': 0.1}
fitting SVM on {'kernel': 'poly', 'degree': 2, 'gamma': 'scale', 'random_state': 13, 'C': 0.1}
fitting SVM on {'kernel': 'poly', 'degree': 3, 'gamma': 'scale', 'random_state': 13, 'C': 0.1}
fitting SVM on {'kernel': 'linear', 'gamma': 'scale', 'random_state': 13, 'C': 0.5}
fitting SVM on {'kernel': 'rbf', 'gamma': 'scale', 'random_state': 13, 'C': 0.5}
fitting SVM on {'kernel': 'poly', 'degree': 2, 'gamma': 'scale', 'random_state': 13, 'C': 0.5}
fitting SVM on {'kernel': 'poly', 'degree': 3, 'gamma': 'scale', 'random_state': 13, 'C': 0.5}
fitting SVM on {'kernel': 'linear', 'gamma': 'scale', 'random_state': 13, 'C': 1}
fitting SVM on {'kernel': 'rbf', 'gamma': 'scale', 'random_state': 13, 'C': 1}
fitting SVM on {'kernel': 'poly', 'degree': 2, 'gamma': 'scale', 'random_state': 13, 'C': 1}
fitting SVM on {'kernel': 'poly', 'd

fitting SVM on {'kernel': 'linear', 'gamma': 'scale', 'random_state': 13, 'C': 0.1}
fitting SVM on {'kernel': 'rbf', 'gamma': 'scale', 'random_state': 13, 'C': 0.1}
fitting SVM on {'kernel': 'poly', 'degree': 2, 'gamma': 'scale', 'random_state': 13, 'C': 0.1}
fitting SVM on {'kernel': 'poly', 'degree': 3, 'gamma': 'scale', 'random_state': 13, 'C': 0.1}
fitting SVM on {'kernel': 'linear', 'gamma': 'scale', 'random_state': 13, 'C': 0.5}
fitting SVM on {'kernel': 'rbf', 'gamma': 'scale', 'random_state': 13, 'C': 0.5}
fitting SVM on {'kernel': 'poly', 'degree': 2, 'gamma': 'scale', 'random_state': 13, 'C': 0.5}
fitting SVM on {'kernel': 'poly', 'degree': 3, 'gamma': 'scale', 'random_state': 13, 'C': 0.5}
fitting SVM on {'kernel': 'linear', 'gamma': 'scale', 'random_state': 13, 'C': 1}
fitting SVM on {'kernel': 'rbf', 'gamma': 'scale', 'random_state': 13, 'C': 1}
fitting SVM on {'kernel': 'poly', 'degree': 2, 'gamma': 'scale', 'random_state': 13, 'C': 1}
fitting SVM on {'kernel': 'poly', 'd

In [3]:
class GaussianNB:
    """Gaussian naive Bayes classifier.

    Each feature is modelled, per class, as an independent normal
    distribution whose mean and variance are estimated in ``fit``.

    Args:
        min_variance: lower bound applied to every class-conditional variance
            when evaluating the density. A feature that is constant within a
            class has variance 0, which would otherwise cause division by
            zero and ``log(0)``.

    Attributes (populated by ``fit``):
        probs: array of shape ``(n_features, n_classes, 2)``;
            ``probs[i, j]`` is ``(mean, variance)`` of feature ``i`` in class ``j``.
        class_map: dict mapping each original label to its class index.
        inv_class_map: array of original labels, indexed by class index.
        classes: number of distinct classes.
        classes_count: number of training samples per class.
        classes_prob: class priors (``classes_count / n_samples``).
    """

    def __init__(self, min_variance=1e-9):
        self.min_variance = min_variance
        self.probs = None
        self.class_map = None
        self.inv_class_map = None
        self.classes = None
        self.classes_count = None
        self.classes_prob = None

    def fit(self, X, y):
        """Estimate class priors and per-class feature means and variances.

        Args:
            X: array-like of shape ``(n_samples, n_features)``.
            y: array-like of ``n_samples`` labels (any type ``np.unique`` sorts).
        """
        X = np.asarray(X, dtype=float)
        cls, y = np.unique(y, return_inverse=True)

        self.class_map = {c: i for i, c in enumerate(cls)}
        self.inv_class_map = cls
        self.classes = len(cls)
        self.classes_count = np.bincount(y, minlength=self.classes)
        self.classes_prob = self.classes_count / X.shape[0]

        self.probs = np.zeros((X.shape[1], self.classes, 2))
        for j in range(self.classes):
            members = X[y == j]
            self.probs[:, j, 0] = members.mean(axis=0)
            self.probs[:, j, 1] = members.var(axis=0)

    def _log_likelihood(self, X):
        """Return ``log p(x | class)`` for every row of ``X``, shape ``(n_samples, n_classes)``."""
        mu = self.probs[:, :, 0].T                                  # (n_classes, n_features)
        var = np.maximum(self.probs[:, :, 1].T, self.min_variance)  # guard zero variance
        diff = X[:, np.newaxis, :] - mu                             # (n_samples, n_classes, n_features)
        return -0.5 * (diff ** 2 / var + np.log(2 * np.pi * var)).sum(axis=2)

    def _prob(self, x, y):
        """Return the log-likelihood of a single sample ``x`` under class index ``y``."""
        x = np.asarray(x, dtype=float)
        return self._log_likelihood(x[np.newaxis, :])[0, y]

    def predict_prob(self, X):
        """Return posterior class probabilities for every row of ``X``.

        Works for any number of classes. The posterior is computed in log
        space and normalised with the log-sum-exp trick, so each row sums
        to 1 and no clipping of the scores is required.

        Returns:
            tuple: ``(labels, y_prob)`` where ``labels`` is ``inv_class_map``
            and ``y_prob`` has shape ``(n_samples, n_classes)`` with columns
            in the order of ``labels``.
        """
        X = np.asarray(X, dtype=float)
        joint = self._log_likelihood(X) + np.log(self.classes_prob)
        # Subtracting the row maximum keeps exp() from overflowing and leaves
        # the normalised result unchanged.
        joint -= joint.max(axis=1, keepdims=True)
        y_prob = np.exp(joint)
        y_prob /= y_prob.sum(axis=1, keepdims=True)
        return self.inv_class_map, y_prob

    def predict(self, X):
        """Return the most probable original label for every row of ``X``."""
        _, y_prob = self.predict_prob(X)
        return self.inv_class_map[np.argmax(y_prob, axis=1)]

In [5]:
def draw_roc_curve(fprs, tprs, datasets, dataset_name):
    """Draw ROC curves side by side, one panel per entry.

    The i-th curve is bound to axes ``x{i+1}``/``y{i+1}``. The layout
    defines two panels (left and right halves of the figure), so the function
    is laid out for two curves.

    Args:
        fprs: sequence of false-positive-rate arrays, one per curve.
        tprs: sequence of true-positive-rate arrays, one per curve.
        datasets: sequence of trace names, one per curve.
        dataset_name: dataset name inserted into the figure title.
    """
    traces = []
    for panel, (fpr, tpr, dataset) in enumerate(zip(fprs, tprs, datasets), start=1):
        curve = go.Scatter(
            x=fpr,
            y=tpr,
            mode='lines',
            name=f'{dataset}',
            xaxis=f'x{panel}',
            yaxis=f'y{panel}',
        )
        traces.append(curve)

    layout = go.Layout(
        title=f'ROC curves for the {dataset_name} dataset',
        width=950,
        height=500,
        xaxis1=dict(domain=[0.0, 0.475]),
        xaxis2=dict(domain=[0.525, 1.0]),
        yaxis1=dict(domain=[0.0, 1.0], anchor='x1'),
        yaxis2=dict(domain=[0.0, 1.0], anchor='x2'),
    )
    py.iplot(go.Figure(data=traces, layout=layout))
    

def test_continuous_naive_bayes(X, y, dataset_name):
    """Fit ``GaussianNB`` on an 80/20 split, print accuracies and draw ROC curves.

    Args:
        X: feature array.
        y: label array; column 1 of the predicted scores is passed to
            ``roc_curve`` as the positive-class score.
        dataset_name: name used in the printed line and the figure title.
    """
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=13)
    model = GaussianNB()
    model.fit(X_train, y_train)

    splits = (('train', X_train, y_train), ('validation', X_val, y_val))

    train_score, val_score = [
        accuracy_score(target, model.predict(features)) for _, features, target in splits
    ]
    print(f'{dataset_name}: train_score={train_score}, val_score={val_score}')

    fprs, tprs, datasets = [], [], []
    for split_name, features, target in splits:
        # predict_prob returns (labels, scores); only the scores are needed.
        scores = model.predict_prob(features)[1]
        fpr, tpr, _ = roc_curve(target, scores[:, 1])
        fprs.append(fpr)
        tprs.append(tpr)
        datasets.append(split_name)

    draw_roc_curve(fprs, tprs, datasets, dataset_name)

In [5]:
# Evaluate the Gaussian naive Bayes model on each tabular dataset in turn.
for dataset_loader, dataset_title in [
    (load_cancer_dataset, 'Cancer'),
    (load_spam_dataset, 'Spam'),
]:
    test_continuous_naive_bayes(*dataset_loader(), dataset_title)

Cancer: train_score=0.9406593406593406, val_score=0.9385964912280702


Spam: train_score=0.8195652173913044, val_score=0.8338762214983714


In [2]:
class MultinomialNB:
    """Multinomial naive Bayes classifier for count features.

    Args:
        alpha: additive smoothing constant added to every per-class feature
            count before normalising.

    Attributes (populated by ``fit``):
        classes_: sorted distinct labels seen in training.
        classes_log_prob_: log prior of each class, in ``classes_`` order.
        feature_count_: array of shape ``(n_classes, n_features)`` with the
            summed feature counts of each class.
        feature_log_prob_: array of shape ``(n_classes, n_features)`` with the
            smoothed log probability of each feature given the class.
    """

    def __init__(self, alpha=1.0):
        self.alpha = alpha
        self.feature_count_ = None
        self.feature_log_prob_ = None
        self.classes_ = None
        self.classes_log_prob_ = None

    def fit(self, X, y):
        """Estimate class priors and smoothed per-class feature probabilities.

        Args:
            X: dense array of shape ``(n_samples, n_features)`` holding counts.
            y: array of ``n_samples`` labels.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        self.classes_ = np.unique(y)
        self.classes_log_prob_ = np.log(np.array([np.mean(y == c) for c in self.classes_]))
        # Store per-class totals only; keeping the whole training matrix here
        # pinned it in memory and made the attribute name misleading.
        self.feature_count_ = np.array([X[y == c].sum(axis=0) for c in self.classes_])
        smoothed = self.feature_count_ + self.alpha
        # The row sum of ``smoothed`` equals (class total + alpha * n_features).
        self.feature_log_prob_ = np.log(smoothed) - np.log(smoothed.sum(axis=1, keepdims=True))

    def _joint_log_likelihood(self, X):
        """Return ``log p(class) + log p(x | class)`` up to a per-row constant."""
        return np.asarray(X) @ self.feature_log_prob_.T + self.classes_log_prob_

    def predict_log_proba(self, X):
        """Return normalised log posterior probabilities, shape ``(n_samples, n_classes)``."""
        joint = self._joint_log_likelihood(X)
        # log-sum-exp with the row maximum factored out for numerical stability.
        row_max = joint.max(axis=1, keepdims=True)
        log_norm = row_max + np.log(np.exp(joint - row_max).sum(axis=1, keepdims=True))
        return joint - log_norm

    def predict_proba(self, X):
        """Return posterior probabilities; each row sums to 1."""
        return np.exp(self.predict_log_proba(X))

    def predict(self, X):
        """Return the most probable label for every row of ``X``."""
        best = np.argmax(self._joint_log_likelihood(X), axis=1)
        return self.classes_[best]

In [3]:
def process_dataset(X):
    """Tokenise texts and build a word-to-index vocabulary.

    Each text is lower-cased, every character that is not an ASCII letter or
    a space is replaced by a space, and the result is split on whitespace.
    Words receive consecutive indices in order of first appearance.

    Args:
        X: iterable of strings.

    Returns:
        tuple: ``(vocab, texts)`` where ``vocab`` maps each word to its index
        and ``texts`` is the list of token lists, one per input string.
    """
    vocab = {}
    texts = []
    for raw_text in X:
        tokens = re.sub(r'[^a-zA-Z ]', ' ', raw_text.lower()).split()
        for token in tokens:
            # len(vocab) is evaluated before insertion, so a new word gets
            # the next free index and a known word keeps its own.
            vocab.setdefault(token, len(vocab))
        texts.append(tokens)
    return vocab, texts


def predict_smsspam():
    """Train ``MultinomialNB`` on bag-of-words SMS data and report metrics.

    Loads the SMS dataset, builds a vocabulary over all messages, converts
    each message to a dense word-count vector, splits 80/20
    (``random_state=13``), fits the model, prints accuracy, precision and
    recall for both splits, and draws the ROC curves.

    Column 1 of the predicted probabilities is used as the ROC score, which
    per ``load_smsspam_dataset`` corresponds to label 1 ('ham').
    """
    messages, labels = load_smsspam_dataset()
    vocab, texts = process_dataset(messages)
    print(f'vocab size: {len(vocab)}')

    # Dense count matrix: one row per message, one column per vocabulary word.
    counts = np.zeros((len(texts), len(vocab)))
    for row, words in enumerate(texts):
        for word in words:
            counts[row, vocab[word]] += 1

    X_train, X_test, y_train, y_test = train_test_split(
        counts, labels, test_size=0.2, random_state=13
    )

    model = MultinomialNB()
    model.fit(X_train, y_train)

    pred_train = model.predict(X_train)
    pred_test = model.predict(X_test)

    acc_train, acc_test = accuracy_score(y_train, pred_train), accuracy_score(y_test, pred_test)
    print(f'train_accuracy_score: {acc_train}, test_accuracy_score: {acc_test}')

    prec_train, prec_test = precision_score(y_train, pred_train), precision_score(y_test, pred_test)
    print(f'train_precision_score: {prec_train}, test_precision_score: {prec_test}')

    rec_train, rec_test = recall_score(y_train, pred_train), recall_score(y_test, pred_test)
    print(f'train_recall_score: {rec_train}, test_recall_score: {rec_test}')

    fprs, tprs = [], []
    for features, target in ((X_train, y_train), (X_test, y_test)):
        fpr, tpr, _ = roc_curve(target, model.predict_proba(features)[:, 1])
        fprs.append(fpr)
        tprs.append(tpr)

    draw_roc_curve(fprs, tprs, ['train', 'test'], 'SMS Spam')

In [6]:
# Train and evaluate the multinomial naive Bayes model on the SMS spam data;
# prints the vocabulary size and accuracy/precision/recall, then draws ROC curves.
predict_smsspam()

vocab size: 7785
train_accuracy_score: 0.9914740857078752, test_accuracy_score: 0.9820627802690582
train_precision_score: 0.9945708376421923, test_precision_score: 0.9957850368809273
train_recall_score: 0.995600414078675, test_recall_score: 0.9833506763787722
