In [9]:
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine
import os 
import psycopg
from implicit.als import AlternatingLeastSquares 
import scipy
import numpy as np
import mlflow

# load .env contstants
load_dotenv()

# gloabal vars upload
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "https://storage.yandexcloud.net" #endpoint бакета от YandexCloud
os.environ["AWS_ACCESS_KEY_ID"] = os.getenv("AWS_ACCESS_KEY_ID") # получаем id ключа бакета, к которому подключён MLFlow, из .env
os.environ["AWS_SECRET_ACCESS_KEY"] = os.getenv("AWS_SECRET_ACCESS_KEY") 

EXPERIMENT_NAME = "e_commerce"
TRACKING_SERVER_HOST = "127.0.0.1"
TRACKING_SERVER_PORT = 5000
REGISTRY_MODEL_NAME = "ALS_1STEP"
pip_requirements = '../config/requirements.txt'

# устанавливаем host, который будет отслеживать наши эксперименты
mlflow.set_tracking_uri(f"http://{TRACKING_SERVER_HOST}:{TRACKING_SERVER_PORT}")
mlflow.set_registry_uri(f"http://{TRACKING_SERVER_HOST}:{TRACKING_SERVER_PORT}")

# fix random seed for experiemnts reproduction
SEED = 42 
np.random.seed(seed=SEED)

connection = {"sslmode": "require", "target_session_attrs": "read-write"}
postgres_credentials = {
    "host": os.getenv("DB_DESTINATION_HOST"),
    "port": os.getenv("DB_DESTINATION_PORT"),
    "dbname": os.getenv("DB_DESTINATION_NAME"),
    "user": os.getenv("DB_DESTINATION_USER"),
    "password": os.getenv("DB_DESTINATION_PASSWORD"),
}

# Create a connection string
connection_string = (
    f"postgresql://{postgres_credentials['user']}:{postgres_credentials['password']}"
    f"@{postgres_credentials['host']}:{postgres_credentials['port']}/{postgres_credentials['dbname']}"
)

# Create a SQLAlchemy engine
engine = create_engine(connection_string)


def load(TABLE_NAME):
    connection.update(postgres_credentials)
    with psycopg.connect(**connection) as conn:
        with conn.cursor() as cur:
            cur.execute(f"SELECT * FROM {TABLE_NAME}")
            data = cur.fetchall()
            columns = [col[0] for col in cur.description]
    df = pd.DataFrame(data, columns=columns)

    return df

  from .autonotebook import tqdm as notebook_tqdm


Обучение будет по категориям товаров,так как взаимодействия с атемами разряжены силь (EDA: <20% users interact >=2 )

In [10]:
from sklearn.preprocessing import LabelEncoder

events = load("events")
categories = load("item_categories")
events = events.merge(categories,how="left",on="itemid")

category_encoder = LabelEncoder()
user_encoder = LabelEncoder()
events["timestamp"] = pd.to_datetime(events['timestamp'], unit='ms')

user_encoder.fit(events["visitorid"])
category_encoder.fit(events['categoryid'])

events.loc[:, 'user_id_enc'] = user_encoder.transform(events['visitorid'])
events.loc[:, 'categoryid_enc'] = category_encoder.fit_transform(events['categoryid'])

Оставляем послении 4 недели на тест

In [21]:
train_test_global_time_split_date = pd.to_datetime("2015-09-18")

train_test_global_time_split_idx = events["timestamp"] < train_test_global_time_split_date
events_train = events[train_test_global_time_split_idx]
events_test = events[~train_test_global_time_split_idx]

add_to_cart = events_train[events_train["event"]=="addtocart"]
view_but_no_cart = events_train[(events_train["event"]!="addtocart") & (events_train["event"]=="view")]

Сохраним самыме популярные из добавленных в корзину

In [22]:
top_100_pop = events_train[events_train["event"]=="addtocart"].groupby("categoryid_enc").agg("count").sort_values("event",ascending=False).reset_index()[["categoryid_enc","event"]].loc[:99]
top_100_pop["rank"] = range(1,101)
top_100_pop.to_parquet("../models/production/top_popular.parquet")

Модель обучения

In [4]:
class ALSRecommender:
    def __init__(self, config, view_but_no_cart, add_to_cart, user_encoder, category_encoder, events_train, events):
        """
        Parameters:
        - config: Гиперпараметры модели (K, FACTORS, ITERATIONS, REGULARIZATION)
        - view_but_no_cart: Датафрейм где юзеры смотрели но не добавили
        - add_to_cart: Датафрэйм где юзеры добавили в корзину товары
        - user_encoder: Энкодер для юзеров
        - category_encoder: Энкодер категорий 
        - events_train: Обучаемая выборка
        - events: Все евенты там есть где были покупки
        """
        self.config = config
        self.view_but_no_cart = view_but_no_cart
        self.add_to_cart = add_to_cart
        self.user_encoder = user_encoder
        self.category_encoder = category_encoder
        self.events_train = events_train
        self.events = events
        self.als_model = None

    def build_interaction_matrix(self):
        """Создание матрицы взаимодействия пользователей с категориями товаров."""
        # Присвоение весов: K добавил в корзину, 1 смотрел только, 0 иначе
        # Товары которые купили были в корзине и им ноль не будет присущен
        scores = list(np.ones(self.view_but_no_cart.shape[0])) + list(np.ones(self.add_to_cart.shape[0]) * self.config["K"])

        idx_users = list(self.view_but_no_cart["user_id_enc"].values.astype(int)) + list(self.add_to_cart["user_id_enc"].values.astype(int))
        idx_items = list(self.view_but_no_cart["categoryid_enc"].values.astype(int)) + list(self.add_to_cart["categoryid_enc"].values.astype(int))
        
        user_item_matrix_train = scipy.sparse.csr_matrix((scores, (idx_users, idx_items)), dtype=np.int8)
        
        return user_item_matrix_train

    def train_als_model(self):
        """Обучение."""
        user_item_matrix_train = self.build_interaction_matrix()

        # Train ALS model
        self.als_model = AlternatingLeastSquares(
            factors=self.config["FACTORS"], 
            iterations=self.config["ITERATIONS"], 
            regularization=self.config["REGULARIZATION"], 
            random_state=0
        )
        self.als_model.fit(user_item_matrix_train)


    def sim_item_chunk(self, chunk_idx, max_similar_items=10):
        """Находим похожие категорий для листа категорий."""
        similar_items = self.als_model.similar_items(chunk_idx, N=max_similar_items+1)
        
        # Convert to DataFrame
        sim_item_item_ids_enc = similar_items[0]
        sim_item_scores = similar_items[1]
        similar_items_df = pd.DataFrame({
            "item_id_enc": chunk_idx,
            "sim_item_id_enc": sim_item_item_ids_enc.tolist(), 
            "score": sim_item_scores.tolist()
        })
        
        similar_items_df = similar_items_df.explode(["sim_item_id_enc", "score"], ignore_index=True)

        return similar_items_df

    def get_similar_categories(self, chunk_size=10000, max_similar_items=10):
        """находим общие айтемы для всех категорий."""
        unique_categories_train = self.events_train['categoryid_enc'].unique()
        num_rows = len(unique_categories_train)
        chunks = []
        
        for start in range(0, num_rows, chunk_size):
            end = min(start + chunk_size, num_rows)
            chunk_idx = unique_categories_train[start:end]
            chunk = self.sim_item_chunk(chunk_idx, max_similar_items=max_similar_items)
            chunks.append(chunk)
        
        # собираем все вместе
        similar_categories = pd.concat(chunks, axis=0)
        
        return similar_categories

    def recommend_items(self, N=30):
        """Generate ALS recommendations for all users."""
        if self.als_model is None:
            raise ValueError("ALS model has not been trained yet. Call `train_als_model` first.")
        
        user_ids_encoded = range(self.events_train['user_id_enc'].max() + 1)
        user_item_matrix_train = self.build_interaction_matrix()
        
        # Generate ALS recommendations
        als_recommendations = self.als_model.recommend(
            user_ids_encoded, 
            user_item_matrix_train[user_ids_encoded], 
            filter_already_liked_items=False, N=N # малая хитрость ставим False чтобы категории которые смотрели
        )                                         # алс модель не выкинула, а то что купили отфильтруется ниже

        item_ids_enc = als_recommendations[0]
        als_scores = als_recommendations[1]

        # Format recommendations into a DataFrame
        als_recommendations_df = pd.DataFrame({
            "user_id_enc": user_ids_encoded,
            "categoryid_enc": item_ids_enc.tolist(), 
            "score": als_scores.tolist()
        })

        chunk_size = 10000
        num_rows = len(als_recommendations_df)
        chunks = []
        count=0
        for start in range(0, num_rows, chunk_size):
            count+=1
            end = min(start + chunk_size, num_rows)
            chunk = als_recommendations_df.iloc[start:end]
            exploded = chunk.explode(['categoryid_enc','score'],ignore_index=True)
            chunks.append(exploded)

        als_predictions = pd.concat(chunks,axis=0)

        return als_predictions

    def filter_already_bought(self, als_recommendations): # на будущие нужно еще убрать из предсказаний товары которые не доступны (not available)
        """Убираем из рекомендаций категории которые юзеры уже покупали."""
        already_bought = self.events_train[self.events_train["event"] == "transaction"][["categoryid", "visitorid"]]

        # кодировка юзеров и категорий товаров которые уже купили
        already_bought["user_id_enc"] = self.user_encoder.transform(already_bought["visitorid"])
        already_bought["categoryid_enc"] = self.category_encoder.transform(already_bought["categoryid"])

        already_bought = already_bought.drop(columns=["visitorid", "categoryid"])

        # Filter out already bought categories
        filtered_recommendations = als_recommendations.merge(already_bought, on=['user_id_enc', 'categoryid_enc'], how='left', indicator=True)
        filtered_recommendations = filtered_recommendations[filtered_recommendations['_merge'] == 'left_only'].drop(columns=['_merge'])
        
        # Rank the filtered recommendations by score
        filtered_recommendations["rank"] = filtered_recommendations.groupby("user_id_enc")["score"].rank(method="first", ascending=False)
        
        return filtered_recommendations

    def get_filtered_recommendations(self):
        """Train ALS, generate recommendations, and filter out already bought items."""
        self.train_als_model()
        als_recommendations = self.recommend_items()
        return self.filter_already_bought(als_recommendations)


Метрики

In [5]:
def process_events_recs_for_binary_metrics(events_train, events_test, recs, top_k=None):
    "функция считает бинарные статистики которые находят слушал ли человек трек из рекомендаций"

    "поставим флаг тру в тесте для всех евентов в тесте зафиксируя факт что пользователь взаимодействовал с треком"
    events_test["gt"] = True
    "выбрем пользователей которые присутствуют и в обучении и тесте"
    common_users = set(events_test["user_id_enc"]) & set(recs["user_id_enc"])
    
    "возьмем из теста евенты где присутствуют пользователи из обучающей выборки"
    events_for_common_users = events_test[events_test["user_id_enc"].isin(common_users)].copy()
    "возьмем рекомендации полученные для пользователей которые присутствуют в тестовой выборке"
    recs_for_common_users = recs[recs["user_id_enc"].isin(common_users)].copy()

    "оставим эвенты из теста где есть песни из обучающей выборки. модель не может выдать трек который не видела в обучении"
    events_for_common_users = events_for_common_users[events_for_common_users["categoryid_enc"].isin(events_train["categoryid_enc"].unique())]
    
    "возьмем лучшие top_k рекомендаций из рекомендаций"
    if top_k is not None:
        recs_for_common_users = recs_for_common_users[recs_for_common_users["rank"]<=top_k]

    events_recs_common = events_for_common_users[["user_id_enc", "categoryid_enc", "gt"]].merge(
        recs_for_common_users[["user_id_enc", "categoryid_enc", "rank"]], 
        on=["user_id_enc", "categoryid_enc"], how="outer") 
    
    events_recs_common["gt"] = events_recs_common["gt"].fillna(False)
    events_recs_common["pr"] = ~events_recs_common["rank"].isnull()

    "TP - сколько песен было общих в рекомендациях и по факту прослушки"
    "FP - сколько песен рекомендовали которых человек не слушал"
    "FN - сколько песен было прослушенно пользователем, но их не было в рекомендациях"
    
    events_recs_common["tp"] = events_recs_common["gt"] & events_recs_common["pr"]
    events_recs_common["fp"] = ~events_recs_common["gt"] & events_recs_common["pr"]
    events_recs_common["fn"] = events_recs_common["gt"] & ~events_recs_common["pr"]

    return events_recs_common



def compute_cls_metrics(events_recs_for_binary_metric):
    "подсчет precision recall"
    
    groupper = events_recs_for_binary_metric.groupby("user_id_enc")

    # precision = tp / (tp + fp)
    precision = groupper["tp"].sum()/(groupper["tp"].sum()+groupper["fp"].sum())
    precision = precision.fillna(0).mean()
    
    # recall = tp / (tp + fn)
    recall = groupper["tp"].sum()/(groupper["tp"].sum()+groupper["fn"].sum())
    recall = recall.fillna(0).mean()

    return precision, recall

Эксперименты

In [6]:
config1 = {"K": 5, "FACTORS": 50, "REGULARIZATION": 0.05, "ITERATIONS": 30}
config2 = {"K": 5, "FACTORS": 30, "REGULARIZATION": 0.1, "ITERATIONS": 20}
config3 = {"K": 3, "FACTORS": 70, "REGULARIZATION": 0.05, "ITERATIONS": 50}
config4 = {"K": 7, "FACTORS": 50, "REGULARIZATION": 0.01, "ITERATIONS": 50}
config5 =  {"K": 7, "FACTORS": 50, "REGULARIZATION": 0.01, "ITERATIONS": 70}

Логирование результатов

In [7]:
import mlflow
import joblib

RUN_NAME = 'ALS_5_STEP_MODEL'
config = config5

# Check if the experiment exists
experiment = mlflow.get_experiment_by_name(EXPERIMENT_NAME)

# If not, create a new one
if experiment is None:
    experiment_id = mlflow.create_experiment(EXPERIMENT_NAME)
else:
    experiment_id = experiment.experiment_id

als_recommender = ALSRecommender(
    config=config, 
    view_but_no_cart=view_but_no_cart, 
    add_to_cart=add_to_cart, 
    user_encoder=user_encoder, 
    category_encoder=category_encoder, 
    events_train=events_train, 
    events=events
)

als_recommender.train_als_model()

filtered_recommendations = als_recommender.get_filtered_recommendations()
similar_categories = als_recommender.get_similar_categories(chunk_size=1000, max_similar_items=10)

filtered_recommendations.to_parquet("../models/staging/offline_5.parquet")
similar_categories.to_parquet("../models/staging/online_5.parquet")

precision_at5, recall_at5 = compute_cls_metrics(
    process_events_recs_for_binary_metrics(events_train, events_test, filtered_recommendations, 5)
)

with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
    mlflow.log_params(config)

    model_path = "../models/als_model5.joblib"
    joblib.dump(als_recommender.als_model, model_path)
    mlflow.log_artifact(model_path, artifact_path="model")  
    mlflow.log_metric("precision_at5", precision_at5)
    mlflow.log_metric("recall_at5", recall_at5)

    mlflow.log_artifact("../models/staging/offline_5.parquet")
    mlflow.log_artifact("../models/staging/online_5.parquet")

  check_blas_config()
100%|██████████| 70/70 [06:12<00:00,  5.33s/it]
100%|██████████| 70/70 [06:05<00:00,  5.22s/it]
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  events_test["gt"] = True
  events_recs_common["gt"] = events_recs_common["gt"].fillna(False)
2024/10/26 16:58:04 INFO mlflow.tracking._tracking_service.client: 🏃 View run ALS_5_STEP_MODEL at: http://127.0.0.1:5000/#/experiments/15/runs/582f51904ca749a5b469029610b16030.
2024/10/26 16:58:04 INFO mlflow.tracking._tracking_service.client: 🧪 View experiment at: http://127.0.0.1:5000/#/experiments/15.


Результаты

![Parameter Grid Result](/home/mle-user/mle_projects/mle-pr-final/mlflow_server/assets/param_grid_result.png)
