In [None]:
import numpy as np
import pandas as pd
from langchain.document_loaders import DataFrameLoader
from langchain_core.documents import Document
from typing import Dict, List, Set, Optional, Any, Tuple
import json
import requests
from api_client import APIClient
from api_config import APIConfig
import os
from dotenv import load_dotenv
load_dotenv()

## APIConfig for evaluating skill prediction APIs

In [None]:
# Custom formatters for different APIs

def competence_analyser_request_formatter(query: str, top_k: int) -> Tuple[Dict, Optional[Dict]]:
    """
    Build the JSON payload for the competence analyser API v2/chatsearch endpoint.

    Args:
        query: Text to analyse; sent in the 'doc' field.
        top_k: Number of results requested from the API.

    Returns:
        (data, params) tuple; params is always None because the endpoint
        is called without URL parameters here.
    """
    # NOTE(review): the OpenAI key is sent although both LLM options below are
    # switched off - confirm the endpoint really requires this field.
    openai_key = os.getenv("OPENAI_API_KEY", "")

    payload = dict(
        doc=query,
        taxonomies=["ESCO"],  # Evaluate against the ESCO taxonomy only
        targets=["learning_outcomes"],  # Ask for learning outcomes
        top_k=top_k,
        rerank=True,  # Enable reranking
        finetuned=True,  # Use the fine-tuned models
        trusted_score=0.0,  # No score threshold; filtering happens later
        score_cutoff=0.0,  # No score cutoff
        strict=0,  # Return all top_k results without cutoff
        use_llm=False,  # No LLM extraction, to keep results comparable
        openai_api_key=openai_key,
        llm_validation=False,  # No LLM validation, to keep results comparable
    )
    return payload, None

def competence_analyser_response_parser(response) -> List[Tuple[str, float]]:
    """
    Extract skill predictions from a competence analyser v2/chatsearch response.

    Skills are read from result["learning_outcomes"]["skills"]; entries
    lacking a "title" or "score" key are ignored.

    Args:
        response: Object exposing .json() that returns the decoded body.

    Returns:
        List of (skill_name, score) tuples, highest score first.
    """
    payload = response.json()

    # Guard clauses: without a non-empty learning_outcomes section that
    # contains skills there is nothing to extract.
    if "learning_outcomes" not in payload or not payload["learning_outcomes"]:
        return []
    outcomes = payload["learning_outcomes"]
    if "skills" not in outcomes:
        return []

    scored = [
        (entry["title"], float(entry["score"]))
        for entry in outcomes["skills"]
        if "title" in entry and "score" in entry
    ]
    # Higher scores are better for this API, so rank descending.
    return sorted(scored, key=lambda pair: pair[1], reverse=True)

def generic_api_request_formatter(query: str, top_k: int) -> Tuple[Dict, Optional[Dict]]:
    """Build a minimal {"query", "top_k"} payload; no URL parameters are used."""
    return {"query": query, "top_k": top_k}, None

def generic_api_response_parser(response) -> List[Tuple[str, float]]:
    """
    Generic parser for simple API responses.

    Supported response shapes:
      * {"predictions": [{"skill" | "name": ..., "score": ...}, ...]}
      * {"skills": [...], "scores": [...]}  (parallel lists)
      * [{"skill" | "name": ..., "score": ...}, ...]

    Args:
        response: Object exposing .json() that returns the decoded body.

    Returns:
        List of (skill_name, score) tuples in the order given by the API
        (no re-sorting, since the score direction is API-specific).
        Empty list when the response matches none of the supported shapes.
    """
    result = response.json()

    def _collect(items) -> List[Tuple[str, float]]:
        # Pull (label, score) pairs out of a list of dict items; the
        # "skill" key takes precedence over "name". Other items are ignored.
        pairs = []
        for item in items:
            if not isinstance(item, dict) or "score" not in item:
                continue
            if "skill" in item:
                pairs.append((item["skill"], float(item["score"])))
            elif "name" in item:
                pairs.append((item["name"], float(item["score"])))
        return pairs

    if isinstance(result, dict):
        if "predictions" in result:
            return _collect(result["predictions"])
        if "skills" in result and "scores" in result:
            return [
                (skill, float(score))
                for skill, score in zip(result["skills"], result["scores"])
            ]
        return []
    if isinstance(result, list):
        return _collect(result)
    return []

def metadatagen_request_formatter(query: str, top_k: int) -> Tuple[Dict, Optional[Dict]]:
    """
    Build the JSON payload for the MetadataGen API.

    Args:
        query: Sent in the 'name' field of the payload.
        top_k: Maximum number of skills requested.

    Returns:
        (data, params) tuple; params is always None (no URL parameters).
    """
    # NOTE(review): the description is a fixed machine-learning text that is
    # sent unchanged for every query - confirm this is intended and not a
    # leftover from manual testing.
    payload = dict(
        name=query,
        description="This course covers the fundamentals of machine learning including supervised and unsupervised learning algorithms, neural networks, and practical applications in data science.",
        top_k=top_k,
    )
    return payload, None

def get_esco_skill_name(skill_uri: str, language: str = 'en', version: str = 'v1.2.0') -> Optional[str]:
    """
    Fetch the preferred label of an ESCO skill from the ESCO web API.

    Args:
        skill_uri: ESCO concept URI of the skill.
        language: Language code of the label to return.
        version: Value sent as the 'selectedVersion' query parameter.

    Returns:
        The preferred label in the requested language, or None when the
        request fails, the API answers with a non-200 status, or the
        response holds no label for that language.
    """
    # https://ec.europa.eu/esco/api/resource/skill?uri=&language=
    url = "https://ec.europa.eu/esco/api/resource/skill"
    # Let requests build the query string so the URI is percent-encoded.
    params = {"uri": skill_uri, "language": language, "selectedVersion": version}
    headers = {"Content-Type": "application/json"}
    try:
        # A timeout keeps one unresponsive lookup from blocking the whole run.
        response = requests.get(url, params=params, headers=headers, timeout=30)
    except requests.RequestException as error:
        print(f"Error fetching skill name for {skill_uri}: {error}")
        return None

    if response.status_code != 200:
        print(f"Error fetching skill name for {skill_uri}: {response.status_code} {response.text}")
        return None

    result = response.json()
    if "preferredLabel" in result and language in result["preferredLabel"]:
        return result["preferredLabel"][language]
    return None


def metadatagen_response_parser(response) -> List[Tuple[str, float]]:
    """
    Parse a MetadataGen API response into ranked skill predictions.

    The response is expected to be a list of dicts that each carry a
    "name" and a "conceptUrl". The API returns no scores, so a score is
    derived from the list position (first item scores highest). The skill
    title is the German ESCO label looked up via get_esco_skill_name;
    when that lookup yields no label, the item's own "name" is used.

    Args:
        response: Object exposing .json() that returns the decoded body.

    Returns:
        List of (skill_name, score) tuples sorted by score descending;
        an empty list when the response is not a list.
    """
    result = response.json()

    if not isinstance(result, list):
        # Unexpected payload: report it and return no predictions.
        print("Unexpected response format:", result)
        return []

    total = len(result)
    predictions = []
    for index, item in enumerate(result):
        if not (isinstance(item, dict) and "name" in item and "conceptUrl" in item):
            # Skip malformed items but keep processing the rest.
            print("Unexpected item format in response:", item)
            continue
        # Assume the API lists skills from best to worst; map position to (0, 1].
        score = 1.0 - (index / total)
        de_skill_name = get_esco_skill_name(item["conceptUrl"], 'de', 'v1.2.0')
        # Fall back to the API-provided name so no prediction carries a None title.
        predictions.append((de_skill_name or item["name"], score))

    predictions.sort(key=lambda x: x[1], reverse=True)  # Sort by score descending
    return predictions

# API configuration examples.
# Adapt these configurations to the APIs you actually want to evaluate.

# Competence analyser API: POSTs JSON to /v2/chatsearch and uses the custom
# formatter/parser pair defined for that endpoint.
COMPETENCE_ANALYSER_CONFIG = APIConfig(
    name="competence_analyser",
    base_url="https://lab.dlc.sh/competence-analyser",
    endpoint="/v2/chatsearch",
    headers={"Content-Type": "application/json"},
    auth_token=None,  # Add your token if needed
    request_format="json",
    response_format="json",
    max_requests_per_second=2.0,  # Be respectful to the API
    timeout=60.0,  # Longer timeout for complex processing
    custom_request_formatter=competence_analyser_request_formatter,
    custom_response_parser=competence_analyser_response_parser
)

# MetadataGen API configuration.
# base_url points at host.docker.internal - presumably a service running on
# the Docker host; adjust when running outside of Docker.
METADATAGEN_API_CONFIG = APIConfig(
    name="metadataGen",
    base_url="http://host.docker.internal",
    endpoint="/get_esco_suggestions",
    headers={"Content-Type": "application/json"},
    auth_token=None,  # Add the API's token if needed
    request_format="json",
    response_format="json",
    max_requests_per_second=2.0,  # Be respectful to external APIs
    timeout=45.0,
    custom_request_formatter=metadatagen_request_formatter,
    custom_response_parser=metadatagen_response_parser
)

# Registry of all APIs to test and evaluate; add further configurations here.
# NOTE(review): the registry key "metadata_gen" differs from the config's
# name "metadataGen" - results are stored under the registry key.
API_CONFIGS = {
    "competence_analyser": COMPETENCE_ANALYSER_CONFIG,
    "metadata_gen": METADATAGEN_API_CONFIG,
}

## Setup evaluation data

Execute only one of the following two cells, to set up either the GRETA or the ESCO evaluation dataset.

Setup for GRETA Evaluation

In [None]:
resultsfile = 'evalGretaModelResults.json'

# Load the GRETA competence model (GRETA-Kompetenzmodell_v2.csv) into a pandas DataFrame
greta_pd = pd.read_csv('./data/GRETA/sources/GRETA-Kompetenzmodell_v2.csv', sep=';', encoding='utf-8')

# Build the document text from the competence facet and its requirements.
# Alternative, more verbose format (kept for reference):
# greta_pd['page_content'] = "Kompetenz: " + greta_pd['Kompetenzfacette'] + '/n gehört zu /n Kompetenzaspekt: ' + greta_pd['Kompetenzaspekt'] + ', Kompetenzbereich: ' + greta_pd['Kompetenzbereich'] + ',/n Kompetenzanforderungen: ' + greta_pd['Kompetenzanforderungen'] + ', Kompetenzbeschreibung: ' + greta_pd['Kompetenzbeschreibung']
# NOTE(review): '/n' looks like a typo for the newline escape '\n'. It is left
# unchanged because it alters the embedded text (and any cached vector store) -
# confirm before fixing.
greta_pd['page_content'] = greta_pd['Kompetenzfacette'] + ',/nKompetenzanforderungen: ' + greta_pd['Kompetenzanforderungen']
# Keep only the page_content and Kompetenzfacette columns
greta_pd = greta_pd[['page_content', 'Kompetenzfacette']]
# Rename Kompetenzfacette to title
greta_pd = greta_pd.rename(columns={'Kompetenzfacette': 'title'})

# Load the evaluation data (entries provide a 'query' and a list of relevant titles in 'pos')
with open('./data/GRETA/validated_greta_240704.json', 'r', encoding='utf-8') as fIn:
    data = json.load(fIn)

# Load the data into the DataFrameLoader
loader = DataFrameLoader(greta_pd, page_content_column="page_content")
documents = loader.load()
corpus = {i: d['title'] for i, d in enumerate(greta_pd.to_dict('records'))}
queries = {i: d['query'] for i, d in enumerate(data)}

# Index corpus ids by title once, so relevant documents are found with a dict
# lookup instead of scanning the whole corpus for every relevant title.
# Assumes the entries of 'pos' are hashable titles (strings).
title_to_ids = {}
for doc_id, title in corpus.items():
    title_to_ids.setdefault(title, []).append(doc_id)

# Map each query id to the corpus ids of all documents whose title is listed
# in 'pos' (a title that occurs several times in the corpus yields several ids).
relevant_docs = {
    i: [doc_id for doc in d['pos'] for doc_id in title_to_ids.get(doc, [])]
    for i, d in enumerate(data)
}

Setup for ESCO Evaluation

In [None]:
resultsfile = 'evalESCOModelResults.json'

# Load the ESCO skills from the CSV file
skills = pd.read_csv("./data/ESCO/sources/skills_as_documents_v120.csv")

# Replace missing values in the text and relation columns with empty strings,
# so that the string concatenation below does not produce NaN.
for column in [
    'description',
    'broaderHierarchyConcepts',
    'broaderSkills',
    'narrowerSkills',
    'isEssentialForOccupations',
    'isOptionalForOccupations',
    'isEssentialForSkills',
    'isOptionalForSkills',
]:
    skills[column] = skills[column].fillna('')

# Create a new column that combines preferredLabel and description.
skills['page_content'] = skills['preferredLabel'] + " \n " + skills['description']

# Add a new column called 'taxonomy' with the value 'ESCO'.
skills['taxonomy'] = 'ESCO'

# Remove rows where page_content or preferredLabel is missing
skills = skills[skills['page_content'].notna()]
skills = skills[skills['preferredLabel'].notna()]

# Keep only the document text and the label, and rename preferredLabel to title
skills = skills[['page_content', 'preferredLabel']]
skills = skills.rename(columns={'preferredLabel': 'title'})

# Load the evaluation data (entries provide a 'query' and a list of relevant titles in 'pos')
import json
with open('./data/ESCO/wisy_validated_240704.json', 'r', encoding='utf-8') as fIn:
    data = json.load(fIn)

# Load the documents
loader = DataFrameLoader(skills, page_content_column="page_content")
documents = loader.load()
corpus = {i: d['title'] for i, d in enumerate(skills.to_dict('records'))}
queries = {i: d['query'] for i, d in enumerate(data)}

# Index corpus ids by title once, so relevant documents are found with a dict
# lookup instead of scanning the whole corpus for every relevant title.
# Assumes the entries of 'pos' are hashable titles (strings).
title_to_ids = {}
for doc_id, title in corpus.items():
    title_to_ids.setdefault(title, []).append(doc_id)

# Map each query id to the corpus ids of all documents whose title is listed
# in 'pos' (a title that occurs several times in the corpus yields several ids).
relevant_docs = {
    i: [doc_id for doc in d['pos'] for doc_id in title_to_ids.get(doc, [])]
    for i, d in enumerate(data)
}

## Setup Retrieval Evaluator

In [None]:
from retrieval_evaluator import RetrievalEvaluator
# Build the evaluator from the queries, corpus, relevance judgements and
# loaded documents prepared by whichever dataset setup cell was run above.
evaluator = RetrievalEvaluator(queries, corpus, relevant_docs, store_docs=documents)

In [None]:
# Collects the evaluation results of this session, keyed by model/API name.
modelresults = {}

In [None]:
# Evaluate the fine-tuned embedding model, with use_cached_db enabled.
modelresults["isy-finetuned"] = evaluator("isy-thl/multilingual-e5-base-course-skill-tuned", use_cached_db=True)

In [None]:
CACHE_DB = True  # Use same vectorstore from previous run, because the embedding model did not change
# Pass the flag on to the evaluator; assigning CACHE_DB alone has no effect.
# NOTE(review): assumes use_cached_db can be combined with reranker_model_name - confirm.
modelresults["isy-finetuned-w-reranker"] = evaluator(
    "isy-thl/multilingual-e5-base-course-skill-tuned",
    reranker_model_name="isy-thl/bge-reranker-base-course-skill-tuned",
    use_cached_db=CACHE_DB,
)

In [None]:
CACHE_DB = False  # Build new vectorstore, because the embedding model changed
# Pass the flag on to the evaluator; assigning CACHE_DB alone has no effect.
modelresults["all-MiniLM-L6-v2"] = evaluator(
    "sentence-transformers/all-MiniLM-L6-v2",
    use_cached_db=CACHE_DB,
)

In [None]:
# modelresults["bge_base"] = evaluator("BAAI/bge-base-en-v1.5")
# modelresults["bge_finetuned"] = evaluator("bge_finetuned_no_sync")
# modelresults["bge_greta_finetuned"] = evaluator("bge_greta_finetuned_no_sync")
# modelresults["bge_m3"] = evaluator("BAAI/bge-m3")
# modelresults["bge_m3_greta_finetuned"] = evaluator("bge_m3_greta_finetuned_no_sync")
# modelresults["bge_m3_finetuned"] = evaluator("bge_m3_finetuned_no_sync")
# modelresults["snowflake-arctic-embed-l"] = evaluator("Snowflake/snowflake-arctic-embed-l")
# modelresults["multilingual-e5-base"] = evaluator("intfloat/multilingual-e5-base", query_instruction="query: ", embed_instruction="passage: ")
# modelresults["multilingual_e5_greta_finetuned"] = evaluator("multilingual_e5_greta_finetuned_no_sync", query_instruction="query: ", embed_instruction="passage: ")
# modelresults["multilingual_e5_finetuned"] = evaluator("multilingual_e5_finetuned_no_sync", query_instruction="query: ", embed_instruction="passage: ")
# modelresults["multilingual_e5_m3_finetuned"] = evaluator("multilingual_e5_m3_finetuned_no_sync", query_instruction="query: ", embed_instruction="passage: ")
# modelresults["multilingual_finetuned_esco6000"] = evaluator("multilingual_finetuned_esco6000_no_sync", query_instruction="query: ", embed_instruction="passage: ")
# modelresults["multilingual_finetuned_esco1500"] = evaluator("multilingual_finetuned_esco1500_no_sync", query_instruction="query: ", embed_instruction="passage: ")
# modelresults["mixed_multilingual_finetuned"] = evaluator("mixed_multilingual_finetuned_no_sync", query_instruction="query: ", embed_instruction="passage: ")
# CACHE_DB = False
# modelresults["bge_reranker_finetuned"] = evaluator("multilingual_e5_finetuned_no_sync", reranker_model_name="bge_reranker_finetuned_no_sync")
# CACHE_DB = True
# modelresults["bge_reranker_greta_finetuned"] = evaluator("multilingual_e5_finetuned_no_sync", reranker_model_name="bge_reranker_greta_finetuned_no_sync")
# CACHE_DB = True
# modelresults["bge_reranker_skillfit"] = evaluator("multilingual_e5_finetuned_no_sync", reranker_model_name="pascalhuerten/bge_reranker_skillfit")
# CACHE_DB = False
# modelresults["instructor-base"] = evaluator("hkunlp/instructor-base")

# API Testing Section

Now we can test APIs alongside the embedding models. Configure your API endpoints above and run the cells below.

In [None]:
# Test API connectivity before running full evaluation
def test_api_connectivity(api_config: APIConfig, test_query: str = "Python programming"):
    """
    Send one test query to an API and report the outcome on stdout.

    Args:
        api_config: Configuration of the API to probe.
        test_query: Query text sent to the API.

    Returns:
        True when the request succeeded and the predictions could be
        printed, False when any exception was raised along the way.
    """
    preview_size = 3  # number of predictions shown in the report
    try:
        predictions = APIClient(api_config).predict(test_query, top_k=5)

        print(f"✅ API '{api_config.name}' is accessible")
        print(f"   Base URL: {api_config.base_url}")
        print(f"   Test query: '{test_query}'")
        total = len(predictions)
        print(f"   Returned {total} predictions:")

        preview = predictions[:preview_size]
        for rank, (skill, score) in enumerate(preview, start=1):
            print(f"   {rank}. {skill} (score: {score:.4f})")

        remaining = total - preview_size
        if remaining > 0:
            print(f"   ... and {remaining} more")
    except Exception as error:
        # Any failure (request, parsing, formatting) counts as "not working".
        print(f"❌ API '{api_config.name}' failed connectivity test:")
        print(f"   Error: {str(error)}")
        return False
    return True

# Probe every configured API once and remember whether it responded.
print("Testing API connectivity...", "=" * 50, sep="\n")

api_test_results = {}
for api_name in API_CONFIGS:
    api_config = API_CONFIGS[api_name]
    print(f"\nTesting {api_name}:")
    api_test_results[api_name] = test_api_connectivity(api_config)

# Summary: one status line per API.
print("", "=" * 50, "API Connectivity Summary:", sep="\n")
for api_name, is_working in api_test_results.items():
    if is_working:
        status = "✅ Working"
    else:
        status = "❌ Failed"
    print(f"  {api_name}: {status}")

In [None]:
import traceback

# Run API evaluations.
# Only run evaluations for APIs that passed the connectivity test. If the
# connectivity cell was not run, no API is excluded.
connectivity = globals().get("api_test_results", {})
for api_name, api_config in API_CONFIGS.items():
    # A stored None marks an earlier failed run and must not block a retry.
    if modelresults.get(api_name) is not None:
        print(f"Skipping {api_name} - already evaluated")
        continue
    if connectivity.get(api_name) is False:
        print(f"Skipping {api_name} - failed connectivity test")
        continue
    print(f"\n🚀 Starting evaluation for {api_name}...")
    try:
        modelresults[api_name] = evaluator(api_config=api_config)
        print(f"✅ Completed evaluation for {api_name}")
    except Exception as e:
        # Report the error with its stack trace and continue with the next API.
        # No placeholder entry is stored: a None result would break the
        # results table and would be written to the results file.
        print(f"❌ Evaluation failed for {api_name}: {str(e)}")
        traceback.print_exc()

## Save the results to a file or merge with existing results

In [None]:
import json
import os

# Read the results already stored on disk, if any
existingresults = {}
if os.path.exists(resultsfile):
    with open(resultsfile, 'r', encoding='utf-8') as fIn:
        existingresults = json.load(fIn)

# Merge this session's results into the stored ones. Failed evaluations
# (None) are skipped so they never overwrite an earlier valid result.
for model, results in modelresults.items():
    if results is None:
        print(f"Skipping {model} - no results to save")
        continue
    existingresults[model] = results

# Serialize before opening the file for writing: if serialization fails,
# the existing results file is left untouched instead of being truncated.
serialized = json.dumps(existingresults, indent=4)
with open(resultsfile, 'w', encoding='utf-8') as fOut:
    fOut.write(serialized)

# Continue with the merged results (stored plus new)
modelresults = existingresults

## Display the results as a table

In [None]:
# compare results as a table
import pandas as pd

# compare results as a table
import pandas as pd

def get_result_df(modelresults):
    """
    Build a comparison table with one row per model and one column per metric.

    Args:
        modelresults: Mapping of model name to its metrics dict. Entries
            whose value is None (failed evaluations) are left out.

    Returns:
        pandas DataFrame with a 'Model' column followed by the metric columns.

    Raises:
        KeyError: If a metrics dict lacks one of the listed metrics.
    """
    metric_columns = [
        'accuracy@1', 'accuracy@3', 'accuracy@5', 'accuracy@10',
        'precision@1', 'precision@3', 'precision@5', 'precision@10',
        'recall@1', 'recall@3', 'recall@5', 'recall@10',
        'ndcg@10', 'mrr@10', 'map@100',
        # 'avg_time_per_1000_chars',
        'avg_time_per_query',
        # 'total_time',
    ]
    # Drop failed evaluations; indexing None would raise a TypeError.
    valid = {model: metrics for model, metrics in modelresults.items() if metrics is not None}

    table = {'Model': list(valid)}
    for metric in metric_columns:
        table[metric] = [metrics[metric] for metrics in valid.values()]
    return pd.DataFrame(table)

# Select which models appear in the table. By default all results are shown;
# to show a subset in a fixed order, enable one of the filter_models lists
# below together with the dict comprehension.
filtered_results = modelresults
# filter_models = ['instructor-base', 'instructor-large', 'instructor-skillfit', 'bge_base', 'bge_greta_finetuned', 'bge_finetuned', 'bge_m3', 'bge_m3_greta_finetuned', 'bge_m3_finetuned', 'multilingual-e5-base', 'multilingual_e5_greta_finetuned', 'multilingual_e5_finetuned', 'mle5f+bge_reranker_skillfit', 'mle5f+bge_reranker_greta_finetuned', 'mle5f+bge_reranker_finetuned']
# filter_models = ['intfloat/multilingual-e5-base', 'isy-thl/multilingual-e5-base-course-skill-tuned', 'isy-thl/bge-reranker-base-course-skill-tuned']
# filtered_results = {model: modelresults[model] for model in filter_models}
# Build the comparison table from the selected results.
results = get_result_df(filtered_results)

# filter_models = ['instructor-base', 'instructor-large', 'instructor-skillfit', 'bge_base', 'bge_greta_finetuned', 'bge_finetuned', 'bge_m3', 'bge_m3_greta_finetuned', 'bge_m3_finetuned', 'multilingual-e5-base', 'multilingual_e5_greta_finetuned', 'multilingual_e5_finetuned', 'bge_reranker_skillfit', 'bge_reranker_greta_finetuned', 'bge_reranker_finetuned']


# def highlight_max(s):
#     '''
#     Highlight the maximum in a Series yellow.
#     '''
#     is_max = s == s.max()
#     return ['background-color: yellow' if v else '' for v in is_max]

def grade_by_rank(s):
    """
    Colour the cells of one table column by rank, best value greenest.

    Written for DataFrame.style.apply(..., axis=0). Higher values are
    better, except for columns whose name contains 'time', where lower
    values are better. Colours fade linearly from light green (best) to
    grey (worst); the best cell gets a slightly more vibrant green.

    Args:
        s: pandas Series holding one column of the results table.

    Returns:
        List of CSS style strings, one per value and in the order of s.
        The 'Model' column is left unstyled; a column with a single value
        is coloured grey because there is nothing to rank against.
    """
    # Skip the label column
    if s.name == 'Model':
        return ['' for _ in s]

    count = len(s)
    if count == 0:
        # Nothing to colour (empty results table)
        return []
    if count == 1:
        # A single value cannot be ranked, so colour it grey
        return ['background-color: rgb(230,230,230)']

    # Timing columns are ranked ascending (fastest first), all others descending.
    lower_is_better = 'time' in str(s.name)
    ordered = s.sort_values(ascending=lower_is_better)

    good = (120, 225, 60)   # light green
    bad = (230, 230, 230)   # grey
    colors = []
    for position in range(count):
        # Linear interpolation (lerp) from light green to grey by rank position
        fraction = position / (count - 1)
        r, g, b = (int(start + (end - start) * fraction) for start, end in zip(good, bad))
        colors.append(f'background-color: rgb({r},{g},{b})')

    # Make the best colour even more vibrant
    colors[0] = 'background-color: rgb(110, 235, 55)'

    # Map every index label to its rank position, then emit the colours in
    # the original row order.
    rank_of = {label: position for position, label in enumerate(ordered.index)}
    return [colors[rank_of[label]] for label in s.index]


# Apply the rank-based colouring to each column of the results table
# (axis=0 passes one column at a time to grade_by_rank).
styled_results = results.style.apply(grade_by_rank, axis=0)
        
# Last expression of the cell, so the notebook renders the styled table.
styled_results