# Инициализация

Загружаем библиотеки необходимые для выполнения кода ноутбука.

In [2]:
import logging
from statistics import mode
import threading
import matplotlib.pyplot as plt
import numpy as np
import fireducks.pandas as pd
import os
import boto3

In [2]:
%matplotlib inline
%config InlineBackend.figure_format = 'png'
%config InlineBackend.figure_format = 'retina'

# === ЭТАП 1 ===

# Загрузка первичных данных

Загружаем первичные данные из файлов:
- tracks.parquet
- catalog_names.parquet
- interactions.parquet

In [3]:
tracks = pd.read_parquet("data/tracks.parquet")
catalog_names = pd.read_parquet("data/catalog_names.parquet")
clickstream = pd.read_parquet("data/interactions.parquet")

In [4]:

def rename_list(lst, dic):
    """
    Replace a list of ids with the values from a dictionary
    """
    for i, element in enumerate(lst):
        if element in dic.keys():
            lst[i] = dic[element][0]
        else:
            lst[i] = np.nan
    return lst

In [5]:
tracks_to_catalog_dict ={'albums':'album', 'artists':'artist','genres':'genre','track_id':'track'}
for col in ['albums', 'artists', 'genres']:
    print('Starting renaming: {}'.format(col))
    cat_col = tracks_to_catalog_dict[col]
    renamer = dict(zip(catalog_names[catalog_names['type']==cat_col]['id'], zip(catalog_names[catalog_names['type']==cat_col]['name'])))
    tracks[col]=tracks[col].apply(lambda x: rename_list(list(x), renamer))
    print('Finished renaming: {}'.format(col))
renamer = dict(zip(catalog_names[catalog_names['type']=='track']['id'], zip(catalog_names[catalog_names['type']=='track']['name'])))
tracks['track_name']=tracks['track_id'].map(renamer)
tracks

Starting renaming: albums
Finished renaming: albums
Starting renaming: artists
Finished renaming: artists
Starting renaming: genres
Finished renaming: genres


Unnamed: 0,track_id,albums,artists,genres,track_name
0,26,"[Taller Children, Taller Children]",[Elizabeth & the Catapult],"[pop, folk]",[Complimentary Me]
1,38,"[Taller Children, Taller Children]",[Elizabeth & the Catapult],"[pop, folk]",[Momma's Boy]
2,135,"[Wild Young Hearts, Wild Young Hearts, Wild Yo...",[Noisettes],[pop],[Atticus]
3,136,"[Wild Young Hearts, Wild Young Hearts, Wild Yo...",[Noisettes],[pop],[24 Hours]
4,138,"[Wild Young Hearts, Wild Young Hearts, Don't U...",[Noisettes],[pop],[Don't Upset The Rhythm (Go Baby Go)]
...,...,...,...,...,...
999995,101478482,[На лицо],[FLESH],"[rusrap, rap]",[На лицо]
999996,101490148,[Без капли мысли],[Даня Милохин],"[pop, ruspop]",[Без капли мысли]
999997,101493057,[SKITTLES],[WhyBaby?],"[foreignrap, rap]",[SKITTLES]
999998,101495927,[Москва],[Yanix],"[rusrap, rap]",[Москва]


In [6]:
items = tracks.copy()
del tracks

# Обзор данных

Проверяем данные, есть ли с ними явные проблемы.

In [7]:
clickstream.info()

<class 'fireducks.pandas.frame.DataFrame'>
Index: 222629898 entries, 0 to 291
Data columns (total 4 columns):
 #   Column      Dtype         
---  ------      -----         
 0   user_id     int32         
 1   track_id    int32         
 2   track_seq   int16         
 3   started_at  datetime64[ns]
dtypes: datetime64[ns](1), int16(1), int32(2)
memory usage: 5.5 GB


In [8]:
items.info()

<class 'fireducks.pandas.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Data columns (total 5 columns):
 #   Column      Non-Null Count    Dtype 
---  ------      --------------    ----- 
 0   track_id    1000000 non-null  int64 
 1   albums      1000000 non-null  object
 2   artists     1000000 non-null  object
 3   genres      1000000 non-null  object
 4   track_name  1000000 non-null  object
dtypes: int64(1), object(4)
memory usage: 38.3+ MB


Пример данных по треку

In [9]:
items.sample(5)

Unnamed: 0,track_id,albums,artists,genres,track_name
197752,6814031,"[Breakfast At Pappa's, Breakfast at Pappa's]",[Consumed],"[rock, allrock]",[Heavy Metal Winner]
990243,96225911,"[Электроакустика, Часть 1]",[ДМЦ],"[rusrock, allrock]",[Сирень]
462156,32208409,"[Камикадзе, Камикадзе]",[#####],"[metal, None]",[Пусто]
955983,86796485,[Forever Sailing],"[Honshu Lo fi, Tonion, zmeyev]","[foreignrap, rap]",[Forever Sailing]
227334,10798373,"[Feel Pop, Sport Hits for Kids, Morninng Has B...",[Knightsbridge],[pop],[If I Fell]


Пример данных по действиям в приложении

In [10]:
clickstream.sample(5)

Unnamed: 0,user_id,track_id,track_seq,started_at
120,440524,25922318,121,2022-06-28
41,895626,17856052,42,2022-12-24
348,1155789,62654832,349,2022-08-17
196,497496,59169438,197,2022-12-06
196,512773,50685860,197,2022-12-04


In [11]:
items.isna().sum()

track_id      0
albums        0
artists       0
genres        0
track_name    0
dtype: int64

In [12]:
np.isnan(items['artists'].any()).sum()

0

In [13]:
# Подсчитываем количество строк, в которых в списках содержится np.nan
for col in ['albums', 'artists', 'genres','track_name']:
    print(col, items[col].apply(lambda x: None in x).sum())

albums 0
artists 0
genres 48345
track_name 0


In [14]:
items[items['genres'].apply(lambda x: None in x)]

Unnamed: 0,track_id,albums,artists,genres,track_name
36,436,[A Secret Place],"[Grover Washington, Jr.]","[jazz, None]",[A Secret Place]
59,594,"[Sides Of Blue, Vol. 2, Jazz Six Pack, Everybo...",[Bill Evans],"[jazz, None]",[Peace Piece]
125,1025,"[Jazz Six Pack, Art Pepper Meets The Rhythm Se...",[Art Pepper],"[jazz, None]",[You'd Be So Nice To Come Home To]
126,1026,"[Jazz Six Pack, The Art Of The Ballad, Winter ...",[Art Pepper],"[jazz, None]",[Winter Moon]
128,1028,"[Jazz Six Pack, Smack Up, On Contemporary: Art...",[Art Pepper Quintet],"[jazz, None]",[Smack Up]
...,...,...,...,...,...
999770,101009893,[Доработано Напильником],"[Metalolom, Доктор Дью]","[metal, None]",[Глаза]
999798,101049628,[Demons Are a Girl's Best Friend (Powerwolf Co...,[Radio Tapok],"[metal, None]",[Demons Are a Girl's Best Friend (Powerwolf Co...
999863,101200283,[The Grey],[Bad Omens],"[metal, None]",[The Grey]
999907,101256806,[Soviet March (Cover)],[Radio Tapok],"[metal, None]",[Soviet March (Cover)]


In [15]:
list1 = clickstream['track_id'].unique().tolist()
list2 = items['track_id'].unique().tolist()
array1 = np.array(list1)
array2 = np.array(list2)

count = np.sum(np.isin(array1, array2, invert=True))

print(count)

0


# Выводы

Приведём выводы по первому знакомству с данными:
- данных много, и явных проблем с ними не наблюдается
- был собран набор данных items, в которую из каталога перетащили информацию по трекам, это было сделано во-первых,   
для лучшего восприятия человеком, во-вторых, векторизация текстовых данных даст нам более полезные признаки, чем   
простое порядковое кодирование;  
- несколько смущает то, что в каждой ячейке items получился список, но пока это не создаёт проблем, посмотрим на этапе преобработки,  
может быть и поменяем типы;
- пропусков в данных практически нет, обнаружено что некоторые треки (менее 5%) отнесены к жанру None, но такк как это жанр не единственный, то  
сильно это картину не испортит;
- самое приятное, что в истории пользователей нет треков, которых не было бы в каталоге

# === ЭТАП 2 ===

# EDA

Распределение количества прослушанных треков.

In [16]:
print("Всего прослушано уникальных терков: ", clickstream["track_id"].nunique())

Всего прослушано уникальных терков:  1000000


Наиболее популярные треки

In [17]:
top10_tracks=clickstream["track_id"].value_counts().nlargest(10).index.tolist()
items[items['track_id'].isin(top10_tracks)]

Unnamed: 0,track_id,albums,artists,genres,track_name
9098,53404,"[Nevermind, Nirvana, Nevermind, Nevermind, Nev...",[Nirvana],"[alternative, rock, allrock]",[Smells Like Teen Spirit]
26665,178529,"[Meteora, Meteora, Meteora, Meteora, 00s Rock ...",[Linkin Park],"[numetal, metal]",[Numb]
90461,795836,"[Ten Summoner's Tales, 25 Years, The Best Of 2...",[Sting],"[pop, rock, allrock]",[Shape Of My Heart]
368072,24692821,"[Way down We Go, Summer Music 2016, A/B, DFM D...",[KALEO],[indie],[Way Down We Go]
475289,32947997,"[Shape of You, ÷, ÷, Summer Vibes, Pop]",[Ed Sheeran],[pop],[Shape of You]
483876,33311009,"[Shape Of Pop, NOW That's What I Call Music, E...",[Imagine Dragons],"[rock, allrock]",[Believer]
512157,35505245,"[I Got Love, I Got Love]","[Miyagi & Эндшпиль, Рем Дигга]","[rusrap, rap]",[I Got Love]
647237,45499814,"[Life, Life, Made in Russia, Fresh Dance, Life...",[Zivert],"[pop, ruspop]",[Life]
696106,51241318,"[In the End, Christian TikTok, Trending Now 20...","[Tommee Profitt, Fleurie, Mellen Gi]",[rnb],[In The End]
829320,65851540,[Юность],[Dabro],"[pop, ruspop]",[Юность]


Наиболее популярные жанры

In [18]:
di = dict(zip(items['track_id'], items['genres']))
out = pd.DataFrame(columns=['genres','count'])
for i in range(5):
    sub = clickstream.sample(10000)
    sub['genres'] = sub['track_id'].map(di)
    sub['genres'] = sub['genres'].apply(lambda x: str(x))
    out1 = pd.DataFrame(dict(sub['genres'].value_counts().nlargest(10)).items(), columns=['genres','count'])
    out = pd.merge(out, out1, how='outer', on='genres', suffixes=(None, str(i)))

In [19]:
out['total'] = out.sum(axis=1)
out.sort_values(by='total',ascending=False).head(10)

Unnamed: 0,genres,count,count0,count1,count2,count3,count4,total
0,['pop' 'ruspop'],,1197,1155,1151,1209,1221,5933
1,['rusrap' 'rap'],,1114,1146,1169,1171,1176,5776
2,['pop'],,972,1021,1029,1007,1000,5029
3,['dance'],,641,685,671,666,682,3345
4,['rusrock' 'allrock'],,600,598,602,597,558,2955
5,['foreignrap' 'rap'],,533,516,519,518,502,2588
6,['electronics'],,520,489,505,502,492,2508
7,['rock' 'allrock'],,399,409,397,394,415,2014
8,['alternative'],,340,337,332,358,337,1704
9,['indie'],,245,252,234,230,260,1221


Треки, которые никто не прослушал

In [20]:
clickstream[clickstream['track_seq']==0]

Unnamed: 0,user_id,track_id,track_seq,started_at


In [21]:
list1 = clickstream['track_id'].unique().tolist()
list2 = items['track_id'].unique().tolist()
array1 = np.array(list1)
array2 = np.array(list2)

count = np.sum(np.isin(array2, array1, invert=True))

print(count)

0


# Преобразование данных

Преобразуем данные в формат, более пригодный для дальнейшего использования в расчётах рекомендаций.

In [7]:
for col in ['albums', 'artists', 'genres','track_name']:
    items[col]=items[col].apply(lambda x: str(x))

# Сохранение данных

Сохраним данные в двух файлах в персональном S3-бакете по пути `recsys/data/`:
- `items.parquet` — все данные о музыкальных треках,
- `events.parquet` — все данные о взаимодействиях.

import threading
# Разделяем DataFrame на несколько частей
n_parts = 40
df_parts = np.array_split(clickstream, n_parts)
def save_dataframe(df, filename):
    df.to_parquet(filename)
# Создаем потоки для сохранения каждой части DataFrame
threads = []
for i, df_part in enumerate(df_parts):
    filename = f'df_part_{i}.parquet'
    thread = threading.Thread(target=save_dataframe, args=(df_part, filename))
    threads.append(thread)
    thread.start()

# Ждем завершения всех потоков
for thread in threads:
    thread.join()

print("DataFrame saved")

In [23]:
#items.to_parquet("data/items.parquet")
#clickstream.to_parquet("data/events.parquet")

In [24]:
import boto3
import os

# Убедитесь, что у вас есть правильный AWS Access Key Id и AWS Secret Access Key, и у вас есть доступ к S3-бакету
s3_resource = boto3.resource('s3', aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"))

# Проверяем подключение к S3-бакету
try:
    s3_resource.meta.client.head_bucket(Bucket=os.getenv("S3_BUCKET_NAME"))
    print("Connected to S3 bucket")
except Exception as e:
    print("Error connecting to S3 bucket:", e)

# Проверяем возможность записи файлов в S3-бакет
try:
    s3_resource.Object(os.getenv("S3_BUCKET_NAME"), 'test.txt').put(Body='test')
    print("File written to S3 bucket")
except Exception as e:
    print("Error writing file to S3 bucket:", e)

Error connecting to S3 bucket: An error occurred (403) when calling the HeadBucket operation: Forbidden
Error writing file to S3 bucket: An error occurred (InvalidAccessKeyId) when calling the PutObject operation: The AWS Access Key Id you provided does not exist in our records.


from io import BytesIO # python3; python2: BytesIO
import boto3
buffer = BytesIO()
items.to_parquet(buffer)
# Загружаем данные в S3-бакет
s3_resource = boto3.resource('s3', aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"))
s3_resource.Object(os.getenv("S3_BUCKET_NAME"), 'recsys/data/items.parquet').put(Body=buffer.getvalue())


# Очистка памяти

Здесь, может понадобится очистка памяти для высвобождения ресурсов для выполнения кода ниже. 

Приведите соответствующие код, комментарии, например:
- код для удаление более ненужных переменных,
- комментарий, что следует перезапустить kernel, выполнить такие-то начальные секции и продолжить с этапа 3.

In [8]:
#del items, clickstream
del catalog_names
del list1, list2, array1, array2
del top10_tracks, di, out, out1
del count
del sub

NameError: name 'list1' is not defined

# === ЭТАП 3 ===

# Загрузка данных

Если необходимо, то загружаем items.parquet, events.parquet.

In [None]:
#items = pd.read_parquet("data/items.parquet")
#events = pd.read_parquet("data/events.parquet")

# Разбиение данных

Разбиваем данные на тренировочную, тестовую выборки.

In [9]:
# В качестве точки разбиения используйте 16 декабря 2022 года
# зададим точку разбиения
train_test_global_time_split_date = pd.to_datetime("2022-12-16").date()

train_test_global_time_split_idx = clickstream["started_at"] < train_test_global_time_split_date
events_train = clickstream[train_test_global_time_split_idx]
events_test = clickstream[~train_test_global_time_split_idx]

# количество пользователей в train и test
users_train = events_train["user_id"].drop_duplicates()
users_test = events_test["user_id"].drop_duplicates()
# количество пользователей, которые есть и в train, и в test
common_users = list(set(users_train.values).intersection(users_test.values))
print(len(users_train), len(users_test), len(common_users))

1342566 783525 752870


In [10]:
clickstream["started_at"].min(), clickstream["started_at"].max()

(Timestamp('2022-01-01 00:00:00'), Timestamp('2022-12-31 00:00:00'))

In [11]:
del clickstream

# Топ популярных

Рассчитаем рекомендации как топ популярных.

Так как у нас очень большие наборы данных, и да и киленты не самые одинаковые, то есть смысл разделить клиенты на кластеры, и для каждого кластера вывбрать топ популярных.

In [12]:
def get_popular_tracks(user_ids, clickstream, items):
    """
    Возвращает самые популярные треки для заданных пользователей.

    Аргументы:
    user_ids (list): Список идентификаторов пользователей.
    clickstream (DataFrame): DataFrame с информацией о прослушивании треков.
    items (DataFrame): DataFrame с информацией о треках.

    Возвращает:
    DataFrame: DataFrame с информацией о самых популярных треках для заданных пользователей.
    """
    # Фильтруем DataFrame clickstream по заданным идентификаторам пользователей
    clickstream_filtered = clickstream[clickstream['user_id'].isin(user_ids)]

    # Группируем данные по жанрам и подсчитываем количество прослушиваний
    genre_counts = pd.merge(clickstream_filtered, items[['track_id','genres']], how='left', on='track_id')
    genre_counts = genre_counts.groupby('genres')['track_seq'].sum().nlargest(10)
    # Получаем самые популярные жанры
    most_popular_genres = genre_counts.index.tolist()
    

    # Фильтруем DataFrame items по самым популярным жанрам
    items_filtered = items[items['genres'].isin(most_popular_genres)]['track_id']

    # Группируем данные по трекам и подсчитываем количество прослушиваний
    track_counts = clickstream_filtered[clickstream_filtered['track_id'].isin(items_filtered)]

    track_counts = track_counts.groupby('track_id')['track_seq'].sum()

    # Получаем самые популярные треки
    most_popular_tracks = track_counts.index.tolist()

    # Возвращаем DataFrame с информацией о самых популярных треках
    return items[items['track_id'].isin(most_popular_tracks)].head(10)

In [13]:
def get_most_common_genre(user_id, df):
    """
    Возвращает наиболее часто упоминаемый 'genre' для заданного 'user_id'.

    Аргументы:
    user_id (int): Идентификатор пользователя.
    df (DataFrame): DataFrame с информацией о жанрах.

    Возвращает:
    str: Наиболее часто упоминаемый 'genre' для заданного 'user_id'.
    """
    # Фильтруем DataFrame по заданному 'user_id'
    df_filtered = df[df['user_id'] == user_id]

    # Находим наиболее часто упоминаемый 'genre'
    most_common_genre = mode(df_filtered['genre'])

    return most_common_genre

In [18]:
train_genres = pd.merge(events_train, items[['track_id','genres']], how='left', on='track_id')
train_genres = train_genres[train_genres['user_id'].isin(common_users)].head(300)


: 

In [17]:
favouroite_genres = train_genres.groupby('user_id')['genres'].apply(lambda x: mode(x))
favouroite_genres

user_id
3    ['pop' 'ruspop']
Name: genres, dtype: object

In [32]:
train_genres = pd.merge(events_train, items[['track_id','genres']], how='left', on='track_id')
train_genres = train_genres[train_genres['user_id'].isin(common_users)]
# Создаем потоки для нахождения наиболее часто упоминаемого 'genre' для каждого 'user_id'
threads = []
for user_id in common_users[:10]:
    thread = threading.Thread(target=get_most_common_genre, args=(user_id, train_genres))
    threads.append(thread)
    thread.start()

# Ждем завершения всех потоков
for thread in threads:
    thread.join()

print("Most common genres found")
thread, threads

: 

In [None]:
train_genres['favouroite_genre'] = threads

In [None]:
train_genres = pd.merge(events_train, items[['track_id','genres']], how='left', on='track_id')

In [None]:
train_genres.groupby('user_id')['genres'].apply(lambda x: statistics.mode(x))

In [None]:
victims = list(events_test['user_id'].unique())

In [None]:
#get_popular_tracks(victims, events_train, items)

In [None]:
from sklearn.preprocessing import MinMaxScaler


item_popularity = events_train \
    .groupby(["track_id"]).agg(users=("user_id", "nunique"), seqs=("track_seq", "sum")).reset_index()

# нормализация пользователей и среднего рейтинга, требуется для их приведения к одному масштабу
scaler = MinMaxScaler()
item_popularity[["users_norm", "seqs_norm"]] = scaler.fit_transform(
    item_popularity[["users", "seqs"]]
)

# вычисляем popularity_score, как скор популярности со штрафом за низкий рейтинг
item_popularity["popularity_score"] = (
    item_popularity["users_norm"] * item_popularity["seqs_norm"]
)

# сортируем по убыванию popularity_score
item_popularity = item_popularity.sort_values("popularity_score", ascending=False)
top_k_pop_items = item_popularity.head(100)["item_id"].tolist()

# Персональные

Рассчитаем персональные рекомендации.

# Похожие

Рассчитаем похожие, они позже пригодятся для онлайн-рекомендаций.

# Построение признаков

Построим три признака, можно больше, для ранжирующей модели.

# Ранжирование рекомендаций

Построим ранжирующую модель, чтобы сделать рекомендации более точными. Отранжируем рекомендации.

# Оценка качества

Проверим оценку качества трёх типов рекомендаций: 

- топ популярных,
- персональных, полученных при помощи ALS,
- итоговых
  
по четырем метрикам: recall, precision, coverage, novelty.

# === Выводы, метрики ===

Основные выводы при работе над расчётом рекомендаций, рассчитанные метрики.