In [1]:
import numpy as np
import pandas as pd
import os
from misc_utils import dataset_filtering
from sklearn.feature_extraction.text import TfidfVectorizer
import re
import nltk
from nltk.tokenize import word_tokenize, sent_tokenize
from collections import Counter
import textstat
from sklearn.naive_bayes import MultinomialNB
from sklearn.preprocessing import MinMaxScaler
from scipy.sparse import hstack, csr_matrix
from sklearn.naive_bayes import ComplementNB
from sklearn.metrics import (
    accuracy_score, f1_score, precision_score, recall_score,
    confusion_matrix, classification_report, make_scorer)
from sklearn.pipeline import Pipeline
from itertools import product

In [2]:
# Root folder that contains the `gutenberg` and `gutenberg_corpus_analysis`
# checkouts. The default is the original author's machine-specific location;
# set the DATASCIENCE_REPO_PATH environment variable to run the notebook on
# another machine without editing this cell.
git_repo_path = os.environ.get('DATASCIENCE_REPO_PATH', '/Users/hunterworssam/Datascience')
gutenberg_repo_path = os.path.join(git_repo_path, 'gutenberg')
gutenberg_analysis_repo = os.path.join(git_repo_path, 'gutenberg_corpus_analysis')

In [3]:
# Load the pre-built train / test / validation splits from the working
# directory (read in the same order as before: train, test, val).
train, test, val = map(
    pd.read_csv,
    ('final_train.csv', 'final_test.csv', 'final_val.csv'),
)

In [4]:
# Define a function to apply the word, line and token counts
def enrich_dataframe(df):
    """Add per-book size columns to ``df`` and return it.

    For every value in ``df['id']`` the matching ``dataset_filtering`` helper
    is called with the id and a folder under ``<gutenberg_repo_path>/data``.
    The columns ``word_count``, ``unique_word_count``, ``line_count`` and
    ``token_count`` are written onto ``df`` itself (the frame is modified in
    place) and the same object is returned.
    """
    data_root = os.path.join(gutenberg_repo_path, 'data')

    # (new column, dataset_filtering helper, data sub-folder) in column order.
    column_specs = (
        ('word_count', 'get_word_count', 'counts'),
        ('unique_word_count', 'get_unique_word_count', 'counts'),
        ('line_count', 'get_line_count', 'text'),
        ('token_count', 'get_token_count', 'tokens'),
    )

    for column, helper_name, subfolder in column_specs:
        folder = os.path.join(data_root, subfolder)
        df[column] = df['id'].apply(
            lambda pid: getattr(dataset_filtering, helper_name)(pid, folder)
        )

    return df

Token count, unique word count and line count were not added because the nltk module was not downloaded properly at the time of the download.

In [6]:
# Enrich every split with the count columns
train, val, test = (enrich_dataframe(_frame) for _frame in (train, val, test))

# Report the resulting (rows, columns) of each split
for _label, _frame in (("Train shape:", train), ("Val shape:", val), ("Test shape:", test)):
    print(_label, _frame.shape)

Train shape: (1920, 13)
Val shape: (240, 14)
Test shape: (240, 14)


Add text and raw data from gutenberg import to the dataframes.

In [8]:
text_folder = os.path.join(gutenberg_repo_path, 'data', 'text')

# Load the cleaned text for a given Project Gutenberg ID
def load_book_text(pg_id):
    """Return the contents of ``<text_folder>/<pg_id>_text.txt`` read as UTF-8.

    Returns ``None`` when that file does not exist.
    """
    book_path = os.path.join(text_folder, f'{pg_id}_text.txt')
    try:
        handle = open(book_path, 'r', encoding='utf-8')
    except FileNotFoundError:
        return None  # or "", depending on preference
    with handle:
        return handle.read()

# Attach the cleaned text to every split, looked up by Gutenberg id
for _frame in (train, val, test):
    _frame['text'] = _frame['id'].apply(load_book_text)

In [13]:
raw_folder = os.path.join(gutenberg_repo_path, 'data', 'raw')

# Load the raw (unprocessed) file for a given Project Gutenberg ID
def load_book_text_raw(pg_id):
    """Return the contents of ``<raw_folder>/<pg_id>_raw.txt`` read as UTF-8.

    Returns ``None`` when that file does not exist.
    """
    raw_path = os.path.join(raw_folder, f'{pg_id}_raw.txt')
    try:
        handle = open(raw_path, 'r', encoding='utf-8')
    except FileNotFoundError:
        return None  # or "", depending on preference
    with handle:
        return handle.read()

# Attach the raw file contents to every split, looked up by Gutenberg id
for _frame in (train, val, test):
    _frame['raw'] = _frame['id'].apply(load_book_text_raw)

Confirm all rows have text data appended.

In [15]:
# Count training rows whose raw file was not found (load_book_text_raw returns
# None for those). Only the training split is checked here; val/test are not.
print("Missing raw files:", train['raw'].isnull().sum())

Missing raw files: 0


In [16]:
# Count training rows whose text file was not found (load_book_text returns
# None for those). Only the training split is checked here; val/test are not.
print("Missing text files:", train['text'].isnull().sum())

Missing text files: 0


It was noted that the first page of many of these texts includes the author and title, which could influence our models. Let's remove the first 50 words from all texts to make sure this is not confounding our results.

In [27]:
def remove_first_50_words(text):
    """Drop the first 50 whitespace-separated words of ``text``.

    The remaining words are re-joined with single spaces, so the original
    whitespace and line breaks are not preserved. Non-string values (NaN,
    None, ...) are returned unchanged.
    """
    if isinstance(text, str):
        remaining = text.split()
        del remaining[:50]  # discard the leading 50 words
        return ' '.join(remaining)
    return text  # Leave NaN or None untouched

In [29]:
# Strip the leading 50 words from the text column of every split
for _frame in (train, val, test):
    _frame['text'] = _frame['text'].apply(remove_first_50_words)

#### Texts have been loaded in, now we can begin TF-IDF Vectorization.

In [31]:
# Baseline TF-IDF configuration shared by the train / val / test transforms below.
vectorizer = TfidfVectorizer(
    stop_words='english', # Removes common English words (it, and, that, is, ...) using scikit-learn's predefined stop-word list.
    sublinear_tf=True, # Logarithmic term-frequency weighting: damps extremely frequent terms and helps prevent domination by larger text files.
    max_features=10000, # Vocabulary cap; a trade-off between overfitting and computational cost.
    ngram_range=(1,2)
)

In [32]:
# Fit the vectorizer on the training texts only, then reuse that fitted
# vectorizer to transform the validation and test texts.
X_train = vectorizer.fit_transform(train['text'])
X_val = vectorizer.transform(val['text'])
X_test = vectorizer.transform(test['text'])

The TF-IDF output is kept as a separate feature matrix. Next we'll want to append our stylometric features and our readability metrics to it.

In [34]:
# Sanity check: shape of the training TF-IDF matrix.
X_train.shape

(1920, 10000)

In [35]:
# Function to extract stylometric features from a single text
def extract_stylometric_features(text):
    """Compute 12 stylometric features for one document.

    Parameters
    ----------
    text : str or any
        Document text. Non-string or blank (whitespace-only) input yields
        twelve zeros.

    Returns
    -------
    list
        In order: average alphabetic-word length, average tokens per sentence,
        type/token ratio, the per-character frequency of each of the marks
        ``. , ! ? ; : -`` (seven values), uppercase-character ratio and
        digit-character ratio.
    """
    if not isinstance(text, str) or not text.strip():
        return [0] * 12  # one zero per feature listed above

    # Use NLTK tokenizers with explicit language call
    words = word_tokenize(text, language='english', preserve_line=True)
    sentences = sent_tokenize(text, language='english')

    # Purely alphabetic tokens drive word length and vocabulary size.
    alpha_words = [w for w in words if w.isalpha()]
    word_lengths = [len(w) for w in alpha_words]
    total_words = len(words)
    unique_words = len({w.lower() for w in alpha_words})
    total_sentences = len(sentences)

    avg_word_length = np.mean(word_lengths) if word_lengths else 0
    avg_sentence_length = total_words / total_sentences if total_sentences else 0
    # NOTE(review): the numerator counts alphabetic tokens only while the
    # denominator counts every token (punctuation included). Kept as-is so the
    # feature values do not change; confirm this is intended.
    type_token_ratio = unique_words / total_words if total_words else 0

    # The blank-text guard above guarantees text_length > 0 here.
    text_length = len(text)
    punctuation_marks = ".,!?;:-"
    punctuation_counts = Counter(c for c in text if c in punctuation_marks)
    punctuation_freqs = [punctuation_counts[p] / text_length for p in punctuation_marks]

    uppercase_ratio = sum(1 for c in text if c.isupper()) / text_length
    digit_ratio = sum(1 for c in text if c.isdigit()) / text_length

    return [
        avg_word_length,
        avg_sentence_length,
        type_token_ratio,
        *punctuation_freqs,
        uppercase_ratio,
        digit_ratio
    ]

# Column names, in the same order as the list returned by
# extract_stylometric_features
stylo_feature_names = [
    "avg_word_length",
    "avg_sentence_length",
    "type_token_ratio",
    "period_freq",
    "comma_freq",
    "exclam_freq",
    "question_freq",
    "semicolon_freq",
    "colon_freq",
    "dash_freq",
    "uppercase_ratio",
    "digit_ratio",
]

# Build the stylometric feature table for a whole dataset
def get_stylometric_features(df):
    """Return a DataFrame with one row of stylometric features per ``df['text']`` entry.

    Columns are ``stylo_feature_names``. The result gets a fresh default
    index, not the index of ``df``.
    """
    rows = [extract_stylometric_features(document) for document in df['text']]
    return pd.DataFrame(rows, columns=stylo_feature_names)

In [38]:
# Stylometric feature table for the training split
train_stylo = get_stylometric_features(train)

# Same features for the validation and test splits
val_stylo = get_stylometric_features(val)
test_stylo = get_stylometric_features(test)

# Preview the first rows of the training features
print(train_stylo.head())

   avg_word_length  avg_sentence_length  type_token_ratio  period_freq  \
0         4.250333            15.664977          0.078897     0.010975   
1         4.248404            14.571429          0.081391     0.013303   
2         4.066894            20.021214          0.072704     0.012683   
3         4.139240            20.450896          0.075695     0.012977   
4         4.125742            21.993048          0.069692     0.012717   

   comma_freq  exclam_freq  question_freq  semicolon_freq  colon_freq  \
0    0.012713     0.001262       0.001098        0.000138    0.000208   
1    0.014442     0.001212       0.001377        0.000256    0.000176   
2    0.013631     0.001044       0.001168        0.000244    0.000173   
3    0.013200     0.001576       0.001345        0.000166    0.000083   
4    0.015411     0.000932       0.001211        0.000102    0.000185   

   dash_freq  uppercase_ratio  digit_ratio  
0   0.002203         0.027425     0.000093  
1   0.002391         0.030

Add common readability metrics to the dataset. Our training feature set will now be 1920 x (number of TF-IDF features + 12 stylometric + 6 readability).

In [40]:
def readability_metrics(text):
    """Compute six textstat readability scores for one document.

    Parameters
    ----------
    text : str or any
        Document text. Non-string or blank (whitespace-only) input yields six
        zeros, mirroring ``extract_stylometric_features``. Without this guard
        a missing book (``load_book_text`` returns None) would be passed
        straight to textstat.

    Returns
    -------
    list
        Flesch reading ease, Flesch-Kincaid grade, Gunning fog, SMOG index,
        Coleman-Liau index and automated readability index, in that order.
    """
    if not isinstance(text, str) or not text.strip():
        return [0] * 6  # one zero per readability score

    return [
        textstat.flesch_reading_ease(text),
        textstat.flesch_kincaid_grade(text),
        textstat.gunning_fog(text),
        textstat.smog_index(text),
        textstat.coleman_liau_index(text),
        textstat.automated_readability_index(text)
    ]

# Column names, in the same order as the list returned by readability_metrics
readability_feature_names = [
    'flesch_reading_ease', 'flesch_kincaid_grade', 'gunning_fog',
    'smog_index', 'coleman_liau_index', 'automated_readability_index',
]

# Build the readability feature table for a whole dataset
def get_readability_features(df):
    """Return a DataFrame with one row of readability scores per ``df['text']`` entry.

    Columns are ``readability_feature_names``. The result gets a fresh default
    index, not the index of ``df``.
    """
    rows = [readability_metrics(document) for document in df['text']]
    return pd.DataFrame(rows, columns=readability_feature_names)

In [41]:
# Readability score tables (one row per book, six columns) for each split
train_readability = get_readability_features(train)
val_readability = get_readability_features(val)
test_readability = get_readability_features(test)

In [42]:
# Preview the first rows of the training readability scores
train_readability.head()

Unnamed: 0,flesch_reading_ease,flesch_kincaid_grade,gunning_fog,smog_index,coleman_liau_index,automated_readability_index
0,73.78,6.5,6.68,9.1,7.71,7.7
1,75.4,5.9,6.2,8.6,7.29,6.9
2,74.9,6.1,6.16,8.4,6.77,6.7
3,75.4,5.9,6.0,8.6,6.83,6.6
4,74.59,6.2,6.24,8.3,7.3,7.1


In [43]:
# Horizontally concatenate the stylometric and readability tables per split.
# pd.concat(axis=1) aligns rows on the index; both tables are built by
# pd.DataFrame(list_of_rows, ...) and therefore share a default index.
train_extra = pd.concat([train_stylo, train_readability], axis=1)
val_extra = pd.concat([val_stylo, val_readability], axis=1)
test_extra = pd.concat([test_stylo, test_readability], axis=1)

In [44]:
# Classification target for each split: the 'author' column of the original dataset
y_train = train['author']
y_val = val['author']
y_test = test['author']

In [45]:
def evaluate_model(model, X_train, y_train, X_val, y_val):
    """Fit ``model`` on the training data and report validation metrics.

    Prints accuracy plus weighted F1, precision and recall (computed with
    ``zero_division=0``) for the validation predictions.

    Returns
    -------
    tuple
        ``(model, y_pred)``: the fitted model and its validation predictions.
    """
    # Train, then predict on the validation features
    model.fit(X_train, y_train)
    y_pred = model.predict(X_val)

    # Shared options for the class-averaged metrics (zero_division=0 silences warnings)
    weighted = {'average': 'weighted', 'zero_division': 0}
    accuracy = accuracy_score(y_val, y_pred)
    f1_weighted = f1_score(y_val, y_pred, **weighted)
    precision_weighted = precision_score(y_val, y_pred, **weighted)
    recall_weighted = recall_score(y_val, y_pred, **weighted)

    # Report
    print(f"Model: {model.__class__.__name__}")
    print(f"Accuracy:  {accuracy:.4f}")
    print(f"F1 Score:  {f1_weighted:.4f}")
    print(f"Precision: {precision_weighted:.4f}")
    print(f"Recall:    {recall_weighted:.4f}")
    print("-" * 40)

    return model, y_pred

Multinomial Naive Bayes requires non-negative features. Standard-scaled stylometric and readability features would violate this, so they need a different scaling. (TF-IDF features are already non-negative.)

In [47]:
# Naive Bayes can't use a standard scaler because it generates negative values, so we use a min-max scaler instead.
# The scaler is fitted on the training features only and then applied to val/test.
scaler = MinMaxScaler()
train_scaled_nb = scaler.fit_transform(train_extra)
val_scaled_nb = scaler.transform(val_extra)
test_scaled_nb = scaler.transform(test_extra)

# NOTE(review): val/test values below the training minimum presumably scale to
# negative numbers, which MultinomialNB rejects - confirm, and consider
# MinMaxScaler(clip=True) if the installed scikit-learn supports it.

# Append the scaled extra features to the TF-IDF matrices.
# Fix: the test matrix previously stacked val_scaled_nb instead of test_scaled_nb.
X_train_combined_nb = hstack([X_train, csr_matrix(train_scaled_nb)])
X_val_combined_nb   = hstack([X_val, csr_matrix(val_scaled_nb)])
X_test_combined_nb  = hstack([X_test, csr_matrix(test_scaled_nb)])

We can start with a baseline MultinomialNB model using our pre-defined max_features of 10,000 and n-gram setting of (1,2). Alpha will be kept as 1.0 for this first pass.

### Naive Bayes with multinomial NB & No stylometric features

In [50]:
# Baseline Naive Bayes on the TF-IDF features only (X_train / X_val), i.e.
# without the stylometric and readability features.
nb_model_multi_no_stylo = MultinomialNB()
model, y_pred = evaluate_model(nb_model_multi_no_stylo, X_train, y_train, X_val, y_val)

Model: MultinomialNB
Accuracy:  0.8583
F1 Score:  0.8483
Precision: 0.8910
Recall:    0.8583
----------------------------------------


In [64]:
# Text and labels used for the search (fit on train, score on validation)
X_train_texts = train['text']
y_train = train['author']
X_val_texts = val['text']
y_val = val['author']

# Parameter grid
param_grid = {
    'tfidf__max_features': [5000, 10000, 15000],
    'tfidf__ngram_range': [(1, 1), (1, 2), (1, 3)],
    'nb__alpha': [0.1, 0.5, 1.0, 2.0]
}

results = []

# The TF-IDF matrices do not depend on alpha, so vectorize once per
# (max_features, ngram_range) pair and reuse the matrices for every alpha.
# Alpha is still the innermost loop, so the rows keep the order of the full
# product of the three parameters.
for max_feat, ngram_range in product(param_grid['tfidf__max_features'],
                                     param_grid['tfidf__ngram_range']):
    grid_vectorizer = TfidfVectorizer(stop_words='english', sublinear_tf=True,
                                      max_features=max_feat, ngram_range=ngram_range)
    grid_X_train = grid_vectorizer.fit_transform(X_train_texts)
    grid_X_val = grid_vectorizer.transform(X_val_texts)

    for alpha in param_grid['nb__alpha']:
        grid_nb = MultinomialNB(alpha=alpha)
        grid_nb.fit(grid_X_train, y_train)
        y_pred = grid_nb.predict(grid_X_val)

        results.append({
            'max_features': max_feat,
            'ngram_range': ngram_range,
            'alpha': alpha,
            'accuracy': accuracy_score(y_val, y_pred),
            'precision': precision_score(y_val, y_pred, average='weighted', zero_division=0),
            'recall': recall_score(y_val, y_pred, average='weighted', zero_division=0),
            'f1': f1_score(y_val, y_pred, average='weighted', zero_division=0)
        })

# Collect the results once, best weighted F1 first
df_results = pd.DataFrame(results).sort_values(by='f1', ascending=False)

# Export to CSV
df_results.to_csv('nb_results.csv', index=False)

#### Now we can use the best set of parameters on the test set.

In [69]:
# Best parameters from grid search.
# NOTE(review): hard-coded here; presumably copied from the top row of
# df_results / nb_results.csv - confirm after re-running the search.
best_max_features = 15000
best_ngram_range = (1, 3)
best_alpha = 0.1

# Text and labels for all three splits
X_train_texts = train['text']
y_train = train['author']
X_val_texts   = val['text']
y_val   = val['author']
X_test_texts  = test['text']
y_test  = test['author']

# Final pipeline: TF-IDF followed by multinomial Naive Bayes with the chosen parameters
pipeline = Pipeline([
    ('tfidf', TfidfVectorizer(
        stop_words='english',
        sublinear_tf=True,
        max_features=best_max_features,
        ngram_range=best_ngram_range
    )),
    ('nb', MultinomialNB(alpha=best_alpha))
])

# Fit the model on training data only
pipeline.fit(X_train_texts, y_train)

# Predict on all sets with the fitted pipeline
y_train_pred = pipeline.predict(X_train_texts)
y_val_pred   = pipeline.predict(X_val_texts)
y_test_pred  = pipeline.predict(X_test_texts)

# Evaluation function
def print_scores(name, y_true, y_pred):
    """Print accuracy and weighted F1 / precision / recall for one split.

    ``name`` is used only in the header line. The class-averaged metrics are
    computed with ``average='weighted'`` and ``zero_division=0``.
    """
    weighted = {'average': 'weighted', 'zero_division': 0}
    metric_rows = (
        ("  Accuracy:  ", accuracy_score, {}),
        ("  F1 Score:  ", f1_score, weighted),
        ("  Precision: ", precision_score, weighted),
        ("  Recall:    ", recall_score, weighted),
    )

    print(f"📊 {name} Set Results:")
    for line_prefix, metric, options in metric_rows:
        print(f"{line_prefix}{metric(y_true, y_pred, **options):.4f}")
    print("-" * 40)

# Print results for each split using the predictions of the final fitted pipeline
print_scores("Training", y_train, y_train_pred)
print_scores("Validation", y_val, y_val_pred)
print_scores("Test", y_test, y_test_pred)

📊 Training Set Results:
  Accuracy:  0.9792
  F1 Score:  0.9792
  Precision: 0.9807
  Recall:    0.9792
----------------------------------------
📊 Validation Set Results:
  Accuracy:  0.9375
  F1 Score:  0.9376
  Precision: 0.9575
  Recall:    0.9375
----------------------------------------
📊 Test Set Results:
  Accuracy:  0.9542
  F1 Score:  0.9520
  Precision: 0.9700
  Recall:    0.9542
----------------------------------------
