In [None]:
import requests
import pandas as pd
from dataclasses import dataclass
import warnings
import spacy
from typing import Protocol
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from typing import Type
from typing import Type
from functools import lru_cache


import unicodedata


# Notebook-wide output settings.
# Suppress every warning so cell output stays readable.
warnings.filterwarnings("ignore")
# Show up to 100 columns before pandas truncates a wide frame.
pd.set_option("display.max_columns", 100)


In [None]:
@dataclass
class MeLiClient:
    """Minimal client for the MercadoLibre REST API, scoped to a single site.

    Attributes:
        site: Site id interpolated into every request path (the notebook
            uses ``"MCO"``).
    """

    site: str

    # Deliberately un-annotated: plain class attributes are not dataclass
    # fields, so the generated ``__init__`` still only takes ``site``.
    BASE_URL = "https://api.mercadolibre.com"
    # Offset increment between consecutive search pages.
    # NOTE(review): presumably the API's default page size - confirm.
    PAGE_SIZE = 50
    # Seconds to wait for a response before giving up instead of hanging.
    TIMEOUT = 30

    def get_categories(self) -> pd.DataFrame:
        """Return the site's category listing as a DataFrame."""
        url = f"{self.BASE_URL}/sites/{self.site}/categories"
        response = requests.get(url, timeout=self.TIMEOUT)
        return pd.DataFrame(response.json())

    def get_items_in_category(self, category_id: str, offset=0) -> pd.DataFrame:
        """Return one page of search results for ``category_id``.

        Args:
            category_id: Category to search in.
            offset: Position of the first result of the requested page.

        Returns:
            One row per result, or an empty DataFrame when the payload has no
            usable ``"results"`` entry (for example an error payload).
        """
        url = f"{self.BASE_URL}/sites/{self.site}/search"
        # Let requests build and escape the query string.
        response = requests.get(
            url,
            params={"category": category_id, "offset": offset},
            timeout=self.TIMEOUT,
        )
        payload = response.json()
        try:
            return pd.DataFrame(payload["results"])
        except (KeyError, TypeError, ValueError):
            # Best effort: a payload without tabular "results" counts as an
            # empty page, which is also what ends the pagination loop.
            return pd.DataFrame()

    def get_all_items_in_category(self, category_id: str) -> pd.DataFrame:
        """Page through the search endpoint until an empty page comes back.

        Returns:
            All pages stacked into one DataFrame with a fresh 0..n-1 index,
            or an empty DataFrame when the first page is already empty.
        """
        pages = []
        offset = 0

        page = self.get_items_in_category(category_id)
        while len(page) > 0:
            pages.append(page)
            offset += self.PAGE_SIZE
            page = self.get_items_in_category(category_id, offset=offset)

        if not pages:
            return pd.DataFrame()

        # ignore_index avoids repeating the per-page labels 0..49 in the result.
        return pd.concat(pages, ignore_index=True)


# Site id used for every API call made through `meli`.
site = "MCO"
# Shared client instance reused by the cells below.
meli = MeLiClient(site)


In [None]:
# Download the site's category listing (one HTTP request).
categories = meli.get_categories()
# Display the row of the category id that the dataset cell below works with.
categories.query("id=='MCO1000'")


In [None]:
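# References:
#   MercadoLibre attributes guide: https://developers.mercadolibre.com.ar/es_co/atributos
#   Related reading on product data: http://www.heikopaulheim.com/docs/swj2018_product_data.pdf

# Probes kept for the record: both filters returned no rows, i.e. no item carried these attribute ids.
# items[items["attributes"].apply(lambda sku: 'DESCRIPTIVE_TAGS' in (x["id"] for x in sku))] # empty
# items[items["attributes"].apply(lambda sku: 'PRODUCT_FEATURES' in (x["id"] for x in sku))] # empty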


In [None]:
def get_attribute_value(attributes: list, attribute_name: str):
    """Return the ``value_name`` of the first attribute whose ``id`` matches.

    Args:
        attributes: Iterable of attribute dicts, each expected to carry an
            ``"id"`` and a ``"value_name"`` key. A non-iterable value
            (``None``, NaN from a missing cell) is treated as "no attributes".
        attribute_name: Attribute id to look for.

    Returns:
        The matching entry's ``value_name``, or ``None`` when nothing matches
        or the matching entry has no ``value_name``.
    """
    try:
        entries = iter(attributes)
    except TypeError:
        # Missing cell (None / float NaN): nothing to search.
        return None

    return next(
        (
            entry.get("value_name")
            for entry in entries
            # Skip malformed entries instead of raising on them.
            if isinstance(entry, dict) and entry.get("id") == attribute_name
        ),
        None,
    )


def build_dataset(
    meli: MeLiClient,
    category_id: str,
    features: list[str],
    attributes: list[str],
    save=True,
):
    """Download a category and flatten selected attributes into columns.

    Args:
        meli: Client used to fetch every item of the category.
        category_id: Category to download.
        features: Item columns to keep from the search results.
        attributes: Attribute ids to extract from each item's ``attributes``
            list; each becomes a column named after the lowercased id.
        save: When true, also write the result to ``"<category_id>_items.csv"``
            in the working directory.

    Returns:
        DataFrame with the requested features (minus the raw ``attributes``
        column) plus one column per extracted attribute; columns that are
        entirely empty are dropped and the index is reset.
    """
    items = meli.get_all_items_in_category(category_id)
    # Explicit copy: the columns added below must not be written into a
    # slice of `items` (chained assignment).
    df = items[features].copy()

    for attribute in attributes:
        df[attribute.lower()] = items["attributes"].apply(
            get_attribute_value, attribute_name=attribute
        )

    df = (
        # errors="ignore": `features` is not required to contain "attributes",
        # since the extraction above reads it from `items` directly.
        df.drop(columns="attributes", errors="ignore")
        .dropna(how="all", axis=1)
        .reset_index(drop=True)
    )

    if save:
        df.to_csv(f"{category_id}_items.csv")

    return df


# Item columns copied from the search results. "attributes" is listed here but
# build_dataset drops it again after extracting the values below.
FEATURES = ["id", "title", "thumbnail", "domain_id", "attributes"]
# Attribute ids pulled out of each item's "attributes" list into lowercase-named columns.
INTERESTING_ATTRIBUTES = ["GTIN", "BRAND", "MODEL"]
category = "MCO1000"  # categories.id.sample().squeeze()
print(category)


# Downloads every page of the category over the network; because `save`
# defaults to True this also writes "<category>_items.csv".
df = build_dataset(meli, category, FEATURES, INTERESTING_ATTRIBUTES)
df


In [None]:
# Global Preprocessing Utils

# spaCy pipeline loaded once and shared by `tokenizer` and `SpacyEmbedding`.
# spacy.load needs the "es_core_news_md" model to be installed beforehand.
nlp = spacy.load("es_core_news_md")


def preprocessor(text: str):
    """Normalise `text` to NFKD, lowercase it and trim surrounding whitespace.

    NFKD only decomposes characters (an accented letter becomes base letter
    plus combining mark); the combining marks are kept.
    """
    decomposed = unicodedata.normalize("NFKD", text)
    lowered = decomposed.lower()
    return lowered.strip()


def tokenizer(text: str):
    """Return the lemmas of `text`, leaving out stop words and punctuation.

    Uses the module-level spaCy pipeline `nlp` (probably overkill for this,
    but it is already loaded).
    """
    lemmas = []
    for token in nlp(text):
        if token.is_stop or token.is_punct:
            continue
        lemmas.append(token.lemma_)
    return lemmas


In [None]:
class Embedding(Protocol):
    """Structural interface shared by the embedding backends in this notebook.

    Implementations are also constructed from a single text value (see
    `find_embeddings`), although the constructor is not part of the protocol.
    """

    @classmethod
    def initialize(cls, corpus: list[str]) -> None:
        """Prepare class-level state from `corpus` before instances are built."""

    def similarity(self, other: "Embedding") -> float:
        """Return a similarity score between this embedding and `other`."""


In [None]:
class SpacyEmbedding:
    """Embedding that wraps the object returned by the shared `nlp` pipeline."""

    def __init__(self, text: str):
        # Run the module-level pipeline once and keep its result.
        parsed = nlp(text)
        self.embedding = parsed

    @classmethod
    def initialize(cls, corpus: list[str]) -> None:
        """No-op: this backend needs no fitting on `corpus`."""
        return None

    def similarity(self, other: "SpacyEmbedding") -> float:
        """Delegate to the `similarity` method of the wrapped pipeline result."""
        score = self.embedding.similarity(other.embedding)
        return score

    def __repr__(self):
        # Show the items obtained by iterating the wrapped result.
        items = tuple(self.embedding)
        return str(items)


In [None]:
class BagOfWordsEmbedding:
    """Bag-of-words embedding built on a class-level CountVectorizer.

    `initialize` must be called before any instance is created, because the
    constructor reads the fitted vectorizer from `cls.bow`.
    """

    def __init__(self, text: str):
        # transform() takes a batch; keep the single resulting row.
        vectorized = self.bow.transform([text])
        self.embedding = vectorized[0]
        # Tokens are kept for display only; they come from the raw text,
        # without going through `preprocessor`.
        self.tokens = tokenizer(text)

    @classmethod
    def initialize(cls, corpus: list[str]) -> None:
        """Fit the shared vectorizer on `corpus` and store it on the class."""
        vectorizer = CountVectorizer(tokenizer=tokenizer, preprocessor=preprocessor)
        cls.bow = vectorizer.fit(corpus)

    def similarity(self, other: "BagOfWordsEmbedding") -> float:
        """Return the cosine similarity between the two count vectors."""
        pairwise = cosine_similarity(self.embedding, other.embedding)
        return pairwise[0][0]

    def __repr__(self):
        shown = tuple(self.tokens)
        return str(shown)


In [None]:
class TFIDFEmbedding:
    """TF-IDF embedding built on a class-level TfidfVectorizer.

    `initialize` must be called before any instance is created, because the
    constructor reads the fitted vectorizer from `cls.tfidf`.
    """

    def __init__(self, text: str):
        # transform() takes a batch; keep the single resulting row.
        vectorized = self.tfidf.transform([text])
        self.embedding = vectorized[0]
        # Tokens are kept for display only; they come from the raw text,
        # without going through `preprocessor`.
        self.tokens = tokenizer(text)

    @classmethod
    def initialize(cls, corpus: list[str]) -> None:
        """Fit the shared vectorizer on `corpus` and store it on the class."""
        vectorizer = TfidfVectorizer(tokenizer=tokenizer, preprocessor=preprocessor)
        cls.tfidf = vectorizer.fit(corpus)

    def similarity(self, other: "TFIDFEmbedding") -> float:
        """Return the cosine similarity between the two TF-IDF vectors."""
        pairwise = cosine_similarity(self.embedding, other.embedding)
        return pairwise[0][0]

    def __repr__(self):
        shown = tuple(self.tokens)
        return str(shown)


In [None]:
def find_embeddings(df, embedding_type: Type[Embedding], embedding_cols: tuple[str]):
    """Add one `<col>_embedding` column to `df` per entry of `embedding_cols`.

    The embedding class is always initialised on `df.title`, whichever columns
    are embedded. `df` is modified in place and also returned.
    """
    corpus = df.title
    embedding_type.initialize(corpus)

    for column in embedding_cols:
        target = f"{column}_embedding"
        # Each cell value is passed to the class constructor.
        df[target] = df[column].apply(embedding_type)

    return df


# Only titles are embedded for now; the trailing comment keeps the other candidate columns.
EMBEDDING_COLS = ("title",)  # , "brand", "model"]
# Adds a "title_embedding" column of BagOfWordsEmbedding objects (df is modified in place).
df = find_embeddings(df, BagOfWordsEmbedding, EMBEDDING_COLS)
df


In [None]:
def find_similarities(
    df: pd.DataFrame, embedding: Embedding, feature="title_embedding"
):
    """Score every embedding stored in `df[feature]` against `embedding`.

    Returns a Series, aligned with `df`, of `embedding.similarity(cell)` values.
    """
    score_against_reference = embedding.similarity
    return df[feature].apply(score_against_reference)


# Smoke check: score every row against the first row's title embedding.
find_similarities(df, df.title_embedding.iloc[0])


In [None]:
def feature_exact_match(df: pd.DataFrame, feature: str, feature_value):
    """Return a boolean Series marking rows where `df[feature]` equals `feature_value`."""
    column = df[feature]
    return column.eq(feature_value)


# Example: boolean mask of the rows whose brand equals "Hill's".
feature_exact_match(df, "brand", "Hill's")


In [None]:
def find_similar_products(
    df: pd.DataFrame,
    product_id: str,
    embedding_feature="title_embedding",
    penalty_features=("brand", "domain_id"),
    penalty_value=0.1,
    threshold=0.7,
):
    """Rank the rows of `df` by similarity to one reference product.

    The raw embedding similarity is reduced by `penalty_value` for every
    penalty feature whose value differs from the reference product's value.
    A feature that is missing on the reference product carries no penalty.

    Args:
        df: Catalogue with an ``id`` column, the `embedding_feature` column
            and every column named in `penalty_features`.
        product_id: ``id`` of the reference product; must be present in `df`.
        embedding_feature: Column holding the embedding objects.
        penalty_features: Columns whose mismatch is penalised.
        penalty_value: Amount subtracted per mismatching feature.
        threshold: Rows scoring strictly above this value are returned as
            similar products.

    Returns:
        ``(similar_products, details)``. `details` is a copy of `df` with
        ``similarity``, ``raw_similarity`` and one ``<feature>_penalty``
        column per penalty feature. `similar_products` holds the rows of
        `details` above `threshold`; the reference product itself is not
        filtered out.

    Raises:
        KeyError: If `product_id` is not among ``df["id"]``.
    """
    details = df.copy()
    product = df.set_index("id").loc[product_id]

    details["similarity"] = find_similarities(
        df, product[embedding_feature], embedding_feature
    )
    details["raw_similarity"] = details["similarity"].copy()

    for penalty_feature in penalty_features:
        reference_value = product[penalty_feature]
        penalty_column = f"{penalty_feature}_penalty"

        if pd.notna(reference_value):
            is_exact_match = feature_exact_match(df, penalty_feature, reference_value)
            # Unary ~ binds tighter than *, so this penalises the non-matches.
            details[penalty_column] = ~is_exact_match * penalty_value
        else:
            # Nothing to compare against: no row is penalised.
            details[penalty_column] = 0

        details["similarity"] -= details[penalty_column]

    # Select with the boolean mask directly: re-selecting by index label would
    # multiply rows whenever `df` carries repeated index labels.
    similar_products = details[details["similarity"] > threshold]
    return similar_products, details


# Columns whose mismatch with the reference product lowers the similarity score.
# Being a tuple keeps SimilarityConfig hashable when this is stored in it, which
# the lru_cache on build_testing_data requires.
PENALTY_FEATURES = ("brand", "domain_id", "model")
# NOTE(review): the product id is hard-coded while `df` comes from a live API call;
# the `.loc` lookup inside find_similar_products raises KeyError if the id is absent - confirm before re-running.
find_similar_products(df, "MCO618049088", penalty_features=PENALTY_FEATURES)[0]


In [None]:
def load_test_dataset(filename: str):
    """Read the "test_set" and "all" sheets of an Excel workbook.

    Returns:
        ``(test_set, all_items)``: the DataFrames of the two sheets, in that
        order.
    """
    sheets = pd.read_excel(filename, sheet_name=["test_set", "all"])
    return sheets["test_set"], sheets["all"]


In [None]:
@dataclass(frozen=True)
class SimilarityConfig:
    """Immutable settings for one evaluation run of the similarity model.

    Frozen so that instances are hashable and can serve as the cache key of
    the `lru_cache`-decorated `build_testing_data`.
    """

    # Excel workbook read by load_test_dataset (sheets "test_set" and "all").
    test_filename: str
    # Embedding class handed to find_embeddings.
    embedding_type: Type[Embedding]
    # Source column to embed; "_embedding" is appended to get the embedding column.
    embedding_feature: str
    # Columns whose mismatch is penalised by find_similar_products.
    penalty_features: tuple[str, ...]
    # Amount subtracted from the similarity per mismatching penalty feature.
    penalty_value: float
    # Minimum (exclusive) penalised similarity for a row to count as a match.
    threshold: float


In [None]:
# Baseline evaluation setup: bag-of-words title embeddings scored against the
# labelled workbook "test_electronica.xlsx".
config = SimilarityConfig(
    test_filename="test_electronica.xlsx",
    embedding_type=BagOfWordsEmbedding,
    embedding_feature="title",
    penalty_features=PENALTY_FEATURES,
    penalty_value=0.05,
    threshold=0.7,
)


In [None]:
@lru_cache(maxsize=16)
def build_testing_data(config: SimilarityConfig):
    """Load the labelled workbook and embed the configured column.

    Results are memoised per `config` (up to 16 entries), so repeated calls
    with an equal config return the very same DataFrame objects.

    Returns:
        ``(test_set, data)``: the "test_set" sheet, and the "all" sheet with
        the embedding column added.
    """
    labelled_pairs, catalogue = load_test_dataset(config.test_filename)
    columns_to_embed = [config.embedding_feature]
    embedded = find_embeddings(catalogue, config.embedding_type, columns_to_embed)
    return labelled_pairs, embedded


# Loads the workbook and embeds the titles; the result is memoised per config by lru_cache.
test_set, data = build_testing_data(config)


In [None]:
def test_product_matches(
    df: pd.DataFrame, product_id: str, cfg: SimilarityConfig, true_matches: list[str]
):
    """Split `df` into confusion-matrix groups for a single reference product.

    Predicted matches come from `find_similar_products` run with the settings
    in `cfg`; `true_matches` holds the ids labelled as real matches.

    Returns:
        ``(tp, fp, fn, tn)`` as DataFrames: predicted and expected, predicted
        but not expected, expected but not predicted, and neither.
    """
    matches, _details = find_similar_products(
        df,
        product_id,
        embedding_feature=cfg.embedding_feature + "_embedding",
        penalty_features=cfg.penalty_features,
        penalty_value=cfg.penalty_value,
        threshold=cfg.threshold,
    )

    # Masks over the predicted matches and over the full catalogue.
    match_is_expected = matches.id.isin(true_matches)
    row_is_expected = df.id.isin(true_matches)
    row_is_predicted = df.id.isin(matches.id)

    tp = matches[match_is_expected]
    fp = matches[~match_is_expected]
    fn = df[row_is_expected & ~row_is_predicted]
    tn = df[~row_is_expected & ~row_is_predicted]

    return tp, fp, fn, tn


# Try the evaluation on the first labelled product only.
test_id = test_set.test_id.iloc[0]
# Ids listed in the test set for that product, used as its true matches.
tm = test_set[test_set.test_id == test_id].id
test_product_matches(data, test_id, config, tm)


In [None]:
def get_precision_and_recall(
    true_positives: pd.DataFrame,
    false_positives: pd.DataFrame,
    false_negatives: pd.DataFrame,
    true_negatives: pd.DataFrame,
):
    """Compute precision and recall from the four confusion-matrix groups.

    Only the number of rows in each group matters. `true_negatives` is
    accepted so that the tuple returned by `test_product_matches` can be
    unpacked straight into this function; it does not affect the result.

    Returns:
        ``(precision, recall)``. Each metric is 0 when its denominator is
        zero (no predicted matches, or no expected matches respectively).
    """
    tp = len(true_positives)
    fp = len(false_positives)
    fn = len(false_negatives)

    precision = tp / (tp + fp) if tp + fp > 0 else 0
    # Guarded like precision: a product without expected matches would
    # otherwise raise ZeroDivisionError.
    recall = tp / (tp + fn) if tp + fn > 0 else 0
    return precision, recall


# Precision and recall for the same single product.
get_precision_and_recall(*test_product_matches(data, test_id, config, tm))


In [None]:
def test_similarity_model(cfg: SimilarityConfig):
    """Evaluate the similarity model on every product of the labelled test set.

    Prints precision and recall per product, then the global figures obtained
    by pooling the confusion-matrix groups of all products.

    Args:
        cfg: Evaluation settings; also names the workbook to load.

    Returns:
        ``(global_precision, global_recall)``.
    """
    print("building testing data...")
    # build_testing_data takes the config alone (it reads the filename from
    # it); use the `cfg` argument rather than any notebook-level variable.
    test_set, data = build_testing_data(cfg)
    test_products = test_set.test_id.unique()
    all_cf = []

    print("finding similar products...")
    for product_id in test_products:
        print(f"\ntesting {product_id}")
        true_matches = test_set[test_set.test_id == product_id].id
        confusion_matrix = test_product_matches(data, product_id, cfg, true_matches)
        precision, recall = get_precision_and_recall(*confusion_matrix)
        print(f"{precision=}")
        print(f"{recall=}")

        all_cf.append(confusion_matrix)

    print("finished\n")

    # zip(*all_cf) regroups the per-product tuples into all tp, all fp,
    # all fn and all tn frames; each group is stacked into one frame.
    global_confusion_matrix = [pd.concat(frames) for frames in zip(*all_cf)]
    global_precision, global_recall = get_precision_and_recall(*global_confusion_matrix)
    print(f"{global_precision=}")
    print(f"{global_recall=}")

    return global_precision, global_recall


# Evaluation run with a stronger penalty (0.15) on the same labelled workbook.
# The workbook name lives in the config: SimilarityConfig requires
# `test_filename`, and test_similarity_model takes the config as its only argument.
config = SimilarityConfig(
    test_filename="test_electronica.xlsx",
    embedding_type=BagOfWordsEmbedding,
    embedding_feature="title",
    penalty_features=PENALTY_FEATURES,
    penalty_value=0.15,
    threshold=0.7,
)
test_similarity_model(config)
