<a href="https://colab.research.google.com/github/cgray1117/NLPTransformersProject/blob/main/Copy_of_DeBERTa.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install libraries
!pip install imblearn
!pip install transformers
!pip install tokenizers

In [None]:
# Import libraries
import numpy as np
import regex as re
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import statistics
import math
import os

import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import TweetTokenizer

from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
from sklearn.model_selection import RepeatedKFold
from sklearn.model_selection import RandomizedSearchCV

import tensorflow as tf
import tensorflow.python.keras
import tensorflow.python.keras.backend as K

import tokenizers
from transformers import DebertaTokenizer, TFDebertaModel
from collections import Counter
from imblearn.over_sampling import RandomOverSampler

In [None]:
# Hugging Face checkpoint name; used for both the tokenizer and the encoder.
MODEL_NAME = 'microsoft/deberta-base'
# Fixed token length every text is truncated/padded to (special tokens included).
MAX_LEN = 256
# Directory for run artifacts, relative to the notebook's working directory.
ARTIFACTS_PATH = '../artifacts/'
# Number of training epochs passed to model.fit.
EPOCHS = 5

# exist_ok=True keeps this cell idempotent and avoids the check-then-create
# race of testing os.path.exists first.
os.makedirs(ARTIFACTS_PATH, exist_ok=True)

## Data Loading and NLP Preprocessing

In [None]:
# Load the labelled dataset from the notebook's working directory.
df = pd.read_csv('project_data.csv')

# Treat empty-string labels as missing. The result is assigned back to the
# column instead of calling an inplace method on the df['Label'] selection:
# under pandas Copy-on-Write an inplace call on a selected column does not
# update the parent frame.
df['Label'] = df['Label'].replace('', np.nan)

# Report how many rows lack a label, then drop them.
missing_labels = df['Label'].isna().sum()
print('Rows with a missing label: %d' % missing_labels)
df = df.dropna(subset=['Label'])

In [None]:
# Remove the Username and UserLocation columns in a single call, then preview
# the remaining frame.
df = df.drop(columns=['Username', 'UserLocation'])
df.head()

In [None]:
# Fetch the NLTK resources used below (stopword list and WordNet data).
nltk.download("stopwords")
nltk.download('omw-1.4')
nltk.download('wordnet')

tknzr = TweetTokenizer()
wordnet_lemmatizer = WordNetLemmatizer()

stopwords_en = stopwords.words('english')
# Set copy of the stopword list: membership is tested once per token, and a
# set lookup avoids scanning the whole list each time.
stopword_set = set(stopwords_en)

# Tokenize each tweet
df['tokenized'] = df['Text'].map(tknzr.tokenize)

# Lowercase, strip, and keep only purely alphabetic tokens
df['tokenized'] = df['tokenized'].map(
    lambda t: [word.lower().strip() for word in t if word.isalpha()])

# Lemmatize and remove stopwords
df['tokenized'] = df['tokenized'].map(
    lambda t: [wordnet_lemmatizer.lemmatize(word) for word in t
               if word not in stopword_set])

In [None]:
# Preview the first ten cleaned token lists.
df['tokenized'].head(10)

In [None]:
# Re-join each token list into one space-separated string per tweet.
token_tweets = [" ".join(tokens) for tokens in df['tokenized']]

# Features are the cleaned strings; targets are the flattened Label column.
X_data = np.array(token_tweets)
y_data = df[["Label"]].to_numpy().ravel()

## EDA

In [None]:
# Tally how many texts carry each label.
categories = df[['Label']].values.ravel()
counter_categories = Counter(categories)
category_names = counter_categories.keys()
category_values = counter_categories.values()
y_pos = np.arange(len(category_names))

# One bar per label, drawn on the current axes of figure 1.
fig = plt.figure(1, figsize=(10, 5))
ax = fig.gca()
ax.bar(y_pos, category_values, align='center', alpha=0.5)
ax.set_xticks(y_pos)
ax.set_xticklabels(category_names)
ax.set_title('Distribution of texts per category')
ax.set_xlabel('Labels')
ax.set_ylabel('Number of texts')
ax.yaxis.grid(True)
plt.show()

# Raw counts, for reference alongside the chart.
print(counter_categories)

In [None]:
# Dataset size and number of distinct labels (n_categories sizes the
# classifier's output layer later on).
n_texts = X_data.shape[0]
n_categories = df['Label'].unique().size

print('Texts in dataset: %d' % n_texts)
print('Number of categories: %d' % n_categories)

## Encode Text and Create DeBERTa Inputs

In [None]:
def deberta_encode(texts, tokenizer, max_len=None):
    """Convert raw texts into fixed-length model input arrays.

    Each text is tokenized, truncated so that it fits together with one
    leading and one trailing special token, converted to ids, and right-padded
    to ``max_len``.

    Args:
        texts: Sized iterable of strings (one entry per example).
        tokenizer: Object exposing ``tokenize``, ``convert_tokens_to_ids`` and
            the ``cls_token_id`` / ``sep_token_id`` / ``pad_token_id``
            attributes.
        max_len: Length of every output row. Defaults to the notebook-level
            ``MAX_LEN`` when omitted.

    Returns:
        dict with three int32 arrays of shape ``(len(texts), max_len)``:
        ``'input_word_ids'`` (token ids), ``'input_mask'`` (1 on real tokens,
        0 on padding) and ``'input_type_ids'`` (all zeros).

    Raises:
        ValueError: If ``max_len`` cannot hold the two special tokens, or the
            tokenizer does not define the required special-token ids.
    """
    if max_len is None:
        max_len = MAX_LEN
    if max_len < 2:
        raise ValueError("max_len must be at least 2 to hold the two special tokens")

    # Read the special-token ids from the tokenizer instead of hard-coding
    # them: the numeric ids differ between vocabularies, and framing or padding
    # with the wrong ids feeds the encoder tokens it was not trained to see in
    # those positions.
    cls_id = tokenizer.cls_token_id
    sep_id = tokenizer.sep_token_id
    pad_id = tokenizer.pad_token_id
    if cls_id is None or sep_id is None or pad_id is None:
        raise ValueError(
            "tokenizer must define cls_token_id, sep_token_id and pad_token_id")

    ct = len(texts)
    input_ids = np.full((ct, max_len), pad_id, dtype='int32')  # padded by default
    attention_mask = np.zeros((ct, max_len), dtype='int32')
    token_type_ids = np.zeros((ct, max_len), dtype='int32')

    for k, text in enumerate(texts):
        tok_text = tokenizer.tokenize(text)
        # Leave room for the leading and trailing special tokens.
        enc_text = tokenizer.convert_tokens_to_ids(tok_text[:max_len - 2])
        # Never exceeds max_len because of the truncation above.
        input_length = len(enc_text) + 2
        input_ids[k, :input_length] = [cls_id] + list(enc_text) + [sep_id]
        attention_mask[k, :input_length] = 1

    return {'input_word_ids': input_ids, 'input_mask': attention_mask,
            'input_type_ids': token_type_ids}

In [None]:
# Hold out 30% of the data, then split that hold-out in half into validation
# and test sets. Both splits are seeded so the partitions, and every metric
# computed from them, are reproducible across runs.
X_train, X_rem, y_train, y_rem = train_test_split(
    X_data, y_data, test_size=0.3, random_state=444)

X_valid, X_test, y_valid, y_test = train_test_split(
    X_rem, y_rem, test_size=0.5, random_state=444)

# Optional oversampling of the minority class in the training split (disabled):
#oversample = RandomOverSampler(sampling_strategy='minority')
#X_train = pd.DataFrame(X_train)
#y_train = pd.DataFrame(y_train)
#X_train, y_train = oversample.fit_resample(X_train, y_train)
#X_train = X_train.to_numpy().reshape(-1)
#y_train = y_train.to_numpy().reshape(-1)

In [None]:
# Tokenizer matching the pretrained checkpoint.
tokenizer = DebertaTokenizer.from_pretrained(MODEL_NAME)

# Replace each split's raw strings with its encoded input dict. The X_* names
# are rebound here, so this cell is meant to run once per fresh split.
X_train, X_valid, X_test = (
    deberta_encode(split_texts, tokenizer)
    for split_texts in (X_train, X_valid, X_test)
)

# Labels as flat int32 vectors.
y_train, y_valid, y_test = (
    np.asarray(split_labels, dtype='int32').reshape(-1)
    for split_labels in (y_train, y_valid, y_test)
)

## Build Model

In [None]:
def build_model(n_categories, learning_rate=1e-5):
    """Build and compile the DeBERTa-based text classifier.

    Three int32 inputs of length ``MAX_LEN`` (token ids, attention mask and
    token-type ids) feed the pretrained ``MODEL_NAME`` encoder. The first
    element of the encoder output goes through dropout, is flattened, and is
    passed to a 256-unit ReLU layer followed by a softmax layer.

    Args:
        n_categories: Number of units in the softmax output layer.
        learning_rate: Step size for the Adam optimizer. The default matches
            the value this notebook previously hard-coded.

    Returns:
        A compiled ``tf.keras.Model`` using sparse categorical cross-entropy
        (integer class labels) and reporting accuracy.
    """
    # Input names match the keys of the dict returned by deberta_encode.
    input_word_ids = tf.keras.Input(shape=(MAX_LEN,), dtype=tf.int32, name='input_word_ids')
    input_mask = tf.keras.Input(shape=(MAX_LEN,), dtype=tf.int32, name='input_mask')
    input_type_ids = tf.keras.Input(shape=(MAX_LEN,), dtype=tf.int32, name='input_type_ids')

    # Pretrained encoder from Hugging Face.
    deberta_model = TFDebertaModel.from_pretrained(MODEL_NAME)
    x = deberta_model(input_word_ids, attention_mask=input_mask, token_type_ids=input_type_ids)

    # First element of the encoder output (presumably the per-token hidden
    # states; confirm against the TFDebertaModel output documentation).
    x = x[0]

    x = tf.keras.layers.Dropout(0.1)(x)
    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(256, activation='relu')(x)
    x = tf.keras.layers.Dense(n_categories, activation='softmax')(x)

    model = tf.keras.Model(inputs=[input_word_ids, input_mask, input_type_ids], outputs=x)
    model.compile(
        # `learning_rate` is the supported argument name; the old `lr` alias
        # is deprecated and rejected by newer Keras optimizers.
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'])

    return model

In [None]:
# Instantiate the classifier with one output unit per label, then print the
# layer-by-layer summary.
model = build_model(n_categories=n_categories)
model.summary()

## Train and Evaluate

In [None]:
# The code in this block was adapted from https://stackoverflow.com/questions/48118111/get-loss-values-for-each-training-instance-keras
class LossHistory(tf.keras.callbacks.Callback):
    """Keras callback that records the loss reported after every training batch.

    The values are collected in ``self.losses``, which is reset at the start
    of each ``fit`` call.
    """

    def on_train_begin(self, logs=None):
        self.losses = []

    def on_batch_end(self, batch, logs=None):
        # logs may be None; fall back to an empty dict so .get is always safe.
        self.losses.append((logs or {}).get('loss'))


# Referenced through tf.keras because only `tf` is bound by the imports;
# a bare `keras` name is not defined in this notebook.
history = LossHistory()
model.fit(X_train, y_train, batch_size=16, epochs=EPOCHS, verbose=1,
          validation_data=(X_valid, y_valid), callbacks=[history])
print(history.losses)

# Loss reported after each training batch, in training order.
plt.plot(history.losses, linestyle='dotted')
plt.xlabel('Batch')
plt.ylabel('Loss')
plt.title('Distribution of Loss')
plt.show()

Epoch 1/5

In [None]:
# Per-epoch training and validation loss recorded by Keras during model.fit.
# model.history is the History callback object; the metric lists live in its
# .history dict, so the object itself cannot be indexed by metric name.
fit_metrics = model.history.history
train_loss = fit_metrics['loss']
val_loss = fit_metrics['val_loss']

# Epoch numbers for the x-axis, starting at 1
epoch_count = range(1, len(train_loss) + 1)

plt.plot(epoch_count, train_loss, linestyle='dotted')
plt.plot(epoch_count, val_loss, linestyle='dotted')
plt.ylabel("Loss")
plt.xlabel("Epochs")
plt.legend(['Training Loss', 'Validation Loss'])
plt.show()

In [None]:
# Per-epoch training and validation accuracy recorded by Keras during
# model.fit. These are read from model.history (the History callback), not
# from `history`, which is the LossHistory callback and has no per-epoch data.
fit_metrics = model.history.history
# The metric is compiled as 'accuracy'; older Keras versions report it as 'acc'.
acc_key = 'accuracy' if 'accuracy' in fit_metrics else 'acc'
train_acc = fit_metrics[acc_key]
val_acc = fit_metrics['val_' + acc_key]

# Epoch numbers for the x-axis, starting at 1
epoch_acc = range(1, len(train_acc) + 1)

# Plot the accuracy series (this cell previously re-plotted the loss curves).
plt.plot(epoch_acc, train_acc, linestyle='dotted')
plt.plot(epoch_acc, val_acc, linestyle='dotted')
plt.ylabel("Accuracy")
plt.xlabel("Epochs")
plt.legend(['Training Accuracy', 'Validation Accuracy'])
plt.show()

## Visualizations

In [None]:
def plot_confusion_matrix(X_test, y_test, model):
    """Draw a row-normalised confusion-matrix heatmap for ``model`` on a test set.

    Args:
        X_test: Model inputs accepted by ``model.predict``.
        y_test: True integer class labels, one per example.
        model: Model whose ``predict`` returns one score per class for each
            example.

    Returns:
        None. The heatmap is drawn on a new 10x10 matplotlib figure.
    """
    # One forward pass; argmax over the class axis gives the predicted label.
    y_pred = np.argmax(model.predict(X_test), axis=1)

    con_mat = tf.math.confusion_matrix(labels=y_test, predictions=y_pred).numpy()

    # Normalise each row by its number of true samples. Rows with no true
    # samples stay at zero instead of dividing by zero.
    row_totals = con_mat.sum(axis=1, keepdims=True)
    con_mat_norm = np.around(
        np.divide(con_mat.astype('float'), row_totals,
                  out=np.zeros(con_mat.shape, dtype='float'),
                  where=row_totals != 0),
        decimals=2)
    label_names = list(range(len(con_mat_norm)))

    con_mat_df = pd.DataFrame(con_mat_norm,
                              index=label_names,
                              columns=label_names)

    plt.figure(figsize=(10, 10))
    sns.heatmap(con_mat_df, cmap=plt.cm.Blues, annot=True)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
# Confusion matrix of the trained model on the held-out test split.
plot_confusion_matrix(X_test=X_test, y_test=y_test, model=model)

## Results

In [None]:
# Predicted class per test example: one inference pass, then argmax over the
# class axis. Previously model.predict ran twice and the first result was
# discarded.
y_pred = np.argmax(model.predict(X_test), axis=1)

In [None]:
# scikit-learn's default average='binary' raises for more than two classes,
# so fall back to macro averaging in the multi-class case. Two-class runs
# keep the original binary behaviour.
average = 'binary' if n_categories == 2 else 'macro'

print("precision: " + str(precision_score(y_test, y_pred, average=average)))
print("recall: " + str(recall_score(y_test, y_pred, average=average)))
print("f1: " + str(f1_score(y_test, y_pred, average=average)))
# Accuracy is computed from the predictions already in hand, as a percentage,
# instead of running another full evaluation pass over the test set.
print("accuracy: " + str(accuracy_score(y_test, y_pred)*100))