<a href="https://colab.research.google.com/github/claudia-limon-maestria-ciencias/maestria-ciencias/blob/main/langchain_model_comparison.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install python-dotenv
!pip install langchain-openai
!pip install langchain-community
!pip install langchain-anthropic
!pip install langchain-google-genai
!pip install rouge-score
!pip install pypdf
!pip install chromadb
!pip install unstructured
!pip install pandas numpy tabulate sentence-transformers nltk scikit-learn openpyxl
!pip install rouge-score

In [None]:

# Mount Google Drive at /content/drive (Colab-only API). The document paths
# configured in this notebook point inside this mount.
from google.colab import drive
drive.mount('/content/drive')

In [2]:
import os

# Provider credentials are read from the process environment so that no secret
# is stored in the notebook itself.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

# os.getenv() returns None for an unset variable. Report that here, at
# configuration time, instead of letting it surface later as an opaque
# authentication error from one of the provider clients.
_missing_api_keys = [
    name
    for name, value in (
        ("OPENAI_API_KEY", OPENAI_API_KEY),
        ("ANTHROPIC_API_KEY", ANTHROPIC_API_KEY),
        ("GOOGLE_API_KEY", GOOGLE_API_KEY),
    )
    if not value
]
if _missing_api_keys:
    print(f"Warning: environment variables not set: {', '.join(_missing_api_keys)}")

In [4]:
# Folder (inside the mounted Drive) that holds the source documents.
DATA_DIR = "/content/drive/MyDrive/articulo_langchain"

# Documents to index: one entry per file, with its path and declared format.
documents = [
    {"path": f"{DATA_DIR}/{file_name}", "format": file_format}
    for file_name, file_format in (
        ("codigompalags_libro_sexto.pdf", "pdf"),
        ("codigompalags_librosexto_txt4.txt", "txt"),
    )
]

# JSON file that stores the chunk texts of every created collection.
CHUNKS_TEXTS_FILE = 'chunks_texts.json'

# JSON file that stores the names of the created collections.
COLLECTIONS_FILE = 'created_collections.json'

# Embedding providers to build a vector database for.
embedding_types = ["openai", "google"]

In [None]:
import os
import json
import time
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_community.vectorstores import Chroma

retrievers = {}            # "<file name>_<embedding type>" -> retriever (current session only)
created_collections = {}   # "<file name>_<embedding type>" -> Chroma collection name
all_chunks_data = {}       # Chroma collection name -> list of chunk texts

MAX_COLLECTION_NAME_LEN = 63   # upper bound this cell enforces on collection names
CHUNK_SIZE = 1500
CHUNK_OVERLAP = 200
RETRIEVER_TOP_K = 5


def _make_collection_name(doc_name, embedding_type, timestamp):
    """Build a collection name of the form '<doc>_<embedding type>_<timestamp>'.

    Characters other than ASCII letters/digits, '-' and '_' are replaced by '_'.
    When the result would be too long, the document part is truncated (instead
    of replacing the whole name), so the name still identifies the document and
    the embedding type and two embedding types can never share a fallback name.
    """
    suffix = f"_{embedding_type}_{timestamp}"
    room = MAX_COLLECTION_NAME_LEN - len(suffix)
    if room < 1:
        # Only reachable with an unusually long embedding type label.
        return f"col_{timestamp}"
    safe_doc_name = "".join(
        c if (c.isascii() and c.isalnum()) or c in "-_" else "_" for c in doc_name
    )
    # Drop leading separators so the name starts with a letter or digit.
    safe_doc_name = safe_doc_name[:room].lstrip("-_") or "doc"[:room]
    return f"{safe_doc_name}{suffix}"


def _build_embeddings(embedding_type):
    """Return the embeddings client for `embedding_type`, or None if it is unknown."""
    if embedding_type == "openai":
        return OpenAIEmbeddings(api_key=OPENAI_API_KEY)
    if embedding_type == "google":
        return GoogleGenerativeAIEmbeddings(
            model="models/embedding-001",
            api_key=GOOGLE_API_KEY
        )
    return None


def _load_and_split(doc_path):
    """Load a .pdf or .txt document and split it into chunks.

    Returns the list of chunks, or an empty list (after printing the reason)
    when the format is unsupported or nothing could be loaded/split.
    """
    print(f"Loading document: {doc_path}")
    lowered = doc_path.lower()
    if lowered.endswith('.pdf'):
        loader = PyPDFLoader(doc_path)
    elif lowered.endswith('.txt'):
        loader = TextLoader(doc_path, encoding='utf-8')
    else:
        print(f"Unsupported format for {doc_path}. Skipping creation.")
        return []

    loaded_documents = loader.load()
    if not loaded_documents:
        print(f"Warning: No documents were loaded from {doc_path}. Skipping creation.")
        return []

    print("   Splitting into chunks...")
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP
    )
    chunks = splitter.split_documents(loaded_documents)
    if not chunks:
        print(f"Warning: No chunks were generated for {doc_path}. Skipping creation.")
        return []
    print(f"   Document split into {len(chunks)} chunks.")
    return chunks


def _write_json(path, payload, description, **dump_kwargs):
    """Write `payload` to `path` as UTF-8 JSON; print (do not raise) on failure."""
    try:
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(payload, f, **dump_kwargs)
        return True
    except Exception as e:
        print(f"Error saving {description}: {e}")
        return False


# --- Restore the registries written by previous runs (both are updated below) ---
if os.path.exists(COLLECTIONS_FILE):
    try:
        with open(COLLECTIONS_FILE, 'r', encoding='utf-8') as f:
            created_collections = json.load(f)
        print(f"Loaded {len(created_collections)} existing collection names from '{COLLECTIONS_FILE}' (will be updated).")
    except Exception as e:
        print(f"Warning: Could not read existing collections file '{COLLECTIONS_FILE}': {e}. Starting fresh.")
        created_collections = {}

if os.path.exists(CHUNKS_TEXTS_FILE):
    try:
        with open(CHUNKS_TEXTS_FILE, 'r', encoding='utf-8') as f:
            all_chunks_data = json.load(f)
        print(f"Loaded existing chunk data for {len(all_chunks_data)} collections from '{CHUNKS_TEXTS_FILE}' (will be updated).")
    except Exception as e:
        print(f"Warning: Could not read existing chunks file '{CHUNKS_TEXTS_FILE}': {e}. Starting fresh.")
        all_chunks_data = {}
# --- End of setup ---


# --- Main Processing Loop ---
for document in documents:
    # Validate path existence
    doc_path = document.get("path", "")
    if not os.path.exists(doc_path):
        print(f"Error: The path does not exist or was not specified for '{doc_path}'. Skipping.")
        continue
    doc_name = os.path.basename(doc_path)

    # Loading and splitting do not depend on the embedding type, so they are
    # done once per document rather than once per (document, embedding) pair.
    try:
        chunks = _load_and_split(doc_path)
    except Exception as e:
        print(f"Fatal error during creation process for {doc_name}: {str(e)}")
        continue
    if not chunks:
        continue
    chunk_texts = [chunk.page_content for chunk in chunks]

    for embedding_type in embedding_types:
        # Unique key for this (document, embedding type) combination
        key = f"{doc_name}_{embedding_type}"
        print(f"\n--- Processing: {key} ---")

        # --- Initialize embeddings ---
        print(f"Initializing {embedding_type} embeddings...")
        try:
            embeddings = _build_embeddings(embedding_type)
        except Exception as e:
            print(f"Error initializing embeddings for {embedding_type}: {e}. Skipping.")
            continue
        if embeddings is None:
            print(f"⚠️ Warning: Unknown embedding type '{embedding_type}'. Skipping.")
            continue

        # --- Create a new vector database under a timestamped collection name ---
        print(f"Attempting to create new vector database for {key}...")
        collection_name = _make_collection_name(doc_name, embedding_type, int(time.time()))
        print(f"ChromaDB collection name will be: {collection_name}")
        try:
            print(f"   Creating ChromaDB vector database '{collection_name}'...")
            vector_store = Chroma.from_documents(
                chunks,
                embeddings,
                collection_name=collection_name
            )
        except Exception as e:
            print(f"Fatal error during creation process for {key}: {str(e)}")
            continue

        # Record the collection only after it exists, so a failed creation
        # cannot leave orphan chunk texts or a dangling registry entry behind.
        print(f"Saving chunk texts in '{CHUNKS_TEXTS_FILE}'...")
        all_chunks_data[collection_name] = chunk_texts
        if _write_json(CHUNKS_TEXTS_FILE, all_chunks_data, "chunk texts to JSON",
                       indent=2, ensure_ascii=False):
            print(f"Chunk texts saved successfully for {collection_name}.")

        created_collections[key] = collection_name
        if _write_json(COLLECTIONS_FILE, created_collections, "collection registry JSON",
                       indent=4):
            print(f"Collection name '{collection_name}' registered for key '{key}'.")

        print(f"Vector database '{collection_name}' created successfully.")

        # --- Create the retriever for this session ---
        try:
            retrievers[key] = vector_store.as_retriever(search_kwargs={"k": RETRIEVER_TOP_K})
            print(f"Retriever for '{key}' ready.")
        except Exception as e:
            print(f"Error creating retriever for '{key}' from vector store: {e}")

# --- Final Summary ---
print(f"\n--- Final Summary ---")
print(f"Collection registry file: {COLLECTIONS_FILE}")
print(f"Chunk texts file: {CHUNKS_TEXTS_FILE}")
print(f"{len(retrievers)} retrievers are ready in memory (for the current session).")
print(f"{len(created_collections)} collections are known (according to collections JSON).")
print(f"Text data exists for {len(all_chunks_data)} collections (according to chunks JSON).")
# --- End Final Summary ---

In [None]:
# --- NLP & Similarity ---
from sentence_transformers import SentenceTransformer, util
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import nltk
# Make sure the NLTK 'punkt' tokenizer data is available: a LookupError from
# nltk.data.find is treated as "resource missing" and triggers a quiet download.
try:
    nltk.data.find('tokenizers/punkt')
except LookupError: # raised when the resource cannot be found locally
    print("Descargando recursos de NLTK (punkt)...")
    nltk.download('punkt', quiet=True)

print("Imported libraries NLTK verified.")

In [None]:
import json
import os
import pandas as pd
import numpy as np
from langchain_openai import OpenAIEmbeddings
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_community.vectorstores import Chroma
from sentence_transformers import SentenceTransformer, util
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity as sklearn_cosine_similarity
import time

# Evaluation questions (in Spanish). `questions` and `correct_answers` are
# parallel lists: the reference answer for questions[i] is correct_answers[i],
# so both lists must keep the same length and order.
questions = [
    "¿Cuáles son las categorías de superficie para las Tiendas de Productos Básicos según la clasificación?",
    "¿En qué categorías se clasifican los centros de Educación Superior según su capacidad de ocupantes?",
    "¿Cómo se clasifican las Torres, Antenas y Chimeneas?",
    "¿Cuales son las especificaciones para una recámara principal?",
    "¿Como se conforma el grupo de Deportes y Recreación?",
    "¿Cuántas camas o consultorios se permiten como máximo para los Hospitales?",
    "¿Cuál es el ancho mínimo requerido para el acceso principal en edificaciones de tipo Habitación?",
    "¿Cuáles son los requisitos mínimos para ventilación e iluminación en espacios habitables?",
    "¿Qué longitud mínima deben tener las áreas de espera techadas en los estacionamientos?",
    "¿Cuáles son las dimensiones mínimas para un inodoro en un baño publico?",
    "¿Qué ancho mínimo deben tener las escaleras en edificaciones comerciales?",
    "¿Cuáles son los requisitos para rampas de accesibilidad?"
]

# Reference (ground-truth) answers, index-aligned with `questions`.
correct_answers = [
    "Hasta 250 m² y más de 250 m²",
    "Hasta 250 ocupantes y más de 250 ocupantes",
    "Hasta 8 m de altura, de 8 m hasta 30 m de altura, y más de 30 m de altura",
    "La recámara principal requiere un área mínima de 7.00 m², con un lado mínimo de 2.50 m y una altura de 2.30 m",
    "lienzos charros, canchas y centros deportivos, estadios, albercas, plazas de toros, billares, juegos electrónicos o de mesa, hipódromos, autódromos, pistas de patinaje y equitación, y campos de tiro",
    "Más de 10 camas o consultorios",
    "0.90 metros",
    "Los espacios habitables deben tener iluminación y ventilación natural por medio de vanos con área mínima del 17.5% y 5% de la superficie del espacio, respectivamente",
    "6 metros",
    "0.75 mts de frente y 1.10 mts de fondo.",
    "1.20 metros de ancho mínimo",
    "Las rampas peatonales deben tener un ancho mínimo de 1.00 m y pendiente máxima de 10%"
]


# Load necessary data
def load_json(filepath):
    """Load a JSON file, returning {} when it is missing or unreadable.

    Args:
        filepath: Path of the JSON file to read (decoded as UTF-8).

    Returns:
        The decoded JSON content, or an empty dict when the file does not
        exist or cannot be read/parsed. Never raises; the reason is printed.
    """
    # Handle the missing-file case first so that a read/parse failure below is
    # reported only as a load error and not additionally as "not found".
    if not os.path.exists(filepath):
        print(f"File {filepath} not found")
        return {}

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        print(f"Error loading {filepath}: {e}")
        return {}

# Calculate similarity between response and ground truth
def calculate_similarity(response, ground_truth, similarity_model=None, rouge_evaluator=None):
    """Score a response against its reference answer.

    Args:
        response: Text to evaluate; falsy values are treated as empty text.
        ground_truth: Reference text; falsy values are treated as empty text.
        similarity_model: Object exposing `encode(texts, convert_to_tensor=True)`.
        rouge_evaluator: Optional object exposing `score(target, prediction)`.

    Returns:
        Dict with the float entries 'cosine_similarity' and 'rougeL_f'. Both
        stay at 0.0 when either text is empty or no similarity model is given;
        an entry also stays at 0.0 when computing it fails (the error is printed).
    """
    candidate_text = "" if not response else str(response)
    reference_text = "" if not ground_truth else str(ground_truth)
    scores = {'cosine_similarity': 0.0, 'rougeL_f': 0.0}

    # Nothing can be scored without both texts and an embedding model.
    if not (candidate_text and reference_text and similarity_model):
        return scores

    # Embedding-based cosine similarity
    try:
        candidate_vec = similarity_model.encode([candidate_text], convert_to_tensor=True)
        reference_vec = similarity_model.encode([reference_text], convert_to_tensor=True)
        first_row = util.pytorch_cos_sim(candidate_vec, reference_vec)[0]
        scores['cosine_similarity'] = float(first_row.item())
    except Exception as err:
        print(f"Error in cosine similarity: {err}")

    # ROUGE-L F-measure (only when an evaluator was supplied)
    if not rouge_evaluator:
        return scores
    try:
        rouge_result = rouge_evaluator.score(reference_text, candidate_text)
        scores['rougeL_f'] = float(rouge_result['rougeL'].fmeasure)
    except Exception as err:
        print(f"Error in ROUGE: {err}")
    return scores

# Corrected TF-IDF Search
def search_by_tfidf(query, text_chunks):
    """Return the chunk that best matches `query` under TF-IDF cosine similarity.

    Args:
        query: Search text.
        text_chunks: Iterable of chunk texts; falsy entries are ignored.

    Returns:
        Tuple `(text, score)`: the best-matching chunk and its cosine
        similarity as a float. When no search is possible the text is a short
        diagnostic message and the score is 0.0. Never raises; unexpected
        errors are printed and reported as "TF-IDF Error: <ExceptionName>".
    """
    if not text_chunks or not query:
        return "No data for search", 0.0

    chunk_texts = [str(chunk) for chunk in text_chunks if chunk]
    query_text = str(query)
    if not chunk_texts:
        return "No valid chunks", 0.0

    try:
        # Default vectorizer: no stop-word filtering, so every query term counts.
        # The previous retry on ValueError rebuilt the vectorizer with
        # lowercase=True / analyzer='word', which are the defaults, so it could
        # only fail again; a ValueError now goes straight to the handler below.
        vectorizer = TfidfVectorizer()
        chunk_matrix = vectorizer.fit_transform(chunk_texts)
        query_vector = vectorizer.transform([query_text])

        similarities = sklearn_cosine_similarity(query_vector, chunk_matrix)[0]
        if similarities.size == 0:
            return "Similarities were not calculated", 0.0

        best_index = int(np.argmax(similarities))
        return chunk_texts[best_index], float(similarities[best_index])

    except Exception as e:
        print(f"Detailed error in TF-IDF: {type(e).__name__}: {e}")
        return f"TF-IDF Error: {type(e).__name__}", 0.0

# Function to determine the source (PDF/TXT) from the collection name
def determine_source(collection_name, collection_key):
    """Classify a collection as "PDF" or "TXT" from its name, then from its key.

    The match is a case-insensitive substring test. The collection name is
    checked first and the key only if the name gives no hint; within each
    string "pdf" is tested before "text"/"txt". Returns "Unknown" when
    neither string contains a marker.
    """
    candidates = (collection_name.lower(), collection_key.lower())
    for label in candidates:
        if "pdf" in label:
            return "PDF"
        if "text" in label or "txt" in label:
            return "TXT"
    return "Unknown"

# Main function to evaluate models
def _report_evaluation(df):
    """Print the aggregate tables for the evaluation frame and save them as CSV.

    Writes 'source_embedding_comparison.csv', 'full_performance_comparison.csv'
    and 'final_performance_table.csv' when more than one source type is present.
    """
    print("\nRESULTS SUMMARY:")
    print(f"Total evaluations: {len(df)}")

    # Averages by embedding type
    print("\nAverage by embedding type:")
    for e_type in df['Embedding_Type'].unique():
        subset = df[df['Embedding_Type'] == e_type]
        print(f"  • {e_type}:")
        print(f"    - TF-IDF: CosSim={subset['CosSim_TFIDF'].mean():.4f}, RougeL={subset['RougeL_TFIDF'].mean():.4f}")
        print(f"    - Vector: CosSim={subset['CosSim_Vector'].mean():.4f}, RougeL={subset['RougeL_Vector'].mean():.4f}")

    # Comparative tables PDF vs TXT by embedding type
    print("\nGenerating comparative tables PDF vs TXT...")
    if not ('Source' in df.columns and len(df['Source'].unique()) > 1):
        print("Cannot generate comparative tables: missing 'Source' data or only one source type exists.")
        return

    # Full table grouped by Source and Embedding_Type
    comparison_table = df.groupby(['Source', 'Embedding_Type']).agg({
        'CosSim_TFIDF': 'mean',
        'RougeL_TFIDF': 'mean',
        'CosSim_Vector': 'mean',
        'RougeL_Vector': 'mean',
        'Time_TFIDF': 'mean',
        'Time_Vector': 'mean'
    })
    print("\nCOMPARATIVE TABLE BY SOURCE AND EMBEDDING TYPE:")
    print(comparison_table)
    comparison_table.to_csv('source_embedding_comparison.csv')
    print("Comparative table saved in 'source_embedding_comparison.csv'")

    # Performance table by method (vector vs tf-idf)
    table_by_method = pd.pivot_table(
        df,
        values=['CosSim_TFIDF', 'CosSim_Vector', 'RougeL_TFIDF', 'RougeL_Vector'],
        index=['Source'],
        columns=['Embedding_Type'],
        aggfunc='mean'
    )
    print("\nPERFORMANCE COMPARISON TABLE BY METHOD AND SOURCE:")
    print(table_by_method)
    table_by_method.to_csv('full_performance_comparison.csv')
    print("Complete comparative table saved in 'full_performance_comparison.csv'")

    # Flat table: one row per (source, embedding) combination that has data
    metrics_list = []
    for source_type in df['Source'].unique():
        for e_type in df['Embedding_Type'].unique():
            subset = df[(df['Source'] == source_type) & (df['Embedding_Type'] == e_type)]
            if not subset.empty:
                metrics_list.append({
                    'Source': source_type,
                    'Embedding': e_type,
                    'CosSim_TFIDF': subset['CosSim_TFIDF'].mean(),
                    'CosSim_Vector': subset['CosSim_Vector'].mean(),
                    'RougeL_TFIDF': subset['RougeL_TFIDF'].mean(),
                    'RougeL_Vector': subset['RougeL_Vector'].mean()
                })

    if metrics_list:
        final_table = pd.DataFrame(metrics_list)
        print("\nFINAL PERFORMANCE TABLE (easy-to-read format):")
        print(final_table)
        final_table.to_csv('final_performance_table.csv', index=False)
        print("Final table saved in 'final_performance_table.csv'")
    else:
        print("Could not generate final performance table (no valid data combinations found).")


def evaluate_models_per_db():
    """Compare TF-IDF retrieval with vector-store retrieval on every collection.

    For each collection listed in 'created_collections.json' and each entry of
    the module-level `questions`, the best TF-IDF chunk and the top vector
    search hit are scored against the matching entry of `correct_answers`
    (cosine similarity and ROUGE-L). One row per (collection, question) is
    collected, saved to 'model_evaluation.csv' and
    'model_evaluation_complete.pkl', and summarized on stdout.

    Returns:
        None. Problems are printed; the function returns early when there are
        no collections, the question/answer lists differ in length, or the
        similarity model cannot be loaded.
    """
    print("Starting evaluation of models per database...")

    # Define files to use
    COLLECTIONS_FILE = "created_collections.json"
    TEXT_CHUNKS_FILE = "chunks_texts.json"

    # Load necessary data
    collections = load_json(COLLECTIONS_FILE)
    text_chunks_data = load_json(TEXT_CHUNKS_FILE)

    if not collections:
        print("No collections to evaluate")
        return

    # The two lists are consumed pairwise; fail early instead of part-way
    # through the run with an IndexError.
    if len(questions) != len(correct_answers):
        print(f"Cannot evaluate: {len(questions)} questions but {len(correct_answers)} correct answers")
        return

    # Initialize similarity model and ROUGE
    try:
        similarity_model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
        print("Similarity model initialized")
    except Exception as e:
        print(f"Error initializing similarity model: {e}")
        return

    rouge_evaluator = None
    try:
        from rouge_score import rouge_scorer
        rouge_evaluator = rouge_scorer.RougeScorer(['rouge1', 'rougeL'], use_stemmer=True)
        print("ROUGE evaluator initialized")
    except Exception as e:
        print(f"Rouge not available: {e}")

    results = []

    # Evaluate each collection
    for key, collection_name in collections.items():
        print(f"\nEvaluating collection: {key} -> {collection_name}")

        # Check if we have the texts for this collection
        if collection_name not in text_chunks_data:
            print(f"No texts found for collection {collection_name}")
            continue

        chunks = text_chunks_data[collection_name]
        print(f"  Found {len(chunks)} chunks to evaluate")

        # Determine embedding type
        if "_openai" in key:
            embedding_type = "openai"
            print("  Using OpenAI embeddings")
        elif "_google" in key:
            embedding_type = "google"
            print("  Using Google embeddings")
        else:
            print(f"Cannot determine embedding type for {key}")
            continue

        # Determine source (PDF or TXT) from name and key
        source = determine_source(collection_name, key)
        print(f"  Source: {source}")

        # The embeddings client and the vector store handle depend only on the
        # collection, so they are created once here instead of once per question.
        vector_store = None
        try:
            if embedding_type == "openai":
                embeddings = OpenAIEmbeddings(api_key=os.environ.get("OPENAI_API_KEY"))
            else:  # google
                embeddings = GoogleGenerativeAIEmbeddings(
                    model="models/embedding-001",
                    api_key=os.environ.get("GOOGLE_API_KEY")
                )
            vector_store = Chroma(
                collection_name=collection_name,
                embedding_function=embeddings
            )
        except Exception as e:
            # Questions are still evaluated with TF-IDF; vector columns report "Error".
            print(f"Error evaluating with vectorstore: {e}")

        # Evaluate questions
        for number, (query, expected) in enumerate(zip(questions, correct_answers), start=1):
            print(f"\n  Question {number}: {query}")
            print(f"  Expected answer: {expected}")

            # 1. TF-IDF method (keyword-based)
            start_time = time.perf_counter()
            response_tfidf, score_tfidf = search_by_tfidf(query, chunks)
            time_tfidf = time.perf_counter() - start_time

            print(f"  • TF-IDF (score: {score_tfidf:.4f}, time: {time_tfidf:.2f}s)")
            print(f"    TF-IDF Response:")
            print(f"    '{response_tfidf}'")

            sim_tfidf = calculate_similarity(response_tfidf, expected,
                                             similarity_model, rouge_evaluator)

            print(f"TF-IDF Metrics: CosSim={sim_tfidf['cosine_similarity']:.4f}, RougeL={sim_tfidf['rougeL_f']:.4f}")

            # Row pre-filled with the values reported when the vector search fails.
            row = {
                'Collection': collection_name,
                'Embedding_Type': embedding_type,
                'Source': source,
                'Question': query,
                'Correct_Answer': expected,
                'Response_TFIDF': response_tfidf,
                'Score_TFIDF': score_tfidf,
                'Time_TFIDF': time_tfidf,
                'CosSim_TFIDF': sim_tfidf['cosine_similarity'],
                'RougeL_TFIDF': sim_tfidf['rougeL_f'],
                'Response_Vector': "Error",
                'Score_Vector': 0.0,
                'Time_Vector': 0.0,
                'CosSim_Vector': 0.0,
                'RougeL_Vector': 0.0
            }

            # 2. Embeddings + vectorstore method
            if vector_store is not None:
                try:
                    start_time = time.perf_counter()
                    vector_results = vector_store.similarity_search_with_score(query, k=1)
                    time_vector = time.perf_counter() - start_time

                    if vector_results:
                        # NOTE(review): the vector score comes from the store's own
                        # metric and is presumably a distance, so it is not directly
                        # comparable with the TF-IDF similarity - confirm.
                        doc, score_vector = vector_results[0]
                        response_vector = doc.page_content
                        print(f"  • Vector (score: {score_vector:.4f}, time: {time_vector:.2f}s)")
                        print(f"    Vector Response:")
                        print(f"    '{response_vector}'")

                        sim_vector = calculate_similarity(response_vector, expected,
                                                          similarity_model, rouge_evaluator)

                        print(f"    Vector Metrics: CosSim={sim_vector['cosine_similarity']:.4f}, RougeL={sim_vector['rougeL_f']:.4f}")
                    else:
                        response_vector = "No results found"
                        score_vector = 0.0
                        sim_vector = {'cosine_similarity': 0.0, 'rougeL_f': 0.0}
                        print("    No results found with vectorstore")

                    # Only overwrite the error defaults once everything succeeded.
                    row.update({
                        'Response_Vector': response_vector,
                        'Score_Vector': float(score_vector),
                        'Time_Vector': time_vector,
                        'CosSim_Vector': sim_vector['cosine_similarity'],
                        'RougeL_Vector': sim_vector['rougeL_f']
                    })
                except Exception as e:
                    print(f"Error evaluating with vectorstore: {e}")

            results.append(row)

    if not results:
        print("No evaluation results were obtained")
        return

    # Convert to DataFrame and save results
    df = pd.DataFrame(results)
    df.to_csv('model_evaluation.csv', index=False)

    # Pickle keeps the complete frame with its dtypes
    df.to_pickle('model_evaluation_complete.pkl')

    print("\nEvaluation completed.")
    print("  • Results saved in 'model_evaluation.csv'")
    print("  • Complete data saved in 'model_evaluation_complete.pkl'")

    _report_evaluation(df)

# Run the evaluation when this code executes as the top-level module
# (i.e. when __name__ is "__main__").
if __name__ == "__main__":
    evaluate_models_per_db()

In [3]:
# Registry of the chat models under comparison, keyed by a short label.
# Each provider is initialized independently so that one failure (missing
# package, bad key) does not prevent the others from being registered.
modelos = {}

# Initialize OpenAI
try:
    from langchain_openai import ChatOpenAI

    # NOTE: the entry is labelled "GPT-4" but the configured model is gpt-4.1-mini.
    modelos["GPT-4"] = ChatOpenAI(
        model_name="gpt-4.1-mini",
        temperature=0.0,
        max_tokens=2000,
        api_key=OPENAI_API_KEY
    )
    print("Modelo GPT-4 inicializado correctamente")
except Exception as e:
    print(f"Error al inicializar GPT-4: {str(e)}")

# Initialize Claude
try:
    from langchain_anthropic import ChatAnthropic

    modelos["Claude"] = ChatAnthropic(
        model="claude-3-opus-20240229",
        temperature=0.0,
        max_tokens_to_sample=2000,
        api_key=ANTHROPIC_API_KEY
    )
    print("Modelo Claude inicializado correctamente")
except Exception as e:
    print(f"Error al inicializar Claude: {str(e)}")

# Initialize Gemini
try:
    from langchain_google_genai import ChatGoogleGenerativeAI

    modelos["Gemini"] = ChatGoogleGenerativeAI(
        model="gemini-2.0-flash",
        temperature=0.0,
        max_output_tokens=2000,
        google_api_key=GOOGLE_API_KEY
    )
    print("Modelo Gemini inicializado correctamente")
except Exception as e:
    print(f"Error al inicializar Gemini: {str(e)}")

# The evaluation cell further down iterates over `models.items()`; expose the
# registry under that name as well so that loop does not fail with a NameError.
models = modelos




Modelo GPT-4 inicializado correctamente
Modelo Claude inicializado correctamente
Modelo Gemini inicializado correctamente


In [None]:

# Names used by this cell and by the evaluation loop below that no earlier
# cell imports.
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate

# Initialize the similarity model: prefer MPNet, fall back to the smaller MiniLM.
similarity_model = None
try:
    similarity_model = SentenceTransformer('paraphrase-multilingual-mpnet-base-v2')
    print("MPNet model loaded")
except Exception as e_mpnet:
    # Report why the preferred model was not used before trying the alternative.
    print(f"Could not load MPNet model: {e_mpnet}")
    try:
        similarity_model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
        print("MiniLM model loaded (alternative)")
    except Exception as e:
        print(f"Error loading similarity models: {e}")

# Initialize the ROUGE evaluator with the same configuration that
# evaluate_models_per_db uses. Previously this block only printed a message
# and left rouge_evaluator as None.
rouge_evaluator = None
try:
    rouge_evaluator = rouge_scorer.RougeScorer(['rouge1', 'rougeL'], use_stemmer=True)
    print("ROUGE evaluator initialized")
except NameError:
     print("Skipping ROUGE evaluator initialization: 'rouge_scorer' not defined.")
except Exception as e:
    print(f"Error with ROUGE: {e}")

# Registry file that maps "<file name>_<embedding type>" keys to collection names
COLLECTIONS_FILE = 'created_collections.json'

# Load collections from the JSON file
try:
    with open(COLLECTIONS_FILE, 'r', encoding='utf-8') as f:
        collections_data = json.load(f)
    print(f"Loaded {len(collections_data)} collections from '{COLLECTIONS_FILE}'")
except Exception as e:
    print(f"Error loading collections file: {e}")
    collections_data = {}

# Rows collected by the main evaluation loop
results = []

# Collections to evaluate, as (key, collection_name) pairs.
# Leave the list empty to evaluate every registered collection.
selected_collections = []

if len(selected_collections) < 1:
    selected_collections = list(collections_data.items())
    print("Using all available collections")

# Instruction template: the model must answer from the retrieved context only.
template = """Answer the following question using ONLY the information provided in the context.
If the complete answer is not found in the context, say "I cannot answer based on the provided context." Do not add anything else.
Do not invent information and keep your answer concise and direct.

Context: {context}

Question: {question}

Answer:"""

# Create PromptTemplate object
prompt_template = PromptTemplate(
    template=template,
    input_variables=["context", "question"]
)

# Evaluate each combination
#
# For every selected collection, every registered LLM and every question, run
# a RetrievalQA chain, time it, score the answer against the reference answer
# and append one record to `results`.
#
# Names expected from earlier cells: `selected_collections`, `results`,
# `prompt_template`, `questions`, `correct_answers`, `calculate_similarity`,
# `similarity_model` and the LLM registry resolved just below.

# The model-initialisation cell registers its LLMs in a dict named `modelos`,
# whereas this loop reads `models`. Accept either name so the cell does not
# fail with a NameError on every collection after a fresh kernel restart.
try:
    llm_registry = models
except NameError:
    llm_registry = modelos


def _embeddings_for_key(key):
    """Return ``(embeddings, embedding_type)`` chosen from a collection key.

    Keys containing ``"_openai"`` or ``"_google"`` select the matching
    embedding class; any other key prints a notice and falls back to OpenAI
    embeddings, labelled ``"openai_fallback"``.
    """
    if "_openai" in key:
        return OpenAIEmbeddings(api_key=os.environ.get("OPENAI_API_KEY")), "openai"
    if "_google" in key:
        google_embeddings = GoogleGenerativeAIEmbeddings(
            model="models/embedding-001",
            api_key=os.environ.get("GOOGLE_API_KEY")
        )
        return google_embeddings, "google"
    # Fallback or default case
    print(f"Unknown embedding type pattern in key '{key}', using OpenAI as fallback")
    return OpenAIEmbeddings(api_key=os.environ.get("OPENAI_API_KEY")), "openai_fallback"


def _result_row(llm_name, embedding_type, collection_name, question,
                llm_answer, correct_answer, elapsed, similarity, n_docs):
    """Build one record for ``results``; shared by the success and error paths."""
    return {
        'LLM': llm_name,
        'Retriever': embedding_type,
        'Collection': collection_name,
        'Question': question,
        'LLM_Answer': llm_answer,
        'Correct_Answer': correct_answer,
        'Time': elapsed,
        'Similarity': similarity,
        'Documents_Used': n_docs
    }


for key, collection_name in selected_collections:
    print(f"\nEvaluating collection: {key} -> {collection_name}")

    try:
        # Determine embedding type
        embeddings, embedding_type = _embeddings_for_key(key)

        # Load vector store
        # NOTE(review): no persist_directory/client is passed here; confirm the
        # collections built earlier are reachable through Chroma's defaults.
        vector_store = Chroma(
            collection_name=collection_name,
            embedding_function=embeddings
        )

        # Create retriever
        retriever = vector_store.as_retriever(
            search_kwargs={"k": 5}  # Retrieve top 5 documents
        )

        for llm_name, llm_instance in llm_registry.items():
            print(f"\nEvaluating LLM: {llm_name} with retriever: {embedding_type}")

            try:
                # Configure QA chain with the correct prompt
                qa_chain = RetrievalQA.from_chain_type(
                    llm=llm_instance,
                    chain_type="stuff",
                    retriever=retriever,
                    chain_type_kwargs={
                        "prompt": prompt_template,
                        "verbose": False
                    },
                    return_source_documents=True
                )

                for i, question in enumerate(questions):
                    print(f"  Question {i+1}: {question}")

                    try:
                        # Wall-clock time of the full retrieve + generate call
                        start_time = time.time()
                        result_data = qa_chain.invoke({"query": question})
                        total_time = time.time() - start_time

                        # Get answer and documents. The local name is
                        # `source_docs` (not `documents`) so the `documents`
                        # list from the configuration cell is not overwritten.
                        answer = result_data["result"]
                        source_docs = result_data.get("source_documents", [])

                        print(f"  Time: {total_time:.2f}s")
                        print(f"  Answer: {answer}")

                        sim_scores = calculate_similarity(answer, correct_answers[i], similarity_model)

                        # Default to 0 if the key is missing
                        similarity_score = sim_scores.get('cosine_similarity', 0.0)
                        print(f"  Similarity: {similarity_score:.4f}")

                        # Save result
                        results.append(_result_row(
                            llm_name=llm_name,
                            embedding_type=embedding_type,
                            collection_name=collection_name,
                            question=question,
                            llm_answer=answer,
                            correct_answer=correct_answers[i],
                            elapsed=total_time,
                            similarity=similarity_score,
                            n_docs=len(source_docs)
                        ))

                    except Exception as e:
                        print(f"  Error processing question '{question}': {str(e)}")

                        # Record error. These rows carry Time 0 and
                        # Similarity 0.0, so they pull down any averages
                        # computed over `results`.
                        results.append(_result_row(
                            llm_name=llm_name,
                            embedding_type=embedding_type,
                            collection_name=collection_name,
                            question=question,
                            llm_answer=f"ERROR: {str(e)}",
                            correct_answer=correct_answers[i],
                            elapsed=0,
                            similarity=0.0,
                            n_docs=0
                        ))
            except Exception as e:
                print(f"Error configuring QA chain for {llm_name} with {collection_name}: {str(e)}")

    except Exception as e:
        print(f"Error processing collection {collection_name} (key: {key}): {str(e)}")


# Save and analyze results
#
# Persists every record in `results` to CSV and pickle, then prints average
# similarity/time per LLM, average similarity per retriever type, and the
# three best LLM+retriever pairs. Averages include the error rows that the
# evaluation loop records with Time 0 and Similarity 0.0.
if results:
    # Convert to DataFrame
    df = pd.DataFrame(results)

    # Save results
    df.to_csv('evaluacion_llms.csv', index=False)
    df.to_pickle('evaluacion_llms.pkl')

    print("\nEvaluation completed. Results saved to 'evaluacion_llms.csv' and 'evaluacion_llms.pkl'.")

    # Display summary
    print("\nRESULTS SUMMARY:")

    # By LLM
    print("\nPerformance by LLM:")
    if 'LLM' in df.columns and 'Similarity' in df.columns and 'Time' in df.columns:
        for llm_name_summary in df['LLM'].unique():
            subset = df[df['LLM'] == llm_name_summary]
            print(f"  • {llm_name_summary}:")
            print(f"    - Average Similarity: {subset['Similarity'].mean():.4f}")
            print(f"    - Average Time: {subset['Time'].mean():.2f}s")
    else:
        print("  Could not generate LLM summary - required columns missing.")

    # By Retriever Type
    print("\nPerformance by Retriever Type:")
    if 'Retriever' in df.columns and 'Similarity' in df.columns:
        for retriever_type in df['Retriever'].unique():
            subset = df[df['Retriever'] == retriever_type]
            print(f"  • {retriever_type}:")
            print(f"    - Average Similarity: {subset['Similarity'].mean():.4f}")
    else:
        print("  Could not generate Retriever summary - required columns missing.")

    # Best Combinations
    print("\nTop 3 Best LLM+Retriever Combinations (by average similarity):")
    if 'LLM' in df.columns and 'Retriever' in df.columns and 'Similarity' in df.columns and 'Time' in df.columns:
        best_combinations = (
            df.groupby(['LLM', 'Retriever'])
            .agg(
                Avg_Similarity=('Similarity', 'mean'),
                Avg_Time=('Time', 'mean')
            )
            .reset_index()
            .sort_values('Avg_Similarity', ascending=False)
            .head(3)
        )

        # Number the rows by their position after sorting. The index label
        # yielded by iterrows() still reflects the pre-sort group order, so
        # using it as the rank would print arbitrary numbers instead of 1-3.
        for rank, (_, row) in enumerate(best_combinations.iterrows(), start=1):
            print(f"  {rank}. {row['LLM']} + {row['Retriever']}:")
            print(f"     - Avg Similarity: {row['Avg_Similarity']:.4f}")
            print(f"     - Avg Time: {row['Avg_Time']:.2f}s")
    else:
        print("  Could not generate Best Combinations summary - required columns missing.")

else:
    print("\nNo results were obtained during the evaluation.")