In [None]:
import sys
import os
sys.path.append("../src")
sys.path.append("../methods")

# Basic imports
import numpy as np
import pandas as pd

# Deep Learning
import torch.nn as nn
import torch
from torch.utils.data import DataLoader
from transformers import BertTokenizer

# Tokenize sentences
from nltk.tokenize import sent_tokenize

# Utils
from dataset_building import build_dataset
from model import init_model
from trainer import train_epoch

# Measurements
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report
from classification import LogClassification, train_classifier
from clustering import KMeansAuthors

# Progress bar
from tqdm import tqdm
tqdm.pandas()

# Data loading

As an example, we use a three-author subset of the Reuters dataset, simply to show how the code works. For actual training, proper training and test sets must be defined.

In [None]:
# Read the sample corpus into a DataFrame
df = pd.read_json("../data/reuters_sample.json")

# Lowercase every document, then split it into a list of sentences
# (each step shows its own progress bar)
df["text"] = df["text"].progress_apply(str.lower)
df["text"] = df["text"].progress_apply(sent_tokenize)

# Build the training dataset from the tokenized documents
dataset = build_dataset(
    df["text"],
    masking_percentage=0.5,
    max_pairs_per_doc=2,
)

# Model training

In [None]:
# Run on the GPU when one is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Model (built by the project's init_model helper) and matching tokenizer
model = init_model(device)
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# Training objective and optimizer over all model parameters
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

In [None]:
# Training loop
epochs = 3

# Checkpoints are written to this folder; create it once, up front.
# `exist_ok=True` makes this a no-op when the folder already exists.
os.makedirs("saved_models", exist_ok=True)

# The DataLoader does not change between epochs, so build it once.
# With `shuffle = True` it draws a new ordering each time it is iterated.
dataloader = DataLoader(dataset,
                        batch_size = 16,
                        shuffle = True)

for epoch in range(epochs):
    print(f"Epoch {epoch}")

    train_epoch(model = model,
                tokenizer = tokenizer,
                dataloader = dataloader,
                optimizer = optimizer,
                criterion = criterion,
                device = device,
                print_each = 500,
                disable_progress_bar = False)

    # Save model weights after each epoch (one checkpoint file per epoch)
    save_path = f"saved_models/model_{epoch}epoch.pt"
    torch.save(model.state_dict(), save_path)
    print("Model saved.\n\n")
    

# Style representations

With a trained model, it is easy to obtain the style representations for an input text. One simply needs to load the model, set it to evaluation mode, and perform a forward pass on the given text(s).

In [None]:
# Initialize model
model = init_model(device)

# Choose the checkpoint to load. A provided "saved_model.pt" is used when it
# exists; otherwise fall back to the last checkpoint written by the training
# loop, so the notebook also runs top to bottom from a fresh kernel.
checkpoint_path = "saved_models/saved_model.pt"
if not os.path.isfile(checkpoint_path):
    checkpoint_path = f"saved_models/model_{epochs - 1}epoch.pt"

# Load trained model weights (deserialized onto the CPU)
state = torch.load(checkpoint_path, map_location=torch.device('cpu'))

# This bit corrects the layer names in the saved PyTorch weights, so they
# match the model's own parameter names: any "module." fragment is removed.
# (Presumably left by a DataParallel wrapper at training time -- TODO confirm.)
state_corrected = {key.replace("module.", ""):value for key, value in state.items()}
model.load_state_dict(state_corrected)

# Move the model to the target device and set it to evaluation mode.
# Assigning to `_` keeps the returned module from being echoed as cell output.
_ = model.to(device)
_ = model.eval()

In [None]:
def get_style_representations(sentence, tokenizer, model, device):
    """Return the model's style representation of a text as a NumPy array.

    Args:
        sentence: Text to encode; passed straight to ``tokenizer``.
        tokenizer: Callable invoked as ``tokenizer(sentence, return_tensors="pt")``
            whose result exposes ``input_ids`` and ``attention_mask`` tensors.
        model: Callable invoked as ``model(token_ids, attention_mask, return_lhs=True)``.
            Per the original note, this keeps only the [CLS] vector of the
            last hidden state.
        device: ``torch.device`` the input tensors are moved to before the
            forward pass.

    Returns:
        The model output, detached, moved to the CPU and converted to a
        NumPy array.
    """
    # Tokenize, keeping at most the first 512 token positions
    toks = tokenizer(sentence, return_tensors="pt")
    tok_ids = toks.input_ids[:, :512].to(device)
    att_mask = toks.attention_mask[:, :512].to(device)

    # Inference only: disable autograd so no computation graph is built
    # (and kept in memory) for each forward pass.
    with torch.no_grad():
        out = model(tok_ids, att_mask, return_lhs=True)

    return out.detach().cpu().numpy()

In [None]:
# Since for training we are using three sentences, we will
# also use 3 sentences as input for evaluating our model
def chunk_text(sent_list, chunk_size=3):
    """Group consecutive sentences into space-joined chunks of equal size.

    Args:
        sent_list: Sequence of sentence strings.
        chunk_size: Number of sentences per chunk (default 3).

    Returns:
        A list of strings, each the ``chunk_size`` sentences of one chunk
        joined by single spaces. A trailing group with fewer than
        ``chunk_size`` sentences is dropped, so an input shorter than
        ``chunk_size`` (including an empty one) yields an empty list.
    """
    total_length = len(sent_list)
    chunks = [sent_list[i:i + chunk_size]
              for i in range(0, total_length, chunk_size)]

    # Remove last chunk if it is too small. The `chunks` check guards the
    # empty-input case, where there is no last chunk to inspect.
    if chunks and len(chunks[-1]) != chunk_size:
        del chunks[-1]

    return [" ".join(chunk) for chunk in chunks]

# Replace each document's sentence list with its list of sentence chunks
df["text"] = df["text"].apply(chunk_text)

In [None]:
def _embed_chunks(chunks):
    """Stack the style representations of all chunks of one document row-wise."""
    representations = [
        get_style_representations(chunk, tokenizer, model, device)
        for chunk in chunks
    ]
    return np.vstack(representations)


# Build new column containing the style representations (one array per document)
df["style_representations"] = df.text.progress_apply(_embed_chunks)

# Evaluation methods

## Method 1: Dimension reduction + K-Means

In [None]:
# Number of chunk representations per document; used both to build the
# per-chunk author labels and as the second argument of predict_document.
chunks_per_doc = df.style_representations.apply(len)

# Standardize data and apply PCA
data = StandardScaler().fit_transform(np.vstack(df.style_representations))
X = PCA(n_components=5).fit_transform(data)

# Instantiate method with one author per distinct author in the data
# (equals 3 for the sample subset used in this notebook)
cl = KMeansAuthors(n_authors=len(df.author.unique()))

# Create an author label for each point: the document's author is repeated
# once per chunk, in the same order as the rows of X
auth_labels = [author
               for author, n_chunks in zip(df.author, chunks_per_doc)
               for _ in range(n_chunks)]

# Fit data. Pass author labels to assign one author per cluster
cl.fit(X, auth_labels)

# Metrics
author_pred = cl.predict_document(X, chunks_per_doc.to_numpy())
print(classification_report(y_true = df.author, y_pred = author_pred, zero_division=0))

## Method 2: Logistic Regression

In [None]:
def _labeled_chunks(frame, label_map):
    """Flatten a frame into a list of (chunk_representation, author_label) pairs.

    Every row of a document's `style_representations` array becomes one pair,
    labeled with the integer that `label_map` assigns to the document's author.
    """
    return [(representation, label_map[author])
            for author, doc_representations in zip(frame.author,
                                                   frame.style_representations)
            for representation in doc_representations]


def _standardize(pairs, scaler):
    """Apply an already-fitted scaler to the features of (features, label) pairs."""
    if not pairs:
        return []
    # One transform call for the whole set instead of one call per sample
    scaled = scaler.transform([features for features, _ in pairs])
    return [(row, label) for row, (_, label) in zip(scaled, pairs)]


# Create labels for authors. Sorting makes the author -> label mapping
# reproducible across runs; iterating a set of strings is not ordered.
authors = sorted(df.author.unique())
auth_dict = {author: label for label, author in enumerate(authors)}

# Instantiate classifier: feature count is the length of one chunk
# representation, class count is the number of distinct authors.
# `.iloc[0]` is positional, so it does not depend on the index containing 0.
classifier = LogClassification(n_feat = len(df.style_representations.iloc[0][0]),
                               n_class = len(authors))

# Build dataset as tuples (chunk, label).
# NOTE: this rebinds `dataset`, the name the training-loop cell passes to its
# DataLoader; re-run the data-loading cell before training the model again.
dataset = _labeled_chunks(df, auth_dict)

# Just for the purpose of showing the code, we use
# the dataframe we already have as also test set.
df_test = df.copy()
dataset_test = _labeled_chunks(df_test, auth_dict)

# Normalize: fit the scaler on the training features only, then apply the
# same fitted scaler to both the training and the test set
scaler_train = StandardScaler().fit([features for features, _ in dataset])
dataset = _standardize(dataset, scaler_train)
dataset_test = _standardize(dataset_test, scaler_train)

In [None]:
# Train the classifier on the training pairs, with the test pairs passed
# alongside; per the original note, this generates JSON files with the results
train_classifier(classifier, dataset, dataset_test)

In [None]:
# Load the recorded training measurements and display them as a table
measurements = pd.read_json("training_measurements.json")
measurements