## Naive Bayes
<br>
<b>Problem:</b> Is it possible to enhance the score metrics of a Default Naive Bayes Algorithm?
<br>

In [1]:
import pandas as pd
import os
from pathlib import Path
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from collections import defaultdict

# Initial Model (Not my model!)

In [8]:
os.getcwd()

'C:\\Users\\adilson.junior\\Desktop\\Pós\\Trabalho 02'

In [9]:
from typing import TypeVar, List, Tuple, Dict, Iterable, NamedTuple, Set
from collections import defaultdict, Counter
import re
import random
import math

X = TypeVar('X')  # generic type to represent a data point

def split_data(data: List[X], prob: float) -> Tuple[List[X], List[X]]:
    """Split data into fractions [prob, 1 - prob]"""
    data = data[:]                    # Make a shallow copy
    random.shuffle(data)              # because shuffle modifies the list.
    cut = int(len(data) * prob)       # Use prob to find a cutoff
    return data[:cut], data[cut:]     # and split the shuffled list there.

def tokenize(text: str) -> Set[str]:
    text = text.lower()                         # Convert to lowercase,
    all_words = re.findall("[a-z0-9']+", text)  # extract the words, and
    return set(all_words)                       # remove duplicates.

assert tokenize("Data Science is science") == {"data", "science", "is"}

class Message(NamedTuple):
    text: str
    is_spam: bool

class NaiveBayesClassifier:
    
    def __init__(self, k: float = 0.5) -> None:
        self.k = k  # smoothing factor

        self.tokens: Set[str] = set()
        self.token_spam_counts: Dict[str, int] = defaultdict(int)
        self.token_ham_counts: Dict[str, int] = defaultdict(int)
        self.spam_messages = self.ham_messages = 0

    def train(self, messages: Iterable[Message]) -> None:
        for message in messages:
            # Increment message counts
            if message.is_spam:
                self.spam_messages += 1
            else:
                self.ham_messages += 1

            # Increment word counts
            for token in tokenize(message.text):
                self.tokens.add(token)
                if message.is_spam:
                    self.token_spam_counts[token] += 1
                else:
                    self.token_ham_counts[token] += 1

                    
    def probabilities(self, token: str) -> Tuple[float, float]:
        """returns P(token | spam) and P(token | not spam)"""
        spam = self.token_spam_counts[token]
        ham = self.token_ham_counts[token]

        p_token_spam = (spam + self.k) / (self.spam_messages + 2 * self.k)
        p_token_ham = (ham + self.k) / (self.ham_messages + 2 * self.k)

        return p_token_spam, p_token_ham

    def predict(self, text: str) -> float:
        text_tokens = tokenize(text)
        log_prob_if_spam = log_prob_if_ham = 0.0

        # Iterate through each word in our vocabulary.
        for token in self.tokens:
            prob_if_spam, prob_if_ham = self.probabilities(token)

            # If *token* appears in the message,
            # add the log probability of seeing it;
            if token in text_tokens:
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)

            # otherwise add the log probability of _not_ seeing it
            # which is log(1 - probability of seeing it)
            else:
                log_prob_if_spam += math.log(1.0 - prob_if_spam)
                log_prob_if_ham += math.log(1.0 - prob_if_ham)

        prob_if_spam = math.exp(log_prob_if_spam)
        prob_if_ham = math.exp(log_prob_if_ham)
        return prob_if_spam / (prob_if_spam + prob_if_ham)

    
########################################################################################################################   
    
#def main():
import glob

# modify the path to wherever you've put the files
path = r'C:/Users/adilson.junior/Desktop/Pós/Trabalho 02/emails/*/*'

data: List[Message] = []

# glob.glob returns every filename that matches the wildcarded path
for filename in glob.glob(path):
    is_spam = "ham" not in filename

    # There are some garbage characters in the emails, the errors='ignore'
    # skips them instead of raising an exception.
    with open(filename, errors='ignore') as email_file:
        for line in email_file:
            if line.startswith("Subject:"):
                subject = line.lstrip("Subject: ")
                data.append(Message(subject, is_spam))
                break  # done with this file
                
train_messages, test_messages = split_data(data, 0.75)

model = NaiveBayesClassifier()
model.train(train_messages)

predictions = [(message, model.predict(message.text))
               for message in test_messages]

# Assume that spam_probability > 0.5 corresponds to spam prediction
# and count the combinations of (actual is_spam, predicted is_spam)
confusion_matrix = Counter((message.is_spam, spam_probability > 0.5)
                           for message, spam_probability in predictions)

print(confusion_matrix)

def p_spam_given_token(token: str, model: NaiveBayesClassifier) -> float:
    prob_if_spam, prob_if_ham = model.probabilities(token)

    return prob_if_spam / (prob_if_spam + prob_if_ham)

words = sorted(model.tokens, key=lambda t: p_spam_given_token(t, model))

print("spammiest_words", words[-10:])
print("hammiest_words", words[:10])

Counter({(False, False): 683, (True, True): 71, (True, False): 45, (False, True): 26})
spammiest_words ['attn', 'zzzz', 'clearance', 'per', 'money', 'only', 'sale', 'systemworks', 'adv', 'rates']
hammiest_words ['satalk', 'spambayes', 'users', 'razor', 'sadev', 'zzzzteana', 'ouch', 'apt', 'perl', 'bliss']


# New Model (now that's my code)

In [12]:
%%time

#def main():
import glob

# função para buscar texto do body
def substring_after(s, delim):
    return s.partition(delim)[2]

# Path teste

#path = r'C:/Users/adilson.junior/Desktop/Pós/Trabalho 02/emails/easy_ham/0001.ea7e79d3153e7469e7a9c3e0af6a357e'

path = r'C:/Users/adilson.junior/Desktop/Pós/Trabalho 02/emails/*/*'

data: List[Message] = []

for filename in glob.glob(path):
    
    stop_list = set(stopwords.words("english"))
    is_spam = "ham" not in filename

    with open(filename, errors='ignore') as email_file:      
        for line in email_file:
            if line.startswith("Subject:"):
                subject = line.lstrip("Subject: ")                  
                break  
                
    with open(filename, errors='ignore') as email_file:    
        string = email_file.read()
        string = substring_after(string,"Message-ID: ")
        
    with open(filename, errors='ignore') as email_file:    
        string_x = email_file.read()
        string_x = substring_after(string_x,"Content-Transfer-Encoding: ")
    
    # Limpeza dos caracteres especiais subject
    subject = subject.replace('Re: ','').replace('\n', '')
    subject = re.sub('[^\w\s]','',subject)
    
    string = string.replace('\n', '')
    string = re.sub('[^\w\s]','',string)
    
    string_x = string_x.replace('\n', '')
    string_x = re.sub('[^\w\s]','',string_x)   
    
    message_concat = subject + string + string_x
    
    filtered_message = ' '.join([word for word in message_concat.split() if word not in stop_list and len(word) <= 15])
        
    data.append(Message(filtered_message, is_spam))

CPU times: total: 4.22 s
Wall time: 4.28 s


In [13]:
from typing import TypeVar, List, Tuple, Dict, Iterable, NamedTuple, Set
from collections import defaultdict, Counter
import re
import random
import math

X = TypeVar('X')  # generic type to represent a data point

def split_data(data: List[X], prob: float) -> Tuple[List[X], List[X]]:
    """Split data into fractions [prob, 1 - prob]"""
    data = data[:]                    # Make a shallow copy
    random.shuffle(data)              # because shuffle modifies the list.
    cut = int(len(data) * prob)       # Use prob to find a cutoff
    return data[:cut], data[cut:]     # and split the shuffled list there.

def tokenize(text: str) -> Set[str]:
    text = text.lower()                         # Convert to lowercase,
    all_words = re.findall("[a-z0-9']+", text)  # extract the words, and
    return set(all_words)                       # remove duplicates.

assert tokenize("Data Science is science") == {"data", "science", "is"}

class Message(NamedTuple):
    text: str
    is_spam: bool

class NaiveBayesClassifier:
    def __init__(self, k: float = 0.5) -> None:
        self.k = k  # smoothing factor

        self.tokens: Set[str] = set()
        self.token_spam_counts: Dict[str, int] = defaultdict(int)
        self.token_ham_counts: Dict[str, int] = defaultdict(int)
        self.spam_messages = self.ham_messages = 0

    def train(self, messages: Iterable[Message]) -> None:
        for message in messages:
            # Increment message counts
            if message.is_spam:
                self.spam_messages += 1
            else:
                self.ham_messages += 1

            # Increment word counts
            for token in tokenize(message.text):
                self.tokens.add(token)
                if message.is_spam:
                    self.token_spam_counts[token] += 1
                else:
                    self.token_ham_counts[token] += 1
                    
        # Condição de filtrar palavras com 3 ou menos aparições
        self.token_spam_counts = defaultdict(int,{key: frequency for key, frequency in self.token_spam_counts.items() if frequency >= 5})
        self.token_ham_counts = defaultdict(int,{key: frequency for key, frequency in self.token_ham_counts.items() if frequency >= 5})
        

    def probabilities(self, token: str) -> Tuple[float, float]:
        """returns P(token | spam) and P(token | not spam)"""
        spam = self.token_spam_counts[token]
        ham = self.token_ham_counts[token]
        
        # Probabilidade da palavra aparecer dentro do set de spam messages
        p_token_spam = (spam + self.k) / (self.spam_messages + 2 * self.k)
        
        # Probabilidade da palavra aparecer dentro do set de ham  messages
        p_token_ham = (ham + self.k) / (self.ham_messages + 2 * self.k)

        return p_token_spam, p_token_ham

    def predict(self, text: str) -> float:
        
        text_tokens = tokenize(text)
        log_prob_if_spam = log_prob_if_ham = 0.0
        
        # Iterate through each word in our vocabulary.
        # Primeiro Itera sob a lista de tokens salvos e suas probabilidades de estar em spam e ham
        for token in self.tokens:
            prob_if_spam, prob_if_ham = self.probabilities(token)

            # If *token* appears in the message,
            # add the log probability of seeing it;
            # Depois faz o comparativo com os dados das mensagens de teste
            if token in text_tokens:
                log_prob_if_spam += math.log(prob_if_spam)
                log_prob_if_ham += math.log(prob_if_ham)

            # otherwise add the log probability of _not_ seeing it
            # which is log(1 - probability of seeing it)
            else:
                log_prob_if_spam += math.log(1.0 - prob_if_spam)
                log_prob_if_ham += math.log(1.0 - prob_if_ham)

        prob_if_spam = math.exp(log_prob_if_spam)
        prob_if_ham = math.exp(log_prob_if_ham)
        
        try:
            result = prob_if_spam / (prob_if_spam + prob_if_ham)
        except:
            result = 0.5
        
        return result

In [14]:
train_messages, test_messages = split_data(data, 0.50)

model = NaiveBayesClassifier(k=0.5)
model.train(train_messages)

predictions = [(message, model.predict(message.text))
               for message in test_messages]

confusion_matrix = Counter((message.is_spam, spam_probability > 0.5)
                           for message, spam_probability in predictions)

print(confusion_matrix)

Counter({(False, False): 1397, (True, False): 193, (True, True): 58, (False, True): 3})


# Final Analysis

In [20]:
print('First Model')

Counter({(False, False): 683, (True, True): 71, (True, False): 45, (False, True): 26})

Primeiro Modelo


Counter({(False, False): 683,
         (True, True): 71,
         (True, False): 45,
         (False, True): 26})

In [21]:
print('Updated Model')

Counter({(False, False): 1397, (True, False): 193, (True, True): 58, (False, True): 3})

Modelo Atualizado


Counter({(False, False): 1397,
         (True, False): 193,
         (True, True): 58,
         (False, True): 3})

## Considering the above upgrades:

    1. Tokenizing not only the email subject but also the email BODY
    2. Filters words with 3 or less occurrences (min_count)
    3. Implementation of stop_words to avoid noises on the analysis
    4. At the end - apply a try/except as a 50% classificator to avoid zero division problem
    
## Results

    Even though the new model is able to identify more spams (2x comparing to the old model) que amount of False positives     also increased by 3 times. As it is more important for the data safety to avoid Spam than to classify a healthy email       as Spam, we can considered the updates satisfactory