# T4 : SMS Spam Detector

Semester 2221, CSEC 520/620, Team 4\
Assignment 1 - SMS Spam Detector\
Due Sep 15, 2022 11:59 PM EST\
Accounts for 12% of total grade.

> **ANTHONY REWRITE**\
> Don't submit this one.

## Description

Welcome to Team 4's SMS Spam Detector. This assignment examines two classifiers, k-NN and Naive Bayes, for deciding whether an SMS message is spam or ham (not spam). We report performance metrics for each classifier to help determine which is better suited to this type of classification.

## Requirements

- Python 3
- The `SMSSpamCollection` file must be in the same directory as this notebook.

To start, we must import the various modules and libraries that we will depend on during execution.

In [None]:
import copy
from collections import Counter
import itertools
import math
import random
import re

## Utility Methods

As with the imports, the utility methods below are shared across the models in this notebook. They cover detagging tokens, separating messages by tag, and calculating and printing metrics. Run the code blocks in this section to ensure our utility methods are defined.

In [None]:
def detag_tokens(tokens):
    """
    Detags a tagged tokens list. Removes the first element of each child list.

    :param tokens: Two dimensional list, with the first element of each child list being a tag ham/spam.
    :return: Detagged tokens list.
    """
    # Copy of tokens, without the ham/spam tag.
    detagged_tokens = copy.deepcopy(tokens)
    detagged_tokens = [token[1:] for token in detagged_tokens]

    return detagged_tokens

In [None]:
def separate_tags(tokens):
    """
    Separates the list of tagged tokens based on the tags ham/spam.

    :param tokens: Array containing arrays of individual words. The first word in each array must be either "ham" or "spam".
    :return: Dictionary object containing separated "ham" and "spam" sets.
    """
    tokens = copy.deepcopy(tokens)

    separated_set = {"ham": list(filter(lambda token: token[0] == "ham", tokens)),
                     "spam": list(filter(lambda token: token[0] == "spam", tokens))}
    return separated_set

In [None]:
def print_metrics(dictionary, is_percentage=True):
    """
    Prints metrics from a dictionary, metric name is the key and metric result is the value.

    :param dictionary: Dictionary containing metric name and metric value.
    :param is_percentage: Flag that determines if dictionary metric values will be printed as a percentage.
    :return: None
    """

    if is_percentage:
        for metric in dictionary:
            print(f'{metric + ":":>20} {dictionary[metric]:024.20%}')
    else:
        for metric in dictionary:
            print(f'{metric + ":":>20} {dictionary[metric]}')

In [None]:
def calculate_metrics(tp, fp, tn, fn, print_results=True, print_only_percentages=True, title=None):
    """
    Calculates classification performance metrics. Can also handle printing of metrics.

    :param tp: Number of true positive predictions.
    :param fp: Number of false positive predictions.
    :param tn: Number of true negative predictions.
    :param fn: Number of false negative predictions.
    :param print_results: Flag that determines whether results will be printed. True by default.
    :param print_only_percentages: Flag that determines whether only percentages, and not raw numbers will be printed. True by default.
    :param title: Title of classifier the calculated metrics belong to. None by default.
    :return: Dictionary of calculated metrics.
    """
    # Calculate metrics
    accuracy = (tp + tn) / (tp + fp + tn + fn)
    precision = tp / (tp + fp) if (tp + fp) != 0 else 0
    recall = tp / (tp + fn) if (tp + fn) != 0 else 0
    f1_score = (2 * precision * recall) / (precision + recall) if (precision + recall) != 0 else 0

    # Bundle metrics into dictionary
    base_metrics = {"True Positive": tp, "False Positive": fp, "True Negative": tn, "False Negative": fn}
    additional_metrics = {"Accuracy": accuracy, "Precision": precision, "Recall": recall, "F1-Score": f1_score}
    result_metrics = {**base_metrics, **additional_metrics}

    if print_results:
        # Heading
        header = " Resulting Performance Metrics "
        print("-" * 45)
        print(header.center(45, "-"))
        if title: print(title.center(len(header), " ").center(45, "-"))
        print("-" * 45)

        # TP, FP, TN, and FN Numbers
        if not print_only_percentages:
            print_metrics(base_metrics, False)

        # Calculate TP, FP, TN, and FN as fractions of all predictions (not the per-class TPR/FPR)
        total_predictions = sum(base_metrics.values())
        rates = {}
        for metric in base_metrics:
            rates[metric + " Rate"] = base_metrics[metric] / total_predictions

        # Print All Percentages
        print_metrics(rates)
        print("-----".center(45, " "))
        print_metrics(additional_metrics)

    # Return dictionary of metrics
    return result_metrics
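
As a quick sanity check on the formulas above, the sketch below recomputes accuracy, precision, recall, and F1 for a made-up confusion matrix. The counts are illustrative only and are not results from this notebook, and `example_metrics` is a hypothetical helper that simply mirrors the arithmetic in `calculate_metrics`.

```python
def example_metrics(tp, fp, tn, fn):
    """Recompute the headline metrics from raw confusion-matrix counts."""
    accuracy = (tp + tn) / (tp + fp + tn + fn)
    precision = tp / (tp + fp) if (tp + fp) else 0
    recall = tp / (tp + fn) if (tp + fn) else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0
    return {"Accuracy": accuracy, "Precision": precision, "Recall": recall, "F1-Score": f1}


# Made-up counts: 40 spam caught, 10 ham flagged by mistake,
# 940 ham passed through, 10 spam missed.
example = example_metrics(tp=40, fp=10, tn=940, fn=10)
print(example)
```

With these counts, accuracy is 980/1000 while precision and recall are both 40/50, which shows how a high accuracy can hide weaker spam detection when ham dominates the data.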

## Tokenizing

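Our model begins by performing tokenization on the dataset. Every line of the file has its non-word characters replaced with spaces, is converted to lowercase, and is split into individual words. The first token of each line is its ham/spam tag.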

In [None]:
def tokenize(filename, print_info=False):
    """
    Performs sanitization and then tokenization on the given file.

    :param filename: The name or path of the file that contains the data to perform tokenization on.
    :param print_info: Flag that determines whether information will be printed. False by default.
    :return: An array of sanitized tokens derived from the data housed in the given file.
    """
    # Read every line; the context manager closes the file afterwards
    with open(filename, 'r') as file:
        lines = [line for line in file]

    # First, convert special characters into spaces
    clean_lines = [re.sub(r'\W+', ' ', line) for line in lines]

    # Second, separate each word in each line while also ensuring lowercase
    tokens = [line.lower().split() for line in clean_lines]

    # Print information
    if print_info:
        print(f'{"Lines:":>18} {lines}')
        print(f'{"Cleaned Lines:":>18} {clean_lines}')
        print(f'{"Tokens:":>18} {tokens}')

    # Return sanitized tokens
    return tokens
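
To make the sanitization concrete, here is a minimal sketch that applies the same regular expression, lowercasing, and splitting to a single line. The sample line is made up and assumes the tab-separated `label<TAB>message` layout; `tokenize_line` is a hypothetical helper, not part of the notebook.

```python
import re


def tokenize_line(line):
    """Replace runs of non-word characters with a space, lowercase, and split."""
    return re.sub(r'\W+', ' ', line).lower().split()


# A made-up line in the assumed "<label><TAB><message>" layout.
sample = "spam\tWINNER!! Claim your FREE prize now: call 0800-123-456"
tokens = tokenize_line(sample)
print(tokens)
```

Note that punctuation inside a phone number also becomes a separator, so `0800-123-456` ends up as three separate tokens.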

In [None]:
def split_dataset(og_list, percent_train=0.8):
    """
    Splits the original dataset into the training and testing sets. Testing set allocation size is derived from the given training set percentage. Token selection is performed randomly.

    :param og_list: The original set to split into training and testing sets.
    :param percent_train: The percentage, in decimal form, of the original set to allocate towards training data.
    :return: Dictionary object containing allocated "train" and "test" sets.
    """
    # Get the total number of tokens in the original list
    og_total = len(og_list)

    # Determine how many messages to allocate to the testing set
    test_count = round(og_total * (1 - percent_train))
    train_set = copy.deepcopy(og_list)
    test_set = []

    # Fill the testing set by repeatedly moving a randomly chosen message out of the training set
    while test_count > 0:
        selected_token = random.choice(train_set)
        test_set.append(selected_token)
        train_set.remove(selected_token)
        test_count -= 1

    return {"train": train_set, "test": test_set}
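
Because the selection is random, every run produces a different split and therefore slightly different metrics. The sketch below shows one possible alternative that shuffles once with a seeded generator so the split can be reproduced. `shuffled_split` and its `seed` parameter are hypothetical and are not used elsewhere in this notebook.

```python
import random


def shuffled_split(items, percent_train=0.8, seed=None):
    """Shuffle a copy of the items once, then slice it into test and train sets."""
    rng = random.Random(seed)  # private generator, leaves the global random state alone
    shuffled = list(items)
    rng.shuffle(shuffled)

    # Same sizing rule as split_dataset: the test set gets the remaining share
    test_size = round(len(shuffled) * (1 - percent_train))
    return {"train": shuffled[test_size:], "test": shuffled[:test_size]}


parts = shuffled_split(range(10), percent_train=0.8, seed=520)
print(len(parts["train"]), len(parts["test"]))
```

Shuffling once and slicing also avoids the repeated `remove` calls, each of which scans the list.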

Execute the code block below to perform tokenization and then split the tokens into training and testing sets.

In [None]:
# Perform tokenization on the "SMSSpamCollection" file
tagged_tokens = tokenize('SMSSpamCollection')

# Generate dictionary containing "train" and "test" sets.
dataset = split_dataset(tagged_tokens)

Now that our data has been successfully tokenized and split, we can move on to classification.

## k Nearest Neighbors

### TF-IDF Methods

In [None]:
def calculate_idf(corpus):
    """
    Calculate the inverse document frequency of each term across all documents. This measures the importance of the term.

    :param corpus: A list of documents.
    :return: Dictionary containing a unique term as the key, and its idf as the value.
    """
    # Count the number of documents each term appears in (document frequency).
    # Each document is reduced to a set first, so a repeated term counts once per document.
    document_frequency = Counter(itertools.chain.from_iterable(set(document) for document in corpus))

    # Calculate Inverse Document Frequency
    document_count = len(corpus)
    idf_equation = lambda terms, term: math.log(document_count / terms[term])
    # idf_equation = lambda terms, term: math.log((document_count + 1) / (terms[term] + 1)) + 1 # Consider adding 1 to each?
    idf = {term: idf_equation(document_frequency, term) for term in document_frequency}

    # Return the IDFs
    return idf
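
IDF is conventionally defined as log(N / df(t)), where N is the number of documents and df(t) is the number of documents that contain term t. The sketch below works through that definition on a made-up three-document corpus. `document_frequency_idf` is a hypothetical name used only for this illustration.

```python
import math
from collections import Counter


def document_frequency_idf(corpus):
    """IDF as log(N / df), where df counts documents containing the term."""
    doc_count = len(corpus)
    # set(document) makes a repeated term count once per document
    df = Counter(term for document in corpus for term in set(document))
    return {term: math.log(doc_count / count) for term, count in df.items()}


toy_corpus = [["free", "prize", "free"],
              ["see", "you", "soon"],
              ["free", "lunch", "soon"]]
toy_idf = document_frequency_idf(toy_corpus)
print(toy_idf)
```

Here "free" appears three times in total but in only two documents, so its IDF is log(3/2); "prize" appears in one document and gets the larger weight log(3).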

In [None]:
def calculate_tf(document):
    """
    Calculate the term frequency of each term inside the document. This measures how frequently a term occurs in a document.

    :param document: A document containing terms.
    :return: A dictionary generated from the provided document, key being the term and value being the term frequency.
    """

    # Count how many times a term occurs in the document
    counted_document = Counter(document)

    # Calculate Term Frequency
    tf_equation = lambda doc, term: doc[term] / sum(doc.values())
    tf = {term: tf_equation(counted_document, term) for term in counted_document}

    # Return the TFs
    return tf

In [None]:
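def calculate_tf_idf(document, idf_values):
    """
    Calculate the TF-IDF weight of each term in a document.

    :param document: Dictionary of term frequencies for one document, as returned by calculate_tf.
    :param idf_values: Dictionary of IDF values, as returned by calculate_idf.
    :return: Dictionary with the term as the key and its TF-IDF weight as the value.
    """
    # For each term in the document, take the term's TF and multiply by the term's IDF.
    # Terms that never appeared in the training corpus have no IDF and are weighted 0.
    return {term: document[term] * idf_values.get(term, 0) for term in document}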
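
The following sketch combines the TF and IDF steps for a single message to show how the weights come out. The IDF values are invented for the example, and `tf_idf_vector` is a hypothetical helper rather than one of the notebook's functions.

```python
import math
from collections import Counter


def tf_idf_vector(document, idf_values):
    """Weight each term by (count / document length) * IDF; unknown terms get 0."""
    counts = Counter(document)
    total = sum(counts.values())
    return {term: (count / total) * idf_values.get(term, 0)
            for term, count in counts.items()}


# Invented IDF values, standing in for the output of a training step.
hypothetical_idf = {"free": math.log(2), "prize": math.log(4)}
vector = tf_idf_vector(["free", "free", "prize", "hello"], hypothetical_idf)
print(vector)
```

"free" is twice as frequent as "prize" in the message but has half the IDF, so both end up with the same weight, while "hello" was never seen in training and is weighted 0.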

### k-NN Training and Classification

In [None]:
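def knn_training(corpus):
    """
    Calculates the IDF values and the per-document TF-IDF weights for a tagged corpus.

    :param corpus: Two dimensional list, with the first element of each child list being a tag ham/spam.
    :return: Tuple of the IDF dictionary and a list of TF-IDF dictionaries, one per document.
    """
    # Remove the ham/spam tags from the corpus.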
    corpus = detag_tokens(corpus)

    # Calculate the IDF of each word across the corpus
    idf_values = calculate_idf(corpus)

    # Calculate the TFs for each document across the corpus
    tf_values = [calculate_tf(document) for document in corpus]

    # Calculate TF-IDF for each term in each document
    tf_idf_values = [calculate_tf_idf(document, idf_values) for document in tf_values]

    return idf_values, tf_idf_values


# Small hand-made corpus used to smoke test the k-NN functions
test = [['ham', 'how', 'is', 'your', 'day', 'going', 'thomas'],
        ['ham', 'i', 'am', 'happy', 'that', 'things', 'are', 'well'],
        ['spam', 'give', 'me', 'all', 'of', 'your', 'money', 'now']]
corpus_idf, corpus_tf_idf = knn_training(test)

In [None]:
def calculate_ed(q, p):
    """
    Calculates the Euclidean distance between two sparse TF-IDF vectors.

    :param q: Dictionary of TF-IDF weights for the first document.
    :param p: Dictionary of TF-IDF weights for the second document.
    :return: Euclidean distance between q and p.
    """
    # A term missing from one vector has a weight of 0 there, so compare over the union of terms.
    terms = set(q) | set(p)
    squared_differences = [(q.get(term, 0) - p.get(term, 0)) ** 2 for term in terms]

    return math.sqrt(sum(squared_differences))
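
When TF-IDF vectors are stored as dictionaries, two documents rarely share the same keys. A minimal sketch of a Euclidean distance for that sparse layout is shown below, treating any missing term as a weight of 0. The vectors are made up and `sparse_euclidean` is a hypothetical name.

```python
import math


def sparse_euclidean(q, p):
    """Euclidean distance between two dict vectors; a missing term counts as 0."""
    terms = set(q) | set(p)
    return math.sqrt(sum((q.get(t, 0) - p.get(t, 0)) ** 2 for t in terms))


a = {"free": 3.0, "prize": 0.0}
b = {"free": 0.0, "lunch": 4.0}
print(sparse_euclidean(a, b))
```

The terms that appear in only one vector ("lunch" here) still contribute to the distance, which is what separates messages with different vocabularies.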

In [None]:
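def knn_classifier(message, idf_values, tf_idf_values):
    """
    Calculates the distance from a new message to every training document. The nearest-neighbor vote is not implemented yet.

    :param message: List of tokens in the new, untagged message.
    :param idf_values: IDF dictionary returned by knn_training.
    :param tf_idf_values: List of TF-IDF dictionaries returned by knn_training.
    :return: List of Euclidean distances, one per training document, in corpus order.
    """
    # Calculate the TF for the message
    msg_tf = calculate_tf(message)

    # Calculate TF-IDF for the message
    msg_tf_idf = calculate_tf_idf(msg_tf, idf_values)

    # Compute the Euclidean distance between the message and each entry in tf_idf_values
    return [calculate_ed(value, msg_tf_idf) for value in tf_idf_values]

new_msg = ['i', 'want', 'all', 'of', 'your', 'money']
print(knn_classifier(new_msg, corpus_idf, corpus_tf_idf))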
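
Once the distances are known, k-NN labels a message by majority vote among its k nearest training documents. The sketch below shows one way that final step could look. It is an assumption about where this section is heading, not the notebook's implementation: `knn_vote`, its `training_labels` argument, and the toy vectors are all hypothetical.

```python
import math
from collections import Counter


def knn_vote(query_vector, training_vectors, training_labels, k=3):
    """Label a query by majority vote among its k nearest training vectors."""
    def distance(q, p):
        # Euclidean distance over the union of terms; a missing term counts as 0
        return math.sqrt(sum((q.get(t, 0) - p.get(t, 0)) ** 2 for t in set(q) | set(p)))

    # Pair each training vector's distance with its label, nearest first
    ranked = sorted(
        (distance(query_vector, vector), label)
        for vector, label in zip(training_vectors, training_labels)
    )
    votes = Counter(label for _, label in ranked[:k])
    return votes.most_common(1)[0][0]


# Toy TF-IDF-like vectors with their tags, invented for this example.
vectors = [{"free": 1.0}, {"free": 0.9, "prize": 0.5},
           {"hello": 1.0}, {"hello": 0.8, "lunch": 0.4}]
labels = ["spam", "spam", "ham", "ham"]

print(knn_vote({"free": 0.8, "prize": 0.2}, vectors, labels, k=3))
print(knn_vote({"hello": 0.9}, vectors, labels, k=3))
```

An odd k avoids tied votes when there are only two classes.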


## Naive Bayes Classification

In [None]:
def naive_bayes_training(ham, spam):
    """
    Performs preliminary naive bayes calculations on the ham and spam set.

    :param ham: Array containing the tagged ham messages. The first token of each message is its tag.
    :param spam: Array containing the tagged spam messages. The first token of each message is its tag.
    :return: Word likelihoods for ham and for spam, followed by the prior probabilities of ham and spam.
    """
    # Count word occurrences per class, skipping the ham/spam tag at index 0
    hamWordCounts = Counter(word for msg in ham for word in msg[1:])
    spamWordCounts = Counter(word for msg in spam for word in msg[1:])
    hamTotal = sum(hamWordCounts.values())
    spamTotal = sum(spamWordCounts.values())

    # Laplace (add-one) smoothing over the shared vocabulary, so that a word seen in
    # only one class never receives a probability of zero in the other class
    vocabulary = set(hamWordCounts) | set(spamWordCounts)
    hamWPerc = {word: (hamWordCounts[word] + 1) / (hamTotal + len(vocabulary)) for word in vocabulary}
    spamWPerc = {word: (spamWordCounts[word] + 1) / (spamTotal + len(vocabulary)) for word in vocabulary}

    # Class priors are the share of training messages that belong to each class
    initHam = len(ham) / (len(ham) + len(spam))
    initSpam = len(spam) / (len(ham) + len(spam))

    return hamWPerc, spamWPerc, initHam, initSpam


# Train on the training set only, so that the test set remains unseen
separated_tokens = separate_tags(dataset.get("train"))
hamWPerc, spamWPerc, initHam, initSpam = naive_bayes_training(separated_tokens.get("ham"), separated_tokens.get("spam"))
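
A word that appears in only one class needs special handling, because a likelihood of zero would rule out the other class entirely. One standard remedy is Laplace (add-one) smoothing, sketched below on made-up counts: every count is increased by 1 and the vocabulary size is added to the denominator, so the likelihoods still sum to 1. `laplace_likelihoods` is a hypothetical helper for this illustration.

```python
from collections import Counter


def laplace_likelihoods(class_counts, vocabulary):
    """P(word | class) with add-one smoothing over a shared vocabulary."""
    total = sum(class_counts.values())
    return {word: (class_counts[word] + 1) / (total + len(vocabulary))
            for word in vocabulary}


# Made-up word counts for each class.
ham_counts = Counter(["see", "you", "soon", "see"])
spam_counts = Counter(["free", "prize", "free", "now"])
vocabulary = set(ham_counts) | set(spam_counts)

ham_likelihoods = laplace_likelihoods(ham_counts, vocabulary)
print(ham_likelihoods["see"], ham_likelihoods["free"])
```

"free" never occurs in the ham counts, yet it still receives a small non-zero ham likelihood of 1/10 instead of 0.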

### Naive Bayes Classifier

This function performs the classification on new, unseen data. It is essentially an "intelligent guessing" machine that scores a message under each class using our previously calculated probabilities and returns the more likely class.

In [None]:
def naive_bayes_classifier(msg):
    """
    Performs naive bayes classification to determine whether a provided message is spam or ham.

    :param msg: A tagged message to perform classification on. The tag at index 0 is ignored.
    :return: The classification result "ham" or "spam".
    """
    ham_prob = math.log(initHam)
    spam_prob = math.log(initSpam)

    # Skip the ham/spam tag at index 0 and add the log-likelihood of every known word
    for word in msg[1:]:
        if word in hamWPerc:
            ham_prob += math.log(hamWPerc[word])
        if word in spamWPerc:
            spam_prob += math.log(spamWPerc[word])

    # Ties fall back to "ham" so that the function always returns a label
    return "spam" if spam_prob > ham_prob else "ham"
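
The classifier adds log-probabilities instead of multiplying the probabilities themselves. The sketch below illustrates why, using made-up values: the product of many small probabilities underflows to 0.0 in floating point, while the sum of their logarithms stays finite and still allows two classes to be compared.

```python
import math

# 100 made-up word likelihoods, each very small
probabilities = [1e-5] * 100

# Multiplying them directly underflows: the true value 1e-500 is below
# the smallest positive float
product = 1.0
for p in probabilities:
    product *= p

# Summing the logarithms keeps the same information in a representable range
log_sum = sum(math.log(p) for p in probabilities)

print(product)
print(log_sum)
```

Because the logarithm is monotonic, comparing the two log scores picks the same class that comparing the raw products would, had they been representable.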

In [None]:
def naive_bayes_testing(test_set):
    """
    Performs naive bayes classification on the test set, and compares the results against their actual values.

    :param test_set: Test set that the model hasn't been trained on.
    :return: None
    """
    # Values to calculate rates later
    tpn = 0
    tnn = 0
    fpn = 0
    fnn = 0

    for msg in test_set:
        guess = naive_bayes_classifier(msg)
        if msg[0] == 'ham':
            if guess == 'ham':
                tnn += 1
            elif guess == 'spam':
                fpn += 1
        elif msg[0] == 'spam':
            if guess == 'ham':
                fnn += 1
            elif guess == 'spam':
                tpn += 1

    calculate_metrics(tpn, fpn, tnn, fnn, title="Naive Bayes")

naive_bayes_testing(dataset.get('test'))

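This notebook reads `SMSSpamCollection` from the local directory and never mounts Google Drive, so the cell below only applies when running in Google Colab with a personal Google Drive mounted. In that case, once we are finished executing, it is best practice to flush our changes and unmount the drive.

In [None]:
from google.colab import drive  # Only available inside Google Colab
drive.flush_and_unmount()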