In [105]:
import csv
from pathlib import Path
from typing import Any, NamedTuple, Optional
from typing import Tuple
import time

from google.api_core.exceptions import InvalidArgument
import numpy as np
import pandas as pd
import vertexai
from numpy import floating
from vertexai.language_models import TextEmbeddingInput, TextEmbeddingModel

# Data locations are relative to the notebook's working directory.
DATA_FOLDER = Path("../data")
INPUT_FOLDER = DATA_FOLDER.joinpath("input")

# Google Cloud project / region used for every Vertex AI call in this notebook.
PROJECT_ID = "adesso-gcc-rtl-uc4"
REGION = "europe-west3"


class EmbeddingModelConfig(NamedTuple):
    """A Vertex AI text-embedding model to evaluate, plus the variants to run.

    Despite its name, ``task_type`` holds booleans, not task-type strings: each
    entry is one ``use_task_type`` flag. ``True`` runs the model with task types
    (documents sent as ``RETRIEVAL_DOCUMENT``, the query as ``RETRIEVAL_QUERY``);
    ``False`` sends plain text.
    """

    name: str  # Vertex AI model id passed to TextEmbeddingModel.from_pretrained
    task_type: list[bool]  # use_task_type flags, one evaluation run per entry


# Models to compare; gecko@001 is evaluated only without task types.
EMBEDDING_MODELS = [
    EmbeddingModelConfig(name="textembedding-gecko@001", task_type=[False]),
    EmbeddingModelConfig(name="textembedding-gecko@002", task_type=[True, False]),
    EmbeddingModelConfig(name="textembedding-gecko@003", task_type=[True, False]),
    EmbeddingModelConfig(
        name="textembedding-gecko-multilingual@001", task_type=[True, False]
    ),
]

vertexai.init(location=REGION, project=PROJECT_ID)

In [106]:
def load_csv(file: Path, col_name: str, encoding: str = "utf-8-sig") -> list[str]:
    """Load a CSV file and return the non-empty values of one column.

    The literal marker ``ChromaDB:`` followed by a newline is removed from
    every returned value.

    Args:
        file: Path to a CSV file with a header row.
        col_name: Header of the column to read.
        encoding: File encoding. The default ``"utf-8-sig"`` reads plain UTF-8
            and also strips a leading byte-order mark (as written by Excel),
            which would otherwise become part of the first header name and make
            the column lookup fail. An explicit encoding also keeps non-ASCII
            text (e.g. German umlauts) independent of the platform locale.

    Returns:
        The column values in file order; rows where the column is empty or
        missing are skipped.

    Raises:
        KeyError: If ``col_name`` is not a header of the CSV (only raised when
            the file has at least one data row).
    """
    with open(file, newline="", encoding=encoding) as f:
        return [
            value.replace("ChromaDB:\n", "")
            for row in csv.DictReader(f)
            # DictReader fills short rows with None; both None and "" are skipped.
            if (value := row[col_name])
        ]


def rate_limit(max_per_minute):
    """Return a generator used as a throttle: call ``next()`` on it once per request.

    The first ``next()`` only starts the generator and never sleeps. Every later
    ``next()`` sleeps as needed so that at least ``60 / max_per_minute`` seconds
    have passed since the previous ``next()`` returned.
    """
    min_interval = 60 / max_per_minute
    while True:
        started_at = time.time()
        yield
        remaining = min_interval - (time.time() - started_at)
        if remaining > 0:
            time.sleep(remaining)


def encode_texts_to_embeddings(
    docs: list[str | TextEmbeddingInput],
    embedding_model: TextEmbeddingModel,
    use_task_type: bool,
    instances_per_batch: int = 5,
    requests_per_minute: int = 100,
) -> list[Optional[list[float]]]:
    """Embed ``docs`` in rate-limited batches and return one vector per doc.

    Args:
        docs: Texts to embed, in the order the results should be returned.
        embedding_model: Model whose ``get_embeddings`` is called once per batch.
        use_task_type: If True, each text is wrapped in a ``TextEmbeddingInput``
            with ``task_type="RETRIEVAL_DOCUMENT"`` before being sent.
        instances_per_batch: Texts per request; must be at least 1.
        requests_per_minute: Maximum request rate, enforced with ``rate_limit``.

    Returns:
        The embedding vectors in input order. If any request fails, the error
        is printed and ``[None] * len(docs)`` is returned instead (best effort,
        so a single failing model does not abort the whole comparison).

    Raises:
        ValueError: If ``instances_per_batch`` is smaller than 1 (such a batch
            size would never make progress through ``docs``).
    """
    if instances_per_batch < 1:
        raise ValueError(
            f"instances_per_batch must be >= 1, got {instances_per_batch}"
        )

    if use_task_type:
        docs = [
            TextEmbeddingInput(text=sentence, task_type="RETRIEVAL_DOCUMENT")  # type: ignore
            for sentence in docs
        ]

    # Capture the input size up front: the error fallback must have exactly
    # this length so the caller can still assign it as a DataFrame column.
    n_docs = len(docs)
    try:
        embeddings = []
        limiter = rate_limit(requests_per_minute)
        # Working in batches because the API accepts maximum 5
        # documents per request to get embeddings
        for start in range(0, n_docs, instances_per_batch):
            # Throttle *before* each request: the first next() only primes the
            # generator, later calls sleep so consecutive requests start at least
            # 60 / requests_per_minute seconds apart, and no sleep is wasted
            # after the final batch.
            next(limiter)
            batch = docs[start : start + instances_per_batch]
            embeddings.extend(embedding_model.get_embeddings(batch))
        return [embedding.values for embedding in embeddings]
    except Exception as e:
        print(f"Error while getting embeddings: {e}")
        return [None] * n_docs


def get_distance(
    document: Optional[list[float]], query_embedding: list[float]
) -> floating[Any]:
    """Return the Euclidean (L2) distance between a document and a query embedding.

    ``document`` may be ``None``, which is what ``encode_texts_to_embeddings``
    yields for every document when its embedding request fails. In that case
    NaN is returned instead of raising ``TypeError``, so callers can still
    compute distances and ranks for the remaining data.
    """
    if document is None:
        return np.float64(np.nan)
    return np.linalg.norm(np.asarray(document) - np.asarray(query_embedding))


def get_text_embedding_df(
    docs: list[str],
    query: str,
    embedding_models: list[EmbeddingModelConfig],
) -> pd.DataFrame:
    """Embed ``docs`` and ``query`` with each configured model and rank the docs.

    The returned frame has a ``result`` column with ``docs`` in input order.
    Three columns are added for every model in ``embedding_models`` and every
    flag in its ``task_type`` list. Their names start with ``desc``, which is
    the model name without its first dash-separated part
    (``"textembedding-gecko@003"`` -> ``"gecko@003"``), suffixed with
    ``"_tasktype"`` when the flag is True:

    - ``<desc>``: the document embedding vectors (``None`` where embedding failed),
    - ``<desc>_distance``: L2 distance to the query embedding, rounded to
      2 decimals (NaN where the document embedding is missing),
    - ``<desc>_order``: 1-based rank by distance (1 = closest) as nullable
      ``Int64``; tied documents share the lowest rank, missing distances get
      ``<NA>``.

    ``InvalidArgument`` errors from the API are printed, and the affected
    model / task-type combination is skipped.
    """
    df = pd.DataFrame()
    df["result"] = docs

    for embedding_model in embedding_models:
        embedding_model_name = "".join(embedding_model.name.split("-")[1:])
        try:
            # Loaded once per model and reused for every task-type variant.
            model = TextEmbeddingModel.from_pretrained(embedding_model.name)
        except InvalidArgument as e:
            print(f"Error while loading model {embedding_model.name}: {e}")
            continue

        for use_task_type in embedding_model.task_type:
            task_type_infix = "_tasktype" if use_task_type else ""
            embedding_model_desc = f"{embedding_model_name}{task_type_infix}"
            try:
                query_embedding = _get_query_embedding(model, query, use_task_type)
                doc_embeddings = encode_texts_to_embeddings(
                    docs,  # type: ignore
                    model,
                    use_task_type=use_task_type,
                )
            except InvalidArgument as e:
                print(f"Error while getting embeddings for {embedding_model_desc}: {e}")
                continue

            df[embedding_model_desc] = doc_embeddings
            _add_distance_columns(df, embedding_model_desc, query_embedding)
    return df


def _get_query_embedding(
    model: TextEmbeddingModel, query: str, use_task_type: bool
) -> list[float]:
    """Embed ``query``; it is tagged as ``RETRIEVAL_QUERY`` when ``use_task_type`` is True."""
    if use_task_type:
        query_input = TextEmbeddingInput(text=query, task_type="RETRIEVAL_QUERY")  # type: ignore
    else:
        query_input = query
    return model.get_embeddings([query_input])[0].values


def _add_distance_columns(
    df: pd.DataFrame, embedding_col: str, query_embedding: list[float]
) -> None:
    """Add ``<embedding_col>_distance`` and ``<embedding_col>_order`` to ``df`` in place."""
    # Missing embeddings (None) become NaN instead of crashing the distance computation.
    distances = pd.Series(
        [
            np.nan if emb is None else float(get_distance(emb, query_embedding))
            for emb in df[embedding_col]
        ],
        index=df.index,
        dtype=float,
    )
    df[f"{embedding_col}_distance"] = distances.round(2)
    # Rank on the unrounded distances so 2-decimal rounding does not create
    # artificial ties. method="min" gives tied documents the same (lowest) rank
    # instead of a truncated average rank. The nullable Int64 dtype keeps NaN
    # distances as <NA> (a plain astype(int) would raise on them).
    df[f"{embedding_col}_order"] = distances.rank(method="min").astype("Int64")

In [107]:
COL_NAME = "Semantic"  # CSV column holding the texts to evaluate


def create_result_file_from_csv(csv_file: Path, result_folder: Path) -> None:
    """Embed the texts of one input CSV and save embeddings/distances as Parquet.

    The query is taken from the file name: underscores in the stem become
    spaces (``Robert_Habeck_lachend.csv`` -> ``"Robert Habeck lachend"``). Texts
    are read from the ``COL_NAME`` column and compared against the query with
    every model in ``EMBEDDING_MODELS``. The result is written to
    ``result_folder / "<stem>.parquet"``; ``result_folder`` is created if it
    does not exist yet.
    """
    # Create the output folder before the (slow, billable) embedding calls so a
    # missing folder neither fails the write nor wastes the API work.
    result_folder.mkdir(parents=True, exist_ok=True)

    # query is in file name
    query = csv_file.stem.replace("_", " ")
    print(f"Processing {csv_file} with query: {query}")

    docs = load_csv(csv_file, COL_NAME)

    # get df with embeddings and distances to query
    df = get_text_embedding_df(docs, query, EMBEDDING_MODELS)
    df.to_parquet(result_folder / f"{csv_file.stem}.parquet", index=False)


def create_clean_csv_from_result_file(result_file: Path) -> None:
    """Convert a Parquet result file into a side-by-side ranking CSV.

    Keeps only the ``result`` texts and the ``*_distance`` / ``*_order``
    columns; the raw embedding vectors are dropped. Adds ``original_rank``
    (the 1-based input position). For every ``<model>_order`` column it adds
    ``<model>_result`` with the texts sorted by that model's order. The model's
    distance and order columns are re-sorted the same way, so each row lines
    up. ``result`` and ``original_rank`` stay in input order.

    Columns are sorted alphabetically. The CSV is written next to
    ``result_file`` with the same stem.
    """
    df = pd.read_parquet(result_file)

    # Keep the text column plus every distance/order column (embeddings dropped).
    # .copy() makes the filtered frame independent, so adding columns below
    # cannot trigger SettingWithCopyWarning.
    df = df.loc[:, df.columns.str.contains("distance|order|result")].copy()

    # 1-based position of each text in the input file.
    original_rank_col = "original_rank"
    df[original_rank_col] = range(1, len(df) + 1)

    # Snapshot the order columns before new "_result" columns are added.
    order_cols = [col for col in df.columns if col.endswith("_order")]
    for order_col in order_cols:
        model_desc = order_col.removesuffix("_order")
        distance_col = f"{model_desc}_distance"
        # Stable sort: documents with equal order keep their input order, so the
        # output is deterministic.
        sorted_df = df[["result", order_col, distance_col]].sort_values(
            by=order_col, kind="stable"
        )
        # .values assigns positionally (ignoring the now-shuffled index).
        df[f"{model_desc}_result"] = sorted_df["result"].values
        df[distance_col] = sorted_df[distance_col].values
        df[order_col] = sorted_df[order_col].values

    df = df.reindex(sorted(df.columns), axis=1)
    df.to_csv(result_file.with_suffix(".csv"), index=False)

In [108]:
RESULT_FOLDER = DATA_FOLDER / "embedding_eval"
# Create the output folder up front so a fresh checkout survives Restart & Run All.
RESULT_FOLDER.mkdir(parents=True, exist_ok=True)

# glob() order is filesystem-dependent; sorting makes the processing order
# (and the printed log) reproducible.
input_csv_files = sorted(INPUT_FOLDER.glob("*.csv"))
if not input_csv_files:
    print(f"No CSV files found in {INPUT_FOLDER}")

for csv_file in input_csv_files:
    create_result_file_from_csv(csv_file, result_folder=RESULT_FOLDER)

# NOTE(review): this converts every Parquet file in RESULT_FOLDER, including
# files left over from earlier runs whose input CSV may no longer exist.
for result_file in sorted(RESULT_FOLDER.glob("*.parquet")):
    create_clean_csv_from_result_file(result_file)

Processing ../data/input/Kinder_essen_in_deutschem_Kindergarten.csv with query: Kinder essen in deutschem Kindergarten
Processing ../data/input/Robert_Habeck_lachend.csv with query: Robert Habeck lachend
