# Project 2: NB Classifier

### Course: CS 5420

### Author: Cooper Wooley

In [19]:
import tarfile
import tempfile
import shutil
import os
import atexit
import re
from collections import defaultdict, Counter
import random
import math

### Helper Functions for Managing Dataset

In [20]:
def extract_dataset(zip_path):
    """Extract a gzip-compressed tar archive into a fresh temporary directory.

    Args:
        zip_path: Path to a ``.tar.gz`` archive. Despite the parameter name
            the file is opened with ``tarfile`` in ``'r:gz'`` mode.

    Returns:
        The path of the single top-level directory found in the extracted
        tree, or the temporary directory itself when the archive does not
        contain exactly one top-level directory.

    Raises:
        Any error raised by ``tarfile.open`` / ``extractall`` propagates
        unchanged (for example ``tarfile.TarError`` or ``OSError``).
    """
    # Create a temporary directory to extract into
    temp_dir = tempfile.mkdtemp(prefix="dataset_")

    # Register the cleanup handler *before* extracting: if extraction raises,
    # the partially populated directory is still removed at interpreter exit.
    atexit.register(lambda: cleanup_dataset(temp_dir))

    # Extract contents
    with tarfile.open(zip_path, 'r:gz') as tar_ref:
        if hasattr(tarfile, "data_filter"):
            # Extraction filters reject absolute paths, ".." traversal and
            # special files, and silence the Python 3.12+ DeprecationWarning
            # emitted when no filter is given.
            tar_ref.extractall(temp_dir, filter="data")
        else:
            # Older interpreters without extraction-filter support.
            tar_ref.extractall(temp_dir)

    # Find the subdirectories directly inside the extracted directory
    contents = [os.path.join(temp_dir, d) for d in os.listdir(temp_dir)]
    subdirs = [d for d in contents if os.path.isdir(d)]

    if len(subdirs) == 1:
        data_root = subdirs[0]
    else:
        data_root = temp_dir  # fallback if already data root

    return data_root

def cleanup_dataset(directory):
    """Delete *directory* and everything below it, if it exists.

    Prints a confirmation line after a successful removal and does nothing
    (and prints nothing) when the path is already gone, so it is safe to
    call more than once for the same path.
    """
    if not os.path.exists(directory):
        return

    shutil.rmtree(directory)
    print(f"Cleaned up dataset directory: {directory}")

### Split Data

In [21]:
# Archive name is relative, so it is resolved against the current working
# directory of the notebook kernel.
tar_path = "20_newsgroups.tar.gz"
# extracted_path is a module-level name reused by the later split and
# cleanup cells.
extracted_path = extract_dataset(tar_path)
print(f"Dataset extracted to : {extracted_path}")

def split_dataset(base_dir, train_ratio=0.5, seed=42):
    """Split a directory-per-class dataset into train and test file lists.

    Every sub-directory of *base_dir* is treated as one class whose name is
    the label; every regular file inside it is one document. Non-directory
    entries directly under *base_dir* are ignored.

    Args:
        base_dir: Directory that contains one sub-directory per class.
        train_ratio: Fraction of each class's files assigned to the training
            set (the count is truncated with ``int``). Must lie in [0, 1].
        seed: Seed for the shuffle, making the split reproducible.

    Returns:
        ``(train_files, test_files, train_labels, test_labels)`` where the
        file lists hold paths and the label lists hold the matching class
        directory names, position by position.

    Raises:
        ValueError: If *train_ratio* is outside [0, 1].
    """
    if not 0 <= train_ratio <= 1:
        # Outside this range the label lists would no longer line up with
        # the file lists (e.g. more train labels than train files).
        raise ValueError(f"train_ratio must be between 0 and 1, got {train_ratio!r}")

    # A private generator keeps the split reproducible without reseeding the
    # process-wide ``random`` state as a side effect.
    rng = random.Random(seed)

    train_files = []
    test_files = []
    train_labels = []
    test_labels = []

    # os.listdir order is arbitrary, so sort both levels: the same seed then
    # yields the same split regardless of filesystem or platform.
    for label in sorted(os.listdir(base_dir)):
        class_dir = os.path.join(base_dir, label)
        if not os.path.isdir(class_dir):
            continue

        candidates = (
            os.path.join(class_dir, name) for name in sorted(os.listdir(class_dir))
        )
        files = [path for path in candidates if os.path.isfile(path)]

        rng.shuffle(files)
        split_index = int(len(files) * train_ratio)

        train_files.extend(files[:split_index])
        test_files.extend(files[split_index:])
        train_labels.extend([label] * split_index)
        test_labels.extend([label] * (len(files) - split_index))

    return train_files, test_files, train_labels, test_labels  # train_X, test_X, train_Y, test_Y

  tar_ref.extractall(temp_dir)


Dataset extracted to : C:\Users\2003v\AppData\Local\Temp\dataset_qc_wwiax\20_newsgroups


## NB Classifier

### Training

In [22]:
def train_naive_bayes(train_files, train_labels, laplace_smooth=False):
    """Fit a multinomial bag-of-words Naive Bayes model from text files.

    Each file is read, lower-cased and tokenised with ``\\b\\w+\\b``; token
    counts are accumulated per class label.

    Args:
        train_files: Paths of the training documents.
        train_labels: Class label of each document, aligned with
            *train_files*.
        laplace_smooth: When true, add-one smoothing is applied: 1 is added
            to every count and the vocabulary size to every denominator.

    Returns:
        A 5-tuple ``(priors, likelihoods, vocab_size, laplace_smooth,
        class_totals)``:

        * ``priors`` maps class -> fraction of training documents.
        * ``likelihoods`` maps class -> {word: P(word | class)} for the
          words seen in that class.
        * ``vocab_size`` is the number of distinct tokens over all classes.
        * ``laplace_smooth`` is the argument, passed back unchanged.
        * ``class_totals`` maps class -> total token count of that class.
    """
    # Compile once instead of resolving the pattern for every file.
    token_pattern = re.compile(r'\b\w+\b')

    vocab = set()
    word_counts = defaultdict(Counter)  # class: {word: count}

    for path, label in zip(train_files, train_labels):
        with open(path, 'r', errors='ignore') as f:
            # Tokenize
            words = token_pattern.findall(f.read().lower())
        vocab.update(words)
        word_counts[label].update(words)

    # Compute P(Y). A single Counter pass replaces one list.count() scan per
    # class, and its first-seen key order is deterministic (iterating a set
    # of strings is not), which fixes the order in which classes are later
    # visited when scores tie.
    total_docs = len(train_labels)
    priors = {
        cls: doc_count / total_docs
        for cls, doc_count in Counter(train_labels).items()
    }

    # Compute P(X|Y)
    alpha = int(laplace_smooth)
    vocab_size = len(vocab)
    likelihoods = {}
    class_totals = {}

    for cls, counts in word_counts.items():
        total_words = sum(counts.values())
        class_totals[cls] = total_words
        denominator = total_words + (alpha * vocab_size)
        likelihoods[cls] = {
            word: (count + alpha) / denominator
            for word, count in counts.items()
        }

    return priors, likelihoods, vocab_size, laplace_smooth, class_totals

### Predicting

In [23]:
def predict(text, priors, likelihoods, log_prob=False, vocab_size=None, laplace_smooth=False, word_counts=None):
    """Return the most probable class for the document stored at *text*.

    *text* is a file path: the file is read, lower-cased and tokenised with
    ``\\b\\w+\\b``. Each class starts from its prior and is combined with the
    likelihood of every token, either as a product of probabilities or,
    when *log_prob* is true, as a sum of logarithms.

    A token with no likelihood entry for a class is skipped unless
    *laplace_smooth* is true, in which case it contributes
    ``1 / (word_counts[cls] + vocab_size)``. When scores tie, the class that
    comes first in *priors* wins.
    """
    with open(text, 'r', errors='ignore') as handle:
        tokens = re.findall(r'\b\w+\b', handle.read().lower())

    score = {}
    for label, prior in priors.items():
        running = math.log(prior) if log_prob else prior

        for token in tokens:
            probability = likelihoods[label].get(token)
            if probability is None:
                if not laplace_smooth:
                    # Unsmoothed model: tokens unknown to this class are ignored
                    continue
                # Handle unseen words with laplace smoothing
                probability = 1 / (word_counts[label] + vocab_size)

            if log_prob:
                running += math.log(probability)
            else:
                running *= probability

        score[label] = running

    return max(score, key=score.get)

### Evaluation

In [24]:
def evaluate(test_files, test_labels, priors, likelihoods, log_prob=False, vocab_size=None, laplace_smooth=False, word_counts=None):
    """Return the classification accuracy of the model on a labelled test set.

    Args:
        test_files: Paths of the documents to classify.
        test_labels: Expected class of each document, aligned with
            *test_files*.
        priors, likelihoods, log_prob, vocab_size, laplace_smooth,
        word_counts: Forwarded unchanged to ``predict``.

    Returns:
        The fraction of documents whose predicted class equals the expected
        label, as a float in [0, 1]. An empty test set yields ``0.0`` rather
        than raising ``ZeroDivisionError``.
    """
    total = len(test_files)
    if total == 0:
        # Nothing to score; avoid dividing by zero.
        return 0.0

    correct = 0
    for path, expected in zip(test_files, test_labels):
        predicted = predict(path, priors, likelihoods, log_prob, vocab_size, laplace_smooth, word_counts)
        if predicted == expected:
            correct += 1
    return correct / total

In [25]:
# Split with split_dataset's defaults (train_ratio=0.5, seed=42); the four
# lists are module-level and reused by the expanded classifier cell.
train_files, test_files, train_labels, test_labels = split_dataset(extracted_path)

# Baseline model: no Laplace smoothing. The vocabulary size, smoothing flag
# and per-class totals returned by training are discarded here.
priors, likelihoods, _, _, _ = train_naive_bayes(train_files, train_labels)

# evaluate's defaults apply: raw probability products (log_prob=False) and
# no smoothing for words missing from a class.
accuracy_base = evaluate(test_files, test_labels, priors, likelihoods)
print(f"Accuracy: {accuracy_base:.4f}")

Accuracy: 0.0446


## Expanding NB BoW Classifier

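Below is the expanded NB BoW classifier. It applies Laplace (add-one) smoothing when computing likelihoods and scores classes with log probabilities when classifying. Words that a class never saw during training (including words outside the training vocabulary) are not skipped; they are scored with the smoothed probability 1 / (total words in the class + vocabulary size).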

In [26]:
# Retrain with Laplace smoothing. This rebinds the module-level priors and
# likelihoods from the baseline cell; word_counts holds the per-class total
# token counts returned as the fifth element.
priors, likelihoods, vocab_size, laplace_smooth, word_counts = train_naive_bayes(train_files, train_labels, laplace_smooth=True)

# Score in log space and pass the smoothing inputs through to predict.
accuracy_expand = evaluate(test_files, test_labels, priors, likelihoods, log_prob=True, vocab_size=vocab_size, laplace_smooth=laplace_smooth, word_counts=word_counts)
print(f"Accuracy: {accuracy_expand:.4f}")

# Relies on accuracy_base from the baseline cell having been run first.
print(f"\nDifference of accuracy between classifiers: {abs(accuracy_expand - accuracy_base):.4f}")

Accuracy: 0.8497

Difference of accuracy between classifiers: 0.8051


In [27]:
# Remove the extracted data now instead of waiting for interpreter exit.
# extracted_path may be a sub-directory of the temporary directory; the
# atexit handler registered in extract_dataset removes the temporary
# directory itself.
cleanup_dataset(extracted_path)

Cleaned up dataset directory: C:\Users\2003v\AppData\Local\Temp\dataset_qc_wwiax\20_newsgroups
