# Контентные методы 

### Базовые операции

In [23]:
import gc
import os 
import logging
import joblib
import psutil
import numpy as np
import pandas as pd
import polars as pl
from tqdm import tqdm
from abc import ABC, abstractmethod
from typing import Dict, List
from collections import defaultdict, Counter 

import lightgbm as lgb
from scipy.sparse import vstack, hstack
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
os.makedirs('../models', exist_ok=True)

In [24]:
# Сразу загружу данные
train = pl.read_parquet("../../data/train.pq")
test = pl.read_parquet("../../data/test.pq")
books = pl.read_parquet("../../data/books.pq")

train_items = set(train["item_id"].unique())
test_items = set(test["item_id"].unique())
cold_items = test_items - train_items

#### Метрики оценивания моделей

In [25]:
class Validator(ABC):
    def __init__(self, train: pd.DataFrame, test: pd.DataFrame, cold_items: set = None):
        self.train = train
        self.test = test
        self.cold_items = cold_items or set()

    @abstractmethod
    def evaluate(self, predictions: Dict[int, List[int]]) -> Dict[str, float]:
        """
        predictions: dict user_id -> list of recommended item_ids
        """
        pass

    def recall_at_k(self, y_true: List[int], y_pred: List[int], k: int = 10) -> float:
        return len(set(y_true) & set(y_pred[:k])) / len(set(y_true)) if y_true else 0.0

    def precision_at_k(self, y_true: List[int], y_pred: List[int], k: int = 10) -> float:
        return len(set(y_true) & set(y_pred[:k])) / k if y_true else 0.0

    def hitrate_at_k(self, y_true: List[int], y_pred: List[int], k: int = 10) -> float:
        return 1.0 if len(set(y_true) & set(y_pred[:k])) > 0 else 0.0

    def ndcg_at_k(self, y_true: List[int], y_pred: List[int], k: int = 10) -> float:
        dcg = 0.0
        for i, item in enumerate(y_pred[:k]):
            if item in y_true:
                dcg += 1 / np.log2(i + 2)
        idcg = sum(1 / np.log2(i + 2) for i in range(min(len(y_true), k)))
        return dcg / idcg if idcg > 0 else 0.0

    def mrr_at_k(self, y_true: List[int], y_pred: List[int], k: int = 10) -> float:
        for i, item in enumerate(y_pred[:k]):
            if item in y_true:
                return 1 / (i + 1)
        return 0.0

    def coverage(self, predictions: Dict[int, List[int]]) -> float:
        all_pred_items = set(item for recs in predictions.values() for item in recs)
        all_train_items = set(self.train["item_id"].unique())
        return len(all_pred_items) / len(all_train_items)

    @staticmethod
    def print_metrics(metrics: Dict[str, float]):
        print("\n=== Evaluation Results ===")
        for key, value in metrics.items():
            print(f"{key:<15}: {value:.4f}")
        print("==========================\n")

Стоит разделить валидацию на две версии, для сравнения. Моя гипотеза заключается в том, что совместная валидация warm и cold может быть не совсем честной. Например, если в тесте 90% warm и 10% cold, то Recall@10 в среднем будет определяться warm-айтемами. Модель может полностью «забыть» про cold items, но в отчёте всё равно будут хорошие цифры. Это вводит в заблуждение: кажется, что модель универсальная, хотя на самом деле cold-start не решён.

#### Блок совместной валидации (baseline)

In [26]:
class JointValidator(Validator):
    def __init__(self, train: pl.DataFrame, test: pl.DataFrame, cold_items: set = None):
        super().__init__(train, test, cold_items)
        self.user2items = (
            test.group_by("user_id").agg(pl.col("item_id")).to_dict(as_series=False)
        )
        self.user2items = dict(zip(self.user2items["user_id"], self.user2items["item_id"]))

    def evaluate(self, predictions: Dict[int, List[int]]) -> Dict[str, float]:
        recalls, precisions, hits, ndcgs, mrrs = [], [], [], [], []
        for user_id, y_pred in predictions.items():
            y_true = self.user2items.get(user_id, [])
            recalls.append(self.recall_at_k(y_true, y_pred))
            precisions.append(self.precision_at_k(y_true, y_pred))
            hits.append(self.hitrate_at_k(y_true, y_pred))
            ndcgs.append(self.ndcg_at_k(y_true, y_pred))
            mrrs.append(self.mrr_at_k(y_true, y_pred))
        results = {
            "Recall@10": np.mean(recalls),
            "Precision@10": np.mean(precisions),
            "HitRate@10": np.mean(hits),
            "NDCG@10": np.mean(ndcgs),
            "MRR@10": np.mean(mrrs),
            "Coverage": self.coverage(predictions),
        }
        self.print_metrics(results)

#### Разделенная валидация (cold vs warm)

In [27]:
class SplitValidator(Validator):
    def __init__(self, train: pl.DataFrame, test: pl.DataFrame, cold_items: set = None):
        super().__init__(train, test, cold_items)
        self.user2items = (
            test.group_by("user_id").agg(pl.col("item_id")).to_dict(as_series=False)
        )
        self.user2items = dict(zip(self.user2items["user_id"], self.user2items["item_id"]))

    def evaluate(self, predictions: Dict[int, List[int]]) -> Dict[str, float]:
        results = {}
        for subset in ["cold", "warm"]:
            recalls, precisions, hits, ndcgs, mrrs = [], [], [], [], []
            for user_id, y_pred in predictions.items():
                y_true = self.user2items.get(user_id, [])
                if not y_true:
                    continue
                if subset == "cold":
                    y_true = [i for i in y_true if i in self.cold_items]
                elif subset == "warm":
                    y_true = [i for i in y_true if i not in self.cold_items]
                if not y_true:
                    continue
                recalls.append(self.recall_at_k(y_true, y_pred))
                precisions.append(self.precision_at_k(y_true, y_pred))
                hits.append(self.hitrate_at_k(y_true, y_pred))
                ndcgs.append(self.ndcg_at_k(y_true, y_pred))
                mrrs.append(self.mrr_at_k(y_true, y_pred))
            results[f"Recall@10_{subset}"] = np.mean(recalls) if recalls else 0.0
            results[f"Precision@10_{subset}"] = np.mean(precisions) if precisions else 0.0
            results[f"HitRate@10_{subset}"] = np.mean(hits) if hits else 0.0
            results[f"NDCG@10_{subset}"] = np.mean(ndcgs) if ndcgs else 0.0
            results[f"MRR@10_{subset}"] = np.mean(mrrs) if mrrs else 0.0
        results["Coverage"] = self.coverage(predictions)
        self.print_metrics(results)

#### Вспомогательные функции для удобного представления результатов

In [28]:
def _shorten_list(lst, max_len=10):
    """Обрезает длинные списки для красивого вывода"""
    if lst is None:
        return []
    return lst[:max_len] if len(lst) > max_len else lst

def show_predictions(models: dict, data: pl.DataFrame, n=5, verbose=True, is_val=False):
    df = data.sample(n).select(["user_id", "item_id"])
    if is_val:
        df = df.rename({"item_id": "true_items"})

    # добавляем предсказания
    for name, preds in models.items():
        df = df.with_columns(
            pl.col("user_id").map_elements(
                lambda u: _shorten_list(preds.get(u, [])), 
                return_dtype=pl.List(pl.Int64)
            ).alias(name)
        )

    if verbose:
        print(df.shape)
        print(df)

    return df


def val_predictions(models: dict, val: pl.DataFrame, validator: Validator, k: int = 10, verbose: bool = True):
    results = []
    user2items = (
        val.group_by("user_id").agg(pl.col("item_id")).to_dict(as_series=False)
    )
    user2items = dict(zip(user2items["user_id"], user2items["item_id"]))

    for model_name, preds in models.items():
        recalls, precisions, hits, ndcgs, mrrs = [], [], [], [], []
        for u, y_true in user2items.items():
            y_pred = preds.get(u, [])
            recalls.append(validator.recall_at_k(y_true, y_pred, k))
            precisions.append(validator.precision_at_k(y_true, y_pred, k))
            hits.append(validator.hitrate_at_k(y_true, y_pred, k))
            ndcgs.append(validator.ndcg_at_k(y_true, y_pred, k))
            mrrs.append(validator.mrr_at_k(y_true, y_pred, k))
        metrics = {
            "model": model_name,
            "Recall@10": np.mean(recalls),
            "Precision@10": np.mean(precisions),
            "HitRate@10": np.mean(hits),
            "NDCG@10": np.mean(ndcgs),
            "MRR@10": np.mean(mrrs),
            "Coverage": validator.coverage(preds),
        }
        results.append(metrics)

    df = pl.DataFrame(results)
    if verbose:
        print(df)
    return df


#### Функция векторизации

In [29]:
class TextVectorizer:
    def __init__(self, max_features: int = 200, stop_words: str = 'english'):  # Уменьшено до 200
        self.vectorizer = TfidfVectorizer(max_features=max_features, stop_words=stop_words)
        self.item_vectors = None
        self.item_ids = None

    def fit(self, metadata: pl.DataFrame):
        logger.info("Начало TF-IDF векторизации...")
        logger.info(f"Использование памяти: {psutil.virtual_memory().percent}%")
        
        # Подготовка текстовых данных
        metadata = metadata.with_columns(
            pl.col("description").fill_null("") + " " + 
            pl.col("tags").list.join(" ").fill_null("")
        )
        corpus = metadata["description"].to_list()
        self.item_ids = metadata["item_id"].to_list()

        # Векторизация
        self.item_vectors = self.vectorizer.fit_transform(corpus)
        logger.info(f"TF-IDF векторизация завершена. Размер матрицы: {self.item_vectors.shape}")
        logger.info(f"Использование памяти после векторизации: {psutil.virtual_memory().percent}%")

    def save(self, path: str):
        try:
            os.makedirs(os.path.dirname(path), exist_ok=True)
            joblib.dump({
                'vectorizer': self.vectorizer,
                'item_vectors': self.item_vectors,
                'item_ids': self.item_ids
            }, path)
            logger.info(f"Векторизатор сохранен в {path}")
        except Exception as e:
            logger.error(f"Ошибка при сохранении векторизатора: {e}")
            raise

    def load(self, path: str):
        try:
            data = joblib.load(path)
            self.vectorizer = data['vectorizer']
            self.item_vectors = data['item_vectors']
            self.item_ids = data['item_ids']
            logger.info(f"Векторизатор загружен из {path}")
        except Exception as e:
            logger.error(f"Ошибка при загрузке векторизатора: {e}")
            raise

#### Базовый класс для загрузки моделей

In [30]:
class Recommender(ABC):
    def __init__(self, model_path: str = None):
        self.model = None
        self.model_path = model_path
        if model_path and os.path.exists(model_path):
            self.load_model()

    @abstractmethod
    def fit(self, train: pl.DataFrame, vectorizer: TextVectorizer = None):
        pass

    @abstractmethod
    def predict(self, user_ids: List[int], k: int = 10) -> Dict[int, List[int]]:
        pass

    def save_model(self):
        if self.model_path:
            try:
                os.makedirs(os.path.dirname(self.model_path), exist_ok=True)
                joblib.dump(self.model, self.model_path)
                logger.info(f"Модель сохранена в {self.model_path}")
            except Exception as e:
                logger.error(f"Ошибка при сохранении модели в {self.model_path}: {e}")
                raise

    def load_model(self):
        if self.model_path and os.path.exists(self.model_path):
            try:
                self.model = joblib.load(self.model_path)
                logger.info(f"Модель загружена из {self.model_path}")
            except Exception as e:
                logger.error(f"Ошибка при загрузке модели из {self.model_path}: {e}")
                raise

### Использование контентных методов

#### LightGBM

In [31]:
class LightGBMRecommender(Recommender):
    def __init__(self, model_path: str = "../models/lightgbm/lightgbm_recommender.pkl", force_retrain: bool = False):
        super().__init__(model_path)
        self.vectorizer = None
        self.item_vectors = None
        self.item_ids = None
        self.force_retrain = force_retrain

    def fit(self, train: pl.DataFrame, vectorizer: TextVectorizer = None):
        if not isinstance(vectorizer, TextVectorizer):
            logger.error("Аргумент 'vectorizer' должен быть экземпляром TextVectorizer")
            raise TypeError("Аргумент 'vectorizer' должен быть экземпляром TextVectorizer")
        if self.model_path and os.path.exists(self.model_path) and not self.force_retrain:
            self.load_model()
            return

        logger.info("Начало обучения LightGBMRecommender...")
        logger.info(f"Использование памяти: {psutil.virtual_memory().percent}%")

        self.vectorizer = vectorizer.vectorizer
        self.item_vectors = vectorizer.item_vectors
        self.item_ids = vectorizer.item_ids

        data_path = "../models/lightgbm/lightgbm_data.pkl"
        if os.path.exists(data_path) and not self.force_retrain:
            logger.info("Загрузка сохраненных данных для LightGBM...")
            data = joblib.load(data_path)
            X, y = data['X'], data['y']
        else:
            X, y = [], []
            user_items = train.group_by("user_id").agg(pl.col("item_id")).to_pandas()
            item_id_to_idx = {item_id: idx for idx, item_id in enumerate(self.item_ids)}
            batch_size = 500
            batch_count = 0

            for i in tqdm(range(0, len(user_items), batch_size), desc="Подготовка данных для LightGBM"):
                batch = user_items[i:i+batch_size]
                batch_X, batch_y = [], []
                for _, row in batch.iterrows():
                    user_id = row["user_id"]
                    pos_items = set(row["item_id"])
                    neg_items = list(set(self.item_ids) - pos_items)[:min(len(pos_items), 5)]

                    valid_pos_items = [item for item in pos_items if item in item_id_to_idx]
                    if not valid_pos_items:
                        logger.debug(f"Пользователь {user_id} не имеет валидных элементов, пропускаем")
                        continue

                    user_vector = vstack([self.item_vectors[item_id_to_idx[item]] for item in valid_pos_items]).mean(axis=0)
                    logger.debug(f"Пользователь {user_id}: {len(valid_pos_items)} валидных элементов")

                    for item_id in pos_items:
                        if item_id in item_id_to_idx:
                            item_vector = self.item_vectors[item_id_to_idx[item_id]]
                            batch_X.append(hstack([user_vector, item_vector]))
                            batch_y.append(1)

                    for item_id in neg_items:
                        if item_id in item_id_to_idx:
                            item_vector = self.item_vectors[item_id_to_idx[item_id]]
                            batch_X.append(hstack([user_vector, item_vector]))
                            batch_y.append(0)

                if not batch_X:
                    logger.warning(f"Батч {batch_count} пустой, пропускаем")
                    continue

                batch_path = f"../models/lightgbm/lightgbm_data_batch_{batch_count}.pkl"
                try:
                    os.makedirs(os.path.dirname(batch_path), exist_ok=True)
                    joblib.dump({'X': batch_X, 'y': batch_y}, batch_path)
                    logger.info(f"Батч {batch_count} сохранен в {batch_path}, размер: {len(batch_X)} пар")
                    batch_count += 1
                except Exception as e:
                    logger.error(f"Ошибка при сохранении батча {batch_count}: {e}")
                    raise
                batch_X, batch_y = [], []
                gc.collect()

            if batch_count == 0:
                logger.error("Не создано ни одного батча данных. Проверьте данные train и item_ids.")
                raise ValueError("Не удалось создать данные для обучения LightGBM: пустой датасет")

            X, y = [], []
            for i in range(batch_count):
                batch_data = joblib.load(f"../models/lightgbm/lightgbm_data_batch_{i}.pkl")
                X.extend(batch_data['X'])
                y.extend(batch_data['y'])
                os.remove(f"../models/lightgbm/lightgbm_data_batch_{i}.pkl")
            try:
                os.makedirs(os.path.dirname(data_path), exist_ok=True)
                joblib.dump({'X': X, 'y': y}, data_path)
                logger.info(f"Данные LightGBM сохранены в {data_path}")
            except Exception as e:
                logger.error(f"Ошибка при сохранении данных LightGBM: {e}")
                raise

        if not X:
            logger.error("Датасет X пустой. Проверьте наличие валидных элементов в train и item_ids.")
            raise ValueError("Пустой датасет X для LightGBM")

        logger.info(f"Создано {len(X)} пар пользователь-книга")
        logger.info(f"Использование памяти после подготовки данных: {psutil.virtual_memory().percent}%")

        X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
        if not X_train:
            logger.error("X_train пустой после разделения. Проверьте данные.")
            raise ValueError("Пустой X_train для LightGBM")

        # Convert to single sparse matrix
        X_train_matrix = vstack(X_train)
        X_val_matrix = vstack(X_val)
        logger.info(f"X_train shape: {X_train_matrix.shape}, X_val shape: {X_val_matrix.shape}")

        train_data = lgb.Dataset(X_train_matrix, label=y_train)
        val_data = lgb.Dataset(X_val_matrix, label=y_val, reference=train_data)

        params = {
            'objective': 'binary',
            'metric': 'binary_logloss',
            'num_leaves': 31,
            'learning_rate': 0.05,
            'feature_fraction': 0.9,
            'num_threads': -1
        }
        self.model = lgb.train(params, train_data, valid_sets=[val_data], num_boost_round=30)
        logger.info("Обучение LightGBM завершено")
        self.save_model()

    def predict(self, user_ids: List[int], k: int = 10, item_subset: List[int] = None, batch_size: int = 500) -> Dict[int, List[int]]:
        predictions = {}
        item_id_to_idx = {item_id: idx for idx, item_id in enumerate(self.item_ids)}
        pred_items_subset = item_subset if item_subset is not None else self.item_ids

        for i in tqdm(range(0, len(user_ids), batch_size), desc="Predicting with LightGBM"):
            batch_users = user_ids[i:i+batch_size]
            for user_id in batch_users:
                user_items = train.filter(pl.col("user_id") == user_id)["item_id"].to_list()
                valid_items = [item for item in user_items if item in item_id_to_idx]
                user_vector = vstack([self.item_vectors[item_id_to_idx[item]] for item in valid_items]).mean(axis=0) if valid_items else np.zeros((1, self.item_vectors.shape[1]))

                X_pred = []
                pred_items = []
                for item_id in pred_items_subset:
                    if item_id in item_id_to_idx:
                        item_vector = self.item_vectors[item_id_to_idx[item_id]]
                        X_pred.append(hstack([user_vector, item_vector]))
                        pred_items.append(item_id)

                if not X_pred:
                    logger.warning(f"Пустой X_pred для пользователя {user_id}, пропускаем")
                    predictions[user_id] = []
                    continue

                scores = self.model.predict(vstack(X_pred))
                top_k_indices = np.argsort(scores)[::-1][:k]
                predictions[user_id] = [pred_items[idx] for idx in top_k_indices]
            gc.collect()
        return predictions

    def save_model(self):
        if self.model_path:
            try:
                os.makedirs(os.path.dirname(self.model_path), exist_ok=True)
                joblib.dump({
                    'model': self.model,
                    'vectorizer': self.vectorizer,
                    'item_vectors': self.item_vectors,
                    'item_ids': self.item_ids
                }, self.model_path)
                logger.info(f"Модель сохранена в {self.model_path}")
            except Exception as e:
                logger.error(f"Ошибка при сохранении LightGBMRecommender: {e}")
                raise

    def load_model(self):
        if self.model_path and os.path.exists(self.model_path):
            try:
                data = joblib.load(self.model_path)
                self.model = data['model']
                self.vectorizer = data['vectorizer']
                self.item_vectors = data['item_vectors']
                self.item_ids = data['item_ids']
                logger.info(f"Модель LightGBMRecommender загружена из {self.model_path}")
            except Exception as e:
                logger.error(f"Ошибка при загрузке LightGBMRecommender: {e}")
                raise

### Обучение моделей 

In [32]:
print(f"Train users: {train['user_id'].n_unique()}, items: {len(train_items)}")
print(f"Test users: {test['user_id'].n_unique()}, items: {len(test_items)}")
print(f"Cold items: {len(cold_items)}")
user_ids = test["user_id"].unique().to_list()

vectorizer = TextVectorizer(max_features=1000)
vectorizer_path = "../models/vectorizer/tfidf_vectorizer.pkl"
if os.path.exists(vectorizer_path):
    vectorizer.load(vectorizer_path)
else:
    vectorizer.fit(books)
    vectorizer.save(vectorizer_path)

# Инициализируем и обучаем разные модели
force_retrain = True  # Переключатель для обучения/загрузки моделей
lightgbm_recommender = LightGBMRecommender(model_path="../models/lightgbm/lightgbm_recommender.pkl", force_retrain=force_retrain)

2025-09-07 11:53:17,346 - INFO - Векторизатор загружен из ../models/vectorizer/tfidf_vectorizer.pkl
2025-09-07 11:53:17,374 - INFO - Модель LightGBMRecommender загружена из ../models/lightgbm/lightgbm_recommender.pkl


Train users: 349719, items: 31300
Test users: 185828, items: 27367
Cold items: 1775


In [33]:
sample_users = train["user_id"].unique().sample(fraction=0.1, seed=42).to_list()
train_sample = train.filter(pl.col("user_id").is_in(sample_users))
logger.info(f"Используется подвыборка для обучения: {len(sample_users)} пользователей")

# Подвыборка пользователей для предсказания (10% от всех пользователей в test)
user_ids = test["user_id"].unique().sample(fraction=0.01, seed=42).to_list()
logger.info(f"Используется подвыборка для предсказания: {len(user_ids)} пользователей")

# Ограничение книг для предсказания (топ-1000 популярных)
popular_items = train.group_by("item_id").agg(pl.len()).sort("len", descending=True).head(1000)["item_id"].to_list()
logger.info("=== Обучение и предсказание LightGBMRecommender ===")
pred_gbm_path = "../models/lightgbm/pred_gbm.pkl"
if os.path.exists(pred_gbm_path):
    logger.info("Загрузка сохраненных предсказаний LightGBMRecommender...")
    pred_gbm = joblib.load(pred_gbm_path)
else:
    try:
        lightgbm_recommender = LightGBMRecommender(model_path="../models/lightgbm/lightgbm_recommender.pkl", force_retrain=True)
        lightgbm_recommender.fit(train_sample, vectorizer)
        pred_gbm = lightgbm_recommender.predict(user_ids, item_subset=popular_items, batch_size=500)
        logger.info("LightGBMRecommender обучен и предсказания получены")
        joblib.dump(pred_gbm, pred_gbm_path)
        logger.info(f"Предсказания LightGBMRecommender сохранены в {pred_gbm_path}")
    except Exception as e:
        logger.error(f"Ошибка при обучении/предсказании LightGBMRecommender: {e}")
        raise

models = {
    "LightGBM": pred_gbm,
}


print("\nРекомендации моделей:")
train_df = show_predictions(models, train, n=5, verbose=True)

2025-09-07 11:53:17,704 - INFO - Используется подвыборка для обучения: 34971 пользователей
2025-09-07 11:53:17,738 - INFO - Используется подвыборка для предсказания: 1858 пользователей
2025-09-07 11:53:17,818 - INFO - === Обучение и предсказание LightGBMRecommender ===
2025-09-07 11:53:17,839 - INFO - Модель LightGBMRecommender загружена из ../models/lightgbm/lightgbm_recommender.pkl
2025-09-07 11:53:17,839 - INFO - Начало обучения LightGBMRecommender...
2025-09-07 11:53:17,840 - INFO - Использование памяти: 62.4%
Подготовка данных для LightGBM:   0%|          | 0/70 [00:00<?, ?it/s]2025-09-07 11:53:22,083 - INFO - Батч 0 сохранен в ../models/lightgbm/lightgbm_data_batch_0.pkl, размер: 16734 пар
Подготовка данных для LightGBM:   1%|▏         | 1/70 [00:04<04:54,  4.26s/it]2025-09-07 11:53:26,417 - INFO - Батч 1 сохранен в ../models/lightgbm/lightgbm_data_batch_1.pkl, размер: 17329 пар
Подготовка данных для LightGBM:   3%|▎         | 2/70 [00:08<04:52,  4.30s/it]2025-09-07 11:53:31,065 

KeyboardInterrupt: 

In [None]:
print("\nСтатистика по метрикам:")
validator = JointValidator(train, test, cold_items)
metrics_df = val_predictions(models, test, validator, k=10, verbose=True)

In [None]:
print("\nСтатистика по метрикам (SplitValidator):")
split_validator = SplitValidator(train, test, cold_items)
metrics_df_split = val_predictions(models, test, split_validator, k=10, verbose=True)