Uncomment and run the next cell if the required packages are not installed


In [None]:
# %pip install pandas matplotlib nltk torch imageio tabulate

Import libraries

In [None]:
# data manipulation
import pandas as pd
import numpy as np
# data visualization
import matplotlib.pyplot as plt
# text processing
import nltk
from nltk.corpus import stopwords
# download the NLTK resources ('stopwords' and 'wordnet') when this cell runs
nltk.download('stopwords')
nltk.download('wordnet')
# NOTE: this rebinds the name `stopwords` from the NLTK corpus reader to a
# plain set of English stop words; the reader is no longer reachable by
# that name after this line
stopwords = set(stopwords.words('english'))
# pytorch
import torch
from torch import nn
from torch.optim import Adam
from torch.utils.data import TensorDataset, DataLoader
# utils
import pickle
from tabulate import tabulate
from tqdm import tqdm
# register tqdm with pandas (enables the `progress_apply` family of methods)
tqdm.pandas()
from collections import Counter
# project-local helpers: model, training loop and plotting
from utils.models import TClassifier
from utils.training import Trainer
from utils.graphics import plot_review

# set seed for reproducibility
# (seeds the PyTorch and NumPy global generators; Python's `random` module
# is not seeded here)
torch.manual_seed(0)
np.random.seed(0)

Read and batch data

In [None]:
# Load the preprocessed IMDB reviews
data = pd.read_csv('./data/imdb_preprocessed.csv')

# Text of every processed review
reviews = data.processed.values

# Flatten the whole corpus into a single list of whitespace-separated tokens
words = ' '.join(reviews).split()

# Vocabulary sorted from the most to the least frequent token.
# Ids start at 1 because id 0 is reserved for the padding token.
counter = Counter(words)
vocab = sorted(counter, key=counter.get, reverse=True)
int2word = {index: token for index, token in enumerate(vocab, start=1)}
int2word[0] = '<PAD>'
word2int = {token: index for index, token in int2word.items()}

# Encode every review as the list of ids of its tokens
reviews_enc = [
    [word2int[token] for token in review.split()]
    for review in tqdm(reviews)
]

# padding sequences
def pad_features(reviews, pad_id, seq_length=128):
    """Right-pad or truncate encoded reviews to a common length.

    Args:
        reviews: sequence of encoded reviews, each a sequence of integer ids.
        pad_id: id written in the positions past the end of a short review.
        seq_length: number of columns of the returned matrix.

    Returns:
        A tuple ``(features, padded_count, trimmed_count)``:
        ``features`` is an integer array of shape
        ``(len(reviews), seq_length)``; ``padded_count`` is the number of
        reviews strictly shorter than ``seq_length`` (the ones that actually
        received padding); ``trimmed_count`` is the number of reviews strictly
        longer than ``seq_length`` (the ones that were cut). Reviews whose
        length is exactly ``seq_length`` are counted in neither.
    """
    features = np.full((len(reviews), seq_length), pad_id, dtype=int)
    padded_count = 0
    trimmed_count = 0

    for i, row in enumerate(reviews):
        length = len(row)
        if length > seq_length:
            trimmed_count += 1
        elif length < seq_length:
            padded_count += 1
        kept = min(length, seq_length)
        # Slice before converting so that long reviews are not copied in full.
        features[i, :kept] = np.asarray(row[:kept], dtype=features.dtype)

    return features, padded_count, trimmed_count

# Bring every encoded review to the same length
seq_length = 128
features, padded_count, trimmed_count = pad_features(
    reviews_enc,
    pad_id=word2int['<PAD>'],
    seq_length=seq_length,
)

# Sanity checks: one row per review, `seq_length` columns per row
assert len(features) == len(reviews_enc)
assert len(features[0]) == seq_length

print(f"Number of padded reviews: {padded_count}")
print(f"Number of trimmed reviews: {trimmed_count}")

# Labels as a numpy array, aligned row by row with `features`
labels = data.label.to_numpy()

# Fractions of the data used for training (70%) and for testing (30%).
# Only `train_size` enters the split below; the test set is whatever remains.
train_size = 0.7
test_size = 0.3

# Split by position: the first `split_id` rows train, the remaining rows test
# NOTE(review): the rows are not shuffled before splitting — confirm that the
# CSV is not ordered by label.
split_id = int(len(features) * train_size)
train_x, train_y = features[:split_id], labels[:split_id]
test_x, test_y = features[split_id:], labels[split_id:]

# Number of reviews per mini-batch
batch_size = 64

# Wrap the numpy arrays into tensor datasets
trainset = TensorDataset(
    torch.from_numpy(train_x),
    torch.from_numpy(train_y),
)
testset = TensorDataset(
    torch.from_numpy(test_x),
    torch.from_numpy(test_y),
)

# Shuffling loaders over both sets
trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True)
testloader = DataLoader(testset, batch_size=batch_size, shuffle=True)

Define all hyperparameters for model and training

In [None]:
# --- model hyperparameters ---
vocab_size = len(word2int)      # number of ids in the vocabulary mapping
embs = [2, 4, 8, 16]            # embedding dimensions; one model is trained per entry
depth = 8
temperature = 0.001
hm_loss = False

# --- optimisation ---
lr = 0.001
grad_clip = 5
criterion = nn.BCELoss()  # binary cross-entropy for a binary classification problem
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# --- schedule and checkpoints ---
# NOTE(review): presumably the extra epoch makes epoch 100 itself reachable
# (the cells below load '..._epoch=100.pth') — confirm against the Trainer.
num_epochs = 100 + 1
save_every = 10
saved_dir = './saved'

Training and test

In [None]:
# Train one classifier per embedding dimension listed in `embs`; every
# iteration creates a fresh model and a fresh Adam optimizer.
for emb in embs:
    model = TClassifier(vocab_size, emb, seq_length, depth, temperature)
    optim = Adam(model.parameters(), lr=lr)
    # NOTE(review): Trainer is defined in utils.training; presumably it writes
    # checkpoints and the training history under `saved_dir` every
    # `save_every` epochs — confirm against its implementation.
    trainer = Trainer(model,optim, criterion, device, hm_loss, saved_dir, grad_clip)
    trainer.train(trainloader, testloader, num_epochs, emb, save_every)

Plot learning curves

In [None]:
# Plot styles, indexed by the position of the embedding dimension in `embs`
colors = ['blue', 'green', 'red', 'orange']
markers = ['^', 'o', 's', 'd']
linestyles = ['solid', 'dotted', 'dashed', 'dashdot']

# Directory where the figures are written
plots_dir = './plots'

### Training loss: softmax as a line, hardmax as hollow markers
embs = [2, 4]
plt.figure(figsize=(5, 3))
for idx, emb in enumerate(embs):
    # Load the history dictionary saved for this embedding dimension
    with open(f'{saved_dir}/history_d={emb}_HM.pkl', 'rb') as f:
        history = pickle.load(f)
    # Softmax training loss at every epoch
    plt.plot(
        range(history['epochs']),
        history['train_loss'],
        label=f'Softmax $d = ${emb}',
        color=colors[idx],
    )
    # Hardmax training loss, shown only at epochs 0, 10, ..., 100
    ind = list(range(0, 101, 10))
    values = [history['train_loss_hm'][epoch] for epoch in ind]
    plt.scatter(
        ind,
        values,
        label=f'Hardmax $d = ${emb}',
        marker=markers[idx],
        facecolor="none",
        s=50,
        color=colors[idx],
    )
plt.legend()
plt.yscale('log')
plt.xlabel('epochs')
plt.ylabel('training loss')
plt.savefig(f'{plots_dir}/training_loss_sm_hm.pdf', format='pdf', dpi=250, bbox_inches='tight')

### Training accuracy (left panel) and test accuracy (right panel)
embs = [2, 4, 8, 16]
fig, axs = plt.subplots(1, 2, figsize=(9, 2))
legend_handles = []  # one line handle per embedding dimension, for the shared legend
for idx, emb in enumerate(embs):
    # Load the history dictionary saved for this embedding dimension
    with open(f'{saved_dir}/history_d={emb}.pkl', 'rb') as f:
        history = pickle.load(f)
    # Train accuracy
    train_line, = axs[0].plot(
        range(history['epochs']),
        history['train_acc'],
        label=f'$d = ${emb}',
        linestyle=linestyles[idx],
        color=colors[idx],
    )
    # Test accuracy
    axs[1].plot(
        range(history['epochs']),
        history['test_acc'],
        label=f'$d = ${emb}',
        linestyle=linestyles[idx],
        color=colors[idx],
    )
    # Report the best test accuracy and the first epoch at which it is reached
    max_test_acc = max(history['test_acc'])
    max_test_acc_epoch = history['test_acc'].index(max_test_acc)
    print(f"Maximum test accuracy for emb={emb}: {max_test_acc} at epoch {max_test_acc_epoch}")
    # Keep the handle for the shared legend
    legend_handles.append(train_line)

# Single legend for both panels, placed above the figure
fig.legend(
    legend_handles,
    [f'd = {emb}' for emb in embs],
    loc='upper center',
    ncol=len(embs),
    bbox_to_anchor=(0.5, 1.1),
)
axs[0].set_xlabel('epochs')
axs[0].set_ylabel('train accuracy')
axs[0].set_ylim(0.5, 1 + 0.05)
axs[1].set_xlabel('epochs')
axs[1].set_ylabel('test accuracy')
axs[1].set_ylim(0.5, 1 + 0.05)

plt.savefig(f'{plots_dir}/acc_embs.pdf', format='pdf', dpi=250, bbox_inches='tight')

Table with statistical information about leaders

In [None]:
saved_dir = './saved'
embs = [2, 4, 8, 16]
N_reviews = np.shape(test_x)[0]
# leaders[k, r]     : number of leaders of test review r in the last stored layer (model embs[k])
# ini_leaders[k, r] : number of leaders of test review r in the initial configuration
# A token i is a leader when the first index j maximising <z_i, z_j> is i itself.
leaders = np.zeros((len(embs), N_reviews))
ini_leaders = np.zeros((len(embs), N_reviews))

for idx, emb in enumerate(embs):
    # NOTE(review): this loads the checkpoint of epoch 2 while the other cells
    # load epoch 100 — confirm that this is intended.
    model = torch.load(saved_dir + '/model_d=' + str(emb) + '_epoch=2.pth')
    # Tensors living on a CUDA device cannot be converted to numpy directly,
    # so every parameter is moved to host memory first.
    state = model.state_dict()
    E = state['encoder.weight'].cpu().numpy()
    v = state['decoder.weight'].cpu().numpy()
    b = state['decoder.bias'].cpu().numpy()
    alpha = state['attention.alpha'].cpu().numpy()
    X = E[test_x, :]  # embeddings of all test reviews
    _, n, d = np.shape(X)
    s1 = 1 / (1 + alpha)
    s2 = alpha * s1
    for rev in range(N_reviews):
        z0 = X[rev, :, :].T  # initial configuration, one column per token
        ## Run the hardmax dynamics ##
        W = np.zeros((d, n))
        # NOTE(review): the loop below performs depth + 1 updates and stores the
        # state after update k in z[:, :, k], which overwrites the initial
        # configuration written to z[:, :, 0]. If only `depth` updates are
        # intended, the loop should run range(depth) and store into
        # z[:, :, k + 1] — confirm against the model's forward pass.
        z = np.zeros((d, n, depth + 1))
        z[:, :, 0] = z0
        f = z0.copy()
        for layer in range(depth + 1):
            for i in range(n):
                IP = np.dot(f[:, i], f)  # inner products of token i with all tokens
                Pij = np.zeros(n)
                ind = IP == np.max(IP)  # tokens attaining the maximum (ties included)
                # Count the leaders of the initial configuration (first pass only)
                if layer == 0 and i == np.where(ind)[0][0]:
                    ini_leaders[idx, rev] += 1
                # Uniform weights over the maximisers
                Pij[ind] = 1. / np.sum(ind)
                W[:, i] = s2 * np.sum(Pij * f, axis=1)
            f = s1 * f + W
            z[:, :, layer] = f
        # Count the leaders in the last stored layer
        for i in range(n):
            y = np.dot(z[:, i, depth], z[:, :, depth])
            ind = y == np.max(y)
            if i == np.where(ind)[0][0]:
                leaders[idx, rev] += 1

# Table with one column per embedding dimension and one row per statistic
table = [["Average", *np.mean(leaders, axis=1)],
         ["Std Dev", *np.std(leaders, axis=1)],
         ["Min", *np.min(leaders, axis=1)],
         ["Max", *np.max(leaders, axis=1)],
         ["Frac. Ini.", *(np.mean(ini_leaders / leaders, axis=1))]]

print(tabulate(table, headers=embs))

Select reviews to be plotted later

In [None]:
# Load model
saved_dir = './saved'
model = torch.load(saved_dir + '/model_d=2_epoch=100.pth')
# Inference only: switch layers such as dropout (if the model has any) to
# evaluation behaviour.
model.eval()

# Selected reviews (features) in the test set: those correctly classified
# with a predicted value above `confidence` or below 1 - `confidence`
confidence = 0.999
selected_features = []
selected_labels = []
testloop = tqdm(testloader, leave=True, desc='Inference')
with torch.no_grad():
    for feature, target in testloop:
        feature, target = feature.to(device), target.to(device)
        out = model(feature)
        # Threshold the whole batch at once; `out` has one column per review,
        # which is squeezed away so that the result lines up with `target`
        predicted = (out > 0.5).squeeze(dim=1).long()
        equals = predicted == target
        # Keep the reviews that are both confidently and correctly classified
        confidence_mask = (out > confidence) | (out < (1 - confidence))
        mask = confidence_mask.squeeze(dim=1) & equals
        selected_features.append(feature[mask].cpu())
        selected_labels.append(target[mask].cpu())
selected_features = torch.cat(selected_features, dim=0)
selected_labels = torch.cat(selected_labels, dim=0)
print(np.shape(selected_features))

Plot evolution of tokens from a single review

In [None]:
saved_dir = './saved'
dir_path = "./plots"

# Reload the classifier with embedding dimension 2 (checkpoint of epoch 100)
model = torch.load(f'{saved_dir}/model_d=2_epoch=100.pth')

# Pick a single review, and its label, among the selected reviews
rev_number = 10
rev_features, label = selected_features[rev_number], selected_labels[rev_number]

# Plotting options handed over to `plot_review`
options = {
    'movie': False,
    'save_plots': False,
    'mean': True,
    'levels': True,
    'trail': False,
    'start_white': False,
}
plot_review(model, rev_features, label, int2word, depth, dir_path, options)

Study the frequency of leaders in reviews that are correctly classified with high confidence

In [None]:
# Load model
saved_dir = './saved'
plots_dir = './plots'
emb = 2
confidence = 0.95 #0.999
selected_features = []
selected_labels = []
all_leaders = [] #for histogram of leaders
testloop = tqdm(testloader, leave=True, desc='Inference')

model = torch.load(saved_dir + '/model_d=' + str(emb) + '_epoch=100.pth')
# Inference only: switch layers such as dropout (if the model has any) to
# evaluation behaviour.
model.eval()
with torch.no_grad():
    for feature, target in testloop:
        feature, target = feature.to(device), target.to(device)
        out = model(feature)
        # Threshold the whole batch at once; `out` has one column per review,
        # which is squeezed away so that the result lines up with `target`
        predicted = (out > 0.5).squeeze(dim=1).long()
        equals = predicted == target
        # Keep the reviews that are both confidently and correctly classified
        confidence_mask = (out > confidence) | (out < (1 - confidence))
        mask = confidence_mask.squeeze(dim=1) & equals
        selected_features.append(feature[mask].cpu())
        selected_labels.append(target[mask].cpu())
selected_features = torch.cat(selected_features, dim=0)
selected_labels = torch.cat(selected_labels, dim=0)
print(np.shape(selected_features))

In [None]:
# Threshold on |v·z + b| for a leader to be recorded. For reference,
# |ln(1/0.9 - 1)| ≈ 2.2 (presumably the pre-activation value matching an
# output of 0.9 or 0.1 — confirm against the model's decoder).
tol = 2

# Start from an empty list so that re-running this cell does not count the
# same leaders twice.
all_leaders = []

N_reviews = np.shape(selected_features)[0]
# Tensors living on a CUDA device cannot be converted to numpy directly, so
# every parameter is moved to host memory first.
state = model.state_dict()
E = state['encoder.weight'].cpu().numpy()
v = state['decoder.weight'].cpu().numpy()
b = state['decoder.bias'].cpu().numpy()
alpha = state['attention.alpha'].cpu().numpy()
# Token ids as a numpy array, so that numpy arrays are indexed with numpy data
token_ids = selected_features.numpy()
X = E[token_ids, :]  # embeddings of all selected reviews
_, n, d = np.shape(X)
s1 = 1 / (1 + alpha)
s2 = alpha * s1
for rev in range(N_reviews):
    z0 = X[rev, :, :].T  # initial configuration, one column per token
    ## Run the hardmax dynamics ##
    W = np.zeros((d, n))
    # NOTE(review): the loop below performs depth + 1 updates and stores the
    # state after update k in z[:, :, k], which overwrites the initial
    # configuration written to z[:, :, 0]. If only `depth` updates are
    # intended, the loop should run range(depth) and store into
    # z[:, :, k + 1] — confirm against the model's forward pass.
    z = np.zeros((d, n, depth + 1))
    z[:, :, 0] = z0
    f = z0.copy()
    for layer in range(depth + 1):
        for i in range(n):
            IP = np.dot(f[:, i], f)  # inner products of token i with all tokens
            Pij = np.zeros(n)
            ind = IP == np.max(IP)  # tokens attaining the maximum (ties included)
            Pij[ind] = 1. / np.sum(ind)  # uniform weights over the maximisers
            W[:, i] = s2 * np.sum(Pij * f, axis=1)
        f = s1 * f + W
        z[:, :, layer] = f
    # Record the leaders of the last stored layer whose decoder projection
    # exceeds `tol` in absolute value
    for i in range(n):
        y = np.dot(z[:, i, depth], z[:, :, depth])
        ind = y == np.max(y)
        proj = np.abs(np.dot(z[:, i, depth], v.T) + b)
        # Token i is a leader when the first maximiser of <z_i, z_j> is i itself
        if i == np.where(ind)[0][0] and proj > tol:
            all_leaders.append(int2word[int(token_ids[rev, i])])

In [None]:
# Number of occurrences of every leader word
counter = Counter(all_leaders)

# (word, count) pairs from the most to the least frequent; words with equal
# counts keep the order in which they were first encountered
sorted_data = counter.most_common()

# Keep only the most frequent words
top = 15
top_data = sorted_data[:top]

# Split the pairs into the bar labels and the bar heights
tags, frequencies = zip(*top_data)

# Width of each bar
bar_width = 0.5

# Bar chart of the most frequent leaders
plt.figure(figsize=(5, 3))
plt.bar(tags, frequencies, width=bar_width, edgecolor='black')
plt.ylabel('frequency')

# Tilt the x-axis labels by 60 degrees for better readability
plt.xticks(rotation=60, ha='center')

# Save the plot
plt.savefig(f'{plots_dir}/histogram_leaders.pdf', format='pdf', dpi=250, bbox_inches='tight')