In [1]:
import os
import json
import implicit
import numpy as np
import pandas as pd
import polars as pl
from glob import glob
from tqdm import tqdm
from collections import Counter
import matplotlib.pyplot as plt
from scipy.sparse import csr_matrix
from implicit.cpu.als import AlternatingLeastSquares
from sklearn.model_selection import train_test_split
from sklearn.metrics import average_precision_score, precision_score, recall_score, f1_score

In [3]:
video_stats = pl.read_parquet('data/video_stat.parquet')
video_stats = video_stats.filter(pl.col("v_frac_avg_watchtime_30_day_duration") > 0.00008)

top_authors = (
    video_stats
    .group_by("author_id")
    .agg([
        pl.col("video_id").count().alias("video_count")
    ])
)
filtered_data = top_authors.filter(pl.col("video_count") > 10)
authors = np.unique(filtered_data["author_id"])

video_to_author = video_stats.select(["video_id", "author_id"]).to_dict(as_series=False)

user_interaction_counts = []
for log_path in tqdm(glob("data/logs*.parquet")):
    user_interaction_counts_i = {}
    log = pl.read_parquet(log_path)

    log_with_authors = log.join(
        video_stats.select(["video_id", "author_id"]),
        on="video_id",
        how="inner"
    )

    log_filtered = log_with_authors.filter(pl.col("author_id").is_in(authors))

    for user_id in log_filtered["user_id"]:
        user_interaction_counts_i[user_id] = user_interaction_counts_i.get(user_id, 0) + 1

    user_interaction_counts.append(user_interaction_counts_i)

total_user_interaction_counts = {}
for user_counts in tqdm(user_interaction_counts):
    for user_id, count in user_counts.items():
        if user_id in total_user_interaction_counts:
            total_user_interaction_counts[user_id] += count
        else:
            total_user_interaction_counts[user_id] = count

filtered_users = {user_id: count for user_id, count in total_user_interaction_counts.items() if count >= 100}


rows = []
cols = []
users_set = set()
for log_path in tqdm(glob("data/logs*.parquet")):
    log = pl.read_parquet(log_path)

    log_with_authors = log.join(
        video_stats.select(["video_id", "author_id"]),
        on="video_id",
        how="inner"
    )

    log_filtered = log_with_authors.filter(pl.col("author_id").is_in(authors))

    user_ids = log_filtered["user_id"].to_list()
    author_ids = log_filtered["author_id"].to_list()

    for user_id, author_id in zip(user_ids, author_ids):
        if user_id in filtered_users:
            rows.append(user_id)
            cols.append(author_id)

filtered_users = sorted(filtered_users)
user_to_idx = {user: idx for idx, user in enumerate(filtered_users)}
author_to_idx = {author: idx for idx, author in enumerate(authors)}

rows_idx = np.array([user_to_idx[user] for user in rows], dtype=np.int32)
cols_idx = np.array([author_to_idx[author] for author in cols], dtype=np.int32)

data = np.ones(len(rows_idx), dtype=np.int8)

user_author_matrix = csr_matrix((data, (rows_idx, cols_idx)), shape=(len(filtered_users), len(authors)))

print("Shape of user-author matrix:", user_author_matrix.shape)

100%|██████████| 30/30 [05:36<00:00, 11.21s/it]


Shape of user-author matrix: (32203, 23206)


In [None]:
import numpy as np
from sklearn.metrics import average_precision_score

def get_similar_items(model, item_idx: int, n: int = 5):
    """
    Возвращает список похожих объектов для заданного item.

    Parameters
    ----------
    model : object
        Тренированная модель рекомендаций.
    item_idx : int
        Идентификатор объекта.
    n : int, optional
        Количество похожих объектов (default is 5).

    Returns
    -------
    list
        Список похожих объектов.
    """
    try:
        similar_items = model.similar_items(item_idx, N=n)
        return [item for item in similar_items[0]]
    except:
        print(item_idx)
        return [0]


def calculate_item_metrics(test_matrix, model, n_recommendations: int = 100):
    """
    Подсчет метрик качества рекомендаций на основе похожих объектов (авторов): precision, recall, F1, MAP.

    Parameters
    ----------
    test_matrix : csr_matrix
        Тестовая матрица взаимодействий объектов (авторов).
    model : object
        Тренированная модель рекомендаций.
    n_recommendations : int, optional
        Количество рекомендаций для каждого объекта (default is 5).

    Returns
    -------
    tuple
        Значения метрик (precision, recall, f1, map_score).
    """
    precisions = []
    recalls = []
    map_scores = []
    
    # Пройдемся по всем объектам (авторам) в тестовой матрице
    for item_id in tqdm(range(test_matrix.shape[1])):
        # Получаем реальные объекты (авторов), с которыми объект связан в тестовой выборке
        true_items = test_matrix[item_id].nonzero()[1]

        if len(true_items) == 0:
            continue

        # Получаем рекомендации похожих объектов (авторов)
        recommended_items = get_similar_items(model, item_id, n=n_recommendations)

        # Пересечение рекомендованных объектов с теми, с которыми объект действительно взаимодействовал
        hits = len(set(recommended_items) & set(true_items))

        # Precision@N
        precision_at_n = hits / len(recommended_items) if len(recommended_items) > 0 else 0
        precisions.append(precision_at_n)

        # # Recall@N
        # recall_at_n = hits / len(true_items) if len(true_items) > 0 else 0
        # recalls.append(recall_at_n)

        # Mean Average Precision (MAP)
        relevant_flags = [1 if item in true_items else 0 for item in recommended_items]
        if np.sum(relevant_flags) > 0:
            map_scores.append(average_precision_score(relevant_flags, relevant_flags))
        else:
            map_scores.append(0)

        # print(hits)
        # break

    # # Средние значения по всем объектам
    precision = np.mean(precisions)
    # recall = np.mean(recalls)
    # f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    map_score = np.mean(map_scores) if len(map_scores) > 0 else 0
    
    return precision, map_score


# Пример использования
model = AlternatingLeastSquares(factors=128, regularization=0.1, iterations=50)
model.fit(user_author_matrix)  # Здесь обучающая матрица авторов и объектов (например, каналы и их авторы)
# model = AlternatingLeastSquares().load("ALS")

precision, map_score = calculate_item_metrics(user_author_matrix, model)

print(f"Precision @ 100: {precision:.4f}")
# print(f"Recall: {recall:.4f}")
# print(f"F1 Score: {f1:.4f}")
print(f"Mean Average Precision (MAP): {map_score:.4f}")

In [140]:
import json
import random
import numpy as np
import polars as pl
from typing import Optional, List
from implicit.cpu.als import AlternatingLeastSquares


class ALS:
    def __init__(
        self,
        model_path: str,
        authors_ids_path: str,
        video_stats_path: str,
        num_close_videos: int
    ) -> None:
        """Инициализация модели ALS, загрузка данных по авторам и статистике видео"""
        self.model = AlternatingLeastSquares().load(model_path)

        self.video_stats = pl.read_parquet(video_stats_path)
        self.num_close_videos = num_close_videos
        self.num_close_authors = num_close_videos // 2

        with open(authors_ids_path, "r") as f:
            self.authors_to_idxs = json.load(f)
        ids_to_autors = {}
        for k,v in self.authors_to_idxs.items():
            ids_to_autors[v] = k
        self.ids_to_autors = ids_to_autors

    def __getitem__(self, author_id: int) -> Optional[list[int]]:
        """Возвращает список рекомендованных видео для похожих авторов"""
        if author_id not in self.authors_to_idxs:
            return None

        author_idx = self.authors_to_idxs[author_id]

        # Получаем похожих авторов через ALS модель
        similar_author_idxs = self.model.similar_items(author_idx, N=self.num_close_authors)[0]

        similar_author_ids = []
        for similar_author_idx in similar_author_idxs:
            similar_author_ids.append(self.ids_to_autors[similar_author_idx])
    
        # Фильтруем видео по списку похожих авторов
        close_author_videos = self.video_stats.filter(
            pl.col("author_id").is_in(similar_author_ids)
        )

        # Получаем рекомендованные видео с использованием взвешенного семплера
        recommended_video_ids = get_top_videos(close_author_videos, n_top=2)

        # Возвращаем список video_id
        return recommended_video_ids


def get_top_videos(data, n_top):
    top_videos = (
        data
        .sort("v_likes", descending=True)
        .group_by("author_id")
        .head(n_top)
    )
    return top_videos["video_id"].to_list()
