# Naive Bayes from scratch

In [12]:
import re
import glob
from pathlib import Path
from random import shuffle
from math import exp, log
from collections import defaultdict, Counter
from typing import NamedTuple, List, Set, Tuple, Dict

# Load data

In [13]:
# Ensure that we have a `data` directory we use to store downloaded data
!mkdir -p data
data_dir: Path = Path('data')

In [14]:
# We're using the "Enron Spam" data set
!wget -nc -P data http://nlp.cs.aueb.gr/software_and_datasets/Enron-Spam/preprocessed/enron1.tar.gz

File ‘data/enron1.tar.gz’ already there; not retrieving.



In [15]:

import os
os.chdir("/content/data/enron1")

# Read and print summary.txt (assuming it's in the current directory)
try:
  with open("Summary.txt", "r") as f:
    print(f.read())
except FileNotFoundError:
  print("Summary.txt not found in data/enron1 directory")

Legitimate
----------
- Owner: farmer-d
- Total number: 3672 emails
- Date of first email: 1999-12-10
- Date of last email: 2002-01-11
- Similars deletion: No
- Encoding: No


Spam
----
- Owner: GP
- Total number: 1500 emails
- Date of first email: 2003-12-18
- Date of last email: 2005-09-06
- Similars deletion: No
- Encoding: No

Spam:Legitimate rate = 1:3
Total number of emails (legitimate + spam): 5975



In [16]:
!tar -xzf data/enron1.tar.gz -C data

# View Data

In [17]:
# The data set has 2 directories: One for `spam` messages, one for `ham` messages
spam_data_path: Path = data_dir / 'enron1' / 'spam'
ham_data_path: Path = data_dir / 'enron1' / 'ham'

In [18]:
# Our data container for `spam` and `ham` messages
class Message(NamedTuple):
    text: str
    is_spam: bool

In [19]:
# Globbing for all the `.txt` files in both (`spam` and `ham`) directories
spam_message_paths: List[str] = glob.glob(str(spam_data_path / '*.txt'))
ham_message_paths: List[str] = glob.glob(str(ham_data_path / '*.txt'))

message_paths: List[str] = spam_message_paths + ham_message_paths
message_paths[:5]

['data/enron1/spam/1503.2004-07-07.GP.spam.txt',
 'data/enron1/spam/1879.2004-08-18.GP.spam.txt',
 'data/enron1/spam/3990.2005-03-07.GP.spam.txt',
 'data/enron1/spam/1417.2004-06-25.GP.spam.txt',
 'data/enron1/spam/4111.2005-03-20.GP.spam.txt']

In [20]:
len(message_paths)

5172

In [21]:
# The list which eventually contains all the parsed Enron `spam` and `ham` messages
messages: List[Message] = []

In [22]:
# Open every file individually, turn it into a `Message` and append it to our `messages` list
for path in message_paths:
    with open(path, errors='ignore') as file:
        is_spam: bool = True if 'spam' in path else False
        # We're only interested in the subject for the time being
        text: str = file.readline().replace('Subject:', '').strip()
        messages.append(Message(text, is_spam))

In [23]:
shuffle(messages)
messages[:5]

[Message(text='hpl nom for may 16 , 2001', is_spam=False),
 Message(text='performance review - mid - year', is_spam=False),
 Message(text='re : chevron - winter', is_spam=False),
 Message(text='lst rev feb . 2000 josey ranch nom', is_spam=False),
 Message(text="she ' ll enjoy this .", is_spam=True)]

In [24]:
len(messages)

5172

In [25]:
# Given a string, normalize and extract all words with length greater than 2
def tokenize(text: str) -> Set[str]:
    words: List[str] = []
    for word in re.findall(r'[A-Za-z0-9\']+', text):
        if len(word) >= 2:
            words.append(word.lower())
    return set(words)

assert tokenize('Is this a text? If so, Tokenize this text!...') == {'is', 'this', 'text', 'if', 'so', 'tokenize'}

In [26]:
assert tokenize('Is this a text? If so, Tokenize this text!...') == {'is', 'this', 'text', 'if', 'so', 'tokenize', 'is'}

In [27]:
tokenize(messages[0].text)

{'16', '2001', 'for', 'hpl', 'may', 'nom'}

In [28]:
# Split the list of messages into a `train` and `test` set (defaults to 80/20 train/test split)
def train_test_split(messages: List[Message], pct=0.8) -> Tuple[List[Message], List[Message]]:
    shuffle(messages)
    num_train = int(round(len(messages) * pct, 0))
    return messages[:num_train], messages[num_train:]

assert len(train_test_split(messages)[0]) + len(train_test_split(messages)[1]) == len(messages)

# Model

In [29]:
# The Naive Bayes classifier
class NaiveBayes:
    def __init__(self, k=1) -> None:
        # `k` is the smoothening factor
        self._k: int = k
        self._num_spam_messages: int = 0
        self._num_ham_messages: int = 0
        self._num_word_in_spam: Dict[int] = defaultdict(int)
        self._num_word_in_ham: Dict[int] = defaultdict(int)
        self._spam_words: Set[str] = set()
        self._ham_words: Set[str] = set()
        self._words: Set[str] = set()

    # Iterate through the given messages and gather the necessary statistics
    def train(self, messages: List[Message]) -> None:
        msg: Message
        token: str
        for msg in messages:
            tokens: Set[str] = tokenize(msg.text)
            self._words.update(tokens)
            if msg.is_spam:
                self._num_spam_messages += 1
                self._spam_words.update(tokens)
                for token in tokens:
                    self._num_word_in_spam[token] += 1
            else:
                self._num_ham_messages += 1
                self._ham_words.update(tokens)
                for token in tokens:
                    self._num_word_in_ham[token] += 1

    # Probability of `word` being spam
    def _p_word_spam(self, word: str) -> float:
        return (self._k + self._num_word_in_spam[word]) / ((2 * self._k) + self._num_spam_messages)

    # Probability of `word` being ham
    def _p_word_ham(self, word: str) -> float:
        return (self._k + self._num_word_in_ham[word]) / ((2 * self._k) + self._num_ham_messages)

    # Given a `text`, how likely is it spam?
    def predict(self, text: str) -> float:
        text_words: Set[str] = tokenize(text)
        log_p_spam: float = 0.0
        log_p_ham: float = 0.0

        for word in self._words:
            p_spam: float = self._p_word_spam(word)
            p_ham: float = self._p_word_ham(word)
            if word in text_words:
                log_p_spam += log(p_spam)
                log_p_ham += log(p_ham)
            else:
                log_p_spam += log(1 - p_spam)
                log_p_ham += log(1 - p_ham)

        p_if_spam: float = exp(log_p_spam)
        p_if_ham: float = exp(log_p_ham)
        return p_if_spam / (p_if_spam + p_if_ham)



##  Test

In [38]:
#-----------------------------#
# Tests
def test_naive_bayes():
    messages: List[Message] = [
        Message('Spam message', is_spam=True),
        Message('Ham message', is_spam=False),
        Message('Ham message about Spam', is_spam=False)]


    nb: NaiveBayes = NaiveBayes()
    nb.train(messages)

    assert nb._num_spam_messages == 1
    assert nb._num_ham_messages == 2
    assert nb._spam_words == {'spam', 'message'}
    assert nb._ham_words == {'ham', 'message', 'about', 'spam'}
    assert nb._num_word_in_spam == {'spam': 1, 'message': 1}
    assert nb._num_word_in_ham == {'ham': 2, 'message': 2, 'about': 1, 'spam': 1}
    assert nb._words == {'spam', 'message', 'ham', 'about'}

    # Our test message
    text: str = 'A spam message'

    # Reminder: The `_words` we iterater over are: {'spam', 'message', 'ham', 'about'}

    # Calculate how spammy the `text` might be
    p_if_spam: float = exp(sum([
        log(     (1 + 1) / ((2 * 1) + 1)),  # `spam` (also in `text`)
        log(     (1 + 1) / ((2 * 1) + 1)),  # `message` (also in `text`)
        log(1 - ((1 + 0) / ((2 * 1) + 1))), # `ham` (NOT in `text`)
        log(1 - ((1 + 0) / ((2 * 1) + 1))), # `about` (NOT in `text`)
    ]))

    # Calculate how hammy the `text` might be
    p_if_ham: float = exp(sum([
        log(     (1 + 1)  / ((2 * 1) + 2)),  # `spam` (also in `text`)
        log(     (1 + 2)  / ((2 * 1) + 2)),  # `message` (also in `text`)
        log(1 - ((1 + 2)  / ((2 * 1) + 2))), # `ham` (NOT in `text`)
        log(1 - ((1 + 1)  / ((2 * 1) + 2))), # `about` (NOT in `text`)
    ]))

    p_spam: float = p_if_spam / (p_if_spam + p_if_ham)

    assert p_spam == nb.predict(text)

test_naive_bayes()

[Message(text='Spam message', is_spam=True), Message(text='Ham message', is_spam=False), Message(text='Ham message about Spam', is_spam=False)]


## Note
-

In [32]:
train: List[Message]
test: List[Message]

# Splitting our Enron messages into a `train` and `test` set
train, test = train_test_split(messages)

In [33]:
# Train our Naive Bayes classifier with the `train` set
nb: NaiveBayes = NaiveBayes()
nb.train(train)

print(f'Spam messages in training data: {nb._num_spam_messages}')
print(f'Ham messages in training data: {nb._num_ham_messages}')
print(f'Most spammy words: {Counter(nb._num_word_in_spam).most_common(20)}')

Spam messages in training data: 1181
Ham messages in training data: 2957
Most spammy words: [('you', 106), ('your', 101), ('the', 92), ('for', 92), ('to', 85), ('re', 69), ('and', 56), ('on', 53), ('is', 46), ('get', 45), ('in', 44), ('of', 42), ('this', 39), ('software', 36), ('all', 35), ('from', 33), ('online', 33), ('with', 33), ('it', 32), ('at', 31)]


In [40]:
# Grabbing all the spam messages from our `test` set
spam_messages: List[Message] = [item for item in test if item.is_spam]
spam_messages[:5]

[Message(text='fwd : this is is porn money - -', is_spam=True),
 Message(text='hey ,', is_spam=True),
 Message(text='date a new wild babe tonight', is_spam=True),
 Message(text='', is_spam=True),
 Message(text='re : no more injections', is_spam=True)]

In [43]:
len(spam_messages)

319

# Evalutation

In [48]:
# Using our trained Naive Bayes classifier to classify a spam message
message= spam_messages[10]
print(message)
print(f'Predicting likelihood of "{message.text}" being spam.')
nb.predict(message.text)

Message(text='you can stay rock hard , longer', is_spam=True)
Predicting likelihood of "you can stay rock hard , longer" being spam.


0.9999797915954709

In [49]:
# Grabbing all the ham messages from our `test` set
ham_messages: List[Message] = [item for item in test if not item.is_spam]
ham_messages[:5]

[Message(text='mobil beaumont', is_spam=False),
 Message(text='epgt', is_spam=False),
 Message(text='neon lesson # 5', is_spam=False),
 Message(text='point change for deals', is_spam=False),
 Message(text='new nomination', is_spam=False)]

In [52]:
# Using our trained Naive Bayes classifier to classify a ham message
message = ham_messages[]
print(message)
print(f'Predicting likelihood of "{message.text}" being spam.')
nb.predict(message.text)

Message(text='from your clerk . its been great !', is_spam=False)
Predicting likelihood of "from your clerk . its been great !" being spam.


0.9934977992916378

In [56]:
from sklearn.metrics import confusion_matrix, classification_report
from math import exp

def predict_spam(model, test_data):
    """
    Predicts spam/ham for test data based on a probability threshold of 0.5.
    """
    predictions = []
    for message in test_data:
        # Probability from model's prediction
        probability = model.predict(message.text)
        # Determine spam/ham: Spam if probability > 0.5, Ham otherwise
        is_spam = probability > 0.5
        predictions.append(is_spam)
    return predictions

# Assuming 'test' is a list of Message objects with `text` and `is_spam` attributes
# Generate predictions
predictions = predict_spam(nb, test)

# Extract actual labels
actual_labels = [message.is_spam for message in test]

# Evaluate performance
conf_matrix = confusion_matrix(actual_labels, predictions)
print("Confusion Matrix:")
print(conf_matrix)

# Classification Report: Includes F1, Recall, and Precision
print("\nClassification Report:")
print(classification_report(actual_labels, predictions, target_names=["Ham", "Spam"]))

# Manually calculate metrics
true_positive = conf_matrix[1, 1]  # Spam correctly classified as Spam
false_positive = conf_matrix[0, 1]  # Ham incorrectly classified as Spam
false_negative = conf_matrix[1, 0]  # Spam incorrectly classified as Ham
true_negative = conf_matrix[0, 0]  # Ham correctly classified as Ham

precision = true_positive / (true_positive + false_positive) if (true_positive + false_positive) > 0 else 0
recall = true_positive / (true_positive + false_negative) if (true_positive + false_negative) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

print("\nManual Metrics:")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1-Score: {f1_score:.2f}")


Confusion Matrix:
[[694  21]
 [100 219]]

Classification Report:
              precision    recall  f1-score   support

         Ham       0.87      0.97      0.92       715
        Spam       0.91      0.69      0.78       319

    accuracy                           0.88      1034
   macro avg       0.89      0.83      0.85      1034
weighted avg       0.89      0.88      0.88      1034


Manual Metrics:
Precision: 0.91
Recall: 0.69
F1-Score: 0.78
