In [1]:
import numpy as np
from collections import defaultdict, Counter
import re
import math
from sklearn.model_selection import train_test_split

class MarkovClassifier:
    def __init__(self, epsilon=1e-10):
        # Initialize dictionaries to store transition and probability information
        self.spam_transitions = defaultdict(lambda: defaultdict(int))
        self.ham_transitions = defaultdict(lambda: defaultdict(int))
        self.spam_initial = defaultdict(float)
        self.ham_initial = defaultdict(float)

        self.spam_vocab = set()
        self.ham_vocab = set()

        self.epsilon = epsilon

        self.log_spam_prior = 0
        self.log_ham_prior = 0

    def preprocess(self, text):
        """Preprocess the text by converting to lowercase and removing unwanted characters."""
        text = text.lower()
        text = re.sub(r'[^\w\s!?$]', ' ', text)
        return text.split()

    def fit(self, X_train, y_train):
        """Train the Markov Chain model on the training data."""
        # Count the occurrences of words and transitions for spam and ham
        spam_word_counts = Counter()
        ham_word_counts = Counter()
        spam_transition_counts = Counter()
        ham_transition_counts = Counter()

        # Count the number of messages in each class (spam and ham)
        num_spam = 0
        num_ham = 0

        # Loop over all training messages and their corresponding labels
        for message, label in zip(X_train, y_train):
            words = self.preprocess(message)
            if label == 'spam':
                num_spam += 1
                self._count_word_transitions(words, spam_word_counts, spam_transition_counts)
            else:
                num_ham += 1
                self._count_word_transitions(words, ham_word_counts, ham_transition_counts)

        # Calculate priors P(spam) and P(ham)
        total_messages = len(X_train)
        self.log_spam_prior = math.log(num_spam / total_messages) if num_spam > 0 else float('-inf')
        self.log_ham_prior = math.log(num_ham / total_messages) if num_ham > 0 else float('-inf')

        # Compute initial state probabilities for each class
        self.spam_initial = self._compute_initial_probabilities(spam_word_counts, num_spam)
        self.ham_initial = self._compute_initial_probabilities(ham_word_counts, num_ham)

        # Compute transition matrices for each class
        self.spam_transitions = self._compute_transition_matrix(spam_word_counts, spam_transition_counts)
        self.ham_transitions = self._compute_transition_matrix(ham_word_counts, ham_transition_counts)

    def _count_word_transitions(self, words, word_counts, transition_counts):
        """Helper function to count words and transitions."""
        prev_word = None
        for word in words:
            word_counts[word] += 1
            if prev_word:
                transition_counts[(prev_word, word)] += 1
            prev_word = word

    def _compute_initial_probabilities(self, word_counts, num_messages):
        """Compute initial state probabilities P(word | class)."""
        total_words = sum(word_counts.values())
        initial_probs = {word: (count + self.epsilon) / (total_words + self.epsilon * len(word_counts))
                         for word, count in word_counts.items()}
        return initial_probs

    def _compute_transition_matrix(self, word_counts, transition_counts):
        """Compute log-probabilities of word transitions P(word_{i+1} | word_i)."""
        M = len(word_counts)  # Vocabulary size
        log_transition_matrix = {}

        for i in word_counts:
            count_i = word_counts[i]
            for j in word_counts:
                count_ij = transition_counts[(i, j)] + self.epsilon
                A_ij = (count_ij + self.epsilon) / (count_i + self.epsilon)
                log_transition_matrix[(i, j)] = math.log(A_ij)

        return log_transition_matrix

    def predict(self, message):
        """Predict if a message is spam or ham based on log-probabilities."""
        words = self.preprocess(message)

        # Log-probabilities for P(M|spam) and P(M|ham)
        log_p_spam = self.log_spam_prior
        log_p_ham = self.log_ham_prior

        # Calculate log P(M|spam) and log P(M|ham)
        prev_word_spam = prev_word_ham = None
        for i, word in enumerate(words):
            # For the first word, use the initial probabilities
            if i == 0:
                log_p_spam += math.log(self.spam_initial.get(word, self.epsilon))
                log_p_ham += math.log(self.ham_initial.get(word, self.epsilon))
            else:
                # For subsequent words, use transition probabilities
                log_p_spam += self.spam_transitions.get((prev_word_spam, word), self.epsilon)
                log_p_ham += self.ham_transitions.get((prev_word_ham, word), self.epsilon)

            prev_word_spam = prev_word_ham = word

        # Return the class with higher log-probability
        return 'spam' if log_p_spam > log_p_ham else 'ham'

    def evaluate(self, X_test, y_test):
        """Evaluate the accuracy of the classifier on the test data."""
        correct_predictions = sum(self.predict(message) == true_label for message, true_label in zip(X_test, y_test))
        return correct_predictions / len(X_test)


# Example usage
if __name__ == '__main__':
    import pandas as pd

    !gdown 1j4lz577s-oSTOs6aMCBq_0etEafd7ybb
    # Load dataset
    df = pd.read_csv('spam.csv', encoding='latin-1')

    # Preprocess and split the data
    X = df['v2'].values
    y = df['v1'].values
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Initialize and train classifier
    classifier = MarkovClassifier()
    classifier.fit(X_train, y_train)

    # Evaluate accuracy on the test set
    accuracy = classifier.evaluate(X_test, y_test)
    print(f'Accuracy: {accuracy:.4f}')


Downloading...
From: https://drive.google.com/uc?id=1j4lz577s-oSTOs6aMCBq_0etEafd7ybb
To: /content/spam.csv
  0% 0.00/504k [00:00<?, ?B/s]100% 504k/504k [00:00<00:00, 34.4MB/s]
Accuracy: 0.5309
