In [None]:
from google.colab import drive
# Mount Google Drive inside the Colab VM (re-mounting if it is already mounted).
MOUNT_POINT = '/content/drive'
drive.mount(MOUNT_POINT, force_remount=True)

# Project folder on Drive. Later cells append file names by plain string
# concatenation, so the trailing slash is required.
drive_path = MOUNT_POINT + "/MyDrive/Dissertation/"

In [None]:
!pip install datasets transformers sentencepiece
!pip install transformers[torch]
#!pip install sacremoses

In [None]:
import transformers

print(transformers.__version__)

In [None]:
# Target classes; the position of each name in this list is its integer id.
labels = ["Chat", "Contacts", "Aggregation", "Accessibility", "Consistency", "Authentication"]

# Reverse lookup (id -> name), taken straight from the list positions.
id2label = dict(enumerate(labels))

# Forward lookup (name -> id), the inverse of id2label.
label2id = {name: idx for idx, name in id2label.items()}

print("label2id:", label2id)
print("id2label:", id2label)

In [None]:
import pandas as pd

# Load the labelled user stories (re-read on every run so this cell is idempotent).
df = pd.read_csv(drive_path + 'ibm_labelled_user_stories.csv')

# Map the label names to their integer ids.
# Series.map leaves NaN for every value that is not a key of label2id, which
# silently turns the column into floats and only fails much later (in the split
# or in the loss). Fail here instead, naming the offending values.
label_ids = df['label'].map(label2id)
unmapped_labels = df.loc[label_ids.isna(), 'label']
if not unmapped_labels.empty:
    raise ValueError(
        "Labels in the CSV that are missing from label2id: "
        f"{unmapped_labels.unique().tolist()}"
    )

# Integer dtype is what the classification loss expects for class indices.
df['label'] = label_ids.astype(int)
df['label'].value_counts()

In [None]:
!pip install contractions

In [None]:
# import library
import contractions

def expand_contractions(text):
  """Expand contractions in ``text`` one whitespace-separated token at a time.

  Every token is passed through ``contractions.fix`` and the results are
  re-joined with single spaces, so any run of whitespace in the input
  collapses to one space.
  """
  return ' '.join(contractions.fix(token) for token in text.split())

df['text'] = df['text'].apply(expand_contractions)

In [None]:
import string as string
import re
# Delete every ASCII punctuation character.
df['text'] = df['text'].apply(lambda x: re.sub('[%s]' % re.escape(string.punctuation), '' , x))

# Delete digits together with the non-word characters directly before them and
# the word characters directly after them. The previous pattern 'W*dw*' had lost
# its backslashes, so it matched the literal letter "d" (plus neighbouring
# "W"/"w") and corrupted ordinary words instead of removing numbers.
df['text'] = df['text'].apply(lambda x: re.sub(r'\W*\d\w*', '', x))

In [None]:
#remove stopwords

import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
# NLTK's English stop words plus two corpus-specific extras.
stop_words = set(stopwords.words('english')) | {'subject', 'http'}

def remove_stopwords(text):
    """Return ``text`` without the tokens that appear in ``stop_words``.

    The input is converted with ``str`` and split on whitespace; membership is
    tested on the token exactly as written (case-sensitive). Surviving tokens
    are joined with single spaces.
    """
    kept_tokens = (token for token in str(text).split() if token not in stop_words)
    return " ".join(kept_tokens)

df['text'] = df['text'].apply(remove_stopwords)

In [None]:
# NOTE(review): an earlier cell already removes every character of
# string.punctuation (which includes "@", ".", ":" and "/"), so neither pattern
# below appears able to match by the time this cell runs. Consider running this
# cell before the punctuation-stripping cell.
email_pattern = re.compile(r'\b[\w.-]+?@\w+?\.\w{2,4}\b')
url_pattern = re.compile(r'(http[s]?://\S+)|(\w+\.[A-Za-z]{2,4}\S*)')

# Replace email addresses with a placeholder token.
df['text'] = df['text'].apply(lambda x: email_pattern.sub('emailadd', x))

# Replace URLs (explicit http/https links or bare "name.tld..." forms) with a placeholder token.
df['text'] = df['text'].apply(lambda x: url_pattern.sub('urladd', x))

In [None]:
nltk.download('wordnet')

from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()

def lemmatize_words(text):
    """Lemmatize each whitespace-separated token of ``text`` with the shared
    ``lemmatizer`` (default settings) and re-join the results with single spaces."""
    lemmas = map(lemmatizer.lemmatize, text.split())
    return " ".join(lemmas)

df["text"] = df["text"].apply(lemmatize_words)

In [None]:
# Collapse every run of spaces left behind by the earlier removals into a single space.
multi_space = re.compile(' +')
df["text"] = df["text"].apply(lambda text: multi_space.sub(' ', text))

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from datasets import Dataset, DatasetDict

# df is expected to provide a 'text' column (the cleaned user story) and a
# 'label' column (the class id); those are the columns the cells above and
# below read.

# Stratified split: hold out 30 % of the rows, then halve that hold-out into
# a test part and a validation part.
train_df, temp_df = train_test_split(df, test_size=0.3, stratify=df['label'], random_state=42)
test_df, eval_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df['label'], random_state=42)

# Convert DataFrames to Dataset format.
# preserve_index=False: after the shuffled split each frame has a non-default
# index, which from_pandas would otherwise store as an extra
# "__index_level_0__" column in every split.
train_dataset = Dataset.from_pandas(train_df, preserve_index=False)
test_dataset = Dataset.from_pandas(test_df, preserve_index=False)
eval_dataset = Dataset.from_pandas(eval_df, preserve_index=False)

# Bundle the three splits under the names used by the rest of the notebook.
user_story_dataset = DatasetDict({
    'train': train_dataset,
    'validation': eval_dataset,
    'test': test_dataset
})

user_story_dataset

# SELECT Model

In [None]:
from transformers import AutoTokenizer

# Local checkpoint directory on Drive (note the trailing slash).
model_checkpoint = f"{drive_path}BERT4RE/"

# Load the tokenizer stored with that checkpoint, requesting the fast implementation.
tokenizer = AutoTokenizer.from_pretrained(
    model_checkpoint,
    use_fast=True,
)

In [None]:
#tokenizer.add_special_tokens({'pad_token': '[PAD]'})

In [None]:
tokenizer.vocab_size

In [None]:
tokenizer.special_tokens_map

In [None]:
def tokenize_text(user_stories):
    """Tokenize the ``"text"`` field of ``user_stories`` with the notebook-level ``tokenizer``.

    Sequences are truncated and padded to a fixed length of 128 tokens.
    Returns whatever the tokenizer call returns.
    """
    encode_options = dict(truncation=True, padding='max_length', max_length=128)
    return tokenizer(user_stories["text"], **encode_options)

In [None]:
tokenized_dataset = user_story_dataset.map(tokenize_text, batched=True)
tokenized_dataset

In [None]:
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

# Class weights are derived from the training split only, using
# compute_class_weight in its 'balanced' mode.
y = train_df['label']
classes = np.unique(y)
weights = compute_class_weight('balanced', classes=classes, y=y)
class_weights = dict(zip(classes, weights))

# Same weights keyed by label name instead of class id, for display.
new_class_weights = {}
for class_id, class_weight in class_weights.items():
    new_class_weights[id2label[class_id]] = class_weight

print("Class Weights:", new_class_weights)

In [None]:
import torch

class_weights_tensor = torch.tensor(weights, dtype=torch.float)

In [None]:
from torch import nn
from transformers import Trainer


class CustomTrainer(Trainer):
    """Trainer whose loss is cross-entropy weighted by the notebook-level ``weights``."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the class-weighted cross-entropy loss for one batch.

        Args:
            model: the model to run; may be a wrapped version of ``self.model``.
            inputs: batch dict; its ``"labels"`` entry is removed before the
                forward pass so the model does not compute its own loss.
            return_outputs: when true, return ``(loss, outputs)`` instead of
                just ``loss``.
            num_items_in_batch: accepted because newer ``Trainer`` versions
                pass it as a keyword argument; not used here.

        Returns:
            The loss tensor, or ``(loss, outputs)`` if ``return_outputs`` is true.
        """
        labels = inputs.pop("labels")
        # forward pass
        outputs = model(**inputs)
        logits = outputs.get("logits")

        # Build the weight vector on the device of the logits. The logits
        # always carry a device, whereas a wrapped model (e.g. DataParallel)
        # does not expose ``.device``.
        loss_weights = torch.tensor(weights, dtype=torch.float, device=logits.device)

        # compute custom loss
        loss_fct = nn.CrossEntropyLoss(weight=loss_weights)
        loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))
        return (loss, outputs) if return_outputs else loss

In [None]:
# from transformers import DataCollatorWithPadding
# data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [None]:
from transformers import AutoModelForSequenceClassification, AutoConfig,DistilBertConfig,TFDistilBertModel

# Derive the class count from the label mapping passed below so the two can
# never disagree (it was a hard-coded 6).
num_labels = len(label2id)

# ignore_mismatched_sizes=True lets loading proceed when a stored weight (such
# as the classification head) has a different shape from the one requested here.
model = AutoModelForSequenceClassification.from_pretrained(
    model_checkpoint,
    num_labels=num_labels,
    label2id=label2id,
    id2label=id2label,
    ignore_mismatched_sizes=True,
)

In [None]:
from transformers import TrainingArguments
from transformers import EarlyStoppingCallback, IntervalStrategy


# model_checkpoint ends with "/", so the last "/"-separated component is the
# empty string; strip the trailing slash first to get the directory name.
model_name = model_checkpoint.rstrip("/").split("/")[-1]
batch_size = 16
num_train_epochs = 50
eval_steps = 50
# The integer division is 0 whenever the training set has fewer than
# batch_size * num_train_epochs examples, and a step-based logging strategy
# rejects logging_steps=0; clamp to at least one step.
logging_steps = max(1, len(tokenized_dataset["train"]) // (batch_size * num_train_epochs))

args = TrainingArguments(
    output_dir=f"{model_name}-finetuned-user-story",
    evaluation_strategy= IntervalStrategy.STEPS, #"epoch"
    # Save on the same cadence as evaluation: load_best_model_at_end can only
    # restore a checkpoint that was actually written, and the default save
    # interval is much longer than eval_steps.
    save_strategy=IntervalStrategy.STEPS,
    save_steps=eval_steps,
    load_best_model_at_end=True,
    eval_steps = eval_steps,
    learning_rate=2e-05,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=num_train_epochs,

    #weight_decay=0.01,
    logging_steps=logging_steps,
    # Key returned by the compute_metrics functions; used to pick the best checkpoint.
    metric_for_best_model="f1",
    # Keep at most this many checkpoints on disk.
    save_total_limit=5,
)

In [None]:
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, recall_score,precision_score
from datasets import load_metric

def compute_metrics(eval_pred):
  """Return accuracy and macro-averaged F1 for an evaluation pass.

  ``eval_pred`` unpacks into ``(scores, labels)``; the predicted class of each
  row is the argmax of the scores along axis 1.
  """
  scores, labels = eval_pred
  predicted_ids = np.argmax(scores, axis=1)

  return {
      "Accuracy": accuracy_score(labels, predicted_ids),
      "f1": f1_score(labels, predicted_ids, average='macro'),
  }


def compute_all_metrics(p):
  """Return accuracy plus macro-averaged precision, recall and F1.

  ``p`` unpacks into ``(scores, labels)``; the predicted class of each row is
  the argmax of the scores along axis 1.
  """
  scores, labels = p
  pred = np.argmax(scores, axis=1)

  results = {"accuracy": accuracy_score(y_true=labels, y_pred=pred)}
  # Insertion order fixes the key order of the returned dict.
  macro_scorers = {"precision": precision_score, "recall": recall_score, "f1": f1_score}
  for metric_name, scorer in macro_scorers.items():
    results[metric_name] = scorer(y_true=labels, y_pred=pred, average='macro')

  return results

In [None]:
# NOTE: these two statements only bind ordinary Python variables; they do not
# modify the process environment. The environment variables of the same names
# are set through os.environ in the following cell.
CUDA_LAUNCH_BLOCKING=1
TORCH_USE_CUDA_DSA=1

In [None]:
import os
# Export both CUDA debugging switches as process environment variables.
for flag_name in ('CUDA_LAUNCH_BLOCKING', 'TORCH_USE_CUDA_DSA'):
    os.environ[flag_name] = "1"

In [None]:
from transformers import Trainer

# Weighted-loss trainer: trains on the train split, evaluates on the
# validation split, and stops early after 3 evaluations without improvement.
early_stopping = EarlyStoppingCallback(early_stopping_patience=3)

trainer = CustomTrainer(
    model=model,
    args=args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["validation"],
    tokenizer=tokenizer,
    compute_metrics=compute_all_metrics,
    callbacks=[early_stopping],
    #data_collator=data_collator
)

In [None]:
trainer.train()

In [None]:
# Score the trained model on the held-out *test* split. The variable keeps the
# name `validation_results` because a later cell reuses it.
held_out_split = tokenized_dataset["test"]
validation_results = trainer.evaluate(eval_dataset=held_out_split)
print(validation_results)

## Hyperparameter search

The `Trainer` supports hyperparameter search using [optuna](https://optuna.org/) or [Ray Tune](https://docs.ray.io/en/latest/tune/). For this last section you will need one of those libraries installed; this notebook uses optuna, which the next cell installs.

In [None]:
!pip install optuna

During hyperparameter search, the `Trainer` will run several trainings, so it needs to have the model defined via a function (so it can be reinitialized at each new run) instead of just having it passed. We just wrap the same `from_pretrained` call as before in a function:

In [None]:
def model_init():
    """Return a freshly loaded classification model for one training run.

    Used as the ``model_init`` callback of the ``Trainer`` so every
    hyperparameter-search trial starts from the same checkpoint. The arguments
    mirror the notebook's first model load, including
    ``ignore_mismatched_sizes=True``, so that a stored weight whose shape
    differs from the requested one does not abort loading here either.
    """
    return AutoModelForSequenceClassification.from_pretrained(
        model_checkpoint,
        num_labels=num_labels,
        label2id=label2id,
        id2label=id2label,
        ignore_mismatched_sizes=True,
    )

And we can instantiate our `Trainer` like before:

In [None]:
# NOTE(review): this is the stock Trainer, so the class-weighted loss of
# CustomTrainer is not applied during the search or the retraining that
# follows; confirm that is intended.
search_trainer_kwargs = dict(
    model_init=model_init,
    args=args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset['validation'],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)
trainer = Trainer(**search_trainer_kwargs)

The method we call this time is `hyperparameter_search`. Note that it can take a long time to run on the full dataset for some of the tasks. You can try to find some good hyperparameters on a portion of the training dataset by replacing the `train_dataset` line above by:
```python
train_dataset = tokenized_dataset["train"].shard(index=1, num_shards=10)
```
for 1/10th of the dataset. Then you can run a full training on the best hyperparameters picked by the search.

In [None]:
def optuna_hp_space(trial):
    """Describe the hyperparameter search space for one trial.

    Asks ``trial`` for a learning rate in [1e-6, 1e-4] sampled on a log scale
    and for a per-device training batch size chosen from 16, 32, 64 or 128.
    Returns a dict keyed by the corresponding training-argument names.
    """
    search_space = {}
    search_space["learning_rate"] = trial.suggest_float("learning_rate", 1e-6, 1e-4, log=True)
    search_space["per_device_train_batch_size"] = trial.suggest_categorical(
        "per_device_train_batch_size", [16, 32, 64, 128]
    )
    return search_space

In [None]:
# best_run = trainer.hyperparameter_search(n_trials=5, direction="maximize")
# Run 9 optuna trials over optuna_hp_space, maximizing the search objective.
search_settings = dict(
    direction="maximize",
    backend="optuna",
    hp_space=optuna_hp_space,
    n_trials=9,
)
best_trial = trainer.hyperparameter_search(**search_settings)

The `hyperparameter_search` method returns a `BestRun` object, which contains the value of the objective maximized (by default the sum of all metrics) and the hyperparameters it used for that run.

In [None]:
#best_run
best_trial

You can customize the objective to maximize by passing along a `compute_objective` function to the `hyperparameter_search` method, and you can customize the search space by passing a `hp_space` argument to `hyperparameter_search`. See this [forum post](https://discuss.huggingface.co/t/using-hyperparameter-search-in-trainer/785/10) for some examples.

To reproduce the best training, just set the hyperparameters in your `TrainingArguments` before creating a `Trainer`:

In [None]:
# Copy every tuned hyperparameter onto the trainer's existing arguments,
# then train again with those settings.
best_hyperparameters = best_trial.hyperparameters
for hp_name in best_hyperparameters:
  setattr(trainer.args, hp_name, best_hyperparameters[hp_name])

trainer.train()

In [None]:
# No eval_dataset is passed here, so the evaluation runs on the dataset the
# trainer was constructed with (the validation split), not on the test split.
validation_results = trainer.evaluate()
print(validation_results)
