# Predict labels for test dataset

In [None]:
import pandas as pd
import torch
import torch.nn as nn
import csv
import numpy as np
from transformers import *
from torch.utils.data import DataLoader
import torch.nn.functional as F

# Create the BertClassifier class
class BertClassifier(nn.Module):
    """BERT encoder followed by a small feed-forward classification head.

    The head maps the 768-dimensional `[CLS]` representation through a
    50-unit hidden layer to 4 output logits.
    """

    def __init__(self, model_name, hidden_layers, freeze_bert=False):
        """
        @param    model_name (str): name or path passed to `from_pretrained`
                      for both the config and the BERT weights
        @param    hidden_layers (int): number of encoder layers to use; when it
                      differs from 12 the config's `num_hidden_layers` is overridden
        @param    freeze_bert (bool): Set `False` to fine-tune the BERT model;
                      `True` disables gradients for every BERT parameter
        """
        super().__init__()
        # Specify hidden size of BERT, hidden size of our classifier, and number of labels
        D_in, H, D_out = 768, 50, 4

        # Instantiate the BERT model once, with the (possibly adjusted) config.
        # The pretrained weights used to be loaded a second time into an
        # unused local, which only cost time and memory.
        config = BertConfig.from_pretrained(model_name)
        if hidden_layers != 12:
            config.num_hidden_layers = hidden_layers
        self.bert = BertModel.from_pretrained(model_name, config=config)

        # Instantiate a one-hidden-layer feed-forward classifier
        self.classifier = nn.Sequential(
            nn.Linear(D_in, H),
            nn.ReLU(),
            nn.Linear(H, D_out)
        )

        # Freeze the BERT model
        if freeze_bert:
            for param in self.bert.parameters():
                param.requires_grad = False

    def forward(self, input_ids, attention_mask):
        """
        Feed input to BERT and the classifier to compute logits.
        @param    input_ids (torch.Tensor): an input tensor with shape (batch_size,
                      max_length)
        @param    attention_mask (torch.Tensor): a tensor that hold attention mask
                      information with shape (batch_size, max_length)
        @return   logits (torch.Tensor): an output tensor with shape (batch_size,
                      num_labels)
        """
        # Feed input to BERT
        outputs = self.bert(input_ids=input_ids,
                            attention_mask=attention_mask)

        # Extract the last hidden state of the token `[CLS]` (position 0)
        # for the classification task
        last_hidden_state_cls = outputs[0][:, 0, :]

        # Feed the `[CLS]` representation to the classifier to compute logits
        logits = self.classifier(last_hidden_state_cls)

        return logits

# create class for dataset loader
class bertDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenizer encodings with their labels."""

    def __init__(self, encodings, labels):
        # `encodings` maps a field name to an indexable collection of
        # per-sample values; `labels` is indexable and defines the length.
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        """Return the number of samples, taken from the labels."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return sample `idx` as a dict of tensors, including a 'labels' entry."""
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

# load test dataset
def load_dataset(filepath):
    """Read a comma-separated file and return its rows as a NumPy array.

    @param    filepath (str): path of the CSV file to read
    @return   data (np.ndarray): array built from the list of parsed rows;
                  every cell is a string as produced by `csv.reader`
    """
    # newline='' lets the csv module do its own line-ending handling, so
    # quoted fields containing line breaks are parsed as written.
    with open(filepath, 'r', newline='') as f:
        reader = csv.reader(f, delimiter=",")
        return np.array(list(reader))

# load bert model with given number of hidden layers
def load_model(model_path, num_hidden_layer):
    """Build a BertClassifier and load saved weights into it.

    @param    model_path (str): path of the saved state dict; if it contains
                  'TODBERT' the TOD-BERT checkpoint name is used for the base
                  model, otherwise "bert-base-uncased"
    @param    num_hidden_layer (int): number of hidden layers passed to
                  BertClassifier
    @return   the_model (BertClassifier): classifier constructed with
                  `freeze_bert=True` and populated from `model_path`
    """
    print("Loading Bert model: ",model_path)
    if 'TODBERT' in model_path:
        model_name = "TODBERT/TOD-BERT-JNT-V1"
    else:
        model_name = "bert-base-uncased"

    the_model = BertClassifier(model_name,num_hidden_layer, freeze_bert=True)
    # The freshly built model lives on the CPU, so map the stored tensors to
    # the CPU as well; this lets checkpoints saved on a GPU load on hosts
    # without CUDA.
    state_dict = torch.load(model_path, map_location="cpu")
    the_model.load_state_dict(state_dict)
    return the_model

# create dataloader for the test dataset
def get_test_dataloader(model_path,filepath):
    """Tokenize a CSV test file and wrap it in a DataLoader.

    @param    model_path (str): model checkpoint path; only used to choose the
                  tokenizer ('TODBERT' in the path selects the TOD-BERT
                  tokenizer, otherwise "bert-base-uncased")
    @param    filepath (str): CSV file whose first column holds the texts and
                  whose second column holds integer labels
    @return   test_loader (DataLoader): loader over a bertDataset with
                  batch size 16
    """
    if 'TODBERT' in model_path:
        model_name = "TODBERT/TOD-BERT-JNT-V1"
    else:
        model_name = "bert-base-uncased"
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    data = load_dataset(filepath)
    print(filepath, len(data))
    # Column 0 holds the sentences, column 1 the labels (parsed as ints)
    test_texts = list(data[:, 0])
    test_labels = list(data[:, 1].astype(int))
    print("Creating Test dataset loader..")
    test_encodings = tokenizer(test_texts, truncation=True, padding=True)

    test_dataset = bertDataset(test_encodings, test_labels)

    return DataLoader(test_dataset, batch_size=16)

# predict label for the test dataset using the given model
def get_predictions(model,test_loader):
    """Run the model over every batch and collect true and predicted labels.

    @param    model (torch.nn.Module): classifier called as
                  `model(input_ids, attention_mask=...)` and returning logits
    @param    test_loader (iterable): yields dict batches with the keys
                  "input_ids", "attention_mask" and "labels"
    @return   df (pd.DataFrame): one row per sample with the columns
                  "y_true" and "y_pred"
    """
    print("Getting predictions..")
    model.eval()

    y_true = []
    y_pred = []

    # Inference only: disabling autograd avoids building a graph per batch.
    with torch.no_grad():
        # For each batch in our test set...
        for batch in test_loader:
            # Unpack the batch (tensors are used on whatever device they are on)
            b_input_ids = batch["input_ids"]
            b_input_mask = batch["attention_mask"]
            b_labels = batch["labels"]
            y_true.extend(b_labels.numpy())

            # Compute logits, convert them to probabilities, and take the
            # most probable class as the prediction
            logits = model(b_input_ids, attention_mask=b_input_mask)
            probs = F.softmax(logits, dim=1).cpu().detach().numpy()
            pred = np.argmax(probs, axis=1)
            y_pred.extend(pred)

    d = {"y_true": y_true, "y_pred": y_pred}
    df = pd.DataFrame(data=d)

    return df


In [None]:
import os

# Saved classifier checkpoints to evaluate, and the number of BERT hidden
# layers each one was built with (the two lists are paired by position).
model_paths = ["Results/bert-base-uncased_2_test.torch",
               "Results/bert-base-uncased_6_test.torch",
               "Results/bert-base-uncased_12_test.torch",
               "Results/bert-base-uncased_14_test.torch",
               "Results/bert-base-uncased_18_test.torch",
               "Results/TODBERT/TOD-BERT-JNT-V1_2_test.torch",
               "Results/TODBERT/TOD-BERT-JNT-V1_6_test.torch",
               "Results/TODBERT/TOD-BERT-JNT-V1_12_test.torch",
               "Results/TODBERT/TOD-BERT-JNT-V1_14_test.torch",
               "Results/TODBERT/TOD-BERT-JNT-V1_18_test.torch",
               "Results/bert-base-uncased_12freeze_test.torch",
               "Results/bert-base-uncased_12freeze6_test.torch"]
num_hidden_layers = [2,6,12,14,18,2,6,12,14,18,12,12]

for model_path, hidden_layer_count in zip(model_paths, num_hidden_layers):
    # load model
    model = load_model(model_path, hidden_layer_count)

    # Results go into a directory named after the checkpoint without its
    # 6-character ".torch" suffix; create it once per model.
    output_dir = model_path[:-6]
    os.makedirs(output_dir, exist_ok=True)

    # predict for all samples of test dataset
    for sample_num in range(20):
        filepath = 'capstone/dataset/sentence/sample_data'+str(sample_num)+'.csv'
        test_loader = get_test_dataloader(model_path,filepath)
        pred_df = get_predictions(model,test_loader)

        # save predictions in csv (no index column, no header row)
        pred_df.to_csv(output_dir+'/sample_result'+str(sample_num)+'.csv',index=False,header=False)
