In [35]:
from typing import Set, NamedTuple, List, Tuple, Dict, Iterable
import re
import math
from collections import defaultdict, Counter
from io import BytesIO
import requests
import tarfile
import glob
import random
from scratch.machine_learning import split_data, precision, recall

In [36]:
def tokenize(text: str) -> Set[str]:
    text = text.lower()
    all_words = re.findall("[a-z0-9]+", text)
    return set(all_words)

print(tokenize("Data Science is science"))

{'data', 'science', 'is'}


In [37]:
class Message(NamedTuple):
    text: str
    is_spam: bool

In [38]:
class NaiveBayesClassifier:
    def __init__(self, k: float = 0.5):
        self.k = k
        self.tokens: Set[str] = set()
        self.token_spam_counts: Dict[str, int] = defaultdict(int)
        self.token_ham_counts: Dict[str, int] = defaultdict(int)
        self.spam_messages = self.ham_messages = 0

    def train(self, messages: Iterable[Message]) -> None:
        for message in messages:
            if message.is_spam:
                self.spam_messages += 1
            else:
                self.ham_messages +=1

            for token in tokenize(message.text):
                self.tokens.add(token)

                if message.is_spam:
                    self.token_spam_counts[token] += 1
                else:
                    self.token_ham_counts[token] += 1

    def _probabilities(self, token: str) -> Tuple[float, float]:
        spam = self.token_spam_counts[token]
        ham = self.token_ham_counts[token]

        p_token_spam = (spam + self.k) / (self.spam_messages + 2 * self.k)
        p_token_ham = (ham + self.k) / (self.ham_messages + 2 * self.k)
        
        return p_token_spam, p_token_ham
    
    def predict(self, text: str) -> float:
        text_tokens = tokenize(text)
        log_prob_if_spam = log_prob_if_ham = 0.0

        for token in self.tokens:
            prob_if_spam, prob_if_ham = self._probabilities(token)

            if token in text_tokens:
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)
            else:
                log_prob_if_spam += math.log(1.0 - prob_if_spam)
                log_prob_if_ham += math.log(1.0 - prob_if_ham)

        prob_if_spam = math.exp(log_prob_if_spam)
        prob_if_ham = math.exp(log_prob_if_ham)

        return prob_if_spam / (prob_if_spam + prob_if_ham)

In [39]:
BASE_URL = 'https://spamassassin.apache.org/old/publiccorpus'
FILES = ['20021010_easy_ham.tar.bz2',
         '20021010_hard_ham.tar.bz2',
         '20021010_spam.tar.bz2']
OUTPUT_DIR = 'data/spam_data'

for filename in FILES:
    content = requests.get(f"{BASE_URL}/{filename}").content

    fin = BytesIO(content)

    with tarfile.open(fileobj=fin, mode='r:bz2') as tf:
        tf.extractall(OUTPUT_DIR)

In [40]:
path = 'data/spam_data/*/*'

data: List[Message] = []

for filename in glob.glob(path):
    is_spam = "ham" not in filename

    with open(filename, errors='ignore') as email_file:
        for line in email_file:
            if line.startswith("Subject:"):
                subject = line.lstrip("Subject: ")
                subject = subject.strip()
                data.append(Message(subject, is_spam))
                break


In [41]:
random.seed(42)

train_messages, test_messages = split_data(data, 0.75)
model = NaiveBayesClassifier()
model.train(train_messages)

In [42]:
predictions = [(message, model.predict(message.text)) for message in test_messages]

confusion_matrix = Counter((message.is_spam, spam_probability > 0.5) for message, spam_probability in predictions)

print(confusion_matrix)


Counter({(False, False): 690, (True, True): 70, (True, False): 45, (False, True): 20})


In [43]:
tp, fp, fn, tn = confusion_matrix[True, True], confusion_matrix[False, True], confusion_matrix[True, False], confusion_matrix[False, False]

In [44]:
print(precision(tp, fp, fn, tn))
print(recall(tp, fp, fn, tn))

0.7777777777777778
0.6086956521739131


In [45]:
def p_spam_given_token(token: str, model: NaiveBaiseClassifier) -> float:
    prob_if_spam, prob_if_ham = model._probabilities(token)

    return prob_if_spam / (prob_if_spam + prob_if_ham)

words = sorted(model.tokens, key= lambda t: p_spam_given_token(t, model))
print("spammiest_words", words[-10:])
print("hammiest_words", words[:10])

spammiest_words ['norton', 'clearance', 'account', 'attn', 'mortgage', 'zzzz', 'sale', 'systemworks', 'money', 'adv']
hammiest_words ['spambayes', 'users', 'razor', 'zzzzteana', 'sadev', 'apt', 'ouch', 'spamassassin', 'bliss', 'wedded']
