# Info

W skrócie:
- zrobiłem zestaw embeddingów na przedmiotach
- nowy przedmiot transformuję na embedding
- szukam najbardziej podobnego przedmiotu w przygotowanym zestawie
- przydzielam klasę nowemu przedmiotowi wg tego najbardziej podobnego

Ze szczegółami:
- podzieliłem excela na dane test (1000 wierszy) i train (reszta, ok. 3 tys.) tak, żeby zachować proporcje w klasie `main`
- z danych `train` zrobiłem bazę embeddingów:
    - akapit tekstowy złożony z `supplier_name`, `supplier_reference_description` i `purchase_price`
    - model generujący embeddingi to klasyczny `sentence-transformers/all-mpnet-base-v2`
- dla każdego wiersza w danych `test`
    - tworzę analogiczny akapit tekstowy
    - w bazie embeddingów wybieram najbardziej podobny wg metryki `cosine`
    - biorę predykcję klasy `main`
    - zawężam zestaw bazowy/treningowy do wierszy z podaną klasą `main`
    - szukam jeszcze raz najbardziej podobnego embeddingu i wybieram klasę `sub`
    - zawężam zestaw bazowy/treningowy do wierszy z podaną klasą `sub` i analogicznie szukam klasy `detail`
    - powtarzam zawężanie i szukanie, aby znaleźć ostatnią klasę `level4`

Metryka poprawności klasyfikacji:
- odsetek poprawnie zaklasyfikowanych przedmiotów ze zbioru `test`

Ograniczenia, błędy:
- zbiór bazowy/treningowy musi być aktualny w stosunku do nowych przedmiotów

# Importy

In [None]:
import pandas as pd
import plotly.graph_objects as go
import plotly.io as pio
from sentence_transformers import SentenceTransformer
from numpy import dot, argmax
from numpy.linalg import norm
from tqdm import tqdm


# Make the dark Plotly theme the default template for figures.
pio.templates.default = "plotly_dark"

# Parametry

In [None]:
# Values of the `main` column that are kept for the experiment; the split
# helper filters the raw data down to rows whose `main` is in this list.
MAIN_CLASSES = [
    "Furniture", "Lighting", "Home Textiles",
    "Tableware", "Decoration", "Flowers & Plants",
]

# Target number of rows in the held-out test split.
TEST_ROWS = 1_000

# Utils

In [None]:
def train_test_split(raw_df: pd.DataFrame):
    # fill na
    df = raw_df[raw_df["main"].isin(MAIN_CLASSES)]
    for col in ["main", "sub", "detail", "level4"]:
        df[col] = df[col].fillna("Unspecified")
    
    ratios = df["main"].value_counts(normalize=True).to_dict()

    df = df.sample(len(df)) # shuffle data
    test_df = pd.DataFrame()


    for main_class, ratio in ratios.items():
        new_df = df[df["main"] == main_class].sample(int(TEST_ROWS*ratio))
        test_df = pd.concat([test_df, new_df])

    if len(test_df) < TEST_ROWS:
        diff = TEST_ROWS - len(test_df)
        test_df = pd.concat([
            test_df,
            df[~(df["item_id"].isin(test_df["item_id"]))].sample(diff)
        ])

    train_df = df[~(df["item_id"].isin(test_df["item_id"]))]

    return test_df, train_df

In [None]:
def get_embedder(model_id: str) -> SentenceTransformer:
    """Load the sentence-embedding model identified by ``model_id``.

    Only the ids listed in ``supported_ids`` are accepted, so a typo fails
    here with a clear message.

    Args:
        model_id: Model identifier passed to ``SentenceTransformer``.

    Returns:
        The loaded ``SentenceTransformer`` instance.

    Raises:
        ValueError: If ``model_id`` is not one of the supported ids.
    """
    supported_ids = ("sentence-transformers/all-mpnet-base-v2",)
    if model_id not in supported_ids:
        raise ValueError(
            f"Unsupported embedding model id: {model_id!r}. "
            f"Supported ids: {', '.join(supported_ids)}"
        )
    return SentenceTransformer(model_id)

In [None]:
def generate_embedding_from_text(
        model: SentenceTransformer,
        text_data: list[str]
) -> list[list[float]]:
    """Embed every text in ``text_data`` with ``model``.

    All texts go to ``model.encode`` in a single call so the model can
    batch them internally, instead of one forward pass per text. Batched
    and one-by-one encoding may differ in the last floating-point digits.

    Args:
        model: Embedding model exposing ``encode``.
        text_data: Texts to embed.

    Returns:
        One embedding per input text, in input order. Each element is a row
        of the array returned by ``encode`` (presumably a 1-D numpy array
        with the library defaults - confirm against the encode docs).
    """
    texts = list(text_data)
    if not texts:
        # Nothing to encode; skip the model call entirely.
        return []
    embeddings = model.encode(texts, show_progress_bar=True)
    # Split the batch result into a plain list with one entry per text.
    return list(embeddings)

In [None]:
def row_to_text_input(df: pd.DataFrame, i: int) -> str:
    """Render row ``i`` (by position) of ``df`` as the text that gets embedded.

    The text starts with a newline, then has one 4-space-indented line each
    for supplier name, product description and purchase price, and ends
    with a newline followed by four spaces.
    """
    supplier, description, price = (
        df[column].iloc[i]
        for column in (
            "supplier_name",
            "supplier_reference_description",
            "purchase_price",
        )
    )
    # The surrounding whitespace is part of the embedded text; keep it as is.
    lines = (
        "",
        f"    Supplier name = {supplier}",
        f"    Product name = {description}",
        f"    Product price = {price}",
        "    ",
    )
    return "\n".join(lines)

In [None]:
def cosine_sim(a, b) -> float:
    """Return the cosine similarity of vectors ``a`` and ``b``.

    If either vector has zero length the similarity is undefined; 0.0 is
    returned in that case instead of dividing by zero and producing NaN.
    """
    denominator = norm(a) * norm(b)
    if denominator == 0:
        return 0.0
    return float(dot(a, b) / denominator)

In [None]:
def generate_ratio_df(errors_df: pd.DataFrame, test_df: pd.DataFrame, col: str) -> pd.DataFrame:
    """Compare the class mix of the misclassified rows with the whole test set.

    For every value of ``col`` present in ``test_df`` the result holds its
    share among the errors (``ratio_in_errors``), its share in the test set
    (``ratio_in_tests``) and their difference (``diff``). Shares are rounded
    to two decimals; a class with no errors gets 0. The Pearson correlation
    of the two (rounded) share columns is printed.

    Args:
        errors_df: Misclassified rows.
        test_df: All evaluated rows.
        col: Name of the label column to analyse.

    Returns:
        DataFrame with columns ``col``, ``ratio_in_errors``,
        ``ratio_in_tests`` and ``diff``, one row per class in ``test_df``.
    """
    # Name the columns explicitly rather than relying on the name pandas
    # gives to the value_counts() result, which differs between versions.
    error_ratios = (
        errors_df[col]
        .value_counts(normalize=True)
        .rename_axis(col)
        .reset_index(name="ratio_in_errors")
    )
    test_ratios = (
        test_df[col]
        .value_counts(normalize=True)
        .rename_axis(col)
        .reset_index(name="ratio_in_tests")
    )
    # Right join: keep every class of the test set, even one without errors.
    ratios_df = pd.merge(
        left=error_ratios,
        right=test_ratios,
        on=col,
        how="right"
    ).round(2).fillna(0)
    ratios_df["diff"] = ratios_df["ratio_in_errors"] - ratios_df["ratio_in_tests"]
    correlation = ratios_df[["ratio_in_errors", "ratio_in_tests"]].corr().loc[
        "ratio_in_errors", "ratio_in_tests"
    ]
    print(f'r Pearson Correlation = {round(correlation, 3)}')
    return ratios_df

# Predykcje

In [None]:
# Load the raw item data from the comma-separated export.
raw_df = pd.read_csv("../resources/item data 2026_AW(Sheet1).csv", sep=",")

In [None]:
# Embedding model used for both the training rows and the test rows.
embedder = get_embedder("sentence-transformers/all-mpnet-base-v2")

In [None]:
# Hold out the test rows; the remaining rows become the reference set.
test_df, train_df = train_test_split(raw_df)

## Embeddingi treningowe / bazowe

In [None]:
# Build the reference ("base") embeddings: one text per training row,
# embedded in row order.
text_inputs = [
    row_to_text_input(train_df, row_pos)
    for row_pos in range(len(train_df))
]
base_embeddings = generate_embedding_from_text(
    model=embedder,
    text_data=text_inputs
)
# assign() returns a new frame, so the column is never written into a
# filtered selection of another DataFrame (avoids SettingWithCopyWarning).
train_df = train_df.assign(embedding=base_embeddings)

## Embeddingi "nowych" przedmiotów

In [None]:
# Embed the held-out rows using the same text layout as the training rows.
text_inputs = [row_to_text_input(test_df, row_pos) for row_pos in range(len(test_df))]
test_embeddings = generate_embedding_from_text(model=embedder, text_data=text_inputs)

## Znajdź najbardziej podobne przedmioty

In [None]:
# Predict all four hierarchy levels for every test item from its single most
# similar training item (cosine similarity of the embeddings).
#
# NOTE: narrowing the training set to the predicted `main` (then `sub`, then
# `detail`) and searching again cannot change the winner: the overall nearest
# neighbour always survives the narrowing and is still the first maximum of
# the narrowed set. One similarity pass therefore yields exactly the same
# four labels as four nested searches.
pred_main, pred_sub, pred_detail, pred_level4 = [], [], [], []

# Look the label columns up once instead of on every iteration.
train_main = train_df["main"]
train_sub = train_df["sub"]
train_detail = train_df["detail"]
train_level4 = train_df["level4"]

for embedding in tqdm(test_embeddings):
    sim_scores = [cosine_sim(embedding, x) for x in base_embeddings]
    # argmax returns the first maximum, i.e. the earliest training row on ties.
    best_idx = argmax(sim_scores)

    pred_main.append(train_main.iloc[best_idx])
    pred_sub.append(train_sub.iloc[best_idx])
    pred_detail.append(train_detail.iloc[best_idx])
    pred_level4.append(train_level4.iloc[best_idx])

test_df["pred_main"] = pred_main
test_df["pred_sub"] = pred_sub
test_df["pred_detail"] = pred_detail
test_df["pred_level4"] = pred_level4


## Oszacuj jakość

In [None]:
# Share of test rows predicted correctly at each hierarchy level, plus the
# share of rows where all four levels are correct at once.
test_n = len(test_df)

hit_main = test_df["main"] == test_df["pred_main"]
hit_sub = test_df["sub"] == test_df["pred_sub"]
hit_detail = test_df["detail"] == test_df["pred_detail"]
hit_level4 = test_df["level4"] == test_df["pred_level4"]
hit_all = hit_main & hit_sub & hit_detail & hit_level4

# int(...) keeps the ratios plain Python floats.
main_success_ratio = int(hit_main.sum()) / test_n
sub_success_ratio = int(hit_sub.sum()) / test_n
detail_success_ratio = int(hit_detail.sum()) / test_n
level4_success_ratio = int(hit_level4.sum()) / test_n
total_success_ratio = int(hit_all.sum()) / test_n

for ratio_name, ratio_value in (
    ("main_success_ratio", main_success_ratio),
    ("sub_success_ratio", sub_success_ratio),
    ("detail_success_ratio", detail_success_ratio),
    ("level4_success_ratio", level4_success_ratio),
    ("total_success_ratio", total_success_ratio),
):
    print(f"{ratio_name} = ", round(ratio_value, 3))

## Wizualizacja

In [None]:
# Bar chart of the per-level success ratios and the overall ratio.
bar_labels = ["main", "sub", "detail", "level4", "total"]
bar_values = [
    main_success_ratio,
    sub_success_ratio,
    detail_success_ratio,
    level4_success_ratio,
    total_success_ratio,
]
# Grey for the individual levels, teal to set the combined score apart.
bar_colors = ["silver"] * 4 + ["teal"]

fig = go.Figure()
fig.add_trace(
    go.Bar(x=bar_labels, y=bar_values, text=bar_values, marker_color=bar_colors)
)
fig.update_layout(title="Successfull predictions", width=1000, height=600)

fig.show(renderer="notebook")

# Analiza błędów

In [None]:
# Rows where at least one of the four predicted levels differs from the
# true label.
wrong_main = test_df["main"] != test_df["pred_main"]
wrong_sub = test_df["sub"] != test_df["pred_sub"]
wrong_detail = test_df["detail"] != test_df["pred_detail"]
wrong_level4 = test_df["level4"] != test_df["pred_level4"]
errors_df = test_df[wrong_main | wrong_sub | wrong_detail | wrong_level4]

## Błędy

In [None]:
# Print the true and the predicted class path of every misclassified row.
real_cols = ["main", "sub", "detail", "level4"]
pred_cols = ["pred_main", "pred_sub", "pred_detail", "pred_level4"]
# Walk both label sets row by row instead of doing eight positional
# lookups per row.
for real_labels, pred_labels in zip(
    errors_df[real_cols].itertuples(index=False, name=None),
    errors_df[pred_cols].itertuples(index=False, name=None),
):
    real_class = " / ".join(map(str, real_labels))
    pred_class = " / ".join(map(str, pred_labels))
    print(f"Real = {real_class}\nPred = {pred_class}\n\n")

## Reprezentatywność klas - korelacja proporcji klas w danych z błędami do danych testowych
- im większa, tym bardziej podobne proporcje klas

In [None]:
# Class mix among the errors vs. the whole test set, for the `main` level.
generate_ratio_df(errors_df, test_df, "main")

In [None]:
# Class mix among the errors vs. the whole test set, for the `sub` level.
generate_ratio_df(errors_df, test_df, "sub")

In [None]:
# Class mix among the errors vs. the whole test set, for the `detail` level.
generate_ratio_df(errors_df, test_df, "detail")

In [None]:
# Class mix among the errors vs. the whole test set, for the `level4` level.
generate_ratio_df(errors_df, test_df, "level4")