In [None]:
import ast
import os
from collections import defaultdict

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

from transformers import AutoConfig, AutoModel, AutoTokenizer, RobertaTokenizer

from tqdm import tqdm

from sklearn.preprocessing import MultiLabelBinarizer, LabelEncoder
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score, roc_curve, auc

In [None]:
# "Please input the path to the saved model."
# map_location='cpu' lets a checkpoint that was saved from a CUDA run be
# loaded on a CPU-only machine as well; the modules that receive these
# weights are moved to the selected device later in the notebook.
# NOTE(review): torch.load unpickles the file, so only load checkpoints
# from a source you trust.
state = torch.load('/content/model.pt', map_location='cpu')

In [None]:
# One (initially empty) state dict per sub-module: the backbone, the tag
# head and the rating head. Each name is bound to its own dict object.
model_state_dict, tag_state_dict, rating_state_dict = {}, {}, {}

In [None]:
# Route every checkpoint entry to the sub-module it belongs to and strip the
# routing prefix so the remaining key matches that sub-module's own
# state-dict naming.
#
# The prefix must be matched at the START of the key: a substring test
# combined with a fixed-length slice would mangle any key that merely
# contains the prefix somewhere else (e.g. "module.model.<...>").
_prefix_routes = (
    ("model.", model_state_dict),
    ("tags_classifier.", tag_state_dict),
    ("ratings_classifier.", rating_state_dict),
)

for k, v in state.items():
    for _prefix, _target in _prefix_routes:
        if k.startswith(_prefix):
            _target[k[len(_prefix):]] = v
            break  # the prefixes are mutually exclusive

In [None]:
# The ten tag names this notebook keeps; every other tag found in the data
# is dropped by the filtering cells further down.
AMT10 = [
    'implementation', 'dp', 'math', 'greedy', 'data structures',
    'brute force', 'geometry', 'constructive algorithms', 'dfs and similar',
    'strings',
]

In [None]:
# Start from the published BigBird-RoBERTa base configuration and override
# the maximum number of position embeddings.
model_config = AutoConfig.from_pretrained(
    "google/bigbird-roberta-base",
    max_position_embeddings=1024,
)
model_config

In [None]:
# Run configuration. The cells visible in this notebook read 'tags',
# 'batchSize', 'testMaxLength', 'model' and 'tokenizer'; the remaining keys
# are not read here (presumably kept for parity with the training setup --
# TODO confirm).
config = {
    'seed': 42,
    'tags': AMT10,
    'batchSize': 4,
    'lr': 5e-6,
    'trainMaxLength': 1024,
    'testMaxLength': 1024,
    'numEpochs': 200,
    # Backbone built from the configuration only; its weights are filled in
    # from the checkpoint in a later cell.
    'model': AutoModel.from_config(model_config),
    'tokenizer': RobertaTokenizer.from_pretrained('roberta-base'),
    'gradient_accumulation_steps': 4,
    'max_grad_norm': 1.0,
    'lambda': 10,
    'save': True,
}

In [None]:
class MultiLabelClassificationHead(nn.Module):
    """Multi-label head: one linear layer followed by an element-wise sigmoid.

    Args:
        num_labels: Number of output units of the linear layer.
        hidden_size: Size of the last dimension of the incoming features.
    """

    def __init__(self, num_labels, hidden_size=768):
        super().__init__()
        self.fc = nn.Linear(in_features=hidden_size, out_features=num_labels)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Project ``x`` with the linear layer and squash the result with sigmoid."""
        return self.sigmoid(self.fc(x))

class MultiClassClassificationHead(nn.Module):
    """Multi-class head: a single linear layer whose raw outputs are returned.

    Args:
        num_labels: Number of output units of the linear layer.
        hidden_size: Size of the last dimension of the incoming features.
    """

    def __init__(self, num_labels, hidden_size=768):
        super().__init__()
        self.fc = nn.Linear(in_features=hidden_size, out_features=num_labels)

    def forward(self, x):
        """Return the linear projection of ``x`` with no activation applied."""
        return self.fc(x)

In [None]:
model = config['model']

# Size the heads from their sources of truth instead of magic numbers:
#   * one tag output per entry in config['tags'];
#   * as many rating outputs as the checkpoint's rating layer has rows
#     (nn.Linear stores its weight as (out_features, in_features)), so the
#     head always matches the weights that are loaded into it afterwards.
tag_head = MultiLabelClassificationHead(len(config['tags']))
rating_head = MultiClassClassificationHead(rating_state_dict['fc.weight'].shape[0])

In [None]:
# Restore each sub-module from its slice of the checkpoint, in the same
# order as before: backbone, tag head, rating head.
for _module, _weights in (
    (model, model_state_dict),
    (tag_head, tag_state_dict),
    (rating_head, rating_state_dict),
):
    _module.load_state_dict(_weights)
print('fin')

In [None]:
# Prefer CUDA when it is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Place the backbone and both heads on the selected device.
for _module in (model, tag_head, rating_head):
    _module.to(device)
print('device : ', device)

In [None]:
# Read the train and test splits (train first); the first CSV column is
# used as the index of each frame.
train_df, test_df = (
    pd.read_csv(csv_path, index_col=0, encoding='utf8')
    for csv_path in ('/content/AMT10_train.csv', '/content/AMT10_test.csv')
)

# For a quick smoke test, slice the test frame here, e.g.:
# test_df = test_df[:100]

In [None]:
new_train_idx = []        # index labels of the rows that keep at least one target tag
selected_train_tags = []  # for each kept row, its tags restricted to config['tags']

# Walk the 'tags' column directly instead of building a full row with
# train_df.loc[index] on every iteration.
for index, raw_tags in train_df['tags'].items():
    # The column is parsed with ast.literal_eval; an already-parsed list is
    # accepted as well so this cell can be re-run after the column has been
    # replaced by lists further down.
    tags = raw_tags if isinstance(raw_tags, list) else ast.literal_eval(raw_tags)
    t = [tag for tag in tags if tag in config['tags']]

    # Keep the row only when at least one of its tags is a target tag.
    if t:
        selected_train_tags.append(t)
        new_train_idx.append(index)

print(len(new_train_idx))  # number of training rows kept

In [None]:
new_valid_idx = []        # index labels of the rows that keep at least one target tag
selected_valid_tags = []  # for each kept row, its tags restricted to config['tags']

# Walk the 'tags' column directly instead of building a full row with
# test_df.loc[index] on every iteration.
for index, raw_tags in test_df['tags'].items():
    # The column is parsed with ast.literal_eval; an already-parsed list is
    # accepted as well so this cell can be re-run after the column has been
    # replaced by lists further down.
    tags = raw_tags if isinstance(raw_tags, list) else ast.literal_eval(raw_tags)
    t = [tag for tag in tags if tag in config['tags']]

    # Keep the row only when at least one of its tags is a target tag.
    if t:
        selected_valid_tags.append(t)
        new_valid_idx.append(index)

print(len(new_valid_idx))  # number of test rows kept

In [None]:
# Training split: keep the selected rows and swap the 'tags' column for the
# filtered tag lists, then pull out the two targets.
train_df = train_df.loc[new_train_idx].assign(tags=selected_train_tags)
y_tags_train = train_df['tags']
y_ratings_train = train_df['rating'].astype(int)

# Test split: same treatment, plus the text column that feeds the tokenizer.
test_df = test_df.loc[new_valid_idx].assign(tags=selected_valid_tags)
X_test, y_tags_test = test_df['description'], test_df['tags']
y_ratings_test = test_df['rating'].astype(int)

In [None]:
# Tag targets: fit the binarizer on the training tag lists only, then apply
# the fitted mapping to the test tag lists.
tag_label_encoder = MultiLabelBinarizer()
y_tags_train = tag_label_encoder.fit_transform(y_tags_train)
y_tags_test = tag_label_encoder.transform(y_tags_test)

# Rating targets: same pattern with a plain label encoder.
rating_label_encoder = LabelEncoder()
y_ratings_train = rating_label_encoder.fit_transform(y_ratings_train)
y_ratings_test = rating_label_encoder.transform(y_ratings_test)

In [None]:
def tokenizing(tokenizer, data, max_length):
    """Run ``tokenizer`` over all texts in ``data`` in a single call.

    Args:
        tokenizer: Callable invoked as ``tokenizer(texts, **options)``.
        data: Object exposing ``.values`` (e.g. a pandas Series) that holds
            the texts.
        max_length: Forwarded to the tokenizer as ``max_length``.

    Returns:
        Whatever ``tokenizer`` returns when asked for padding, truncation and
        ``'pt'`` tensors.
    """
    texts = list(data.values)
    encode_options = dict(
        padding=True,
        truncation=True,
        return_tensors='pt',
        max_length=max_length,
    )
    return tokenizer(texts, **encode_options)

In [None]:
# Tokenize every test description in one call, limited to the configured
# test-time maximum length.
tokenizer = config['tokenizer']
tokenized_inputs_test = tokenizing(
    tokenizer=tokenizer,
    data=X_test,
    max_length=config['testMaxLength'],
)

In [None]:
def convert_to_tensor(data, dtype):
    """Build a new ``torch.Tensor`` from ``data`` with the requested ``dtype``.

    Args:
        data: Anything ``torch.tensor`` accepts (nested lists, NumPy arrays, ...).
        dtype: Target torch dtype.

    Returns:
        The tensor created by ``torch.tensor(data, dtype=dtype)``.
    """
    return torch.tensor(data, dtype=dtype)

In [None]:
# Tag targets become float tensors and rating targets integer (long) tensors.
tags_labels_test = convert_to_tensor(y_tags_test, torch.float32)
ratings_labels_test = convert_to_tensor(y_ratings_test, torch.int64)

In [None]:
# Bundle inputs and both label tensors so each batch yields
# (input_ids, attention_mask, tag labels, rating labels).
test_dataset = TensorDataset(
    tokenized_inputs_test['input_ids'],
    tokenized_inputs_test['attention_mask'],
    tags_labels_test,
    ratings_labels_test,
)

# * Never request more worker processes than the host has CPU cores
#   (PyTorch warns that oversubscription can slow down or even freeze the
#   loader); the previous upper bound of 8 is kept.
# * Pinned host memory only helps host-to-GPU copies, so enable it only
#   when CUDA is present.
test_dataloader = DataLoader(
    test_dataset,
    batch_size=config['batchSize'],
    shuffle=False,
    num_workers=min(8, os.cpu_count() or 1),
    pin_memory=torch.cuda.is_available(),
)

In [None]:
# Evaluate the restored backbone and both heads on the test set.
#   1. Inference: collect tag probabilities and arg-max rating predictions.
#   2. Ratings:   accuracy with a +/-1 class tolerance, plus exact accuracy.
#   3. Tags:      best macro-F1 over a threshold sweep, per-tag and macro
#                 ROC-AUC, and the ROC curves.
model.eval()
tag_head.eval()
rating_head.eval()

thresholds = [0.001] + [i * 0.01 for i in range(1, 101)]

# ---------------------------------------------------------------- inference
tag_label_batches = []
tag_proba_batches = []
rating_label_batches = []
rating_pred_batches = []

# Only the forward passes need no_grad; metrics and plotting run outside it.
with torch.no_grad():
    for batch in tqdm(test_dataloader):
        input_ids, attention_mask, tags_labels, ratings_labels = batch

        # Labels are only used for CPU-side metrics, so only the model
        # inputs are moved to the device.
        outputs = model(
            input_ids=input_ids.to(device),
            attention_mask=attention_mask.to(device),
        )
        pooled_output = outputs.pooler_output

        tags_output = tag_head(pooled_output)        # sigmoid outputs
        ratings_output = rating_head(pooled_output)  # raw scores, no activation

        tag_label_batches.append(tags_labels)
        tag_proba_batches.append(tags_output.cpu())
        rating_label_batches.append(ratings_labels)
        rating_pred_batches.append(torch.argmax(ratings_output, dim=1).cpu())

if not tag_proba_batches:
    raise ValueError("test_dataloader produced no batches; nothing to evaluate")

# One row per sample, one column per tag. The torch tensor is kept for the
# threshold sweep so comparisons happen in the dtype the head produced.
tags_proba = torch.cat(tag_proba_batches)
tags_pred_proba = tags_proba.double().numpy()
tag_true = (torch.cat(tag_label_batches) != 0).numpy().astype(int)

# Take the number of tags from the data instead of hard-coding it.
num_tags = tag_true.shape[1]
if tags_pred_proba.shape[1] != num_tags:
    raise ValueError(
        f"tag head produced {tags_pred_proba.shape[1]} outputs but the "
        f"labels have {num_tags} columns"
    )

# ------------------------------------------------------------------ ratings
rating_true = torch.cat(rating_label_batches).tolist()
rating_pred_raw = torch.cat(rating_pred_batches).tolist()

# A prediction within one encoded class of the target is counted as correct:
# such predictions are replaced by the target before scoring, so `rating_acc`
# below is a +/-1 tolerance accuracy, not exact-match accuracy.
rating_pred = [
    actual if abs(actual - predicted) <= 1 else predicted
    for actual, predicted in zip(rating_true, rating_pred_raw)
]

# --------------------------------------------------- tags: threshold sweep
# NOTE(review): the threshold is selected on the same test set the F1 is
# reported on, so the reported F1 is optimistic.
thr = 0
max_f1_score = 0

for threshold in thresholds:
    tag_pred = (tags_proba >= threshold).int().numpy()
    f1 = f1_score(tag_true, tag_pred, average='macro', zero_division=0)
    if max_f1_score < f1:
        thr = threshold
        max_f1_score = f1

# --------------------------------------------------------- tags: ROC / AUC
fpr = dict()
tpr = dict()
tag_auc = dict()

fig, ax = plt.subplots()
for tag_idx in range(num_tags):
    tt, tp = tag_true[:, tag_idx], tags_pred_proba[:, tag_idx]
    tag = tag_label_encoder.classes_[tag_idx]

    # ROC-AUC is undefined when a tag has only positives or only negatives
    # in the evaluated rows (roc_auc_score raises); skip such tags instead
    # of aborting the whole report.
    if np.unique(tt).size < 2:
        print(f"{tag} : undefined (only one class present in the test labels)")
        continue

    score = roc_auc_score(tt, tp)
    tag_auc[tag_idx] = score
    print(f"{tag} : {score}")
    fpr[tag_idx], tpr[tag_idx], _ = roc_curve(tt, tp)
    ax.plot(fpr[tag_idx], tpr[tag_idx], label=f'{tag}(area={score:.2f})')
print()

if len(tag_auc) == num_tags:
    macro_auc = roc_auc_score(tag_true, tags_pred_proba)
elif tag_auc:
    # Some tags were skipped above: average over the tags that could be scored.
    macro_auc = float(np.mean(list(tag_auc.values())))
else:
    macro_auc = float('nan')

print("tag_roc_auc_score : ", macro_auc)
print("f1_score : ", max_f1_score)
print("threshold : ", thr)

rating_acc = accuracy_score(rating_true, rating_pred)
print(f"rating_acc : {rating_acc}")

# Exact-match accuracy of the raw arg-max predictions, reported next to the
# tolerance-adjusted figure so the two cannot be confused.
rating_exact_acc = accuracy_score(rating_true, rating_pred_raw)
print(f"rating_exact_acc : {rating_exact_acc}")

ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')  # chance diagonal
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Tag Prediction')
ax.legend(loc="lower right")
plt.show()