In [None]:
!pip install transformers

In [None]:
from transformers import AlbertForSequenceClassification, AdamW, AutoTokenizer, AlbertForMultipleChoice
import torch
import pandas as pd
from torch.utils.data import TensorDataset, random_split, DataLoader, RandomSampler, SequentialSampler
from sklearn.model_selection import train_test_split
from tqdm import tqdm, trange
import numpy as np

In [None]:
# Pick the GPU when PyTorch can see one; otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Tokenizer matching the "albert-base-v2" checkpoint, resolved through the Auto* factory
tokenizer = AutoTokenizer.from_pretrained("albert-base-v2")

In [None]:
# Read the labelled training set and discard every row that has a missing value
total_df = pd.read_csv(
    '/content/drive/MyDrive/NLP_project/dataset/intent_data/train_data.csv'
).dropna()
print(total_df.shape)
total_df.head()

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
# Disabled experiments: train on a 20% sample (plain, or stratified per 'Label' value).
# df = total_df.sample(frac=0.2, replace=True, random_state=1)
# df.shape
# Assuming 'label' is the name of the column containing the labels
# total_df_stratified = total_df.groupby('Label', group_keys=False).apply(lambda x: x.sample(frac=0.2, replace=True, random_state=1))
# Current choice: train on the full cleaned data set. `df` is only another name for
# `total_df` (no copy is made).
df = total_df

In [None]:
# Tokenize every question in a single call and get PyTorch tensors back.
# padding=True pads each sequence to the longest one in the call.
# The tensors deliberately stay on the CPU: the training and prediction loops move each
# batch to the compute device themselves, so nothing here requires a GPU to be present
# and the whole padded data set does not have to fit in GPU memory.
inputs = tokenizer(df['Question'].tolist(), padding=True, truncation=True, return_tensors='pt')
labels = torch.tensor(df['Label'].tolist())
# Create a PyTorch dataset of (input_ids, attention_mask, label) triples
dataset = TensorDataset(inputs['input_ids'], inputs['attention_mask'], labels)

In [None]:
# train_size = int(0.8 * len(dataset))
# val_size = len(dataset) - train_size
# train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

In [None]:
# Load the pretrained ALBERT checkpoint with a sequence-classification head and move it
# to the device selected earlier (instead of assuming a GPU is present).
model = AlbertForSequenceClassification.from_pretrained('albert-base-v2').to(device)

# Shuffle so every epoch visits the examples in a different order
dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)

# Set up the optimizer and loss function
optimizer = AdamW(model.parameters(), lr=1e-5)
criterion = torch.nn.CrossEntropyLoss()

# Gradients of this many consecutive mini-batches are accumulated before each
# optimizer step (effective batch size = 32 * accumulation_steps).
accumulation_steps = 4
num_batches = len(dataloader)

# Train the model
model.train()
for epoch in range(3):
    running_loss = 0.0
    # Clear gradients once per epoch; inside the loop they are cleared only right
    # after an optimizer step, so accumulated gradients are never thrown away.
    optimizer.zero_grad()
    # `batch_labels` (not `labels`) so the dataset-level `labels` tensor is not overwritten
    for i, (input_ids, attention_mask, batch_labels) in enumerate(dataloader):
        input_ids = input_ids.to(device)
        attention_mask = attention_mask.to(device)
        batch_labels = batch_labels.to(device)

        # Labels are not passed to the model: the loss is computed once, by `criterion`
        outputs = model(input_ids, attention_mask=attention_mask)
        loss = criterion(outputs.logits, batch_labels)
        # Scale so the accumulated gradient is the mean over the accumulated batches
        # (a shorter trailing group at the end of an epoch is scaled by the same factor)
        (loss / accumulation_steps).backward()

        # Step every `accumulation_steps` batches, and also on the last batch of the
        # epoch so leftover gradients are applied rather than discarded.
        if (i + 1) % accumulation_steps == 0 or (i + 1) == num_batches:
            optimizer.step()
            optimizer.zero_grad()

        # Report the unscaled per-batch loss
        running_loss += loss.item()

    print(f"Epoch {epoch+1} loss: {running_loss/len(dataloader)}")


In [None]:
import torch

# Persist only the parameters (state dict) of the trained `model`,
# written to `my_model.pt` relative to the current working directory
trained_weights = model.state_dict()
torch.save(trained_weights, 'my_model.pt')

In [None]:
# Read the labelled test set and discard every row that has a missing value
test_df = pd.read_csv(
    '/content/drive/MyDrive/NLP_project/dataset/intent_data/test_data.csv'
).dropna()
print(test_df.shape)
test_df.head()

In [None]:
# df = total_df.sample(frac=0.2, replace=True, random_state=1)
# df.shape

In [None]:
# Tokenize every test question in a single call and get PyTorch tensors back.
# padding=True pads each sequence to the longest one in the call.
# The tensors deliberately stay on the CPU: the prediction loop moves each batch to the
# compute device itself, so this cell also works on a runtime without a GPU.
test_inputs = tokenizer(test_df['Question'].tolist(), padding=True, truncation=True, return_tensors='pt')
test_labels = torch.tensor(test_df['Label'].tolist())
# Create a PyTorch dataset of (input_ids, attention_mask, label) triples
test_dataset = TensorDataset(test_inputs['input_ids'], test_inputs['attention_mask'], test_labels)

In [None]:
# Sanity check: number of examples in the test dataset
num_samples = len(test_dataset)
print(num_samples)

In [None]:
# Sanity check: shape of the first tensor (the input ids) of the first test example
tensor_shape = test_dataset[0][0].shape
print(tensor_shape)

In [None]:
# from torch.utils.data import DataLoader

# # assuming your trained model is stored in a variable called `model`
# # and your test dataset is stored in a variable called `test_dataset`
# # and you have defined your evaluation criterion as `criterion`

# # put your test dataset in a DataLoader
# batch_size = 32
# test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

# # set your model to evaluation mode
# model.eval()

# # evaluate the model on the test dataset
# test_loss = 0.0
# correct = 0
# total_samples = 0

# # loop over the test data
# with torch.no_grad():
#     for batch in test_dataloader:
#         # unpack the input and target tensors
#         inputs, targets, _ = batch # assuming there are 3 tensors in the batch
        
#         # move the tensors to the same device as the model
#         inputs = inputs.to(device)
#         targets = targets.to(device)

#         # compute the model's predictions
#         outputs = model(inputs)

#         # check the shape of the target tensor
#         print(targets.shape)

#         # reshape the target tensor to a 1D tensor
#         # targets = targets.view(-1)

#         # check the shape of the target tensor again
#         print(outputs.logits.shape)

#         # compute the loss
#         loss = criterion(outputs.logits, targets)

#         # accumulate the total loss for the batch
#         test_loss += loss.item() * inputs.size(0)

#         # compute the number of correct predictions
#         _, predicted = torch.max(outputs, 1)
#         correct += (predicted == targets).sum().item()

# # compute the average loss and accuracy
# test_loss /= len(test_dataset)
# accuracy = correct / len(test_dataset)

# # print the results
# print(f'Test Loss: {test_loss:.4f} Accuracy: {accuracy:.4f}')


In [None]:
print("Creation of the results' folder...")
!mkdir results

import csv

def get_probs_from_logits(logits):
    """
    Map a tensor of logits to a NumPy array of sigmoid activations.

    A trailing axis of size 1 is appended to `logits` before the element-wise sigmoid,
    so the returned array has one more dimension than the input. The result is detached
    from the autograd graph and copied to host memory.
    """
    expanded = logits.unsqueeze(-1)
    activated = expanded.sigmoid()
    return activated.detach().cpu().numpy()

def test_prediction(net, device, dataloader, with_labels=True, result_file="results/output.csv"):
    """
    Predict probabilities for every batch of a dataloader and write them to a CSV file.

    Args:
        net: classification model; calling `net(input_ids, attention_mask=...)` must
            return an object exposing a `.logits` tensor.
        device: device each batch is moved to before the forward pass.
        dataloader: yields (input_ids, attention_mask, labels) batches when `with_labels`
            is True, otherwise (input_ids, attention_mask) batches.
        with_labels: whether each batch carries a trailing labels tensor. Labels are not
            needed to predict, so they are ignored.
        result_file: path of the CSV file to write; its parent folder is created if missing.

    The file has a "probability" header followed by one single-cell row per example:
    a float (sigmoid) when the model emits one logit per example, or the text of a list
    with one softmax probability per class when it emits several.
    """
    import os  # local import: only needed to make sure the output folder exists

    net.eval()
    out_dir = os.path.dirname(result_file)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    header = ["probability"]
    with open(result_file, mode="w", newline="") as file:
        writer = csv.writer(file)
        writer.writerow(header)
        with torch.no_grad():
            for batch in tqdm(dataloader):
                if with_labels:
                    input_ids, attn_mask, _ = batch
                else:
                    input_ids, attn_mask = batch
                input_ids, attn_mask = input_ids.to(device), attn_mask.to(device)

                # The model returns an output object; the scores live in `.logits`
                logits = net(input_ids, attention_mask=attn_mask).logits
                if logits.shape[-1] == 1:
                    # Single-logit (binary) head: one sigmoid probability per example
                    probs = torch.sigmoid(logits.squeeze(-1))
                else:
                    # Multi-class head: softmax so each row is a distribution over classes
                    probs = torch.softmax(logits, dim=-1)

                for prob in probs.cpu().tolist():
                    writer.writerow([prob])

In [None]:
path_to_model = '/content/my_model.pt'
# path_to_model = '/content/models/...'  # You can add here your trained model

path_to_output_file = 'results/output.csv'

print("Reading test data...")
# test_set = CustomDataset(df_test, maxlen, bert_model)
batch_size = 32
# No shuffling: the prediction rows must stay in the same order as the test set rows
test_dataloader = DataLoader(test_dataset, batch_size=batch_size)
# test_loader = DataLoader(test_set, batch_size=bs, num_workers=5)

# model = AlbertForSequenceClassification.from_pretrained('albert-base-v2').to('cuda')
# if torch.cuda.device_count() > 1:  # if multiple GPUs
#     print("Let's use", torch.cuda.device_count(), "GPUs!")
#     model = nn.DataParallel(model)

print()
print("Loading the weights of the model...")
# map_location remaps tensors saved from a GPU session onto the device in use now,
# so a checkpoint written on a GPU runtime also loads on a CPU-only runtime.
model.load_state_dict(torch.load(path_to_model, map_location=device))
model.to(device)

print("Predicting on test data...")
test_prediction(net=model, device=device, dataloader=test_dataloader, with_labels=True,  # set the with_labels parameter to False if you want to get predictions on a dataset without labels
                result_file=path_to_output_file)
print()
print("Predictions are available in : {}".format(path_to_output_file))

In [None]:
!pip install datasets

In [None]:
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
import pandas as pd
import numpy as np


# Re-read the test set the same way it was read for prediction (rows with missing values removed)
test_df = pd.read_csv(
    '/content/drive/MyDrive/NLP_project/dataset/intent_data/test_data.csv'
).dropna()
print(test_df.shape)

# Ground-truth labels, taken from the 'Label' column of the test frame
labels_test = test_df['Label']

# Predicted probabilities, as written to output.csv by the prediction step
probs_test = pd.read_csv('/content/results/output.csv')

In [None]:
# Display the raw predictions frame as read from output.csv
probs_test

In [None]:
import ast
# Convert the probability string to a list of probabilities.
# Only string cells are parsed: values that are already lists (this cell was run before)
# or already numeric are left untouched, so re-running the cell does not raise.
probs_test['probability'] = probs_test['probability'].apply(
    lambda x: ast.literal_eval(x) if isinstance(x, str) else x
)
probs_test.head()

In [None]:
# Get index with the highest probability
# Predicted class = position of the largest probability (first position on ties)
probs_test['max_index'] = probs_test['probability'].map(
    lambda class_probs: class_probs.index(max(class_probs))
)

In [None]:
# Sanity check: which class indices were actually predicted
probs_test['max_index'].unique()

In [None]:
# Save the predictions (the probability column and the derived max_index column) to Drive.
# index=False keeps the DataFrame index out of the file.
# probs_test.drop(columns=['probability'], inplace=True)
probs_test.to_csv('/content/drive/MyDrive/NLP_project/dataset/results/outputs.csv', index=False)

In [None]:
# for line in probs_test:
#     # Remove the square brackets and split the line into a list of strings
#     probs_str = line.strip()[1:-1].split(', ')
#     print("probs_str: ", probs_str)
#     # Convert the list of strings to a list of floats
#     probs = [float(p) for p in probs_str]
#     print("probs: ", probs)
#     # Append the list of probabilities to the main list
#     # probs_list.append(probs)
#     break;

# # Convert the list of lists to a numpy array
# # probs_np = np.array(probs_list)

In [None]:
# # Determine the predicted class label for each prediction
# preds_test = [np.argmax(prob) for prob in probs_test]
# print(len(preds_test))

# # Calculate accuracy
# accuracy = accuracy_score(labels_test, preds_test)

# # Calculate f1 score
# f1 = f1_score(labels_test, preds_test)

# # Calculate confusion matrix
# cm = confusion_matrix(labels_test, preds_test)

# print("Accuracy:", accuracy)
# print("F1 score:", f1)
# print("Confusion matrix:")
# print(cm)

In [None]:
!pip install datasets

In [None]:
from datasets import load_metric

path_to_output_file = 'results/output.csv'  # path to the file with prediction probabilities

labels_test = test_df['Label']  # true labels
# Predictions saved earlier to Drive; the 'max_index' column is read from this frame below
preds_test = pd.read_csv('/content/drive/MyDrive/NLP_project/dataset/results/outputs.csv')

# Disabled alternative: threshold a single probability column instead of using max_index
# probs_test = pd.read_csv(path_to_output_file, header=None)[0]  # prediction probabilities
# threshold = 0.5   # you can adjust this threshold for your own dataset
# preds_test=(probs_test.astype('float')>=threshold).astype('uint8') # predicted labels using the above fixed threshold

preds_test.head()

In [None]:
# Shapes of the true labels and of the predictions, in that order
for shaped in (labels_test, preds_test):
    print(shaped.shape)

In [None]:
# Turn the true labels into an (N, 1) column array.
# np.asarray accepts both the original pandas Series and an array produced by a previous
# run of this cell, so re-running the cell does not fail.
labels_test = np.asarray(labels_test).reshape(-1, 1)
print(labels_test.shape)

In [None]:
# Turn the predicted class indices into an (N, 1) column array.
# On the first run `preds_test` is the predictions DataFrame and the 'max_index' column
# is selected; on a re-run it is already an array and is only reshaped.
if isinstance(preds_test, pd.DataFrame):
    preds_test = preds_test['max_index']
preds_test = np.asarray(preds_test).reshape(-1, 1)
print(preds_test.shape)

In [None]:
from datasets import load_metric
metric = load_metric("glue", "mrpc")
# Compute the accuracy and F1 scores through the public `compute` API (not the private
# `_compute` hook). The arrays are flattened to 1-D so each prediction/reference is a
# single integer.
metric.compute(predictions=np.ravel(preds_test), references=np.ravel(labels_test))

In [None]:
# Score the predicted class labels against the true labels with scikit-learn
accuracy = accuracy_score(y_true=labels_test, y_pred=preds_test)
f1 = f1_score(y_true=labels_test, y_pred=preds_test)
cm = confusion_matrix(y_true=labels_test, y_pred=preds_test)

# Report the scalar scores first, then the confusion matrix under its own heading
for caption, score in (("Accuracy:", accuracy), ("F1 score:", f1)):
    print(caption, score)
print("Confusion matrix:")
print(cm)

In [None]:
from datasets import load_metric

# The predictions were produced from the test set *after* dropna(), one row per remaining
# example. Apply the same cleaning here and renumber the rows 0..N-1, because
# pd.concat(axis=1) aligns on the index: without this, any dropped row would shift the
# predictions onto the wrong questions.
test_data = (
    pd.read_csv('/content/drive/MyDrive/NLP_project/dataset/intent_data/test_data.csv')
    .dropna()
    .reset_index(drop=True)
)
preds_test = pd.read_csv('/content/drive/MyDrive/NLP_project/dataset/results/outputs.csv')

# Side-by-side frame: original test columns followed by the prediction columns
combined_test_pred = pd.concat([test_data, preds_test], axis=1)

In [None]:
# Give the predicted-class column a self-explanatory name
combined_test_pred = combined_test_pred.rename(columns={'max_index': 'prediction'})

In [None]:
# Write the combined frame to Drive. `index` is left at its default, so the DataFrame
# index is written as the first, unnamed column.
combined_test_pred.to_csv('/content/drive/MyDrive/NLP_project/dataset/results/prediction_actual.csv')