In [1]:
%load_ext nb_black

<IPython.core.display.Javascript object>

In [2]:
import math
import re

from collections import defaultdict


def split_tokenize(message):
    """Split ``message`` on runs of whitespace and return the list of pieces."""
    tokens = message.split()
    return tokens


# A token is a run of at least two word characters between word boundaries,
# so single-character words and punctuation never become tokens.
token_pattern = re.compile(r"(?u)\b\w\w+\b")
standard_tokenizer = token_pattern.findall


def tokenize(message):
    """Lower-case ``message`` and return all ``token_pattern`` matches in order."""
    lowered = message.lower()
    return standard_tokenizer(lowered)


class NaiveBayes:
    """Naive Bayes style text classifier trained on ``(label, text)`` pairs.

    ``fit`` stores three tables on the instance:

    * ``prior_probabilities`` -- share of training messages per label,
    * ``word_label_counts`` -- ``{word: {label: occurrences}}``,
    * ``number_of_words`` -- number of *distinct* words seen per label.

    ``predict`` starts from the priors and, for every token of the message,
    multiplies each label's score by ``occurrences / distinct_words``.  A word
    never seen with a label uses ``unseen_word_count`` instead of zero, so a
    single unknown word cannot wipe out a label.

    NOTE(review): because the denominator is the distinct-word count rather
    than the total token count, the values returned by ``predict`` are
    unnormalised scores, not true probabilities.  This is kept unchanged for
    backward compatibility.
    """

    # Pseudo-count substituted for a word that never occurred with a label.
    unseen_word_count = 0.5

    def __init__(self, tokenize=tokenize):
        """Remember ``tokenize``, the callable that turns a message into tokens."""
        self.tokenize = tokenize

    def get_label_counts(self, messages):
        """Return ``{label: number of messages carrying that label}``."""
        label_counts = defaultdict(int)
        for label, _text in messages:
            label_counts[label] += 1
        return label_counts

    def get_prior_probabilities(self, label_counts):
        """Turn per-label message counts into relative frequencies.

        An empty ``label_counts`` yields an empty dict (no division happens).
        """
        number_of_messages = sum(label_counts.values())
        return {
            label: count / number_of_messages for label, count in label_counts.items()
        }

    def get_word_label_counts(self, messages):
        """Return ``{word: {label: occurrences}}`` over all tokenized messages."""
        counts = defaultdict(lambda: defaultdict(int))
        for label, text in messages:
            for word in self.tokenize(text):
                counts[word][label] += 1
        return counts

    def get_number_of_words(self, word_label_counts):
        """Return ``{label: number of distinct words seen with that label}``."""
        number_of_words = defaultdict(int)
        for counts in word_label_counts.values():
            for label in counts:
                number_of_words[label] += 1
        return number_of_words

    def fit(self, messages):
        """Learn priors and word statistics from ``(label, text)`` pairs.

        Args:
            messages: iterable of ``(label, text)`` pairs; one-shot iterators
                such as generators are supported.

        Returns:
            ``self``, so that ``NaiveBayes().fit(data)`` can be chained.
        """
        # The data is traversed twice below; without materialising it a
        # generator would be exhausted after the first pass and the word
        # statistics would silently stay empty.
        messages = list(messages)
        self.prior_probabilities = self.get_prior_probabilities(
            self.get_label_counts(messages)
        )
        self.word_label_counts = self.get_word_label_counts(messages)
        self.number_of_words = self.get_number_of_words(self.word_label_counts)
        return self

    def update_probabilities(self, probabilities, counts_per_label, number_of_words):
        """Multiply every label's score by the evidence of one word.

        Args:
            probabilities: ``{label: current score}``.
            counts_per_label: ``{label: occurrences of the word}``; a missing
                label falls back to ``unseen_word_count``.
            number_of_words: ``{label: number of distinct words}``.

        Returns:
            A new ``{label: updated score}`` dict; the inputs are not modified
            (apart from a ``defaultdict`` creating a missing key on lookup).

        Raises:
            ZeroDivisionError: if ``number_of_words[label]`` is 0, e.g. for a
                label whose training messages contained no tokens.
            KeyError: if ``number_of_words`` is a plain dict without ``label``.
        """
        updated_probabilities = {}
        for label, prior_probability in probabilities.items():
            word_count = counts_per_label.get(label, self.unseen_word_count)
            word_probability = word_count / number_of_words[label]
            updated_probabilities[label] = prior_probability * word_probability
        return updated_probabilities

    def predict(self, message):
        """Return ``{label: score}`` for ``message``.

        The score is the label's prior multiplied by one factor per token.
        For very long messages the product can underflow to ``0.0``; use
        ``predict_label`` when only the winning label is needed.
        """
        probabilities = dict(self.prior_probabilities)
        for word in self.tokenize(message):
            counts_per_label = self.word_label_counts.get(word, {})
            probabilities = self.update_probabilities(
                probabilities, counts_per_label, self.number_of_words
            )
        return probabilities

    def _log_scores(self, message):
        """Return ``{label: log(score)}`` using the same model as ``predict``.

        Summing logarithms instead of multiplying the factors keeps the
        ranking meaningful when the plain product would underflow.
        """
        log_scores = {
            label: math.log(prior) for label, prior in self.prior_probabilities.items()
        }
        for word in self.tokenize(message):
            counts_per_label = self.word_label_counts.get(word, {})
            for label in log_scores:
                word_count = counts_per_label.get(label, self.unseen_word_count)
                log_scores[label] += math.log(
                    word_count / self.number_of_words[label]
                )
        return log_scores

    def predict_label(self, message):
        """Return the label with the highest score for ``message``.

        The comparison is done in log space so that long messages, whose raw
        scores all underflow to ``0.0``, are still ranked by evidence instead
        of by label name.  Exact ties go to the greater label.

        Raises:
            IndexError: if the model knows no labels (fitted on empty data).
            ZeroDivisionError: see ``update_probabilities``.
        """
        log_scores = self._log_scores(message)
        if not log_scores:
            raise IndexError(
                "cannot predict a label: the model was fitted without any messages"
            )
        best_label, _ = max(log_scores.items(), key=lambda item: (item[1], item[0]))
        return best_label

<IPython.core.display.Javascript object>

In [3]:
def test_predict():
    """Check ``predict`` on a three-message corpus against hand-computed scores."""
    training_data = [
        ("spam", "foo bar baz"),
        ("spam", "foo asdf bsdf"),
        ("ham", "asdf csdf"),
    ]
    # message -> expected scores.  "foo" never occurs with ham (pseudo-count
    # 0.5 over 2 distinct ham words) and twice with spam (5 distinct words).
    expectations = {
        "foo": {"ham": (1 / 3) * (0.5 / 2), "spam": (2 / 3) * (2 / 5)},
    }
    classifier = NaiveBayes().fit(training_data)
    for message, expected_probabilities in expectations.items():
        assert classifier.predict(message) == expected_probabilities


test_predict()

<IPython.core.display.Javascript object>

In [4]:
def test_predict_word_probabilities():
    """Exercise ``update_probabilities`` with hand-picked argument triples."""
    classifier = NaiveBayes()
    scenarios = [
        # (probabilities, counts_per_label, number_of_words), expected
        (({}, {}, {}), {}),
        (({"spam": 1}, {"spam": 10}, {"spam": 100}), {"spam": 0.1}),
        (
            ({"spam": 1, "ham": 0.5}, {"spam": 10, "ham": 5}, {"spam": 100, "ham": 50}),
            {"spam": 0.1, "ham": 0.05},
        ),
        # no word count -> make sure probability is > 0
        (({"spam": 1}, {}, {"spam": 100}), {"spam": 0.005}),
    ]
    for arguments, expected in scenarios:
        assert classifier.update_probabilities(*arguments) == expected


test_predict_word_probabilities()

<IPython.core.display.Javascript object>

In [5]:
def test_initial_probabilities():
    """For an empty message ``predict`` must return exactly the label priors."""
    scenarios = [
        # (training data, expected priors)
        ([], {}),
        ([("ham", ""), ("spam", "")], {"ham": 0.5, "spam": 0.5}),
        ([(i, "") for i in range(3)], {k: 1 / 3 for k in range(3)}),
    ]
    for training_data, expected_priors in scenarios:
        classifier = NaiveBayes().fit(training_data)
        assert classifier.predict("") == expected_priors


test_initial_probabilities()

<IPython.core.display.Javascript object>

# Evaluation

In [6]:
import random

random.seed(2021)  # make sure the same messages are chosen between restarts


def split_train_test(messages, test_quote=0.1, rng=None):
    """Split ``(label, text)`` pairs into a train and a test list, stratified by label.

    Args:
        messages: iterable of ``(label, text)`` pairs.
        test_quote: fraction of each label's messages that goes into the test
            list; the per-label count is rounded down.
        rng: optional object providing ``sample(population, k)``, for example
            ``random.Random(seed)``.  Passing one makes the split independent
            of how often the global generator was used before.  Defaults to
            the module-level ``random`` generator.

    Returns:
        ``(train, test)``; within each list the messages are grouped by label
        in order of first appearance and keep their original relative order.

    Raises:
        ValueError: raised by ``sample`` when the per-label test size is
            negative or larger than the number of messages for that label.
    """
    if rng is None:
        rng = random

    messages_by_label = defaultdict(list)
    for label, text in messages:
        messages_by_label[label].append((label, text))

    # stratified sampling: draw the test share separately for every label
    train, test = [], []
    for label_messages in messages_by_label.values():
        test_len = int(len(label_messages) * test_quote)
        test_indices = set(rng.sample(range(len(label_messages)), test_len))
        for index, message in enumerate(label_messages):
            if index in test_indices:
                test.append(message)
            else:
                train.append(message)
    return train, test

<IPython.core.display.Javascript object>

In [7]:
from django_comments import get_model as get_comments_model

# Model class returned by django_comments.get_model(); queried via Comment.objects below.
Comment = get_comments_model()


def get_messages_from_comments():
    """Build ``(label, text)`` pairs from every object in ``Comment.objects.all()``.

    A comment whose ``is_removed`` is truthy is labelled ``"spam"``, any other
    ``"ham"``; the text is its ``name``, ``title`` and ``comment`` attributes
    joined by single spaces.
    """
    return [
        (
            "spam" if comment.is_removed else "ham",
            f"{comment.name} {comment.title} {comment.comment}",
        )
        for comment in Comment.objects.all()
    ]


# Load every stored comment as a (label, text) pair.
messages = get_messages_from_comments()

<IPython.core.display.Javascript object>

In [8]:
# Hold out roughly 10% of each label (stratified) for evaluation, report the
# sizes of both parts, and train the classifier on the remainder.
train_messages, test_messages = split_train_test(messages, test_quote=0.1)
print(len(train_messages), len(test_messages))
model = NaiveBayes().fit(train_messages)

310 34


<IPython.core.display.Javascript object>

In [9]:
%%time

# Accuracy on the held-out messages: share of predictions equal to the true label.
all_observations = len(test_messages)
true_positives = 0

for label, message in test_messages:
    # The comparison yields True/False, counted here as 1/0.
    true_positives += int(model.predict_label(message) == label)

accuracy = true_positives / all_observations
print(f"Accuracy: {accuracy:.3f}")

Accuracy: 1.000
CPU times: user 6.75 ms, sys: 1.49 ms, total: 8.24 ms
Wall time: 7.09 ms


<IPython.core.display.Javascript object>

In [36]:
# Counter names, generated in the order
# true_positive, false_positive, true_negative, false_negative.
outcomes = (("true", "false"), ("positive", "negative"))
possible_results = [f"{a}_{b}" for b in outcomes[1] for a in outcomes[0]]
result_template = dict.fromkeys(possible_results, 0)

labels = set(model.prior_probabilities.keys())
# Key the results in the model's own label order; iterating the set instead
# would leave the order of the report arbitrary.
label_results = {label: dict(result_template) for label in model.prior_probabilities}
all_observations = len(test_messages)

# One-vs-rest bookkeeping: every test message updates exactly one counter of
# every label, so each label's four counters sum to `all_observations`.
for label, message in test_messages:
    predicted = model.predict_label(message)
    if label == predicted:
        label_results[label]["true_positive"] += 1
    else:
        label_results[label]["false_negative"] += 1
        label_results[predicted]["false_positive"] += 1
    # A label that is neither the actual nor the predicted one was correctly
    # left out for this message.
    for bystander in label_results:
        if bystander != label and bystander != predicted:
            label_results[bystander]["true_negative"] += 1

<IPython.core.display.Javascript object>

In [37]:
def precision_recall_f1(result):
    """Compute precision, recall and F1 from one label's outcome counters.

    Args:
        result: mapping with the keys ``"true_positive"``, ``"false_positive"``
            and ``"false_negative"``; any other key is ignored.

    Returns:
        ``(precision, recall, f1)``.  A ratio whose denominator is zero (label
        never predicted, label never present, or no true positives at all) is
        reported as ``0.0`` instead of raising ``ZeroDivisionError``.
    """
    tp = result["true_positive"]
    fp = result["false_positive"]
    fn = result["false_negative"]

    predicted_positive = tp + fp
    actual_positive = tp + fn
    precision = tp / predicted_positive if predicted_positive else 0.0
    recall = tp / actual_positive if actual_positive else 0.0

    if precision + recall:
        f1 = 2 * (precision * recall) / (precision + recall)
    else:
        f1 = 0.0
    return precision, recall, f1


def show_result(label_results):
    """Print one line per label with its F1, precision and recall (3 decimals).

    ``label_results`` maps each label to the counter dict expected by
    ``precision_recall_f1``; labels are right-aligned to a width of 4.
    """
    for label in label_results:
        precision, recall, f1 = precision_recall_f1(label_results[label])
        line = f"{label: >4} f1: {f1:.3f} precision: {precision:.3f} recall: {recall:.3f}"
        print(line)

<IPython.core.display.Javascript object>

In [28]:
# Print precision/recall/F1 per label for the current `label_results`.
# NOTE(review): the identical call follows in the next cell with a different
# output and execution count (In [28] here vs In [38] there) -- presumably the
# two were run against different state; confirm which result is current.
show_result(label_results)

 ham f1: 0.975 precision: 0.987 recall: 0.963
spam f1: 0.850 precision: 0.791 recall: 0.919


<IPython.core.display.Javascript object>

In [38]:
# Print precision/recall/F1 per label for the current `label_results`.
show_result(label_results)

 ham f1: 0.991 precision: 0.996 recall: 0.985
spam f1: 0.941 precision: 0.911 recall: 0.973


<IPython.core.display.Javascript object>

In [33]:
import io
import zipfile
import requests


def get_sms_messages():
    """Download the SMS spam archive and return its rows as ``(label, text)`` pairs.

    The zip archive is fetched over HTTP, ``spam.csv`` is read from it and
    decoded as latin-1, the first line is skipped as header, and every other
    non-empty line is split on commas: the first field is the label, the
    remaining fields are joined with single spaces to form the text.

    NOTE(review): this is a plain comma split, not a CSV parse -- quote
    characters stay in the text and commas inside a message become spaces.
    The regex tokenizer in this notebook drops punctuation anyway; switch to
    ``csv.reader`` if the exact text matters.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        requests.Timeout: if the server does not respond within the timeout.
    """
    # download zip archive
    training_data_url = (
        "https://d2b7dn9rofvhjd.cloudfront.net/sms-spam-collection-dataset.zip"
    )
    # Fail fast: never wait forever, and never hand an error body to zipfile.
    response = requests.get(training_data_url, timeout=30)
    response.raise_for_status()
    with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        spam_text = archive.read("spam.csv").decode("latin1")

    # parse messages
    lines = iter(spam_text.split("\r\n"))
    next(lines, None)  # skip the header line

    messages = []
    for line in lines:
        # Trailing commas come from empty trailing columns.
        line = line.rstrip(",")
        if not line:
            # A blank line (e.g. after a final line break) is not a sample.
            continue
        label, *message = line.split(",")
        messages.append((label, " ".join(message)))
    return messages


# NOTE: rebinds `messages`, replacing the Django comment data assigned to the
# same name earlier in the notebook.
messages = get_sms_messages()

<IPython.core.display.Javascript object>

In [34]:
# Same split-and-train steps as before, now applied to whatever `messages`
# currently holds; rebinds `train_messages`, `test_messages` and `model`.
train_messages, test_messages = split_train_test(messages, test_quote=0.1)
print(len(train_messages), len(test_messages))
model = NaiveBayes().fit(train_messages)

5016 556


<IPython.core.display.Javascript object>