In [None]:
import random
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import transformers
from transformers import  models
from sklearn.metrics import accuracy_score, f1_score


import logging
import torch.nn as nn
from sentence_transformers import SentenceTransformer,  models
from scipy.stats import pearsonr, spearmanr
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import KFold
from torch.utils.data import DataLoader, Dataset
from torch.optim import Adam

In [None]:

class Sent_Embed_Dense(nn.Module):
    """Sentence-pair regressor: two SBERT embeddings concatenated into one linear layer.

    Args:
        model_name: model identifier handed to ``SentenceTransformer``.
        pooling_mode: pooling mode handed to ``models.Pooling``.
        input_dim: size of ONE sentence embedding. The linear layer receives
            ``2 * input_dim`` features because both embeddings are concatenated.
        output_dim: number of values predicted per text pair (default 1).
    """

    def __init__(self, model_name, pooling_mode, input_dim, output_dim=1):
        super().__init__()
        self.sbert = SentenceTransformer(model_name)
        embedding_size = self.sbert.get_sentence_embedding_dimension()
        # This pooling module is constructed here but never called in forward().
        self.pooling = models.Pooling(embedding_size, pooling_mode)
        # Concatenation doubles the feature count; an element-wise difference
        # of the two embeddings would need only `input_dim` inputs instead.
        self.dense = nn.Linear(2 * input_dim, output_dim)

    def forward(self, features):
        """Embed ``features['text1']`` and ``features['text2']`` and regress on the pair.

        Args:
            features: mapping with the keys ``'text1'`` and ``'text2'``, each
                holding the texts of one batch.

        Returns:
            Output of the linear layer applied to the concatenated embeddings.
        """
        first_texts, second_texts = features['text1'], features['text2']

        # NOTE(review): encode() is the library's inference helper - presumably
        # it runs without gradient tracking, so only `dense` would be trained.
        # Confirm against the sentence-transformers documentation.
        first_vecs = self.sbert.encode(first_texts, convert_to_tensor=True)
        second_vecs = self.sbert.encode(second_texts, convert_to_tensor=True)

        # Join the two embeddings side by side along the feature axis.
        pair_vecs = torch.cat((first_vecs, second_vecs), dim=1)
        return self.dense(pair_vecs)


class CustomDataset(Dataset):
    """Index-aligned text pairs with one numeric label per pair.

    Args:
        text1: indexable collection holding the first text of every pair.
        text2: indexable collection holding the second text of every pair.
        labels: indexable collection holding the numeric target of every pair.
    """

    def __init__(self, text1, text2, labels):
        self.text1, self.text2, self.labels = text1, text2, labels

    def __len__(self):
        # The length is taken from `text1` only; the other two collections are
        # not checked against it.
        return len(self.text1)

    def __getitem__(self, idx):
        """Return ``({'text1': ..., 'text2': ...}, label)`` with the label as a float32 tensor."""
        pair = {'text1': self.text1[idx], 'text2': self.text2[idx]}
        target = torch.tensor(self.labels[idx], dtype=torch.float32)
        return pair, target
    
def make_dataloader(df,batch_size):
    """Wrap the text-pair columns of ``df`` in a shuffling ``DataLoader``.

    Args:
        df: frame providing the columns ``'user_thoughts_and _feelings'``
            (note the embedded space), ``'designer_guess'`` and ``'Avg_EA'``.
        batch_size: number of pairs per batch.

    Returns:
        A ``DataLoader`` over a ``CustomDataset`` that reshuffles on every
        iteration and batches through ``custom_collate_fn``.
    """
    pair_dataset = CustomDataset(
        df['user_thoughts_and _feelings'].values,
        df['designer_guess'].values,
        df['Avg_EA'].values,
    )
    return DataLoader(
        pair_dataset,
        batch_size=batch_size,
        shuffle=True,
        collate_fn=custom_collate_fn,
    )

def custom_collate_fn(batch):
    """Merge ``(feature_dict, label)`` samples into one batch.

    Args:
        batch: non-empty sequence of ``(feature_dict, label)`` pairs.

    Returns:
        ``(merged, labels)`` where ``merged`` maps each key of the first
        sample's dict to the list of that key's values across the batch, and
        ``labels`` is the tuple of per-sample labels. The labels are passed
        through untouched: no stacking and no device transfer happens here.
    """
    feature_dicts, targets = zip(*batch)
    merged = {
        field: [sample[field] for sample in feature_dicts]
        for field in feature_dicts[0].keys()
    }
    return merged, targets

def calculate_correlation(predictions, labels):
    """Score predictions against labels with Pearson r, Spearman rho and RMSE.

    Args:
        predictions: sequence of tensors, concatenated and flattened before scoring.
        labels: sequence of tensors, concatenated and flattened before scoring.

    Returns:
        Tuple ``(pearson, spearman, rmse)``.

    Raises:
        ValueError: if either flattened array has fewer than two values.
    """
    # Concatenate the per-batch tensors and flatten them into 1-D numpy arrays.
    predicted = torch.cat(predictions).cpu().detach().numpy().flatten()
    observed = torch.cat(labels).cpu().detach().numpy().flatten()

    if min(len(predicted), len(observed)) < 2:
        raise ValueError("Both predictions and labels must have at least 2 data points")

    pearson, _pearson_p = pearsonr(predicted, observed)
    squared_error = np.square(observed - predicted)
    rmse = np.sqrt(np.mean(squared_error))
    spearman, _spearman_p = spearmanr(predicted, observed)
    return pearson, spearman, rmse

In [None]:


# --- Reproducibility and device ----------------------------------------------
# Seed every RNG in play (dense-layer initialisation, DataLoader shuffling) so
# that "Restart Kernel -> Run All" reproduces the same numbers.
SEED = 123
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Fall back to the CPU so the notebook still runs on machines without a GPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# --- Model ---------------------------------------------------------------------
# SBERT encoder with an additional dense regression layer on top.
model_name = "sentence-transformers/xlm-r-distilroberta-base-paraphrase-v1"
pooling_mode = "mean"
input_dim = 768  # embedding size expected from the encoder above
model = Sent_Embed_Dense(model_name, pooling_mode, input_dim)
model.to(device)
tokenizer_ = model.sbert.tokenizer

# Snapshot of the untrained weights (kept on the CPU). Every fold restarts from
# this state; otherwise a fold would be validated on rows the model had already
# been trained on during earlier folds.
initial_state = {
    name: tensor.detach().cpu().clone()
    for name, tensor in model.state_dict().items()
}

# Loss and optimiser settings (the optimiser itself is rebuilt per fold).
loss_fn = nn.MSELoss()
learning_rate = 2e-5
batch_size = 1

# --- Data ------------------------------------------------------------------------
# Lead-in phrases stripped from both text columns, in this exact order
# ('she was' must be removed before 'he was').
FILLER_PHRASES = (':', 'i was', 'she / he was', 's/he was', 'she was', 'he was', 'they were')


def strip_filler_phrases(series):
    """Lower-case a text Series and delete every entry of FILLER_PHRASES from it."""
    cleaned = series.str.lower()
    for phrase in FILLER_PHRASES:
        # The phrases are plain text, so match them literally.
        cleaned = cleaned.str.replace(phrase, '', regex=False)
    return cleaned


data = pd.read_excel(r"full data-EA-preprocessed.xlsx" )

# Each derived column is fed from the source column whose name matches it: the
# users' own statements go to 'user_thoughts_and _feelings' and the inferences
# made about them go to 'designer_guess' (these two were previously crossed).
User = strip_filler_phrases(data["Users' thoughts or feeling"])
Des = strip_filler_phrases(data["Inferences"])

data['user_thoughts_and _feelings'] = User
data['designer_guess'] = Des
data['Avg_EA'] = data["Average EA"] / 2

# --- Cross-validation ---------------------------------------------------------------
k_folds = 10
kf = KFold(n_splits=k_folds, shuffle=True, random_state=123)
epochs = 50


def run_epoch(model, dataloader, loss_fn, device, optimizer=None):
    """Run one pass over ``dataloader``; train when ``optimizer`` is given, else evaluate.

    Args:
        model: module called as ``model(inputs)`` on each batch's feature dict.
        dataloader: yields ``(inputs, labels)`` where ``labels`` is a sequence
            of scalar tensors.
        loss_fn: loss called as ``loss_fn(outputs, labels)``.
        device: device the labels are moved to.
        optimizer: if supplied, gradients are computed and a step is taken per batch.

    Returns:
        ``(predictions, targets, total_loss)``: two lists with one tensor per
        batch (detached from the autograd graph) and the summed batch losses.
    """
    training = optimizer is not None
    model.train(training)

    predictions, targets = [], []
    total_loss = 0.0
    with torch.set_grad_enabled(training):
        for inputs, labels in dataloader:
            # The collate function returns the labels as a tuple of scalar
            # tensors; stack them into a (batch, 1) column on the model device.
            labels = torch.stack(labels).to(device).view(-1, 1)

            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            total_loss += loss.item()
            # Detach before storing so the autograd graph of every batch is
            # not kept alive until the end of the epoch.
            predictions.append(outputs.detach())
            targets.append(labels)
    return predictions, targets, total_loss


results = []   # one row per fold with the metrics of its final epoch
All_pred = []  # one entry per fold with the per-epoch metric histories

for fold, (train_idx, val_idx) in enumerate(kf.split(data)):
    train_df = data.iloc[train_idx]
    val_df = data.iloc[val_idx]

    train_dataloader = make_dataloader(train_df, batch_size=batch_size)
    val_dataloader = make_dataloader(val_df, batch_size=batch_size)

    # Fresh weights and fresh optimiser state for every fold (no leakage).
    model.load_state_dict(initial_state)
    optimizer = Adam(model.parameters(), lr=learning_rate)

    # Per-epoch histories of THIS fold only.
    Train_predictions = []
    Val_predictions = []

    for epoch in range(epochs):
        # Training pass
        train_outputs, train_labels, _ = run_epoch(
            model, train_dataloader, loss_fn, device, optimizer=optimizer
        )
        train_pearson, train_spearman, train_rmse = calculate_correlation(train_outputs, train_labels)
        Train_predictions.append([train_pearson, train_spearman, train_rmse])

        # Validation pass
        val_outputs, val_labels, val_loss = run_epoch(model, val_dataloader, loss_fn, device)
        val_pearson, val_spearman, val_rmse = calculate_correlation(val_outputs, val_labels)
        Val_predictions.append([val_pearson, val_spearman, val_rmse])

        print(f"Fold {fold + 1}, Epoch {epoch + 1}, Val Loss: {val_loss / len(val_dataloader)}")

    results.append({
        'fold': fold,
        'train_pearson': train_pearson, 'train_spearman': train_spearman, 'train_rmse': train_rmse,
        'val_pearson': val_pearson, 'val_spearman': val_spearman, 'val_rmse': val_rmse,
    })
    All_pred.append({
        'fold': fold,
        'train_pearson': [m[0] for m in Train_predictions],
        'train_spearman': [m[1] for m in Train_predictions],
        'train_rmse': [m[2] for m in Train_predictions],
        'val_pearson': [m[0] for m in Val_predictions],
        'val_spearman': [m[1] for m in Val_predictions],
        'val_rmse': [m[2] for m in Val_predictions],
    })

# Save the per-fold results (the file name is kept exactly as before so that
# anything reading this file keeps working).
results_df = pd.DataFrame(results)

results_df.to_csv("Dense_layer_experiment_with_SBERT Concatination evaluation.csv", index=False)
