# Инициализация

Загружаем библиотеки необходимые для выполнения кода ноутбука.

In [1]:
import pandas as pd
import boto3
from io import BytesIO
import pandas as pd
import os


# === ЭТАП 1 ===

# Загрузка первичных данных

Загружаем первичные данные из файлов:
- tracks.parquet
- catalog_names.parquet
- interactions.parquet

In [2]:


# Загрузка данных
tracks = pd.read_parquet('tracks.parquet')
catalog_names = pd.read_parquet('catalog_names.parquet')
interactions = pd.read_parquet('interactions.parquet')

# Обзор данных

Проверяем данные, есть ли с ними явные проблемы.

In [None]:
print(tracks.info())
print(tracks.isnull().sum())
print(tracks['track_id'].nunique() == len(tracks))

In [None]:
print(catalog_names.info())
print(catalog_names.isnull().sum())
print(catalog_names.groupby('type')['id'].nunique())

In [None]:
print(interactions.info())
print(interactions.isnull().sum())
print(interactions.duplicated(subset=['user_id', 'track_id']).sum())

# Выводы

Приведём выводы по первому знакомству с данными:
- есть ли с данными явные проблемы,
- какие корректирующие действия (в целом) были предприняты.

# === ЭТАП 2 ===

# EDA

Распределение количества прослушанных треков.

In [None]:
# Группируем по пользователям и считаем количество прослушиваний
user_plays = interactions.groupby('user_id').size().reset_index(name='tracks_played')
distribution = user_plays['tracks_played'].describe(percentiles=[.25, .5, .75, .9, .95, .99])
print("Распределение количества прослушанных треков:")
print(distribution)

Наиболее популярные треки

In [None]:
# Считаем количество прослушиваний для каждого трека
track_popularity = interactions.groupby('track_id').size().reset_index(name='plays')
top_tracks = track_popularity.sort_values('plays', ascending=False).head(10)
print("\nТоп-10 популярных треков:")
print(top_tracks)

Наиболее популярные жанры

In [8]:

track_genres = tracks.set_index('track_id')['genres'].to_dict()



In [9]:

track_plays = interactions['track_id'].value_counts().reset_index()
track_plays.columns = ['track_id', 'plays']

In [10]:
from collections import defaultdict 

genre_plays = defaultdict(int)

In [11]:

for track_id, plays in track_plays.itertuples(index=False):
    genres = track_genres.get(track_id, [])
    for genre_id in genres:
        genre_plays[genre_id] += plays

In [12]:

genre_popularity = (
    pd.DataFrame(list(genre_plays.items()), columns=['genre_id', 'plays'])
    .sort_values('plays', ascending=False)
    .head(10)
)

In [None]:

genre_names = (
    catalog_names[catalog_names['type'] == 'genre']
    .set_index('id')['name']
    .to_dict()
)

genre_popularity['genre_name'] = genre_popularity['genre_id'].map(genre_names)

print("Топ-10 популярных жанров:")
print(genre_popularity[['genre_name', 'plays']])

Треки, которые никто не прослушал

In [None]:
# Находим треки из каталога, отсутствующие в прослушиваниях
unplayed_tracks = tracks[~tracks['track_id'].isin(interactions['track_id'])][['track_id']]
print("\nТреки без прослушиваний:")
print(unplayed_tracks)

# Преобразование данных

Преобразуем данные в формат, более пригодный для дальнейшего использования в расчётах рекомендаций.

In [None]:
# Извлечение названий треков
track_names = catalog_names[catalog_names['type'] == 'track'][['id', 'name']]
track_names = track_names.rename(columns={'id': 'track_id', 'name': 'track_name'})

# Извлечение названий альбомов
album_names = catalog_names[catalog_names['type'] == 'album'][['id', 'name']]
album_names = album_names.rename(columns={'id': 'album_id', 'name': 'album_name'})

# Извлечение названий артистов
artist_names = catalog_names[catalog_names['type'] == 'artist'][['id', 'name']]
artist_names = artist_names.rename(columns={'id': 'artist_id', 'name': 'artist_name'})

# Извлечение названий жанров
genre_names = catalog_names[catalog_names['type'] == 'genre'][['id', 'name']]
genre_names = genre_names.rename(columns={'id': 'genre_id', 'name': 'genre_name'})

# Объединение данных
items = (
    tracks
    # Добавление названий треков
    .merge(track_names, on='track_id', how='left')
    
    # Развертывание списков альбомов
    .explode('albums')
    .rename(columns={'albums': 'album_id'})
    # Добавление названий альбомов
    .merge(album_names, on='album_id', how='left')
    
    # Развертывание списков артистов
    .explode('artists')
    .rename(columns={'artists': 'artist_id'})
    # Добавление названий артистов
    .merge(artist_names, on='artist_id', how='left')
    
    # Развертывание списков жанров
    .explode('genres')
    .rename(columns={'genres': 'genre_id'})
    # Добавление названий жанров
    .merge(genre_names, on='genre_id', how='left')
    
    # Группировка в списки
    .groupby(['track_id', 'track_name']).agg({
        'album_id': list,
        'album_name': list,
        'artist_id': list,
        'artist_name': list,
        'genre_id': list,
        'genre_name': list
    }).reset_index()
)

# Проверка результата
print(items.head(3))

# Сохранение данных

Сохраним данные в двух файлах в персональном S3-бакете по пути `recsys/data/`:
- `items.parquet` — все данные о музыкальных треках,
- `events.parquet` — все данные о взаимодействиях.

In [16]:
events = interactions.copy()

In [None]:
%load_ext autoreload
%autoreload 2
from dotenv import load_dotenv, find_dotenv
import os

In [None]:
# подгружаем .env
load_dotenv()

In [19]:
s3_bucket = os.environ.get('S3_BUCKET_NAME')
s3_access_key = os.environ.get('AWS_ACCESS_KEY_ID')
s3_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')

In [None]:
print(s3_access_key)

In [None]:
import os
import boto3
import pandas as pd
from io import BytesIO

# Правильное получение переменных окружения
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
BUCKET_NAME = os.getenv("S3_BUCKET_NAME")  # имя переменной должно совпадать

# Проверка наличия ВСЕХ переменных
required_vars = {
    "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
    "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY,
    "S3_BUCKET_NAME": BUCKET_NAME
}

missing = [var for var, val in required_vars.items() if not val]
if missing:
    raise ValueError(f"Отсутствуют переменные окружения: {', '.join(missing)}")

# Настройка окружения для MLflow (только если нужно для других компонентов)
os.environ.update({
    "MLFLOW_S3_ENDPOINT_URL": "https://storage.yandexcloud.net",
    "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,  # используем полученные значения
    "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY,
})

# Инициализация клиента S3
s3 = boto3.client(
    's3',
    endpoint_url="https://storage.yandexcloud.net",  # явное указание endpoint
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

def save_df_to_s3_parquet(df, bucket, key):
    """Сохраняет DataFrame в Parquet на S3"""
    if not isinstance(df, pd.DataFrame):
        raise TypeError("df должен быть pandas DataFrame")
    
    buffer = BytesIO()
    df.to_parquet(buffer, index=False)  # index=False для экономии места
    buffer.seek(0)
    
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=buffer,
        ContentType='application/parquet'
    )
    print(f"Файл {key} успешно загружен в S3")

try:
    save_df_to_s3_parquet(items, BUCKET_NAME, 'recsys/data/items.parquet')
    save_df_to_s3_parquet(events, BUCKET_NAME, 'recsys/data/events.parquet')
    print("Все файлы успешно загружены в S3!")
    
except Exception as e:
    print(f"Ошибка при загрузке в S3: {str(e)}")
    # Добавляем вывод самих ключей для отладки (осторожно!)
    print(f"BUCKET_NAME: {BUCKET_NAME}")
    print(f"Ключ доступа: {AWS_ACCESS_KEY_ID[:5]}...")  # показываем только первые 5 символов
    print(f"Тип данных items: {type(items)}")
    print(f"Тип данных events: {type(events)}")

In [22]:

items.to_parquet("items.parquet")
events.to_parquet("events.parquet")

# Очистка памяти

Здесь, может понадобится очистка памяти для высвобождения ресурсов для выполнения кода ниже. 

Приведите соответствующие код, комментарии, например:
- код для удаление более ненужных переменных,
- комментарий, что следует перезапустить kernel, выполнить такие-то начальные секции и продолжить с этапа 3.

In [None]:
import gc
gc.collect()

print("Память частично освобождена")


# === ЭТАП 3 ===

# Загрузка данных

Если необходимо, то загружаем items.parquet, events.parquet.

In [1]:
import pandas as pd
import numpy as np
from datetime import datetime

# Загрузка обработанных данных
items = pd.read_parquet('/home/mle-user/mle-project-sprint-4-v001/items.parquet')
events = pd.read_parquet('/home/mle-user/mle-project-sprint-4-v001/events.parquet')

# Преобразование даты
events['started_at'] = pd.to_datetime(events['started_at'])

# Разделение данных
train_events = events[events['started_at'] < datetime(2022, 12, 16)]
test_events = events[events['started_at'] >= datetime(2022, 12, 16)]

# Сохранение тестовых данных для оценки
test_events.to_parquet('test_events.parquet', index=False)

# Разбиение данных

Разбиваем данные на тренировочную, тестовую выборки.

# Топ популярных

Рассчитаем рекомендации как топ популярных.

In [2]:
# Расчет популярности треков
top_popular = (
    train_events.groupby('track_id')
    .size()
    .reset_index(name='count')
    .sort_values('count', ascending=False)
    .head(1000)  # Топ-1000 популярных треков
)

# Сохранение результатов
top_popular.to_parquet('top_popular.parquet', index=False)

# Персональные

Рассчитаем персональные рекомендации.

In [3]:
from implicit.als import AlternatingLeastSquares
import scipy.sparse as sp
import numpy as np
import pandas as pd

# Ограничение датасета: выбираем топ-N пользователей по количеству взаимодействий
USER_LIMIT = 10000  # Уменьшите для ускорения

# Фильтрация активных пользователей
user_activity = train_events['user_id'].value_counts()
selected_users = user_activity.head(USER_LIMIT).index
train_events_limited = train_events[train_events['user_id'].isin(selected_users)].copy()

# Пересоздаем отображения для ограниченного датасета
user_ids = train_events_limited['user_id'].unique()
track_ids = train_events_limited['track_id'].unique()

user2idx = {user: idx for idx, user in enumerate(user_ids)}
track2idx = {track: idx for idx, track in enumerate(track_ids)}
idx2user = {idx: user for user, idx in user2idx.items()}
idx2track = {idx: track for track, idx in track2idx.items()}

# Строим матрицу взаимодействий
rows = train_events_limited['user_id'].map(user2idx)
cols = train_events_limited['track_id'].map(track2idx)
values = np.ones(len(train_events_limited))

user_item_matrix = sp.csr_matrix(
    (values, (rows, cols)), 
    shape=(len(user_ids), len(track_ids))
)

# Уменьшаем параметры модели для ускорения
model = AlternatingLeastSquares(
    factors=32,           # Уменьшено с 64
    regularization=0.05,
    iterations=10,        # Уменьшено с 15
    random_state=42,
    use_gpu=False        # Отключить GPU при проблемах
)
model.fit(user_item_matrix)

# Генерируем рекомендации только для ограниченного набора пользователей
user_indices = np.arange(len(user_ids))
recommendations = model.recommend(
    user_indices, 
    user_item_matrix,
    N=10,
    filter_already_liked_items=True
)

# Формируем результат
user_ids_arr = np.array([idx2user[i] for i in range(len(user_ids))])
track_ids_arr = np.array([idx2track[i] for i in range(len(track_ids))])

user_indices_flat = np.repeat(user_indices, 10)
user_ids_flat = user_ids_arr[user_indices_flat]
track_indices_flat = recommendations[0].flatten()
track_ids_flat = track_ids_arr[track_indices_flat]
scores_flat = recommendations[1].flatten()

personal_als = pd.DataFrame({
    'user_id': user_ids_flat,
    'track_id': track_ids_flat,
    'score': scores_flat
})

personal_als.to_parquet('personal_als.parquet', index=False)

  check_blas_config()


  0%|          | 0/10 [00:00<?, ?it/s]

# Похожие

Рассчитаем похожие, они позже пригодятся для онлайн-рекомендаций.

In [5]:
import concurrent.futures
import numpy as np

# Ограничиваем количество треков
TRACK_LIMIT = 5000

# Выбираем топ-N популярных треков
track_popularity = train_events_limited['track_id'].value_counts()
top_tracks = track_popularity.head(TRACK_LIMIT).index.tolist()

# Создаем эффективные отображения
track_indices = np.array([track2idx[track] for track in top_tracks])
idx2track_arr = np.array(list(idx2track.values()))  # Векторизованный доступ

# Кэшируем матрицу факторов для треков
track_factors = model.item_factors

# Функция для обработки одного трека
def process_track(track_idx):
    similar = model.similar_items(track_idx, N=6)  # Берем только 5 похожих + сам трек
    
    results = []
    for similar_idx, score in zip(similar[0][1:], similar[1][1:]):
        # Прямой доступ через массив вместо словаря
        similar_track_id = idx2track_arr[similar_idx]
        results.append((
            idx2track_arr[track_idx],  # original_track_id
            similar_track_id,
            score
        ))
    return results

# Многопоточная обработка
similar_items = []
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(process_track, tidx) for tidx in track_indices]
    for future in concurrent.futures.as_completed(futures):
        similar_items.extend(future.result())

# Создаем DataFrame и сохраняем
similar = pd.DataFrame(similar_items, columns=['original_track_id', 'similar_track_id', 'score'])
similar.to_parquet('similar.parquet', index=False)

# Построение признаков

Построим три признака, можно больше, для ранжирующей модели.

# Ранжирование рекомендаций

Построим ранжирующую модель, чтобы сделать рекомендации более точными. Отранжируем рекомендации.

# Оценка качества

Проверим оценку качества трёх типов рекомендаций: 

- топ популярных,
- персональных, полученных при помощи ALS,
- итоговых
  
по четырем метрикам: recall, precision, coverage, novelty.

# === Выводы, метрики ===

Основные выводы при работе над расчётом рекомендаций, рассчитанные метрики.