In [298]:
import csv
import numpy as np
from sklearn.model_selection import train_test_split


def read_csv(filename):
    """Lazily yield ``(text, label)`` pairs from a two-column CSV file.

    The first row is treated as a header and discarded. Completely blank
    lines (which ``csv.reader`` reports as empty lists) are skipped, and an
    empty file simply yields nothing.

    :param filename: Path of the CSV file to read.
    :return: Generator of ``(text, label)`` string tuples, one per data row.
    :raises ValueError: If a non-blank row does not have exactly two columns.
    """
    with open(filename, "r", newline="") as file:
        reader = csv.reader(file)
        # A default is required here: a bare next() on an empty file would
        # raise StopIteration inside this generator, which Python turns into
        # a RuntimeError for the caller.
        next(reader, None)  # skip the header row, if any
        for row in reader:
            if not row:
                # Blank line (e.g. a trailing newline at the end of the file).
                continue
            text, label = row
            yield text, label


# Pull the whole dataset into memory, encoding the label as 1 (spam) / 0 (anything else).
texts, labels = [], []
for text, label in read_csv("input/email_classification.csv"):
    texts.append(text)
    labels.append(int(label == "spam"))


# Hold out half of the messages for evaluation; the fixed seed keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    texts,
    labels,
    test_size=0.50,
    random_state=42,
    shuffle=True,
)

In [299]:
from typing import List, Tuple, Dict, Iterable, Set
import math
import string

from collections import defaultdict


class NaiveBayes:
    """Naive Bayes spam classifier over the *set* of word tokens in a message.

    Each vocabulary token is modelled as present/absent in a message. Training
    counts, per class, how many messages contain each token; prediction
    combines the (smoothed) presence and absence probabilities of every
    sufficiently frequent vocabulary token. No class prior is applied, so both
    classes are treated as equally likely a priori.
    """

    # Translation table that deletes every character in ``string.punctuation``.
    # Built once here instead of on every ``tokenize`` call.
    _PUNCTUATION_TABLE = str.maketrans("", "", string.punctuation)

    def __init__(self, k: float = 0.5, min_token_count: int = 10) -> None:
        """Initialize Naive Bayes model.

        :param float k: Smoothing factor, to avoid token probability 0, defaults to 0.5.
            Should be positive: with ``k=0`` a token probability of exactly 0 or 1
            makes ``math.log`` raise ``ValueError`` during prediction.
        :param int min_token_count: Minimal number of training messages a token must
            appear in to be used for prediction, defaults to 10.
        """
        self.k = k
        self.min_token_count = min_token_count
        # Number of training messages seen per class.
        self.count_spam: int = 0
        self.count_ham: int = 0
        # token -> number of training messages containing it (overall / per class).
        self.vocab_counts: Dict[str, int] = defaultdict(int)
        self.token_spam_counts: Dict[str, int] = defaultdict(int)
        self.token_ham_counts: Dict[str, int] = defaultdict(int)

    def tokenize(self, text: str) -> Set[str]:
        """Returns set of tokens, lowercased and minus punctuation.

        Only the characters in ``string.punctuation`` are stripped; the text is
        then split on whitespace. Duplicates collapse because a set is returned.
        """
        return set(text.lower().translate(self._PUNCTUATION_TABLE).split())

    def fit(self, xs: List[str], ys: List[int]) -> None:
        """Fit training data.

        Counts accumulate across calls. ``xs`` and ``ys`` are paired with
        ``zip``, so any surplus items in the longer one are ignored.

        :param xs: Training messages.
        :param ys: Labels, ``1`` for spam and ``0`` for ham. A sample with any
            other label only contributes its tokens to ``vocab_counts``.
        """
        for x, y in zip(xs, ys):
            if y == 1:  # if spam
                self.count_spam += 1
            elif y == 0:  # if ham
                self.count_ham += 1

            # Each token is counted once per message because tokenize()
            # returns a set.
            for token in self.tokenize(x):
                self.vocab_counts[token] += 1
                if y == 1:  # if spam
                    self.token_spam_counts[token] += 1
                elif y == 0:  # if ham
                    self.token_ham_counts[token] += 1

    def _proba_single_token(self, token: str) -> Tuple[float, float]:
        """
        Returns probability that we see this token in spam and in ham messages.
        P(token|spam) : (count this token in spam messages + k) / (count spam messages + 2k)
        P(token|ham) : (count this token in ham messages + k) / (count ham messages + 2k)
        """
        # .get() rather than indexing: indexing a defaultdict would insert a
        # zero-count key and silently mutate the trained state on every read.
        count_token_in_spam = self.token_spam_counts.get(token, 0)
        count_token_in_ham = self.token_ham_counts.get(token, 0)
        p_token_spam = (count_token_in_spam + self.k) / (self.count_spam + self.k * 2)
        p_token_ham = (count_token_in_ham + self.k) / (self.count_ham + self.k * 2)
        return p_token_spam, p_token_ham

    def _predict_single_sample(self, x: str) -> float:
        """Returns proba that a sample is spam.

        Sums, over every vocabulary token seen in at least ``min_token_count``
        training messages, the log probability of the token being present
        (if it is in ``x``) or absent (if it is not) under each class.
        """
        x_tokenized = self.tokenize(x)
        log_prob_if_spam = 0.0
        log_prob_if_ham = 0.0
        for token, count in self.vocab_counts.items():

            # skip rarely occurring words
            if count < self.min_token_count:
                continue

            p_token_spam, p_token_ham = self._proba_single_token(token)
            if token in x_tokenized:
                # token appears in text: add log proba of seeing it
                log_prob_if_spam += math.log(p_token_spam)
                log_prob_if_ham += math.log(p_token_ham)
            else:
                # token does not appear in text: add log proba of not seeing it
                log_prob_if_spam += math.log(1 - p_token_spam)
                log_prob_if_ham += math.log(1 - p_token_ham)

        # P(spam|x) = e^ls / (e^ls + e^lh) = sigmoid(ls - lh).
        # Working from the difference avoids exponentiating the two (very
        # negative) log-likelihoods separately, which underflows both to 0.0
        # for large vocabularies and ends in a 0/0 division.
        log_odds = log_prob_if_spam - log_prob_if_ham
        # Only ever exponentiate a non-positive number so math.exp cannot overflow.
        if log_odds >= 0:
            return 1.0 / (1.0 + math.exp(-log_odds))
        odds = math.exp(log_odds)
        return odds / (1.0 + odds)

    def predict(self, xs: List[str]) -> List[float]:
        """Returns list of proba that samples are spam."""
        return [self._predict_single_sample(x) for x in xs]

    def _p_spam_given_token(self, token: str) -> float:
        """Returns proba of spam given a token."""
        p_token_spam, p_token_ham = self._proba_single_token(token)
        return p_token_spam / (p_token_spam + p_token_ham)

    def word_ranking(self) -> List[str]:
        """Returns vocab sorted by proba of being in spam message.

        Most spam-indicative tokens come first. Every vocabulary token is
        ranked; ``min_token_count`` is not applied here.
        """
        return sorted(
            self.vocab_counts.keys(),
            key=self._p_spam_given_token,
            reverse=True,
        )


# Sanity check on a three-message toy corpus: one spam message, two ham messages.
texts = ["spam rules", "ham rules", "hello ham"]
labels = [1, 0, 0]

bayes = NaiveBayes(k=0.5, min_token_count=0)
bayes.fit(texts, labels)

# Fitting must record the per-class message counts and per-token message counts.
assert set(bayes.vocab_counts) == {"spam", "ham", "rules", "hello"}
assert (bayes.count_spam, bayes.count_ham) == (1, 2)
assert dict(bayes.token_spam_counts) == {"spam": 1, "rules": 1}
assert dict(bayes.token_ham_counts) == {"ham": 2, "rules": 1, "hello": 1}

# Score unseen messages; the last one shares no token with the training corpus.
texts = ["hello spam", "hello ham", "okay all"]
bayes.predict(texts)

[0.8350515463917525, 0.022004889975550123, 0.5031055900621119]

In [300]:
# Train on the real dataset, ignoring tokens seen in fewer than 25 training messages.
bayes = NaiveBayes(k=1, min_token_count=25)
bayes.fit(X_train, y_train)

# Turn spam probabilities into hard 0/1 labels at the 0.5 threshold.
y_pred = [int(y > 0.5) for y in bayes.predict(X_test)]

# Accuracy = fraction of test messages whose predicted label matches the true one.
acc = np.mean(np.asarray(y_pred) == np.asarray(y_test))
acc

0.9777777777777777

In [301]:
# Print the model's verdict next to the true label for the first six test messages.
for x, y in zip(X_test[:6], y_test[:6]):
    pred = bayes.predict([x])[0] > 0.5
    print(f"Predicted {pred: <1} , Actual {y: < 1}: {x}")

Predicted 1 , Actual  1: Act now! Limited-time offer on luxury watches. Click here to buy now!
Predicted 1 , Actual  1: You're a winner! Click here to claim your exclusive prize.
Predicted 1 , Actual  1: You've been selected for a free iPhone X. Click here to claim your prize!
Predicted 1 , Actual  1: Get exclusive access to our VIP club. Click here to join now!
Predicted 1 , Actual  1: Claim your prize now! Click here to confirm your winnings.
Predicted 1 , Actual  1: Congratulations! You've been selected for a job interview. Click here to schedule your interview.


In [302]:
# word_ranking() sorts the vocabulary by P(spam | token), highest first,
# so the first ten entries are the most spam-indicative tokens.
top10_spam = bayes.word_ranking()[:10]
top10_spam

['click',
 'claim',
 'here',
 'rich',
 'prize',
 'won',
 'youve',
 'suspended',
 'quick',
 'join']

In [303]:
# The tail of the spam ranking holds the least spam-like tokens; flip it so the
# most ham-indicative token comes first.
top10_ham = bayes.word_ranking()[-10:][::-1]
top10_ham

['you',
 'were',
 'is',
 'thank',
 'heres',
 'latest',
 'excited',
 'purchase',
 'team',
 'us']

# Explanation

## How does the tokenization work?

In [304]:
# Demo input with mixed case and trailing punctuation: tokenize() lowercases the
# text, strips punctuation, splits on whitespace and returns the unique tokens.
test = "Recall that we were investigating the relationship between a DataSciencester..."
bayes.tokenize(test)

{'a',
 'between',
 'datasciencester',
 'investigating',
 'recall',
 'relationship',
 'that',
 'the',
 'we',
 'were'}