In [None]:
!pip install transformers

In [None]:
import gym
import numpy as np
from gym import spaces
from transformers import BertTokenizerFast
import pandas as pd
import torch
from transformers import BertTokenizerFast, DistilBertForSequenceClassification
from torch.distributions import Categorical
from torch.optim import Adam
from tqdm import tqdm
import matplotlib.pyplot as plt

In [None]:
class LabelingEnv(gym.Env):
  def __init__(self, instances, labels):
    super(LabelingEnv, self).__init__()
    self.instances = instances
    self.labels = labels
    self.current_instance = 0
    self.tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')
    encoded = self.tokenizer([self.instances[self.current_instance]], return_tensors='pt', padding='max_length', truncation=True, max_length=64)

    #define the output of the model
    self.action_space = spaces.Discrete(2)
    self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(1, 64))

  def step(self, action):
    reward = 1 if action == self.labels[self.current_instance] else -1
    self.current_instance += 1
    done = self.current_instance == len(self.instances)
    if done:
      next_state = None
    else:
        encoded = self.tokenizer([self.instances[self.current_instance]], return_tensors='pt', padding='max_length', truncation=True, max_length=64)
        next_state = { 'input_ids': encoded['input_ids'], 'attention_mask': encoded['attention_mask'] }
    return next_state, reward, done


  def reset(self):
    self.current_instance = 0
    encoded = self.tokenizer([self.instances[self.current_instance]], return_tensors='pt', padding='max_length', truncation=True, max_length=64)
    return { 'input_ids': encoded['input_ids'], 'attention_mask': encoded['attention_mask'] }

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#loading model from BERT
model = DistilBertForSequenceClassification.from_pretrained('distilbert-base-uncased') # 2 labels: Slang, No Slang
tokenizer = BertTokenizerFast.from_pretrained('distilbert-base-uncased')

#set up an optimizer
optimizer = Adam(model.parameters(), lr=1e-5)

df = pd.read_csv('/content/drive/MyDrive/BERT Models/Dataset/mergedData.csv') #the file directory
df.drop_duplicates(subset = ['sentence'], inplace = True)

instances = df['sentence'].tolist()
labels = df['label'].tolist()

#custom envinronment
env = LabelingEnv(instances, labels)

In [None]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)
n = 50 #number of epochs
model.train()
all_rewards = []
for epoch in tqdm(range(n), desc = 'Epochs'):
  state = env.reset()
  done = False
  epoch_rewards = []
  pbar = tqdm(total=len(env.instances), desc=f'Epoch {epoch + 1}', leave=False)
  while not done:
    if state is not None:
      state = {k: v.to(device) for k, v in state.items()}
      outputs = model(**state)

      #softmax for model output
      probs = torch.nn.functional.softmax(outputs.logits, dim=-1)

      #sampling action from the probabilities
      dist = Categorical(probs[0])
      action = dist.sample()

      #train in the environment
      new_state, reward, done = env.step(action.item())
      epoch_rewards.append(reward)

      loss = -dist.log_prob(action) * reward

      #backpropagation
      loss.backward()
      optimizer.step()
      optimizer.zero_grad()

      #updating the state
      state = new_state if new_state is not None else None

      pbar.update(1)
    else:
      break
  pbar.close()
  all_rewards.append(np.sum(epoch_rewards))
  print(f'\nEpoch {epoch + 1}: Total rewards {np.sum(epoch_rewards)}')

In [None]:
def moving_avg(a, n = 10):
  temp = np.cumsum(a, dtype = float)
  temp[n:] = temp[n:] - temp[:-n]
  return temp[n - 1:] / n

moving_avg_rewards = moving_avg(all_rewards, 3)

plt.figure(figsize=(10,5))
plt.title("Total Rewards per Epoch")
plt.xlabel("Epoch")
plt.ylabel("Total Rewards")
plt.plot(all_rewards, label='Rewards')
plt.plot(range(len(moving_avg_rewards)), moving_avg_rewards, label='Moving Average Rewards')
plt.legend()
plt.show()

In [None]:
model.save_pretrained('/content/drive/MyDrive/BERT Models/BERT RL/model')
tokenizer.save_pretrained('/content/drive/MyDrive/BERT Models/BERT RL/tokenizer')
import pickle

with open("/content/drive/MyDrive/BERT Models/BERT RL/instances", "wb") as f:
    pickle.dump(instances, f)

with open("/content/drive/MyDrive/BERT Models/BERT RL/labels", "wb") as f:
    pickle.dump(labels, f)


In [None]:
from transformers import BertTokenizerFast, DistilBertForSequenceClassification
from sklearn.metrics import accuracy_score
import pandas as pd
import torch

model_dir = '/content/drive/MyDrive/BERT Models/BERT RL/model'
tokenizer_dir = '/content/drive/MyDrive/BERT Models/BERT RL/tokenizer'

model = DistilBertForSequenceClassification.from_pretrained(model_dir)
tokenizer = BertTokenizerFast.from_pretrained(tokenizer_dir)

model.eval()

data = pd.read_csv('/content/drive/MyDrive/BERT Models/Dataset/mergedDataTest.csv', delimiter = ';')

#testing and validation datas
instances = list(data['sentence'])
true_labels = list(data['label'])

inputs = tokenizer(instances, return_tensors='pt', padding='max_length', truncation=True, max_length=64)
inputs = {key: val for key, val in inputs.items() if key != 'token_type_ids'}

#predicting label
with torch.no_grad():
    outputs = model(**inputs)

probs = torch.nn.functional.softmax(outputs.logits, dim=-1)
predicted_labels = torch.argmax(probs, dim=-1)
confidence_scores, _ = torch.max(probs, dim=-1)
confidence_scores = confidence_scores.tolist()

accuracy = accuracy_score(true_labels, predicted_labels.tolist())

In [None]:
for i, instance in enumerate(instances):
  prediction = 'Slang' if predicted_labels[i].item() == 1 else 'No Slang'
  is_correct = "Correct" if predicted_labels[i].item() == true_labels[i] else "Incorrect"
  print(f"Text: {instance}")
  print(f"Predicted label: {prediction}")
  print(f"Prediction is: {is_correct}")
  print(f"Confidence score: {confidence_scores[i]}")  # no need for .item() here
  print("\n")

print(f"Accuracy: {accuracy*100:.2f}%")