# Author Prediction - BERT

## Importing Libraries

In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_hub as hub
tf.__version__




'2.15.1'

In [2]:
import os
os.environ['TF_USE_LEGACY_KERAS'] = '1'

## Data Preprocessing

### Importing the Dataset

In [3]:
current_dir = os.path.dirname(os.path.realpath('__file__'))
root = os.path.join(current_dir, "Books")

data = []

# Loop through author and book folders, adding .txt file contents to data with matching labels
for author in os.listdir(root):
    author_path = os.path.join(root, author)

    for book in os.listdir(author_path):
        book_path = os.path.join(author_path, book)

        for chapter in os.listdir(book_path):
            chapter_path = os.path.join(book_path, chapter)
            
            with open(chapter_path, 'r', encoding='utf-8') as file:
                text = file.read()
                
            data.append({
                'Author': author,
                'Book': book,
                'Chapter': chapter[:-4],
                'Text': text
            })

dataset = pd.DataFrame(data)

### Splitting and Cleaning

In [4]:
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer

fragment_size = 200
overlap = 50

# First we clean our text, converting to lower case and removing unwanted characted
# Then we apply corpus, simplifying our text
# Finally we split our text into fragments of 'fragment_size', with an overlap of 'overlap' words from the previous fragment

def preprocess_text(text, ps, all_stopwords):
    # Clean text
    text = text.lower()  # Lowercase
    text = re.sub(r'\n', " ", text)  # Newlines
    text = re.sub(r'[^a-zA-Z\s]', " ", text)  # Punctuation and special characters
    text = re.sub(r'\s+', ' ', text).strip()  # Extra spaces

    # Apply corpus
    words = text.split()
    words = [ps.stem(word) for word in words if word not in all_stopwords]
    processed_text = " ".join(words)

    return processed_text

def fragment_text(text, fragment_size, overlap):
    # Split text into fragments of fragment_size length, returns array of fragments
    words = text.split()
    current_text_fragments = []
    
    step_size = fragment_size - overlap  
    
    for i in range(0, len(words), step_size):
        current_fragment = " ".join(words[i:i + fragment_size])
        current_text_fragments.append(current_fragment)

        # Handle situation where final chapter fragment is already contained in the previous fragment
        if len(words) - i < fragment_size:
            break
        
    return current_text_fragments

In [5]:
ps = PorterStemmer()
all_stopwords = stopwords.words('english')
all_stopwords.remove('not')

# Apply our cleaning and create a new dataset to replace our previous one, this time with processed text
text_fragments = []
for index, row in dataset.iterrows():
    text = row["Text"]
    text = preprocess_text(text, ps, all_stopwords)
    current_text_fragments = fragment_text(text, fragment_size, overlap)
    
    for text_fragment in current_text_fragments:
        text_fragments.append({
            "Book": row["Book"],
            "Author": row["Author"],
            "Text": text_fragment
        })

# Convert the data fragments into a Pandas DataFrame and replace the original
dataset = pd.DataFrame(text_fragments)

## Encoding and Tokenisation

### Encoding

In [6]:
X = dataset["Text"].values
y = dataset["Author"].values

In [7]:
from sklearn.preprocessing import OneHotEncoder
import numpy as np

# Reshape y to a 2D array (needed for OneHotEncoder)
y_reshaped = y.reshape(-1, 1)

# Create and fit OneHotEncoder
encoder = OneHotEncoder(sparse_output=False)
y_encoded = encoder.fit_transform(y_reshaped)

In [8]:
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

# Creating class weights (in order to reduce overfitting from imbalanced data size per author)
y_class_indices = np.argmax(y_encoded, axis=-1)

class_weights = compute_class_weight(
    'balanced', 
    classes=np.unique(y_class_indices), 
    y=y_class_indices
)

# Creating class weights dictionary
class_weight_dict = {i: class_weights[i] for i in range(len(class_weights))}

### Tokenisation

In [9]:
# Remove empty strings after .strip() applied
# There shouldn't be any but BERT tokeniser fails if not done
X = [x for x in X if x.strip() != '']

In [10]:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size = 0.2, random_state = 1)

In [11]:
from transformers import BertTokenizerFast

# Max length chosen based on token distribution coming from fragment size to minimise truncation
# Edit if changing fragment_size, or optionally remove  entirely though this should be less efficient
tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')

X_tokenized = tokenizer(
    X_train, 
    padding=True, 
    truncation=True,
    max_length=330,
    return_tensors='tf'
)

X_test_tokenized = tokenizer(
    X_test, 
    padding=True, 
    truncation=True,
    max_length=330,
    return_tensors='tf'
)



## Creating and Training Model

In [12]:
from transformers import TFBertModel
from tensorflow.keras.layers import Input, Dense, Dropout, Lambda
from tensorflow.keras.models import Model

# Load pretrained BERT model
bert_model = TFBertModel.from_pretrained('bert-base-uncased')

def create_model(learning_rate=2e-5, dropout_rate=0.3):
    # Define input layers
    input_ids = tf.keras.layers.Input(shape=(330,), dtype=tf.int32, name="input_ids")
    attention_mask = tf.keras.layers.Input(shape=(330,), dtype=tf.int32, name="attention_mask")
    
    # Define output
    bert_output = bert_model([input_ids, attention_mask])
    pooled_output = bert_output.pooler_output
    
    # Add dropout (prevent overfitting)
    dropout = Dropout(dropout_rate)(pooled_output)
    
    # Create classification layers
    num_authors = len(dataset['Author'].unique())
    output = Dense(num_authors, activation="softmax")(dropout)
    
    # # Create model
    model = Model(inputs=[input_ids, attention_mask], outputs=output)
    
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )

    return model




Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFBertModel: ['cls.predictions.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.LayerNorm.bias']
- This IS expected if you are initializing TFBertModel from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFBertModel from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
All the weights of TFBertModel were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFBertModel for predictions w

In [13]:
# Freeze all layers
for layer in bert_model.layers:
    layer.trainable = False

# Unfreeze only the last few layers (e.g., last 4)
for layer in bert_model.layers[-4:]:  
    layer.trainable = True

In [14]:
learning_rate = 1e-5
dropout_rate = 0.4

# Create the model
model = create_model(learning_rate=learning_rate, dropout_rate=dropout_rate)

# model.summary()

## Early Stopping

In [15]:
from tensorflow.keras.callbacks import EarlyStopping

early_stopping = EarlyStopping(
    monitor='val_loss',         # Monitor validation loss
    patience=3,                 # Stop after 3 epochs with no improvement
    restore_best_weights=True   # Restore model weights from the best epoch
)

In [17]:
# Set epochs lower if not using early stopping, observed some good results at 5+
# Converges at around 19 with seven authors, takes 1h30m on CPU
epochs = 50 
batch_size = 16

# Label input ids and attention mask for BERT (from BERT tokenizer), for the sake of convienience
train_input_ids = X_tokenized['input_ids']
train_attention_mask = X_tokenized['attention_mask']
test_input_ids = X_test_tokenized['input_ids']
test_attention_mask = X_test_tokenized['attention_mask']

# Train the model
model.fit(
    [train_input_ids, train_attention_mask],
    y_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_data=([test_input_ids, test_attention_mask], y_test),
    callbacks=[early_stopping]
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50


<keras.src.callbacks.History at 0x280130c4510>

## Evaluating Model

In [18]:
from sklearn.metrics import confusion_matrix, classification_report

y_pred = model.predict([test_input_ids, test_attention_mask])
y_pred_classes = np.argmax(y_pred, axis=1)
y_true_classes = np.argmax(y_test, axis=1)  # Convert one-hot labels to class indices

print(confusion_matrix(y_true_classes, y_pred_classes))
print("\n")
print(classification_report(y_true_classes, y_pred_classes))

[[20  0  1  0  0  0  1]
 [ 0 29  0  0  0  0  0]
 [ 0  0 29  0  1  0  0]
 [ 0  0  0 14  0  0  0]
 [ 0  0  0  0 26  0  0]
 [ 0  0  0  0  0 31  0]
 [ 0  0  0  0  1  0 11]]


              precision    recall  f1-score   support

           0       1.00      0.91      0.95        22
           1       1.00      1.00      1.00        29
           2       0.97      0.97      0.97        30
           3       1.00      1.00      1.00        14
           4       0.93      1.00      0.96        26
           5       1.00      1.00      1.00        31
           6       0.92      0.92      0.92        12

    accuracy                           0.98       164
   macro avg       0.97      0.97      0.97       164
weighted avg       0.98      0.98      0.98       164



In [19]:
loss, accuracy = model.evaluate(
    [train_input_ids, train_attention_mask],  # Model inputs
    y_train  # True labels
)

print(f"Test Loss: {loss}")
print(f"Test Accuracy: {accuracy}")

Test Loss: 0.002359158592298627
Test Accuracy: 1.0


### Sanity Check

In [20]:
# Testing model on data not contained within original dataset
tests_path = os.path.join(current_dir, "Tests")

for author in os.listdir(tests_path):
    test_file_path = os.path.join(tests_path, author)
    
    with open(test_file_path, 'r', encoding='utf-8') as file:
        new_text = file.read()

    processed_new_text = preprocess_text(new_text, ps, all_stopwords)
    
    X_test_tokenized = tokenizer(
        processed_new_text, 
        padding='max_length',
        truncation=True,
        max_length=330,
        return_tensors='tf'
    )

    # Make predictions on the test data
    predictions = model.predict([X_test_tokenized['input_ids'], X_test_tokenized['attention_mask']])
    
    # Get the predicted class
    predicted_classes = predictions.argmax(axis=-1)  # Get the index of the highest probability
    
    # Map to predicted class to author name
    author_names = dataset['Author'].unique()
    author_mapping = {index: author for index, author in enumerate(author_names)}
    predicted_authors = [author_mapping[i] for i in predicted_classes]
    
    print(f"Prediction: {predicted_authors} vs Actual: {author[:-4]}")

Prediction: ['Charles Dickens'] vs Actual: Charles Dickens
Prediction: ['F Scott Fitzgerald'] vs Actual: F Scott Fitzgerald
Prediction: ['Fyodor Dostoyevsky'] vs Actual: Fyodor Dostoyevsky
Prediction: ['Herman Melville'] vs Actual: Herman Melville
Prediction: ['Mary Shelley'] vs Actual: Mary Shelley
Prediction: ['Oscar Wilde'] vs Actual: Oscar Wilde
Prediction: ['Robert Louise Stevenson'] vs Actual: Robert Louise Stevenson
