In [1]:
from io import BytesIO
import requests
import tarfile
from typing import Set, NamedTuple, List, Tuple, Dict, Iterable

In [2]:
FILES = ["https://spamassassin.apache.org/old/publiccorpus/20021010_easy_ham.tar.bz2",
        "https://spamassassin.apache.org/old/publiccorpus/20021010_hard_ham.tar.bz2",
        "https://spamassassin.apache.org/old/publiccorpus/20021010_spam.tar.bz2"]

In [3]:
OUTPUT_DIR = 'spam_data'

In [4]:
for filename in FILES:
    content = requests.get(filename).content
    fin = BytesIO(content)
    with tarfile.open(fileobj=fin, mode='r:bz2') as tf:
        tf.extractall(OUTPUT_DIR)
    

In [5]:
import glob, re

path = 'spam_data/*/*'

In [6]:
# need to type in our classifer code from pp. 178-179 ...

from typing import Set
import re

class Message(NamedTuple):
    text: str
    is_spam: bool

def tokenize(text:str) -> Set[str]:
    text = text.lower()
    all_words = re.findall("[a-z0-9]+", text)
    return set(all_words)

assert tokenize("Data Science is science") == {"data", "science", "is"}

In [7]:
import math
from collections import defaultdict # gives a default value if you try to get key that doesn't exist

class NaiveBayesClassifier:
    def __init__(self, k: float = 0.5) -> None:
        self.k = k # smoothing factor
        self.tokens: Set[str] = set() 
        self.token_spam_counts: Dict[str, int] = defaultdict(int)
        self.token_ham_counts: Dict[str, int] = defaultdict(int)
        self.spam_messages = self.ham_messages = 0
        
    def train(self, messages: Iterable[Message]) -> None: # list of Message class objects (which are each NamedTuples)
        for message in messages:
            if message.is_spam:
                self.spam_messages += 1   # tallies up messages that are spam based on pre-existing boolean assignments
            else:
                self.ham_messages += 1
                
            for token in tokenize(message.text): # generate the set of unique words & iterate
                self.tokens.add(token) # add each token to the model's token set one at a time ...
                if message.is_spam:
                    self.token_spam_counts[token] += 1 # add token as key to spam count dict, and add a 1 to the tally
                else:
                    self.token_ham_counts[token] += 1 # add token as key to ham count dict, and add a 1 to the tally
                    
    def _probabilities(self, token: str) -> Tuple[float, float]:
        spam = self.token_spam_counts[token]
        ham = self.token_ham_counts[token]
        
        p_token_spam = (spam + self.k) / (self.spam_messages + 2*self.k)
        p_token_ham = (ham + self.k) / (self.ham_messages + 2*self.k)
        
        return p_token_spam, p_token_ham
    
    
    def predict(self, text: str) -> float:
        text_tokens = tokenize(text)
        log_prob_if_spam = log_prob_if_ham = 0.0
        
        for token in self.tokens:
            prob_if_spam, prob_if_ham = self._probabilities(token)
            
            if token in text_tokens:
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)
            else:
                log_prob_if_spam += math.log(1.0 - prob_if_spam)
                log_prob_if_ham += math.log(1.0 - prob_if_ham)
            
        prob_if_spam = math.exp(log_prob_if_spam)
        prob_if_ham = math.exp(log_prob_if_ham)
        return prob_if_spam / (prob_if_spam + prob_if_ham)

In [8]:
messages = [Message("spam rules", is_spam=True),
            Message("ham rules", is_spam=False),
            Message("hello ham", is_spam=False)]

model = NaiveBayesClassifier(k=0.5)
model.train(messages)

In [9]:
assert model.tokens == {"spam","ham","rules","hello"}
assert model.spam_messages == 1
assert model.ham_messages == 2
assert model.token_spam_counts == {"spam":1, "rules": 1}
assert model.token_ham_counts == {"ham": 2, "rules": 1, "hello":1}

In [10]:
data: List[Message] = []

for filename in glob.glob(path):
    is_spam = "ham" not in filename
    
    with open(filename, errors='ignore') as email_file:
        for line in email_file: 
            if line.startswith("Subject:"):
                subject = line.lstrip("Subject: ")
                data.append(Message(subject, is_spam))
                break # done with this file

In [13]:
import random
from scratch.machine_learning import split_data

random.seed(0)
train_messages, test_messages = split_data(data, 0.75)

model = NaiveBayesClassifier()
model.train(train_messages)

In [14]:
from collections import Counter

predictions = [(message, model.predict(message.text))
              for message in test_messages]

In [28]:
confusion_matrix = Counter((message.is_spam, spam_probability > 0.5)
                          for message, spam_probability in predictions)
confusion_matrix

Counter({(False, False): 664,
         (True, True): 85,
         (True, False): 54,
         (False, True): 22})

In [30]:
def p_spam_given_token(token: str, model: NaiveBayesClassifier) -> float:
    prob_if_spam, prob_if_ham = model._probabilities(token)    
    return prob_if_spam / (prob_if_spam + prob_if_ham)

words = sorted(model.tokens, key=lambda t: p_spam_given_token(t, model))
