# Business Purpose:
The 20 Newsgroups dataset is a standard benchmark for text classification. Models trained on it assign documents to topics, a capability that underpins applications such as content filtering, recommendation systems, and targeted advertising. A relevant citation can be found at [source].


In [1]:
import re
import os
import nltk
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List, Dict
from nltk.corpus import wordnet, stopwords
from nltk.stem import WordNetLemmatizer, PorterStemmer
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import cross_val_score, GridSearchCV
from sklearn.metrics import make_scorer, balanced_accuracy_score, confusion_matrix
from sklearn.model_selection import train_test_split
from nltk.tokenize import word_tokenize

In [2]:
# NLTK packages that download_nltk_resources() iterates over.
NLTK_RESOURCES = ["punkt", "wordnet", "stopwords"]
# Placeholder for the default English stopword set. It is None when this cell
# runs, so any code that reads it before it is rebound sees None.
DEFAULT_STOPWORDS = None

In [3]:
def download_nltk_resources() -> None:
    """
    Downloads the NLTK resources required for text processing.

    Every package named in NLTK_RESOURCES is downloaded into an ``nltk_data``
    directory under the current working directory, and that directory is added
    to NLTK's search path. A failed download is reported on stdout and skipped
    so the remaining packages are still attempted.

    Side effects:
        Rebinds the module-level DEFAULT_STOPWORDS to the set of English
        stopwords provided by NLTK.
    """
    # Without this declaration the assignment at the end of the function would
    # only create a local variable and the module-level name would stay None.
    global DEFAULT_STOPWORDS

    download_dir = f"{os.getcwd()}/nltk_data"
    # Re-running the cell must not stack duplicate entries on the search path.
    if download_dir not in nltk.data.path:
        nltk.data.path.append(download_dir)

    for resource in NLTK_RESOURCES:
        try:
            nltk.download(resource, download_dir=download_dir)
        except Exception as e:
            # Deliberately best-effort: report and carry on with the next package.
            print(f"Error downloading {resource}: {e}")

    DEFAULT_STOPWORDS = set(stopwords.words("english"))

In [4]:
def preprocess_text(text: str, stop_words: set = None) -> str:
    """
    Preprocesses the input text by lowercasing, removing non-alphabetic characters,
    tokenizing, removing stopwords, and applying lemmatization and stemming.

    Args:
        text (str): The input text to preprocess.
        stop_words (set): A set of stopwords to remove from the text. When
            omitted or None, the current value of DEFAULT_STOPWORDS is used,
            and if that is still None the English stopword list is loaded
            from NLTK.

    Returns:
        str: The surviving tokens, lemmatized then stemmed, joined by single spaces.
    """
    # Resolve the default at call time. A default written as
    # ``= DEFAULT_STOPWORDS`` is evaluated once, when the function is defined,
    # and at that point the constant is None, which makes the membership test
    # below raise TypeError.
    if stop_words is None:
        stop_words = DEFAULT_STOPWORDS
        if stop_words is None:
            stop_words = set(stopwords.words("english"))

    lemmatizer = WordNetLemmatizer()
    stemmer = PorterStemmer()

    text = text.lower()
    # Drop everything that is neither a letter nor whitespace (digits, punctuation).
    text = re.sub(r"[^a-zA-Z\s]", "", text)

    tokens = word_tokenize(text)
    tokens = [word for word in tokens if word not in stop_words]
    lemmatized_tokens = [lemmatizer.lemmatize(word) for word in tokens]
    stemmed_tokens = [stemmer.stem(word) for word in lemmatized_tokens]

    return " ".join(stemmed_tokens)

In [5]:
def augment_text_with_wordnet(text: str) -> str:
    """
    Rewrites the text token by token using WordNet.

    Each token that has at least one WordNet synset is replaced by the name of
    the first lemma of its first synset; tokens with no synset are kept as-is.

    Args:
        text (str): The input text to augment.

    Returns:
        str: The rewritten tokens joined by single spaces.
    """

    def first_synonym(word: str) -> str:
        # Fall back to the token itself when WordNet has no entry for it.
        matches = wordnet.synsets(word)
        return matches[0].lemmas()[0].name() if matches else word

    return " ".join(first_synonym(word) for word in word_tokenize(text))

In [6]:
def preprocess_and_augment(text: str, stop_words: set = None) -> str:
    """
    Preprocesses and augments the input text by applying text preprocessing and
    then augmenting it using WordNet synonyms.

    Args:
        text (str): The input text to process.
        stop_words (set): A set of stopwords to remove from the text. When
            omitted or None, the current value of DEFAULT_STOPWORDS is used,
            and if that is still None the English stopword list is loaded
            from NLTK.

    Returns:
        str: The preprocessed and augmented text.
    """
    # This function is handed to TfidfVectorizer as its preprocessor and is
    # therefore called with the text only. The default must be resolved here,
    # at call time: a default of ``= DEFAULT_STOPWORDS`` is frozen at
    # definition time (as None) and would be forwarded as an explicit None.
    if stop_words is None:
        stop_words = DEFAULT_STOPWORDS
        if stop_words is None:
            stop_words = set(stopwords.words("english"))

    preprocessed_text = preprocess_text(text, stop_words)
    augmented_text = augment_text_with_wordnet(preprocessed_text)
    return augmented_text

In [7]:
def extract_feature_importance(
    model_pipeline: Pipeline, model_name: str, top_n: int = 20
) -> np.ndarray:
    """
    Extracts the most important words (features) based on the trained model.

    For coefficient-based models the importance of a word is the mean absolute
    coefficient across all rows of ``coef_`` (one row per class or class pair),
    so the score vector always has exactly one entry per vocabulary term.
    For Random Forest the model's ``feature_importances_`` are used directly.

    Args:
        model_pipeline (Pipeline): The trained model pipeline, with a "tfidf"
            step and a "clf" step.
        model_name (str): The name of the model. Supported values are
            'Logistic Regression', 'SVM' and 'Random Forest'.
        top_n (int): How many of the highest-scoring words to report.

    Returns:
        np.ndarray: The top words, ordered from most to least important.

    Raises:
        ValueError: If ``top_n`` is negative or ``model_name`` is not one of
            the supported names.
    """
    if top_n < 0:
        raise ValueError(f"top_n must be non-negative, got {top_n}")

    # Extract the TF-IDF vectorizer and classifier
    tfidf_vectorizer = model_pipeline.named_steps["tfidf"]
    classifier = model_pipeline.named_steps["clf"]

    # Get the feature names (words) from the TF-IDF vectorizer
    feature_names = np.array(tfidf_vectorizer.get_feature_names_out())

    # For models that provide coefficients (e.g., Logistic Regression, SVM)
    if model_name in ["Logistic Regression", "SVM"]:
        coefficients = classifier.coef_
        # A linear SVC fitted on sparse TF-IDF input exposes a sparse matrix.
        if hasattr(coefficients, "toarray"):
            coefficients = coefficients.toarray()
        coefficients = np.atleast_2d(np.asarray(coefficients))
        # Collapse the class axis. Flattening instead would produce
        # n_rows * n_features scores whose indices no longer map onto
        # feature_names.
        importance = np.abs(coefficients).mean(axis=0)
    # For models like Random Forest, we use feature_importances_
    elif model_name == "Random Forest":
        importance = np.asarray(classifier.feature_importances_)
    else:
        raise ValueError(
            f"Feature importance is not supported for model '{model_name}'"
        )

    # Indices of the highest-scoring features, best first
    top_indices = np.argsort(importance)[::-1][:top_n]

    # Get the most important feature names
    top_words = feature_names[top_indices]
    top_importances = importance[top_indices]

    # Display the top words with their importance scores
    print(f"Top {top_n} important words for {model_name}:")
    for word, score in zip(top_words, top_importances):
        print(f"{word}: {score:.4f}")

    return top_words

In [8]:
def create_pipeline(model) -> Pipeline:
    """
    Builds a two-step pipeline: TF-IDF features followed by the given classifier.

    The vectorizer runs every document through preprocess_and_augment and keeps
    at most 5000 features. The steps are named "tfidf" and "clf".

    Args:
        model: A scikit-learn classifier to use as the final step.

    Returns:
        Pipeline: The assembled, unfitted pipeline.
    """
    vectorizer = TfidfVectorizer(
        max_features=5000, preprocessor=preprocess_and_augment
    )
    steps = [("tfidf", vectorizer), ("clf", model)]
    return Pipeline(steps)

In [9]:
def load_dataset() -> tuple:
    """
    Fetches the complete 20 Newsgroups corpus and holds out 20% of it for testing.

    Both the shuffle performed while fetching and the train/test split use the
    fixed seed 42.

    Returns:
        tuple: (X_train, X_test, y_train, y_test).
    """
    corpus = fetch_20newsgroups(
        subset="all", categories=None, shuffle=True, random_state=42
    )
    texts, labels = corpus.data, corpus.target
    # train_test_split yields [X_train, X_test, y_train, y_test] in that order.
    return tuple(train_test_split(texts, labels, test_size=0.2, random_state=42))

In [10]:
def evaluate_models(
    models: Dict[str, Pipeline], X_train: List[str], y_train: List[int]
) -> Dict[str, List[float]]:
    """
    Scores every pipeline with 5-fold cross-validation on balanced accuracy.

    Progress and the per-fold scores of each model are printed as they become
    available. Cross-validation is run with n_jobs=-1.

    Args:
        models (dict): Model names mapped to scikit-learn pipelines.
        X_train (List[str]): The training data (input texts).
        y_train (List[int]): The training labels.

    Returns:
        dict: Model names mapped to their cross-validation scores.
    """
    balanced_accuracy = make_scorer(balanced_accuracy_score)
    scores_by_model = {}

    for name in models:
        print(f"Evaluating {name}...")
        fold_scores = cross_val_score(
            estimator=models[name],
            X=X_train,
            y=y_train,
            scoring=balanced_accuracy,
            cv=5,
            n_jobs=-1,
        )
        scores_by_model[name] = fold_scores
        print(f"{name} Balanced Accuracy Scores: {fold_scores}")

    return scores_by_model

In [11]:
def perform_grid_search(
    best_model_pipeline: Pipeline,
    param_grid: Dict[str, List],
    X_train: List[str],
    y_train: List[int],
) -> Pipeline:
    """
    Tunes the given pipeline with an exhaustive grid search.

    Candidates are compared by balanced accuracy under 5-fold cross-validation,
    run with n_jobs=-1. The winning parameter combination is printed.

    Args:
        best_model_pipeline (Pipeline): The pipeline to tune.
        param_grid (dict): Hyperparameter names mapped to the values to try.
        X_train (List[str]): The training data.
        y_train (List[int]): The training labels.

    Returns:
        Pipeline: The search's best estimator.
    """
    search = GridSearchCV(
        estimator=best_model_pipeline,
        param_grid=param_grid,
        scoring=make_scorer(balanced_accuracy_score),
        cv=5,
        n_jobs=-1,
    ).fit(X_train, y_train)

    print(f"Best hyperparameters: {search.best_params_}")
    return search.best_estimator_

In [12]:
def retrain_best_model(
    top_words: List[str],
    updated_stopwords: set,
    best_model_pipeline: Pipeline,
    X_train: List[str],
    y_train: List[int],
) -> Pipeline:
    """
    Retrains the best model with updated stopwords and limits TF-IDF to top important words.

    The classifier is a fresh, unfitted copy of the one in
    ``best_model_pipeline`` (same hyperparameters), so the pipeline passed in
    is left exactly as it was.

    Args:
        top_words (List[str]): A list of the most important words; used as the
            fixed TF-IDF vocabulary.
        updated_stopwords (set): Updated stopword list excluding top words.
        best_model_pipeline (Pipeline): The best-performing model pipeline.
        X_train (List[str]): The training data (input texts).
        y_train (List[int]): The training labels.

    Returns:
        Pipeline: The retrained model pipeline.
    """
    # Imported locally because the notebook's import cell does not provide them.
    from functools import partial

    from sklearn.base import clone

    # Bind the updated stopwords into the preprocessor. partial behaves like
    # the equivalent lambda but, unlike a lambda, is not an anonymous function,
    # which keeps the fitted pipeline friendlier to serialisation.
    tfidf_vectorizer = TfidfVectorizer(
        preprocessor=partial(preprocess_text, stop_words=updated_stopwords),
        vocabulary=top_words,
    )

    # Same classifier type and hyperparameters, but a new object. Refitting the
    # original instance on the reduced vocabulary would overwrite the fitted
    # classifier inside best_model_pipeline and leave that pipeline mismatched
    # with its own vectorizer.
    model = clone(best_model_pipeline.named_steps["clf"])
    retrained_pipeline = Pipeline([("tfidf", tfidf_vectorizer), ("clf", model)])

    # Retrain the model
    retrained_pipeline.fit(X_train, y_train)
    return retrained_pipeline

In [13]:
def plot_results(results: Dict[str, List[float]]) -> None:
    """
    Draws one box per model from its cross-validation scores and shows the figure.

    Args:
        results (dict): Model names mapped to their cross-validation scores.
    """
    model_names = results.keys()
    score_sets = results.values()

    plt.figure(figsize=(10, 6))
    plt.boxplot(score_sets, labels=model_names, showmeans=True)
    plt.xticks(rotation=45)
    plt.ylabel("Balanced Accuracy Score")
    plt.title("Model Comparison using Balanced Accuracy Scores")
    plt.show()

In [14]:
def plot_confusion_matrix(
    model: Pipeline, X_test: List[str], y_test: List[int]
) -> None:
    """
    Predicts on the test data and shows the confusion matrix as an annotated heatmap.

    Args:
        model (Pipeline): The fitted model used for prediction.
        X_test (List[str]): The test data.
        y_test (List[int]): The test labels.
    """
    matrix = confusion_matrix(y_test, model.predict(X_test))

    plt.figure(figsize=(10, 6))
    # Integer cell annotations ("d") on a blue colour scale.
    sns.heatmap(matrix, cmap="Blues", fmt="d", annot=True)
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title("Confusion Matrix")
    plt.show()

In [15]:
# Download the packages listed in NLTK_RESOURCES into ./nltk_data.
download_nltk_resources()

[nltk_data] Downloading package punkt to /home/nick/Documents/ai-
[nltk_data]     nlp/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /home/nick/Documents/ai-
[nltk_data]     nlp/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to /home/nick/Documents/ai-
[nltk_data]     nlp/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [16]:
# 80/20 train/test split of the full 20 Newsgroups corpus.
X_train, X_test, y_train, y_test = load_dataset()

In [17]:
# Candidate classifiers, each wrapped in the shared TF-IDF pipeline.
models = {
    label: create_pipeline(estimator)
    for label, estimator in (
        ("Naive Bayes", MultinomialNB()),
        ("Logistic Regression", LogisticRegression(max_iter=1000, n_jobs=-1)),
        ("SVM", SVC(kernel="linear")),
        ("Random Forest", RandomForestClassifier(n_jobs=-1)),
        ("KNN", KNeighborsClassifier(n_jobs=-1)),
    )
}

In [18]:
# 5-fold cross-validated balanced accuracy for every candidate pipeline.
results = evaluate_models(models, X_train, y_train)

Evaluating Naive Bayes...


BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.

In [None]:
# Box plot of the per-fold scores, one box per model.
plot_results(results)

In [None]:
# The winner is the model whose cross-validation scores have the highest mean.
best_model_name = max(results.items(), key=lambda entry: entry[1].mean())[0]
print(f"The best model is: {best_model_name}")

In [None]:
best_model_pipeline = models[best_model_name]

# Search spaces, keyed by model name. Parameter names carry the "clf__" prefix
# because they address the pipeline's classifier step.
param_grids = {
    "Random Forest": {
        "clf__n_estimators": [100, 200, 300],
        "clf__max_depth": [10, 20, 30],
        "clf__min_samples_split": [2, 5, 10],
    },
    # Only C is searched: the SVM above uses kernel="linear", and gamma is a
    # coefficient of the rbf/poly/sigmoid kernels, so varying it would double
    # the number of fits without changing any of them.
    "SVM": {"clf__C": [0.1, 1, 10]},
}

if best_model_name in param_grids:
    param_grid = param_grids[best_model_name]
    best_model_pipeline = perform_grid_search(
        best_model_pipeline, param_grid, X_train, y_train
    )
else:
    # Cross-validation fits clones of each pipeline, so the object stored in
    # `models` is still unfitted. Without a grid search nothing else would
    # train it before the prediction and feature-importance steps that follow.
    best_model_pipeline.fit(X_train, y_train)

In [None]:
# Confusion matrix of best_model_pipeline on the held-out test split.
plot_confusion_matrix(best_model_pipeline, X_test, y_test)

In [None]:
# Print and collect the highest-scoring TF-IDF terms of the selected model.
top_words = extract_feature_importance(best_model_pipeline, best_model_name)

In [None]:
# Remove the top words from the stopword set so they survive preprocessing.
# DEFAULT_STOPWORDS starts out as None; if it has not been rebound to a real
# set, load the English stopword list directly instead of failing on None.
updated_stopwords = (
    DEFAULT_STOPWORDS
    if DEFAULT_STOPWORDS is not None
    else set(stopwords.words("english"))
).difference(set(top_words))

In [None]:
# Refit with the top words as the fixed TF-IDF vocabulary and the adjusted
# stopword set.
retrained_model = retrain_best_model(
    top_words, updated_stopwords, best_model_pipeline, X_train, y_train
)

In [None]:
# Confusion matrix of the retrained pipeline on the same test split.
plot_confusion_matrix(retrained_model, X_test, y_test)