In [7]:
import sys

# Add the relative "Data_Science" directory to the module search path.
# NOTE(review): the path is relative, so it is resolved against the kernel's
# working directory; presumably this is what makes `library.MachineLearning`
# importable in a later cell — confirm.
sys.path.append(r"Data_Science")

# Spam Filter (Simple Implementation)

In [6]:
from typing import Set
import re

def tokenise(text: str) -> Set[str]:
    """Return the distinct words found in ``text``.

    The text is lower-cased first; a word is any run of the characters
    ``a-z``, ``0-9`` and the apostrophe. Because the result is a set,
    repeated words are reported once and order is not preserved.
    """
    return set(re.findall("[a-z0-9']+", text.lower()))

# Quick check: "science" occurs twice in the input but only once in the
# result, because tokenise lower-cases the text and returns a set.
print(tokenise("Data Science is science"))

{'data', 'is', 'science'}


In [2]:
from typing import NamedTuple

class Message(NamedTuple):
    """A piece of text together with its spam/ham label."""

    # The text that will be tokenised.
    text: str
    # True when the message is labelled spam, False when it is ham.
    is_spam: bool

    

In [18]:
from typing import List, Tuple, Dict, Iterable
import math
from collections import defaultdict

class NaiveBayerClassifier:
    """Naive Bayes spam classifier based on which tokens a message contains.

    Training counts, for every token, how many spam and how many ham messages
    contain it. Prediction combines the smoothed per-token probabilities over
    the whole learned vocabulary, treating tokens as independent.

    Attributes:
        k: smoothing pseudo-count added to every token count. It should be
            positive: with ``k = 0`` a probability of exactly 0 or 1 becomes
            possible and ``predict`` would call ``math.log(0)``.
        tokens: every token seen during training (the vocabulary).
        token_spam_counts: token -> number of spam messages containing it.
        token_ham_counts: token -> number of ham messages containing it.
        spam_message: number of spam messages seen (name kept as-is because
            it is a public attribute).
        ham_messages: number of ham messages seen.
    """

    def __init__(self, k: float = 0.5) -> None:
        self.k = k
        self.tokens: Set[str] = set()
        self.token_spam_counts: Dict[str, int] = defaultdict(int)
        self.token_ham_counts: Dict[str, int] = defaultdict(int)
        self.spam_message = self.ham_messages = 0

    def train(self, messages: Iterable["Message"]) -> None:
        """Update the counts from ``messages``.

        Each message must expose ``text`` and ``is_spam``. Calling ``train``
        again adds to the existing counts rather than resetting them.
        """
        for message in messages:
            if message.is_spam:
                self.spam_message += 1
                token_counts = self.token_spam_counts
            else:
                self.ham_messages += 1
                token_counts = self.token_ham_counts

            # tokenise returns a set, so a token is counted at most once per
            # message no matter how often it is repeated in the text.
            for token in tokenise(message.text):
                self.tokens.add(token)
                token_counts[token] += 1

    def _probabilities(self, token: str) -> Tuple[float, float]:
        """Return the smoothed ``(P(token | spam), P(token | ham))``.

        Each probability is ``(count + k) / (messages_in_class + 2 * k)``.
        """
        # .get() instead of indexing: indexing a defaultdict would insert a
        # zero entry for every token that is merely looked up.
        spam = self.token_spam_counts.get(token, 0)
        ham = self.token_ham_counts.get(token, 0)
        p_token_spam = (spam + self.k) / (self.spam_message + 2 * self.k)
        p_token_ham = (ham + self.k) / (self.ham_messages + 2 * self.k)

        return p_token_spam, p_token_ham

    def predict(self, text: str) -> float:
        """Return the estimated probability that ``text`` is spam.

        Spam and ham are treated as equally likely beforehand; only the
        token likelihoods differentiate them.
        """
        text_tokens = tokenise(text)
        log_prob_if_spam = log_prob_if_ham = 0.0

        # Only vocabulary tokens are visited, so words in ``text`` that were
        # never seen during training do not influence the result.
        for token in self.tokens:
            prob_if_spam, prob_if_ham = self._probabilities(token)

            if token in text_tokens:
                # The token is present: add the log-probability of seeing it.
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)
            else:
                # The token is absent: add the log-probability of NOT seeing
                # it. For rare tokens this is close to log(1) = 0, so their
                # absence barely moves the result.
                log_prob_if_spam += math.log(1.0 - prob_if_spam)
                log_prob_if_ham += math.log(1.0 - prob_if_ham)

        # p_spam / (p_spam + p_ham) rewritten in terms of the log-odds.
        # Exponentiating the two sums separately underflows to 0.0 for large
        # vocabularies and would turn the division into 0 / 0.
        log_odds = log_prob_if_spam - log_prob_if_ham
        if log_odds >= 0:
            return 1.0 / (1.0 + math.exp(-log_odds))
        # For negative log-odds, exp(log_odds) cannot overflow.
        odds = math.exp(log_odds)
        return odds / (1.0 + odds)

In [23]:
# Tiny hand-made corpus: one spam message and two ham messages.
messages = [
    Message(text="spam rules", is_spam=True),
    Message(text="ham rules", is_spam=False),
    Message(text="hello ham", is_spam=False),
]

model = NaiveBayerClassifier(k=0.5)
model.train(messages)

# Show the learned state, one value per line: the vocabulary, the number of
# spam and ham messages, then the per-class token counts.
print(
    model.tokens,
    model.spam_message,
    model.ham_messages,
    model.token_spam_counts,
    model.token_ham_counts,
    sep="\n",
)

# Score a one-word message; the value is the model's spam probability for it.
text = "spam"
prob_if_spam = model.predict(text)
print(prob_if_spam)

{'spam', 'rules', 'ham', 'hello'}
1
2
defaultdict(<class 'int'>, {'spam': 1, 'rules': 1})
defaultdict(<class 'int'>, {'rules': 1, 'ham': 2, 'hello': 1})
0.9382239382239382


In [24]:
from io import BytesIO # So we can treat bytes as a file.
import requests # To download the files, which
import tarfile # are in .tar.bz format.

BASE_URL = "https://spamassassin.apache.org/old/publiccorpus"
FILES = ["20021010_easy_ham.tar.bz2",
         "20021010_hard_ham.tar.bz2",
         "20021010_spam.tar.bz2"]

# This is where the data will end up,
# in /spam, /easy_ham, and /hard_ham subdirectories.
# Change this to where you want the data.
OUTPUT_DIR = 'spam_data'

# Give up on a stalled connection instead of hanging the cell forever.
DOWNLOAD_TIMEOUT_SECONDS = 60

for filename in FILES:
    # Use requests to get the file contents at each URL.
    response = requests.get(f"{BASE_URL}/{filename}",
                            timeout=DOWNLOAD_TIMEOUT_SECONDS)
    # Fail here on an HTTP error rather than handing an error page to tarfile.
    response.raise_for_status()
    content = response.content
    # Wrap the in-memory bytes so we can use them as a "file."
    fin = BytesIO(content)
    # And extract all the files to the specified output dir.
    with tarfile.open(fileobj=fin, mode='r:bz2') as tf:
        if hasattr(tarfile, "data_filter"):
            # The archive comes from the network: where the interpreter
            # supports extraction filters, refuse members that would escape
            # OUTPUT_DIR (absolute paths, "..", links pointing outside).
            tf.extractall(OUTPUT_DIR, filter="data")
        else:
            # Older interpreters have no extraction filters; extract as before.
            tf.extractall(OUTPUT_DIR)

In [25]:
import glob, re

import os

# Modify this to point to where you put the files
path = 'spam_data/*/*'

SUBJECT_PREFIX = "Subject:"

data: List[Message] = []

for filename in glob.glob(path):
    # Label by the directory that directly contains the file. Testing the
    # whole path would mislabel every spam file as ham whenever some parent
    # directory happens to contain "ham".
    corpus_dir = os.path.basename(os.path.dirname(filename))
    is_spam = "ham" not in corpus_dir

    # errors='ignore' drops bytes that cannot be decoded instead of raising.
    with open(filename, errors='ignore') as email_file:
        for line in email_file:
            if line.startswith(SUBJECT_PREFIX):
                # Slice the prefix off. str.lstrip("Subject:") removes any
                # leading run of those *characters*, so it can also eat the
                # start of a subject that directly follows the colon.
                subject = line[len(SUBJECT_PREFIX):].strip()
                data.append(Message(subject, is_spam))
                # Only the first Subject line of each file is used.
                break

In [None]:
import random
import library.MachineLearning as ml

# Seed Python's global random generator before splitting.
# NOTE(review): presumably ml.split_data draws from `random`, which would make
# the split reproducible — confirm against library.MachineLearning.
random.seed(0)

# NOTE(review): 0.75 looks like the fraction of `data` that goes to
# train_data, the rest going to test_data — confirm.
train_data, test_data = ml.split_data(data, 0.75)

# Fit a classifier with the default smoothing (k = 0.5) on the training part.
model = NaiveBayerClassifier()
model.train(train_data)

In [29]:
from collections import Counter

# Score every held-out message once, keeping each message next to its
# predicted spam probability.
predictions = []
for message in test_data:
    predictions.append((message, model.predict(message.text)))

# Tally (actual is_spam, predicted is_spam) pairs, where a spam probability
# above 0.5 counts as a spam prediction.
confusion_matrix = Counter()
for message, spam_prob in predictions:
    predicted_spam = spam_prob > 0.5
    confusion_matrix[(message.is_spam, predicted_spam)] += 1

print(confusion_matrix)


Counter({(False, False): 669, (True, True): 86, (True, False): 40, (False, True): 30})


In [None]:
def p_spam_given_token(token: str, model: NaiveBayerClassifier) -> float:
    """Return how strongly ``token`` alone points towards spam.

    Takes the pair returned by ``model._probabilities(token)`` and divides
    the spam probability by the sum of the spam and ham probabilities.
    """
    p_token_spam, p_token_ham = model._probabilities(token)
    total = p_token_ham + p_token_spam
    return p_token_spam / total

# Order the vocabulary from the most ham-like token to the most spam-like.
words = list(model.tokens)
words.sort(key=lambda token: p_spam_given_token(token, model))

print("spammiest words", words[-10:])
print("hammiest words", words[:10])