In [1]:
import os
import sys

# Project origin
sys.path.append(os.path.abspath(".."))

#### Imports

In [2]:
import time
import json
import random
import tiktoken
import pandas as pd

from enum import Enum
from time import sleep
from dotenv import load_dotenv
from langchain_mistralai import ChatMistralAI
from langchain_core.prompts import  PromptTemplate
from langchain_core.output_parsers import  StrOutputParser

from prompts.general_prompts import FEW_SHOT_PROMPT, ZERO_SHOT_PROMPT

load_dotenv()
# os.environ["MISTRAL_API_KEY"]

True

#### Load data and enviroment

In [3]:
with open("../../dataset/llms/llm.json", "r", encoding="utf-8") as f:
    data = json.load(f)
    
with open("../../dataset/test.json", "r", encoding="utf-8") as f:
    test_data = json.load(f)

print(data[0:2])
print(test_data[0:2])

[{'intent': 'accept_reservations', 'text': 'let me know if grub burger takes reservations'}, {'intent': 'account_blocked', 'text': 'are there any problems with my bank account'}]
[{'text': 'does village inn let you make reservations', 'intent': 'accept_reservations'}, {'text': 'can i make a reservation at chima steakhouse in chicago', 'intent': 'accept_reservations'}]


### Prompt config and utils

In [16]:
def few_shot(user_input):
    intents = sorted(set(f"- {item['intent']}" for item in data))
    intents_str = "\n".join(intents)

    examples_str = "\n\n".join(
        [f'Phrase: "{item["text"]}"\nIntent: {item["intent"]}' for item in data[:5]]
    )

    prompt_final = FEW_SHOT_PROMPT.format(
        intents=intents_str, examples=examples_str, user_input=user_input
    )
    return prompt_final


def zero_shot(user_input):
    intents = sorted(set(f"- {item['intent']}" for item in data))
    intents_str = "\n".join(intents)

    return ZERO_SHOT_PROMPT.format(intents=intents_str, user_input=user_input)

In [5]:
def count_tokens(input):
    encoder = tiktoken.get_encoding("cl100k_base")
    num_tokens = len(encoder.encode(input))

    return num_tokens

#### Configure LLMs and Evaluation Method

In [6]:
class MistralModels(str, Enum):
    SMALL_LATEST = "mistral-small-latest"
    MEDIUM_LATEST = "mistral-medium-latest"
    LARGE_LATEST = "mistral-large-latest"

class PrompType(str, Enum):
    FEW_SHOT = "few-shot"
    ZERO_SHOT = "zero-shot"
    
llm = ChatMistralAI(
    model=MistralModels.SMALL_LATEST.value,
    temperature=0.0,
)

prompt_template = PromptTemplate.from_template("{message}")
output_parser = StrOutputParser()

chain = prompt_template | llm | output_parser

In [7]:
def safe_invoke(chain, message, retries=3, base_wait=2.5):
    for attempt in range(retries):
        try:
            return chain.invoke({"message": message})
        except Exception as e:
            if attempt < retries - 1:
                wait_time = base_wait + random.uniform(0, 0.5)
                print(f"Error: {e}. Retrying in {wait_time:.2f}s...")
                time.sleep(wait_time)
            else:
                print("Failed after retries:", e)
                return "ERROR"


In [None]:
# def run_evaluation(
#     test_data: dict,
#     mode: PrompType = PrompType.FEW_SHOT,
#     sleep_time: float = 2.5,
#     verbose: bool = False,
#     save_path: str = None  # checkpoint
# ):
#     results = []

#     handler_mode: PrompType = {
#         PrompType.FEW_SHOT: few_shot,
#         PrompType.ZERO_SHOT: zero_shot,
#     }

#     if mode not in handler_mode:
#         raise ValueError("Mode must be 'few-shot' or 'zero-shot'")

#     total_start = time.time()

#     for i, item in enumerate(test_data):
#         try:
#             start_time = time.time()
#             sleep(sleep_time)
#             user_input = item["text"]
#             expected_intent = item["intent"]

#             prompt = handler_mode[mode](user_input)
#             predicted_intent = safe_invoke(chain, prompt)
#             duration = time.time() - start_time

#             results.append(
#                 {
#                     "text": user_input,
#                     "expected": expected_intent,
#                     "predicted": predicted_intent.strip(),
#                     "latency_sec": round(duration, 3)
#                 }
#             )

#             if verbose:
#                 print(f"[{i+1}/{len(test_data)}] Latency: {round(duration, 2)}s")

#             # checkpoint partial
#             if save_path:
#                 df_partial = pd.DataFrame(results)
#                 df_partial.to_csv(f"{save_path}_partial.csv", index=False)

#         except Exception as e:
#             print(f"Error to process índex {i}: {e}")
#             break 

#     total_duration = time.time() - total_start
#     avg_latency = total_duration / len(results) if results else 0
#     throughput = len(results) / total_duration if total_duration > 0 else 0

#     stats = {
#         "total_time_sec": round(total_duration, 3),
#         "avg_latency_sec": round(avg_latency, 3),
#         "throughput_samples_per_sec": round(throughput, 3),
#     }

#     if save_path:
#         pd.DataFrame(results).to_csv(f"{save_path}_final.csv", index=False)
#         with open(f"{save_path}_stats.json", "w") as f:
#             json.dump(stats, f, indent=2)

#     return results, stats


In [None]:
from datetime import timedelta

def load_partial_results(save_path):
    partial_path = f"{save_path}_partial.csv"
    if os.path.exists(partial_path):
        df = pd.read_csv(partial_path)
        texts_done = set(df["text"].tolist())
        return df.to_dict(orient="records"), texts_done
    return [], set()


def run_evaluation_robust(
    test_data: list,
    mode: PrompType = PrompType.FEW_SHOT,
    sleep_time: float = 2.5,
    verbose: bool = False,
    save_path: str = None
):
    results, already_processed = [], set()
    errors = []

    if save_path:
        results, already_processed = load_partial_results(save_path)

    handler_mode = {
        PrompType.FEW_SHOT: few_shot,
        PrompType.ZERO_SHOT: zero_shot,
    }

    if mode not in handler_mode:
        raise ValueError("Mode must be 'few-shot' or 'zero-shot'")

    total_start = time.time()
    total = len(test_data)

    for i, item in enumerate(test_data):
        user_input = item["text"]

        if user_input in already_processed:
            if verbose:
                print(f"[{i+1}/{total}] ⏩ Skipped")
            continue

        try:
            sleep(sleep_time)
            iter_start = time.time()
            
            prompt = handler_mode[mode](user_input)
            predicted_intent = safe_invoke(chain, prompt)
            duration = time.time() - iter_start

            result = {
                "text": user_input,
                "expected": item["intent"],
                "predicted": predicted_intent.strip() if predicted_intent != "ERROR" else "ERROR",
                "latency_sec": round(duration, 3)
            }
            results.append(result)

            if verbose:
                elapsed = time.time() - total_start
                percent = 100 * (i + 1) / total
                eta = timedelta(seconds=int((elapsed / (i + 1)) * (total - (i + 1))))
                print(f"[{i+1}/{total}] ✅ {percent:.2f}% - {round(duration, 2)}s - ETA: {eta}")

            if save_path:
                pd.DataFrame(results).to_csv(f"{save_path}_partial.csv", index=False)

        except Exception as e:
            print(f"[{i+1}/{total}] ❌ Error: {e}")
            errors.append(item)

            results.append({
                "text": user_input,
                "expected": item["intent"],
                "predicted": "ERROR",
                "latency_sec": 0
            })

            if save_path:
                pd.DataFrame(results).to_csv(f"{save_path}_partial.csv", index=False)
                with open(f"{save_path}_error.json", "w", encoding="utf-8") as f:
                    json.dump(errors, f, indent=2, ensure_ascii=False)

    total_duration = time.time() - total_start
    n_responses = len(results)

    if not n_responses:
        stats = {
            "total_time_sec": 0,
            "avg_latency_sec_inference": 0,
            "throughput_inference": 0,
            "throughput_pipeline": 0,
        }
    else:
        sum_latencies = sum(r["latency_sec"] for r in results)
        avg_latency_inference = sum_latencies / n_responses
        throughput_inference = n_responses / sum_latencies if sum_latencies else 0
        throughput_pipeline = n_responses / total_duration if total_duration else 0

        stats = {
            "total_time_sec": round(total_duration, 3),
            "avg_latency_sec_inference": round(avg_latency_inference, 3),
            "throughput_inference": round(throughput_inference, 3),
            "throughput_pipeline": round(throughput_pipeline, 3),
        }

    if save_path:
        pd.DataFrame(results).to_csv(f"{save_path}_final.csv", index=False)
        with open(f"{save_path}_stats.json", "w", encoding="utf-8") as f:
            json.dump(stats, f, indent=2, ensure_ascii=False)
        if errors:
            with open(f"{save_path}_error.json", "w", encoding="utf-8") as f:
                json.dump(errors, f, indent=2, ensure_ascii=False)

    return results, stats


In [None]:
test_data[100]

{'text': 'do you know what the latest is with my credit card application',
 'intent': 'application_status'}

#### Count Tokens

In [33]:
user_input = test_data[100]

few_shot_prompt = few_shot(user_input)
zero_shot_prompt = zero_shot(user_input)

test_data_len = len(test_data)  # 4500

print(f"Tokens per few-shot {count_tokens(few_shot_prompt)}")
print(f"Total Tokens few-shot {count_tokens(few_shot_prompt) * test_data_len}")
print(f"Tokens per zero-shot {count_tokens(zero_shot_prompt)}")
print(f"Total Tokens zero-shot {count_tokens(zero_shot_prompt) * test_data_len}")

Tokens per few-shot 810
Total Tokens few-shot 3645000
Tokens per zero-shot 719
Total Tokens zero-shot 3235500


#### Calculate and Export LLM Results

In [28]:
LLM_BASE_PATH = "../../results/llm"

# zero_results, zero_stats = run_evaluation(
#     test_data=test_data[:10], # Change to full dataset
#     mode=PrompType.ZERO_SHOT,
#     sleep_time=2.5,
#     verbose=False,
#     save_path=f"{LLM_BASE_PATH}/zero_shot",
# )

# print("Zero-shot stats:", zero_stats)

In [53]:
few_results, few_stats = run_evaluation_robust(
    test_data=test_data[:5], # Change to full dataset
    mode=PrompType.FEW_SHOT,
    sleep_time=2.5,
    verbose=False,
    save_path=f"{LLM_BASE_PATH}/few_shot",
)

print("Few-shot stats:", few_stats)

Few-shot stats: {'total_time_sec': 15.002, 'avg_latency_sec_inference': 0.499, 'throughput_inference': 2.003, 'throughput_pipeline': 0.333}
