In [1]:
%%capture
%run ch11_maching_learning.ipynb

In [2]:
# Tokenizer

from typing import Set
import re

def tokenize(text: str) -> Set[str]:
    text = text.lower()
    all_words = re.findall("[a-z0-9']+", text)
    return set(all_words)

In [3]:
# Training data type

from typing import NamedTuple

class Message(NamedTuple):
    text: str
    is_spam: bool # label

In [4]:
# Naive Bayes

from typing import List, Tuple, Dict, Iterable
import math
from collections import defaultdict

class NaiveBayesClassifier:
    def __init__(self, k: float = 0.5) -> None:
        self.k = k # smoothing factor (avoid zero spam probability)
        
        self.tokens: Set[str] = set() # dictionary
        self.token_spam_counts: Dict[str, int] = defaultdict(int) # number of spam messages per each token
        self.token_ham_counts: Dict[str, int] = defaultdict(int) # number of ham messages per each token
        self.spam_messages = self.ham_messages = 0 # total number of spam and ham messages
        
    def train(self, messages: Iterable[Message]) -> None:
        for message in messages:
            
            # Increment message counts
            if message.is_spam:
                self.spam_messages += 1
            else:
                self.ham_messages += 1
                
            # Increment word counts
            for token in tokenize(message.text):
                self.tokens.add(token)
                
                if message.is_spam:
                    self.token_spam_counts[token] += 1
                else:
                    self.token_ham_counts[token] += 1
    
    def _probabilities(self, token: str) -> Tuple[float, float]:
        """Returns P(token|spam) and P(token|ham)"""
        
        spam = self.token_spam_counts[token]
        ham = self.token_ham_counts[token]
        
        p_token_spam = (self.k + spam) / (self.k * 2 + self.spam_messages)
        p_token_ham = (self.k + ham) / (self.k * 2 + self.ham_messages)
        
        return p_token_spam, p_token_ham
    
    def predict(self, text: str) -> float:
        """Returns P(spam|token)"""
        input_tokens = tokenize(text)
        log_prob_if_spam = log_prob_if_ham = 0.0
        
        # Iterate through each word in our dictionary
        for token in self.tokens:
            
            # Compute P(token|spam) and P(token|ham)
            prob_if_spam, prob_if_ham = self._probabilities(token)
            
            # If token appears in the message, add the log probability of seeing it
            if token in input_tokens:
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)
            # Otherwise add the log probability of not seeing it
            else:
                log_prob_if_spam += math.log(1 - prob_if_spam)
                log_prob_if_ham += math.log(1 - prob_if_ham)
                
        prob_if_spam = math.exp(log_prob_if_spam)
        prob_if_ham = math.exp(log_prob_if_ham)
        
        return prob_if_spam / (prob_if_spam + prob_if_ham)

In [5]:
# Naive Bayes - test with examples

messages = [Message("spam rules", is_spam=True),
            Message("ham rules", is_spam=False),
            Message("hello ham", is_spam=False)]

model = NaiveBayesClassifier(k=0.5)
model.train(messages)

assert model.tokens == {"spam", "ham", "rules", "hello"}
assert model.spam_messages == 1
assert model.ham_messages == 2
assert model.token_spam_counts == {"spam": 1, "rules": 1}
assert model.token_ham_counts == {"ham": 2, "rules": 1, "hello": 1}

In [6]:
text = "hello spam"

probs_if_spam = [
    (1 + 0.5) / (1 + 2 * 0.5),     # spam (present)
    1 - (0 + 0.5) / (1 + 2 * 0.5), # ham (not present)
    1 - (1 + 0.5) / (1 + 2 * 0.5), # rules (not present)
    (0 + 0.5) / (1 + 2 * 0.5)      # hello (present)
]

probs_if_ham = [
    (0 + 0.5) / (2 + 2 * 0.5),     # spam (present)
    1 - (2 + 0.5) / (2 + 2 * 0.5), # ham (not present)
    1 - (1 + 0.5) / (2 + 2 * 0.5), # rules (not present)
    (1 + 0.5) / (2 + 2 * 0.5)      # hello (present)
]

p_if_spam = math.exp(sum(math.log(p) for p in probs_if_spam))
p_if_ham = math.exp(sum(math.log(p) for p in probs_if_ham))

# Should be about 0.83
# assert model.predict(text) == p_if_spam / (p_if_spam + p_if_ham)
print(p_if_spam / (p_if_spam + p_if_ham))
print(model.predict(text))

0.8350515463917525
0.8350515463917525


In [7]:
# Naive Bayes - test with a dataset

import glob, re

path = 'data/spam_data/*/*'

"""
easy_ham sub-directory: https://spamassassin.apache.org/old/publiccorpus/20021010_easy_ham.tar.bz2
hard_ham sub-directory: https://spamassassin.apache.org/old/publiccorpus/20021010_hard_ham.tar.bz2
spam     sub-directory: https://spamassassin.apache.org/old/publiccorpus/20021010_spam.tar.bz2
"""

data: List[Message] = []

for filename in glob.glob(path):
    is_spam = "ham" not in filename
    
    with open(filename, errors='ignore') as email_file:
        for line in email_file:
            if line.startswith("Subject:"):
                subject = line.lstrip("Subject: ")
                data.append(Message(subject, is_spam)) # we only see subject line
                break # done with this file

In [8]:
import random
from collections import Counter

random.seed(0)
train_messages, test_messages = split_data(data, 0.75)

# Train
model = NaiveBayesClassifier()
model.train(train_messages)

# Test
predictions = [(message, model.predict(message.text)) 
               for message in test_messages]

confusion_matrix = Counter((message.is_spam, spam_probability > 0.5) 
                           for message, spam_probability in predictions)

confusion_matrix

Counter({(False, False): 670,
         (True, False): 40,
         (True, True): 86,
         (False, True): 29})

In [9]:
TP = confusion_matrix[(True, True)]
FP = confusion_matrix[(False, True)]
FN = confusion_matrix[(True, False)]
TN = confusion_matrix[(False, False)]

print("Precision:", precision(TP, FP, FN, TN))
print("Recall:", recall(TP, FP, FN, TN))
print("F1:", f1_score(TP, FP, FN, TN))

Precision: 0.7478260869565218
Recall: 0.6825396825396826
F1: 0.7136929460580913


In [10]:
def p_spam_given_token(token: str, model: NaiveBayesClassifier) -> float:
    prob_if_spam, prob_if_ham = model._probabilities(token)
    
    return prob_if_spam / (prob_if_spam + prob_if_ham)

words = sorted(model.tokens, key=lambda t: p_spam_given_token(t, model))

print("Top 10 spammiest words", words[-10:])
print("Top 10 hammiest words", words[:10])

Top 10 spammiest words ['assistance', 'attn', '95', 'clearance', 'per', 'money', 'rates', 'sale', 'systemworks', 'adv']
Top 10 hammiest words ['spambayes', 'users', 'razor', 'zzzzteana', 'sadev', 'apt', 'perl', 'ouch', 'spamassassin', 'bliss']


In [11]:
def drop_final_s(word):
    return re.sub("s$", "", word)

assert drop_final_s("plays") == "play"