In [1]:
import re
from typing import Set

def tokenize(text: str) -> Set[str]:
    text = text.lower()
    text = re.sub(r'\W', ' ', text)
    return set(text.split())

assert tokenize("Data Science is science") == {"data", "science", "is"}

In [2]:
from typing import NamedTuple

class Message(NamedTuple):
    text: str
    is_spam: bool

In [29]:
from typing import List, Tuple, Dict, Iterable
import math
from collections import defaultdict

class NaiveBayesClassifier:
    def __init__(self, k: float = 0.5, min_count: int = 0) -> None:
        self.k = k
        self.tokens: Set[str] = set()
        self.token_spam_counts: Dict[str, int] = defaultdict(int)
        self.token_ham_counts: Dict[str, int] = defaultdict(int)
        self.spam_messages = self.ham_messages = 0
        self.min_count = min_count

    def train(self, messages: Iterable[Message]) -> None:
        for message in messages:
            if message.is_spam:
                self.spam_messages += 1
            else:
                self.ham_messages += 1

            for token in tokenize(message.text):
                self.tokens.add(token)
                if message.is_spam:
                    self.token_spam_counts[token] += 1
                else:
                    self.token_ham_counts[token] += 1
        
        if self.min_count:
            for token in list(self.tokens):
                if self.token_spam_counts[token] < self.min_count:
                    del self.token_spam_counts[token]
                if self.token_ham_counts[token] < self.min_count:
                    del self.token_ham_counts[token]
            self.tokens.remove(token)
            
    def _propabilities(self, token: str) -> Tuple[float, float]:
        spam_token_count = self.token_spam_counts[token]
        ham_token_count = self.token_ham_counts[token]

        p_token_spam = (spam_token_count + self.k) / (self.spam_messages + 2 * self.k)
        p_token_ham = (ham_token_count + self.k) / (self.ham_messages + 2 * self.k)

        return p_token_spam, p_token_ham
    
    def predict(self, text: str) -> float:
        text_tokens = tokenize(text)
        log_prob_if_spam = log_prob_if_ham = 0.0

        for token in self.tokens:
            prob_if_spam, prob_if_ham = self._propabilities(token)

            if token in text_tokens:
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)
            else:
                log_prob_if_spam += math.log(1.0 - prob_if_spam)
                log_prob_if_ham += math.log(1.0 - prob_if_ham)
        
        prob_if_spam = math.exp(log_prob_if_spam)
        prob_if_ham = math.exp(log_prob_if_ham)

        return prob_if_spam / (prob_if_spam + prob_if_ham)

In [30]:
messages = [
    Message("spam rules", is_spam=True),
    Message("ham rules", is_spam=False),
    Message("hello ham", is_spam=False)
]

model = NaiveBayesClassifier(k=0.5)
model.train(messages)

In [31]:
assert model.tokens == {"spam", "ham", "rules", "hello"}
assert model.spam_messages == 1
assert model.ham_messages == 2
assert model.token_spam_counts == {"spam": 1, "rules": 1}
assert model.token_ham_counts == {"ham": 2, "rules": 1, "hello": 1}

In [32]:
text = "hello spam"

probs_if_spam = [
    (1 + 0.5) / (1 + 2 * 0.5),
    1 - (0 + 0.5) / (1 + 2 * 0.5),
    1 - (1 + 0.5) / (1 + 2 * 0.5),
    (0 + 0.5) / (1 + 2 * 0.5)
]

probs_if_ham = [
    (0 + 0.5) / (2 + 2 * 0.5),
    1 - (2 + 0.5) / (2 + 2 * 0.5),
    1 - (1 + 0.5) / (2 + 2 * 0.5),
    (1 + 0.5) / (2 + 2 * 0.5)
]

p_if_spam = math.exp(sum(math.log(p) for p in probs_if_spam))
p_if_ham = math.exp(sum(math.log(p) for p in probs_if_ham))

assert model.predict(text) == p_if_spam / (p_if_spam + p_if_ham)

In [33]:
from pathlib import Path

SPAM_DATA_FOLDER = Path() / 'datasets' / 'spam_data'
assert SPAM_DATA_FOLDER.is_dir()

In [34]:
data: List[Message] = []

for file in SPAM_DATA_FOLDER.glob("**/*.*"):
    is_spam = "ham" not in file.parent.name
    with file.open(errors='ignore') as file_reader:
        for line in file_reader.readlines():
            if line.startswith("Subject:"):
                subject = line.lstrip("Subject: ")
                data.append(Message(subject, is_spam))
                break

In [35]:
import random
from scratch.machine_learning import split_data

random.seed(0)
train_messages, test_messages = split_data(data, 0.75)

In [54]:
model = NaiveBayesClassifier()
model.train(train_messages)

In [55]:
predictions = [
    (message, model.predict(message.text))
    for message in test_messages
]

In [56]:
from collections import Counter

confusion_matrix = Counter((message.is_spam, spam_probability > 0.5)
                            for message, spam_probability in predictions)

print(confusion_matrix)

Counter({(False, False): 656, (True, True): 84, (True, False): 55, (False, True): 30})


In [57]:
from scratch.machine_learning import accuracy, precision, recall, f1_score

metric_kwargs = {
    'tp': confusion_matrix[(True, True)],
    'tn': confusion_matrix[(False, False)],
    'fp': confusion_matrix[(False, True)],
    'fn': confusion_matrix[(True, False)]
}

{
    "Accuracy": accuracy(**metric_kwargs),
    "Precision": precision(**metric_kwargs),
    "Recall": recall(**metric_kwargs),
    "F1 Score": f1_score(**metric_kwargs)
}

{'Accuracy': 0.896969696969697,
 'Precision': 0.7368421052631579,
 'Recall': 0.60431654676259,
 'F1 Score': 0.6640316205533597}

In [58]:
def p_span_given_token(token: str, model: NaiveBayesClassifier) -> float:
    prob_if_spam, prob_if_ham = model._propabilities(token)

    return prob_if_spam / (prob_if_spam + prob_if_ham)

words = sorted(model.tokens, key=lambda t: p_span_given_token(t, model))

print("spammiest_words", words[-10:])
print("hammiest_words", words[:10])

spammiest_words ['zzzz', '500', 'reps', 'clearance', 'mortgage', 'systemworks', 'year', 'rates', 'money', 'adv']
hammiest_words ['satalk', 'spambayes', 'users', 'zzzzteana', 'razor', 'sadev', 'apt', 'ouch', 'perl', 'bliss']
