In [1]:
from nltk.stem import PorterStemmer
from collections import Counter
from typing import Dict, List
import numpy as np
import pandas as pd
from collections import defaultdict
from tqdm import tqdm

In [2]:
df = pd.read_csv('./data/enron_spam_data.csv')
df.dropna(inplace=True)

folders = ['ham', 'spam']

emails = []
labels = []

ps = PorterStemmer()

for index, row in tqdm(df.iterrows(), total=len(df), desc="Processing emails"):
    words = row['Message'].split()
    email = Counter(words)
    
    emails.append(email)
    labels.append(1 if row['Spam/Ham'] == 'spam' else 0)

Processing emails: 100%|██████████| 33107/33107 [00:02<00:00, 16190.74it/s]


In [11]:

class Vectorizer:
    def __init__(self, dictionaries: List[Dict[str, int]], labels: List[int]):
        # Step 1: Count in how many messages each word appears
        token_doc_count = defaultdict(int)
        for d in dictionaries:
            for word in d.keys():
                token_doc_count[word] += 1

        # Step 2: Only include tokens that appear in at least 5 messages
        candidate_tokens = [word for word, count in token_doc_count.items() if count >= 5]
        
        self.word_means = {}
        for word in candidate_tokens:
            self.word_means[word] = token_doc_count[word] / len(dictionaries)

        self.vocab = {}
        
        # Step 3: Rank the tokens by their information gain
        N = len(labels)
        label_array = np.array(labels)
        p_pos = np.mean(label_array)
        p_neg = 1 - p_pos

        def entropy(p):
            return -p * np.log2(p) if p > 0 else 0

        base_entropy = entropy(p_pos) + entropy(p_neg)

        # Build document-term matrix for fast access (binary presence)
        dtm = {word: np.zeros(N, dtype=bool) for word in candidate_tokens}
        for i, doc in enumerate(dictionaries):
            for word in doc.keys():
                if word in dtm:
                    dtm[word][i] = True

        info_gains = {}
        for word in tqdm(candidate_tokens, desc="Calculating Info Gain"):
            present = dtm[word]
            absent = ~present

            n_present = np.sum(present)
            n_absent = N - n_present

            if n_present == 0 or n_absent == 0:
                info_gains[word] = 0
                continue

            p_pos_given_present = np.mean(label_array[present]) if n_present > 0 else 0
            p_pos_given_absent = np.mean(label_array[absent]) if n_absent > 0 else 0

            H_present = entropy(p_pos_given_present) + entropy(1 - p_pos_given_present)
            H_absent = entropy(p_pos_given_absent) + entropy(1 - p_pos_given_absent)

            cond_entropy = (n_present / N) * H_present + (n_absent / N) * H_absent
            info_gain = base_entropy - cond_entropy
            info_gains[word] = info_gain

        # Sort and assign vocab
        sorted_tokens = sorted(info_gains.items(), key=lambda x: -x[1])
        for i, (word, _) in enumerate(sorted_tokens):
            self.vocab[word] = i
        
    def transform(self, X: Dict[str, int]) -> np.ndarray:
        vec = np.empty(len(self.vocab), dtype=float)
        
        total_number_of_tokens = sum(X.values())
        
        for word, count in X.items():
            if word in self.vocab:
                # term frequency vectorization
                # vec[self.vocab[word]] = count
                
                # boolean vectorization
                # vec[self.vocab[word]] = 1
                
                # normalized term frequency vectorization
                # vec[self.vocab[word]] = count / total_number_of_tokens
                
                # standardized term frequency
                if np.std(self.word_means[word]) != 0:
                    vec[self.vocab[word]] = (count - self.word_means[word]) / np.std(self.word_means[word])
                else:
                    vec[self.vocab[word]] = 0  # If std is 0, set to 0 to maintain zero mean
        return vec

# Instantiate the vectorizer
vectorizer = Vectorizer(emails, labels)

# Track progress while transforming emails
X = np.array([vectorizer.transform(email) for email in tqdm(emails, desc="Vectorizing emails")])
Y = np.array(labels)

print("Saving Outputs")

Calculating Info Gain: 100%|██████████| 42957/42957 [00:06<00:00, 6688.59it/s]
Vectorizing emails: 100%|██████████| 33107/33107 [00:44<00:00, 742.09it/s] 


Saving Outputs


In [14]:
np.save('X.npy', X)
np.save('Y.npy', Y)