Final Project: Authorship Identification
======

In [1]:
# Step 1: Import necessary libraries
import os
import numpy as np
import pandas as pd
import copy
import matplotlib.pyplot as plt

from sklearn.feature_extraction.text import CountVectorizer
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import Normalizer
from sklearn.decomposition import TruncatedSVD,PCA
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, f1_score
from sklearn.pipeline import Pipeline

In [2]:
# setup debug for prints troubleshooting
# debug = True
debug = False

In [3]:
# Load in C50train located ../C50train/
train_dir = '../C50train'
# get name of directories, authors (these will be the labels)
train_sub = [name for name in os.listdir(train_dir) if os.path.isdir(os.path.join(train_dir, name))]
label_lst = np.copy(train_sub)

if debug:
    print(train_dir)
    print(label_lst)

# setup the initial empty variables
train = []
train_v = []
label = []

# load the input data from C50train directory and process it
auth_idx = 0

# go within the author directory to get list of the file names, this will be the training data
for i in train_sub:
    sub2_dir  = '../C50train/' + i 
    train_sub2 = [name for name in os.listdir(sub2_dir) if os.path.isfile(os.path.join(sub2_dir, name))]

    #if debug:
    #    print(sub2_dir)
    #    print(train_sub2)
        
    # in each author file, save the author as the label and the text as its training data
    for j in train_sub2:
        sub3  = '../C50train/' + i + '/' + j

        with open(sub3, 'r') as file:
            data = file.read()
            data_no_nw = data.replace('\n', '').replace('\r', '')
            train.append(data_no_nw)

        # append author index as label
        label.append(auth_idx)

    # increment author index
    auth_idx = auth_idx + 1
        
        #if debug:
        #    print(sub3)

if debug:
    print(np.shape(train))
    print(np.shape(label))

    # bin count looking at label
    unused, idx = np.unique(label, return_counts=True)
    #print(unused)
    print(idx)

    print(train[0])
    print(label[0])
    #print(label)

## Step 2:

In [4]:
# Step 2: Convert text to TF-IDF features
tfidf_vectorizer = TfidfVectorizer(
    max_features=10000,  # Limit to the top 10,000 features for performance
    stop_words='english',  # Remove common English stopwords
    lowercase=True,  # Convert text to lowercase
    )

# Transform text data into TF-IDF feature vectors
X = tfidf_vectorizer.fit_transform(train)
y = np.array(label)

print(f"TF-IDF Feature Shape: {X.shape}")

TF-IDF Feature Shape: (2500, 10000)


## Step 3:

In [5]:
# Step 3: Train-Test Split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.15, stratify=y, random_state=42)

print(f"Train Set Shape: {X_train.shape}, Test Set Shape: {X_test.shape}")

Train Set Shape: (2125, 10000), Test Set Shape: (375, 10000)


## Step 4-5:

In [7]:
# Step 4: Dimensionality Reduction with Truncated SVD
svd = TruncatedSVD(n_components=2000, random_state=42) 

# Step 4: Dimensionality Reduction with PCA
# pca = PCA(n_components=2000, random_state=42)  

# Normalize the data
normalizer = Normalizer(copy=False)

# # Scale the data 
# scaler = StandardScaler()

In [8]:
# Step 5: Create a Pipeline with Dimensionality Reduction and Classifier

pipeline = Pipeline([
    ('svd', svd),
#     ('pca', pca),
    ('normalize', normalizer),
    ('classifier', LogisticRegression(max_iter=1000, random_state=42)),
#     ('classifier', RandomForestClassifier(n_estimators=200, random_state=42)),  # Random Forest with class balancing
#     ('classifier', SVC(kernel='rbf', class_weight='balanced', random_state=42)),
    ])

# Train the Model
pipeline.fit(X_train, y_train)

# Evaluate the Model
y_pred = pipeline.predict(X_test)
print("Classification Report:")
print(classification_report(y_test, y_pred))

Classification Report:
              precision    recall  f1-score   support

           0       0.78      0.88      0.82         8
           1       1.00      0.88      0.93         8
           2       0.73      1.00      0.84         8
           3       0.80      0.57      0.67         7
           4       0.71      0.71      0.71         7
           5       0.67      0.50      0.57         8
           6       1.00      1.00      1.00         7
           7       0.80      1.00      0.89         8
           8       0.86      0.86      0.86         7
           9       0.64      1.00      0.78         7
          10       0.89      1.00      0.94         8
          11       0.78      0.88      0.82         8
          12       1.00      0.86      0.92         7
          13       0.55      0.86      0.67         7
          14       0.67      0.75      0.71         8
          15       1.00      1.00      1.00         7
          16       1.00      0.71      0.83         7
    

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


## Step 6:

In [9]:
# Step 6: Calculate F1 Score
f1 = f1_score(y_test, y_pred, average='weighted')
print(f"Weighted F1 Score: {f1:.4f}")

Weighted F1 Score: 0.8012


In [12]:
# Analyze the Dimensionality Reduction
explained_variance_svd = svd.explained_variance_.sum()
print(f"Total Explained Variance by SVD: {explained_variance_svd:.4f}")

# explained_variance_pca = pca.explained_variance_.sum()
# print(f"Total Explained Variance by PCA: {explained_variance_pca:.4f}")

# # Optional Visualization of Explained Variance
# plt.plot(np.cumsum(svd.explained_variance_))
# plt.title('Cumulative Explained Variance by SVD Components')
# plt.xlabel('Number of Components')
# plt.ylabel('Cumulative Explained Variance')
# plt.grid()
# plt.show()

Total Explained Variance by SVD: 0.9620


## Step 7: 

In [13]:
# Load in C50test located ../C50test/
test_dir = '../C50test'
# get name of directories, authors (these will be the labels)
test_sub = [name for name in os.listdir(test_dir) if os.path.isdir(os.path.join(test_dir, name))]
test_lst = np.copy(train_sub)

if debug:
    print(test_dir)
    print(test_lst)

# setup the initial empty variables
test       = []
test_label = []

# load the input data from C50test directory and process it

auth_idx = 0

# go within the author directory to get list of the file names, this will be the training data
for i in train_sub:
    sub2_dir  = '../C50test/' + i 
    test_sub2 = [name for name in os.listdir(sub2_dir) if os.path.isfile(os.path.join(sub2_dir, name))]

    #if debug:
    #    print(sub2_dir)
    #    print(train_sub2)
        
    # in each author file, save the text as its test data
    for j in test_sub2:
        sub3  = '../C50test/' + i + '/' + j

        with open(sub3, 'r') as file:
            data = file.read()
            data_no_nw = data.replace('\n', '').replace('\r', '')
            test.append(data_no_nw)
        
        test_label.append(auth_idx)

    auth_idx = auth_idx + 1

if debug:
    print(np.shape(test))

In [14]:
# Transform Test Data into TF-IDF Features
X_test_new = tfidf_vectorizer.transform(test)  # Use the trained vectorizer on test data
y_test_new = np.array(test_label)

print(f"Transformed Test Feature Shape: {X_test_new.shape}")

# Step 2: Predict on Test Data
y_pred_new = pipeline.predict(X_test_new)

# Step 3: Evaluate Model on Test Data
print("Classification Report on Test Data:")
print(classification_report(y_test_new, y_pred_new))

# Optional: Calculate Weighted F1 Score
f1_new = f1_score(y_test_new, y_pred_new, average='weighted')
print(f"Weighted F1 Score on Test Data: {f1_new:.4f}")


Transformed Test Feature Shape: (2500, 10000)
Classification Report on Test Data:
              precision    recall  f1-score   support

           0       0.81      0.94      0.87        50
           1       0.85      0.46      0.60        50
           2       0.57      0.26      0.36        50
           3       0.32      0.24      0.27        50
           4       0.70      0.60      0.65        50
           5       0.59      0.86      0.70        50
           6       0.36      0.28      0.31        50
           7       0.56      0.28      0.37        50
           8       0.88      0.42      0.57        50
           9       0.40      0.44      0.42        50
          10       0.91      1.00      0.95        50
          11       0.70      0.92      0.79        50
          12       0.32      0.38      0.35        50
          13       0.22      0.16      0.18        50
          14       0.44      0.36      0.40        50
          15       0.89      1.00      0.94        50

## Plots

In [None]:
# Define the range of n_components to explore
components_range = range(0,1500,100)  # From 1 to 35 components (you can increase this range if needed)
explained_variances = []

# Get the number of features (dimensions) in your dataset
# n_features = X_train.shape[1]

# Ensure that the components do not exceed the number of features
# components_range = [n for n in components_range if n <= n_features]

# Fit PCA for each number of components and record the cumulative explained variance
for n in components_range:
    pca = PCA(n_components=n)
    pca.fit(X_train.toarray())  # Convert sparse matrix to dense if necessary
    explained_variances.append(np.sum(pca.explained_variance_))  # Cumulative explained variance

# Plot the explained variance
plt.figure(figsize=(10, 6))
plt.plot(components_range, explained_variances, marker='o', color='b', linestyle='-', markersize=3)
plt.title('Explained Variance vs. Number of PCA Components')
plt.xlabel('Number of Components (n)')
plt.ylabel('Cumulative Explained Variance')
plt.grid(True)
plt.tight_layout()
plt.show()


In [None]:
# List of test sizes to evaluate
test_sizes = np.linspace(0.1, 0.31, 10)  # Test sizes from 10% to 50%
f1_scores = []

# Evaluate the model for each test size
for test_size in test_sizes:
    # Split the data
    X_train_split, X_test_split, y_train_split, y_test_split = train_test_split(
        X, y, test_size=test_size, stratify=y, random_state=42
    )
    
    # Train the model
    pipeline.fit(X_train_split, y_train_split)
    
    # Predict and evaluate
    y_pred_split = pipeline.predict(X_test_split)
    f1 = f1_score(y_test_split, y_pred_split, average='weighted')
    f1_scores.append(f1)

    if debug:
        print(f"Test Size: {test_size:.2f}, F1 Score: {f1:.4f}")

# Plot Test Size vs. F1-Score
plt.plot(test_sizes, f1_scores, marker='o', linestyle='-', color='b')
plt.title("Test Size vs. Weighted F1-Score")
plt.xlabel("Test Size")
plt.ylabel("Weighted F1-Score")
plt.grid()
plt.show()


In [None]:
from wordcloud import WordCloud
all_text = " ".join(train)
wordcloud = WordCloud(width=800, height=400, background_color='white').generate(all_text)
plt.figure(figsize=(10, 5))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.title("Most Frequent Words")
plt.show()


In [None]:
from sklearn.manifold import TSNE
tsne = TSNE(n_components=2, random_state=42)
X_embedded = tsne.fit_transform(X[:1000].toarray())  # Use a subset for efficiency
plt.scatter(X_embedded[:, 0], X_embedded[:, 1], c=label[:1000], cmap='viridis', s=5)
plt.colorbar()
plt.title("t-SNE Visualization of Feature Space")
plt.show()


In [None]:
from sklearn.metrics import confusion_matrix

# highlight which classes are misclassified
cm = confusion_matrix(y_test_new, y_pred_new)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=False,, fmt='d' cmap='Blues', xticklabels=label_lst, yticklabels=label_lst)
plt.title("Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.show()


In [None]:
from sklearn.metrics import classification_report

plt.figure(figsize=(10, 8))
report = classification_report(y_test_new, y_pred_new, output_dict=True)
f1_scores = [report[str(i)]['f1-score'] for i in range(len(label_lst))]
plt.bar(label_lst, f1_scores)
plt.title("F1 Score per Class")
plt.xlabel("Class")
plt.ylabel("F1 Score")
plt.xticks(rotation=90)
plt.show()
