In [None]:
import os
import numpy as np
import pandas as pd
import absl.logging
from nlp_classifier import NaiveBayes, SVM, XGBoost, CNN, Dense, NLPClassifier, CNN2Step
from nlp_embeddings_no_nlu import DistilBERT, SentenceTransformerMPNET
from sklearn import preprocessing, metrics
from ast import literal_eval
import torch
absl.logging.set_verbosity(absl.logging.ERROR)

In [None]:
# --- Run configuration -------------------------------------------------------
# Selects data/train/<name>.csv and data/test/<name>.csv and names the
# per-dataset output folders for models and predictions.
dataset_name = 'small_balanced'

# Passed to the DistilBERT embedder and used to size the normalized-lyrics CNN input.
max_words = 400

# Epoch count and optimizer name handed to the neural classifiers (CNN, Dense, CNN2Step).
cnn_epochs = 5
optimizer = 'adam'

# Genre whose encoded label is given to the CNN2Step classifier.
indiv_genre = 'Rock'

# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [None]:
def train(data_x, data_y, nlp_embedding, nlp_classifier, label_encoder, batch_size, dataset_name, embeddings=None, epochs=1, model_dir='models', start_idx=0, fname_end=''):
    """Incrementally fit ``nlp_classifier`` on mini-batches, saving it after every batch.

    Args:
        data_x: Row-sliceable inputs exposing ``shape[0]``. Batches are taken with
            ``data_x[i:j]`` and embedded only when ``embeddings`` is None.
        data_y: Labels aligned with ``data_x``; encoded with ``label_encoder.transform``.
        nlp_embedding: Provides ``name`` (used in the file name) and ``embed_lyrics``.
        nlp_classifier: Provides ``name``, ``partial_fit(X, y, classes=...)`` and
            ``save(path)``.
        label_encoder: Fitted encoder exposing ``transform``.
        batch_size: Positive number of rows per batch.
        dataset_name: Sub-directory of ``model_dir`` that receives the model file.
        embeddings: Optional precomputed embeddings with one row per row of
            ``data_x``; must support ``[i:j]`` slicing and ``.values``.
        epochs: Number of passes over the data.
        model_dir: Root directory for saved models.
        start_idx: Row at which the FIRST epoch starts (presumably to resume an
            interrupted run); every later epoch starts at row 0.
        fname_end: Suffix appended to the model file name.

    Raises:
        ValueError: If ``batch_size`` is not positive, or if ``embeddings`` does
            not have the same number of rows as ``data_x``.
    """
    if batch_size <= 0:
        raise ValueError(f'batch_size must be positive, got {batch_size}')

    n_rows = data_x.shape[0]
    # Cached embeddings are sliced with the same row offsets as the labels, so a
    # length mismatch would silently pair embeddings with the wrong labels.
    if embeddings is not None and embeddings.shape[0] != n_rows:
        raise ValueError(
            f'embeddings has {embeddings.shape[0]} rows but data_x has {n_rows}'
        )

    print('Training...')
    fname = os.path.join(model_dir, dataset_name, f'model_{nlp_embedding.name}_{nlp_classifier.name}{fname_end}')
    # Make sure the target folder exists before the classifier is saved into it.
    out_dir = os.path.dirname(fname)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    data_y_enc = label_encoder.transform(data_y)
    # partial_fit receives the full class list on every call, since a single
    # batch may not contain every class.
    classes = np.unique(data_y_enc)

    for epoch in range(epochs):
        print(f'Epoch: {epoch + 1}/{epochs}')
        # Only the first epoch honours start_idx; the parameter itself is left untouched.
        first_row = start_idx if epoch == 0 else 0
        for i in range(first_row, n_rows, batch_size):
            j = min(i + batch_size, n_rows)
            print(f'Processing rows: {i} - {j - 1}')

            if embeddings is not None:
                embeddings_batch = embeddings[i:j]
            else:
                embeddings_batch = nlp_embedding.embed_lyrics(data_x[i:j])

            nlp_classifier.partial_fit(embeddings_batch.values, data_y_enc[i:j], classes=classes)
            # Save after every batch so progress survives an interruption.
            nlp_classifier.save(fname)

    print('Success!')

In [None]:
def test(data_x, nlp_embedding, nlp_classifier, label_encoder, batch_size, dataset_name, embeddings = None, pred_dir='predictions', start_idx=0, fname_end=''):
    print('Testing...')
    fname = os.path.join(pred_dir, dataset_name, f'model_{nlp_embedding.name}_{nlp_classifier.name}{fname_end}.csv')
    predictions_all = []

    if start_idx == 0 and os.path.exists(fname):
        os.remove(fname)
    
    for i in range(start_idx, data_x.shape[0], batch_size):

        if i + batch_size > data_x.shape[0]:
            j = data_x.shape[0]
        else:
            j = i + batch_size
        
        print(f'Processing rows: {i} - {j - 1}')

        if embeddings is not None:
            embeddings_batch = embeddings[i:j]
        else:
            embeddings_batch = nlp_embedding.embed_lyrics(data_x[i:j])
                
        predictions_enc = nlp_classifier.predict(embeddings_batch.values)
        predictions = label_encoder.inverse_transform(predictions_enc)
        
        predictions_all.extend(predictions)

        pd.DataFrame(predictions.reshape(-1, 1)).to_csv(fname, mode='a', index=False, header=False)
    
    print('Success!')    
    
    return predictions_all

In [None]:
def get_results(y_true, y_pred):
    """Print accuracy, balanced accuracy and weighted F1 for the given predictions.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels, in the same order as ``y_true``.
    """
    print('RESULTS:')

    accuracy = metrics.accuracy_score(y_true=y_true, y_pred=y_pred)
    print(f'accuracy = {accuracy}')

    balanced_accuracy = metrics.balanced_accuracy_score(y_true=y_true, y_pred=y_pred)
    print(f'balanced accuracy = {balanced_accuracy}')

    # average="weighted" is the averaging mode passed to f1_score.
    weighted_f1 = metrics.f1_score(y_true=y_true, y_pred=y_pred, average="weighted")
    print(f'f1 score = {weighted_f1}')

In [None]:
def train_and_save_results(emb, clf, x_train, y_train, x_test, y_test, dataset_name, le, train_embeddings = None, test_embeddings = None, batch_size=5000, epochs=1, fname_end=''):
    """Train ``clf``, predict the test split, and print the evaluation metrics.

    Args:
        emb: Embedding object forwarded to ``train`` and ``test``.
        clf: Classifier forwarded to ``train`` and ``test``.
        x_train, y_train: Training inputs and labels.
        x_test, y_test: Test inputs and the labels the predictions are scored against.
        dataset_name: Dataset name used by ``train``/``test`` to build output paths.
        le: Label encoder forwarded to ``train`` and ``test``.
        train_embeddings, test_embeddings: Optional precomputed embeddings.
        batch_size: Rows per batch for both training and prediction.
        epochs: Number of training epochs.
        fname_end: Suffix for the model and prediction file names.
    """
    train(
        data_x=x_train,
        data_y=y_train,
        nlp_embedding=emb,
        nlp_classifier=clf,
        label_encoder=le,
        batch_size=batch_size,
        dataset_name=dataset_name,
        embeddings=train_embeddings,
        epochs=epochs,
        fname_end=fname_end,
    )
    predicted = test(
        data_x=x_test,
        nlp_embedding=emb,
        nlp_classifier=clf,
        label_encoder=le,
        batch_size=batch_size,
        dataset_name=dataset_name,
        embeddings=test_embeddings,
        fname_end=fname_end,
    )
    get_results(y_true=y_test, y_pred=predicted)

In [None]:
def add_normalized_lyrics(data):
    """Add a ``normalized_lyrics`` column to ``data`` in place.

    Each entry of ``data.tokens`` is a string holding a Python literal (parsed
    with ``literal_eval``); its items are joined with single spaces.
    """
    parsed_tokens = data['tokens'].apply(literal_eval)
    data['normalized_lyrics'] = list(map(' '.join, parsed_tokens))

In [None]:
# Per-dataset output folders for saved models and prediction CSVs.
# exist_ok=True makes creation idempotent and avoids the check-then-create race
# of a separate os.path.exists() test.
model_dir = os.path.join('models', dataset_name)
os.makedirs(model_dir, exist_ok=True)

pred_dir = os.path.join('predictions', dataset_name)
os.makedirs(pred_dir, exist_ok=True)

In [None]:
# Load the train/test splits of the configured dataset.
train_data = pd.read_csv(f'data/train/{dataset_name}.csv')
test_data = pd.read_csv(f'data/test/{dataset_name}.csv')

# Drop rows without lyrics. `.copy()` makes each result an independent frame, so
# adding a column to it later does not trigger pandas' SettingWithCopyWarning.
train_data = train_data.loc[train_data['lyrics'].notna()].copy()
test_data = test_data.loc[test_data['lyrics'].notna()].copy()

In [None]:
# Add the `normalized_lyrics` column to both splits (each frame is modified in place).
for _lyrics_frame in (train_data, test_data):
    add_normalized_lyrics(_lyrics_frame)

In [None]:
# Sorted, de-duplicated genre labels present in the training split.
genres = np.unique(train_data['genre'])

# Fit the encoder on those genres (`fit` returns the encoder itself).
label_encoder = preprocessing.LabelEncoder().fit(genres)

# Display the learned classes as the cell output.
label_encoder.classes_

In [None]:
# Encoded label of `indiv_genre`; transform() returns a one-element array here,
# which is unpacked into its single value.
(indiv_genre_label,) = label_encoder.transform([indiv_genre])

## DistilBERT

In [None]:
# DistilBERT embedder built from the configured `max_words` and torch `device`.
# Its `name` is used in embedding/model/prediction file names, and its
# `embed_lyrics` is only called when no precomputed embeddings are supplied.
emb_distil_bert = DistilBERT(max_words, device)

In [None]:
# Precomputed DistilBERT embeddings are optional: each variable stays None when
# its CSV is absent, in which case embeddings are computed during training/testing.

# Embeddings stored under `embeddings_nn/` (passed to the "Unnormalized lyrics" runs).
train_embeddings_path = f'data/train/embeddings_nn/embedded_{emb_distil_bert.name}_{dataset_name}.csv'
train_embeddings = None
if os.path.exists(train_embeddings_path):
    train_embeddings = pd.read_csv(train_embeddings_path, header=None)

test_embeddings_path = f'data/test/embeddings_nn/embedded_{emb_distil_bert.name}_{dataset_name}.csv'
test_embeddings = None
if os.path.exists(test_embeddings_path):
    test_embeddings = pd.read_csv(test_embeddings_path, header=None)

# Embeddings stored under `embeddings/` (passed to the "Normalized lyrics" runs).
# Each existence check must test the same `_norm` path that is then read.
train_embeddings_path_norm = f'data/train/embeddings/embedded_{emb_distil_bert.name}_{dataset_name}.csv'
train_embeddings_norm = None
if os.path.exists(train_embeddings_path_norm):
    train_embeddings_norm = pd.read_csv(train_embeddings_path_norm, header=None)

test_embeddings_path_norm = f'data/test/embeddings/embedded_{emb_distil_bert.name}_{dataset_name}.csv'
test_embeddings_norm = None
if os.path.exists(test_embeddings_path_norm):
    test_embeddings_norm = pd.read_csv(test_embeddings_path_norm, header=None)

### Unnormalized lyrics

In [None]:
# NaiveBayes classifier on DistilBERT embeddings of the raw lyrics.
clf_nb = NaiveBayes()
train_and_save_results(
    emb=emb_distil_bert,
    clf=clf_nb,
    x_train=train_data.lyrics,
    y_train=train_data.genre,
    x_test=test_data.lyrics,
    y_test=test_data.genre,
    dataset_name=dataset_name,
    le=label_encoder,
    train_embeddings=train_embeddings,
    test_embeddings=test_embeddings,
)

In [None]:
# SVM classifier on DistilBERT embeddings of the raw lyrics.
clf_svm = SVM()
train_and_save_results(
    emb=emb_distil_bert,
    clf=clf_svm,
    x_train=train_data.lyrics,
    y_train=train_data.genre,
    x_test=test_data.lyrics,
    y_test=test_data.genre,
    dataset_name=dataset_name,
    le=label_encoder,
    train_embeddings=train_embeddings,
    test_embeddings=test_embeddings,
)

In [None]:
# XGBoost classifier (constructed with the number of encoded classes) on
# DistilBERT embeddings of the raw lyrics.
clf_xgb = XGBoost(len(label_encoder.classes_))
train_and_save_results(
    emb=emb_distil_bert,
    clf=clf_xgb,
    x_train=train_data.lyrics,
    y_train=train_data.genre,
    x_test=test_data.lyrics,
    y_test=test_data.genre,
    dataset_name=dataset_name,
    le=label_encoder,
    train_embeddings=train_embeddings,
    test_embeddings=test_embeddings,
)

In [None]:
# CNN classifier on DistilBERT embeddings of the raw lyrics, trained for `cnn_epochs` epochs.
clf_cnn = CNN(emb_distil_bert.embedding_size, len(genres), optimizer)
train_and_save_results(
    emb=emb_distil_bert,
    clf=clf_cnn,
    x_train=train_data.lyrics,
    y_train=train_data.genre,
    x_test=test_data.lyrics,
    y_test=test_data.genre,
    dataset_name=dataset_name,
    le=label_encoder,
    train_embeddings=train_embeddings,
    test_embeddings=test_embeddings,
    epochs=cnn_epochs,
)

In [None]:
# Dense classifier on DistilBERT embeddings of the raw lyrics, trained for `cnn_epochs` epochs.
clf_dense = Dense(emb_distil_bert.embedding_size, len(genres), optimizer)
train_and_save_results(
    emb=emb_distil_bert,
    clf=clf_dense,
    x_train=train_data.lyrics,
    y_train=train_data.genre,
    x_test=test_data.lyrics,
    y_test=test_data.genre,
    dataset_name=dataset_name,
    le=label_encoder,
    train_embeddings=train_embeddings,
    test_embeddings=test_embeddings,
    epochs=cnn_epochs,
)

In [None]:
# CNN2Step classifier (given the encoded label of `indiv_genre`) on DistilBERT
# embeddings of the raw lyrics, trained for `cnn_epochs` epochs.
clf_cnn2 = CNN2Step(emb_distil_bert.embedding_size, len(genres), optimizer, indiv_genre_label)
train_and_save_results(
    emb=emb_distil_bert,
    clf=clf_cnn2,
    x_train=train_data.lyrics,
    y_train=train_data.genre,
    x_test=test_data.lyrics,
    y_test=test_data.genre,
    dataset_name=dataset_name,
    le=label_encoder,
    train_embeddings=train_embeddings,
    test_embeddings=test_embeddings,
    epochs=cnn_epochs,
)

### Normalized lyrics

In [None]:
# NaiveBayes classifier on DistilBERT embeddings of the normalized lyrics;
# outputs carry the '_norm' file-name suffix.
clf_nb_norm = NaiveBayes()
train_and_save_results(
    emb=emb_distil_bert,
    clf=clf_nb_norm,
    x_train=train_data.normalized_lyrics,
    y_train=train_data.genre,
    x_test=test_data.normalized_lyrics,
    y_test=test_data.genre,
    dataset_name=dataset_name,
    le=label_encoder,
    train_embeddings=train_embeddings_norm,
    test_embeddings=test_embeddings_norm,
    fname_end='_norm',
)

In [None]:
# SVM classifier on DistilBERT embeddings of the normalized lyrics;
# outputs carry the '_norm' file-name suffix.
clf_svm_norm = SVM()
train_and_save_results(
    emb=emb_distil_bert,
    clf=clf_svm_norm,
    x_train=train_data.normalized_lyrics,
    y_train=train_data.genre,
    x_test=test_data.normalized_lyrics,
    y_test=test_data.genre,
    dataset_name=dataset_name,
    le=label_encoder,
    train_embeddings=train_embeddings_norm,
    test_embeddings=test_embeddings_norm,
    fname_end='_norm',
)

In [None]:
# XGBoost classifier on DistilBERT embeddings of the normalized lyrics;
# outputs carry the '_norm' file-name suffix.
clf_xgb_norm = XGBoost(len(label_encoder.classes_))
train_and_save_results(
    emb=emb_distil_bert,
    clf=clf_xgb_norm,
    x_train=train_data.normalized_lyrics,
    y_train=train_data.genre,
    x_test=test_data.normalized_lyrics,
    y_test=test_data.genre,
    dataset_name=dataset_name,
    le=label_encoder,
    train_embeddings=train_embeddings_norm,
    test_embeddings=test_embeddings_norm,
    fname_end='_norm',
)

In [None]:
# CNN classifier on DistilBERT embeddings of the normalized lyrics;
# outputs carry the '_norm' file-name suffix.
# NOTE(review): the input size here is max_words * embedding_size, whereas the
# raw-lyrics CNN uses embedding_size alone - confirm this matches the shape of
# the normalized embeddings.
clf_cnn_norm = CNN(max_words * emb_distil_bert.embedding_size, len(genres), optimizer)
train_and_save_results(emb_distil_bert, clf_cnn_norm,
                       train_data.normalized_lyrics, train_data.genre, test_data.normalized_lyrics, test_data.genre,
                       dataset_name, label_encoder,
                       train_embeddings_norm, test_embeddings_norm,
                       epochs=cnn_epochs, fname_end='_norm')

In [None]:
# Dense classifier on DistilBERT embeddings of the normalized lyrics.
# fname_end='_norm' matches the other normalized runs; without it the model and
# prediction file names are built from the same embedding/classifier names as
# the raw-lyrics Dense run and would resolve to the same files.
clf_dense_norm = Dense(emb_distil_bert.embedding_size, len(genres), optimizer)
train_and_save_results(emb_distil_bert, clf_dense_norm,
                       train_data.normalized_lyrics, train_data.genre, test_data.normalized_lyrics, test_data.genre,
                       dataset_name, label_encoder,
                       train_embeddings_norm, test_embeddings_norm,
                       epochs=cnn_epochs, fname_end='_norm')

In [None]:
# CNN2Step classifier on DistilBERT embeddings of the normalized lyrics.
# fname_end='_norm' matches the other normalized runs; without it the model and
# prediction file names are built from the same embedding/classifier names as
# the raw-lyrics CNN2Step run and would resolve to the same files.
clf_cnn2_norm = CNN2Step(emb_distil_bert.embedding_size, len(genres), optimizer, indiv_genre_label)
train_and_save_results(emb_distil_bert, clf_cnn2_norm,
                       train_data.normalized_lyrics, train_data.genre, test_data.normalized_lyrics, test_data.genre,
                       dataset_name, label_encoder,
                       train_embeddings_norm, test_embeddings_norm,
                       epochs=cnn_epochs, fname_end='_norm')

## SentenceTransformerMPNET

In [None]:
# SentenceTransformerMPNET embedder, constructed with its defaults.
# Its `name` is used in embedding/model/prediction file names, and its
# `embed_lyrics` is only called when no precomputed embeddings are supplied.
emb_mpnet = SentenceTransformerMPNET()

In [None]:
# Precomputed MPNET embeddings are optional: each variable stays None when its
# CSV is absent, in which case embeddings are computed during training/testing.
# These assignments rebind the embedding variables for the cells that follow.

# Embeddings stored under `embeddings_nn/` (passed to the "Unnormalized lyrics" runs).
train_embeddings_path = f'data/train/embeddings_nn/embedded_{emb_mpnet.name}_{dataset_name}.csv'
train_embeddings = None
if os.path.exists(train_embeddings_path):
    train_embeddings = pd.read_csv(train_embeddings_path, header=None)

test_embeddings_path = f'data/test/embeddings_nn/embedded_{emb_mpnet.name}_{dataset_name}.csv'
test_embeddings = None
if os.path.exists(test_embeddings_path):
    test_embeddings = pd.read_csv(test_embeddings_path, header=None)

# Embeddings stored under `embeddings/` (passed to the "Normalized lyrics" runs).
# Each existence check must test the same `_norm` path that is then read.
train_embeddings_path_norm = f'data/train/embeddings/embedded_{emb_mpnet.name}_{dataset_name}.csv'
train_embeddings_norm = None
if os.path.exists(train_embeddings_path_norm):
    train_embeddings_norm = pd.read_csv(train_embeddings_path_norm, header=None)

test_embeddings_path_norm = f'data/test/embeddings/embedded_{emb_mpnet.name}_{dataset_name}.csv'
test_embeddings_norm = None
if os.path.exists(test_embeddings_path_norm):
    test_embeddings_norm = pd.read_csv(test_embeddings_path_norm, header=None)

### Unnormalized lyrics

In [None]:
# CNN classifier on MPNET embeddings of the raw lyrics, trained for `cnn_epochs` epochs.
clf_cnn = CNN(emb_mpnet.embedding_size, len(genres), optimizer)
train_and_save_results(
    emb=emb_mpnet,
    clf=clf_cnn,
    x_train=train_data.lyrics,
    y_train=train_data.genre,
    x_test=test_data.lyrics,
    y_test=test_data.genre,
    dataset_name=dataset_name,
    le=label_encoder,
    train_embeddings=train_embeddings,
    test_embeddings=test_embeddings,
    epochs=cnn_epochs,
)

In [None]:
# Dense classifier on MPNET embeddings of the raw lyrics, trained for `cnn_epochs` epochs.
clf_dense = Dense(emb_mpnet.embedding_size, len(genres), optimizer)
train_and_save_results(
    emb=emb_mpnet,
    clf=clf_dense,
    x_train=train_data.lyrics,
    y_train=train_data.genre,
    x_test=test_data.lyrics,
    y_test=test_data.genre,
    dataset_name=dataset_name,
    le=label_encoder,
    train_embeddings=train_embeddings,
    test_embeddings=test_embeddings,
    epochs=cnn_epochs,
)

In [None]:
# CNN2Step classifier (given the encoded label of `indiv_genre`) on MPNET
# embeddings of the raw lyrics, trained for `cnn_epochs` epochs.
clf_cnn2 = CNN2Step(emb_mpnet.embedding_size, len(genres), optimizer, indiv_genre_label)
train_and_save_results(
    emb=emb_mpnet,
    clf=clf_cnn2,
    x_train=train_data.lyrics,
    y_train=train_data.genre,
    x_test=test_data.lyrics,
    y_test=test_data.genre,
    dataset_name=dataset_name,
    le=label_encoder,
    train_embeddings=train_embeddings,
    test_embeddings=test_embeddings,
    epochs=cnn_epochs,
)

### Normalized lyrics

In [None]:
# CNN classifier on MPNET embeddings of the normalized lyrics;
# outputs carry the '_norm' file-name suffix.
clf_cnn_norm = CNN(max_words * emb_mpnet.embedding_size, len(genres), optimizer)
train_and_save_results(
    emb=emb_mpnet,
    clf=clf_cnn_norm,
    x_train=train_data.normalized_lyrics,
    y_train=train_data.genre,
    x_test=test_data.normalized_lyrics,
    y_test=test_data.genre,
    dataset_name=dataset_name,
    le=label_encoder,
    train_embeddings=train_embeddings_norm,
    test_embeddings=test_embeddings_norm,
    epochs=cnn_epochs,
    fname_end='_norm',
)

In [None]:
# Dense classifier on MPNET embeddings of the normalized lyrics.
# fname_end='_norm' matches the other normalized runs; without it the model and
# prediction file names are built from the same embedding/classifier names as
# the raw-lyrics Dense run and would resolve to the same files.
clf_dense_norm = Dense(emb_mpnet.embedding_size, len(genres), optimizer)
train_and_save_results(emb_mpnet, clf_dense_norm,
                       train_data.normalized_lyrics, train_data.genre, test_data.normalized_lyrics, test_data.genre,
                       dataset_name, label_encoder,
                       train_embeddings_norm, test_embeddings_norm,
                       epochs=cnn_epochs, fname_end='_norm')

In [None]:
# CNN2Step classifier on MPNET embeddings of the normalized lyrics.
# fname_end='_norm' matches the other normalized runs; without it the model and
# prediction file names are built from the same embedding/classifier names as
# the raw-lyrics CNN2Step run and would resolve to the same files.
clf_cnn2_norm = CNN2Step(emb_mpnet.embedding_size, len(genres), optimizer, indiv_genre_label)
train_and_save_results(emb_mpnet, clf_cnn2_norm,
                       train_data.normalized_lyrics, train_data.genre, test_data.normalized_lyrics, test_data.genre,
                       dataset_name, label_encoder,
                       train_embeddings_norm, test_embeddings_norm,
                       epochs=cnn_epochs, fname_end='_norm')