In [101]:
import math
from typing import List, NewType
from collections import Counter, defaultdict

# Using Welford's algorithm for efficient Mean and Variance calculation
# Reference: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
class FeatureStats:
    __slots__ = ["n_features", "n", "means", "M2s"]
    def __init__(self, n_features: int):
        self.n_features = n_features
        self.n = 0
        self.means = [0] * n_features
        self.M2s= [0] * n_features

    def update(self, xs: List[float]):
        self.n += 1
        for idx, x in enumerate(xs):
            delta = x - self.means[idx]
            self.means[idx] += delta / self.n
            delta2 = x - self.means[idx]
            self.M2s[idx] += delta * delta2

    def logprob(self, xs: List[float]) -> float:
        ret = 0
        for idx in range(self.n_features):
            x = xs[idx]
            mean = self.means[idx]
            M2 = self.M2s[idx]
            if self.n <= 1:
                var = 1e-6
            else:
                var = max(M2 / (self.n - 1), 1e-6)
            logp = -0.5 * (math.log(2 * math.pi * var) + (x - mean) ** 2 / var)
            ret += logp
        return ret

class GaussianNaiveBayesClassifierModel:
    def __init__(self, X_train: List[List[float]], classes: List[int]):
        class_counts = Counter(classes)
        self.class_probs = {
            cls: cnt / len(classes)
            for cls, cnt in class_counts.items()
        }
        self.n_features = len(X_train[0])
        self.feature_probs = defaultdict(lambda: FeatureStats(self.n_features))
        for idx in range(len(X_train)):
            features = X_train[idx]
            cls = classes[idx]
            self.feature_probs[cls].update(features)

    def predict_cls_single(self, features: List[float]) -> int:
        posteriors = {}
        for cls, p in self.class_probs.items():
            posterior = math.log(p)
            posterior += self.feature_probs[cls].logprob(features)
            posteriors[cls] = posterior
        return max(posteriors, key=posteriors.get)
        
    def predict_cls(self, X: List[List[float]]) -> List[int]:
        return [self.predict_cls_single(features) for features in X]

In [102]:
from collections import OrderedDict
from typing import Tuple

class BagOfWords:
    def __init__(self, stopwords=None):
        self.stopwords = stopwords or set()
        self.stopwords = set(self.stopwords)
        self.vocabs = OrderedDict()

    def update(self, *texts: Tuple[str]):
        for text in texts:
            for word in text.split():
                if word in self.stopwords:
                    continue
                if word not in self.vocabs:
                    self.vocabs[word] = len(self.vocabs)

    def vectorize(self, *texts: Tuple[str]) -> List[int]:
        ret = []
        for text in texts:
            vector = [0] * len(self.vocabs)
            for word, cnt in Counter(text.split()).items():
                if word in self.stopwords:
                    continue
                if not word in self.vocabs:
                    continue
                vector[self.vocabs[word]] = cnt
            ret.append(vector)
        return ret

In [34]:
import tarfile as tf
from glob import glob

enron_files = glob("dataset/enron/enron*.tar.gz")
print(enron_files)

texts = []
classes = []

for enron_file in enron_files:
    with tf.open(enron_file) as fp:
        for member in fp.getmembers():
            if not member.isfile():
                continue
            cls = int(member.name.split("/")[1] == "spam")
            text = (
                fp.extractfile(member)
                .read()
                .decode("latin-1")
                .strip()
                .lower()
            )
            texts.append(text)
            classes.append(cls)
    print(f"Done reading from {enron_file}")

print(f"{len(texts)=}")
print(f"{len(classes)=}")

['dataset/enron/enron2.tar.gz', 'dataset/enron/enron6.tar.gz', 'dataset/enron/enron4.tar.gz', 'dataset/enron/enron1.tar.gz', 'dataset/enron/enron3.tar.gz', 'dataset/enron/enron5.tar.gz']
Done reading from dataset/enron/enron2.tar.gz
Done reading from dataset/enron/enron6.tar.gz
Done reading from dataset/enron/enron4.tar.gz
Done reading from dataset/enron/enron1.tar.gz
Done reading from dataset/enron/enron3.tar.gz
Done reading from dataset/enron/enron5.tar.gz
len(texts)=33722
len(classes)=33722


In [35]:
Counter(classes)

Counter({1: 17171, 0: 16551})

In [36]:
import re
import string
from collections import Counter

def clean_text(text):
    text = text.lower()
    # Remove emails, URLs, numbers
    text = re.sub(r'\S+@\S+', '', text)  # Remove emails
    text = re.sub(r'http\S+', '', text)   # Remove URLs
    text = re.sub(r'\d+', '', text)       # Remove numbers
    
    # Remove punctuation
    text = text.translate(str.maketrans('', '', string.punctuation))
    
    # Remove extra whitespace
    text = ' '.join(text.split())
    
    return text

# Common English stop words
STOP_WORDS = set(open("./dataset/english_stopwords.txt").readlines())

def remove_stopwords(text):
    return ' '.join(word for word in text.split() if word not in STOP_WORDS)

def filter_by_length(text, min_len=3, max_len=15):
    words = text.split()
    filtered = [word for word in words if min_len <= len(word) <= max_len]
    return ' '.join(filtered)

def preprocess_for_vocab_reduction(texts, min_len=3, max_len=15):
    # Step 1: Clean and preprocess
    processed_texts = []
    for text in texts:
        # Clean
        text = clean_text(text)
        # Remove stop words
        text = remove_stopwords(text)
        # Filter by length
        text = filter_by_length(text, min_len, max_len)
        processed_texts.append(text)
    return processed_texts

In [37]:
def train_test_split(X, y, split=0.8):
    assert len(X) == len(y), f"Size of X and y must be same, got: {len(X)=}, {len(y)=}"
    n = len(X)
    train_size = math.ceil(n*split)
    return X[:train_size], y[:train_size], X[train_size:], y[train_size:]

In [98]:
texts_cleaned = preprocess_for_vocab_reduction(texts, min_len=3, max_len=7)

In [99]:
X_train, y_train, X_test, y_test = train_test_split(texts_cleaned, classes)
print(f"{len(X_train)=}")
print(f"{len(y_train)=}")
print(f"{len(X_test)=}")
print(f"{len(y_test)=}")

len(X_train)=26978
len(y_train)=26978
len(X_test)=6744
len(y_test)=6744


In [103]:
bow = BagOfWords()
bow.update(*X_train)
len(bow.vocabs)

70833

In [104]:
X_train_vectorized = bow.vectorize(*X_train)
X_test_vectorized = bow.vectorize(*X_test)

In [105]:
model = GaussianNaiveBayesClassifierModel(X_train_vectorized, y_train)
model

<__main__.GaussianNaiveBayesClassifierModel at 0x11060aec0>

In [108]:
model.predict_cls(X_test_vectorized[:20])

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

In [109]:
y_test[:20]

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

In [110]:
model.predict_cls(X_test_vectorized[-25:])

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0]

In [111]:
y_test[-25:]

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0]