# ДЗ

In [1]:
from pyspark.sql import SparkSession

# создаём сессию Spark
spark = (
    SparkSession
    .builder
    .master("local[*]")
    .getOrCreate()
)

In [3]:
# Загружаем предрасчитанную матрицу
from scipy.sparse import load_npz
user_item_matrix = load_npz("/data/other/user_item_lastfm.npz")

In [5]:
import numpy as np

# делим разреженную матрицу на обучающую и тестовую
total_len = user_item_matrix.data.size
train_len = int(total_len * 0.8)
all_indices = np.arange(total_len) ## Создаем массив длинной непустых значений
np.random.seed(42) 
train_indices = np.random.choice(all_indices, train_len, replace=False) ##случайно выбираем номера непустых значений для трейна
train_mask = np.in1d(all_indices, train_indices)  ## получаем массив из длинной непустых значений и маркеруем его тру фолс, если он присутствует в трейне

In [6]:
from scipy.sparse import coo_matrix

def get_masked(arr, mask):
    return coo_matrix(
        (
            [np.float32(item) for item in arr.data[mask]],
            (arr.row[mask], arr.col[mask])
        ),
        arr.shape
    )

In [143]:
# Делим на train и test
train_csr = get_masked(user_item_matrix, train_mask).tocsr()
train = train_csr.T
test_coo = get_masked(user_item_matrix, ~train_mask)
test_csr = test_coo.tocsr()
test = test_csr.tocsr()

# пробуем несколько моделей из Implicit. В итоге не одна из них не дала точность больше 0.1

In [9]:
# Пробуем ALS
from implicit.als import AlternatingLeastSquares
import os

# автор пакета утверждает, что так быстрее
os.environ["OPENBLAS_NUM_THREADS"] = "1"
# обучаемся на тех же параметрах, что и в Spark
als = AlternatingLeastSquares(
    factors=10,
    iterations=10,
    regularization=0.1
)

In [89]:
from implicit.nearest_neighbours  import BM25Recommender

bm25 = BM25Recommender(
     K1 =  100
     
)

In [137]:
from implicit.bpr import BayesianPersonalizedRanking
bpr= BayesianPersonalizedRanking(
    learning_rate = 0.1,
    regularization = 0.01,
    factors =40
)

In [10]:
%%time

als.fit(train)

CPU times: user 3.07 s, sys: 10 ms, total: 3.08 s
Wall time: 3.17 s


In [90]:
bm25.fit(train)

In [138]:
bpr.fit(train)

In [11]:
import pickle

pickle_filename = "/data/other/implicit_top50.pkl"
users = set(test_coo.row)

In [139]:
%%time

def get_recs(users, model):
    return {
        user: model.recommend(userid=user, user_items=train_csr, N=50)
        for user in users
    }

# посчитаем по 50 рекомендаций для каждого пользователя из тестовой выборки
recs = get_recs(users, bpr)
# сохраним предрасчёт рекомендаций
with open(pickle_filename, "wb") as f:
    pickle.dump(recs, f)

CPU times: user 1min 50s, sys: 270 ms, total: 1min 51s
Wall time: 1min 52s


In [140]:
# загрузим сохранённый предрасчёт
with open(pickle_filename, "rb") as f:
    recs = pickle.load(f)

In [141]:
## создаем Spark объект полученных рекомендаций для функции precision
import pandas as pd

## удаляем из массива score оставляя только items
recs_mod1 = {}
i = 1
for user in recs:
    items_list = []
    for items in recs[user]:
        items_list.append(items[0])
    recs_mod1[user] = items_list
    
recs_df =  pd.DataFrame.from_dict(recs_mod1,orient='index') # создаем DF из словаря
recs_df ['prediction'] =  recs_df.values.tolist() ### объеденямем все колонки в один список
recs_df_list = recs_df['prediction'].reset_index().rename(index=str, columns={"index": "user_id"}) ## получаем нужный формат
##recs_df_list.head()
predictions = spark.createDataFrame(recs_df_list)
predictions.show()

+-------+--------------------+
|user_id|          prediction|
+-------+--------------------+
|  85266|[137312, 153191, ...|
|  85267|[145362, 71187, 1...|
|  85268|[159390, 31532, 8...|
|  85269|[8603, 4612, 5991...|
|  85270|[35301, 94672, 11...|
|  85271|[154626, 20798, 4...|
|  85272|[7092, 74405, 119...|
|  85273|[57960, 56580, 74...|
|  85274|[20530, 74296, 41...|
|  85275|[128208, 80495, 2...|
|  85276|[29680, 99476, 28...|
|  85277|[78755, 57626, 11...|
|  85278|[5875, 26983, 245...|
|  85279|[75376, 154756, 1...|
|  85280|[144597, 49054, 1...|
|  85281|[132887, 109676, ...|
|  85282|[134385, 61576, 6...|
|  85283|[107658, 22858, 6...|
|  85284|[65504, 147624, 1...|
|  85285|[69745, 100622, 6...|
+-------+--------------------+
only showing top 20 rows



In [76]:
## создаем набор объект spark test для дальнейшего искользования в функции precision
test_df =  pd.DataFrame({'user_id': test_coo.row, 'item': test_coo.col, 'data': test_coo.data}
                 )[['user_id', 'item', 'data']].sort_values(['user_id', 'item']
                 ).reset_index(drop=True)
test_spark = spark.createDataFrame(test_df)
test_spark.show()

+-------+------+----------+
|user_id|  item|      data|
+-------+------+----------+
|  84406|    64|    4.5625|
|  84406|  8025|5.41796875|
|  84406| 30174| 5.0234375|
|  84406| 31403|4.75390625|
|  84406| 51209|4.31640625|
|  84406| 57969|   4.21875|
|  84406| 79365| 4.5234375|
|  84406| 87370|   4.21875|
|  84406| 89401|   4.34375|
|  84406|109744|4.27734375|
|  84406|112328|  4.734375|
|  84406|116497|5.31640625|
|  84406|126203| 4.8046875|
|  84406|129610|5.18359375|
|  84407| 42269|3.46484375|
|  84407| 49575|3.63671875|
|  84407| 50461|4.14453125|
|  84407| 55744| 4.4765625|
|  84408|  4759|3.43359375|
|  84408|  9596|3.43359375|
+-------+------+----------+
only showing top 20 rows



In [83]:
import pyspark.sql.functions as sql_func
from pyspark.mllib.evaluation import RankingMetrics

# используем стандартную Spark функцию для оценки точности
def get_precision(recs, k):
    
    
    predictions_and_labels = (
        test_spark
        .groupBy("user_id")
        .agg(sql_func.collect_list("item").alias("label"))
        .join(predictions, "user_id")
        .select("prediction", "label")
        .rdd
    )
    return RankingMetrics(predictions_and_labels).precisionAt(k)

In [142]:
# получаем точность. Это Максимум, что получилось, для модели BayesianPersonalizedRanking
get_precision(predictions, 1)

0.07537493952588298