In [25]:
import csv
import re
import zipfile
from collections import Counter, defaultdict
from math import exp, log
from random import Random, shuffle
from typing import Dict, List, Optional, Set, Tuple, TypedDict

import pandas as pd

In [26]:
# Unzip dataset
# Forward slashes are accepted on Windows as well as POSIX systems, and match
# the "data/emails.csv" path used when the CSV is read.
with zipfile.ZipFile("data/dataset.zip", 'r') as zip_ref:
    zip_ref.extractall("data")

In [27]:
class Message(TypedDict):
    """Shape of one labelled e-mail as it appears in a row of the dataset CSV."""

    # Raw message body.
    text: str
    # Class label as read from the CSV; the training code treats the string
    # '1' as spam and any other value as ham.
    spam: str

In [28]:
# Module-level accumulators filled by `transform` while the CSV is read.
messages: List[Tuple[str, str]] = []  # (text, label) pairs built from msgs/spams
msgs: List[str] = []  # message bodies, in the order they were accepted
spams: List[str] = []  # labels as read from the CSV, parallel to `msgs`


# Take data input from csv and record it in the parallel `msgs` / `spams` lists
def transform(dict_, typed_dict) -> None:
    """Append one CSV row to the module-level `msgs` and `spams` lists.

    A row is recorded only when BOTH its "text" and "spam" values are
    present and non-empty. Recording them independently would let a row with
    a single populated column lengthen only one list, after which every
    later text would be zipped with the wrong label.

    Args:
        dict_: Mapping for one row, expected to carry "text" and "spam" keys.
        typed_dict: Accepted for backward compatibility; not used.

    Returns:
        None. The function works purely through its side effect on
        `msgs` and `spams`.
    """
    text = dict_.get("text")
    label = dict_.get("spam")

    # Missing values (None) and empty strings both mean "no usable data".
    if not text or not label:
        return

    msgs.append(text.lstrip())
    spams.append(label)

In [29]:
# read dataset
data = "data/emails.csv"

# `transform` appends to the module-level lists, so empty them first;
# otherwise re-running this cell would load every row a second time.
msgs.clear()
spams.clear()

with open(data, encoding="utf8", newline='') as file:
    for row in csv.DictReader(file):
        transform(row, Message)

# Pair each message body with its label: (text, label)
messages = list(zip(msgs, spams))

In [30]:
# Normalize input string and extract words with length >= 3
def tokenize(text: str) -> Set[str]:
    """Return the set of distinct lower-cased words found in `text`.

    A word is a run of ASCII letters, digits and apostrophes. Runs shorter
    than three characters are dropped, as is the exact, capitalised token
    "Subject" (the comparison happens before lower-casing, so "subject" and
    "SUBJECT" are kept).
    """
    return {
        candidate.lower()
        for candidate in re.findall(r'[A-Za-z0-9\']+', text)
        if len(candidate) >= 3 and candidate != "Subject"
    }

In [31]:
# Split dataset into train and test sets (80/20)
def train_test_split(
    messages: List[Tuple[str, str]],
    pct=0.8,
    seed: Optional[int] = None,
) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]:
    """Randomly partition `messages` into a training and a test list.

    The input list is left untouched; the shuffle happens on a copy.

    Args:
        messages: Items to split (the notebook passes (text, label) pairs).
        pct: Fraction of the items that goes to the training list.
        seed: When given, the shuffle uses a private generator seeded with
            this value, so the same seed always yields the same split. When
            None, the module-level `shuffle` is used, as before.

    Returns:
        A `(train, test)` tuple of lists that together contain every input
        item exactly once.
    """
    shuffled = list(messages)  # copy so the caller's list keeps its order
    if seed is None:
        shuffle(shuffled)
    else:
        Random(seed).shuffle(shuffled)

    num_train = int(round(len(shuffled) * pct))
    return shuffled[:num_train], shuffled[num_train:]

In [32]:
class NaiveBayes:
    """Naive Bayes spam classifier working on sets of word tokens.

    Training counts, per class, how many messages contain each word.
    Prediction walks the whole learned vocabulary: a word present in the
    message contributes its smoothed per-class probability, an absent word
    contributes one minus that probability. Class priors are not used, i.e.
    spam and ham are treated as equally likely a priori.
    """

    def __init__(self) -> None:
        # `k` is the smoothing factor (pseudo-count added to every word count)
        self._k: int = 1
        self._num_spam_messages: int = 0
        self._num_ham_messages: int = 0
        # word -> number of spam / ham messages containing that word
        self._num_word_in_spam: Dict[str, int] = defaultdict(int)
        self._num_word_in_ham: Dict[str, int] = defaultdict(int)
        self._spam_words: Set[str] = set()
        self._ham_words: Set[str] = set()
        # Full vocabulary seen during training (spam and ham together)
        self._words: Set[str] = set()

    # Iterate through the messages and gather the necessary numbers
    def train(self, messages: List[Tuple[str, str]]) -> None:
        """Accumulate message and word counts from labelled messages.

        Args:
            messages: Sequence of items where index 0 is the message text
                and index 1 is the label; the label '1' means spam, anything
                else is counted as ham.

        Calling `train` again adds to the existing counts.
        """
        for msg in messages:
            tokens: Set[str] = tokenize(msg[0])
            self._words.update(tokens)
            if msg[1] == '1':
                self._num_spam_messages += 1
                self._spam_words.update(tokens)
                for token in tokens:
                    self._num_word_in_spam[token] += 1
            else:
                self._num_ham_messages += 1
                self._ham_words.update(tokens)
                for token in tokens:
                    self._num_word_in_ham[token] += 1

    # Probability of seeing a word in a spam message
    def word_spam_percent(self, word: str) -> float:
        """Return the smoothed fraction of spam messages containing `word`."""
        # `.get` keeps the lookup read-only; indexing the defaultdict would
        # insert a zero entry for every word that is merely queried.
        seen = self._num_word_in_spam.get(word, 0)
        return (self._k + seen) / ((2 * self._k) + self._num_spam_messages)

    # Probability of seeing a word in a ham message
    def word_ham_percent(self, word: str) -> float:
        """Return the smoothed fraction of ham messages containing `word`."""
        seen = self._num_word_in_ham.get(word, 0)
        return (self._k + seen) / ((2 * self._k) + self._num_ham_messages)

    # Predict input message if it's spam
    def predict(self, text: str) -> float:
        """Return the estimated probability (rounded to 6 places) that `text` is spam.

        The two class likelihoods are accumulated as sums of logarithms and
        combined without ever exponentiating them individually. Doing
        `exp(log_p_spam) / (exp(log_p_spam) + exp(log_p_ham))` directly
        underflows to 0/0 once the vocabulary or the message is large; the
        mathematically equal form `1 / (1 + exp(log_p_ham - log_p_spam))`
        only depends on the difference and stays finite.
        """
        text_words: Set[str] = tokenize(text)
        log_p_spam: float = 0.0
        log_p_ham: float = 0.0

        for word in self._words:
            p_spam: float = self.word_spam_percent(word)
            p_ham: float = self.word_ham_percent(word)
            if word in text_words:
                log_p_spam += log(p_spam)
                log_p_ham += log(p_ham)
            else:
                log_p_spam += log(1 - p_spam)
                log_p_ham += log(1 - p_ham)

        log_odds: float = log_p_spam - log_p_ham
        # Always call exp() on a non-positive argument so it can underflow to
        # 0.0 but never overflow.
        if log_odds >= 0:
            probability = 1.0 / (1.0 + exp(-log_odds))
        else:
            odds = exp(log_odds)
            probability = odds / (1.0 + odds)

        return round(probability, 6)


In [33]:
# Partition the loaded messages into a training subset and a held-out test subset
train: List[Message]
test: List[Message]
train, test = train_test_split(messages)

# Report the size of each subset on one line
print(*(len(subset) for subset in (train, test)))

4581 1145


In [34]:
# Fit a fresh classifier on the training subset
nb = NaiveBayes()
nb.train(train)

# Summarise what was learned: class sizes and the 20 words that occur in the
# largest number of spam messages
top_spam_words = Counter(nb._num_word_in_spam).most_common(20)
print(f'Spam messages in training data: {nb._num_spam_messages}')
print(f'Ham messages in training data: {nb._num_ham_messages}')
print(f'Most spam words: {top_spam_words}')

Spam messages in training data: 1103
Ham messages in training data: 3478
Most spam words: [('the', 865), ('and', 805), ('you', 787), ('your', 772), ('for', 726), ('this', 553), ('with', 522), ('our', 508), ('have', 504), ('not', 493), ('are', 490), ('from', 480), ('that', 429), ('here', 427), ('will', 398), ('all', 386), ('com', 367), ('more', 335), ('http', 321), ('now', 310)]


In [35]:
# Manual test on three hand-written e-mails
# Expected spam: the 1st and 3rd entries
emails = [
    "Hey, Jill!\nIt's great to have you as part of our 8 Ball Pool family!\nHere's a quick tip we hope you'll find useful:\nPlay using Facebook login & reap rewards like...\n5 FREE Pool Cash at time of login\nFree Coins every hour\nPlay on multiple devices\nPlay with and challenge your friends\nFree Gifts\nHere's a little gift to help you along your 8 Ball Pool journey. Click the button below to collect now!",
    "It was nice meeting you earlier. It would be great to be able to see you again.",
    "Hello\nAre you tired? Are you exhausted after a long day at work? Signup to receive a free spa at testurl.com",
]

# Score each e-mail and show the model's spam probability for it
for email in emails:
    probability = nb.predict(email)
    print(f"Probability: {probability}")

Probability: 0.993719
Probability: 1e-06
Probability: 0.974627


In [None]:
# Predict messages from test set and score the predictions against the labels
SPAM_THRESHOLD: float = 0.4  # a message is flagged as spam at or above this probability

spam_prob: float = 0.0  # sum of the probabilities of all flagged messages
spam_count: int = 0  # messages labelled spam in the test set
spam_predict_count: int = 0  # messages the model flagged as spam
spam_correct_count: int = 0  # flagged messages that really are spam
correct_count: int = 0  # messages (spam or ham) classified correctly

for msg in test:
    prob: float = nb.predict(msg[0])
    is_spam: bool = msg[1] == "1"
    flagged: bool = prob >= SPAM_THRESHOLD

    # Count spam messages
    if is_spam:
        spam_count += 1

    # Count the messages the model considers spam
    if flagged:
        spam_prob += prob
        spam_predict_count += 1
        if is_spam:
            spam_correct_count += 1

    if flagged == is_spam:
        correct_count += 1

# Guard every ratio so an empty test set (or one without spam) cannot divide by zero.
accuracy: float = correct_count / len(test) if test else 0.0
precision: float = spam_correct_count / spam_predict_count if spam_predict_count else 0.0
recall: float = spam_correct_count / spam_count if spam_count else 0.0

print(f"spam count: {spam_count}")
print(f"spam predicted: {spam_predict_count}")
print(f"spam correctly predicted: {spam_correct_count}")
print(f"Accuracy of spam prediction: {round(accuracy * 100, 3)}")
print(f"Precision of spam prediction: {round(precision * 100, 3)}")
print(f"Recall of spam prediction: {round(recall * 100, 3)}\n")
