<a href="https://colab.research.google.com/github/Yasir323/ML-algorithms-from-scratch/blob/main/Naive_Bayes.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Some basic understanding

The key to Naive Bayes is making the (big) assumption that the presences (or absences) of each word are independent of one another, conditional on a message being spam or not. Intuitively, this assumption means that knowing whether a certain spam message contains the word bitcoin gives you no information about whether that same message contains the word rolex. In math terms, this means that:

$ P(X_1=x_1,..., X_n=x_n|S) = P(X_1=x_1|S)*...*P(X_n=x_n|S) $

This is an extreme assumption. (There’s a reason the technique has naive in its name.) Imagine that our vocabulary consists only of the words bitcoin and rolex, and that half of all spam messages are for “earn bitcoin” and that the other half are for “authentic rolex.” In this case, the Naive Bayes estimate that a spam message contains both bitcoin and rolex is:

$P (X_1 = 1, X_2 = 1|S) = P (X_1 = 1|S) P (X_2 = 1|S) =0.5 × 0.5 = 0.25 $

since we’ve assumed away the knowledge that bitcoin and rolex actually never occur together. Despite the unrealisticness of this assumption, this model often performs well and has historically been used in actual spam filters.

In practice, you usually want to avoid multiplying lots of probabilities together, to prevent a problem called underflow, in which computers don’t deal well with floating-point numbers that are too close to 0. Recalling from algebra that $log(ab) = log a + log b and that exp(log x) = x $, we usually compute $p_1 * ⋯ * p_n $ as the equivalent (but floating-point-friendlier):

$ exp(log (p_1) + ⋯ + log (p_n)) $

Imagine that in our training set the vocabulary word data only occurs in nonspam messages. Then we’d estimate P(data|S) = 0. The result is that our Naive Bayes classifier would always assign spam probability 0 to any message containing the word data, even a message like “data on free bitcoin and authentic rolex watches.” To avoid this problem, we usually use some
kind of **smoothing**.

In particular, we’ll choose a pseudocount—k—and estimate the probability of seeing the ith word in a spam message as:

$ P (X_i|S) = (k + number of spams containing w_i) / (2k + number of spams) $

In [1]:
from typing import Set, NamedTuple, List, Tuple, Dict, Iterable
import re
import math
from collections import defaultdict

In [2]:
def tokenize(text: str) -> Set[str]:
  """Tokenize the text

  This function will first convert the text to lower case,
  then it'll find all the words and numbers and then it'll
  return the distinct words only.
  """

  text = text.lower()  # Convert to lower case
  all_words = re.findall("[a-z0-9']+", text)  # Extract the words
  return set(all_words)  # Remove duplicates

In [3]:
assert tokenize("Data Science is Science") == {"data", "science", "is"}

In [4]:
#We'll define a type for our training data
class Message(NamedTuple):
  text: str
  is_spam: bool

Next, we’ll give it a method to train it on a bunch of messages. First, we increment the spam_messages and ham_messages counts. Then we tokenize each messagetext, and for each token we increment the token_spam_counts or token_ham_counts based on the message type:

Ultimately we’ll want to predict P(spam | token). As we saw earlier, to apply
Bayes’s theorem we need to know P(token | spam) and P(token | ham) for each
token in the vocabulary. 

In [5]:
# As our classifier needs to keep track of tokens, counts, and labels from the training
# data, we’ll make it a class.
class NaiveBayesClassifier:
  
  def __init__(self, k: float=0.5) -> None:
    self.k = k  # Smoothening Factor
    self.tokens: Set[str] = set()
    self.token_spam_counts: Dict[str, int] = defaultdict(int)
    self.token_ham_counts: Dict[str, int] = defaultdict(int)
    self.spam_messages = self.ham_messages = 0

  def train(self, messages: Iterable[Message]) -> None:
    for message in messages:
      # Increment message counts
      if message.is_spam:
        self.spam_messages += 1
      else:
        self.ham_messages += 1
      
      # Increment word count
      for token in tokenize(message.text):
        self.tokens.add(token)
        if message.is_spam:
          self.token_spam_counts[token] += 1
        else:
          self.token_ham_counts[token] += 1

  def fit(self, X: List[str], y: List[int]) -> None:
    for message, label in zip(X, y):
      # Increment message counts
      if label == 1:
        self.spam_messages += 1
      elif label == 0:
        self.ham_messages += 1
      
      # Increment word count
      for token in tokenize(message):
        self.tokens.add(token)
        if label == 1:
          self.token_spam_counts[token] += 1
        elif label == 0:
          self.token_ham_counts[token] += 1

  def _probabilities(self, token: str) -> Tuple[float, float]:
    """returns P(token|spam) and P(token|ham)"""
    spam = self.token_spam_counts[token]
    ham = self.token_ham_counts[token]
    p_token_spam = (spam + self.k) / (self.spam_messages + 2 * self.k)
    p_token_ham = (ham + self.k) / (self.ham_messages + 2 * self.k)
    return p_token_spam, p_token_ham

  def predict(self, text: str) -> float:
    text_tokens = tokenize(text)
    log_prob_if_spam = log_prob_if_ham = 0.0
    # Iterate through each word in out vocab
    for token in self.tokens:
      prob_if_spam, prob_if_ham = self._probabilities(token)
      # If token appears in the message
      # add the log probability of seeing it
      if token in text_tokens:
        log_prob_if_spam += math.log(prob_if_spam)
        log_prob_if_ham += math.log(prob_if_ham)
      # Otherwise add the log probability of _not_ seeing it,
      # which is log(1 - probability of seeing it)
      else:
        log_prob_if_spam += math.log(1.0 - prob_if_spam)
        log_prob_if_ham += math.log(1.0 - prob_if_ham)
    
    prob_if_spam = math.exp(log_prob_if_spam)
    prob_if_ham = math.exp(log_prob_if_ham)
    return prob_if_spam / (prob_if_spam + prob_if_ham)

## Testing

In [6]:
messages = [
      Message("spam rules", is_spam=True),
      Message("ham rules", is_spam=False),
      Message("hello ham", is_spam=False)
]
model = NaiveBayesClassifier(k=0.5)
model.train(messages)

In [7]:
model.tokens == {"spam", "ham", "rules", "hello"}

True

In [8]:
model.spam_messages == 1

True

In [9]:
model.ham_messages == 2

True

In [10]:
model.token_spam_counts == {"spam": 1, "rules": 1}

True

In [11]:
model.token_ham_counts == {"ham": 2, "rules": 1, "hello": 1}

True

In [12]:
text = "hello spam"
probs_if_spam = [
    (1 + 0.5) / (1 + 2 * 0.5),  # "spam" (present)
    1 - (0 + 0.5) / (1 + 2 * 0.5),  # "ham" (not present)
    1 - (1 + 0.5) / (1 + 2 * 0.5),  # "rules" (not present)
    (0 + 0.5) / (1 + 2 * 0.5)  # "hello" (present)
]

In [13]:
probs_if_ham = [
(0 + 0.5) / (2 + 2 * 0.5),  # "spam" (present)
1 - (2 + 0.5) / (2 + 2 * 0.5),  # "ham" (not present)
1 - (1 + 0.5) / (2 + 2 * 0.5),  # "rules" (not present)
(1 + 0.5) / (2 + 2 * 0.5),  # "hello" (present)
]

In [14]:
p_if_spam = math.exp(sum(math.log(p) for p in probs_if_spam))
p_if_ham = math.exp(sum(math.log(p) for p in probs_if_ham))

In [15]:
model.predict(text)

0.8350515463917525

In [16]:
model.predict(text) == p_if_spam / (p_if_spam + p_if_ham)

True

This test passes, so it seems like our model is doing what we think it is. If you look
at the actual probabilities, the two big drivers are that our message contains spam
(which our lone training spam message did) and that it doesn’t contain ham (which
both our training ham messages did).

## Using Our Model

A popular (if somewhat old) dataset is the SpamAssassin public corpus. We’ll look
at the files prefixed with 20021010.

In [17]:
from io import BytesIO  # So we can treat bytes as a file.
import requests  # To download the files, which
import tarfile  # are in .tar.bz format.

In [18]:
BASE_URL = "https://spamassassin.apache.org/old/publiccorpus"
FILES = [
    "20021010_easy_ham.tar.bz2",
    "20021010_hard_ham.tar.bz2",
    "20021010_spam.tar.bz2"
]
OUTPUT_DIR = 'spam_data'

In [19]:
for file in FILES:
  # Use requests to get the file contents at each URL.
  content = requests.get(f"{BASE_URL}/{file}").content
  # Wrap the in-memory bytes so we can use them as a "file."
  fin = BytesIO(content)
  # And extract all the files to the specified output dir.
  with tarfile.open(fileobj=fin, mode='r:bz2') as tf:
    tf.extractall(OUTPUT_DIR)

After downloading the data you should have three folders: spam, easy_ham, and hard_ham. Each folder contains many emails, each contained in a single file. To keep things really simple, we’ll just look at the subject lines of each email.

In [20]:
! ls spam_data

easy_ham  hard_ham  spam


How do we identify the subject line? When we look through the files, they all seem to start with “Subject:”. So we’ll look for that:

In [21]:
import glob

In [22]:
path = 'spam_data/*/*'
data: List[Message] = []

In [23]:
# glob.glob returns every filename that matches the wildcarded path
for filename in glob.glob(path):
  # Boolean mask to set is_spam by looking at the filename
  # Since the filename contains the word 'spam' or 'ham'
  is_spam = "ham" not in filename
  # There are some garbage characters in the emails; the errors='ignore'
  # skips them instead of raising an exception.
  with open(filename, errors='ignore') as email_file:
    for line in email_file:
      if line.startswith("Subject:"):
        subject = line.lstrip("Subject: ")
        data.append(Message(subject, is_spam))
        break

In [24]:
import random
from typing import TypeVar
X = TypeVar('X')  # generic type to represent a data point

In [25]:
def split_data(data: List[X], prob: float) -> Tuple[List[X], List[X]]:
    """Split data into fractions [prob, 1 - prob]"""
    data = data[:]                    # Make a shallow copy
    random.shuffle(data)              # because shuffle modifies the list.
    cut = int(len(data) * prob)       # Use prob to find a cutoff
    return data[:cut], data[cut:]     # and split the shuffled list there.

In [26]:
random.seed(0)
train_messages, test_messages = split_data(data, 0.75)
model = NaiveBayesClassifier()
model.train(train_messages)

Let’s generate some predictions and check how our model does:

In [27]:
from collections import Counter

In [28]:
predictions = [(message, model.predict(message.text)) for message in test_messages]

In [29]:
# Assume that spam_probability > 0.5 corresponds to spam prediction
# and count the combinations of (actual is_spam, predicted is_spam)
confusion_matrix = Counter((message.is_spam, spam_probability > 0.5) for message, spam_probability in predictions)
print(confusion_matrix)

Counter({(False, False): 675, (True, True): 85, (True, False): 44, (False, True): 21})


In [30]:
tn = confusion_matrix[(False, False)]
tp = confusion_matrix[(True, True)]
fp = confusion_matrix[(False, True)]
fn = confusion_matrix[(True, False)]

In [31]:
precision = tp / (tp + fp)
recall = tp / (tp + fn)
print(f"Precision: {round(precision * 100, 2)}%")
print(f"Recall: {round(recall * 100, 2)}%")

Precision: 80.19%
Recall: 65.89%


We can also inspect the model’s innards to see which words are least and most
indicative of spam:

In [32]:
def p_spam_given_token(token: str, model: NaiveBayesClassifier) -> float:
  # We probably shouldn't call private methods, but it's for a good cause.
  prob_if_spam, prob_if_ham = model._probabilities(token)
  return prob_if_spam / (prob_if_spam + prob_if_ham)
words = sorted(model.tokens, key=lambda t: p_spam_given_token(t, model))

In [33]:
print("spammiest_words", words[-10:])
print("hammiest_words", words[:10])

spammiest_words ['norton', 'per', 'mortgage', 'clearance', 'adv', 'sale', 'systemworks', 'only', 'rates', 'money']
hammiest_words ['spambayes', 'users', 'razor', 'zzzzteana', 'apt', 'sadev', 'ouch', 'perl', 'bliss', 'selling']


How could we get better performance? One obvious way would be to get more data to train on. There are a number of ways to improve the model as well. Here are some possibilities that you might try:

* Look at the message content, not just the subject line. You’ll have to be careful how you deal with the message headers.

* Our classifier takes into account every word that appears in the training set, even words that appear only once. Modify the classifier to accept an optional min_count threshold and ignore tokens that don’t appear at least that many times.

* The tokenizer has no notion of similar words (e.g., cheap and cheapest). Modify the classifier to take an optional stemmer function that converts words to equivalence classes of words. For example, a really simple stemmer function might be:
```
def drop_final_s(word):
  return re.sub("s$", "", word)
```

* Although our features are all of the form “message contains word wi,” there’s no reason why this has to be the case. In our implementation, we could add extra features like “message contains a number” by creating phony tokens like contains:number and modifying the tokenizer to emit them when appropriate.