#### Let's create a simple function to tokenise messages into distinct words. We'll first convert each message to lowercase, then use `re.findall` to extract "words" consisting of letters, numbers and apostrophes. Finally, we'll use `set` to get just the distinct words:

In [2]:
from typing import Set
import re

def tokenize(text: str) -> Set[str]:
    """Return the distinct lowercase words found in *text*.

    The text is lowercased first; a "word" is then any maximal run of
    the letters a-z, the digits 0-9 and apostrophes.
    """
    lowered = text.lower()
    return {match.group() for match in re.finditer("[a-z0-9']+", lowered)}

# Quick check: "science" occurs twice in the input but only once in the resulting set.
print(tokenize("Data science is science"))

{'is', 'science', 'data'}


#### We'll also define a type for our training data:

In [3]:
from typing import NamedTuple

class Message(NamedTuple):
    """A single labelled example for the classifier."""
    text: str  # the message text
    is_spam: bool  # True for spam, False for non-spam ("ham")

#### The constructor will take just one parameter, the pseudocount to use when computing probabilities. It also initialises an empty set of tokens, counters to track how often each token is seen in spam messages and non-spam messages, and counts how many spam and non-spam messages it was trained on:

In [15]:
from typing import List, Tuple, Dict, Iterable
import math
from collections import defaultdict

class NaiveBayesClassifier:
    """Spam classifier that learns per-token counts from labelled messages."""

    def __init__(self, k: float = 0.5) -> None:
        """Create an untrained classifier.

        k is the pseudocount (smoothing factor) used when computing
        probabilities.
        """
        self.k = k
        # Every distinct token seen during training.
        self.tokens: Set[str] = set()
        # Number of spam / ham messages in which each token appeared.
        self.tokens_spam_counts: Dict[str, int] = defaultdict(int)
        self.tokens_ham_counts: Dict[str, int] = defaultdict(int)
        # Number of messages of each kind the model was trained on.
        self.spam_messages = 0
        self.ham_messages = 0

    def train(self, messages: Iterable[Message]) -> None:
        """Update the message and token counts from labelled *messages*."""
        for message in messages:
            # Count the message itself and pick the matching token counter.
            if message.is_spam:
                self.spam_messages += 1
                counts = self.tokens_spam_counts
            else:
                self.ham_messages += 1
                counts = self.tokens_ham_counts
            # tokenize() returns distinct words, so each token is counted
            # at most once per message.
            for word in tokenize(message.text):
                self.tokens.add(word)
                counts[word] += 1


#### The `train` method above trains the classifier on a bunch of messages. First, we increment the `spam_messages` and `ham_messages` counts. Then we tokenize each message text, and for each token we increment `tokens_spam_counts` or `tokens_ham_counts` based on the message type. Next, we'll add a helper that uses those counts (and the pseudocount `k`) to estimate P(token | spam) and P(token | ham):

In [16]:
    def _probabilities(self, token: str) -> Tuple[float, float]:
        """ returns P(token | spam) and P(token | ham)"""
        spam = self.token_spam_counts[token]
        ham = self.token_ham_counts[token]

        p_token_spam = (spam + self.k) / (self.spam_messages + 2 * self.k)
        p_token_ham = (ham + self.k) / (self.ham_messages + 2 * self.k)

        return p_token_spam, p_token_ham

#### Finally, we're ready to write our `predict` method. As mentioned earlier, rather than multiplying together lots of small probabilities, we'll instead sum up the log probabilities:

In [17]:
    def predict(self, text: str) -> float:
        """Return the estimated probability that *text* is spam.

        For every token in the trained vocabulary, the log probability of
        seeing it (if it occurs in *text*) or of not seeing it (otherwise)
        is accumulated separately for the spam and the ham hypothesis.
        The two totals are then combined into
        P(spam) = e^spam / (e^spam + e^ham).
        """
        text_tokens = tokenize(text)
        log_prob_if_spam = log_prob_if_ham = 0.0

        # Iterate through each word in our vocabulary
        for token in self.tokens:
            prob_if_spam, prob_if_ham = self._probabilities(token)
            if token in text_tokens:
                # Token appears in the message: add the log probability of seeing it.
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)
            else:
                # Token is absent: add log(1 - probability of seeing it).
                log_prob_if_spam += math.log(1.0 - prob_if_spam)
                log_prob_if_ham += math.log(1.0 - prob_if_ham)

        # e^s / (e^s + e^h) equals 1 / (1 + e^(h - s)).  Exponentiating the
        # two totals separately underflows to 0.0 for both once the
        # vocabulary is large, which would raise ZeroDivisionError; working
        # with their difference avoids that.  The branch keeps the argument
        # of exp() non-positive so it can never overflow either.
        log_odds_spam = log_prob_if_spam - log_prob_if_ham
        if log_odds_spam >= 0:
            return 1.0 / (1.0 + math.exp(-log_odds_spam))
        odds_spam = math.exp(log_odds_spam)
        return odds_spam / (1.0 + odds_spam)

# Testing our model

In [19]:
# Tiny hand-made training set: one spam message and two ham messages.
messages = [Message("spam rules", is_spam = True),
            Message("ham rules", is_spam = False),
            Message("hello ham", is_spam = False)]
# Train a classifier with pseudocount k = 0.5 on those three messages.
model = NaiveBayesClassifier(k=0.5)
model.train(messages)
# Inspect the learned state: the vocabulary and the per-class message counts.
print(model.tokens)
print(model.spam_messages)
print(model.ham_messages)

{'ham', 'spam', 'rules', 'hello'}
1
2


# Using our model

#### A popular dataset is the SpamAssassin public corpus. We'll look at the files prefixed with 20021010.

In [3]:
from io import BytesIO # So we can treat bytes as a file.
import requests # To download the files, which are in .tar.bz format
import tarfile

BASE_URL = "https://spamassassin.apache.org/old/publiccorpus"
FILES = ["20021010_easy_ham.tar.bz2",
         "20021010_hard_ham.tar.bz2",
         "20021010_spam.tar.bz2"]

# This is where the data will end up, in /spam, /easy_ham, and /hard_ham subdirectories
OUTPUT_DIR = 'spam_data'

for filename in FILES:
    # Use requests to get the file contents at each URL.  The timeout keeps
    # a stalled connection from hanging the notebook forever.
    response = requests.get(f"{BASE_URL}/{filename}", timeout=60)
    # Fail loudly on an HTTP error instead of handing an error page to
    # tarfile, which would only report an unhelpful "not a bzip2 file".
    response.raise_for_status()

    # Wrap the in-memory bytes so we can use them as a file
    fin = BytesIO(response.content)

    # And extract all the files to the specified output dir
    with tarfile.open(fileobj=fin, mode='r:bz2') as tf:
        if hasattr(tarfile, "data_filter"):
            # The archive comes from the network: the "data" filter rejects
            # members that would be written outside OUTPUT_DIR (absolute
            # paths, "..", links pointing elsewhere).
            tf.extractall(OUTPUT_DIR, filter="data")
        else:
            # Older Python without extraction filters: fall back to the
            # unfiltered behaviour.
            tf.extractall(OUTPUT_DIR)

#### To keep things really simple, we'll just look at the subject line of each email. How do we identify the subject line? When we look through the files, the subject lines all seem to start with "Subject:". So we'll look for that:

In [None]:
import glob, re

# modify the path to wherever you've put the files
# The wildcards matter: the first * matches each corpus subdirectory and the
# second * matches every email file inside it.  Without them glob.glob would
# return at most the directory itself, never the emails.
path = 'spam_data/*/*'
data: List[Message] = []

# glob.glob returns every filename that matches the wildcarded path
for filename in glob.glob(path):
    # The ham archives extract into directories with "ham" in their name, so
    # anything without "ham" in its path is treated as spam.
    is_spam = "ham" not in filename
    # There are some garbage characters in the emails; the errors='ignore' skips them instead of raising an exception.
    with open(filename, errors='ignore') as email_file:
        for line in email_file:
            if line.startswith("Subject:"):
                # Slice the prefix off.  str.lstrip("Subject: ") would strip
                # any leading run of those *characters*, eating the start of
                # subjects such as "test" or "subscribe".
                subject = line[len("Subject:"):].strip()
                data.append(Message(subject, is_spam))
                break # done with this file