# Инициализация

Загружаем библиотеки необходимые для выполнения кода ноутбука.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pyarrow as pa
import pyarrow.parquet as pq


# === ЭТАП 1 ===

# Загрузка первичных данных

Загружаем первичные данные из файлов:
- tracks.parquet
- catalog_names.parquet
- interactions.parquet

In [None]:
tracks = pd.read_parquet("tracks.parquet")
catalogs = pd.read_parquet("catalog_names.parquet")
interactions = pd.read_parquet("interactions.parquet")

# Обзор данных

Проверяем данные, есть ли с ними явные проблемы.

In [None]:
tracks.dtypes

In [None]:
catalogs.dtypes

In [None]:
interactions.dtypes

In [None]:
print(tracks.isnull().sum())
print(catalogs.isnull().sum())
print(interactions.isnull().sum())

In [None]:
#вывод пропусков
artists_ref = set(catalogs[catalogs['type'] == 'artist']['id'])
albums_ref = set(catalogs[catalogs['type'] == 'album']['id'])
genres_ref = set(catalogs[catalogs['type'] == 'genre']['id'])
tracks_ref = set(catalogs[catalogs['type'] == 'track']['id'])

all_artists = set(np.concatenate(tracks['artists'].values))
all_albums = set(np.concatenate(tracks['albums'].values))
all_genres = set(np.concatenate(tracks['genres'].values))

miss_artists = all_artists - artists_ref
miss_albums = all_albums - albums_ref
miss_genres = all_genres - genres_ref
miss_tracks = set(interactions['track_id'])-set(tracks['track_id'])
print(miss_artists)
print(miss_albums)
print(miss_genres)
print(miss_tracks)

In [None]:
tracks_clean = tracks[
    tracks['artists'].apply(lambda x: all(i in artists_ref for i in x)) &
    tracks['albums'].apply(lambda x: all(i in albums_ref for i in x)) &
    tracks['genres'].apply(lambda x: all(i in genres_ref for i in x))
]

In [None]:
interactions_clean = interactions[interactions['track_id'].isin(tracks_clean['track_id'])]

# Выводы

Приведём выводы по первому знакомству с данными:
- есть ли с данными явные проблемы,
- какие корректирующие действия (в целом) были предприняты.

In [None]:
print(f"Оригинальные треки: {len(tracks)}, Очищенные треки: {len(tracks_clean)}")
print(f"Оригинальные взаимодействия: {len(interactions)}, Очищенные взаимодействия: {len(interactions_clean)}")
print(f"Отсутствующие артисты: {len(miss_artists)}")
print(f"Отсутствующие альбомы: {len(miss_albums)}")
print(f"Отсутствующие жанры: {len(miss_genres)}")

Были удалены записи, чьи жанры отсутсвовали в таблице жанров

# === ЭТАП 2 ===

# EDA

Распределение количества прослушанных треков.

In [None]:
user_plays = interactions_clean.groupby('user_id').size().reset_index(name='play_count')

plt.figure(figsize=(12, 6))
plt.hist(user_plays['play_count'], bins=50, log=True)
plt.title('Распределение количества прослушиваний на пользователя')
plt.xlabel('Количество прослушанных треков')
plt.ylabel('Количество пользователей')
plt.show()

plt.figure(figsize=(12, 6))
sns.boxplot(x=user_plays['play_count'])
plt.title('boxplot прослушиваний на пользователя')
plt.xlabel('Количество прослушанных треков')
plt.show()

Наиболее популярные треки

In [None]:
track_names = catalogs[catalogs['type']=='track'].set_index('id')['name']
top_tracks = (
    interactions.groupby('track_id')
    .size()
    .reset_index(name='play_count')
    .sort_values('play_count', ascending=False)
    .head(10)
)

top_tracks['track_name'] = top_tracks['track_id'].map(track_names)

print("Топ-10 популярных треков:")
print(top_tracks[['track_name', 'play_count']])

Наиболее популярные жанры

In [None]:
genre_names = catalogs[catalogs['type'] == 'genre'].set_index('id')['name']
# Создание связи трек-жанр (развертка списков жанров)
track_genres = tracks_clean[['track_id', 'genres']].explode('genres')

# Соединение с данными прослушиваний
genre_plays = (
    pd.merge(interactions[['track_id']], track_genres, on='track_id')
    .groupby('genres')
    .size()
    .reset_index(name='play_count')
    .sort_values('play_count', ascending=False)
    .head(10)
)

# Добавление названий жанров
genre_plays['genre_name'] = genre_plays['genres'].map(genre_names)

# Результат
print("Топ-10 популярных жанров:")
print(genre_plays[['genre_name', 'play_count']])

Треки, которые никто не прослушал

In [None]:
listened_tracks = set(interactions_clean['track_id'])
unlistened_tracks = tracks_clean[~tracks_clean['track_id'].isin(listened_tracks)]
track_names = catalogs[catalogs['type'] == 'track'].set_index('id')['name']
unlistened_tracks['track_name'] = unlistened_tracks['track_id'].map(track_names)
print(unlistened_tracks)

Все треки хотя бы раз были прослушаны!

# Преобразование данных

Преобразуем данные в формат, более пригодный для дальнейшего использования в расчётах рекомендаций.

In [None]:
track_names = catalogs[catalogs['type'] == 'track'].set_index('id')['name']
album_names = catalogs[catalogs['type'] == 'album'].set_index('id')['name']
artist_names = catalogs[catalogs['type'] == 'artist'].set_index('id')['name']
genre_names = catalogs[catalogs['type'] == 'genre'].set_index('id')['name']

tracks_clean['track_name'] = tracks_clean['track_id'].map(track_names)
tracks_clean['album_names'] = tracks_clean['albums'].apply(
    lambda x: [album_names.get(a, 'Unknown') for a in x]
)
tracks_clean['artist_names'] = tracks_clean['artists'].apply(
    lambda x: [artist_names.get(a, 'Unknown') for a in x]
)
tracks_clean['genre_names'] = tracks_clean['genres'].apply(
    lambda x: [genre_names.get(g, 'Unknown') for g in x]
)

# Формирование финальной таблицы
items_df = tracks_clean[[
    'track_id', 
    'track_name',
    'albums',
    'album_names',
    'artists',
    'artist_names',
    'genres',
    'genre_names'
]].rename(columns={
    'albums': 'album_ids',
    'artists': 'artist_ids',
    'genres': 'genre_ids'
})


In [None]:
interactions_clean['started_at'] = pd.to_datetime(interactions_clean['started_at'])

interactions_clean['day_of_week'] = interactions_clean['started_at'].dt.dayofweek
interactions_clean['month'] = interactions_clean['started_at'].dt.month

In [None]:
interactions_clean

# Сохранение данных

In [None]:
schema = pa.Schema.from_pandas(items_df)
with pq.ParquetWriter('items.parquet', schema=schema, compression='SNAPPY') as writer:
    table = pa.Table.from_pandas(items_df, schema=schema)
    writer.write_table(table)
print(f"Размер: {len(items_df)} записей")
print(f"Имена столбцов: {items_df.columns.tolist()}")

In [None]:
schema = pa.Schema.from_pandas(interactions_clean)
with pq.ParquetWriter('events.parquet', schema=schema, compression='SNAPPY') as writer:
    table = pa.Table.from_pandas(interactions_clean, schema=schema)
    writer.write_table(table)
print(f"Размер: {len(interactions_clean)} записей")
print(f"Имена столбцов: {interactions_clean.columns.tolist()}")

Сохраним данные в двух файлах в персональном S3-бакете по пути `recsys/data/`:
- `items.parquet` — все данные о музыкальных треках,
- `events.parquet` — все данные о взаимодействиях.

In [None]:
import s3fs
import pyarrow as pa
from dotenv import load_dotenv
import os

load_dotenv()

S3_BUCKET = os.getenv('S3_BUCKET_NAME')
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

s3 = s3fs.S3FileSystem(
        key=AWS_ACCESS_KEY_ID,
        secret=AWS_SECRET_ACCESS_KEY,
        endpoint_url='https://storage.yandexcloud.net',
        client_kwargs={'region_name': 'ru-central1'}  # Регион по умолчанию
)

s3_events_path = 'recsys/data/items.parquet'

full_path = f's3://{S3_BUCKET}/{s3_events_path}'
table = pa.Table.from_pandas(items_df)
        
pq.write_table(
            table,
            full_path,
            filesystem=s3,
            compression='SNAPPY',
            coerce_timestamps='ms',
            allow_truncated_timestamps=True
)
print(f"Файл успешно сохранен: {full_path}")


s3_events_path = 'recsys/data/events.parquet'

full_path = f's3://{S3_BUCKET}/{s3_events_path}'
table = pa.Table.from_pandas(interactions_clean)
        
pq.write_table(
            table,
            full_path,
            filesystem=s3,
            compression='SNAPPY',
            coerce_timestamps='ms',
            allow_truncated_timestamps=True
)
print(f"Файл успешно сохранен: {full_path}")

# Очистка памяти

Здесь, может понадобится очистка памяти для высвобождения ресурсов для выполнения кода ниже. 

Приведите соответствующие код, комментарии, например:
- код для удаление более ненужных переменных,
- комментарий, что следует перезапустить kernel, выполнить такие-то начальные секции и продолжить с этапа 3.

Необходимо перезапустить ядро и выполнить первую ячейку с импортами

# === ЭТАП 3 ===

# Загрузка данных

Если необходимо, то загружаем items.parquet, events.parquet.

In [None]:
import pandas as pd
import numpy as np
import math


In [None]:
items = pd.read_parquet("items.parquet")
events = pd.read_parquet("events.parquet")

In [None]:
items.head()

In [None]:
events.head()

Разбиваем данные на тренировочную, тестовую выборки.

In [None]:
from sklearn.preprocessing import MinMaxScaler


events['rating_count'] = events.groupby('track_id')['user_id'].transform('nunique')

scaler = MinMaxScaler()
events['rating'] = scaler.fit_transform(events[['rating_count']])

In [None]:
train_test_global_time_split_date = pd.to_datetime("2022-12-16")

train_test_global_time_split_idx = events["started_at"] < train_test_global_time_split_date
events_train = events[train_test_global_time_split_idx]
events_test = events[~train_test_global_time_split_idx]

users_train = events_train["user_id"].drop_duplicates()
users_test = events_test["user_id"].drop_duplicates()


common_users = set(users_train).intersection(set(users_test))

print(len(users_train), len(users_test), len(common_users))

In [None]:
cold_users = users_test[~users_test.isin(common_users)]

print(len(cold_users))

In [None]:
from sklearn.preprocessing import LabelEncoder


user_encoder = LabelEncoder()
user_encoder.fit(events["user_id"])

events["item_id_enc"] = user_encoder.transform(events["user_id"])
events_train["user_id_enc"] = user_encoder.transform(events_train["user_id"])
events_test["user_id_enc"] = user_encoder.transform(events_test["user_id"])

item_encoder = LabelEncoder()
item_encoder.fit(items["track_id"])

items["track_id_enc"] = item_encoder.transform(items["track_id"])
events_train["track_id_enc"] = item_encoder.transform(events_train["track_id"])
events_test["track_id_enc"] = item_encoder.transform(events_test["track_id"])

In [None]:
import joblib


joblib.dump(user_encoder, 'user_encoder.pkl')
joblib.dump(item_encoder, 'item_encoder.pkl')

In [None]:
events_train.to_parquet('events_train.parquet')
events_test.to_parquet('events_test.parquet')

# Топ популярных

Рассчитаем рекомендации как топ популярных.

In [None]:
import pandas as pd
import joblib

events_train = pd.read_parquet('events_train.parquet')
events_test = pd.read_parquet('events_test.parquet')
items = pd.read_parquet("items.parquet")
user_encoder = joblib.load('user_encoder.pkl')
item_encoder = joblib.load('item_encoder.pkl')
track_info = items[['track_id','track_name','album_names','artist_names','genre_names']]

In [None]:
events_train

In [None]:
item_popularity = events_train \
    .groupby(["track_id"]).agg(users=("user_id", "nunique"), avg_rating=("rating", "mean")).reset_index()

# Добавляем информацию о треках
top_k_pop_items = item_popularity.merge(
    track_info.set_index("track_id"), on="track_id").sort_values('users', ascending=False).head(100)

In [None]:
with pd.option_context('display.max_rows', 100):
    display(top_k_pop_items)

In [None]:
# сохранение в s3
import s3fs
import pyarrow as pa
import pyarrow.parquet as pq
from dotenv import load_dotenv
import os

load_dotenv()

S3_BUCKET = os.getenv('S3_BUCKET_NAME')
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

s3 = s3fs.S3FileSystem(
        key=AWS_ACCESS_KEY_ID,
        secret=AWS_SECRET_ACCESS_KEY,
        endpoint_url='https://storage.yandexcloud.net',
        client_kwargs={'region_name': 'ru-central1'}  
)

s3_path = 'recsys/recommendations/top_popular.parquet'

full_path = f's3://{S3_BUCKET}/{s3_path}'
table = pa.Table.from_pandas(top_k_pop_items)
        
pq.write_table(
            table,
            full_path,
            filesystem=s3,
            compression='SNAPPY',
            coerce_timestamps='ms',
            allow_truncated_timestamps=True
)
print(f"Файл успешно сохранен: {full_path}")

# Персональные

Рассчитаем персональные рекомендации.

In [None]:
import pandas as pd
similar_items = pd.read_parquet("similar_items.parquet")
candidates = pd.read_parquet("candidates_als_sim.parquet")
als_recommendations = pd.read_parquet("als_recommendations.parquet")
items = pd.read_parquet("items.parquet")
events = pd.read_parquet("events.parquet")
events_train = pd.read_parquet("events_train.parquet")
events_test = pd.read_parquet("events_test.parquet")

In [None]:
import pandas as pd
import numpy as np
from scipy.sparse import coo_matrix, csr_matrix
from implicit.als import AlternatingLeastSquares
# from implicit.evaluation import precision_at_k, recall_at_k
from collections import defaultdict

In [None]:
user_item_matrix_train = csr_matrix((
    events_train["rating"],
    (events_train['user_id_enc'], events_train['track_id_enc'])),
    dtype=np.int8) 

user_item_matrix_train

In [None]:
als_model = AlternatingLeastSquares(factors=50, 
                                    iterations=10, 
                                    regularization=0.05, 
                                    random_state=42)

als_model.fit(user_item_matrix_train)

In [None]:
import joblib

# user_encoder = joblib.load('user_encoder.pkl')
# item_encoder = joblib.
def get_recommendations_als(user_item_matrix, model, user_id, user_encoder, item_encoder, include_seen=True, n=5):
    """
    Возвращает отранжированные рекомендации для заданного пользователя
    """
    user_id_enc = user_encoder.transform([user_id])[0]
    recommendations = model.recommend(
         user_id_enc, 
         user_item_matrix[user_id_enc], 
         filter_already_liked_items=not include_seen,
         N=n)
    recommendations = pd.DataFrame({"track_id_enc": recommendations[0], "score": recommendations[1]})
    recommendations["track_id"] = item_encoder.inverse_transform(recommendations["track_id_enc"])
    
    return recommendations

In [None]:
user_id = events_train['user_id'].sample().iat[0]

print(f"user_id: {user_id}")

print("История (последние события, recent)")
user_history = (
    events_train
    .query("user_id == @user_id")
    .merge(items.set_index("track_id"), on="track_id")
)
user_history_to_print = user_history.tail(10)
display(user_history_to_print)

print("Рекомендации")
user_recommendations = get_recommendations_als(user_item_matrix_train, als_model, user_id, user_encoder, item_encoder, include_seen=True, n=5)
user_recommendations = user_recommendations.merge(items, on="track_id")
display(user_recommendations)

In [None]:
user_ids_encoded = range(events_train['user_id_enc'].max())

# Получаем рекомендации для всех пользователей
als_recommendations = als_model.recommend(
    user_ids_encoded, 
    user_item_matrix_train[user_ids_encoded], 
    filter_already_liked_items=False, N=5)

In [None]:
item_ids_enc = als_recommendations[0]
als_scores = als_recommendations[1]

als_recommendations = pd.DataFrame({
    "user_id_enc": user_ids_encoded,
    "track_id_enc": item_ids_enc.tolist(), 
    "score": als_scores.tolist()})
als_recommendations = als_recommendations.explode(["track_id_enc", "score"], ignore_index=True)

# Приводим типы данных
als_recommendations["track_id_enc"] = als_recommendations["track_id_enc"].astype("int")
als_recommendations["score"] = als_recommendations["score"].astype("float")

# Получаем изначальные идентификаторы
als_recommendations["user_id"] = user_encoder.inverse_transform(als_recommendations["user_id_enc"])
als_recommendations["track_id"] = item_encoder.inverse_transform(als_recommendations["track_id_enc"])
als_recommendations = als_recommendations.drop(columns=["user_id_enc", "track_id_enc"])

In [None]:
als_recommendations = als_recommendations[["user_id", "track_id", "score"]]
als_recommendations.to_parquet("personal_als.parquet")

In [None]:
als_recommendations

In [None]:
personal_als = als_recommendations.merge(track_info.set_index("track_id"), 
                                         on="track_id")

In [None]:
personal_als[personal_als['user_id']==1374577]

In [None]:
# сохранение в s3
import s3fs
import pyarrow as pa
import pyarrow.parquet as pq
from dotenv import load_dotenv
import os

load_dotenv()

S3_BUCKET = os.getenv('S3_BUCKET_NAME')
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

s3 = s3fs.S3FileSystem(
        key=AWS_ACCESS_KEY_ID,
        secret=AWS_SECRET_ACCESS_KEY,
        endpoint_url='https://storage.yandexcloud.net',
        client_kwargs={'region_name': 'ru-central1'}  
)

s3_path = 'recsys/recommendations/personal_als.parquet'

full_path = f's3://{S3_BUCKET}/{s3_path}'
table = pa.Table.from_pandas(personal_als)
        
pq.write_table(
            table,
            full_path,
            filesystem=s3,
            compression='SNAPPY',
            coerce_timestamps='ms',
            allow_truncated_timestamps=True
)
print(f"Файл успешно сохранен: {full_path}")

# Похожие

Рассчитаем похожие, они позже пригодятся для онлайн-рекомендаций.

In [None]:
train_item_ids_enc = events_train['track_id_enc'].unique()

max_similar_items = 5

# Получаем списки похожих объектов, используя ранее полученную ALS-модель
similar_items = als_model.similar_items(train_item_ids_enc, N=max_similar_items+1)

# Преобразуем полученные списки в табличный формат
sim_item_item_ids_enc = similar_items[0]
sim_item_scores = similar_items[1]

similar_items = pd.DataFrame({
    "track_id_enc": train_item_ids_enc,
    "sim_track_id_enc": sim_item_item_ids_enc.tolist(), 
    "score": [sim_scores[0:] for sim_scores in sim_item_scores]})

In [None]:
similar_items = similar_items.explode(['sim_track_id_enc', 'score'])

# Приводим типы данных
similar_items["sim_track_id_enc"] = similar_items["sim_track_id_enc"].astype("int")
similar_items["score"] = similar_items["score"].astype("float")

# Получаем исходные идентификаторы
item_id_map = events_train[['track_id_enc', 'track_id']].drop_duplicates().set_index('track_id_enc')['track_id'].to_dict()
similar_items["track_id_1"] = similar_items["track_id_enc"].map(item_id_map)
similar_items["track_id_2"] = similar_items["sim_track_id_enc"].map(item_id_map)
similar_items = similar_items.drop(columns=["track_id_enc", "sim_track_id_enc"])

# Убираем пары с одинаковыми объектами
similar_items = similar_items.query("track_id_1 != track_id_2")

In [None]:
similar_items.to_parquet("similar_items.parquet")

In [None]:
# Создадим функцию чтобы вывести результаты с названиями треков
def print_sim_items(track_id, similar_items):

    item_columns_to_use = ["track_id",'track_name','artist_names','genre_names']
    
    item_id_1 = track_info.query("track_id == @track_id")[item_columns_to_use]
    display(item_id_1)
    
    si = similar_items.query("track_id_1 == @track_id")
    si = si.merge(track_info[item_columns_to_use].set_index("track_id"), left_on="track_id_2", right_index=True)
    display(si)

In [None]:
print_sim_items(53404, similar_items)

In [None]:
# сохранение в s3
import s3fs
import pyarrow as pa
import pyarrow.parquet as pq
from dotenv import load_dotenv
import os

load_dotenv()

S3_BUCKET = os.getenv('S3_BUCKET_NAME')
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

s3 = s3fs.S3FileSystem(
        key=AWS_ACCESS_KEY_ID,
        secret=AWS_SECRET_ACCESS_KEY,
        endpoint_url='https://storage.yandexcloud.net',
        client_kwargs={'region_name': 'ru-central1'}  
)

s3_path = 'recsys/recommendations/similar_items.parquet'

full_path = f's3://{S3_BUCKET}/{s3_path}'
table = pa.Table.from_pandas(similar_items)
        
pq.write_table(
            table,
            full_path,
            filesystem=s3,
            compression='SNAPPY',
            coerce_timestamps='ms',
            allow_truncated_timestamps=True
)
print(f"Файл успешно сохранен: {full_path}")

# Построение признаков

Построим три признака, можно больше, для ранжирующей модели.

In [None]:
import pandas as pd

events_train = pd.read_parquet('events_train.parquet')
events_test = pd.read_parquet('events_test.parquet')
items = pd.read_parquet("items.parquet")

In [None]:
split_date_for_labels = events_test["started_at"].max() - pd.Timedelta(days=14)

# Разделим данные
split_date_for_labels_idx = events_test["started_at"] < split_date_for_labels
events_labels = events_test[split_date_for_labels_idx].copy()
events_test_2 = events_test[~split_date_for_labels_idx].copy()

# Добавим признак таргета
events_labels["target"] = 1

# Проверим результат
print(f'Тестовый датасет: {events_test_2.shape}')
print(f'Датасет с взаимодействиями за неделю: {events_labels.shape}')

In [None]:
events_labels.sample(3)

In [None]:
als_recommendations = pd.read_parquet("personal_als.parquet")
candidates = als_recommendations.merge(events_labels[["user_id", "track_id", "target"]], 
                                       on=["user_id", "track_id"],
                                       how="left").rename(columns={"score": "als_score"})

# Заполним пропуски
candidates['target'] = candidates['target'].fillna(0).astype('int')

In [None]:
candidates = candidates.iloc[:10000]

In [None]:
candidates.target.value_counts()

In [None]:
candidates_to_sample = candidates.groupby("user_id").filter(lambda x: x["target"].sum() > 0)

# для каждого пользователя оставляем 5 негативных примеров
negatives_per_user = 5
candidates_for_train = pd.concat([
    candidates_to_sample.query("target == 1"),
    candidates_to_sample.query("target == 0") \
        .groupby("user_id") \
        .apply(lambda x: x.sample(negatives_per_user, random_state=0,replace=True))
    ]).reset_index(drop=True)

In [None]:
candidates_for_train.target.value_counts()

In [None]:
candidates_for_train = candidates_for_train.merge(items[["track_id", 'genre_names']],
                                                  on="track_id",
                                                  how="left")

In [None]:
events_train = events_train[events_train['track_id'].isin(candidates_for_train['track_id'])]

In [None]:
genre_and_votes = (events_train.merge(items[["track_id", "genre_names"]],
                                      on="track_id",
                                      how="left")["genre_names"]
                                      .value_counts(normalize=True)
                                      .reset_index(name="genre_and_votes")
                                      .rename(columns={"index": "genres"})
                                      )

# Сортировка по доле жанра
genre_and_votes.sort_values(by="genre_and_votes", ascending=False, inplace=True)

In [None]:
genre_and_votes

In [None]:
genre_and_votes['genre_names'] = genre_and_votes['genre_names'].astype(str)
candidates_for_train['genre_names'] = candidates_for_train['genre_names'].astype(str)
candidates_for_train = candidates_for_train.merge(genre_and_votes, on="genre_names", how="left" )

In [None]:
def get_user_features(events):
    """ считает пользовательские признаки """
    
    user_features = events.groupby("user_id").agg(
        tracks_month=("started_at", lambda x: (x.max()-x.min()).days/30),
        tracks_listened=("track_id", "count"),
        rating_avg=("rating", "mean"),
        rating_std=("rating", "std"))
    
    user_features["tracks_per_month"] = user_features["tracks_listened"] / user_features["tracks_month"]
    
    return user_features

In [None]:
user_features_for_train = get_user_features(events_train)
candidates_for_train = candidates_for_train.merge(user_features_for_train, on="user_id", how="left")

In [None]:
candidates_for_train.to_parquet('candidates_for_train.parquet')

In [None]:
events_inference = pd.concat([events_train, events_labels])
events_inference.info()

In [None]:
from scipy.sparse import coo_matrix, csr_matrix
from implicit.als import AlternatingLeastSquares
import numpy as np

user_item_matrix_train_2 = csr_matrix(
    (
    events_inference["rating"],
    (events_inference["user_id_enc"], events_inference["track_id_enc"])
    ),
    dtype=np.int8
)

# Проверим размерность
user_item_matrix_train_2

In [None]:
als_model_inference = AlternatingLeastSquares(factors=50, 
                                    iterations=50, 
                                    regularization=0.05, 
                                    random_state=42)

# Обучим модель ALS
als_model_inference.fit(user_item_matrix_train_2)

In [None]:
user_ids_encoded = range(events_inference['user_id_enc'].max())

als_recommendations_2 = als_model_inference.recommend(
    user_ids_encoded, 
    user_item_matrix_train_2[user_ids_encoded], 
    filter_already_liked_items=False, 
    N=100
)

In [None]:
import joblib

user_encoder = joblib.load('user_encoder.pkl')
item_encoder = joblib.load('item_encoder.pkl')

item_ids_enc = als_recommendations_2[0]
als_scores = als_recommendations_2[1]

als_recommendations_2 = pd.DataFrame({
    "user_id_enc": user_ids_encoded,
    "track_id_enc": item_ids_enc.tolist(), 
    "score": als_scores.tolist()})


In [None]:
als_recommendations_2 = als_recommendations_2.explode(["track_id_enc", "score"], ignore_index=True)

In [None]:
# Приводим типы данных
als_recommendations_2["track_id_enc"] = als_recommendations_2["track_id_enc"].astype("int")
als_recommendations_2["score"] = als_recommendations_2["score"].astype("float")

In [None]:
# Получаем изначальные идентификаторы
als_recommendations_2["user_id"] = user_encoder.inverse_transform(als_recommendations_2["user_id_enc"])
als_recommendations_2["track_id"] = item_encoder.inverse_transform(als_recommendations_2["track_id_enc"])
als_recommendations_2e = als_recommendations_2.drop(columns=["user_id_enc", "track_id_enc"])

In [None]:
als_recommendations_2 = als_recommendations_2[["user_id", "track_id", "score"]]

# Посмотрим на результат
als_recommendations_2.head(10)

In [None]:
als_recommendations_2.to_parquet("personal_als_2.parquet")

In [None]:
import pandas as pd
als_recommendations_2 = pd.read_parquet("personal_als_2.parquet")

In [None]:
candidates_to_rank = als_recommendations_2[als_recommendations_2.user_id.isin(events_test_2.user_id.drop_duplicates())]\
                                          .rename(columns={"score": "als_score"})

In [None]:
genre_and_votes

In [None]:
candidates_to_rank

In [None]:
# Добавим признаки треков
candidates_to_rank = candidates_to_rank.merge(items[["track_id", "genre_names"]], on="track_id", how="left")

In [None]:
# Преобразуем 'genre_names' в строку и сохраняем как новый столбец
candidates_to_rank['genre_names'] = candidates_to_rank['genre_names'].astype(str)

In [None]:
# Теперь используем новый столбец для объединения
candidates_to_rank = candidates_to_rank.merge(genre_and_votes, left_on="genre_names_str", right_on="genre_names", how="left")

In [None]:
# Добавим признаки треков
candidates_to_rank = candidates_to_rank.merge(items[["track_id", "genre_names"]], on="track_id", how="left")

# Преобразуем 'genre_names' в строку и сохраняем как новый столбец
candidates_to_rank['genre_names_str'] = candidates_to_rank['genre_names'].astype(str)

# Теперь используем новый столбец для объединения
candidates_to_rank = candidates_to_rank.merge(genre_and_votes, left_on="genre_names_str", right_on="genre_names", how="left")

In [None]:
# Получим новые признаки
user_features_for_ranking = get_user_features(events_inference)
candidates_to_rank = candidates_to_rank.merge(user_features_for_ranking, on="user_id", how="left")

In [None]:
# Сохраним результат
candidates_to_rank.to_parquet('candidates_to_rank.parquet')

# Ранжирование рекомендаций

Построим ранжирующую модель, чтобы сделать рекомендации более точными. Отранжируем рекомендации.

In [None]:
from catboost import CatBoostClassifier, Pool

# Задаём имена колонок признаков и таргета
features = ["als_score", "genre_names", "genre_and_votes", 
            'tracks_month','tracks_listened',
            'rating_avg','rating_std','tracks_per_month']
cat_features = ["genre_names"]
target = ["target"]

train_data = Pool(
    data=candidates_for_train[features],
    label=candidates_for_train[target],
    cat_features=cat_features,
)

# Инициализируем модель CatBoostClassifier
cb_model = CatBoostClassifier(iterations=1000,
                           learning_rate=0.1,
                           depth=6,
                           loss_function='Logloss',
                           verbose=100,
                           random_seed=42)

# Обучим модель
cb_model.fit(train_data)

In [None]:
# Создадим датасет для катбуста
inference_data = Pool(data=als_recommendations_2[features], cat_features=cat_features)
# Получим вероятности
predictions = cb_model.predict_proba(inference_data)
# Создадим признак с вероятностями базовой модели
als_recommendations_2["cb_score"] = predictions[:, 1]

In [None]:
# Для каждого пользователя проставляем rank, начиная с 1 — это максимальный cb_score
candidates_to_rank = candidates_to_rank.sort_values(["user_id", "cb_score"], ascending=[True, False])
candidates_to_rank["rank"] = candidates_to_rank.groupby("user_id").cumcount() + 1

# Отранжируем рекомендации
candidates_to_rank["rank"] = candidates_to_rank.groupby("user_id").cumcount() + 1

In [None]:
# Выведем диаграмму с оценкой влияния признаков на целевой
imp = pd.Series(cb_model.feature_importances_,
                features)

fig, ax = plt.subplots(figsize=(6,4))
imp.sort_values(ascending=False).plot.bar(ax=ax)
ax.set_title("Важность признаков")
ax.set_ylabel('Важность')
fig.tight_layout()

In [None]:
# Сохраним результат с другим названием
recommendations = candidates_to_rank.copy()

# Сохраним результат в рекомендации
recommendations.to_parquet("recommendations.parquet")

In [None]:
# сохранение в s3
import s3fs
import pyarrow as pa
import pyarrow.parquet as pq
from dotenv import load_dotenv
import os

load_dotenv()

S3_BUCKET = os.getenv('S3_BUCKET_NAME')
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

s3 = s3fs.S3FileSystem(
        key=AWS_ACCESS_KEY_ID,
        secret=AWS_SECRET_ACCESS_KEY,
        endpoint_url='https://storage.yandexcloud.net',
        client_kwargs={'region_name': 'ru-central1'}  
)

s3_path = 'recsys/recommendations/recommendations.parquet'

full_path = f's3://{S3_BUCKET}/{s3_path}'
table = pa.Table.from_pandas(recommendations)
        
pq.write_table(
            table,
            full_path,
            filesystem=s3,
            compression='SNAPPY',
            coerce_timestamps='ms',
            allow_truncated_timestamps=True
)
print(f"Файл успешно сохранен: {full_path}")

# Оценка качества

Проверим оценку качества трёх типов рекомендаций: 

- топ популярных,
- персональных, полученных при помощи ALS,
- итоговых
  
по четырем метрикам: recall, precision, coverage, novelty.

In [None]:
top_k_pop_items = pd.read_parquet('top_popular.parquet')
events_cold = events[events.user_id.isin(cold_users)]
events_cold.info()

In [None]:
# Добавим топ рекомендованных треков
cold_users_events_with_recs = events_cold.merge(top_k_pop_items, on="track_id", how="left") 

# Отберем данные, которые удалось получить из рекомендаций без пропусков
cold_user_items_no_avg_rating_idx = cold_users_events_with_recs["avg_rating"].isnull()
cold_user_recs = cold_users_events_with_recs[~cold_user_items_no_avg_rating_idx] \
                 [["user_id", "track_id", "avg_rating"]]

# Отберем данные, где рекомендации не были получены
cold_user_no_recs = cold_users_events_with_recs[cold_user_items_no_avg_rating_idx] \
                  [["user_id", "track_id", "avg_rating"]] 

In [None]:
# Посчитаем покрытие холодных пользователей рекомендациями

cold_users_hit_ratio = cold_users_events_with_recs.groupby("user_id").agg(hits=("avg_rating", lambda x: (~x.isnull()).mean()))

print(f"Доля пользователей без релевантных рекомендаций: {(cold_users_hit_ratio == 0).mean().iat[0]:.2f}")
print(f"Среднее покрытие пользователей: {cold_users_hit_ratio[cold_users_hit_ratio != 0].mean().iat[0]:.2f}")

In [None]:
# Зададим функцию для расчета recall
def calculate_not_null_mean(series: pd.Series) -> float:
    """Вычисляет среднее значение ненулевых значений"""
    return series.notnull().mean()

# Расчитаем среднее количество вхождений популярных треков для всех пользователей
recall_top_popular = cold_users_events_with_recs.groupby("user_id")["avg_rating"].apply(calculate_not_null_mean).mean()

print(f"Рекомендации топ-100, recall: {recall_top_popular:.5f}")

In [None]:
# Загрузим рекомендации
als_recommendations = pd.read_parquet('personal_als.parquet')

In [None]:
def process_events_recs_for_binary_metrics(events_train, events_test, recs, top_k=None):

    """
    Размечает пары <user_id, item_id> для общего множества пользователей признаками
    - gt (ground truth)
    - pr (prediction)
    top_k: расчёт ведётся только для top k-рекомендаций
    """
    
    events_test["gt"] = True
    common_users = set(events_test["user_id"]) & set(recs["user_id"])

    print(f"Common users: {len(common_users)}")
    
    events_for_common_users = events_test[events_test["user_id"].isin(common_users)].copy()
    recs_for_common_users = recs[recs["user_id"].isin(common_users)].copy()

    recs_for_common_users = recs_for_common_users.sort_values(["user_id", "score"], ascending=[True, False])

    events_for_common_users = events_for_common_users[events_for_common_users["track_id"].isin(
        events_train["track_id"].unique()
    )
    ]

    if top_k is not None:
        recs_for_common_users = recs_for_common_users.groupby("user_id").head(top_k)
    
    events_recs_common = events_for_common_users[["user_id", "track_id", "gt"]].merge(
        recs_for_common_users[["user_id", "track_id", "score"]], 
        on=["user_id", "track_id"], 
        how="outer",
    )    
    events_recs_common["gt"] = events_recs_common["gt"].fillna(False)
    events_recs_common["pr"] = ~events_recs_common["score"].isnull()
    
    events_recs_common["tp"] = events_recs_common["gt"] & events_recs_common["pr"]
    events_recs_common["fp"] = ~events_recs_common["gt"] & events_recs_common["pr"]
    events_recs_common["fn"] = events_recs_common["gt"] & ~events_recs_common["pr"]

    return events_recs_common

In [None]:
events_recs_for_binary_metrics = process_events_recs_for_binary_metrics(
    events_train,
    events_test, 
    als_recommendations, 
    top_k=5)

In [None]:
def compute_cls_metrics(events_recs_for_binary_metric):
    
    """Расчет precision и recall"""
    
    groupper = events_recs_for_binary_metric.groupby("user_id")

    # Computing precision
    precision = groupper["tp"].sum()/(groupper["tp"].sum()+groupper["fp"].sum())
    precision = precision.fillna(0).mean()
    
    # Computing recall
    recall = groupper["tp"].sum()/(groupper["tp"].sum()+groupper["fn"].sum())
    recall = recall.fillna(0).mean()

    return precision, recall

In [None]:
# precision@5, recall@5
precision, recall = compute_cls_metrics(events_recs_for_binary_metrics)

print(f'Персональные рекомендации, precision: {precision:.4f}, recall: {recall:.4f}')

In [None]:
# Расчёт покрытия по объектам
cov_als = als_recommendations['track_id'].nunique() / als_recommendations['user_id'].nunique()

print(f"Персональные рекомендации, покрытие: {cov_als:.2f}") 

In [None]:
# разметим каждую рекомендацию признаком listened
events_train["listened"] = True

als_recommendations = als_recommendations.merge(events_train, on=["user_id", "track_id"], how="left")
als_recommendations["listened"] = als_recommendations["listened"].fillna(False).astype("bool")

In [None]:
# Проставим ранги
als_recommendations = als_recommendations.sort_values(by=["user_id", "score"], ascending=[True, False])
als_recommendations["rank"] = als_recommendations.groupby("user_id").cumcount() + 1

In [None]:
# Посчитаем novelty по пользователям
novelty_5_als = (1-als_recommendations.query("rank <= 5").groupby("user_id")["listened"].mean()).mean()

print(f"Персональные рекомендации, novelty_5: {novelty_5_als:.2f}")

In [None]:
# Загрузим рекомендации
final_recommendations = pd.read_parquet('recommendations.parquet')

In [None]:
# Получим метрики для финальных рекомендаций
cb_events_recs_for_binary_metrics_5 = process_events_recs_for_binary_metrics(
    events_inference,
    events_test_2,
    final_recommendations.rename(columns={"cb_score": "score"}), 
    top_k=5)

cb_precision_5, cb_recall_5 = compute_cls_metrics(cb_events_recs_for_binary_metrics_5)

print(f"Итоговые рекомендации, precision: {cb_precision_5:.4f}, recall: {cb_recall_5:.4f}")

In [None]:
# Расчёт покрытия по объектам
cov_final = final_recommendations['track_id'].nunique() / final_recommendations['user_id'].nunique()

print(f"Итоговые рекомендации, покрытие: {cov_final:.2f}") 

In [None]:
# разметим каждую рекомендацию признаком listened
events_train["listened"] = True

final_recommendations = final_recommendations.merge(events_train, on=["user_id", "track_id"], how="left")
final_recommendations["listened"] = final_recommendations["listened"].fillna(False).astype("bool")

In [None]:
# Проставим ранги
final_recommendations = final_recommendations.sort_values(by=["user_id", "cb_score"], ascending=[True, False])
final_recommendations["rank"] = final_recommendations.groupby("user_id").cumcount() + 1

In [None]:
# Посчитаем novelty по пользователям
novelty_5_final = (1-final_recommendations.query("rank <= 5").groupby("user_id")["listened"].mean()).mean()

print(f"Итоговые рекомендации, novelty_5: {novelty_5_final:.3f}")

# === Выводы, метрики ===

Основные выводы при работе над расчётом рекомендаций, рассчитанные метрики.

In [None]:

# Сделаем сводную таблицу по метрикам

popular_metrics = pd.Series([recall_top_popular], index=["recall_top_popular"], name="top_popular")

personal_als_metrics = pd.Series([precision, recall, cov_als, novelty_5_als], 
                                 index=["precision@5", "recall@5", "coverage", "novelty@5"],
                                 name="personal_als")

final_recs_metrics = pd.Series([cb_precision_5, cb_recall_5,cov_final,novelty_5_final], 
                               index=["precision@5", "recall@5", "coverage", "novelty@5"],
                               name="final_recs")

pd.concat([popular_metrics, personal_als_metrics, final_recs_metrics], axis=1)

Результаты получились не самые лучшие, связано это с тем, что были ограничения в вычислительной мощности и приходилось ограничивать данные. По поводу низких значений метрик покрытия можно сделать вывод, что в рекомендациях представлена лишь небольшая часть треков из общего набора. Ркомендация топ-популярных треков в целом показывает себя лучше на усеченном датасете, по сравнению с другими метриками. Показатель новизны высок, думаю это связано с урезкой датасета.