In [None]:
# Environment setup: on Google Colab mount Drive and install the extra packages,
# otherwise work from the current directory. Also creates the "in"/"out" folders.
import os
import subprocess
import sys

try:
    import google.colab  # noqa: F401 -- imported only to detect the Colab runtime
    IN_COLAB = True
except ImportError:
    # Only a missing package means "not on Colab"; other errors should surface.
    IN_COLAB = False

if IN_COLAB:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=True)
    base_folder = '/content/drive/My Drive/unibo/NLP_project/BarneyBot'
    # Install through the interpreter running this kernel so the packages land in
    # the environment the notebook actually imports from. Best effort (check=False),
    # as before: a failed install does not abort the cell.
    for pip_args in (
        ["datasets"],
        ["transformers"],
        ["rouge_score"],
        ["-U", "sentence-transformers"],
    ):
        subprocess.run([sys.executable, "-m", "pip", "install", *pip_args], check=False)
else:
    base_folder = os.getcwd()

# exist_ok avoids the check-then-create race of a separate os.path.exists test.
in_folder = os.path.join(base_folder, "in")
os.makedirs(in_folder, exist_ok=True)
out_folder = os.path.join(base_folder, "out")
os.makedirs(out_folder, exist_ok=True)

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import pandas as pd
import numpy as np
from tqdm import tqdm
from matplotlib import pyplot as plt
%matplotlib inline
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split

In [None]:
# Lookup table keyed by character name. Each entry lists the classifier name,
# the series CSV, the classifier CSV, the cached-embeddings file and the name of
# the source-series folder used for that character.
character_dict = dict(
    Barney=dict(
        classifier_name='barney_classifier',
        series_df_filename='HIMYM.csv',
        classifier_df='barney_classifier.csv',
        encoded_lines_filename='barney_encoded_lines.npy',
        source='HIMYM',
    ),
    Sheldon=dict(
        classifier_name='sheldon_classifier',
        series_df_filename='TBBT.csv',
        classifier_df='sheldon_classifier.csv',
        encoded_lines_filename='sheldon_encoded_lines.npy',
        source='TBBT',
    ),
    Harry=dict(
        classifier_name='harry_classifier',
        series_df_filename='HP.csv',
        classifier_df='harry_classifier.csv',
        encoded_lines_filename='harry_encoded_lines.npy',
        source='HP',
    ),
    Fry=dict(
        classifier_name='fry_classifier',
        series_df_filename='Futurama.csv',
        classifier_df='fry_classifier.csv',
        encoded_lines_filename='fry_encoded_lines.npy',
        source='Futurama',
    ),
    Vader=dict(
        classifier_name='vader_classifier',
        series_df_filename='SW.csv',
        classifier_df='vader_classifier.csv',
        encoded_lines_filename='vader_encoded_lines.npy',
        source='SW',
    ),
)

In [None]:
# --- Training hyper-parameters ---
batch_size = 16
# Upper bound on epochs; training also uses an early-stopping callback.
epochs = 1000
# Learning rate passed to the Adam optimizer.
lr = 1e-6
# L2 weight for the 128-unit hidden layer.
regularizer_weight_r = 1e-4
# L2 weight for the sigmoid output layer.
regularizer_weight_s = 1e-3
dropout_rate = 0.2
# Split fractions; the validation share is what remains (1 - train_size - test_size).
train_size = 0.85
test_size = 0.10
# Number of reshuffling passes used when building the triplet dataset.
n_shuffles = 10

# Reuse cached sentence embeddings instead of re-encoding every line.
from_saved_embeddings = True

# Key into `character_dict` selecting which character to train for.
character = 'Barney'

# Suffix appended to the saved classifier name and the training-history file name.
version = ''
# Falsy: do nothing. Otherwise the value is appended to 'shutdown /' and executed
# at the end of the notebook, so it is expected to be a string flag such as 'h'.
shutdown_at_end = False # 'h'

In [None]:
# Folder with the data of the series the selected character comes from.
character_folder = os.path.join(
    base_folder,
    "Data",
    "Sources",
    character_dict[character]['source'],
)

# Path of the character's classifier inside that folder.
model_path = os.path.join(
    character_folder,
    character_dict[character]['classifier_name'],
)

# Dataset

In [None]:
# Load the series CSV configured for the selected character.
series_df = pd.read_csv(
    os.path.join(
        character_folder,
        character_dict[character]['series_df_filename'],
    )
)

In [None]:
# Preview the rows whose speaker is the selected character.
series_df.loc[series_df['character'].eq(character)]

In [None]:
# Binarise the speaker column: 1 for the selected character, 0 for everyone else.
# NOTE: the column is overwritten in place, so running this cell a second time
# compares the 0/1 labels with the character name and turns every label into 0.
series_df['character'] = series_df['character'].eq(character).astype('int64')

In [None]:
# Same preview after binarisation: rows labelled 1.
series_df.loc[series_df['character'].eq(1)]

In [None]:
# Keep only the binary label and the utterance text.
series_df = series_df[['character', 'line']]

In [None]:
# Display the reduced frame (label + line) as a sanity check.
series_df

# Model

## Sentence Transformer

In [None]:
# Cached data can only be reused when BOTH files read back by the loading cell
# exist: the sentence CSV and the .npy file with the encoded lines. If either is
# missing, force a re-encode by setting from_saved_embeddings = False.
required_cache_files = [
    os.path.join(character_folder, character_dict[character][cache_key])
    for cache_key in ('classifier_df', 'encoded_lines_filename')
]
missing_cache_files = [path for path in required_cache_files if not os.path.exists(path)]
if missing_cache_files:
    from_saved_embeddings = False
    print('Encoded lines not found, from_saved_embeddings set to False')
    print('Missing files: ' + ', '.join(missing_cache_files))

In [None]:
from sentence_transformers import SentenceTransformer

# The encoder is only instantiated when the embeddings have to be recomputed;
# with cached embeddings `sentence_transformer` is never defined.
if not from_saved_embeddings:
    sentence_transformer = SentenceTransformer("sentence-transformers/paraphrase-multilingual-mpnet-base-v2")

## Sentence Encoding

In [None]:
if not from_saved_embeddings:

    # Encode all lines with one batched call instead of one call per sentence:
    # the model then processes the sentences in batches, which is much faster.
    line_embeddings = sentence_transformer.encode(
        series_df['line'].tolist(),
        show_progress_bar=True
    )
    # Store one 1-D vector per row in an object column.
    series_df['encoded_line'] = list(line_embeddings)

    # save sentences dataset
    series_df[['line', 'character']].to_csv(
        os.path.join(character_folder, character_dict[character]['classifier_df']),
        index = False
    )

    # The column is an object array of vectors, so the file written here has to
    # be read back with allow_pickle=True.
    np.save(
        os.path.join(character_folder, character_dict[character]['encoded_lines_filename']),
        series_df['encoded_line'].to_numpy()
    )

In [None]:
# read sentences dataaset
series_df = pd.read_csv(
    os.path.join(character_folder, character_dict[character]['classifier_df']),
    dtype={'line': str,
           'character': int
          }
)

series_df['encoded_line'] = np.load(
    os.path.join(character_folder, character_dict[character]['encoded_lines_filename']), 
    allow_pickle=True
)

In [None]:
# Display the reloaded frame, now including the `encoded_line` column.
series_df

In [None]:
# Hold out `test_size` of the lines as a test split. No random_state is passed,
# so the split changes from run to run.
# NOTE(review): the dataset cells visible further down are built from `series_df`
# rather than from this split -- confirm whether the split is still needed.
series_train_df, series_test_df = train_test_split(series_df, test_size=test_size)

In [None]:
# Carve the validation split out of what is left after the test split.
# The validation share (1 - train_size - test_size) is a fraction of the WHOLE
# dataset, but the frame being split here is only the (1 - test_size) remainder,
# so the fraction is rescaled to end up with the intended overall proportions.
# NOTE: `series_train_df` is overwritten, so re-running this cell alone shrinks it.
series_train_df, series_val_df = train_test_split(
    series_train_df,
    test_size=(1 - train_size - test_size) / (1 - test_size)
)

In [None]:
def get_triplet_df(series_df, n_shuffles=1, random_state=None):
    """Build a balanced, shuffled dataset of concatenated three-line embeddings.

    For each of `n_shuffles` passes the rows labelled 1 are shuffled and an
    equally sized sample of rows labelled 0 is drawn. A sliding window of three
    consecutive rows is then moved over each of the two sequences, and the three
    `encoded_line` vectors in the window are concatenated into one sample.

    Parameters
    ----------
    series_df : pandas.DataFrame
        Must provide a `character` column (1 = target character, 0 = others)
        and an `encoded_line` column holding one 1-D vector per row.
    n_shuffles : int, default 1
        Number of reshuffling passes.
    random_state : int or None, default None
        Seed for reproducible sampling. None uses NumPy's global random state.

    Returns
    -------
    pandas.DataFrame
        Columns `character` (0/1) and `encoded_lines` (concatenated vector),
        with the rows in random order. Each pass adds the same number of
        samples for both labels.

    Raises
    ------
    ValueError
        Raised by pandas when label-1 rows exist but there are no label-0 rows
        to sample from.
    """
    # separate character from others
    positives = series_df[series_df['character'] == 1]
    negatives = series_df[series_df['character'] == 0]
    n_positives = len(positives)

    # One stateful generator shared by every draw: passes differ from one another
    # while the whole run stays reproducible for a given seed.
    rng = None if random_state is None else np.random.RandomState(random_state)

    df_rows = {'character': [], 'encoded_lines': []}

    for _ in range(n_shuffles):
        # shuffle the character's lines
        shuffled_pos = positives.sample(frac=1, random_state=rng)['encoded_line'].tolist()
        # Draw from the FULL pool of other speakers on every pass (the pool must
        # not be replaced by its own sample). Sample with replacement only when
        # there are fewer negatives than positives.
        sampled_neg = negatives.sample(
            n=n_positives,
            replace=len(negatives) < n_positives,
            random_state=rng
        )['encoded_line'].tolist()

        for i in tqdm(range(2, n_positives)):
            # character
            df_rows['character'].append(1)
            df_rows['encoded_lines'].append(np.concatenate(shuffled_pos[i-2:i+1]))

            # other
            df_rows['character'].append(0)
            df_rows['encoded_lines'].append(np.concatenate(sampled_neg[i-2:i+1]))

    df = pd.DataFrame(data=df_rows)

    return df.sample(frac=1, random_state=rng).reset_index(drop=True)

In [None]:
# Build the triplet dataset from the whole of `series_df`.
# NOTE(review): triplets are generated first and partitioned afterwards, so the
# same line can end up in triplets on both sides of the train/test/validation
# boundaries -- confirm this is acceptable for the evaluation.
shuffled_df = get_triplet_df(series_df, n_shuffles=n_shuffles)

In [None]:
# Total number of triplet samples (both labels together).
len(shuffled_df)

## Create Classification Dataset

In [None]:
# Partition sizes: train and test are truncated fractions of the total, and
# validation takes whatever those two leave over.
tot_len = len(shuffled_df)
train_len, test_len = (int(tot_len * share) for share in (train_size, test_size))
val_len = tot_len - (train_len + test_len)

print(tot_len, train_len, test_len, val_len)

In [None]:
def _to_arrays(rows):
    """Convert a slice of the triplet frame into (float feature matrix, label vector)."""
    features = np.array(rows['encoded_lines'].tolist(), dtype=float)
    labels = rows['character'].to_numpy()
    return features, labels

# The three partitions are consecutive, non-overlapping row ranges of the shuffled
# frame: [0, train_len) | [train_len, test_end) | [test_end, test_end + val_len).
# Slicing each of them from row 0 would make test and validation subsets of the
# training data and invalidate every evaluation metric.
test_end = train_len + test_len

print('Loading training data...')
X_train, y_train = _to_arrays(shuffled_df.iloc[:train_len])

print('Loading test data...')
X_test, y_test = _to_arrays(shuffled_df.iloc[train_len:test_end])

print('Loading validation data...')
X_val, y_val = _to_arrays(shuffled_df.iloc[test_end:test_end + val_len])

In [None]:
# compute some statistics
train_percentage_1 = len(y_train[y_train==1])/len(y_train)
train_percentage_0 = len(y_train[y_train==0])/len(y_train)

val_percentage_1 = len(y_val[y_val==1])/len(y_val)
val_percentage_0 = len(y_val[y_val==0])/len(y_val)
print('\t0 (%)\t\t1 (%)')
print('train\t{:.2f}\t\t{:.2f}'.format(train_percentage_0, train_percentage_1))
print('val\t{:.2f}\t\t{:.2f}'.format(val_percentage_0, val_percentage_1))

## Classification Model

In [None]:
# Import keras/tensorflow libraries
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import callbacks
from tensorflow.keras import regularizers

In [None]:
# create model
def create_model(input_dim=None):
    """Build and compile the binary character classifier.

    Architecture: Dense+BatchNormalization blocks of 1024, 1024, 512 and 256
    ReLU units, a 128-unit ReLU block with L2 regularisation
    (`regularizer_weight_r`) followed by dropout (`dropout_rate`), and a single
    sigmoid output unit with L2 regularisation (`regularizer_weight_s`).

    Parameters
    ----------
    input_dim : int or None, default None
        Number of input features. When None it is taken from the first row of
        the global `X_train`.

    Returns
    -------
    keras.Model
        Model compiled with binary cross-entropy, Adam (learning rate `lr`) and
        the binary-accuracy and recall metrics.
    """
    if input_dim is None:
        input_dim = len(X_train[0])

    # `shape` has to be a tuple of per-sample dimensions, hence the 1-tuple.
    inputs = keras.Input(shape=(input_dim,))

    # Unregularised hidden blocks.
    x = inputs
    for units in (1024, 1024, 512, 256):
        x = layers.Dense(units, activation='relu')(x)
        x = layers.BatchNormalization()(x)

    # Last hidden block: the only hidden layer carrying an L2 penalty.
    x = layers.Dense(
        128,
        activation='relu',
        kernel_regularizer=regularizers.l2(regularizer_weight_r),
        bias_regularizer=regularizers.l2(regularizer_weight_r)
    )(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(dropout_rate)(x)

    # Probability that the input triplet belongs to the selected character.
    out = layers.Dense(
        1,
        activation='sigmoid',
        kernel_regularizer=regularizers.l2(regularizer_weight_s),
        bias_regularizer=regularizers.l2(regularizer_weight_s)
    )(x)

    classifier_model = keras.Model(inputs, out)
    classifier_model.compile(
        loss = keras.losses.BinaryCrossentropy(),
        optimizer = keras.optimizers.Adam(learning_rate = lr),
        metrics = [keras.metrics.BinaryAccuracy(), keras.metrics.Recall()]
    )
    return classifier_model

In [None]:
# Instantiate the compiled classifier; the input width is read from `X_train`.
classifier_model = create_model()

## Training

In [None]:
# Stop training once validation binary accuracy has failed to improve for
# `patience` epochs, then restore the weights of the best epoch.
earlystop_callback = callbacks.EarlyStopping(
    monitor="val_binary_accuracy",
    mode="max",
    patience=6,
    min_delta=0,
    baseline=None,
    restore_best_weights=True,
    verbose=0,
)

In [None]:
# Train with early stopping; `epochs` only acts as an upper bound.
train_history = classifier_model.fit(
    x=X_train,
    y=y_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_data=(X_val, y_val),
    callbacks=[earlystop_callback],
    verbose=1,
)

In [None]:
print('#'*25 + ' Model Test ' + '#'*25)
fig, ax = plt.subplots(1, 1, figsize=(5, 5))
# The model emits one sigmoid score per row in an (n, 1) array: flatten it and
# threshold at 0.5 to obtain a 1-D vector of integer class labels.
y_pred = (classifier_model.predict(X_test).ravel() > 0.5).astype(int)
# Plot the confusion matrix as raw counts (no normalisation). `labels` pins the
# matrix to 2x2 in the order [0, 1] so it always lines up with `display_labels`,
# even when one class is missing from the test predictions.
cm = confusion_matrix(y_test, y_pred, labels=[0, 1])
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['Others', character])
disp.plot(ax=ax)
plt.show()

In [None]:
# Persist the trained classifier in the character folder; `version` is appended
# to the configured classifier name.
classifier_path = os.path.join(
    character_folder,
    character_dict[character]['classifier_name'] + version,
)
classifier_model.save(classifier_path)

In [None]:
# Save history as a JSON file
import json
filename = character.lower() + '_training_history' + version + '.json'

output_string = json.dumps(train_history.history)
with open(os.path.join(character_folder, filename), 'w') as file:
    file.write(output_string)

In [None]:
# Optionally run the OS `shutdown` command once everything is done; the configured
# value is used as the flag that follows the slash.
if shutdown_at_end:
    os.system(''.join(['shutdown /', shutdown_at_end]))