In [None]:
import pandas as pd
import numpy as np
import os
import json
import re
import ast
import matplotlib.pyplot as plt
import torch
import torch.nn.functional as F
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModel
from sentence_transformers import SentenceTransformer, util
import re

In [None]:
# Sentence-Transformers encoder ("all-mpnet-base-v2"); this is the object handed to
# analysis() and expected by soft_alignment_score() as `model_bert`.
model_bert = SentenceTransformer("all-mpnet-base-v2")

In [None]:
# Raw Hugging Face tokenizer/model pair used by get_emb(); both must come from the
# same checkpoint, so the identifier is spelled once and shared.
_MINILM_CHECKPOINT = 'sentence-transformers/all-MiniLM-L6-v2'
tokenizer = AutoTokenizer.from_pretrained(_MINILM_CHECKPOINT)
model = AutoModel.from_pretrained(_MINILM_CHECKPOINT)


In [None]:
# Data locations. The ground-truth and LLM-evaluation folders live under the main
# data directory, so they are derived from it instead of repeating the prefix.
main_path = '../Tell2Design Data/'
gt_path = main_path + 'gt/'
llmeval_path = main_path + 'llmeval/'


In [None]:
def normalize(text):
    """Canonicalise an answer string for exact-match comparison.

    The text is lower-cased, the standalone words "the", "a" and "an" are
    dropped, every character that is neither a word character nor whitespace
    is removed, and runs of whitespace are collapsed to a single space.

    Args:
        text: Value to normalise. Anything that is not a ``str`` yields "".

    Returns:
        The normalised string, stripped of leading/trailing whitespace.
    """
    if isinstance(text, str):
        cleaned = text.lower()
        # Applied in order: articles, punctuation, then whitespace collapsing.
        for pattern, replacement in (
            (r"\b(the|a|an)\b", ""),
            (r"[^\w\s]", ""),
            (r"\s+", " "),
        ):
            cleaned = re.sub(pattern, replacement, cleaned)
        return cleaned.strip()
    return ""

1. Is the reasoning logically valid and coherent? (yes/no)
2. Does the reasoning support the model's answer? (yes/no)
3. Is the final answer correct? (yes/no)
4. Final verdict: (choose one)

- A. Correct answer and faithful reasoning
- B. Correct answer but unfaithful or shallow reasoning
- C. Wrong answer but reasonable attempt
- D. Wrong answer and unfaithful reasoning
                

In [None]:
# Mean pooling: average token embeddings, weighting each token by its attention mask
def mean_pooling(model_output, attention_mask):
    """Average the token embeddings of each sequence, ignoring masked positions.

    Args:
        model_output: Indexable model output whose element 0 holds the token
            embeddings.
        attention_mask: Mask tensor; it is given a trailing axis and expanded
            to the shape of the token embeddings before use.

    Returns:
        Tensor of mask-weighted sums over axis 1 divided by the mask totals
        (clamped to at least 1e-9 so an all-zero mask does not divide by zero).
    """
    hidden_states = model_output[0]
    weights = attention_mask.unsqueeze(-1).expand(hidden_states.size()).float()
    weighted_sum = (hidden_states * weights).sum(1)
    token_counts = weights.sum(1).clamp(min=1e-9)
    return weighted_sum / token_counts

def get_emb(question_dict):
    """Embed text with the module-level ``tokenizer`` / ``model`` pair.

    Args:
        question_dict: Input handed straight to ``tokenizer`` (padding on,
            truncation off). NOTE(review): despite the name this is presumably
            a string or list of strings -- confirm against callers.

    Returns:
        Mean-pooled embeddings, L2-normalised along dim 1.
    """
    batch = tokenizer(question_dict, padding=True, truncation=False, return_tensors='pt')

    # Inference only: no gradients are needed for the forward pass.
    with torch.no_grad():
        outputs = model(**batch)

    pooled = mean_pooling(outputs, batch['attention_mask'])
    return F.normalize(pooled, p=2, dim=1)


In [None]:


def clean_answer(x):
    """Reduce a raw answer value to a bare label string.

    Strings that look like a list literal (e.g. ``"['A']"``) are parsed and the
    first element is returned with surrounding quotes removed. Any other
    string has surrounding brackets, quotes and spaces stripped. Non-string
    values are returned unchanged.

    Args:
        x: Raw answer; usually a string, but any object is accepted.

    Returns:
        The cleaned string for string input (an empty or unparseable list
        literal falls back to plain stripping, so ``"[]"`` gives ``""``),
        otherwise ``x`` itself.
    """
    if not isinstance(x, str):
        return x

    if x.startswith("[") and x.endswith("]"):
        try:
            parsed = ast.literal_eval(x)   # safely convert string -> list
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            # Not a valid literal (e.g. "[living room]"): use the stripping path.
            parsed = None
        # The parsed value is kept in its own name so that an empty list does
        # not leak out as the return value in place of a string.
        if isinstance(parsed, list) and len(parsed) > 0:
            return str(parsed[0]).strip("'\"")

    # Remove extra quotes/brackets if present (like "'A'")
    return x.strip("[]'\" ").strip()


In [None]:

def normalize_steps(steps):
    """Canonicalise a sequence of reasoning-step strings.

    Each step is lower-cased, a leading enumeration such as "1. " is removed,
    any parenthesised triple of integers (an RGB-style tuple) is replaced by
    the token "COLOR", and whitespace is collapsed and trimmed.

    Args:
        steps: Iterable of step strings.

    Returns:
        A new list with one normalised string per input step.
    """
    def _tidy(step):
        lowered = step.lower()
        unnumbered = re.sub(r'^\s*\d+\.\s*', '', lowered)
        recoloured = re.sub(r'\(\s*\d+,\s*\d+,\s*\d+\s*\)', ' COLOR ', unnumbered)
        return re.sub(r'\s+', ' ', recoloured).strip()

    return [_tidy(step) for step in steps]

def soft_alignment_score(steps_a, steps_b, model_bert):
    """Score how well two lists of reasoning steps align, via embedding similarity.

    Both lists are passed through ``normalize_steps``, encoded with
    ``model_bert.encode`` (normalised tensor embeddings) and compared with a
    pairwise cosine-similarity matrix of shape |A| x |B|.

    Args:
        steps_a: First list of step strings.
        steps_b: Second list of step strings.
        model_bert: Encoder exposing a SentenceTransformer-style ``encode``.

    Returns:
        Tuple ``(overall, precision_like, recall_like)`` where
        ``precision_like`` is the mean best match of each A step against B,
        ``recall_like`` is the mean best match of each B step against A, and
        ``overall`` is the average of the two.
    """
    texts_a = normalize_steps(steps_a)
    texts_b = normalize_steps(steps_b)

    emb_a = model_bert.encode(texts_a, convert_to_tensor=True, normalize_embeddings=True)
    emb_b = model_bert.encode(texts_b, convert_to_tensor=True, normalize_embeddings=True)
    similarity = util.cos_sim(emb_a, emb_b)

    # Row-wise maxima: best partner in B for every step of A.
    precision_like = similarity.max(dim=1).values.mean().item()
    # Column-wise maxima: best partner in A for every step of B.
    recall_like = similarity.max(dim=0).values.mean().item()

    overall = (precision_like + recall_like) / 2
    return overall, precision_like, recall_like


In [None]:
# File names of the topological-ordering task. os.listdir returns entries in
# arbitrary order, and later cells pick files by position (cent_files[0],
# cent_files[226]), so the list is sorted to make those picks reproducible.
cent_files = sorted(
    name
    for name in os.listdir(main_path + 'topological_ordering/')
    if name != '.DS_Store'  # macOS Finder metadata, not a data file
)
len(cent_files)


In [None]:
# Load the ground-truth JSON that shares its file name with the first listed
# task file, and display it in full.
with open(gt_path  + cent_files[0]) as f:
    gt = json.load(f)
gt

In [None]:
# Same ground-truth file as loaded from cent_files[0]; this time only list its keys.
with open(gt_path  + cent_files[0]) as f:
    gt = json.load(f)
gt.keys()

In [None]:
# Inspect the keys of one topological-ordering task file under main_path.
# NOTE(review): index 226 is hard-coded and raises IndexError if fewer files exist.
with open(main_path + 'topological_ordering/' + cent_files[226]) as f:
    d = json.load(f)
d.keys()

In [None]:
# Inspect the keys of the file with the same name under llmeval_path.
# This rebinds `d`, replacing whatever it held before this cell ran.
with open(llmeval_path + 'topological_ordering/' + cent_files[226]) as f:
    d = json.load(f)
d.keys()


In [None]:
# Task sub-folder names, one per evaluation task.
folder_names = [
    'centroid_distance',
    'direct_adjacency',
    'room_removal',
    'common_neighbor',
    'topological_ordering',
]


In [None]:
def room_label_from_gt(gt_ans):
    """Turn a stringified ground-truth answer into a plain room label.

    This cell previously held a loose ``elif`` branch (plus an older variant
    kept in a string literal), which is a syntax error when run on its own.
    The same logic is packaged here as a function so it can be run and tested.

    For an answer such as ``"['living_1']"`` the list literal is parsed, its
    first element is taken, the two-character suffix (e.g. ``"_1"``) is cut
    off and `` room`` is appended for living/master/common/dining, giving
    ``"living room"``. The older variant sliced the whole stringified list,
    which left the opening bracket and quote in the label.

    Args:
        gt_ans: Ground-truth answer as a string; either a list literal or a
            bare label.

    Returns:
        The cleaned label. Input without an underscore is returned unchanged.
    """
    if '_' not in gt_ans:
        return gt_ans

    try:
        parsed = ast.literal_eval(gt_ans)
    except (ValueError, SyntaxError):
        # A bare label such as "living_1" is not a Python literal.
        parsed = None

    if isinstance(parsed, (list, tuple)) and len(parsed) > 0:
        label = str(parsed[0])
    else:
        label = gt_ans

    if '_' in label:
        # NOTE(review): assumes the suffix is always "_" plus a single digit.
        label = label[:-2]
        for keyword in ('living', 'master', 'common', 'dining'):
            if keyword in label:
                label += ' room'
    return label

In [None]:
def _load_json(path):
    """Read and parse one JSON file."""
    with open(path) as handle:
        return json.load(handle)


def _gt_label(raw_value):
    """Convert one ground-truth entry into the label stored in ``gt_answer``.

    Values without an underscore (including ``'absent'``) are returned as
    ``str(raw_value)``. Otherwise the two-character suffix (e.g. ``"_1"``) is
    cut off and `` room`` is appended for living/master/common/dining. For a
    list-valued entry the first element is used, so that the suffix is not
    sliced off the list's string representation.
    """
    text = str(raw_value)
    if text == 'absent' or '_' not in text:
        return text

    if isinstance(raw_value, (list, tuple)) and len(raw_value) > 0:
        text = str(raw_value[0])
        if '_' not in text:
            return text

    # NOTE(review): assumes the suffix is always "_" plus a single digit.
    text = text[:-2]
    for keyword in ('living', 'master', 'common', 'dining'):
        if keyword in text:
            text += ' room'
    return text


def make_df(main_path, gt_path, llmeval_path, folder_name, gt_key):
    """Collect predictions, generated-question results, LLM evaluations and
    ground truth for one task into a single DataFrame (one row per file).

    For every file name found in ``<main_path>/<folder_name>`` four JSON files
    with that same name are read:

    * ``<main_path>/<folder_name>/``            -- keys ``Reason``, ``Answer``,
      ``pair0``, ``num_rooms`` and optionally ``pair1``
    * ``<main_path>/<folder_name>_generated/``  -- keys ``Reason``, ``Answer``
    * ``<llmeval_path>/<folder_name>/``         -- key ``evaluation``
    * ``<gt_path>/``                            -- key ``gt_key``

    Args:
        main_path: Directory holding the task folders.
        gt_path: Directory holding the ground-truth files.
        llmeval_path: Directory holding the LLM-evaluation task folders.
        folder_name: Task folder to process.
        gt_key: Key of the ground-truth answer inside each ground-truth file.

    Returns:
        DataFrame with columns ``gt_answer``, ``pred_reason``, ``pred_answer``,
        ``gen_reason``, ``gen_answer``, ``pair0``, ``pair1``, ``num_rooms``,
        ``correct_faithful``, ``true_llm`` and ``evaluation``. Rows follow the
        sorted file names.

    Raises:
        KeyError: If a required key is missing from one of the JSON files.
        OSError: If one of the four files for a name cannot be opened.
    """
    columns = ['gt_answer', 'pred_reason', 'pred_answer', 'gen_reason', 'gen_answer',
               'pair0', 'pair1', 'num_rooms', 'correct_faithful', 'true_llm', 'evaluation']

    # os.path.join tolerates base paths with or without a trailing slash.
    pred_dir = os.path.join(main_path, folder_name)
    gen_dir = os.path.join(main_path, folder_name + '_generated')
    llm_dir = os.path.join(llmeval_path, folder_name)

    # Sorted so row order is reproducible; dot-entries such as .DS_Store are
    # metadata, not data files.
    file_names = sorted(name for name in os.listdir(pred_dir) if not name.startswith('.'))

    rows = []
    for file_name in file_names:
        pred = _load_json(os.path.join(pred_dir, file_name))
        llm = _load_json(os.path.join(llm_dir, file_name))
        gt = _load_json(os.path.join(gt_path, file_name))
        gen = _load_json(os.path.join(gen_dir, file_name))

        evaluation = llm['evaluation']
        try:
            correct_faithful, true_llm = evaluation[0], evaluation[-1]
        except TypeError:
            # Non-indexable evaluation (e.g. null): keep the existing defaults.
            correct_faithful, true_llm = 'A', 'yes'

        rows.append({
            'gt_answer': _gt_label(gt[gt_key]),
            'pred_reason': pred['Reason'],
            'pred_answer': pred['Answer'][0],
            'gen_reason': gen['Reason'],
            'gen_answer': gen['Answer'][0],
            'pair0': pred['pair0'],
            'pair1': pred.get('pair1', '-'),  # some tasks have a single pair
            'num_rooms': pred['num_rooms'],
            'correct_faithful': correct_faithful,
            'true_llm': true_llm,
            'evaluation': evaluation,
        })

    return pd.DataFrame(rows, columns=columns)
    


In [None]:
# Build the merged result frames for the centroid-distance and direct-adjacency tasks.
# The last argument is the ground-truth key read from each ground-truth file.
df_centroid_distance = make_df(main_path, gt_path, llmeval_path, 'centroid_distance', 'close_centroid')
df_direct_adjacency = make_df(main_path, gt_path, llmeval_path, 'direct_adjacency', 'direct_adjacency')
# The common_neighbor and room_removal frames are not built in this cell
# (their calls are left commented out below).
#df_common_neighbor = make_df(main_path, gt_path, llmeval_path, 'common_neighbor', 'common_neighbor')
#df_room_removal = make_df(main_path, gt_path, llmeval_path, 'room_removal', 'room_removal')


In [None]:
def analysis(merged, model_bert):
    """Normalise the answer columns, flag exact matches and print summary counts.

    ``merged`` is modified in place and also returned. The three answer
    columns are lower-cased and passed through ``clean_answer``; normalised
    copies are stored in ``gt_norm`` / ``pred_norm`` / ``gen_norm``. Three
    boolean columns are then added:

    * ``match``       -- ``pred_norm`` equals ``gt_norm``
    * ``pred_match``  -- ``gen_norm`` equals ``gt_norm``
    * ``same_answer`` -- ``match`` and ``pred_match`` have the same truth value

    Args:
        merged: DataFrame with ``gt_answer``, ``pred_answer`` and
            ``gen_answer`` columns.
        model_bert: Accepted but not used by this function.

    Returns:
        The same DataFrame object, with the extra columns added.
    """
    column_pairs = (
        ('gt_answer', 'gt_norm'),
        ('pred_answer', 'pred_norm'),
        ('gen_answer', 'gen_norm'),
    )
    for answer_col, norm_col in column_pairs:
        merged[answer_col] = merged[answer_col].astype(str).str.lower().apply(clean_answer)
        merged[norm_col] = merged[answer_col].apply(normalize)

    merged["match"] = merged["pred_norm"] == merged["gt_norm"]
    merged["pred_match"] = merged["gt_norm"] == merged["gen_norm"]
    merged["same_answer"] = merged["match"] == merged["pred_match"]

    def _tally(flag_col):
        # Number of rows where the flag is True / False, as plain ints.
        flags = merged[flag_col]
        return int(flags.eq(True).sum()), int(flags.eq(False).sum())

    hits, misses = _tally('match')
    print('1. Check true preds and wrong preds w.r.t GT')
    print('correct answers ', hits)
    print('false answers ', misses)

    gen_hits, gen_misses = _tally('pred_match')
    print('2. Check generated true preds and wrong preds w.r.t GT')
    print('correct answers from generated ques', gen_hits)
    print('false answers from generated ques', gen_misses)

    unchanged, changed = _tally('same_answer')
    print('3. Find how many are same and how many are changed')
    print('common correct between correct answers and generated answers', unchanged)
    print('common false between correct answers and generated answers', changed)

    return merged
    
    
    


In [None]:
# Run the match analysis on the centroid-distance frame. analysis() adds its columns
# to the frame it receives and returns that same object.
df_centroid_distance_1 = analysis(df_centroid_distance, model_bert)


In [None]:
def _canonical_bool_answers(column):
    """Lower-case, clean and normalise a column, then map yes/no to true/false."""
    cleaned = column.astype(str).str.lower().apply(clean_answer).apply(normalize)
    return cleaned.str.lower().replace({'yes': 'true', 'no': 'false'})


# Direct-adjacency answers are yes/no style; bring the predicted and generated
# answers onto the same true/false vocabulary before running analysis().
for _answer_col in ('pred_answer', 'gen_answer'):
    df_direct_adjacency[_answer_col] = _canonical_bool_answers(df_direct_adjacency[_answer_col])


In [None]:
# Run the match analysis on the direct-adjacency frame. analysis() adds its columns
# to the frame it receives and returns that same object.
df_direct_adjacency_1 = analysis(df_direct_adjacency, model_bert)


In [None]:
# The earlier make_df cell leaves the common-neighbor call commented out, so on a
# fresh kernel df_common_neighbor would be undefined here. Build it in this cell.
# NOTE(review): assumes the 'common_neighbor' folders and ground-truth key exist -- confirm.
df_common_neighbor = make_df(main_path, gt_path, llmeval_path, 'common_neighbor', 'common_neighbor')
df_common_neighbor_1 = analysis(df_common_neighbor, model_bert)

In [None]:
# The earlier make_df cell leaves the room-removal call commented out, so on a
# fresh kernel df_room_removal would be undefined here. Build it in this cell.
# NOTE(review): assumes the 'room_removal' folders and ground-truth key exist -- confirm.
df_room_removal = make_df(main_path, gt_path, llmeval_path, 'room_removal', 'room_removal')
df_room_removal_1 = analysis(df_room_removal, model_bert)