## Trains a transformers model to classify an email
Recommended setup:
- AWS SageMaker Studio
- Image: PyTorch 2.0.0 Python 3.10 GPU Optimized
- Instance Type: g4dn.xlarge

In [None]:
%%capture
!pip install transformers[torch] datasets evaluate huggingface_hub sentencepiece seqeval accelerate

### Load models and data

In [None]:
import csv
import ast
from transformers import AutoTokenizer, DataCollatorWithPadding
from datasets import Dataset, DatasetDict
import configparser

In [None]:
# Constants
CONFIG_FILE = 'config.ini'

config = configparser.ConfigParser()
# ConfigParser.read() skips files it cannot open and returns the list of files it
# actually parsed, so stop here with a clear message instead of hitting a KeyError
# on the first lookup below.
if not config.read(CONFIG_FILE):
    raise FileNotFoundError(f"Could not read configuration file: {CONFIG_FILE}")
ENCODING = config['global']['ENCODING']

# Base checkpoint that is fine-tuned, and the token limit used when tokenizing
model_checkpoint = "roberta-base"
max_length = 512
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

In [None]:
# Load training data to a dataset

training_data_file = config['classify_emails']['TRAINING_ANNOTATION_FILE']

# Zero-based CSV column index of each field that read_csv() pulls out of a row
cols = dict(a_class=1, answer=4, question=9)

# Label bookkeeping, filled in by read_csv(): names in order of first appearance
# plus the two lookup directions between name and integer id
label_names, label2id, id2label = [], {}, {}

# Examples stored as two parallel columns
docs = dict(text=[], label=[])

def read_csv():
    """
    Loads the annotated question/answer pairs from `training_data_file` into the
    module-level `docs`, `label_names`, `label2id` and `id2label` containers.

    Rows whose question or answer is empty or the literal string 'nan' are skipped.
    A class that is empty or 'nan' is stored under the label "NONE". Label ids are
    assigned in order of first appearance.

    The containers are emptied before loading, so calling this function again (or
    re-running the notebook cell) reloads the data instead of appending a second
    copy of every example.

    Raises IndexError if a non-empty row has fewer columns than `cols` refers to.
    """
    # Reset in place rather than rebinding, so other references to these objects stay valid
    label_names.clear()
    label2id.clear()
    id2label.clear()
    docs["text"].clear()
    docs["label"].clear()

    # newline='' leaves line-ending handling to the csv module, as its documentation
    # asks for, so newlines embedded in quoted fields are not translated by open()
    with open(training_data_file, 'r', encoding='utf8', newline='') as csvfile:
        datareader = csv.reader(csvfile)
        next(datareader, None)  # skip header row; the default keeps an empty file from raising

        for row in datareader:
            if not row:
                continue

            ans = row[cols['answer']]
            q = row[cols['question']]
            a_class = row[cols['a_class']]

            # An example needs both a question and an answer
            if ans == 'nan' or q == 'nan' or not ans or not q:
                continue

            if a_class == 'nan' or a_class == '':
                a_class = "NONE"

            # First time this class is seen: give it the next free id
            if a_class not in label2id:
                i = len(label_names)
                label_names.append(a_class)
                label2id[a_class] = i
                id2label[i] = a_class

            docs["text"].append(f">>> Question:\n{q}\n\n>>> Answer:\n{ans}")
            docs["label"].append(label2id[a_class])

# Populate `docs` and the label maps from the training CSV
read_csv()

def make_dataset_split(docs, test_size=0.15, valid_size=0.15, seed=None):
    """
    Creates a dataset with train/test/valid split

    docs: dict of columns, passed to Dataset.from_dict
    test_size: fraction of the whole dataset that goes to the 'test' split
    valid_size: fraction of the whole dataset that goes to the 'valid' split
    seed: optional seed forwarded to both train_test_split calls so the split can be
        reproduced; None (the default) leaves the shuffling unseeded

    Returns a DatasetDict with the keys 'train', 'test' and 'valid'.
    """
    holdout_size = test_size + valid_size

    dataset = Dataset.from_dict(docs)
    # First carve off test + valid together, then divide that holdout between the two
    train_testvalid = dataset.train_test_split(test_size=holdout_size, seed=seed)
    # test_size is a fraction of the whole dataset, so rescale it to a fraction of the holdout
    test_valid = train_testvalid['test'].train_test_split(
        test_size=test_size / holdout_size,
        seed=seed,
    )

    return DatasetDict({
        'train': train_testvalid['train'],
        'test': test_valid['test'],
        'valid': test_valid['train'],
    })

# `docs` was already populated by the read_csv() call above, so it is not reloaded here
dataset = make_dataset_split(docs)

In [None]:
# Preprocess the dataset

def tokenize(examples):
    """
    Tokenizes raw text with the module-level tokenizer, truncating to `max_length` tokens.

    No padding is applied here: the DataCollatorWithPadding that is passed to the
    Trainer pads each batch when it is assembled, so examples are not all stretched
    to `max_length` up front.
    """
    return tokenizer(examples, truncation=True, is_split_into_words=False, max_length=max_length)

def preprocess(examples):
    """
    Map function for the dataset: tokenizes the "text" column of a batch of examples.
    """
    return tokenize(examples["text"])

In [None]:
# Collator that is handed to the Trainer further down
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# Run preprocess over every split, one batch of examples at a time
tokenized_dataset = dataset.map(preprocess, batched=True)

### Train the model

In [None]:
import evaluate
import numpy as np
from transformers import TrainingArguments, Trainer, AutoModelForSequenceClassification

In [None]:
# Set the metrics for training

# Metric object used by compute_metrics
accuracy = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    """
    Computes accuracy for one evaluation pass.

    eval_pred unpacks to (predictions, labels); the predicted class of each example
    is the argmax of its predictions along axis 1.
    """
    scores, references = eval_pred
    predicted_ids = np.argmax(scores, axis=1)
    return accuracy.compute(predictions=predicted_ids, references=references)

In [None]:
# Login to huggingface to save the model
# (the training arguments below set push_to_hub=True and trainer.push_to_hub() is
# called after training)
from huggingface_hub import interpreter_login

interpreter_login()

In [None]:
# Load the base checkpoint for sequence classification with one output per label
# collected by read_csv(), and store the id <-> name mappings in the model config
model = AutoModelForSequenceClassification.from_pretrained(
    model_checkpoint,
    num_labels=len(label_names),
    id2label=id2label,
    label2id=label2id,
)

# The first argument comes from HF_CLASSIFIER_NAME in the config; the inference
# section loads the model by this same name.
# NOTE(review): the pip install at the top of the notebook is unpinned, and newer
# transformers releases renamed `evaluation_strategy` to `eval_strategy` — confirm
# against the installed version.
args = TrainingArguments(
    config['classify_emails']['HF_CLASSIFIER_NAME'],
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    num_train_epochs=6,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    weight_decay=0.01,
    logging_steps=25,
    # The best checkpoint is chosen by validation loss, not by the accuracy from compute_metrics
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    push_to_hub=True,
)

# Train on the 'train' split and evaluate on 'valid'; the 'test' split is not passed
# to the Trainer and is used by the inference check at the end of the notebook.
# NOTE(review): newer transformers releases deprecate the `tokenizer=` argument of
# Trainer in favour of `processing_class=` — confirm against the installed version.
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["valid"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=tokenizer,
)
trainer.train()

In [None]:
# Upload the trained model to the Hugging Face Hub (uses the login from the earlier cell)
trainer.push_to_hub()

In [None]:
# Drop this notebook's reference to the training model before the inference section.
# NOTE(review): `trainer` was constructed with this model and is still alive, so this
# alone presumably does not release the model's memory — confirm.
del model

### Inference
You can test inference with your model below. Call `compare(split, index)` to print the expected and predicted labels for the example at position `index` of the given split (`'train'`, `'test'` or `'valid'`).

In [None]:
from transformers import pipeline

In [None]:
# Load the fine-tuned classifier by the name it was trained under
checkpoint = config['classify_emails']['HF_CLASSIFIER_NAME']
max_length = 512
# NOTE(review): max_length is passed to from_pretrained here, but no truncation option
# is given to the pipeline itself — confirm that inputs longer than max_length are
# truncated when the classifier is called.
tokenizer = AutoTokenizer.from_pretrained(checkpoint, max_length=max_length)
classifier = pipeline("text-classification", model=checkpoint, tokenizer=tokenizer)

In [None]:
def expected_ans(split, i):
    """
    Returns the annotated label name of example `i` in the given dataset split.
    """
    # Fetch the single row first instead of pulling the whole 'label' column and then indexing it
    return id2label[dataset[split][i]['label']]

def predicted_ans(split, i):
    """
    Returns the label name the classifier pipeline predicts for example `i` in the
    given dataset split.

    Truncation to `max_length` tokens is requested explicitly, mirroring what
    tokenize() does for training, so that long texts are shortened the same way
    at inference time.
    """
    # Fetch the single row first instead of pulling the whole 'text' column and then indexing it
    text = dataset[split][i]['text']
    result = classifier(text, truncation=True, max_length=max_length)
    return result[0]['label']

def compare(split, i):
    """
    Prints the expected label and then the predicted label for example `i` of `split`,
    each followed by blank lines.
    """
    expected = expected_ans(split, i)
    print(f"Expected: {expected}")
    print('\n')

    # Predict only after the expected label has been printed
    predicted = predicted_ans(split, i)
    print(f"Actual: {predicted}")
    print('\n\n')

In [None]:
# Spot-check one example from the 'test' split, which was not passed to the Trainer
# NOTE(review): index 10 assumes the test split holds at least 11 examples
compare('test', 10)