In [None]:
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import transformers
from sklearn.model_selection import train_test_split
from tqdm import tqdm

In [None]:
# Read the dataset; the first CSV column becomes the row index.
score_df = pd.read_csv('data.csv', index_col=0)
print("Total number of samples: ", len(score_df))
# Last expression of the cell: preview of the first rows.
score_df.head()

In [None]:
# Bar chart: number of samples at each distinct average score, ordered by score.
score_counts = score_df['score_avg'].value_counts().sort_index()
ax = score_counts.plot(kind='bar')
ax.set_xlabel('Average Score')
ax.set_ylabel('Number of samples')
ax.set_title('Average Score Distribution')
plt.show()

In [None]:
# Rounding to nearest .5 value
def _round_to_half(score):
    """Snap one score onto the 0.5 grid (double it, round, halve it)."""
    return round(score * 2) / 2


score_df['score_avg'] = score_df['score_avg'].apply(_round_to_half)

# Same bar chart as before, now over the coarser 0.5-step scores.
rounded_counts = score_df['score_avg'].value_counts().sort_index()
ax = rounded_counts.plot(kind='bar')
ax.set_xlabel('Average Score')
ax.set_ylabel('Number of samples')
ax.set_title('Average Score Distribution')
plt.show()


In [None]:
# Define the function to generate BERT embeddings

model_name = 'bert-base-uncased'


@lru_cache(maxsize=None)
def _load_bert(name):
    """Load the tokenizer and encoder for ``name`` once and reuse them afterwards.

    Loading the pretrained weights is by far the most expensive step, so it
    must not be repeated for every row that gets embedded.

    Args:
        name: Model identifier passed to ``from_pretrained``.

    Returns:
        A ``(tokenizer, model)`` tuple; the model is switched to eval mode.
    """
    tokenizer = transformers.AutoTokenizer.from_pretrained(name)
    model = transformers.AutoModel.from_pretrained(name)
    model.eval()  # inference only: make sure dropout is disabled
    return tokenizer, model


def generate_bert_embeddings(text):
    """
    Generates the BERT pooled embedding for one text.

    The model is selected by the module-level ``model_name`` and is loaded
    only on the first call (see ``_load_bert``).

    Args:
        text: The text to embed. It is tokenized with padding and truncation
            enabled; the batch dimension is squeezed away afterwards, so a
            single string is expected here.

    Returns:
        A numpy array holding the model's ``pooler_output`` with the leading
        batch dimension (of size 1) removed.
    """
    tokenizer, model = _load_bert(model_name)

    # Encode the text as PyTorch tensors
    encoded_text = tokenizer(text, padding=True, truncation=True, return_tensors='pt')

    # No gradients are needed for feature extraction
    with torch.no_grad():
        pooled = model(**encoded_text).pooler_output

    # squeeze(0) drops the batch dimension when it has size 1
    return pooled.squeeze(0).numpy()


# Embed every text column with BERT, writing the result to '<column>_embedding'.
# Plain Series.apply is used on purpose: `parallel_apply` is not part of pandas,
# it only exists after pandarallel has been imported and initialised, which this
# notebook never does.
for column in ('question', 'refanswer', 'answer'):
    score_df[f'{column}_embedding'] = score_df[column].apply(generate_bert_embeddings)


In [None]:
# Persist the frame, including the embedding columns computed above, so the
# BERT pass does not have to be repeated in later sessions.
score_df.to_pickle("data_processed.pkl")

In [None]:
# Preview the first rows of the frame that was just written to disk.
score_df.head()

## Embedding Classification

In [None]:
# Reload the frame saved by the to_pickle cell earlier in the notebook.
# NOTE: unpickling can execute arbitrary code - only load a pickle file that
# you produced yourself.
score_df = pd.read_pickle('data_processed.pkl')

In [None]:
# Collapse the scores to whole numbers; these become the class labels below.
# NOTE(review): if the pickle was written after the "nearest .5" cell ran, this
# is a second rounding of already-rounded values, and Series.round sends exact
# halves to the nearest even number (0.5 -> 0, 1.5 -> 2, 2.5 -> 2). The labels
# can then differ from rounding the raw averages once, as the NN section does -
# confirm which labelling is intended.
score_df['score_avg'] = score_df['score_avg'].round()

In [None]:
def _stack_row_embeddings(row):
    """Join one row's question, reference-answer and answer vectors end to end."""
    parts = (row['question_embedding'], row['refanswer_embedding'], row['answer_embedding'])
    return np.concatenate(parts, axis=0)


score_df['combined_embedding'] = score_df.apply(_stack_row_embeddings, axis=1)

# do train test split with stratification
train_df, test_df = train_test_split(
    score_df,
    test_size=0.2,
    stratify=score_df['score_avg'],
    random_state=42,
)
print("Train samples: ", len(train_df))
print("Test samples: ", len(test_df))

# One bar chart per split to check that the label distribution was preserved.
for split_df, split_title in (
    (train_df, 'Train Average Score Distribution'),
    (test_df, 'Test Average Score Distribution'),
):
    ax = split_df['score_avg'].value_counts().sort_index().plot(kind='bar')
    ax.set_xlabel('Average Score')
    ax.set_ylabel('Number of samples')
    ax.set_title(split_title)
    plt.show()

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import OneHotEncoder
import xgboost as xgb
from sklearn.svm import SVC



# Linear-kernel support vector classifier for the concatenated embeddings.
classifier = SVC(kernel='linear')

# One feature vector per training row; the scores are handed over as string
# class labels.
y_train = train_df['score_avg'].astype(str)
X_train = train_df['combined_embedding'].to_list()

# Train the classifier on the combined embeddings
classifier.fit(X_train, y_train)

In [None]:
# Held-out features and string labels, built the same way as the training inputs.
y_test = test_df['score_avg'].astype(str)
X_test = test_df['combined_embedding'].to_list()

# Predict on both splits so training and test metrics can be compared.
y_pred_train = classifier.predict(X_train)
y_pred_test = classifier.predict(X_test)

In [None]:
from sklearn import metrics

In [None]:
# Support-weighted precision / recall / F1 on the training split.
train_scores = metrics.precision_recall_fscore_support(y_train, y_pred_train, average="weighted")
precision, recall, f1 = train_scores[:3]

for caption, value in (("Precision :", precision), ("Recall :", recall), ("F1 :", f1)):
    print(caption, value)

In [None]:
# Support-weighted precision / recall / F1 on the held-out test split.
test_scores = metrics.precision_recall_fscore_support(y_test, y_pred_test, average="weighted")
precision, recall, f1 = test_scores[:3]

for caption, value in (("Precision :", precision), ("Recall :", recall), ("F1 :", f1)):
    print(caption, value)

## NN Classification

In [None]:
# Start the fine-tuning experiment from the CSV again.
score_df = pd.read_csv('data.csv', index_col=0)
print("Total number of samples: ", len(score_df))

# Integer class labels: every average score rounded to a whole number.
score_df = score_df.assign(
    score_avg=lambda frame: frame['score_avg'].round().astype(int)
)
score_df.head()

In [None]:
# One model input per row: question, reference answer and given answer, each
# under its own heading line.
score_df['text'] = (
    "Question :\n" + score_df['question']
    + "\nReference Answer:\n" + score_df['refanswer']
    + "\nActual Answer:\n" + score_df['answer']
)
num_classes = score_df['score_avg'].nunique()

# Plain arrays of inputs and targets for the train/validation split.
labels = score_df['score_avg'].values
text_samples = score_df['text'].values

In [None]:
from torch.utils.data import DataLoader, Dataset
import os
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, BertModel, AdamW, get_linear_schedule_with_warmup
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, f1_score
import pandas as pd

# Hold out 20% of the samples for validation; the seed is fixed so the split
# is repeatable.
# NOTE(review): unlike the split used in the embedding experiment, this one is
# not stratified on the label - confirm that is intended.
train_texts, val_texts, train_labels, val_labels = train_test_split(text_samples, labels, test_size=0.2, random_state=42)


In [None]:
# Silence every Python warning from this point on.
# NOTE(review): a blanket 'ignore' filter also hides deprecation warnings from
# the libraries used below - consider narrowing it to specific categories.

import warnings
warnings.filterwarnings('ignore')


In [None]:
from transformers import BertTokenizer, BertModel
from torch import nn

class BertClassifier(nn.Module):
    """Pretrained BERT encoder followed by dropout and one linear output layer."""

    def __init__(self, model_name, num_classes):
        """Build the network.

        Args:
            model_name: Identifier handed to ``BertModel.from_pretrained``.
            num_classes: Number of output units of the final linear layer.
        """
        super().__init__()
        encoder = BertModel.from_pretrained(model_name)
        self.bert_model = encoder
        self.dropout = nn.Dropout(0.2)
        # The head maps the encoder's hidden size onto one score per class.
        self.linear = nn.Linear(encoder.config.hidden_size, num_classes)

    def forward(self, input_ids, attention_mask):
        """Return the linear layer's output for the encoder's pooled output."""
        encoded = self.bert_model(input_ids=input_ids, attention_mask=attention_mask)
        return self.linear(self.dropout(encoded.pooler_output))

In [None]:
def train(model, data_loader, optimizer, scheduler, device):
    """Run one training epoch and report the mean batch loss.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``
            whose output is passed to cross-entropy together with the labels.
        data_loader: Iterable of batches; each batch is a mapping with the
            keys ``'input_ids'``, ``'attention_mask'`` and ``'label'``.
        optimizer: Optimizer stepped once per batch.
        scheduler: Learning-rate scheduler stepped once per batch, right
            after the optimizer.
        device: Device every batch tensor is moved to before the forward pass.

    Returns:
        The mean cross-entropy loss over the batches of this epoch as a
        Python float, or ``nan`` when ``data_loader`` yields no batches.
    """
    model.train()
    criterion = nn.CrossEntropyLoss()  # built once instead of once per batch
    total_loss = 0.0
    num_batches = 0
    for batch in data_loader:
        optimizer.zero_grad()
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device)
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()
        total_loss += loss.item()
        num_batches += 1
    return total_loss / num_batches if num_batches else float('nan')

def evaluate(model, data_loader, device):
    """Score the model on every batch of ``data_loader`` without gradients.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: Iterable of batches with the keys ``'input_ids'``,
            ``'attention_mask'`` and ``'label'``.
        device: Device the batch tensors are moved to.

    Returns:
        A 3-tuple ``(accuracy, classification report, weighted F1)`` computed
        over all batches.
    """
    model.eval()
    predicted, expected = [], []
    with torch.no_grad():
        for batch in data_loader:
            ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            targets = batch['label'].to(device)
            logits = model(input_ids=ids, attention_mask=mask)
            # The index of the largest output per row is the predicted class.
            best = torch.max(logits, dim=1).indices
            predicted += best.cpu().tolist()
            expected += targets.cpu().tolist()
    accuracy = accuracy_score(expected, predicted)
    report = classification_report(expected, predicted)
    weighted_f1 = f1_score(expected, predicted, average='weighted')
    return accuracy, report, weighted_f1

In [None]:
# Set up parameters
bert_model_name = 'bert-base-uncased'
# CrossEntropyLoss reads every label as a class index in [0, num_classes), so
# the output layer needs max_label + 1 units. Counting the distinct labels
# gives a value that is too small whenever some score in between never occurs.
assert score_df['score_avg'].min() >= 0, "class labels must be non-negative integers"
num_classes = int(score_df['score_avg'].max()) + 1
max_length = 128       # token budget per sample; longer inputs are truncated
batch_size = 16
num_epochs = 4
learning_rate = 2e-5

class TextClassificationDataset(Dataset):
    """Dataset that tokenizes one text per item and pairs it with its label."""

    def __init__(self, texts, labels, tokenizer, max_length):
        """Store the raw inputs; tokenization happens lazily in ``__getitem__``.

        Args:
            texts: Indexable collection of input texts.
            labels: Indexable collection of integer class labels, aligned
                with ``texts``.
            tokenizer: Callable invoked as ``tokenizer(text, return_tensors='pt',
                max_length=..., padding='max_length', truncation=True)``.
            max_length: Length every encoded sample is padded/truncated to.
        """
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Number of samples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the ``input_ids``, ``attention_mask`` and ``label`` of sample ``idx``."""
        text = self.texts[idx]
        label = self.labels[idx]
        encoding = self.tokenizer(text, return_tensors='pt', max_length=self.max_length, padding='max_length', truncation=True)
        return {
            # flatten() removes the batch dimension the tokenizer adds
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            # Force int64: CrossEntropyLoss rejects int32 class indices, which
            # is what a numpy label array holds on platforms where the default
            # integer is 32 bits wide.
            'label': torch.tensor(label, dtype=torch.long),
        }

tokenizer = BertTokenizer.from_pretrained(bert_model_name)

# Wrap both splits; only the training loader shuffles its samples.
train_dataset = TextClassificationDataset(
    texts=train_texts, labels=train_labels, tokenizer=tokenizer, max_length=max_length
)
val_dataset = TextClassificationDataset(
    texts=val_texts, labels=val_labels, tokenizer=tokenizer, max_length=max_length
)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size)

# Use the GPU when one is available, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model = BertClassifier(bert_model_name, num_classes)
model.to(device)

# The scheduler is stepped once per batch, so it must span every optimizer
# step of the whole run; no warm-up steps are used.
optimizer = AdamW(model.parameters(), lr=learning_rate)
total_steps = num_epochs * len(train_dataloader)
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=total_steps,
)

In [None]:
from tqdm import tqdm
# Train for `num_epochs` epochs and report validation metrics after each one.
# The validation F1 is bound to `val_f1`, not `f1_score`: rebinding the imported
# sklearn `f1_score` function to a number makes the next call to evaluate()
# fail because the name is no longer callable.
for epoch in tqdm(range(num_epochs)):
    train(model, train_dataloader, optimizer, scheduler, device)
    accuracy, report, val_f1 = evaluate(model, val_dataloader, device)
    print(f"\nValidation Accuracy: {accuracy:.4f}")
    print(f"\nF1 Score: {val_f1:.4f}")
    print(report)

In [None]:
def predict_sentiment(text, model, tokenizer, device, max_length=128):
    """Predict the class index for a single text.

    Args:
        text: The text to classify.
        model: Module called as ``model(input_ids=..., attention_mask=...)``
            that returns one row of class scores per input.
        tokenizer: Callable producing ``'input_ids'`` and ``'attention_mask'``
            tensors for ``text``.
        device: Device the encoded tensors are moved to.
        max_length: Length the encoded text is padded/truncated to.

    Returns:
        The index of the highest-scoring class as a Python int.
    """
    model.eval()
    encoding = tokenizer(text, return_tensors='pt', max_length=max_length, padding='max_length', truncation=True)
    input_ids = encoding['input_ids'].to(device)
    attention_mask = encoding['attention_mask'].to(device)

    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        _, preds = torch.max(outputs, dim=1)
    # A tensor has no `.items()`; `.item()` unwraps the single predicted index.
    return preds.item()