# Naive Bayes Task

In [53]:
from typing import Set
import re

In [54]:
def tokenize(text: str) -> Set[str]:
    text = text.lower()                         # Convert to lowercase
    all_words = re.findall("[a-z0-9]+", text)   # extract the words, and
    return set(all_words)                       # remove duplicates.

assert tokenize("Data Science is science") == {"data", "science", "is"}

In [55]:
from typing import NamedTuple

class Message(NamedTuple):
    text: str
    is_spam: bool

In [56]:
from typing import List, Tuple, Dict, Iterable, Any
import math
from collections import defaultdict

class NaiveBayesClassifier:
    def __init__(self, k: float= 0.5) -> None:
        self.k = k  # smoothing factor
        
        self.tokens: Set[str] = set()
        self.token_spam_counts: Dict[str, int] = defaultdict(int)
        self.token_ham_counts: Dict[str, int] = defaultdict(int)
        self.spam_messages = self.ham_messages = 0
        
    def train(self, messages: Iterable[Message]) -> None:
        for message in messages:
            # Increment message counts.
            if message.is_spam:
                self.spam_messages += 1
            else:
                self.ham_messages += 1
            
            # Increment word counts
            for token in tokenize(message.text):
                self.tokens.add(token)
                if message.is_spam:
                    self.token_spam_counts[token] += 1
                else:
                    self.token_ham_counts[token] += 1
                    
        # Private "helper" function:
    def _probabilities(self, token: str) -> Tuple[float, float]:
        """returns P(token | spam) and P(token | ham)"""
        spam = self.token_spam_counts[token]
        ham = self.token_ham_counts[token]
    
        p_token_spam = (spam + self.k) / (self.spam_messages + 2 * self.k)
        p_token_ham = (ham + self.k) / (self.ham_messages + 2 * self.k)
    
        return p_token_spam, p_token_ham
    
    def predict(self, text: str) -> float:
        text_tokens = tokenize(text)
        log_prob_if_spam = log_prob_if_ham = 0.0
    
        # Iterate through each word in our vocabulary.
        for token in self.tokens:
            prob_if_spam, prob_if_ham = self._probabilities(token)
            
            # If *token* appears in the message, add the log proability of seeting it.
            if token in text_tokens:
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)
                
            # Otherwise add the log probability of _not_ seeing it, which is log(1 - probability of seeting it).
            else:
                log_prob_if_spam += math.log(1.0 - prob_if_spam)
                log_prob_if_ham += math.log(1.0 - prob_if_ham)
        prob_if_spam = math.exp(log_prob_if_spam)
        prob_if_ham = math.exp(log_prob_if_ham)
        return prob_if_spam / (prob_if_spam + prob_if_ham)


#

### Testing Our Model

In [57]:
# Unit tests:

messages = [Message("spam rules", is_spam=True),
            Message("ham rules", is_spam=False),     
            Message("hello ham", is_spam=False)]

model = NaiveBayesClassifier(k=0.5)
model.train(messages)

In [58]:
assert model.tokens == {"spam", "ham", "rules", "hello"}
assert model.spam_messages == 1
assert model.ham_messages == 2
assert model.token_spam_counts == {"spam": 1, "rules": 1}
assert model.token_ham_counts == {"ham": 2, "rules": 1, "hello": 1}


In [59]:
text = "hello spam"

probs_if_spam = [
    (1 + 0.5) / (1 + 2 * 0.5),      # "spam" (present)
    1 - (0 + 0.5) / (1 + 2 * 0.5),  # "ham" (not present)
    1 - (1 + 0.5) / (1 + 2 * 0.5),  # "rules" (not present)
    (0 + 0.5) / (1 + 2 * 0.5)       # "hello" (present)
]

probs_if_ham = [
    (0 + 0.5) / (2 + 2 * 0.5),      # "spam" (present)
    1 - (2 + 0.5) / (2 + 2 * 0.5),  # "ham" (not present)
    1 - (1 + 0.5) / (2 + 2 * 0.5),  # "rules" (not present)
    (1 + 0.5) / (2 + 2 * 0.5)       # "hello" (present)
]

p_if_spam = math.exp(sum(math.log(p) for p in probs_if_spam))
p_if_ham = math.exp(sum(math.log(p) for p in probs_if_ham))

In [60]:
# Should be about 0.83.
assert model.predict(text) == p_if_spam / (p_if_spam + p_if_ham)

#

### Using Our Model

In [61]:
from io import BytesIO  # So we can treat bytes as a file.
import requests         # To download the files, which are in .tar.bz format.
import tarfile

In [62]:
BASE_URL = "https://spamassassin.apache.org/old/publiccorpus"
FILES = ["20021010_easy_ham.tar.bz2",
        "20021010_hard_ham.tar.bz2",
        "20021010_spam.tar.bz2"]
OUTPUT_DIR = "spam_data"

for filename in FILES:
    # Use requests to get the file contents at each URL.
    content = requests.get(f"{BASE_URL}/{filename}").content
    
    # Wrap the in-memory bytes so we can use them as a "file".
    fin = BytesIO(content)
    
    # And extract all the files to the specified output dir.
    with tarfile.open(fileobj=fin, mode="r:bz2") as tf:
        tf.extractall(OUTPUT_DIR)

In [63]:
import glob, re

# Modify the path to wherever you've put the files.
path = "./spam_data/*/*"

data: List[Message] = []

# glob.glob returns every filename that matches the wildcarded path.
for filename in glob.glob(path):
    is_spam = "ham" not in filename
    
    # There are some garbage characters in the emails; the errors="ignore" skips them instead of raising an exception.
    with open(filename, errors="ignore") as email_file:
        for line in email_file:
            if line.startswith("Subject"):
                subject = line.lstrip("Subject: ")
                data.append(Message(subject, is_spam))
                break   # Done with this file.

In [64]:
import random

def split_data(data: List[Any], proportion: float) -> Tuple[List[Any], List[Any]]:
    """
    Splits the data into a training set and a test set by the given proportion
    :param data: The complete dataset.
    :param proportion: The proportion of the training set size.
    :return: A tuple containing the training set and the test set.
    """

    data = data[:]  # Make a shallow copy of the data
    random.shuffle(data)  # Shuffle the data
    cut = int(len(data) * proportion)  # Calculate the cut-off point for the split
    return data[:cut], data[cut:]

random.seed(0)
train_messages, test_messages = split_data(data, 0.75)

model = NaiveBayesClassifier()
model.train(train_messages)


In [65]:
from collections import Counter

predictions = [(message, model.predict(message.text)) for message in test_messages]

# Assume that spam_probability > 0.5 corresponds to spam prediction and count the combinations of (actual is_spam, predicted is_spam).
confusion_matrix = Counter((message.is_spam, spam_probability > 0.5)
                            for message, spam_probability in predictions)
print(confusion_matrix)

Counter({(False, False): 673, (True, True): 87, (True, False): 39, (False, True): 26})


In [67]:
def p_spam_given_token(token: str, model: NaiveBayesClassifier) -> float:
    # We probably shouldn't call private methods, but it's for a good cause.
    probs_if_spam, probs_if_ham = model._probabilities(token)
    
    return probs_if_spam / (probs_if_spam + probs_if_ham)

words = sorted(model.tokens, key=lambda t: p_spam_given_token(t, model))

print("spammiest_words", words[-10:])
print("hammiest_words", words[:10])

spammiest_words ['zzzz', 'assistance', '95', 'attn', 'money', 'per', 'clearance', 'sale', 'systemworks', 'adv']
hammiest_words ['spambayes', 'users', 'razor', 'zzzzteana', 'sadev', 'apt', 'perl', 'ouch', 'spamassassin', 'bliss']
