In [None]:
import warnings
from loguru import logger


# Suppress every Python warning for the rest of the session
warnings.filterwarnings(action='ignore')

# Drop loguru's existing handlers, then register a sink that discards
# whatever it receives (registered at level ERROR)
logger.remove()
logger.add(lambda _message: None, level="ERROR")

In [2]:
import sys
import os

# Put the parent of the working directory on the import path (once),
# so that the `backend` package imported below can be resolved
backend_path = os.path.abspath(os.pardir)
already_importable = backend_path in sys.path
if not already_importable:
    sys.path.append(backend_path)

from backend.causal_models.factory import CausalLMFactory
from backend.retriever.factory import DenseRetrieverFactory, SparseRetrieverFactory
from backend.file_handling.extractors import PdfExtractor
from backend.file_handling.chunker import SemanticTextChunker
from backend.storage.chromadb import ChromaDB


In [21]:
# Storage backend plus the PDF extraction and chunking helpers
vector_store = ChromaDB()
pdf_extractor = PdfExtractor()
chunker = SemanticTextChunker()

# Retrievers are obtained from their factories by model identifier
dense_retriever = DenseRetrieverFactory.get_model(
    "sentence-transformers/all-MiniLM-L6-v2"
)
sparse_retriever = SparseRetrieverFactory.get_model(
    "bm25"
)

# Build knowledge base

In [None]:
from pathlib import Path
import pandas as pd
from unstructured.cleaners.core import clean
from backend.schemas import StoreEntry

# Per-document statistics, collected for later statistical analysis
document_data = []

# Dataset path
dataset_path = Path('./files')

# sorted() fixes the processing order; a bare glob() yields files in a
# filesystem-dependent order, which made the CSV row order irreproducible.
for pdf_file in sorted(dataset_path.glob('*.pdf')):
    try:
        extracted_content = pdf_extractor.extract_content(pdf_file)
        print(f"Extract content from {pdf_file.name}:")
    except Exception as e:
        # Best effort: report the unreadable file and carry on with the rest
        print(f"Error while reading {pdf_file.name}: {e}.")
        continue

    # Nothing extracted -> nothing to index for this file
    if not extracted_content:
        continue

    # Clean text
    cleaned_text = clean(
        extracted_content.full_text,
        bullets=True,
        extra_whitespace=True,
        dashes=True,
        trailing_punctuation=True,
    )
    chunked_texts = chunker.chunk_text(cleaned_text)

    document_data.append({
        'document_name': pdf_file.name,
        'word_count': len(cleaned_text.split()),
        'token_count': len(dense_retriever.tokenizer.tokenize(cleaned_text)),
        'chunked': len(chunked_texts)
    })

    # Index text chunks in both retrievers, but only when there are any:
    # previously the sparse retriever was also fed empty chunk lists.
    if len(chunked_texts) > 0:
        # Dense embeddings
        dense_text_vectors = dense_retriever.vectorize(chunked_texts)
        vector_store.insert(
            StoreEntry(
                type="text",
                document_name=pdf_file.name,
                content=chunked_texts,
                vector=dense_text_vectors,
            )
        )

        # Sparse embeddings
        sparse_retriever.add_documents(chunked_texts)

    # Insert images, embedded via their captions. Skip documents without
    # images so the retriever is never asked to vectorize an empty list.
    images = extracted_content.images
    if images:
        dense_caption_vectors = dense_retriever.vectorize(
            [img.caption for img in images]
        )
        if dense_caption_vectors is not None:
            vector_store.insert(
                StoreEntry(
                    type="caption",
                    document_name=pdf_file.name,
                    content=images,
                    vector=dense_caption_vectors,
                )
            )

# Persist the per-document statistics
df = pd.DataFrame(document_data)
df.to_csv('document_data.csv', index=False)

# Search and cache

In [None]:
from backend.schemas import SearchResult

def reciprocal_rank_fusion(
    dense_results: list[SearchResult],
    sparse_results: list[str],
    k: int = 60,
    top_n: int = 10,
) -> list[SearchResult]:
    """Merge dense and sparse search hits with Reciprocal Rank Fusion.

    Every distinct text gets the score ``1 / (k + sparse_rank) +
    1 / (k + dense_rank)`` with 1-based ranks. A text missing from one of
    the lists is ranked one position past the end of that list. When a text
    occurs several times in a list, its first (best) position counts.

    Args:
        dense_results: Dense retriever hits, best first. Each hit provides
            ``text`` and an optional ``image``.
        sparse_results: Sparse retriever hits as plain texts, best first.
        k: RRF smoothing constant.
        top_n: Maximum number of fused results to return (``None`` = all).

    Returns:
        The ``top_n`` highest scoring texts as ``SearchResult`` objects,
        best first, each carrying the image of the matching dense hit if
        there was one. Ties keep first-seen order (sparse hits, then dense).
    """
    # Text -> image mapping and text -> rank lookup for the dense hits.
    # The previous version tested `doc in dense_results`, comparing a str
    # against SearchResult objects, so the dense ranking never contributed.
    image_mapping = {}
    dense_rank: dict[str, int] = {}
    for position, result in enumerate(dense_results, start=1):
        if result.image:
            image_mapping[result.text] = result.image
        dense_rank.setdefault(result.text, position)

    # Text -> rank lookup for the sparse hits (first occurrence wins)
    sparse_rank: dict[str, int] = {}
    for position, text in enumerate(sparse_results, start=1):
        sparse_rank.setdefault(text, position)

    # Rank assigned to a text that one of the retrievers did not return
    missing_sparse_rank = len(sparse_results) + 1
    missing_dense_rank = len(dense_results) + 1

    # dict.fromkeys de-duplicates while keeping a deterministic order,
    # unlike a set, whose iteration order varies between runs for strings.
    scores = {
        doc: 1 / (k + sparse_rank.get(doc, missing_sparse_rank))
        + 1 / (k + dense_rank.get(doc, missing_dense_rank))
        for doc in dict.fromkeys([*sparse_rank, *dense_rank])
    }

    # sorted() is stable, so equal scores keep their first-seen order
    ranked_documents = sorted(
        scores.items(), key=lambda item: item[1], reverse=True
    )[:top_n]
    return [
        SearchResult(text=text, image=image_mapping.get(text))
        for text, _score in ranked_documents
    ]

In [40]:
from pathlib import Path

# Search queries
# NOTE(review): the second query contains misspellings ("reneweable enery");
# they are left untouched because they are part of the experiment input, but
# they may hurt exact-term matching in the sparse retriever - confirm intent.
queries = [
    "What time period does the sea-ice extent data cover up to?",
    "Which percentage share of reneweable enery systems has europe?",
]

# Search result cache: one dict per query of the form
# {'query': <query string>, 'results': <fused list of SearchResult>}.
# (The previous annotation claimed every value was a list of SearchResult.)
search_results: list[dict[str, object]] = list()

# Search
N_RESULTS = 10
for query_index, query in enumerate(queries):
    # Retrieve candidates from both retrievers, then fuse the two rankings
    search_vector = dense_retriever.vectorize(query)
    dense_results = vector_store.query(search_vector, k=N_RESULTS)
    sparse_results = sparse_retriever.search(query, k=N_RESULTS)

    results = reciprocal_rank_fusion(dense_results, sparse_results)

    # Create directory for query to store images
    query_dir = Path(f"./images/{query_index}")
    query_dir.mkdir(parents=True, exist_ok=True)

    # Save the image of every fused hit that has one; the file name is the
    # hit's position in the fused ranking
    for image_index, result in enumerate(results):
        if result.image:
            image_path = query_dir / f"{image_index}.png"
            result.image.save(image_path)

    search_results.append({
        'query': query,
        'results': results
    })

# Answer generation

In [None]:
import pickle
from backend.schemas import GenerationConfig

# Decoding settings for answer generation.
# NOTE(review): generation_config is built here but never handed to the model
# below - confirm whether model.generate / the factory should receive it.
generation_config = GenerationConfig(
    max_new_tokens=250,
    no_repeat_ngram_size=3,
    temperature=1.0,
    top_k=90,
    num_beams=3,
    do_sample=True,
    length_penalty=-0.7,
)

# Models to compare
models = [
    "meta-llama/Meta-Llama-3.1-8B-Instruct",
    "HuggingFaceM4/idefics2-8b-chatty",
]

# One record per (model, query) pair
results = []

for model_id in models:
    model = CausalLMFactory.get_model(model_id)
    # Each cache entry is a dict {'query': ..., 'results': ...}. The previous
    # version looped over the entry itself, which yields its keys (strings),
    # so res["query"] raised a TypeError.
    for search_result in search_results:
        query = search_result["query"]
        retrieved = search_result["results"]
        answer = model.generate(query, retrieved)
        results.append({
            "model_id": model_id,
            "query": query,
            "results": retrieved,
            "answer": answer
        })

# Persist the answers so the judging step can run without regenerating them
with open('generated_answers.pkl', 'wb') as f:
    pickle.dump(results, f)

# LLM as a judge

In [None]:
from openai import OpenAI
import json
from jinja2 import Template
from pathlib import Path
from backend.schemas import GenerationConfig
from dynaconf import settings

# Jinja2 prompt for the judge model. The placeholders `query`, `context` and
# `answer` are filled in per evaluated record; the judge is asked to reply
# with a JSON object holding `score`, `reasoning` and `feedback`.
judge_prompt_template = """
You are an impartial judge tasked with evaluating the performance of an AI assistant. Your job is to assess the assistant's response based on the given question and the provided context.

Please evaluate the answer according to the following criteria:
1. Did the assistant correctly answer the question based on the information in the context?
2. If the context did not contain sufficient information, did the assistant clearly communicate this?

Scoring guidelines:
- Award 1 point if the answer is correct or if the assistant appropriately communicated that the context was insufficient.
- Award 0 points if the answer is incorrect or if the assistant erroneously attempted to answer the question despite insufficient context.

Question: {{ query }}
Context: {{ context }}
Assistant's Answer: {{ answer }}

Please provide your evaluation in the following JSON format. Ensure that the output is valid JSON:

{
  "score": [0 or 1],
  "reasoning": "Your detailed justification for the score",
  "feedback": "Constructive feedback or suggestions for improvement for the assistant"
}
"""

# OpenAI client; the API key is read from the dynaconf settings
client = OpenAI(api_key=settings.OPENAI)

def evaluate_results(results):
    """Let the judge model score every generated answer.

    Args:
        results: Iterable of dicts with the keys ``"query"`` and
            ``"answer"``, plus either ``"context"`` (used as given) or
            ``"results"`` (retrieved hits; their ``text`` is joined to form
            the context). The answer-generation step writes ``"results"``.

    Returns:
        One dict per input record, in input order. Normally this is the
        judge's parsed JSON reply. If the reply is not valid JSON, a
        placeholder with ``score``/``reasoning``/``feedback`` set to ``None``
        plus ``error`` and ``raw_output`` is stored instead, so a single bad
        reply does not discard the evaluations already collected.
    """
    evaluations = []
    template = Template(judge_prompt_template)

    for result in results:
        context = result.get("context")
        if context is None:
            # Generated-answer records carry the retrieved hits under
            # "results", not "context"; build the context from their texts.
            context = "\n\n".join(
                str(getattr(hit, "text", hit))
                for hit in result.get("results", [])
            )

        prompt = template.render(
            query=result["query"],
            context=context,
            answer=result["answer"]
        )

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": prompt},
            ],
            # 150 tokens is tight for the requested "detailed" reasoning plus
            # feedback and tends to cut the JSON off mid-object
            max_tokens=512,
            temperature=0,
            # JSON mode: the reply is constrained to a single JSON object
            response_format={"type": "json_object"},
        )

        # The openai>=1 client returns typed objects: the message is read via
        # attribute access (it is not subscriptable) and content may be None.
        raw_output = (response.choices[0].message.content or "").strip()
        try:
            evaluation = json.loads(raw_output)
        except json.JSONDecodeError as error:
            evaluation = {
                "score": None,
                "reasoning": None,
                "feedback": None,
                "error": f"Judge reply is not valid JSON: {error}",
                "raw_output": raw_output,
            }
        evaluations.append(evaluation)

    return evaluations

import pickle

# Load the answers written by the answer-generation step. `saved_results` was
# never defined before, so this cell failed with a NameError on a fresh kernel.
# NOTE: unpickling can execute arbitrary code - only load files produced by
# this notebook. The classes stored in the pickle must be importable here.
with open('generated_answers.pkl', 'rb') as f:
    saved_results = pickle.load(f)

evaluations = evaluate_results(saved_results)

# Persist the judge's verdicts
with open("evaluations.json", "w") as f:
    json.dump(evaluations, f, indent=4)

print("Bewertungen gespeichert.")