<a href="https://colab.research.google.com/github/OscarRojasG/Experimentos-EvaluAI/blob/main/Framework_EvaluAI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Implementation

## Load the dataset and required libraries

In [1]:
!pip install openai==0.28 &> /dev/null
!pip install openai-multi-client &> /dev/null
!pip install plotly
!git clone https://github.com/rilianx/GPTEvaluator &> /dev/null



In [2]:
import pandas as pd
from IPython.display import display

# Shows the most relevant information about the dataset
def show_dataset_info(dataset):
    display(dataset.head())
    print()
    print(dataset.value_counts("real_eval"), end="\n\n")
    print(dataset.value_counts("dataset"))

# Loads a dataset from an xlsx file and validates its columns
def load_dataset(path, sheet_name, column_data):
    df = pd.read_excel(path, sheet_name=sheet_name)

    mandatory_cols = ["context", "question", "answer", "real_eval", "dataset"]
    for key in mandatory_cols:
        if key not in column_data:
            raise ValueError(f"Error: a source column must be specified for the variable {key}")

        value = column_data[key]
        if value not in df.columns:
            raise ValueError(f"Error: the column {value} does not exist")

        df = df.rename(columns={value: key})

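    df = df[mandatory_cols].copy()  # Copy so that adding 'row' does not write to a slice
    df['row'] = df.index + 2  # Spreadsheet row: 1-based numbering plus the header row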
    show_dataset_info(df)
    return df
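
To see what `column_data` does without an Excel file, the renaming and validation step can be tried on an in-memory frame. This is only a minimal sketch: `rename_and_validate` is a hypothetical helper that mirrors the loop in `load_dataset`, and the sample rows are invented for illustration.

```python
import pandas as pd

MANDATORY_COLS = ["context", "question", "answer", "real_eval", "dataset"]

def rename_and_validate(df, column_data):
    """Hypothetical helper mirroring the validation loop of load_dataset."""
    for key in MANDATORY_COLS:
        if key not in column_data:
            raise ValueError(f"No source column given for {key}")
        if column_data[key] not in df.columns:
            raise ValueError(f"Column {column_data[key]} does not exist")
    renamed = df.rename(columns={column_data[k]: k for k in MANDATORY_COLS})
    renamed = renamed[MANDATORY_COLS].copy()
    renamed["row"] = renamed.index + 2  # spreadsheet row: 1-based plus header row
    return renamed

# Invented sample data, only for illustration
raw = pd.DataFrame({
    "Pregunta": ["Q1", "Q2"],
    "Respuesta": ["A1", "A2"],
    "Contexto detallado": ["C1", "C2"],
    "Promedio Redondeado": [3, 0],
    "DataSet": ["demo", "demo"],
})
column_data = {
    "context": "Contexto detallado",
    "question": "Pregunta",
    "answer": "Respuesta",
    "real_eval": "Promedio Redondeado",
    "dataset": "DataSet",
}
clean = rename_and_validate(raw, column_data)
print(clean.columns.tolist())
```

The keys of `column_data` are the fixed internal names; the values are whatever the spreadsheet happens to call those columns.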

## Prompt generation

In [3]:
import pprint
import copy
import json
import os
import re

class Prompt():
    def __init__(self, structure, instructions, base_folder, key_pos):
        self.structure = structure
        self.instructions = instructions
        self.base_folder = base_folder
        self.key_pos = key_pos
        self.raw_text_structure = None
        self.text_structure = None
        self.criteria = []  # Stays empty when the structure has no 'score' file, so len() checks are safe
        self.output_instructions = None
        self.prompt = None

        self.read_files()
        self.extract_metadata()
        self.build_prompt()

    # Returns the base structure of the prompt (dictionary)
    def base_structure(self):
        structure = copy.deepcopy(self.structure)
        structure['instructions'] = {}
        for i in self.instructions:
            structure['instructions'][i] = structure[i]
            structure.pop(i, None)
        return structure

    # Builds a dictionary with the contents of every file in the structure
    def read_files(self):
        self.raw_text_structure = copy.deepcopy(self.structure)

        for key, value in self.raw_text_structure.items():
            if key == "instructions": continue

            path = f"{self.base_folder}/{key}/{value}"
            try:
                # The context manager closes the file even if reading fails
                with open(path, 'r', encoding='utf-8') as file:
                    self.raw_text_structure[key] = file.read() + "\n\n"
            except OSError:
                raise Exception(f"Error: the file {path} could not be read")

    # Extracts metadata from the files, such as the criteria and the output instructions
    def extract_metadata(self):
        self.text_structure = copy.deepcopy(self.raw_text_structure)

        if 'score' in self.text_structure:
            lines = self.text_structure['score'].split('\n')
            self.criteria = [line[1:] for line in lines if line.startswith('$')]
            text = [line for line in lines if not line.startswith('$')]
            self.text_structure['score'] = '\n'.join(text)

        self.output_instructions = {}
        for key in self.text_structure:
            value = self.text_structure[key]

            if value.startswith('#') and not value.startswith('##'):
                m = re.search(r'#(.*?)\n', value).group(1)
                self.output_instructions[key] = m
                self.text_structure[key] = '\n'.join(value.split('\n')[1:])

    # Builds the prompt as a string
    def build_prompt(self):
        self.prompt = ""
        for key, value in self.text_structure.items():
            self.prompt += value

        if self.key_pos == 'after':
            output = self.build_output_keys_after()
        else:
            output = self.build_output_keys_before()

        self.prompt += output

    def build_output_keys_before(self):
        output = "I expect a dict in python as answer: {{"
        for key, value in self.output_instructions.items():
            output += f'"{key}": \'{value}\', '

        if len(self.criteria) > 0:
            for c in self.criteria:
                output += f'"{c}": {c}_score, '
            output = output[:-2]
        else:
            output += '"score": score'

        output += "}}\n\nPython dict:"
        return output


    def build_output_keys_after(self):
        output = "I expect a dict in python as answer: {{"
        if len(self.criteria) > 0:
            for c in self.criteria:
                output += f'"{c}": {c}_score, '
            output = output[:-2]
        else:
            output += '"score": score'

        for key, value in self.output_instructions.items():
            output += f', "{key}": \'{value}\''

        output += "}}\n\nPython dict:"
        return output

# Processes and flattens the nested dictionaries in prompt_data
def normalize_prompt_dict(prompt_data):
    instructions = []
    after_instructions = {}
    found_target = False

    if "instructions" in prompt_data:
        for key, value in prompt_data.items():
            if found_target:
                after_instructions[key] = value
            if key == "instructions":
                found_target = True

        for (key, value) in prompt_data["instructions"].items():
            prompt_data[key] = value
            instructions.append(key)

        for key, value in after_instructions.items():
            del prompt_data[key]
            prompt_data[key] = value

    prompt_data["instructions"] = "Instructions:\n"
    return prompt_data, instructions

# Returns the list of files that replace the * wildcard
def expand_prompt_data(prompt_data, prompt_folder):
    wildcard_field = None
    for key, value in prompt_data.items():
        if value == "*":
            wildcard_field = key
            break

    if not wildcard_field: return None, None

    wildcard_files = []
    path = f"{prompt_folder}/{wildcard_field}"
    for file in sorted(os.listdir(path)):
        if os.path.isfile(os.path.join(path, file)):
            wildcard_files.append(file)

    return wildcard_field, wildcard_files

# Generates the list of prompts to evaluate
def generate_prompts(prompt_data, prompt_folder, key_pos='before'):
    template, instructions = normalize_prompt_dict(prompt_data)
    wildcard_field, wildcard_files = expand_prompt_data(template, prompt_folder)
    prompts = []

    if wildcard_field is None:
        prompt = Prompt(template, instructions, prompt_folder, key_pos)
        prompts.append(prompt)
        print(prompt.prompt)
        return prompts

    for file in wildcard_files:
        structure = copy.deepcopy(template)
        structure[wildcard_field] = file
        prompt = Prompt(structure, instructions, prompt_folder, key_pos)
        prompts.append(prompt)

    # Preview
    template = copy.deepcopy(prompts[0])
    template.raw_text_structure[wildcard_field] = f"{{{wildcard_field}}}\n\n"
    template.extract_metadata()
    template.build_prompt()
    print(template.prompt)

    print(f"\n\nFiles to use ({len(wildcard_files)}):\n")
    print("\n".join(wildcard_files))

    return prompts
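
`extract_metadata` relies on two markers inside the prompt files: lines starting with `$` in the `score` file name the criteria, and a first line starting with a single `#` gives the output instruction for that key. The sketch below re-implements just that parsing on invented file contents; `parse_score_file` and `split_output_instruction` are hypothetical names, not part of the `Prompt` class.

```python
def parse_score_file(text):
    """Split a score file into criteria ('$' lines) and the remaining prompt text."""
    lines = text.split("\n")
    criteria = [line[1:] for line in lines if line.startswith("$")]
    body = "\n".join(line for line in lines if not line.startswith("$"))
    return criteria, body

def split_output_instruction(text):
    """Return (instruction, remaining text); instruction is None without a '#' header."""
    if text.startswith("#") and not text.startswith("##"):
        first, _, rest = text.partition("\n")
        return first[1:], rest
    return None, text

# Invented file contents, only for illustration
score_text = "$relevance\n$clarity\nScore each criterion from 0 to 10.\n"
feedback_text = "#one-sentence feedback\nGive brief feedback to the student.\n"

criteria, score_body = parse_score_file(score_text)
instruction, feedback_body = split_output_instruction(feedback_text)
print(criteria)      # ['relevance', 'clarity']
print(instruction)   # one-sentence feedback
```

Both markers are stripped from the text that reaches the model; they only steer how the expected output dictionary is described at the end of the prompt.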

## Parameter optimization

In [4]:
from scipy.optimize import differential_evolution
from abc import ABC, abstractmethod
import numpy as np

class ScoreWeighter():
    @staticmethod
    def eval(x, theta, right_offset):
        return np.dot(x, theta[:-right_offset])

class MapOptimizer(ABC):
    def __init__(self, map_params_size):
        self.map_params_size = map_params_size

    def optimize(self, criteria_scores, real_scores):
        bounds =  [(0, 1) for _ in range(len(criteria_scores[0]))] + [(0, 10)] * self.map_params_size
        result = differential_evolution(self.error, bounds, args=(criteria_scores, real_scores), seed=1, strategy='rand1exp', mutation=(0,1), recombination=1)
        return result.x.tolist()

    def error(self, theta, x, y):
        y_pred = self.f(x, theta)
        sse = np.sum((y - y_pred) ** 2)  # Sum of squared errors (not averaged)

        # Penalty when the weights do not sum to 1
        weights = theta[:-self.map_params_size]
        penalty = 1e6 * np.abs(np.sum(weights) - 1)

        # Penalty when the mapping parameters are not in ascending order
        map_params = theta[-self.map_params_size:]
        penalty += sum((map_params[i] - map_params[i+1]) * 1e5 for i in range(len(map_params)-1) if map_params[i] > map_params[i+1])
        return sse + penalty

    @abstractmethod
    def f(self, x, theta):
        pass

class MapOptimizer4(MapOptimizer):
    def __init__(self):
        super().__init__(4)

    def map_array(self, w_scores, theta):
        a, b, c, d = theta[-4:]
        def map(x):
            if x <= a:
                return 0
            if a < x <= b:
                return (x - a) / (b - a)
            if b < x <= c:
                return 1 + (x - b) / (c - b)
            if c < x <= d:
                return 2 + (x - c) / (d - c)
            else:
                return 3
        return np.array([map(x) for x in w_scores])

    def f(self, x, theta):
        w_scores = ScoreWeighter.eval(x, theta, 4)
        return self.map_array(w_scores, theta)

class MapOptimizer2(MapOptimizer):
    def __init__(self):
        super().__init__(2)

    def map_array(self, w_scores, theta):
        a, b = theta[-2:]
        def map(x):
            if x <= a:
                return x / a
            if a < x <= b:
                return 1 + (x - a) / (b - a)
            else:
                return 2 + (x - b) / (10 - b)
        return np.array([map(x) for x in w_scores])

    def f(self, x, theta):
        w_scores = ScoreWeighter.eval(x, theta, 2)
        return self.map_array(w_scores, theta)

class MapOptimizer2Simple(MapOptimizer):
    def __init__(self):
        super().__init__(2)

    def map_array(self, w_scores, theta):
        a, b = theta[-2:]
        def map(x):
            if x <= a:
                return 0
            if a < x <= b:
                return 3 * (x - a) / (b - a)
            else:
                return 3
        return np.array([map(x) for x in w_scores])

    def f(self, x, theta):
        w_scores = ScoreWeighter.eval(x, theta, 2)
        return self.map_array(w_scores, theta)

class MapOptimizer2Mini(MapOptimizer):
    def __init__(self):
        super().__init__(1)

    def map_array(self, w_scores, theta):
        a = theta[-1]
        def map(x):
            b = 10 - a
            if x <= a:
                return 0
            if a < x <= b:
                return 3 * (x - a) / (b - a)
            else:
                return 3
        return np.array([map(x) for x in w_scores])

    def f(self, x, theta):
        w_scores = ScoreWeighter.eval(x, theta, 1)
        return self.map_array(w_scores, theta)

class MapOptimizer4Mini(MapOptimizer):
    def __init__(self):
        super().__init__(3)

    def map_array(self, w_scores, theta):
        a, b, c = theta[-3:]
        def map(x):
            d = 10 - a
            if x <= a:
                return 0
            if a < x <= b:
                return (x - a) / (b - a)
            if b < x <= c:
                return 1 + (x - b) / (c - b)
            if c < x <= d:
                return 2 + (x - c) / (d - c)
            else:
                return 3
        return np.array([map(x) for x in w_scores])

    def f(self, x, theta):
        w_scores = ScoreWeighter.eval(x, theta, 3)
        return self.map_array(w_scores, theta)

# Converts a list of dictionaries into a list of score lists
def get_x(gpt_dicts, criteria):
    if len(criteria) > 0:
        return [
            [gpt_dict[key] for key in criteria]
            for gpt_dict in gpt_dicts
        ]

    return [[gpt_dict['score']] for gpt_dict in gpt_dicts]

# Fits the weights and mapping parameters that minimize the error
def optimize_params(gpt_dicts, real_scores, criteria, eval_function):
    criteria_scores = get_x(gpt_dicts, criteria)
    if eval_function == "map4":
        params = MapOptimizer4().optimize(criteria_scores, real_scores)
    elif eval_function == "map4-mini":
        params = MapOptimizer4Mini().optimize(criteria_scores, real_scores)
    elif eval_function == "map2" or eval_function == "map":
        params = MapOptimizer2().optimize(criteria_scores, real_scores)
    elif eval_function == "map2-simple":
        params = MapOptimizer2Simple().optimize(criteria_scores, real_scores)
    elif eval_function == "map2-mini":
        params = MapOptimizer2Mini().optimize(criteria_scores, real_scores)
    else:
        # Fail early instead of hitting an unbound 'params' below
        raise ValueError(f"Unknown eval_function: {eval_function}")

    return params

# Converts the GPT scores to the 0-3 scale using previously fitted parameters
def convert_gpt_scores(gpt_dicts, real_scores, criteria, eval_function, eval_params):
    criteria_scores = get_x(gpt_dicts, criteria)
    if eval_function == "map4":
        return MapOptimizer4().f(criteria_scores, eval_params)
    if eval_function == "map4-mini":
        return MapOptimizer4Mini().f(criteria_scores, eval_params)
    if eval_function == "map2" or eval_function == "map":
        return MapOptimizer2().f(criteria_scores, eval_params)
    if eval_function == "map2-simple":
        return MapOptimizer2Simple().f(criteria_scores, eval_params)
    if eval_function == "map2-mini":
        return MapOptimizer2Mini().f(criteria_scores, eval_params)
    # Fail early instead of silently returning None
    raise ValueError(f"Unknown eval_function: {eval_function}")
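
The objective that `differential_evolution` minimizes in `MapOptimizer.error` is the squared error plus two penalty terms: one when the criterion weights do not sum to 1, and one when the mapping thresholds are not in ascending order. The sketch below isolates just the penalties so their size can be inspected; `penalty` is a hypothetical standalone function and the parameter vectors are invented.

```python
def penalty(theta, map_params_size):
    """Penalty terms of MapOptimizer.error, isolated for inspection."""
    weights = theta[:-map_params_size]
    map_params = theta[-map_params_size:]
    # Weights should sum to 1
    total = 1e6 * abs(sum(weights) - 1)
    # Mapping thresholds should be in ascending order
    for left, right in zip(map_params, map_params[1:]):
        if left > right:
            total += (left - right) * 1e5
    return total

print(penalty([0.5, 0.5, 2.0, 8.0], 2))   # valid vector: 0.0
print(penalty([0.5, 0.25, 2.0, 8.0], 2))  # weights sum to 0.75: 250000.0
print(penalty([0.5, 0.5, 8.0, 2.0], 2))   # thresholds out of order: 600000.0
```

Because squared errors on a 0-3 scale stay small, penalties of this magnitude effectively turn both conditions into constraints.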

In [5]:
gpt_dicts = [
    {
        'score': 9
    },
    {
        'score': 8
    },
    {
        'score': 4
    }
]

real_scores = [3, 2, 1]
criteria = []
eval_function = "map4"

eval_params = optimize_params(gpt_dicts, real_scores, criteria, eval_function)
print(eval_params)
convert_gpt_scores(gpt_dicts, real_scores, criteria, eval_function, eval_params)

[1.0, 1.1720268471946058, 3.9999999999999996, 8.0, 8.972909085620861]


array([3., 2., 1.])
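
The printed vector is `theta`: the first entry is the weight of the single `score` criterion and the last four are the thresholds `a, b, c, d` of the `map4` mapping. That mapping is piecewise linear through (a, 0), (b, 1), (c, 2), (d, 3) and constant outside, so it can be reproduced with `np.interp`. A minimal sketch using the parameters printed above, valid as long as the thresholds are strictly increasing:

```python
import numpy as np

theta = [1.0, 1.1720268471946058, 3.9999999999999996, 8.0, 8.972909085620861]
weights, thresholds = theta[:-4], theta[-4:]

raw_scores = np.array([[9], [8], [4]])       # one 'score' value per answer
weighted = raw_scores @ np.array(weights)    # weighted sum per answer
mapped = np.interp(weighted, thresholds, [0, 1, 2, 3])
print(mapped)
```

The result agrees with the array returned by `convert_gpt_scores` up to floating-point rounding.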

In [6]:
gpt_dicts = [
    {
        'relevance': 8,
        'clarity': 9,
        'precision': 7
    },
    {
        'relevance': 8,
        'clarity': 5,
        'precision': 3
    },
    {
        'relevance': 4,
        'clarity': 1,
        'precision': 2
    }
]

real_scores = [3, 2, 1]
criteria = ['relevance', 'clarity', 'precision']

eval_params = optimize_params(gpt_dicts, real_scores, criteria, "map")
print(eval_params)
convert_gpt_scores(gpt_dicts, real_scores, criteria, "map", eval_params)

[0.0003229277124999208, 0.9992691267276775, 0.00040794541936967077, 0.9601403977846292, 4.753965929960487]


array([2.80916273, 2.0469282 , 1.01086933])
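
In this run almost all of the weight lands on `clarity` (about 0.999), so the weighted score is close to the clarity score alone. The sketch below recomputes the output by hand from the printed parameters; `map2` here is a standalone copy of the mapping in `MapOptimizer2`, written only for illustration.

```python
theta = [0.0003229277124999208, 0.9992691267276775, 0.00040794541936967077,
         0.9601403977846292, 4.753965929960487]
weights, (a, b) = theta[:3], theta[3:]

def map2(x):
    """Standalone copy of the piecewise mapping in MapOptimizer2."""
    if x <= a:
        return x / a
    if x <= b:
        return 1 + (x - a) / (b - a)
    return 2 + (x - b) / (10 - b)

answers = [(8, 9, 7), (8, 5, 3), (4, 1, 2)]  # (relevance, clarity, precision)
results = []
for scores in answers:
    weighted = sum(w * s for w, s in zip(weights, scores))
    results.append(map2(weighted))
    print(round(weighted, 3), round(results[-1], 3))
```

With only three training points the optimizer is free to ignore two of the criteria, so these weights should not be read as a general ranking of the criteria.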

## Experiments

In [7]:
import random
from GPTEvaluator.GPTEvaluator import chat_gpt_multiple
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from openai_multi_client import OpenAIMultiClient
import openai
from google.colab import userdata
from sklearn.metrics import mean_squared_error
import os
from datetime import datetime, timedelta
import json
import numpy
import plotly.express as px
from plotly.colors import sample_colorscale
import pytz
from sklearn.metrics import mean_absolute_error, r2_score, accuracy_score, precision_score, recall_score, f1_score

openai.api_key = userdata.get('OPENAI_API_KEY')

class SetPair():
    def __init__(self, train_set, test_set):
        self.train_set = train_set
        self.test_set = test_set

# Splits the dataset into training/test sets
def generate_sets(dataset, repetitions, train_set_size, test_set_size, seed, repeat_test_set, balance_train_set, test_set):
    sets = []
    random.seed(seed)
    columns = dataset.columns.tolist()
    fixed_test_set = test_set is not None # A test set supplied by the caller is never resampled

    group_size = train_set_size // 4 # Group size for a balanced set
    proportions = dataset['real_eval'].value_counts(normalize=True) # For an unbalanced set
    test_samples_per_class = (proportions * test_set_size).round().astype(int)
    train_samples_per_class = (proportions * train_set_size).round().astype(int)

    for i in range(repetitions):
        # Draw a test set on the first repetition, and on every repetition when it must not be reused
        if not fixed_test_set and (not repeat_test_set or i == 0):
            test_set = dataset.groupby('real_eval', group_keys=False)[columns].apply(lambda x: x.sample(test_samples_per_class[x.name], random_state=random.randint(0,100000)))

        train_set = dataset[~dataset['row'].isin(test_set['row'])]
        if balance_train_set:
            train_set = train_set.groupby('real_eval', group_keys=False)[columns].apply(lambda x: x.sample(group_size, random_state=random.randint(0,100000)))
        else:
            train_set = train_set.groupby('real_eval', group_keys=False)[columns].apply(lambda x: x.sample(train_samples_per_class[x.name], random_state=random.randint(0,100000)))

        sets.append(SetPair(train_set, test_set))

    return sets

# Generates the answers with ChatGPT
def eval_gpt(df, prompt, model, temperature):
    api = OpenAIMultiClient(endpoint="chats", data_template={"model": model, "temperature": temperature, "n": 1, "timeout":10}, concurrency=50, wait_interval=1, max_retries=3, retry_max=10, retry_multiplier=1)

    texts = []
    for i, row in df.iterrows():
        text = prompt.format(Question=row['question'], Answer=row['answer'], Context=row['context'])
        texts.append(text)

    answers_gpt = chat_gpt_multiple(api, texts)
    return answers_gpt

import ast

# Extracts the output dictionary from each GPT answer
def extract_dicts(answers_gpt):
    pattern = r'\{[^{}]+\}'

    gpt_dicts = []
    for answer_gpt in answers_gpt:
        try:
            answer = re.findall(pattern, answer_gpt[0])[0]
            # literal_eval only accepts Python literals, so model output is never executed
            gpt_dicts.append(ast.literal_eval(answer))
        except Exception:
            print(f"Failed to extract the dictionary. GPT answer: \n{answer_gpt[0]}\n\n")
            gpt_dicts.append(None)

    return gpt_dicts

# Removes the dataset rows whose GPT output failed to parse or lacks a criterion
def clean_set(dataset, gpt_dicts, criteria):
    for i in reversed(range(len(gpt_dicts))):
        if gpt_dicts[i] is None:
            gpt_dicts.pop(i)
            dataset.drop(dataset.index[i], inplace=True)
        elif not all(key in gpt_dicts[i] for key in criteria):
            print(gpt_dicts[i])
            gpt_dicts.pop(i)
            dataset.drop(dataset.index[i], inplace=True)

# Gets the real scores of a dataset
def get_real_scores(dataset):
    return dataset['real_eval'].tolist()

# Prepares the training set and fits the parameters that minimize the error
def train(train_set, prompt, criteria, eval_function, model, temperature):
    train_set = train_set.copy()
    answers_gpt = eval_gpt(train_set, prompt, model, temperature)
    gpt_dicts = extract_dicts(answers_gpt)
    clean_set(train_set, gpt_dicts, criteria)
    real_scores = get_real_scores(train_set)
    params = optimize_params(gpt_dicts, real_scores, criteria, eval_function)
    return np.round(params, 2)

# Prepares the test set and scores it with the previously fitted parameters
def test(test_set, prompt, criteria, eval_function, eval_params, model, temperature):
    test_set2 = test_set.copy()
    answers_gpt = eval_gpt(test_set2, prompt, model, temperature)
    gpt_dicts = extract_dicts(answers_gpt)
    clean_set(test_set2, gpt_dicts, criteria)
    real_scores = get_real_scores(test_set2)
    pred_scores = convert_gpt_scores(gpt_dicts, real_scores, criteria, eval_function, eval_params)

    test_set2['gpt_eval'] = pred_scores
    df_dicts = pd.DataFrame(gpt_dicts)
    result_set = pd.concat([test_set2, df_dicts.set_index(test_set2.index)], axis=1)
    return result_set

# Returns a dictionary with the MSE per group
def calculate_mse(result_set, normalize):
    if normalize:
        mse_dict = result_set.groupby('dataset')[result_set.columns.tolist()].apply(lambda x: mean_squared_error(x['real_eval']/3, x['gpt_eval']/3)).to_dict()
        overall_mse = mean_squared_error(result_set['real_eval']/3, result_set['gpt_eval']/3)
    else:
        mse_dict = result_set.groupby('dataset')[result_set.columns.tolist()].apply(lambda x: mean_squared_error(x['real_eval'], x['gpt_eval'])).to_dict()
        overall_mse = mean_squared_error(result_set['real_eval'], result_set['gpt_eval'])

    mse_dict['All'] = overall_mse
    return mse_dict

# Plots the scores obtained for each question, ordered by real score (0-3)
def show_distribution(full_df):
    rep = full_df['repetition'].nunique()
    full_df = full_df.drop(columns=['params'])

    info_cols = ['row', 'dataset', 'question', 'answer', 'context']
    extra_cols = full_df.columns.difference(['question', 'answer', 'context', 'real_eval', 'row', 'dataset', 'repetition', 'gpt_eval', 'score']).tolist()
    for col in info_cols + extra_cols:
        if full_df[col].dtype == 'object':
            full_df[col] = full_df[col].str.wrap(80).apply(lambda x: x.replace('\n', '<br>'))

    full_df = full_df.pivot(index=['question', 'answer', 'context', 'real_eval', 'row', 'dataset'], columns='repetition', values=['gpt_eval', 'score'] + extra_cols)
    full_df = full_df.sort_values('real_eval').reset_index()
    full_df.loc[full_df['dataset'] == 'C1-OscarBadAnswers20', 'dataset'] = 'C1-BadAnswers'

    value_counts = full_df['real_eval'].value_counts()
    n = np.array([value_counts.get(i, 0) for i in range(4)])
    x_pos = [x + (y+1)/(n[x]+1) for x in range(4) for y in range(n[x])]

    def plot(col, title):
        dev = full_df[col].apply(lambda row: row.std(ddof=0), axis=1).values.tolist()
        mean = full_df[col].apply(lambda row: round(row.mean(), 2), axis=1).values.tolist()

        fig = go.Figure()

        if col == 'gpt_eval':
            for i in range(4):
                fig.add_trace(go.Scatter(
                    x=[i, i + 1],
                    y=[i, i],
                    mode='lines',
                    line=dict(color='red', width=1),
                    showlegend=False,
                    hoverinfo='none'
                ))

        # Extra information
        if rep == 1:
            template_cols = info_cols + [col] + extra_cols
            customdata = list(zip(*[full_df[col] for col in info_cols], mean, *[full_df[col][1] for col in extra_cols]))
        else:
            template_cols = info_cols + [col]
            customdata = list(zip(*[full_df[col] for col in info_cols], mean))

        template = ''
        for i, x in enumerate(template_cols):
            template += f'<b>{x}:</b> %{{customdata[{i}]}}<br>'
        template += '<extra></extra>'

        # Colors per class (dataset)
        colors = sample_colorscale(px.colors.qualitative.Plotly, [i / 3 for i in range(4)])

        # Add the points
        for n, cls in enumerate(sorted(full_df['dataset'].unique())):
            indices = full_df.index[full_df['dataset'] == cls].tolist()
            x_filtered = [x_pos[i] for i in indices]
            y_filtered = [mean[i] for i in indices]
            dev_filtered = [dev[i] for i in indices]
            customdata_filtered = [customdata[i] for i in indices]

            # Error bar color
            color_hex = px.colors.qualitative.Plotly[n]
            r = int(color_hex[1:3], 16)
            g = int(color_hex[3:5], 16)
            b = int(color_hex[5:7], 16)
            alpha = 0.5
            color_rgba = f"rgba({r}, {g}, {b}, {alpha})"

            fig.add_trace(go.Scatter(
                x=x_filtered,
                y=y_filtered,
                mode='markers',
                marker=dict(size=8, color=color_hex),
                error_y=dict(type='data', array=dev_filtered, visible=True, thickness=2, width=4, color=color_rgba),
                customdata=customdata_filtered,
                hovertemplate=template,
                name=cls
            ))

        # Configure the layout
        y_range = (0, 3) if col == 'gpt_eval' else (0, 10)
        fig.update_layout(
            title=title,
            title_x=0.5,
            xaxis_title='Real Eval',
            yaxis_title='GPT Eval',
            xaxis=dict(tickvals=[0, 1, 2, 3]),
            legend_title_text='Dataset',
            template='plotly_white',
            showlegend=True,
            width=1200,
            height=700,
            hoverlabel=dict(
                bgcolor="green",
                font_size=12
            )
        )

        # Show the figure
        fig.show()
        print()

    plot('gpt_eval', 'Distribution of scores obtained by the model')
    #plot('score', 'Distribution of unnormalized GPT scores')

# Exports the results to an Excel file
def export_results(result_set, prompt, repetitions, eval_function, eval_params, train_set_size, test_set_size, seed, model, temperature, repeat_test_set, balance_train_set):
    dir = "Results"
    if not os.path.exists(dir):
        os.makedirs(dir)

    timezone = pytz.timezone('America/Santiago')
    date = datetime.now(timezone)
    formatted_date = date.strftime('%Y%m%d-%H%M%S')
    filename = f'{dir}/{formatted_date}.xlsx'

    metadata = {
        'prompt': prompt,
        'repetitions': repetitions,
        'eval_function': eval_function,
        'eval_params': eval_params,
        'train_set_size': train_set_size,
        'test_set_size': test_set_size,
        'seed': seed,
        'model': model,
        'temperature': temperature,
        'repeat_test_set': repeat_test_set,
        'balance_train_set': balance_train_set
    }
    md_set = pd.DataFrame.from_dict(metadata, orient='index')

    with pd.ExcelWriter(filename) as writer:
        result_set.to_excel(writer, sheet_name='Results', index=False)
        md_set.to_excel(writer, sheet_name='Metadata', index=True, header=False)
    return filename

# Evaluates a list of prompts, obtaining the average MSE over M repetitions
def experiment(dataset, prompts, repetitions, eval_function, eval_params=None, train_set_size=40, test_set_size=60, seed=42, model="gpt-4o-mini", temperature=0.1, repeat_test_set=True, balance_train_set=False, test_set=None):
    if eval_params is not None: train_set_size = 0
    sets = generate_sets(dataset, repetitions, train_set_size, test_set_size, seed, repeat_test_set, balance_train_set, test_set)
    filenames = []

    for i, prompt_data in enumerate(prompts):
        prompt = prompt_data.prompt
        criteria = prompt_data.criteria
        result_set = pd.DataFrame()

        for j in range(repetitions):
            train_set = sets[j].train_set
            test_set = sets[j].test_set

            rep_params = eval_params
            if eval_params is None:
                print(f"Entrenando Prompt {i+1} con Train Set {j+1}")
                rep_params = train(train_set, prompt, criteria, eval_function, model, temperature)
                print()

            print(f"Evaluando Prompt {i+1} con Test Set {j+1}")
            rep_set = test(test_set, prompt, criteria, eval_function, rep_params, model, temperature)
            rep_set['repetition'] = j+1
            rep_set['params'] = ", ".join("{:.2f}".format(param) for param in rep_params)

            result_set = pd.concat([result_set, rep_set], ignore_index=True)
            print()

        prompt_structure = json.dumps(prompt_data.base_structure())
        filename = export_results(result_set, prompt_structure, repetitions, eval_function, eval_params, train_set_size, test_set_size, seed, model, temperature, repeat_test_set, balance_train_set)
        filenames.append(filename)

    read_results(filenames)

# Reads and displays the results stored in Excel files
def read_results(filenames, normalize=False):
    df_mean_all = pd.DataFrame()
    df_std_all = pd.DataFrame()

    for filename in filenames:
        full_df = pd.read_excel(filename, sheet_name='Results')
        md = pd.read_excel(filename, sheet_name='Metadata', header=None, index_col=0).T.reset_index(drop=True)
        df = full_df.copy()
        df = df[['repetition', 'dataset', 'real_eval', 'gpt_eval']]
        df['rounded_gpt_eval'] = round(df['gpt_eval']).astype(int)

        mp_stats = []
        for repetition in df['repetition'].unique():
            df_rep = df[df['repetition'] == repetition]

            # Regression metrics
            stats = dict()

            if normalize:
                stats['overall'] = mean_squared_error(df_rep['real_eval']/3, df_rep['gpt_eval']/3)
                stats.update(df_rep.groupby('dataset')[df_rep.columns.tolist()].apply(lambda x: mean_squared_error(x['real_eval']/3, x['gpt_eval']/3)).to_dict())
                mae = mean_absolute_error(df_rep['real_eval']/3, df_rep['gpt_eval']/3)
            else:
                stats['overall'] = mean_squared_error(df_rep['real_eval'], df_rep['gpt_eval'])
                stats.update(df_rep.groupby('dataset')[df_rep.columns.tolist()].apply(lambda x: mean_squared_error(x['real_eval'], x['gpt_eval'])).to_dict())
                mae = mean_absolute_error(df_rep['real_eval'], df_rep['gpt_eval'])

            stats = {'mse_' + k: v for k, v in stats.items()}
            r2 = r2_score(df_rep['real_eval'], df_rep['gpt_eval'])

            # Classification metrics: computed with the rounded GPT score (0-3)
            accuracy = accuracy_score(df_rep['real_eval'], df_rep['rounded_gpt_eval'])
            precision = precision_score(df_rep['real_eval'], df_rep['rounded_gpt_eval'], average='weighted', zero_division=0)
            recall = recall_score(df_rep['real_eval'], df_rep['rounded_gpt_eval'], average='weighted', zero_division=0)
            f1 = f1_score(df_rep['real_eval'], df_rep['rounded_gpt_eval'], average='weighted', zero_division=0)

            stats.update({
                'mae': mae,
                'r2': r2,
                'accuracy': accuracy,
                'precision': precision,
                'recall': recall,
                'f1': f1
            })
            mp_stats.append(stats)

        mp_stats = pd.DataFrame(mp_stats)
        df_mean = mp_stats.apply(['mean'])
        df_std = mp_stats.apply([np.std])

        df_mean.insert(0, 'prompt', md['prompt'].iloc[0])
        df_std.insert(0, 'prompt', md['prompt'].iloc[0])
        df_mean_all = pd.concat([df_mean_all, df_mean], ignore_index=True)
        df_std_all = pd.concat([df_std_all, df_std], ignore_index=True)

    pd.set_option('display.max_columns', None)
    pd.set_option('display.max_colwidth', None)

    print("\nTabla Promedios")
    display(df_mean_all)
    print("\nTabla Desviación estándar")
    display(df_std_all)
    pd.reset_option('^display.', silent=True)

    repeat_test_set = md['repeat_test_set'].iloc[0]
    repetitions = md['repetitions'].iloc[0]
    if len(filenames) == 1 and (repeat_test_set or repetitions == 1):
        print()
        show_distribution(full_df)
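
The parsing step in `extract_dicts` can be tried in isolation. The sketch below applies the same flat-braces regular expression to invented model replies and parses the match with `ast.literal_eval`; `extract_first_dict` is a hypothetical helper, and returning `None` on failure is an assumption that matches how `clean_set` later drops unparsed rows.

```python
import ast
import re

def extract_first_dict(reply):
    """Return the first flat {...} literal found in reply, or None if parsing fails."""
    match = re.search(r'\{[^{}]+\}', reply)
    if match is None:
        return None
    try:
        parsed = ast.literal_eval(match.group(0))
    except (ValueError, SyntaxError):
        return None
    return parsed if isinstance(parsed, dict) else None

# Invented replies, only for illustration
good = 'Python dict: {"score": 8, "feedback": \'Mentions BFS but not the queue.\'}'
bad = 'The answer deserves a score of 8.'
print(extract_first_dict(good))
print(extract_first_dict(bad))
```

The pattern excludes nested braces, so a reply whose dictionary contains another dictionary would only match the innermost one.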

# Experiment workspace

In [8]:
column_data = {
    "context": "Contexto detallado",
    "question": "Pregunta",
    "answer": "Respuesta",
    "real_eval": "Promedio Redondeado",
    "dataset": "DataSet"
}

df = load_dataset("datasets_v2.xlsx", "AllDatasets (1dif)", column_data)

| | context | question | answer | real_eval | dataset | row |
|---|---|---|---|---|---|---|
| 0 | BFS es preferible en problemas donde se busca ... | ¿En qué tipo de problemas una búsqueda BFS pod... | El bfs es mucho mas util en ejecuciones cortas... | 2 | C3-Sample100 | 2 |
| 1 | BFS es preferible en problemas donde se busca ... | ¿En qué tipo de problemas una búsqueda BFS pod... | en una situación de resolución de un problema ... | 2 | C3-Sample100 | 3 |
| 2 | BFS es preferible en problemas donde se busca ... | ¿En qué tipo de problemas una búsqueda BFS pod... | en problemas donde pidan obtener el camino de ... | 3 | C3-Sample100 | 4 |
| 3 | BFS es preferible en problemas donde se busca ... | ¿En qué tipo de problemas una búsqueda BFS pod... | en problemas de grafos no ponderados ya que no... | 3 | C3-Sample100 | 5 |
| 4 | La principal diferencia entre BFS y DFS radica... | ¿Cuál es la principal diferencia entre búsqued... | La búsqueda por anchura tiene un procedimiento... | 3 | C3-Sample100 | 6 |



real_eval
3    127
2     75
0     52
1     36
Name: count, dtype: int64

dataset
C2-Nan                  92
C3-Sample100            91
C2-Sample100            90
C1-OscarBadAnswers20    17
Name: count, dtype: int64
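
These class counts determine how `generate_sets` sizes each stratum: it multiplies the class proportions by the requested set size and rounds. A small worked sketch in plain Python, using the `real_eval` counts printed above and the `test_set_size=60` default of `experiment`:

```python
counts = {3: 127, 2: 75, 0: 52, 1: 36}   # real_eval counts printed above
test_set_size = 60

total = sum(counts.values())
per_class = {label: round(n / total * test_set_size) for label, n in counts.items()}
print(per_class, sum(per_class.values()))
```

Here the rounded counts happen to add up to exactly 60, but rounding each class separately does not guarantee that in general, so the realized set size can differ slightly from the requested one.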


In [9]:
prompt_data = {
    "examples_basic": "examples_0G4B_basic.txt",
    "context": "context.txt",
    "question": "question.txt",
    "answer": "answer.txt",
    "instructions": {
        "score": "score_single.txt",
        "feedback": "feedback_minimal.txt",
        "analysis": "analysis_minimal.txt"
    }
}

prompt_folder = "GPTEvaluator/Experiments/Miniprompts_v2"

prompts = generate_prompts(prompt_data, prompt_folder, key_pos='after')

### Examples
**Question**: ¿Cuando se recomienda utilizar arreglos en vez de listas enlazadas? Haga referencia a complejidades temporales en su explicación.
**Student's Answer**: Un arreglo es recomendable en determinadas situaciones, mientras que la lista enlazada en otras.
**Score**: 0

**Question**: ¿Cuál es la complejidad temporal del peor caso para la operación de búsqueda en una tabla hash y por qué? Describe las condiciones que debe tener la tabla para encontrarse en este peor caso.
**Student's Answer**: La complejidad del peor caso es ocurre cuando la tabla es inefectiva para realizar las operación de búsqueda.
**Score**: 0

**Question**: ¿Cómo se podría implementar un historial de navegación web usando dos pilas? El historial debe permitir ir hacia atrás y adelante con las páginas previamente visitadas. Describa un algoritmo.
**Student's Answer**: Usamos dos pilas para ir hacia adelante y hacia atrás en el historial.
**Score**: 0

**Question**: ¿Por qué el acceso a una posició
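
The prompt printed above is a template: `eval_gpt` fills it with `str.format`, passing `Question`, `Answer` and `Context`, which is why the output instruction is built with doubled braces. A small sketch with an invented template shows the effect:

```python
# Invented template; the {{ }} pair survives str.format as literal braces
template = (
    "**Question**: {Question}\n"
    "**Student's Answer**: {Answer}\n"
    'I expect a dict in python as answer: {{"score": score}}\n'
)
filled = template.format(Question="What is a stack?", Answer="A LIFO structure.")
print(filled)
```

Only braces in the template itself are interpreted; braces inside a substituted question or answer are inserted verbatim.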

In [10]:
x = experiment(df, prompts, repetitions=1, eval_function="map2-simple", eval_params=[1,1,9], train_set_size=40, test_set_size=60, seed=42, model="gpt-4o-mini", temperature=0.1, repeat_test_set=False, balance_train_set=False)

Evaluando Prompt 1 con Test Set 1
37-4-15-0-34-2-1-6-7-14-17-12-16-9-47-36-10-24-11-25-8-13-18-21-5-45-43-3-28-48-23-32-39-44-38-19-26-35-40-42-46-31-33-49-27-41-20-22-30-29-50-52-51-54-56-58-53-57-59-55-

Tabla Promedios


| | prompt | mse_overall | mse_C1-OscarBadAnswers20 | mse_C2-Nan | mse_C2-Sample100 | mse_C3-Sample100 | mae | r2 | accuracy | precision | recall | f1 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | `{"examples_basic": "examples_0G4B_basic.txt", "context": "context.txt", "question": "question.txt", "answer": "answer.txt", "instructions": {"score": "score_single.txt", "feedback": "feedback_minimal.txt", "analysis": "analysis_minimal.txt"}}` | 0.619271 | 0.678571 | 1.00372 | 0.358553 | 0.347356 | 0.620833 | 0.516509 | 0.566667 | 0.616775 | 0.566667 | 0.57153 |



Tabla Desviación estándar


| | prompt | mse_overall | mse_C1-OscarBadAnswers20 | mse_C2-Nan | mse_C2-Sample100 | mse_C3-Sample100 | mae | r2 | accuracy | precision | recall | f1 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | `{"examples_basic": "examples_0G4B_basic.txt", "context": "context.txt", "question": "question.txt", "answer": "answer.txt", "instructions": {"score": "score_single.txt", "feedback": "feedback_minimal.txt", "analysis": "analysis_minimal.txt"}}` | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
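
With `eval_params=[1,1,9]` and `eval_function="map2-simple"`, no training takes place: the single score gets weight 1 and is mapped linearly from the interval [1, 9] onto [0, 3], and the classification metrics are then computed on the rounded value. A minimal sketch of that mapping, where `map2_simple` is a standalone illustration and not the class method:

```python
def map2_simple(score, a=1, b=9):
    """Linear map from [a, b] to [0, 3], clamped outside the interval."""
    return 3 * min(max((score - a) / (b - a), 0), 1)

for score in (0, 1, 5, 9, 10):
    value = map2_simple(score)
    print(score, value, round(value))
```

A raw score of 5 maps to 1.5, right on the boundary between classes 1 and 2, which illustrates how sensitive accuracy can be to the choice of thresholds.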








In [11]:
!zip -r results.zip Results/

  adding: Results/ (stored 0%)
  adding: Results/20250413-182106.xlsx (deflated 1%)
