In [None]:
# Shell escape: run `nvidia-smi` to report the GPU(s) visible to this runtime.
!nvidia-smi

In [None]:
!pip install ekphrasis scikit-learn pandas numpy torch transformers datasets

In [None]:
import random
import os
import json
import pandas as pd
import numpy as np
import torch
from torch import nn
import logging
from datasets import Dataset
from sklearn.utils.class_weight import compute_class_weight
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments

In [None]:
def set_seed(seed_val):
    """Seed every random number generator used in this notebook.

    ``seed_val`` is handed, in order, to ``random.seed``, ``np.random.seed``,
    ``torch.manual_seed`` and ``torch.cuda.manual_seed_all``.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed_val)


def get_class_weights(train_set):
    """Return scikit-learn's ``'balanced'`` class weights for ``train_set['label']``.

    The classes handed to ``compute_class_weight`` are the unique values of the
    label column as returned by ``np.unique``; the result is whatever
    ``compute_class_weight`` returns for those classes.
    """
    weight_kwargs = dict(
        class_weight='balanced',
        classes=np.unique(train_set['label']),
        y=train_set['label'],
    )
    return compute_class_weight(**weight_kwargs)

class CustomTrainer(Trainer):
    """``Trainer`` subclass that trains with a class-weighted cross-entropy loss.

    Args:
        class_wts: One weight per class (array-like of numbers, in class-index
            order); forwarded to ``nn.CrossEntropyLoss(weight=...)``.
        *args: Positional arguments passed unchanged to ``Trainer.__init__``.
        **kwargs: Keyword arguments passed unchanged to ``Trainer.__init__``.
    """

    def __init__(self, class_wts, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_wts = class_wts

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the class-weighted cross-entropy loss for one batch.

        Args:
            model: Model used for the forward pass.
            inputs: Batch mapping; must contain a ``"labels"`` entry.
            return_outputs: When true, return ``(loss, outputs)`` instead of
                only the loss.
            num_items_in_batch: Accepted for compatibility with ``transformers``
                releases whose ``Trainer`` passes this argument to
                ``compute_loss``; it is not used here.

        Returns:
            The scalar loss tensor, or ``(loss, outputs)`` when
            ``return_outputs`` is true.
        """
        labels = inputs.get("labels")
        # Forward pass; ``inputs`` is handed to the model unchanged (labels included).
        outputs = model(**inputs)
        logits = outputs.get("logits")
        # Build the weight tensor on the device the logits live on. The previous
        # unconditional ``.cuda()`` failed on CPU-only machines.
        weight = torch.as_tensor(self.class_wts, dtype=torch.float, device=logits.device)
        loss_fct = nn.CrossEntropyLoss(weight=weight)
        loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))
        return (loss, outputs) if return_outputs else loss


def train_model(model_name='roberta-base', train_path="./train_set.csv", num_epochs=4, num_labels=4, max_length=128, seed_val=42, batch_size=8, learning_rate=2e-5, weight_decay=1e-8, save_model_path="./models/", use_custom_loss=False):
    """Fine-tune a sequence-classification model on a CSV file and save the result.

    The CSV at ``train_path`` is read with pandas and must provide a ``text``
    column, which is tokenized. The ``label`` column is read directly only when
    ``use_custom_loss`` is true (to compute class weights); presumably it holds
    integer class ids in ``range(num_labels)`` -- confirm against the data.

    Args:
        model_name: Checkpoint name or path given to ``from_pretrained``; also
            used as the sub-directory name under ``save_model_path``.
        train_path: Path of the training CSV.
        num_epochs: Value for ``TrainingArguments.num_train_epochs``.
        num_labels: Number of output classes of the classification head.
        max_length: Tokenizer ``model_max_length``; every example is padded and
            truncated to this length.
        seed_val: Seed passed to ``set_seed`` and to ``TrainingArguments``.
        batch_size: Value for ``per_device_train_batch_size``.
        learning_rate: Optimizer learning rate.
        weight_decay: Optimizer weight decay.
        save_model_path: Root directory for the saved artifacts.
        use_custom_loss: When true, train with ``CustomTrainer`` and balanced
            class weights instead of the stock ``Trainer``.

    Raises:
        ValueError: If ``use_custom_loss`` is true and the number of distinct
            labels in the training set differs from ``num_labels``.

    Side effects:
        ``TrainingArguments`` is given ``output_dir="test_trainer"``. The
        tokenizer, the model and a ``params.json`` file are written to
        ``os.path.join(save_model_path, model_name)``.
    """
    set_seed(seed_val)

    logger = logging.getLogger(__name__)
    logger.info('Using seed: %s', seed_val)

    train_df = pd.read_csv(train_path)
    train_set = Dataset.from_pandas(train_df)
    logger.info('dataset loaded')

    tokenizer = AutoTokenizer.from_pretrained(model_name, model_max_length=max_length)
    logger.info('tokenizer loaded')

    def tokenize_function(examples):
        # Pad/truncate every example to the tokenizer's model_max_length.
        return tokenizer(examples["text"], padding='max_length', truncation=True)

    tokenized_dataset_train = train_set.map(tokenize_function, batched=True)
    logger.info('tokenized dataset')

    # ``max_length`` is deliberately not forwarded to the model: on the model
    # config it is a text-generation setting and plays no role in classification.
    # The input length is controlled by the tokenizer's ``model_max_length`` above.
    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)
    logger.info('model loaded')

    training_args = TrainingArguments(output_dir="test_trainer",
                                      learning_rate=learning_rate,
                                      weight_decay=weight_decay,
                                      num_train_epochs=num_epochs,
                                      per_device_train_batch_size=batch_size,
                                      seed=seed_val)

    if use_custom_loss:
        class_wts = get_class_weights(train_set)
        # One weight per logit is required; fail early with a clear message rather
        # than with a shape error inside the loss on the first training step.
        if len(class_wts) != num_labels:
            raise ValueError(
                f'expected {num_labels} class weights but the training set contains '
                f'{len(class_wts)} distinct labels'
            )
        trainer = CustomTrainer(class_wts, model=model, args=training_args, train_dataset=tokenized_dataset_train)
    else:
        trainer = Trainer(model=model, args=training_args, train_dataset=tokenized_dataset_train)

    logger.info('starting training')
    trainer.train()
    logger.info('training finished')

    logger.info('saving model and tokenizer')
    save_directory = os.path.join(save_model_path, model_name)
    os.makedirs(save_directory, exist_ok=True)
    tokenizer.save_pretrained(save_directory)
    model.save_pretrained(save_directory)

    logger.info('saving parameters')
    params = {
        'model_name': model_name,
        'num_epochs': num_epochs,
        'num_labels': num_labels,
        'max_length': max_length,
        'seed_val': seed_val,
        'batch_size': batch_size,
        'learning_rate': learning_rate,
        'weight_decay': weight_decay,
        'use_custom_loss': use_custom_loss
    }

    with open(os.path.join(save_directory, 'params.json'), 'w', encoding='utf-8') as f:
        json.dump(params, f)

In [None]:
log_fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
# force=True (Python 3.8+) replaces any handlers already attached to the root
# logger. Without it basicConfig silently does nothing when handlers exist, which
# is common in notebook runtimes and hides the INFO messages.
logging.basicConfig(level=logging.INFO, format=log_fmt, force=True)
logger = logging.getLogger(__name__)

In [None]:
# Fine-tune one model per training set; each entry holds the keyword arguments
# for a single train_model call (everything else uses the defaults).
logger.info('training model')
training_runs = (
    {'save_model_path': './models/'},
    {'train_path': './train_set_hand.csv', 'save_model_path': './models/hand/'},
)
for run_kwargs in training_runs:
    train_model(**run_kwargs)

In [None]:
import os
import json
import pandas as pd
import logging
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TextClassificationPipeline, Trainer, TrainingArguments
from sklearn.metrics import classification_report, confusion_matrix

In [None]:
def predict(model_path="./models/roberta-base", test_path="./test_set.csv"):
    """Evaluate a saved classifier on a CSV test set and print the metrics.

    Loads the model, tokenizer and ``params.json`` via ``load_model``, classifies
    the ``text`` column of the CSV at ``test_path`` and compares the predictions
    with its ``label`` column. A classification report and a confusion matrix
    are printed; nothing is returned.

    Args:
        model_path: Directory holding the saved model, tokenizer and
            ``params.json``.
        test_path: Path of the test CSV (needs ``text`` and ``label`` columns).
    """
    logger = logging.getLogger(__name__)
    logger.info('loading model, tokenizer and parameters')
    model, tokenizer, params = load_model(model_path)

    # Normalise the flag to a Python bool. It may be a real JSON boolean or a
    # string such as "True"; calling ``.lower()`` on a bool raised AttributeError.
    use_custom_loss = params.get("use_custom_loss", False)
    if isinstance(use_custom_loss, str):
        use_custom_loss = json.loads(use_custom_loss.lower())
    params["use_custom_loss"] = bool(use_custom_loss)

    logger.info('loading test set')
    test_df = pd.read_csv(test_path)
    texts = test_df['text'].tolist()
    true_labels = test_df['label'].tolist()

    # Use the first GPU when one is available, otherwise run on CPU (-1).
    device = 0 if torch.cuda.is_available() else -1
    pipeline = TextClassificationPipeline(model=model, tokenizer=tokenizer, device=device)

    logger.info('predicting on test set')
    predictions = pipeline(texts, padding='max_length', truncation=True)

    # Map the predicted label names back to class ids using the config of the
    # loaded model instead of re-reading config.json from disk.
    label2id = model.config.label2id
    preds = [label2id[pred['label']] for pred in predictions]

    print(classification_report(true_labels, preds, digits=4))

    cm = confusion_matrix(true_labels, preds)
    print(f'confusion matrix:\n{cm}')


def load_model(path="models/roberta-base/"):
    """Load a saved classifier, its tokenizer and the stored training parameters.

    Args:
        path: Directory containing the saved model and tokenizer files together
            with a ``params.json`` file.

    Returns:
        A ``(model, tokenizer, params)`` tuple, where ``params`` is the parsed
        content of ``params.json``.
    """
    logger = logging.getLogger(__name__)
    logger.info('loading model from %s', path)
    # No ``num_labels`` override: the size of the classification head comes from
    # the config saved with the weights, so checkpoints trained with a label
    # count other than 4 load as well.
    model = AutoModelForSequenceClassification.from_pretrained(path)
    logger.info('loading tokenizer from %s', path)
    tokenizer = AutoTokenizer.from_pretrained(path)
    logger.info('loading parameters from %s', path)
    with open(os.path.join(path, "params.json"), "r", encoding="utf-8") as f:
        params = json.load(f)
    return model, tokenizer, params

In [None]:
# Evaluate both saved models on both test sets; each entry holds the keyword
# arguments for a single predict call (an empty dict means all defaults).
logger.info('predicting')
prediction_runs = (
    {},
    {'test_path': './test_set_hand.csv'},
    {'model_path': './models/hand/roberta-base/'},
    {'model_path': './models/hand/roberta-base/', 'test_path': './test_set_hand.csv'},
)
for run_kwargs in prediction_runs:
    predict(**run_kwargs)