# In [None]:
import math
import re
from collections import defaultdict
from typing import Dict, Iterable, List, NamedTuple, Set, Tuple

# ------------------------------
# Step 1: Define Message format
# ------------------------------
# One labeled example: the raw message text and whether it is spam.
# Fields, in order: text (str), is_spam (bool).
Message = NamedTuple("Message", [("text", str), ("is_spam", bool)])

# ------------------------------
# Step 2: Tokenizer
# ------------------------------
def tokenize(text: str) -> Set[str]:
    """Return the distinct word tokens of *text*.

    The text is lowercased first; a token is then any maximal run of the
    characters a-z, 0-9, or apostrophe. Because the result is a set, a word
    that occurs several times is reported once.
    """
    lowered = text.lower()
    return {match.group() for match in re.finditer("[a-z0-9']+", lowered)}

# ------------------------------
# Step 3: Naive Bayes Classifier
# ------------------------------
class NaiveBayesClassifier:
    """Naive Bayes spam classifier over the set of tokens in a message.

    Training counts, for every token, how many spam and how many ham
    messages contain it. Prediction walks the whole learned vocabulary and
    multiplies (in log space) the probability of each token being present
    or absent under each class. Class priors are not factored in: the
    score is P(tokens | spam) / (P(tokens | spam) + P(tokens | ham)).

    Attributes:
        k: smoothing pseudocount added to every token count. It should be
            positive so that no smoothed probability is exactly 0 or 1
            (``math.log`` would raise on those).
        tokens: every token seen during training.
        token_spam_counts / token_ham_counts: number of spam / ham messages
            containing each token.
        spam_messages / ham_messages: number of messages seen per class.
    """

    def __init__(self, k: float = 0.5) -> None:
        self.k = k
        self.tokens: Set[str] = set()
        self.token_spam_counts: Dict[str, int] = defaultdict(int)
        self.token_ham_counts: Dict[str, int] = defaultdict(int)
        self.spam_messages = 0
        self.ham_messages = 0

    def train(self, messages: Iterable["Message"]) -> None:
        """Update the counts with each message in *messages*.

        May be called repeatedly; counts accumulate across calls.
        """
        for message in messages:
            # Pick the per-class counter once per message rather than
            # re-testing the label for every token.
            if message.is_spam:
                self.spam_messages += 1
                counts = self.token_spam_counts
            else:
                self.ham_messages += 1
                counts = self.token_ham_counts

            for token in tokenize(message.text):
                self.tokens.add(token)
                counts[token] += 1

    def _probabilities(self, token: str) -> Tuple[float, float]:
        """Return smoothed (P(token | spam), P(token | ham))."""
        # .get() instead of indexing: the counters are defaultdicts, and
        # indexing would insert a zero entry for every token looked up.
        spam = self.token_spam_counts.get(token, 0)
        ham = self.token_ham_counts.get(token, 0)

        p_token_spam = (spam + self.k) / (self.spam_messages + 2 * self.k)
        p_token_ham = (ham + self.k) / (self.ham_messages + 2 * self.k)

        return p_token_spam, p_token_ham

    @staticmethod
    def _spam_posterior(log_prob_if_spam: float, log_prob_if_ham: float) -> float:
        """Turn the two class log-likelihoods into a spam probability."""
        prob_if_spam = math.exp(log_prob_if_spam)
        prob_if_ham = math.exp(log_prob_if_ham)
        total = prob_if_spam + prob_if_ham
        if total > 0.0:
            return prob_if_spam / total

        # Both likelihoods underflowed to 0.0 (large vocabulary). Rescale by
        # the larger log-likelihood so one term is exactly 1.0; the ratio is
        # mathematically unchanged and the division is safe.
        shift = max(log_prob_if_spam, log_prob_if_ham)
        prob_if_spam = math.exp(log_prob_if_spam - shift)
        prob_if_ham = math.exp(log_prob_if_ham - shift)
        return prob_if_spam / (prob_if_spam + prob_if_ham)

    def predict(self, text: str) -> float:
        """Return the estimated probability (0.0-1.0) that *text* is spam."""
        text_tokens = tokenize(text)
        log_prob_if_spam = log_prob_if_ham = 0.0

        # Every vocabulary token contributes: its probability if the message
        # contains it, the complement if it does not.
        for token in self.tokens:
            prob_if_spam, prob_if_ham = self._probabilities(token)

            if token in text_tokens:
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)
            else:
                log_prob_if_spam += math.log(1.0 - prob_if_spam)
                log_prob_if_ham += math.log(1.0 - prob_if_ham)

        return self._spam_posterior(log_prob_if_spam, log_prob_if_ham)
    

    
# Worked example: train on three tiny messages and check both the learned
# counts and one prediction against a hand computation.
messages = [
    Message("spam rules", is_spam=True),
    Message("ham rules", is_spam=False),
    Message("hello ham", is_spam=False),
]

model = NaiveBayesClassifier(k=0.5)
model.train(messages)

assert model.tokens == {"spam", "ham", "rules", "hello"}
assert model.spam_messages == 1
assert model.ham_messages == 2
assert model.token_spam_counts == {"spam": 1, "rules": 1}
assert model.token_ham_counts == {"ham": 2, "rules": 1, "hello": 1}

text = "hello spam"

# Per-token factors with k = 0.5, 1 spam message and 2 ham messages.
probs_if_spam = [
    (1 + 0.5) / (1 + 2 * 0.5),      # "spam" (present)
    1 - (0 + 0.5) / (1 + 2 * 0.5),  # "ham" (not present)
    1 - (1 + 0.5) / (1 + 2 * 0.5),  # "rules" (not present)
    (0 + 0.5) / (1 + 2 * 0.5),      # "hello" (present)
]
probs_if_ham = [
    (0 + 0.5) / (2 + 2 * 0.5),      # "spam" (present)
    1 - (2 + 0.5) / (2 + 2 * 0.5),  # "ham" (not present)
    1 - (1 + 0.5) / (2 + 2 * 0.5),  # "rules" (not present)
    (1 + 0.5) / (2 + 2 * 0.5),      # "hello" (present)
]

p_if_spam = math.exp(sum(math.log(p) for p in probs_if_spam))
p_if_ham = math.exp(sum(math.log(p) for p in probs_if_ham))

# The model adds its logs in set-iteration order, which need not match the
# order written above, so the two results can differ in the last bits.
# Compare with a tolerance rather than with ==.
assert math.isclose(model.predict(text), p_if_spam / (p_if_spam + p_if_ham))
from io import BytesIO
import requests
import tarfile

BASE_URL = "https://spamassassin.apache.org/old/publiccorpus"
FILES = ["20021010_easy_ham.tar.bz2",
         "20021010_hard_ham.tar.bz2",
         "20021010_spam.tar.bz2"]
# This is where the data will end up,
# in /spam, /easy_ham, and /hard_ham subdirectories.
# Change this to where you want the data.
OUTPUT_DIR = 'spam_data'

for filename in FILES:
    # Use requests to get the file contents at each URL. The timeout keeps a
    # stalled connection from hanging forever, and raise_for_status() turns
    # an HTTP error into an exception here instead of an obscure tarfile
    # error below.
    response = requests.get(f"{BASE_URL}/{filename}", timeout=60)
    response.raise_for_status()
    content = response.content
    # Wrap the in-memory bytes so we can use them as a "file."
    fin = BytesIO(content)
    # And extract all the files to the specified output dir.
    with tarfile.open(fileobj=fin, mode='r:bz2') as tf:
        if hasattr(tarfile, "data_filter"):
            # The archive comes from the network: use the "data" filter so
            # members cannot escape OUTPUT_DIR or create special files.
            tf.extractall(OUTPUT_DIR, filter="data")
        else:
            # Older Python without extraction filters.
            tf.extractall(OUTPUT_DIR)

import glob, re
# modify the path to wherever you've put the files
path = 'spam_data/*/*'
# One Message per email, holding only its subject line and its label.
data: List[Message] = []
# glob.glob returns every filename that matches the wildcarded path
for filename in glob.glob(path):
    # Ham lives in the easy_ham / hard_ham directories; anything else is spam.
    is_spam = "ham" not in filename

    # There are some garbage characters in the emails; the errors='ignore'
    # skips them instead of raising an exception.
    with open(filename, errors='ignore') as email_file:
        for line in email_file:
            if line.startswith("Subject:"):
                # Drop exactly the "Subject:" header name, then the
                # surrounding whitespace. str.lstrip("Subject: ") would
                # instead strip any leading run of those characters and eat
                # the start of subjects such as "best offer".
                subject = line[len("Subject:"):].strip()
                data.append(Message(subject, is_spam))
                break  # done with this file


from sklearn.model_selection import train_test_split

# Hold out 25% of the labeled subjects for evaluation. Fixing random_state
# makes the split reproducible from run to run.
train_messages, test_messages = train_test_split(
    data,
    test_size=0.25,
    random_state=0,
)

# Fit a fresh classifier (default smoothing) on the training portion only.
model = NaiveBayesClassifier()
model.train(train_messages)

from collections import Counter
# Score every held-out message, keeping the message next to its score.
predictions = [
    (message, model.predict(message.text))
    for message in test_messages
]

# Assume that spam_probability > 0.5 corresponds to spam prediction
# and count the combinations of (actual is_spam, predicted is_spam)
confusion_matrix = Counter(
    (message.is_spam, spam_probability > 0.5)
    for message, spam_probability in predictions
)
print(confusion_matrix)

def p_spam_given_token(token: str, model: NaiveBayesClassifier) -> float:
    """Return how strongly *token* alone points to spam, in the range 0-1.

    Computed as P(token | spam) / (P(token | spam) + P(token | ham)) from
    the model's smoothed per-class token probabilities.
    """
    # Deliberately reaches into the classifier's private helper: it is the
    # only place the smoothed per-token probabilities are exposed.
    p_token_spam, p_token_ham = model._probabilities(token)
    total = p_token_spam + p_token_ham
    return p_token_spam / total
# Rank the learned vocabulary from most ham-like to most spam-like, then
# show the ten tokens at each end.
words = sorted(
    model.tokens,
    key=lambda token: p_spam_given_token(token, model),
)
print("spammiest_words", words[-10:])
print("hammiest_words", words[:10])


# --- Captured notebook output from the original run (not code) ---
#   tf.extractall(OUTPUT_DIR)
#
#
# Counter({(False, False): 678, (True, True): 86, (True, False): 42, (False, True): 19})
# spammiest_words ['fortune', 'reps', 'zzzz', 'mortgage', 'clearance', 'money', 'sale', 'systemworks', 'rates', 'adv']
# hammiest_words ['spambayes', 'users', 'razor', 'zzzzteana', 'sadev', 'apt', 'perl', 'spamassassin', 'problem', 'satalk']
