<a href="https://colab.research.google.com/github/NikolaJanik/Polish_poetry_classification_with_transformers/blob/main/odleg%C5%82o%C5%9Bci_cosinusowe_augmentacja_s%C5%82%C3%B3w_NN__herBERT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Colab setup: install the Hugging Face transformers library and sacremoses
# (sacremoses is presumably required by HerbertTokenizer -- TODO confirm).
!pip install transformers
!pip install sacremoses

In [None]:
from google.colab import files
import pandas as pd
import numpy as np
import random
import os
from tqdm import tqdm
import seaborn as sns
from sklearn.model_selection import cross_val_score, train_test_split
import joblib
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, accuracy_score
import matplotlib.pyplot as plt
import torch
from transformers import HerbertTokenizer, RobertaModel, AutoTokenizer, BertModel

import tensorflow as tf
import tensorflow.keras as keras

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.utils import to_categorical
import tensorflow.keras.backend as K
from tensorflow.keras.models import load_model

from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
import xgboost as xgb
import lightgbm as lgb

from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

In [None]:
# Output directory for saved figures. exist_ok=True keeps the cell safe to
# re-run: os.mkdir raised FileExistsError once the directory already existed.
os.makedirs("figs", exist_ok=True)

In [None]:
def get_confusion_matrix(model, X_test, y_test, model_name, data_type, classes):
  """Score a classifier on a test set.

  Args:
    model: object exposing ``predict(X)`` that returns one score vector per
      sample; the predicted class is the argmax of each vector.
    X_test: test features, passed to ``model.predict`` unchanged.
    y_test: true integer class labels.
    model_name: unused; kept so existing calls keep working.
    data_type: unused; kept so existing calls keep working.
    classes: unused; kept so existing calls keep working. May be None
      (previously a dict was required only to build a list nobody read).

  Returns:
    Tuple ``(score, cm)``: the accuracy and the confusion matrix normalised
    over the true labels (each row sums to 1). Plotting is left to the caller.
  """
  y_pred = model.predict(X_test)
  # The predicted class of a sample is the position of its highest score.
  pred_labels = [np.argmax(sample_scores) for sample_scores in y_pred]

  score = accuracy_score(y_test, pred_labels)
  cm = confusion_matrix(y_test, pred_labels, normalize='true')

  return score, cm

In [None]:
def get_data_set(labels, df):
  """Return the rows of `df` whose 'Label' is one of `labels`, in random order.

  Rows are collected label by label (in the order given by `labels`), then the
  whole selection is randomly permuted and re-indexed from 0.
  """
  positions = [
      pos
      for wanted in labels
      for pos in np.where(df['Label'] == wanted)[0]
  ]
  subset = df.iloc[positions]
  return subset.sample(frac=1).reset_index(drop=True)

In [None]:
def print_classes(df):
  """Map each author's short name to its integer class id.

  Args:
    df: DataFrame with 'Label' and 'Author-short' columns. Any index is
      accepted; rows are addressed by position.

  Returns:
    dict ``{author short name: class id}`` with ids ``0..n_classes-1``. With
    fewer than 8 distinct labels the ids are the factorized labels (order of
    first appearance); otherwise the raw 'Label' values are used and are
    expected to already be ``0..n_classes-1``.
  """
  labels = df['Label']
  num_classes = len(labels.unique())
  y = labels.factorize()[0] if num_classes < 8 else labels

  # np.where yields row POSITIONS, so the author column must be read
  # positionally; a label-based lookup breaks on any non-default index.
  author_names = df['Author-short'].to_numpy()

  authors = {}
  for label in range(0, num_classes):
    positions, = np.where(y == label)
    authors['{}'.format(author_names[positions[0]])] = label

  return authors

In [None]:
def make_embedding(df, model):
  """Embed every text of `df` and return the first-token vectors.

  Args:
    df: DataFrame with 'Text' and 'Label' columns. Rows are read by position,
      so any index is accepted.
    model: ``[name, tokenizer, encoder]`` triple (the format of the
      ``herbert_*`` lists). The name is not used here.

  Returns:
    DataFrame indexed ``0..len(df)-1`` with columns 'embedding' (1-D numpy
    array: position 0 along the sequence axis of the encoder's first output)
    and 'label'.
  """
  _, tokenizer, encoder = model
  embedded = {}

  # Inference only: skip building the autograd graph for every poem.
  with torch.no_grad():
    for idx in tqdm(range(0, df.shape[0])):
      single_poem_input = df['Text'].iloc[idx]
      # truncation=True is required for max_length to take effect; without it
      # a text longer than 512 tokens is passed to the encoder at full length.
      inputs = tokenizer.batch_encode_plus(
          [single_poem_input],
          max_length=512,
          truncation=True,
          padding="longest",
          add_special_tokens=True,
          return_tensors="pt",
      )
      single_poem_output = encoder(**inputs)
      X_single_poem = single_poem_output[0][:, 0, :].detach().numpy()
      embedded[idx] = X_single_poem[0], df['Label'].iloc[idx]

  df_embedded = pd.DataFrame.from_dict(embedded, orient='index', columns=['embedding', 'label'])

  return df_embedded

In [None]:
def normalize_data(X):
  """Standardise every row of a 2-D array to zero mean and unit standard deviation.

  Returns a new float64 array of the same shape; `X` itself is not modified.
  """
  n_rows, n_cols = X.shape[0], X.shape[1]
  standardized = np.zeros((n_rows, n_cols))

  for row_idx, row in enumerate(X):
    centered = row - np.mean(row)
    standardized[row_idx, :] = centered / np.std(row)

  return standardized

In [None]:
def get_X_y(df, normalization=True, test_size=0.2, random_state=None):
  """Build features/labels from `df` and split them into train / validation / test parts.

  Args:
    df: DataFrame with an 'embedding' column (one equal-length vector per row)
      and a 'Label' column.
    normalization: when True, every embedding is standardised with
      normalize_data().
    test_size: fraction held out at each of the two splits (first the test
      part, then the validation part of what remains). Default 0.2.
    random_state: forwarded to both train_test_split calls; pass an int for a
      reproducible split. The default None keeps the split random.

  Returns:
    ``X, y, X_train, X_val, X_test, y_train, y_val, y_test``. `y` and its
    parts are always numpy arrays (as in get_X_y_train), so they can be
    indexed by position after the shuffle.
  """
  X = np.stack(df['embedding'])
  # .values gives a plain array; a Series would keep its shuffled index after
  # the split and make positional indexing of y_train / y_test unreliable.
  y = df['Label'].values
  if normalization == True:
    X = normalize_data(X)

  # If there are fewer than 8 classes, re-code the labels as 0..n_classes-1.
  if len(df['Label'].unique()) < 8:
    y = df['Label'].factorize()[0]

  X_train, X_test, y_train, y_test = train_test_split(
      X, y, test_size=test_size, random_state=random_state)
  X_train, X_val, y_train, y_val = train_test_split(
      X_train, y_train, test_size=test_size, random_state=random_state)

  print(X.shape)

  return X, y, X_train, X_val, X_test, y_train, y_val, y_test

In [None]:
def get_X_y_train(df, normalization=True, test_size=0.2, random_state=None):
  """Build features/labels from `df` and split them into train / validation parts.

  Args:
    df: DataFrame with an 'embedding' column (one equal-length vector per row)
      and a 'Label' column.
    normalization: when True, every embedding is standardised with
      normalize_data().
    test_size: fraction of the samples held out as the validation part.
      Default 0.2.
    random_state: forwarded to train_test_split; pass an int for a
      reproducible split. The default None keeps the split random.

  Returns:
    ``X, y, X_train, X_val, y_train, y_val`` (labels as numpy arrays).
  """
  X = np.stack(df['embedding'])
  y = df['Label'].values
  if normalization == True:
    X = normalize_data(X)

  # If there are fewer than 8 classes, re-code the labels as 0..n_classes-1.
  if len(df['Label'].unique()) < 8:
    y = df['Label'].factorize()[0]

  X_train, X_val, y_train, y_val = train_test_split(
      X, y, test_size=test_size, random_state=random_state)

  print(X.shape)

  return X, y, X_train, X_val, y_train, y_val

In [None]:
def get_cosinus_predictions(X_train, y_train, X_test, y_test):
  """Classify test vectors by their cosine-nearest training vector and plot the result.

  Every test vector gets the label of the training vector with the highest
  cosine similarity (i.e. the smallest cosine distance).

  Args:
    X_train: 2-D array of training vectors, one per row.
    y_train: integer labels of the training vectors. Labels are used directly
      as matrix indices, so they must be non-negative integers.
    X_test: 2-D array of test vectors, one per row.
    y_test: true integer labels of the test vectors.

  Returns:
    Square array whose entry ``[t, p]`` is the percentage of test samples of
    true class ``t`` predicted as class ``p``. Rows of classes absent from
    `y_test` stay all-zero. The matrix is also drawn with plt.imshow.
  """
  X_train = np.asarray(X_train, dtype=float)
  X_test = np.asarray(X_test, dtype=float)
  y_train = np.asarray(y_train).astype(int)
  y_test = np.asarray(y_test).astype(int)

  # Cosine similarity is the dot product of L2-normalised vectors. A zero
  # vector is left as zeros (similarity 0) instead of dividing by zero.
  train_norms = np.linalg.norm(X_train, axis=1, keepdims=True)
  test_norms = np.linalg.norm(X_test, axis=1, keepdims=True)
  train_unit = X_train / np.where(train_norms == 0, 1.0, train_norms)
  test_unit = X_test / np.where(test_norms == 0, 1.0, test_norms)
  similarity = test_unit @ train_unit.T

  # Nearest neighbour = HIGHEST similarity.
  y_pred = y_train[np.argmax(similarity, axis=1)]

  # Size by the largest label seen so every label is a valid index.
  n_classes = int(max(y_train.max(), y_test.max())) + 1
  cm = np.zeros((n_classes, n_classes))
  np.add.at(cm, (y_test, y_pred), 1)  # rows: true class, columns: predicted class

  # Convert counts to percentages of each true class.
  row_totals = cm.sum(axis=1, keepdims=True)
  cm = np.divide(cm * 100, row_totals, out=np.zeros_like(cm), where=row_totals > 0)

  plt.imshow(cm)
  plt.colorbar()
  return cm


In [None]:
# Keep only the columns used downstream: poem text, class label and author short name.
# (The former `df_orginal = pd.DataFrame` line only bound the class object and was dead code.)
df_orginal = df_raw[["Text", "Label", "Author-short"]].copy()
#df_orginal = df_orginal.sample(frac = 1).reset_index(drop=True)
df_orginal

In [None]:
# Embed every poem with the HerBERT-KLEJ model (swap in herbert_large to compare).
embed = make_embedding(df_orginal, herbert_klej)
#embed = make_embedding(df_orginal, herbert_large)
# assign() overwrites an existing 'embedding' column, so re-running this cell
# does not append a duplicate column the way pd.concat(axis=1) did.
df_orginal = df_orginal.assign(embedding=embed['embedding'])
df_orginal

In [None]:
embed_men = make_embedding(df_men, herbert_klej)
# assign() overwrites an existing 'embedding' column, so re-running this cell
# does not append a duplicate column the way pd.concat(axis=1) did.
df_men = df_men.assign(embedding=embed_men['embedding'])
df_men

In [None]:
embed_women = make_embedding(df_women, herbert_klej)
# assign() overwrites an existing 'embedding' column, so re-running this cell
# does not append a duplicate column the way pd.concat(axis=1) did.
df_women = df_women.assign(embedding=embed_women['embedding'])
df_women

In [None]:
# Experiment configuration: the data subset to evaluate and the tags used in
# plot titles and output file names.
df = df_women
norm = False
data_type = 'women'

# Author short name -> class id; the names, in insertion order, label the plot axes.
classes = print_classes(df)
cls = list(classes.keys())

In [None]:
# 1-nearest-neighbour classification by Euclidean distance, averaged over
# several random train / held-out splits.
n_realizations = 10
n_classes = len(cls)  # one row/column per class (was hard-coded to 4)
CM_aver = np.zeros((n_classes, n_classes))
normalize = 1 if norm == True else 0  # tags the output file name

for n_realization in range(n_realizations):
  # Fresh random split each time; the held-out part returned by get_X_y_train
  # plays the role of the test set. `norm` is passed through so the data
  # actually match the normalize tag in the file name.
  X, y, X_train, X_test, y_train, y_test = get_X_y_train(df, normalization=norm)

  # Named `cm` so the sklearn `confusion_matrix` function imported at the top
  # of the notebook is not overwritten by an array.
  cm = np.zeros((n_classes, n_classes))

  for x_test, y_true in zip(X_test, y_test):
    # Euclidean distance from this test vector to every training vector.
    distances = np.sqrt(np.sum((X_train - x_test)**2, axis=1))
    y_pred = int(y_train[np.argmin(distances)])
    cm[int(y_true), y_pred] += 1

  # Turn counts into per-class fractions. Only classes present in this
  # held-out split are touched, so the row sum is never zero.
  for y_true in np.unique(y_test):
    y_true = int(y_true)
    cm[y_true, :] = cm[y_true, :] / np.sum(cm[y_true, :])

  CM_aver = CM_aver + cm
CM_aver = CM_aver/n_realizations
acc =  round(np.mean(np.diag(CM_aver)), 2)
disp = ConfusionMatrixDisplay(confusion_matrix=CM_aver)
disp.plot()
disp.ax_.set_title("Prediction by euclidean distance | Data type: {} |  Acc: {}".format(data_type, acc))
tick_marks = np.arange(len(cls))
plt.xticks(tick_marks, cls, rotation=45)
plt.yticks(tick_marks, cls, rotation=50)

plt.gcf().set_size_inches(10, 10)
plt.savefig('/content/figs/prediction_euclidean_distance_{}_normalize_{}.png'.format(data_type, normalize), dpi=200)
#files.download('/content/figs/prediction_euclidean_distance_{}_normalize_{}.png'.format(data_type, normalize))

print(np.mean(np.diag(CM_aver)))

In [None]:
#Distance from averaged vectors
# Nearest-centroid classification: each class is represented by the mean of
# its training embeddings, and a held-out vector gets the class of the closest
# centroid (Euclidean distance). Results are averaged over random splits.

n_realizations = 100
n_classes = len(cls)  # one centroid per class (was hard-coded to 8)
CM_aver = np.zeros((n_classes, n_classes))
for n_realization in range(n_realizations):
  X, y, X_train, X_test, y_train, y_test = get_X_y_train(df, normalization=True)

  # Row i holds the centroid of class i. Rows start at +inf so that a class
  # missing from this training split can never be the nearest centroid
  # (a mean over an empty selection would be NaN and poison the argmin).
  X_aver = np.full((n_classes, X_train.shape[1]), np.inf)
  for label in np.unique(y_train):
    X_aver[int(label), :] = np.mean(X_train[y_train == label, :], axis=0)

  # Named `cm` so the sklearn `confusion_matrix` function imported at the top
  # of the notebook is not overwritten by an array.
  cm = np.zeros((n_classes, n_classes))

  for x_test, y_true in zip(X_test, y_test):
    # Euclidean distance from this test vector to every class centroid.
    distances = np.sqrt(np.sum((X_aver - x_test)**2, axis=1))
    # The index of the nearest centroid IS the predicted class; it must not be
    # used to index y_train, whose positions are training samples.
    y_pred = int(np.argmin(distances))
    cm[int(y_true), y_pred] += 1

  # Turn counts into per-class fractions. Only classes present in this
  # held-out split are touched, so the row sum is never zero.
  for y_true in np.unique(y_test):
    y_true = int(y_true)
    cm[y_true, :] = cm[y_true, :] / np.sum(cm[y_true, :])

  print(n_realization, np.mean(np.diag(cm)))
  CM_aver = CM_aver + cm
CM_aver = CM_aver/n_realizations
disp = ConfusionMatrixDisplay(confusion_matrix=CM_aver)
disp.plot()

tick_marks = np.arange(len(cls))
plt.xticks(tick_marks, cls, rotation=45)
plt.yticks(tick_marks, cls, rotation=50)
plt.gcf().set_size_inches(10, 10)

print(np.mean(np.diag(CM_aver)))

In [None]:
def get_X_y_test(df, normalization=True):
  """Return the feature matrix and labels of `df` without splitting them.

  X stacks the 'embedding' column row by row and is standardised with
  normalize_data() when `normalization` is True. y is the 'Label' column,
  re-coded to 0..n_classes-1 when there are fewer than 8 distinct labels.
  """
  X = np.stack(df['embedding'])
  label_column = df['Label']
  if normalization == True:
    X = normalize_data(X)

  # If there are fewer than 8 classes, use the factorized codes instead of the raw labels.
  has_few_classes = len(label_column.unique()) < 8
  y = label_column.factorize()[0] if has_few_classes else label_column

  return X, y

In [None]:
# Each entry is a [display name, tokenizer, encoder] triple -- the layout
# make_embedding() unpacks.
herbert_large = [
    "Herbert-large",
    HerbertTokenizer.from_pretrained("allegro/herbert-large-cased"),
    RobertaModel.from_pretrained("allegro/herbert-large-cased"),
]
herbert_base = [
    "Herbert-base",
    HerbertTokenizer.from_pretrained("allegro/herbert-base-cased"),
    RobertaModel.from_pretrained("allegro/herbert-base-cased"),
]
herbert_klej = [
    "Herbert-klej",
    HerbertTokenizer.from_pretrained("allegro/herbert-klej-cased-tokenizer-v1"),
    RobertaModel.from_pretrained("allegro/herbert-klej-cased-v1"),
]

In [None]:
# The file is semicolon-separated. `sep` must be passed by keyword: a
# positional separator is deprecated and rejected by pandas 2.x.
df_raw = pd.read_csv('/content/wiersze_do_BERT_Herbert_Miłosz.csv', sep=";")
df_raw.columns

In [None]:
# Remove the three 'Unnamed: N' columns (presumably empty trailing columns of
# the CSV -- TODO confirm). errors='ignore' keeps the cell safe to re-run once
# the columns are already gone.
df_raw = df_raw.drop(columns=['Unnamed: 9', 'Unnamed: 10', 'Unnamed: 11'], errors='ignore')
df_raw.shape

In [None]:
# Display the rows from position 400 onward (the following cell drops them).
df_raw.iloc[400:]

In [None]:
# Keep only the first 400 rows: collect the index labels from position 400
# onward and drop exactly those rows.
rows_to_drop = df_raw.index[400:]
df_raw = df_raw.drop(index=rows_to_drop)
df_raw

In [None]:
# Split by row position into two subsets (named for men / women poets --
# presumably the CSV is ordered that way; verify), then shuffle each subset
# independently and re-index it from 0. The first 200 rows are prepared
# before the shuffles and women are shuffled before men, so the random draws
# happen in the same order as before.
first_200_rows = df_raw[:200].reset_index(drop=True)
df_women = df_raw[200:].reset_index(drop=True).sample(frac=1).reset_index(drop=True)
df_men = first_200_rows.sample(frac=1).reset_index(drop=True)

In [None]:
# Keep only the columns used downstream (text, class label, author short
# name), shuffle the rows and re-index from 0.
# (The former `df_orginal = pd.DataFrame` line only bound the class object and was dead code.)
df_orginal = df_raw[["Text", "Label", "Author-short"]].sample(frac=1).reset_index(drop=True)
df_orginal