# Install requirements

In [None]:
%pip install pandas requests tqdm pyarrow seaborn matplotlib scikit-learn-intelex

In [None]:
import requests
import pandas as pd
from tqdm import tqdm
import re
from typing import Literal
from concurrent.futures import ThreadPoolExecutor
import pyarrow as pa
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import matplotlib.pyplot as plt

In [None]:
# please add your API keys
# Preferably export them as environment variables of the same name so the
# secrets never end up in the saved notebook; alternatively replace the ""
# fallback values below.
import os

HUGGINGFACE_API_KEY = os.environ.get("HUGGINGFACE_API_KEY", "")
MISTRAL_API_KEY = os.environ.get("MISTRAL_API_KEY", "")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
DEEPINFRA_API_KEY = os.environ.get("DEEPINFRA_API_KEY", "")

### data preprocessing

In [None]:
from collections import defaultdict
from typing import Optional


def read_csv_alt_dict(filename: str, main_column: str, columns_to_join: list[str], joined_column_name: str, remove_null_rows: Optional[bool] = False) -> dict[str, list[str]]:
    """Build a mapping from each main name to the main name plus its alternatives.

    The cells of ``columns_to_join`` are comma-separated lists. They are split,
    stripped of surrounding whitespace and concatenated per row.

    Args:
        filename: Path of the CSV file to read.
        main_column: Column holding the canonical name (used as dictionary key).
        columns_to_join: Columns whose comma-separated values are alternatives.
        joined_column_name: Kept for backward compatibility; the joined list is
            no longer materialised as a DataFrame column, so the value is unused.
        remove_null_rows: If true, rows without any alternative (after dropping
            empty and ``'NULL'`` entries) are skipped entirely.

    Returns:
        ``{main name: [main name, alternative, ...]}``. The main name is added
        once per CSV row, so duplicate rows repeat it (as before).
    """
    def split_cell(cell: str) -> list[str]:
        # strip each part so "a, b" yields "b" and not " b"
        return [part.strip() for part in cell.split(',')]

    df = pd.read_csv(filename, converters={col: split_cell for col in columns_to_join})

    name_dict = defaultdict(list)
    joined_columns = [df[col] for col in columns_to_join]
    for main_name, *cells in zip(df[main_column], *joined_columns):
        # Drop 'NULL' placeholders and empty strings. An empty CSV cell splits
        # to [''], and an empty alternative would later turn into a regex that
        # matches at every word boundary.
        alt_names = [item for cell in cells for item in cell if item and item != 'NULL']
        if remove_null_rows and not alt_names:
            continue

        names = name_dict[main_name]
        names.append(main_name)  # the main name itself is always part of the list
        names.extend(alt for alt in alt_names if alt != 'None')

    return dict(name_dict)

In [None]:
def preprocess_sentence(input_sentences: list[str]) -> list[str]:
    """Normalise sentences by replacing synonyms with their canonical word.

    Two synonym sources are combined: a fixed table of relation classes and the
    person names (with alternative names and typos) read from
    ``nlp_list_person.csv`` on every call. Each synonym is replaced as a whole
    word, case-insensitively, in the order the tables are iterated.

    Args:
        input_sentences: Sentences to normalise.

    Returns:
        A new list with one normalised sentence per input sentence.
    """
    classes_semantically_equal = {
        "holding": ["holding", "brandishing", "carrying", "playing", "cradling", "supporting", "pouring", "drawing",
                    "picking", "touching", "containing", "carrying", "drawing_out", "raising", "removing", "collecting",
                    "hanging_on", "shouldering"],
        "wearing": ["wearing", "covered_with"],
        "resting_on": ["resting_on", "reclining_on", "leaning_on", "setting_on", "set_on"],
        "seated_on": ["sitting_on", "sitting", "seated_in", "seated_on", "riding", "riding_on"],
        "grasping": ["grasping", "scooping", "reach_out", "plucking", "clasping", "strangling", "placing"],
        "standing": ["standing", "standing_on", "standing_in", "driving"],
    }

    persons_semantically_equal = read_csv_alt_dict("nlp_list_person.csv", "name", ["alternativenames", "typos"], "alt_names", True)

    # combine both dicts; a literal merge (unlike dict(**a, **b)) does not raise
    # when a person name collides with a class name - the person entry wins
    semantically_equal_words: dict[str, list[str]] = {**classes_semantically_equal, **persons_semantically_equal}

    # Compile every pattern once instead of once per sentence. Backslashes in
    # the replacement are escaped so the main word is inserted literally and
    # not interpreted as a regex replacement template.
    replacement_rules = [
        (
            re.compile(r'\b' + re.escape(alt_word) + r'\b', flags=re.IGNORECASE),
            main_word.replace('\\', '\\\\'),
        )
        for main_word, alt_word_list in semantically_equal_words.items()
        for alt_word in alt_word_list
        if alt_word  # an empty alternative would match at every word boundary
    ]

    processed_sentences = []
    for sentence in input_sentences:
        for pattern, replacement in replacement_rules:
            # replace all alt_words with main_word
            sentence = pattern.sub(replacement, sentence)
        processed_sentences.append(sentence)

    return processed_sentences

### functions for embedding retrieval

In [None]:
# embeddings for hugging face inference API
def get_hf_embeddings(input_sentences: list[str], model_id: str, api_key: str) -> pd.DataFrame:
    """Fetch embeddings from the Hugging Face feature-extraction endpoint.

    The sentences are first passed through ``preprocess_sentence`` and then sent
    in batches of ``BATCH_SIZE``.

    Args:
        input_sentences: Raw sentences to embed.
        model_id: Model identifier appended to the feature-extraction URL.
        api_key: Bearer token sent in the ``Authorization`` header.

    Returns:
        DataFrame with the columns ``sentence`` (preprocessed text) and
        ``embedding`` (the per-sentence value returned by the API); empty with
        the same columns when no sentences are given.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        ValueError: If the response is not one embedding per sentence.
    """
    BATCH_SIZE = 16
    REQUEST_TIMEOUT_SECONDS = 300  # generous, because the request waits for model loading
    api_url = f"https://api-inference.huggingface.co/pipeline/feature-extraction/{model_id}"
    headers = {"Authorization": f"Bearer {api_key}"}

    # post request to hf
    def get_embeddings_batch(sentence_batch: list[str]) -> pd.DataFrame:
        response = requests.post(
            api_url,
            headers=headers,
            json={
                "inputs": sentence_batch,
                "options": {"wait_for_model": True}
            },
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        # fail with the HTTP error instead of a confusing DataFrame construction error
        response.raise_for_status()
        batch_embeddings = response.json()
        if not isinstance(batch_embeddings, list) or len(batch_embeddings) != len(sentence_batch):
            raise ValueError(
                f"unexpected response from {api_url}: expected {len(sentence_batch)} embeddings"
            )
        return pd.DataFrame({
            'sentence': sentence_batch,
            'embedding': batch_embeddings
        })

    processed_sentences = preprocess_sentence(input_sentences)

    # collect the batches and concatenate once (repeated concat copies all previous rows)
    batch_frames = [
        get_embeddings_batch(processed_sentences[i:i + BATCH_SIZE])
        for i in range(0, len(processed_sentences), BATCH_SIZE)
    ]
    if not batch_frames:
        return pd.DataFrame(columns=['sentence', 'embedding'])

    return pd.concat(batch_frames, ignore_index=True)

In [None]:
# embeddings for mistral and openAI
def get_embeddings_universal(input_sentences: list[str], model_id: str, api_url: str, api_key: str) -> pd.DataFrame:
    """Fetch embeddings from an OpenAI-style ``/embeddings`` endpoint.

    The sentences are first passed through ``preprocess_sentence`` and then sent
    in batches of ``BATCH_SIZE``. The response is expected to carry a ``data``
    list whose items hold an ``embedding``.

    Args:
        input_sentences: Raw sentences to embed.
        model_id: Value sent as ``model`` in the request body.
        api_url: Full URL of the embeddings endpoint.
        api_key: Bearer token sent in the ``Authorization`` header.

    Returns:
        DataFrame with the columns ``sentence`` (preprocessed text) and
        ``embedding``; empty with the same columns when no sentences are given.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        ValueError: If the response does not hold one embedding per sentence.
    """
    BATCH_SIZE = 16
    REQUEST_TIMEOUT_SECONDS = 300
    headers = {"Authorization": f"Bearer {api_key}"}

    # post request
    def get_embeddings_batch(sentence_batch: list[str]) -> pd.DataFrame:
        response = requests.post(
            api_url,
            headers=headers,
            json={
                "model": model_id,
                "input": sentence_batch,
                "encoding_format": "float"
            },
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        data = response.json().get("data") or []

        # Surface a malformed answer explicitly instead of silently pairing
        # sentences with missing or placeholder embeddings.
        if len(data) != len(sentence_batch) or any("embedding" not in item for item in data):
            raise ValueError(
                f"unexpected response from {api_url} for model {model_id!r}: "
                f"expected {len(sentence_batch)} embeddings, got {len(data)} items"
            )

        # keep the embeddings aligned with the input order when items carry an index
        ordered = sorted(data, key=lambda item: item.get("index", 0))

        return pd.DataFrame({
            'sentence': sentence_batch,
            'embedding': [item["embedding"] for item in ordered]
        })

    processed_sentences = preprocess_sentence(input_sentences)

    # collect the batches and concatenate once (repeated concat copies all previous rows)
    batch_frames = [
        get_embeddings_batch(processed_sentences[i:i + BATCH_SIZE])
        for i in range(0, len(processed_sentences), BATCH_SIZE)
    ]
    if not batch_frames:
        return pd.DataFrame(columns=['sentence', 'embedding'])

    return pd.concat(batch_frames, ignore_index=True)

### methods for calculating embeddings using multiple models

In [None]:
def calculate_embeddings(sentences: list[str],
                         model: Literal[
                             "openai_3_small",
                             "openai_3_large",
                             "minilm-l12-v2",
                             "mpnet_base_v2",
                             "mistral-embed",
                             "baai_bge_large_v1.5",
                             "embedding_e5_large_v2"
                            ]
                        ) -> pd.DataFrame:
    """Compute embeddings for ``sentences`` with one of the supported models.

    Args:
        sentences: Sentences to embed.
        model: Short name of the embedding model (see the ``Literal`` above).

    Returns:
        The DataFrame produced by ``get_hf_embeddings`` (Hugging Face models) or
        ``get_embeddings_universal`` (all other providers).

    Raises:
        ValueError: If ``model`` is not one of the supported names.
    """
    # short name -> (provider, model id understood by that provider)
    model_registry = {
        "openai_3_small": ("openai", "text-embedding-3-small"),
        "openai_3_large": ("openai", "text-embedding-3-large"),
        "minilm-l12-v2": ("huggingface", "sentence-transformers/all-MiniLM-L12-v2"),
        "mpnet_base_v2": ("huggingface", "sentence-transformers/all-mpnet-base-v2"),
        "mistral-embed": ("mistral", "mistral-embed"),
        "baai_bge_large_v1.5": ("deepinfra", "BAAI/bge-large-en-v1.5"),
        "embedding_e5_large_v2": ("deepinfra", "intfloat/e5-large-v2"),
    }

    if model not in model_registry:
        # previously an unknown name fell through the match and returned None
        raise ValueError(f"unknown embedding model {model!r}; expected one of {sorted(model_registry)}")

    provider, model_id = model_registry[model]

    if provider == "huggingface":
        return get_hf_embeddings(sentences, model_id, HUGGINGFACE_API_KEY)

    # the remaining providers share the same request/response format
    if provider == "openai":
        api_url, api_key = "https://api.openai.com/v1/embeddings", OPENAI_API_KEY
    elif provider == "mistral":
        api_url, api_key = "https://api.mistral.ai/v1/embeddings", MISTRAL_API_KEY
    else:  # deepinfra
        api_url, api_key = "https://api.deepinfra.com/v1/openai/embeddings", DEEPINFRA_API_KEY

    return get_embeddings_universal(sentences, model_id, api_url, api_key)


In [None]:
# test a specific model on the first description sentence only
# NOTE(review): `coin_desc` is only assigned in the "Import description
# sentences" cell further down, so that cell has to be executed before this one.
e5_large_v2_result = calculate_embeddings(coin_desc['sentence'].iloc[0:1].to_list(), "embedding_e5_large_v2")

print(e5_large_v2_result)

In [None]:
from itertools import batched

def query_all_models(sentences: list[str]) -> pd.DataFrame:
    """Embed ``sentences`` with every supported model.

    The sentences are processed in batches of ``BATCH_SIZE``; within a batch all
    models are queried concurrently, one thread per model.

    Args:
        sentences: Sentences to embed.

    Returns:
        DataFrame with a ``sentence`` column (the sentences as passed in) and
        one ``embedding_<model>`` column per model.

    Raises:
        ValueError: If a model returns a different number of embeddings than
            sentences in the batch.
    """
    models = [
        "openai_3_small",
        "openai_3_large",
        "minilm-l12-v2",
        "mpnet_base_v2",
        "mistral-embed",
        "baai_bge_large_v1.5",
        "embedding_e5_large_v2"
    ]

    BATCH_SIZE = 256

    sentences = list(sentences)
    total_batches = -(-len(sentences) // BATCH_SIZE)  # ceiling division

    # Accumulate plain Python lists per model and build the frame once at the
    # end; the target columns do not exist up front and Series.append is gone
    # in pandas 2.
    embeddings_by_model: dict[str, list] = {model: [] for model in models}

    with ThreadPoolExecutor(max_workers=len(models)) as executor:
        for batch_number, start in enumerate(range(0, len(sentences), BATCH_SIZE), 1):
            sentence_batch = sentences[start:start + BATCH_SIZE]
            # create dictionary with model and future
            futures = {model: executor.submit(calculate_embeddings, sentence_batch, model) for model in models}
            # wait for the actual results
            for model, future in futures.items():
                batch_embeddings = future.result()['embedding'].tolist()
                if len(batch_embeddings) != len(sentence_batch):
                    raise ValueError(
                        f"model {model!r} returned {len(batch_embeddings)} embeddings "
                        f"for {len(sentence_batch)} sentences"
                    )
                embeddings_by_model[model].extend(batch_embeddings)
            print(f"processed batch {batch_number} out of {total_batches}")

    columns: dict = {'sentence': sentences}
    for model in models:
        # dtype=object keeps each embedding as a single cell value
        columns[f"embedding_{model}"] = pd.Series(embeddings_by_model[model], dtype=object)

    return pd.DataFrame(columns)


### Import description sentences for each coin

In [None]:
# read the coin descriptions and discard the 'Unnamed: 0' column
coin_desc = pd.read_csv("data_descriptions_export.csv").drop(columns=['Unnamed: 0'])
# coin_desc = coin_desc.drop('Unnamed: 0.1', axis=1)

In [None]:
coin_desc.head()

### calculate the embeddings

In [None]:
# a DataFrame has no to_list(); pass the description sentences themselves
embeddings = query_all_models(coin_desc['sentence'].to_list())

In [None]:
embeddings.head()

### export or import calculated embeddings to feather file

In [None]:
import pyarrow.feather as feather

# persist the computed embeddings so the API calls do not have to be repeated
feather.write_feather(embeddings, 'embeddings.feather')

In [None]:
# reload previously exported embeddings (replaces the in-memory `embeddings`)
embeddings = feather.read_feather('embeddings.feather')

### data evaluation

In [None]:
def get_top_similar_coins(query_strings: list[str], embeddings_df: pd.DataFrame, n=5) -> dict:
    """Find the ``n`` most similar stored sentences for each query, per model.

    The queries are embedded with ``query_all_models`` and compared to the
    stored embeddings by cosine similarity.

    Args:
        query_strings: Query sentences.
        embeddings_df: DataFrame with a ``sentence`` column and one
            ``embedding_<model>`` column per model.
        n: Number of hits per query; ``0`` yields empty hit lists.

    Returns:
        ``{model: [hits for query 0, hits for query 1, ...]}`` where each hit is
        a dict with the keys ``query``, ``similar_coin`` and
        ``similarity_score``, ordered from most to least similar.

    Raises:
        ValueError: If ``n`` is negative.
    """
    # list of models
    models = [
        "openai_3_small",
        "openai_3_large",
        "minilm-l12-v2",
        "mpnet_base_v2",
        "mistral-embed",
        "baai_bge_large_v1.5",
        "embedding_e5_large_v2"
    ]

    if n < 0:
        raise ValueError(f"n must be non-negative, got {n}")
    if n == 0:
        # [-0:] would select *all* entries, so answer directly (and skip the API calls)
        return {model: [[] for _ in query_strings] for model in models}

    query_embeddings = query_all_models(query_strings)
    # pull the sentences out once instead of one row lookup per hit
    db_sentences = embeddings_df['sentence'].to_numpy()

    results = {}

    for model in models:
        column = f'embedding_{model}'
        # extract embeddings for current model
        db_embeddings = np.stack(embeddings_df[column].values)
        query_model_embeddings = np.stack(query_embeddings[column].values)

        similarities = cosine_similarity(query_model_embeddings, db_embeddings)

        # find n nearest neighbors
        model_results = []
        for query, similarity_scores in zip(query_strings, similarities):
            # ascending argsort: take the last n and reverse for best-first order
            top_indices = np.argsort(similarity_scores)[-n:][::-1]
            model_results.append([
                {
                    'query': query,
                    'similar_coin': db_sentences[idx],
                    'similarity_score': similarity_scores[idx]
                }
                for idx in top_indices
            ])

        results[model] = model_results

    return results

In [None]:
def results_to_dataframe(results: dict) -> pd.DataFrame:
    """Flatten the nested result of ``get_top_similar_coins`` into a table.

    Args:
        results: ``{model: [hits per query]}`` where each hit is a dict with the
            keys ``query``, ``similar_coin`` and ``similarity_score``.

    Returns:
        DataFrame with the columns ``Model``, ``Query``, ``Rank`` (1-based
        position within a query's hits), ``Similar Coin`` and
        ``Similarity Score`` (formatted as a string with four decimals), sorted
        by model, query and rank. Empty input yields an empty frame with these
        columns.
    """
    columns = ['Model', 'Query', 'Rank', 'Similar Coin', 'Similarity Score']

    rows = []
    for model, model_results in results.items():
        for query_results in model_results:
            for rank, result in enumerate(query_results, 1):
                rows.append((
                    model,
                    result['query'],
                    rank,
                    result['similar_coin'],
                    result['similarity_score'],
                ))

    # explicit columns keep sort_values working when there are no rows at all
    df = pd.DataFrame(rows, columns=columns)
    df = df.sort_values(['Model', 'Query', 'Rank'])
    df['Similarity Score'] = df['Similarity Score'].map(lambda score: f"{score:.4f}")

    return df


In [None]:
# example usage: look up the 3 most similar stored sentences per model for a
# single query and flatten the nested result into a table
query = ["Emperor holding weapon"]
similar_coins_dict = get_top_similar_coins(query, embeddings, n=3)
similar_coins_df = results_to_dataframe(similar_coins_dict)

similar_coins_df.head()

# evaluation

Evaluation of the most recently used prompt (`df`).

In [None]:
# NOTE(review): `df` is not assigned at module level anywhere in the visible
# notebook; presumably it is a table produced by results_to_dataframe (e.g.
# `similar_coins_df`) - confirm before running this cell.

# 1. Frequency of each returned sentence across all models
ergebnis_haeufigkeit = df['Similar Coin'].value_counts()

print("Häufigkeit der Ergebnisse:")
print(ergebnis_haeufigkeit)

# 2. Identify rare results: those returned at most 30% as often as the most
#    frequent result (factor 0.3 below)
schwellenwert = ergebnis_haeufigkeit.max() * 0.3
seltene_ergebnisse = ergebnis_haeufigkeit[ergebnis_haeufigkeit <= schwellenwert]

print("\nSeltene Ergebnisse:")
print(seltene_ergebnisse)

# 3. Count, per model, how many of its results are rare ones
seltene_antworten_modelle = df[df['Similar Coin'].isin(seltene_ergebnisse.index)].groupby('Model')['Similar Coin'].count()

print("\nModelle mit seltenen Antworten:")
print(seltene_antworten_modelle)

# 4. Visualization: bar chart of result frequencies, then rare results per model
plt.figure(figsize=(12, 6))
ergebnis_haeufigkeit.plot(kind='bar')
plt.title('Häufigkeit der Ergebnisse')
plt.xlabel('Ergebnis')
plt.ylabel('Anzahl')
plt.tight_layout()
plt.show()

plt.figure(figsize=(10, 6))
seltene_antworten_modelle.plot(kind='bar')
plt.title('Modelle mit seltenen Antworten')
plt.xlabel('Modell')
plt.ylabel('Anzahl seltener Antworten')
plt.tight_layout()
plt.show()

Evaluation across all prompts issued so far (`combined_df`).

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Create example data (remove once real data is available)
# df = pd.DataFrame({
#     'Query': ['Query1', 'Query1', 'Query2', 'Query2', 'Query3', 'Query3', 'Query1', 'Query2', 'Query3'],
#     'Model': ['Model1', 'Model2', 'Model1', 'Model3', 'Model2', 'Model1', 'Model1', 'Model2', 'Model3'],
#     'Similar Coin': ['CoinA', 'CoinB', 'CoinA', 'CoinC', 'CoinB', 'CoinD', 'CoinA', 'CoinB', 'CoinC']
# })
# NOTE(review): apart from the commented-out example above, `df` is not
# assigned at module level in the visible notebook - confirm where it comes from.

# Dictionary collecting the number of rare results per model over all queries
gesamt_seltene_ergebnisse_pro_modell = {}

# Run the analysis for every distinct query
# (the loop variable rebinds the module-level name `query`)
for query in df['Query'].unique():
    query_df = df[df['Query'] == query]

    # 1. Frequency of each result for this query
    ergebnis_haeufigkeit = query_df['Similar Coin'].value_counts()

    # 2. Identify rare results: those returned at most 30% as often as the most
    #    frequent result (factor 0.3 below)
    schwellenwert = ergebnis_haeufigkeit.max() * 0.3
    seltene_ergebnisse = ergebnis_haeufigkeit[ergebnis_haeufigkeit <= schwellenwert]

    # 3. Determine which models returned the rare results
    seltene_ergebnisse_df = query_df[query_df['Similar Coin'].isin(seltene_ergebnisse.index)]
    seltene_ergebnisse_pro_modell = seltene_ergebnisse_df['Model'].value_counts()

    # Add this query's rare-result counts to the overall totals
    for model, count in seltene_ergebnisse_pro_modell.items():
        if model in gesamt_seltene_ergebnisse_pro_modell:
            gesamt_seltene_ergebnisse_pro_modell[model] += count
        else:
            gesamt_seltene_ergebnisse_pro_modell[model] = count

# Convert the totals dictionary into a Series for plotting
gesamt_seltene_ergebnisse_pro_modell = pd.Series(gesamt_seltene_ergebnisse_pro_modell)

# Overall evaluation of the models
print("\nGesamte Anzahl der seltenen Ergebnisse pro Modell:")
print(gesamt_seltene_ergebnisse_pro_modell)

# Bar chart of the rare results per model over all queries
plt.figure(figsize=(12, 6))
gesamt_seltene_ergebnisse_pro_modell.plot(kind='bar', color='orange')
plt.title('Gesamte Anzahl der seltenen Ergebnisse pro Modell für alle Queries')
plt.xlabel('Modell')
plt.ylabel('Anzahl der seltenen Ergebnisse')
plt.tight_layout()
plt.show()


# Visualizing

In [None]:
from collections import defaultdict

def get_top_n_things(things_dict: dict[str, list[str]], search_body: pd.DataFrame, n: int) -> list[str]:
    """Return the ``n`` things mentioned in the most sentences.

    A thing counts as mentioned in a sentence when any of its names equals one
    of the sentence's whitespace-separated tokens, so names containing spaces
    never match.

    Args:
        things_dict: ``{thing: [names of that thing]}``.
        search_body: DataFrame with a ``sentence`` column.
        n: Maximum number of things to return.

    Returns:
        Thing names ordered by descending number of matching sentences; ties
        keep the order in which the things were first matched.
    """
    # Initialize a count dictionary
    count_dict = defaultdict(int)

    # Search for occurrences in the sentences
    for sentence in search_body['sentence']:
        # tokenize once per sentence (not once per thing) and use a set for O(1) lookups
        sentence_words = set(sentence.split())
        for thing, names in things_dict.items():
            if not sentence_words.isdisjoint(names):
                count_dict[thing] += 1

    # Convert the count dictionary to a sorted list (stable, so ties keep insertion order)
    sorted_things = sorted(count_dict.items(), key=lambda item: item[1], reverse=True)

    return [thing for thing, _ in sorted_things[:n]]

In [None]:
def generate_grouping_row(things_to_highlight: list[str], alt_words_dict: dict[str, list[str]], search_body: pd.DataFrame, grouping_column_name: str) -> None:
    """Add a grouping column to ``search_body`` in place.

    Every row starts as ``'other'``. A row is assigned the first main word (in
    ``alt_words_dict`` order) that is listed in ``things_to_highlight`` and has
    one of its alternative words among the whitespace-separated tokens of the
    row's ``sentence``.

    Args:
        things_to_highlight: Main words that may be used as group label.
        alt_words_dict: ``{main word: [alternative words]}``.
        search_body: DataFrame with a ``sentence`` column; modified in place.
        grouping_column_name: Name of the column to create or overwrite.
    """
    highlighted = set(things_to_highlight)
    # only highlighted main words can ever be assigned, so filter once up front
    candidates = [
        (main_word, alt_word_list)
        for main_word, alt_word_list in alt_words_dict.items()
        if main_word in highlighted
    ]

    search_body[grouping_column_name] = 'other'
    # Update the grouping column based on occurrences in the sentences
    for index, sentence in search_body['sentence'].items():
        sentence_words = set(sentence.split())
        for main_word, alt_word_list in candidates:
            if not sentence_words.isdisjoint(alt_word_list):
                # write to the frame that was passed in, not to the global `embeddings`
                search_body.at[index, grouping_column_name] = main_word
                break
    return

In [None]:
from matplotlib.axes import Axes
import numpy as np
# patch scikit-learn with intel's extension. Also compatible with AMD!
from sklearnex import patch_sklearn
patch_sklearn()

from sklearn.manifold import TSNE
import seaborn as sns
import matplotlib.pyplot as plt

def make_scatter_plot(search_body: pd.DataFrame, alt_words_dict: dict[str, list[str]], number_of_highlights: int, category: str, model: str) -> Axes:
    """Plot a 2-D t-SNE projection of one embedding column, coloured by group.

    The ``number_of_highlights`` most frequently mentioned things get their own
    colour; all remaining rows are drawn in gray as ``'other'``.

    Args:
        search_body: DataFrame with a ``sentence`` column and the embedding
            column ``model``; the grouping column ``category`` is added to it
            in place.
        alt_words_dict: ``{main word: [alternative words]}`` used for grouping.
        number_of_highlights: Number of things to highlight.
        category: Name of the grouping column to create.
        model: Name of the embedding column to project.

    Returns:
        The axes holding the scatter plot (on a newly created 20x10 figure).
    """
    top_things = get_top_n_things(alt_words_dict, search_body, number_of_highlights)

    # operate on the frame that was passed in rather than the global `embeddings`
    generate_grouping_row(top_things, alt_words_dict, search_body, category)

    top_things.sort(key=lambda s: s.casefold())
    top_things.append('other')

    # Set the figure size
    plt.figure(figsize=(20, 10))

    default_palette = sns.color_palette("husl", n_colors=(len(top_things)))
    palette_colors = {k: 'gray' if k == "other" else default_palette[i] for i, k in enumerate(top_things)}

    # Create the t-SNE transformation (fixed random_state for reproducible layouts)
    tsne = TSNE(n_components=2, random_state=0).fit_transform(np.array(search_body[model].to_list()))

    # Create a scatter plot
    ax = sns.scatterplot(x=tsne[:, 0], y=tsne[:, 1], alpha=1, hue=np.array(search_body[category].to_list()), palette=palette_colors, hue_order=top_things, s=20)

    plt.legend(markerscale=2)
    # place the legend outside the plotting area, to the right of the axes
    sns.move_legend(ax, "upper left", bbox_to_anchor=(1, 1))

    return ax

In [None]:
# lookup tables mapping each important person, object and plant to its list of
# names (the main name plus alternative names and typos)
persons_dict = read_csv_alt_dict(
    filename='nlp_list_person.csv',
    main_column='name',
    columns_to_join=['alternativenames', 'typos'],
    joined_column_name='alt_names',
)
objects_dict = read_csv_alt_dict(
    filename='nlp_list_obj.csv',
    main_column='name_en',
    columns_to_join=['alternativenames_en', 'typos_en'],
    joined_column_name='alt_names',
)
plants_dict = read_csv_alt_dict(
    filename='nlp_list_plant.csv',
    main_column='name_en',
    columns_to_join=['alternativenames_en', 'typos_en'],
    joined_column_name='alt_names',
)

In [None]:
# make plots for the embeddings, coloured by persons, objects and plants:
# one PNG per (category, model) combination

import matplotlib.ticker as ticker
models = [
    "openai_3_small",
    "openai_3_large",
    "minilm-l12-v2",
    "mpnet_base_v2",
    "mistral-embed",
    "baai_bge_large_v1.5",
    "embedding_e5_large_v2"
]

for thing_name, things_dict in [("persons", persons_dict), ("objects", objects_dict), ("plants", plants_dict)]:
    for model in models:
        ax = make_scatter_plot(embeddings, things_dict, 25, f"{thing_name}_grouping", f"embedding_{model}")
        # t-SNE coordinates carry no meaning, so hide the axis ticks
        ax.xaxis.set_major_locator(ticker.NullLocator())
        ax.yaxis.set_major_locator(ticker.NullLocator())
        plt.savefig(f"{thing_name}_{model}_scatterplot.png")
        # release the figure; otherwise all 21 figures stay open in memory
        plt.close(ax.figure)
        print(f"saved plot for {thing_name}_{model}")