In [None]:
from sklearn.preprocessing import LabelEncoder
def make_labels_from_extracted_emb(train_df, test_df):
    """
    Encode training and testing set labels

    The encoder is fitted on the training labels only and then applied to the
    testing labels, so both sets share the same label -> integer mapping.

    Args:
        train_df (Dataframe): training corpus features where the last column is the label
        test_df (Dataframe): testing corpus features where the last column is the label

    Returns:
        label_encoded_train (Array[Int]): encoded labels for training set
        label_encoded_test (Array[Int]): encoded labels for testing set

    Raises:
        ValueError: raised by ``LabelEncoder.transform`` when the testing set
            contains a label that does not occur in the training set.
    """
    lEnc = LabelEncoder()
    # The label is the last column of each frame; index it per frame so the
    # two frames do not need to have the same number of feature columns.
    label_encoded_train = lEnc.fit_transform(train_df.iloc[:, -1])
    label_encoded_test = lEnc.transform(test_df.iloc[:, -1])
    return label_encoded_train, label_encoded_test

In [None]:
# Shell escape: installs the `transformers` package into the current environment.
# NOTE(review): consider `%pip install` with a pinned version so the install
# targets the running kernel's environment -- confirm against the intended runtime.
!pip install transformers

In [None]:
import numpy as np
import pandas as pd
import torch.optim as optim
import torch
import torch.nn as nn
import torch.nn.functional as F

In [None]:
class CNNClassifier(nn.Module):

    def __init__(self, input_dim=768, reduced_dim=10, last_k=11, last_s=20, n_class=None):
        """
        Define the CNN model for reducing the feature dimension of pretrained embeddings

        The network is conv(k=2, s=2) -> ReLU -> avg-pool(k=2, s=2) ->
        conv(k=2, s=1) -> ReLU -> avg-pool(k=last_k, s=last_s) -> linear.
        With the default arguments a length-768 input is reduced as
        768 -> 384 -> 192 -> 191 -> 10, which is why ``reduced_dim`` defaults
        to 10. If the input length, ``last_k`` or ``last_s`` change,
        ``reduced_dim`` must be changed to the resulting pooled length.

        Args:
            input_dim (int): defines the input embedding dimension. Default is 768.
                Kept for interface compatibility; no layer is sized from it.
            reduced_dim (int): defines the target embedding dimension, i.e. the
                input size of the final linear layer. Default is 10.
            last_k (int): defines the kernel size of the last pooling layer. Default is 11.
            last_s (int): defines the stride size of the last pooling layer. Default is 20.
            n_class (int): number of output classes. When omitted, the
                module-level global ``n_class`` is used, which is what this
                class relied on before the parameter existed.

        Raises:
            ValueError: if ``n_class`` is neither passed nor defined as a global.
        """

        super().__init__()

        if n_class is None:
            # Backward compatibility: earlier versions read a notebook-level
            # global named `n_class` instead of taking it as an argument.
            n_class = globals().get("n_class")
            if n_class is None:
                raise ValueError(
                    "n_class must be passed to CNNClassifier or defined as a global"
                )

        self.layer1 = nn.Conv1d(in_channels=1, out_channels=1, kernel_size=2, stride=2)
        self.act1 = nn.ReLU()
        self.pooling1 = nn.AvgPool1d(2, stride=2)

        self.layer2 = nn.Conv1d(in_channels=1, out_channels=1, kernel_size=2, stride=1)
        self.act2 = nn.ReLU()
        self.pooling2 = nn.AvgPool1d(last_k, stride=last_s)

        self.linear = nn.Linear(reduced_dim, n_class)

    def forward(self, input_feature):
        """
        Forward the CNN model for reducing the feature dimension and classification

        Args:
            input_feature (Tensor): single-channel input for the 1-D
                convolutions, e.g. shape (batch, 1, length).

        Returns:
            prediction_logit (Tensor): output of the linear layer applied to the
                pooled features (last dimension is the number of classes).
            pooled_output (Tensor): reduced features produced by the second
                pooling layer (last dimension must equal ``reduced_dim``).
        """
        pooled_output = self.pooling1(self.act1(self.layer1(input_feature)))
        pooled_output = self.pooling2(self.act2(self.layer2(pooled_output)))

        prediction_logit = self.linear(pooled_output)

        return prediction_logit, pooled_output

In [None]:
from sklearn.metrics import accuracy_score, classification_report
def train(model, X_train, X_val, y_train, y_val, output_path_prefix, batch_size = 32, learning_rate = 5e-5, total_epoch = 10):
    """
    Train the CNN model on the given features and labels

    After every epoch the model is scored on the validation set and, whenever
    the validation accuracy improves, the whole model and its state dictionary
    are written to disk.

    Args:
        model(object): initialised model whose forward pass returns a ``(logits, features)`` pair. It is moved to the training device in place.
        X_train(Dataframe): training set features, one row per sample; every column is fed to the model.
        X_val(Dataframe): validation set features, one row per sample.
        y_train(Array[Int]): encoded labels of training samples as NumPy array.
        y_val(Array[Int]): encoded labels of validation samples as NumPy array.
        output_path_prefix(str): prefix of the path to save the trained model, comprised of the path to the output folder and the experiment name. This will be appended with '_best.pt' and '_best_state_dict.pt' to save the best model during training and its state dictionary.
        batch_size(int): batch size for training. Default is 32.
        learning_rate(float): learning rate for training. Default is 5e-5.
        total_epoch(int): total number of epochs for training. Default is 10.

    Returns:
        paths(Tuple(Str)): file paths where the best model and its state dictionary is saved .
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # The batches are moved to `device`, so the model has to live there too.
    # Move it before building the optimizer so the optimizer tracks the moved parameters.
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, eps = 1e-8)

    best_model_path = f'{output_path_prefix}_best.pt'
    best_state_dict_path = f'{output_path_prefix}_best_state_dict.pt'

    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint; otherwise an accuracy of 0 would leave the returned paths
    # pointing at files that were never created.
    best = -1.0
    # Training the model
    for epoch in range(total_epoch):
        model.train()
        train_loss = 0
        for batch_start_index in range(0, len(X_train), batch_size):
            batch_end_index = batch_start_index + batch_size
            batch_df = X_train.iloc[batch_start_index:batch_end_index, :].astype(float)
            batch_tensor = torch.FloatTensor(batch_df.values).to(device)
            # add a channel axis: (batch, n_features) -> (batch, 1, n_features)
            batch_tensor = torch.unsqueeze(batch_tensor, 1)
            # CrossEntropyLoss expects class indices as int64
            batch_labels = torch.tensor(y_train[batch_start_index:batch_end_index], dtype=torch.long).to(device)

            optimizer.zero_grad()
            outputs, _ = model(batch_tensor)
            # drop the channel axis again so the logits are (batch, n_class)
            outputs = torch.squeeze(outputs, 1)
            loss = criterion(outputs, batch_labels)
            loss.backward()
            optimizer.step()

            # sum (not mean) of the per-batch losses over the epoch
            train_loss += loss.item()

        # validation: inference only, so switch to eval mode and skip autograd
        model.eval()
        val_predictions = []
        with torch.no_grad():
            for batch_start_index in range(0, len(X_val.index), batch_size):
                batch_end_index = batch_start_index + batch_size
                batch_df = X_val.iloc[batch_start_index:batch_end_index, :].astype(float)
                batch_tensor = torch.FloatTensor(batch_df.values).to(device)
                batch_tensor = torch.unsqueeze(batch_tensor, 1)

                outputs, _ = model(batch_tensor)
                outputs = torch.squeeze(outputs, 1)
                predicted = torch.argmax(outputs, 1)
                val_predictions.append(predicted)

        predictions_tensor = torch.hstack(val_predictions)
        current_val_acc = accuracy_score(y_val, predictions_tensor.cpu().numpy())

        # save best
        if current_val_acc > best:
            torch.save(model, best_model_path)
            torch.save(model.state_dict(), best_state_dict_path)
            best = current_val_acc
        print('Epoch: %d, train loss: %.5f, val acc: %.5f, best acc: %.5f'%(epoch + 1, train_loss, current_val_acc, best))

    print('Finished Training')
    return best_model_path, best_state_dict_path


In [None]:
def evaluate(model_path, X_test, label_encoded_test, batch_size=32):
    """
    Run inference of the trained CNN model on the testing set features and print a classification report (precision, recall, f1 score and accuracy)

    Args:
        model_path(str): file path where the best CNN model is saved .
        X_test(Dataframe): testing set features
        label_encoded_test(Array[Int]): encoded labels of testing texts as NumPy array.
        batch_size(int): batch size for inference. Default is 32.

    Returns:
        None
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # The checkpoint is a whole pickled model, so it must come from a trusted
    # source: unpickling can execute arbitrary code. Newer PyTorch releases
    # default to `weights_only=True`, which rejects such checkpoints, so ask
    # for a full load explicitly; older releases do not know the keyword.
    try:
        model = torch.load(model_path, map_location=device, weights_only=False)
    except TypeError:
        model = torch.load(model_path, map_location=device)
    model.to(device)

    # evaluation on the test set
    model.eval()
    predictions = []
    with torch.no_grad():
        for batch_start_index in range(0, len(X_test.index), batch_size):
            batch_df = X_test.iloc[batch_start_index:(batch_start_index+batch_size), :].astype(float)
            batch_tensor = torch.FloatTensor(batch_df.values).to(device)
            # add a channel axis: (batch, n_features) -> (batch, 1, n_features)
            batch_tensor = torch.unsqueeze(batch_tensor, 1)

            outputs, _ = model(batch_tensor)
            outputs = torch.squeeze(outputs, 1)
            predicted = torch.argmax(outputs, 1)
            predictions.append(predicted)

    predictions_tensor = torch.hstack(predictions)
    print(classification_report(label_encoded_test, predictions_tensor.cpu().numpy(),digits=4))

In [None]:
def get_CLS_row_values(model_path, raw_feature_df, labels, dim_size):
    """
    Extract the model's feature representation of every input row.

    Each row is passed through the saved model on its own, and the second
    value returned by the model (its feature/embedding output) is flattened
    into a list with the row's label appended.

    Args:
        model_path(str): file path where the best trained model is saved .
        raw_feature_df(Dataframe): input features
        labels(Array[Int]): encoded labels of texts as NumPy array.
        dim_size(int): number of leading columns of ``raw_feature_df`` that are fed to the model as input features.

    Returns:
        row_values (List[List[float]]): extracted features + encoded label of the given input, one inner list per row.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # The checkpoint is a whole pickled model, so it must come from a trusted
    # source: unpickling can execute arbitrary code. Newer PyTorch releases
    # default to `weights_only=True`, which rejects such checkpoints, so ask
    # for a full load explicitly; older releases do not know the keyword.
    try:
        model = torch.load(model_path, map_location=device, weights_only=False)
    except TypeError:
        model = torch.load(model_path, map_location=device)
    model.to(device)
    model.eval()

    row_values = []
    with torch.no_grad():
        for row_index in range(len(raw_feature_df.index)):
            features = raw_feature_df.iloc[row_index:(row_index+1), :dim_size].astype(float).values
            batch_tensor = torch.FloatTensor(features).to(device)
            # add the middle axis the model expects: (1, dim_size) -> (1, 1, dim_size)
            batch_tensor = torch.unsqueeze(batch_tensor, 1)
            _, cls_rep = model(batch_tensor)
            # flatten() always yields a list, even for a single-element
            # representation (squeeze() would collapse that to a bare float)
            one_row = cls_rep.cpu().flatten().tolist()
            one_row.append(labels[row_index])
            row_values.append(one_row)
    return row_values

In [None]:
def save_CLS_embs(model_path, raw_feature_df, labels, output_path, dim_size=768,save_csv=True):
    """
    Convert any extracted embeddings as a dataframe and save them in CSV file.

    Args:
        model_path(str): file path where the best trained model is saved .
        raw_feature_df(Dataframe): input features
        labels(Array[Int]): encoded labels of texts as NumPy array.
        output_path(str): path to save the CSV file.
        dim_size(int): number of leading columns of ``raw_feature_df`` fed to the model as input features. Default is 768.
        save_csv(bool): whether to save the CSV file or not. Default is True.

    Returns:
        df (DataFrame): Given embeddings converted to the dataframe, where each dim of the extracted embedding is a column, and named as f'vec_val_{dim_index}', and the encoded label information is stored as the last column named as  'Class_label'.
    """
    row_values = get_CLS_row_values(model_path, raw_feature_df, labels, dim_size)

    # Every row is <embedding values..., label>. Size the header from the rows
    # themselves: the model may output fewer values than it takes as input
    # (dim_size), and a header of the wrong width makes pd.DataFrame raise.
    # With no rows there is nothing to measure, so fall back to dim_size.
    emb_dim = len(row_values[0]) - 1 if row_values else dim_size
    column_names = [f"vec_val_{i}" for i in range(emb_dim)]
    column_names.append("Class_label")

    df = pd.DataFrame(row_values, columns=column_names)
    if save_csv:
        df.to_csv(output_path,sep=',',index=False)
    return df

