In [4]:
import pandas as pd
import json
import logging
from tqdm.auto import tqdm
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
# from openai import OpenAI
# client = OpenAI()

# Ingestion

In [5]:
with open('../data/arsonor_chunks_300_50.json', 'r', encoding='utf-8') as file:
    documents = json.load(file)

In [6]:
es = Elasticsearch("http://localhost:9200")

In [8]:
index_name = "arsonor_chunks_300"

In [9]:
if es.indices.exists(index=index_name):
    es.indices.delete(index=index_name)

In [10]:
# Create index if not already created
if not es.indices.exists(index=index_name):
    es.indices.create(index=index_name, body={
        "mappings": {
            "properties": {
                "article_id": {"type": "keyword"},
                "title": {"type": "text"},
                "url": {"type": "keyword"},
                "category": {"type": "keyword"},
                "tags": {"type": "text"},
                "chunk_id": {"type": "keyword"},
                "chunk_text": {"type": "text"}
            }
        }
    })

In [11]:
def prepare_documents_for_indexing(docs):
    for doc in docs:
        yield {
            "_index": index_name,
            "_id": doc['chunk_id'],
            "_source": {
                "article_id": doc['article_id'],
                "title": doc['title'],
                "url": doc['url'],
                "category": doc['category'],
                "tags": doc['tags'],
                "chunk_id": doc['chunk_id'],
                "chunk_text": doc['chunk_text'],
            }
        }

# Index the documents in bulk
bulk(es, prepare_documents_for_indexing(documents))

(572, [])

# RAG flow

### Two-level retrieval mechanism (article-level followed by chunk-level)

In [12]:
def elastic_search2(query, category=None, min_score=0.1):
    # First-level: Article search with diversity
    article_filter = {
        "bool": {
            "should": [
                {
                    "multi_match": {
                        "query": query,
                        "fields": ["title^3", "tags^2", "category"],
                        "type": "cross_fields",
                        "operator": "or",
                        "tie_breaker": 0.3
                    }
                }
            ],
            "filter": []
        }
    }

    if category:
        article_filter['bool']['filter'].append({"term": {"category": category}})

    article_search_body = {
        "query": article_filter,
        "size": 20,  # Increased size for more diversity
        "_source": ["article_id", "title", "category", "tags", "url"],
        "collapse": {
            "field": "article_id",  # Collapse results by article_id
            "inner_hits": {
                "name": "most_relevant_chunk",
                "size": 1,
                "sort": [{"_score": "desc"}]
            },
            "max_concurrent_group_searches": 4
        },
        "min_score": min_score
    }

    try:
        article_search_results = es.search(index=index_name, body=article_search_body)['hits']['hits']
    except Exception as e:
        print(f"Error in article search: {e}")
        return [], []

    if not article_search_results:
        return [], []

    # Extract unique article IDs
    top_article_ids = list(set(hit['_source']['article_id'] for hit in article_search_results))

    # Second-level: Diverse chunk-level search
    chunk_filter = {
        "bool": {
            "must": [
                {"terms": {"article_id": top_article_ids}},
                {
                    "multi_match": {
                        "query": query,
                        "fields": ["chunk_text^2", "title"],
                        "type": "best_fields",
                        "operator": "or",
                        "fuzziness": "AUTO"
                    }
                }
            ]
        }
    }

    chunk_search_body = {
        "query": chunk_filter,
        "size": 20,  # Increased size
        "_source": ["article_id", "chunk_id", "chunk_text", "title", "url"],
        "collapse": {
            "field": "article_id",  # Ensure chunks from different articles
            "inner_hits": {
                "name": "alternative_chunks",
                "size": 2  # Get 2 best chunks per article
            }
        },
        "min_score": min_score
    }

    try:
        chunk_search_results = es.search(index=index_name, body=chunk_search_body)['hits']['hits']
    except Exception as e:
        print(f"Error in chunk search: {e}")
        return article_search_results, []

    return article_search_results, chunk_search_results

In [13]:
def process_search_results(article_results, chunk_results):
    processed_results = []
    
    # Create a mapping of article_id to article details
    article_map = {hit['_source']['article_id']: hit['_source'] for hit in article_results}
    
    # Process chunk results and combine with article information
    for chunk_hit in chunk_results:
        article_id = chunk_hit['_source']['article_id']
        if article_id in article_map:
            article_info = article_map[article_id]
            
            # Get inner hits (alternative chunks)
            alternative_chunks = chunk_hit['inner_hits']['alternative_chunks']['hits']['hits']
            
            processed_result = {
                'article_id': article_id,
                'title': article_info['title'],
                'category': article_info.get('category', ''),
                'tags': article_info.get('tags', []),
                'url': article_info.get('url', ''),
                'chunks': [
                    {
                        'chunk_id': alt_chunk['_source']['chunk_id'],
                        'chunk_text': alt_chunk['_source']['chunk_text'],
                        'score': alt_chunk['_score']
                    }
                    for alt_chunk in alternative_chunks
                ],
                'overall_score': chunk_hit['_score']
            }
            processed_results.append(processed_result)
    
    return processed_results

In [14]:
def search_with_diversity(query, category=None):
    article_results, chunk_results = elastic_search2(query, category)
    processed_results = process_search_results(article_results, chunk_results)
    
    return processed_results

### Prompt

In [11]:
prompt_template = """
You're an audio engineer and sound designer instructor for beginners.
You're particularly specialized in audio home-studio set-up, computer music production and audio post-production in general (editing, mixing and mastering). 
Answer the QUESTION based on the CONTEXT from our arsonor knowledge database (articles).
Use only the facts from the CONTEXT when answering the QUESTION.
Finally, recommend the top 3 Arsonor articles that are the best to read for answering this question.
For each recommended article, include both its title and URL.

QUESTION: {question}

CONTEXT:
{context}

RECOMMENDED ARTICLES:
{recommendations}
""".strip()

entry_template = """
ARTICLE: {title}
KEYWORDS: {tags}
CONTENT: {chunk_text}
""".strip()

recommendation_template = """
- [{title}]({url})
""".strip()


In [12]:
def ensure_diversity(chunks, max_per_article=3):
    articles = {}
    for chunk in chunks:
        title = chunk['title']
        if title not in articles:
            articles[title] = []
        if len(articles[title]) < max_per_article:
            articles[title].append(chunk)
    
    diverse_chunks = []
    for article_chunks in articles.values():
        diverse_chunks.extend(article_chunks)
    
    return sorted(diverse_chunks, key=lambda x: x['score'], reverse=True)

In [13]:
def build_prompt2(query, search_results, max_context_entries=10):
    # Build context from chunks
    context_entries = []
    seen_articles = set()
    article_details = []

    # Create a flat list of all chunks with their article info
    all_chunks = []
    for result in search_results:
        article_id = result['article_id']
        
        # Store article details for recommendations
        if article_id not in seen_articles:
            seen_articles.add(article_id)
            article_details.append({
                'title': result['title'].strip(),
                'url': result.get('url', '#').strip(),
                'relevance_score': result['overall_score']
            })        

        # Add all chunks to a flat list with their article info
        for chunk in result['chunks']:
            all_chunks.append({
                'title': result['title'],
                'tags': result['tags'],
                'chunk_text': chunk['chunk_text'],
                'score': chunk['score']
            })

    # Sort all chunks by score and take top 10
    top_chunks = ensure_diversity(sorted(all_chunks, key=lambda x: x['score'], reverse=True))[:max_context_entries]
    
    # Create context entries from top chunks
    context_entries = [
        entry_template.format(
            title=chunk['title'],
            tags=chunk['tags'],
            chunk_text=chunk['chunk_text']
        )
        for chunk in top_chunks
    ]
    
    # Sort article details by relevance score and get top 3
    top_articles = sorted(article_details, key=lambda x: x['relevance_score'], reverse=True)[:3]
    
    # Create recommendations with clickable links
    recommendations = "\n".join(
        recommendation_template.format(**article) 
        for article in top_articles
    )

    # Build the full prompt
    context = "\n\n".join(context_entries)
    prompt = prompt_template.format(
        question=query,
        context=context,
        recommendations=recommendations
    )
    
    return prompt

In [14]:
def get_prompt_for_query(query, category=None, max_context_entries=10):
    search_results = search_with_diversity(query, category)
    if not search_results:
        return "No relevant articles found for your query."
    
    try:
        prompt = build_prompt2(query, search_results, max_context_entries)
        return prompt
    except Exception as e:
        logging.error(f"Error building prompt: {e}")
        return f"Error generating prompt: {str(e)}"

In [15]:
# Optional: Debug function to check context size
def debug_context_size(prompt):
    context_start = prompt.find("CONTEXT:") + 8
    context_end = prompt.find("RECOMMENDED ARTICLES:")
    context = prompt[context_start:context_end].strip()
    chunks = context.split('\n\nARTICLE:')
    return {
        'num_chunks': len(chunks),
        'total_context_length': len(context),
        'average_chunk_length': len(context) / len(chunks) if chunks else 0
    }

### Test prompt

In [16]:
query = 'Comment obtenir une musique de haute qualité au même niveau sonore que les autres?'

In [17]:
prompt = get_prompt_for_query(query, category=None, max_context_entries=10)
print(prompt)

You're an audio engineer and sound designer instructor for beginners.
You're particularly specialized in audio home-studio set-up, computer music production and audio post-production in general (editing, mixing and mastering). 
Answer the QUESTION based on the CONTEXT from our arsonor knowledge database (articles).
Use only the facts from the CONTEXT when answering the QUESTION.
Finally, recommend the top 3 Arsonor articles that are the best to read for answering this question.
For each recommended article, include both its title and URL.

QUESTION: Comment obtenir une musique de haute qualité au même niveau sonore que les autres?

CONTEXT:
ARTICLE: Les morceaux de référence (2): Comment les choisir?
KEYWORDS: ear training, écoute critique, liste de références
CONTENT: Pour en revenir au dilemme goût personnel/qualité objective, tu peux très bien être fan d’un vieux morceau rock d’Elvis Presley tout en sachant pertinemment que la qualité sonore est à des années-lumières des standards d

In [18]:
debug_info = debug_context_size(prompt)
print(f"Number of chunks: {debug_info['num_chunks']}")
print(f"Total context length: {debug_info['total_context_length']}")
print(f"Average chunk length: {debug_info['average_chunk_length']:.2f}")

Number of chunks: 10
Total context length: 19624
Average chunk length: 1962.40


In [19]:
def llm(prompt, model='gpt-4o-mini'):
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    
    return response.choices[0].message.content

In [20]:
def rag(query, category=None, model='gpt-4o-mini'):
    prompt = get_prompt_for_query(query, category, max_context_entries=10)
    answer = llm(prompt, model=model)
    return answer

In [21]:
# category = 'LE HOME STUDIO'
question = 'Comment obtenir une musique de haute qualité au même niveau sonore que les autres?'
answer = rag(question)
print(answer)

Pour obtenir une musique de haute qualité avec un niveau sonore comparable à celui des autres productions, il est essentiel de se concentrer sur plusieurs aspects clés :

1. **Qualité de Mixage** : Le mixage doit être transparent, équilibré et dynamique. Cherchez à reproduire un très bon équilibre spectral et une haute définition des instruments, en évitant la Loudness War, qui peut nuire à la qualité sonore.

2. **Utilisation de Morceaux de Référence** : Pour évaluer la qualité de votre mix, sélectionnez des morceaux de référence ayant un mixage de haute qualité. Preférez des formats audio non compressés (comme WAV ou FLAC) pour le traitement de votre musique.

3. **Équilibre Dynamique** : Prenez en compte la macro-dynamique (les niveaux globaux) et la micro-dynamique (les variations de niveaux au sein même d'un instrument). Une compression et une automation appropriées des niveaux peuvent aider à atteindre un niveau perçu global satisfaisant.

4. **Éducation de l'oreille et ajustemen

# Retrieval evaluation

In [15]:
df_question = pd.read_csv('../data/ground-truth-300.csv')
df_question.head()

Unnamed: 0,question,category,chunk,article
0,Quel est l'impact de l'IA sur la post-producti...,LA POST-PROD,4615db39-1,4615db39
1,Comment les outils IA simplifient-ils le trava...,LA POST-PROD,4615db39-1,4615db39
2,Quels avantages l'IA apporte-t-elle aux artist...,LA POST-PROD,4615db39-1,4615db39
3,Comment un débutant peut-il améliorer ses prod...,LA POST-PROD,4615db39-1,4615db39
4,Quelle est l'évolution des outils audio pour l...,LA POST-PROD,4615db39-1,4615db39


In [22]:
ground_truth = df_question.to_dict(orient='records')
len(ground_truth)

2860

In [24]:
def hit_rate(relevance_total):
    cnt = 0

    for line in relevance_total:
        if True in line:
            cnt = cnt + 1

    return cnt / len(relevance_total)


def mrr(relevance_total):
    total_score = 0.0

    for line in relevance_total:
        for rank in range(len(line)):
            if line[rank] == True:
                total_score = total_score + 1 / (rank + 1)

    return total_score / len(relevance_total)

In [28]:
def evaluate(ground_truth, search_function):
    relevance_total = []

    for q in tqdm(ground_truth):
        relevance = [] 
        doc_id = q['chunk']
        results = search_function(q)
        for d in results:
            for c in d['chunks']:
                relevance.append(c['chunk_id'] == doc_id)
        relevance_total.append(relevance)

    return {
        'hit_rate': hit_rate(relevance_total),
        'mrr': mrr(relevance_total)
    }

In [29]:
evaluate(ground_truth, lambda q: search_with_diversity(q['question'], q['category']))

  0%|          | 0/2860 [00:00<?, ?it/s]

{'hit_rate': 0.6961538461538461, 'mrr': 0.5156604215159822}

## New evaluation framework

In [1]:
from typing import List, Dict, Any
from collections import defaultdict
import numpy as np

In [2]:
class SearchEvaluator:
    def __init__(self, ground_truth: List[Dict[str, str]]):
        """
        Initialize the evaluator with ground truth data.
        
        Args:
            ground_truth: List of dictionaries, each containing:
                - 'question': str
                - 'category': str
                - 'chunk': str (chunk_id)
                - 'article': str (article_id)
        """
        self.ground_truth = ground_truth
        
        # Create mappings for efficient lookup
        self.question_to_chunks = defaultdict(list)
        self.question_to_articles = defaultdict(list)
        self.question_to_category = {}
        
        for item in ground_truth:
            question = item['question']
            self.question_to_chunks[question].append(item['chunk'])
            self.question_to_articles[question].append(item['article'])
            self.question_to_category[question] = item['category']

    def evaluate(self, k_values: List[int] = [1, 3, 5, 10]) -> Dict:
        """
        Evaluate the search system using multiple metrics.
        
        Args:
            k_values: List of k values for Hit@K calculation
        
        Returns:
            Dictionary containing various evaluation metrics
        """
        results = {
            'overall': {
                'mrr': 0.0,
                'hit_rates': {k: 0.0 for k in k_values}
            },
            'per_category': defaultdict(lambda: {
                'mrr': 0.0,
                'hit_rates': {k: 0.0 for k in k_values},
                'count': 0
            }),
            'per_question_metrics': []
        }
        
        total_questions = len(self.question_to_chunks)
        
        for question in self.question_to_chunks.keys():
            # Get search results for the question
            search_results = search_with_diversity(question)
            
            # Get relevant chunk and article IDs for this question
            relevant_chunk_ids = set(self.question_to_chunks[question])
            relevant_article_ids = set(self.question_to_articles[question])
            category = self.question_to_category[question]
            
            # Calculate metrics for this question
            question_metrics = self._calculate_question_metrics(
                search_results, relevant_chunk_ids, relevant_article_ids, k_values)
            
            # Update overall metrics
            results['overall']['mrr'] += question_metrics['reciprocal_rank']
            for k in k_values:
                results['overall']['hit_rates'][k] += question_metrics[f'hit@{k}']
            
            # Update per-category metrics
            results['per_category'][category]['mrr'] += question_metrics['reciprocal_rank']
            results['per_category'][category]['count'] += 1
            for k in k_values:
                results['per_category'][category]['hit_rates'][k] += question_metrics[f'hit@{k}']
            
            results['per_question_metrics'].append({
                'question': question,
                'category': category,
                'metrics': question_metrics
            })
        
        # Average the overall metrics
        results['overall']['mrr'] /= total_questions
        for k in k_values:
            results['overall']['hit_rates'][k] /= total_questions
        
        # Average the per-category metrics
        for category_metrics in results['per_category'].values():
            if category_metrics['count'] > 0:
                category_metrics['mrr'] /= category_metrics['count']
                for k in k_values:
                    category_metrics['hit_rates'][k] /= category_metrics['count']
        
        return results

    def _calculate_question_metrics(
        self, 
        search_results: List[Dict], 
        relevant_chunk_ids: set,
        relevant_article_ids: set,
        k_values: List[int]
    ) -> Dict:
        """
        Calculate metrics for a single question.
        """
        metrics = {
            f'hit@{k}': 0 for k in k_values
        }
        metrics.update({
            f'article_hit@{k}': 0 for k in k_values
        })
        metrics['reciprocal_rank'] = 0
        metrics['article_reciprocal_rank'] = 0
        
        chunk_found_positions = []
        article_found_positions = []
        
        for rank, result in enumerate(search_results, 1):
            # Check article-level match
            if result['article_id'] in relevant_article_ids:
                article_found_positions.append(rank)
            
            # Check chunk-level match
            for chunk in result['chunks']:
                if chunk['chunk_id'] in relevant_chunk_ids:
                    chunk_found_positions.append(rank)
                    break
        
        # Calculate Hit@K for both chunk and article levels
        for k in k_values:
            metrics[f'hit@{k}'] = 1 if any(pos <= k for pos in chunk_found_positions) else 0
            metrics[f'article_hit@{k}'] = 1 if any(pos <= k for pos in article_found_positions) else 0
        
        # Calculate Reciprocal Rank for both levels
        if chunk_found_positions:
            metrics['reciprocal_rank'] = 1.0 / min(chunk_found_positions)
        if article_found_positions:
            metrics['article_reciprocal_rank'] = 1.0 / min(article_found_positions)
        
        return metrics

    def print_detailed_results(self, results: Dict):
        """
        Print detailed evaluation results.
        """
        print("Overall Metrics:")
        print(f"MRR: {results['overall']['mrr']:.4f}")
        for k, hit_rate in results['overall']['hit_rates'].items():
            print(f"Hit@{k}: {hit_rate:.4f}")
        
        print("\nPer-category Metrics:")
        for category, metrics in results['per_category'].items():
            print(f"\n{category}:")
            print(f"  Count: {metrics['count']}")
            print(f"  MRR: {metrics['mrr']:.4f}")
            for k, hit_rate in metrics['hit_rates'].items():
                print(f"  Hit@{k}: {hit_rate:.4f}")
        
        print("\nSample Per-question Metrics (first 5):")
        for item in results['per_question_metrics'][:5]:
            print(f"\nQuestion: {item['question']}")
            print(f"Category: {item['category']}")
            m = item['metrics']
            print(f"  Reciprocal Rank: {m['reciprocal_rank']:.4f}")
            for k in sorted(k for k in m.keys() if k.startswith('hit@')):
                print(f"  {k}: {m[k]}")


In [27]:
def run_evaluation(ground_truth: List[Dict[str, str]], k_values: List[int] = [1, 3, 5, 10]) -> Dict:
    """
    Run the complete evaluation process.
    
    Args:
        ground_truth: List of dictionaries containing ground truth data
        k_values: List of k values for Hit@K calculation
    
    Returns:
        Dictionary containing evaluation results
    """
    evaluator = SearchEvaluator(ground_truth)
    results = evaluator.evaluate(k_values)
    evaluator.print_detailed_results(results)
    return results

In [None]:
# Example usage
if __name__ == "__main__":
    # Sample ground truth data in the specified format
    sample_ground_truth = [
        {
            'question': "Quel est l'impact de l'IA sur la post-production audio et musicale",
            'category': 'LA POST-PROD',
            'chunk': '4615db39-1',
            'article': '4615db39'
        },
        {
            'question': "Comment l'IA transforme-t-elle le processus de mixage audio?",
            'category': 'LA POST-PROD',
            'chunk': '4615db39-2',
            'article': '4615db39'
        }
    ]
    
    results = run_evaluation(sample_ground_truth)

In [28]:
run_evaluation(ground_truth)

Overall Metrics:
MRR: 0.5210
Hit@1: 0.4691
Hit@3: 0.5719
Hit@5: 0.5870
Hit@10: 0.5979

Per-category Metrics:

LA POST-PROD:
  Count: 1433
  MRR: 0.5393
  Hit@1: 0.4885
  Hit@3: 0.5876
  Hit@5: 0.6071
  Hit@10: 0.6190

LE HOME STUDIO:
  Count: 647
  MRR: 0.5213
  Hit@1: 0.4529
  Hit@3: 0.5904
  Hit@5: 0.6074
  Hit@10: 0.6167

LE SOUND DESIGN:
  Count: 770
  MRR: 0.4866
  Hit@1: 0.4468
  Hit@3: 0.5273
  Hit@5: 0.5325
  Hit@10: 0.5429

Sample Per-question Metrics (first 5):

Question: Quel est l'impact de l'IA sur la post-production audio et musicale
Category: LA POST-PROD
  Reciprocal Rank: 1.0000
  hit@1: 1
  hit@10: 1
  hit@3: 1
  hit@5: 1

Question: Comment les outils IA simplifient-ils le travail audio
Category: LA POST-PROD
  Reciprocal Rank: 1.0000
  hit@1: 1
  hit@10: 1
  hit@3: 1
  hit@5: 1

Question: Quels avantages l'IA apporte-t-elle aux artistes et créateurs
Category: LA POST-PROD
  Reciprocal Rank: 0.0000
  hit@1: 0
  hit@10: 0
  hit@3: 0
  hit@5: 0

Question: Comment un déb

{'overall': {'mrr': 0.5209809371914631,
  'hit_rates': {1: 0.4691228070175439,
   3: 0.5719298245614035,
   5: 0.5870175438596491,
   10: 0.5978947368421053}},
 'per_category': defaultdict(<function __main__.SearchEvaluator.evaluate.<locals>.<lambda>()>,
             {'LA POST-PROD': {'mrr': 0.5392857142857145,
               'hit_rates': {1: 0.4884856943475227,
                3: 0.5875785066294487,
                5: 0.6071179344033496,
                10: 0.6189811584089323},
               'count': 1433},
              'LE HOME STUDIO': {'mrr': 0.5213010542608686,
               'hit_rates': {1: 0.4528593508500773,
                3: 0.5904173106646059,
                5: 0.60741885625966,
                10: 0.616692426584235},
               'count': 647},
              'LE SOUND DESIGN': {'mrr': 0.48664605236033803,
               'hit_rates': {1: 0.44675324675324674,
                3: 0.5272727272727272,
                5: 0.5324675324675324,
                10: 0.542857142857