In [ ]:
#!curl -LO http://files.grouplens.org/datasets/movielens/ml-100k.zip
#!unzip ml-100k.zip

http://files.grouplens.org/datasets/movielens/ml-100k-README.txt

In [ ]:
target_userid = 130

In [ ]:
import os
import sys
os.environ["PYSPARK_SUBMIT_ARGS"]='--num-executors 3 pyspark-shell'
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

In [ ]:
from operator import add
import operator as op
import json
import re
from datetime import datetime
from pyspark.sql import Row
from pyspark.sql import functions as F
import numpy as np
import math
from IPython.display import clear_output

In [ ]:
!head -n3 ml-100k/u.data

The full u data set, 100000 ratings by 943 users on 1682 items.
              Each user has rated at least 20 movies.  Users and items are
              numbered consecutively from 1.  The data is randomly
              ordered. This is a tab separated list of 
	         user id | item id | rating | timestamp. 
              The time stamps are unix seconds since 1/1/1970 UTC

Найдите количество всех пользователей и количество всех фильмов в данных (общее число пользователей и фильмов в датасете).

In [ ]:
#!hadoop fs -put ml-100k /user/valery.baranov

In [ ]:
rdd_ratings = sc.textFile("/user/valery.baranov/ml-100k/u.data")\
    .map(lambda line: [int(d) for d in line.split('\t')])\
    .map(lambda ds: Row(userid=ds[0], movieid=ds[1], rating=ds[2], ts=ds[3]))

In [ ]:
df_ratings = rdd_ratings.toDF()

In [ ]:
df_ratings.show(n=3)

In [ ]:
num_users = rdd_ratings.map(lambda row: row.userid).distinct().count()
num_users

In [ ]:
def get_userid_iter():
    return range(1, num_users + 1)

In [ ]:
num_movies = rdd_ratings.map(lambda row: row.movieid).distinct().count()
num_movies

Сколько пользователь в среднем ставит рейтингов? Подсчитать `количество рейтингов / количество пользователей`. 
   Поле `average_user_ratings`.

In [ ]:
num_ratings = rdd_ratings.count()
num_ratings

In [ ]:
average_user_ratings = num_ratings / num_users
average_user_ratings

Сколько фильм в среднем имеет рейтингов?  Подсчитать`количество рейтингов / количество фильмов`. 
   Поле `average_film_ratings`.

In [ ]:
average_film_ratings = num_ratings / num_movies
average_film_ratings

Найдите процент заполненных ячеек в данных: `количество рейтингов / (количество пользователей * количество фильмов)`. 
   Поле `completeness`.

In [ ]:
completeness = num_ratings / (num_users * num_movies)
completeness

#1 Для каждого пользователя найдите его средний рейтинг (сумма рейтингов пользователя/количество рейтингов пользователя). Здесь I<sub>a</sub> — множество фильмов, по которым у пользователя есть рейтинги r<sub>ui</sub>. Здесь и далее |I<sub>a</sub>| обозначает количество элементов в множестве I<sub>a</sub>.

<img width="100px" src="http://data.newprolab.com/public-newprolab-com/laba08_r_a_avg.png">

In [ ]:
df_avg_user_rating = df_ratings.groupby('userid').mean('rating')

In [ ]:
user_avg = df_avg_user_rating.rdd.map(lambda row: (row.userid, row['avg(rating)'])).collect()

In [ ]:
user_avg[:3]

In [ ]:
avg_ratings = np.array([v for k,v in sorted(user_avg)])
avg_ratings[:10]

#2 Посчитайте меру близости Пирсона выданного вам пользователя со всеми остальными пользователями. Обратите внимание, что корреляция Пирсона считается только на пересечении, то есть вклад вносят только фильмы, оцененные совместно (I<sub>a</sub>, I<sub>u</sub> — множества оцененных пользователями `a` и `u` фильмов). Корреляция с константой (ситуация, когда у пользователя все оценки одинаковые) равна нулю.

Формат: `ID пользователя; корреляция Пирсона`.

<img width="350px" src="http://data.newprolab.com/public-newprolab-com/laba08_pearson.png">

In [ ]:
df_pivot_ratings = df_ratings.groupBy("movieid").pivot("userid", range(1, num_users + 1)).sum("rating")\
    .coalesce(1).cache()

In [ ]:
df_pivot_rows = df_ratings.groupBy("userid").pivot("movieid", range(1, num_movies + 1)).sum("rating").cache()

In [ ]:
target_row = df_pivot_rows.rdd.filter(lambda row: row.userid == target_userid).collect()[0]
target_values =[v for k,v in sorted(target_row.asDict().items()) if k != 'userid']

In [ ]:
len(target_values)

In [ ]:
target_values[:10]

In [ ]:
br_target_values = sc.broadcast(target_values)
br_avg_ratings = sc.broadcast(avg_ratings)

In [ ]:
len(br_avg_ratings.value)

In [ ]:
def pearson_byrows(row):
    row_ = row.asDict()
    userid = int(row_.pop('userid'))
    uid = userid - 1
    tid = target_userid - 1
    user_values =[v for k,v in sorted(row_.items())]
    ratings = [(user_values[i], br_target_values.value[i]) for i in range(len(br_target_values.value))\
               if user_values[i] is not None and br_target_values.value[i] is not None]
    
    data = [((v1 - br_avg_ratings.value[uid]),(v2 - br_avg_ratings.value[tid])) for v1, v2 in ratings]
        
    c = sum([v1*v2 for v1,v2 in data])
    z1 = sum([v1 ** 2 for v1,v2 in data])
    z2 = sum([v2 ** 2 for v1,v2 in data])
    if c == 0:
        return 0
    return (userid, c / (math.sqrt(z1) * math.sqrt(z2)))

In [ ]:
row1 = df_pivot_rows.rdd.take(1)[0]

In [ ]:
pearson_byrows(row1)

In [ ]:
df_pivot_rows.rdd.map(pearson_byrows).take(3)

In [ ]:
pearson_t = df_pivot_rows.rdd.map(pearson_byrows).collect()

In [ ]:
pearson = [v for k,v in sorted(pearson_t)]

In [ ]:
pearson[0]

In [ ]:
df_pivot_ratings.corr(str(1), str(target_userid), method='pearson')

In [ ]:
len(pearson)

#3. Посчитайте поправочный коэффициент для корреляции Пирсона на нехватку данных:

<img width="150px" src="http://data.newprolab.com/public-newprolab-com/laba08_reg_coef.png">

In [ ]:
target_user_movies = set(df_ratings\
    .filter(df_ratings.userid==target_userid)\
    .select('movieid')\
    .rdd.map(lambda row: row.movieid)\
    .collect())

In [ ]:
len(target_user_movies)

In [ ]:
coef = df_ratings.groupby('userid')\
    .agg(F.collect_list(df_ratings.movieid).alias('movieids'))\
    .rdd.map(lambda row: (row.userid, min(1, len(set(row.movieids) & target_user_movies) / 50)))\
    .collect()

In [ ]:
coef = sorted(coef)
coef[:3]

#4. Найдите 30 ближайших пользователей-соседей данного пользователя (`pearson_neighbours`), используя поправленную корреляцию Пирсона: 

<img width="500px" src="http://data.newprolab.com/public-newprolab-com/laba08_pearson_reg_coef.png">

In [ ]:
corr_pearson = [coef[i][1] * pearson[i] for i in range(len(pearson))]
corr_pearson[:3]

In [ ]:
top30 =[(k,v) for k,v in sorted(zip(corr_pearson, get_userid_iter()), reverse=True)[:31] if v != target_userid]

In [ ]:
top30userids = {v for k,v in top30}

In [ ]:
' '.join([str(x) for x in top30userids])

#5. Для всех фильмов найдите прогноз оценки по формуле ниже. Здесь *N(a)* — множество пользователей-соседей, *s(a,u)* — мера близости пользователей из предыдущих пунктов, *|s(a,u)|* — модуль меры близости. 

<img width="250px" src="http://data.newprolab.com/public-newprolab-com/laba08_user_user_cf.png">


In [ ]:
target_user_avg_rating = avg_ratings[target_userid - 1]
target_user_avg_rating

In [ ]:
# отбираем только рейтинги для топ 30, по которым полное совпадение с чекером
df_pivot_top = df_ratings.rdd.filter(lambda row: row.userid in top30userids)\
    .toDF().groupBy("movieid").pivot("userid", list(top30userids)).sum("rating")\
    .cache()

In [ ]:
def predict_rating_row(row):
    row_ = row.asDict()
    movieid = int(row_.pop('movieid'))
    ratings = [(int(k),v) for k,v in row_.items() if v is not None]
    if len(ratings) == 0:
        return target_user_avg_rating
    c = 0
    z = 0
    for userid, rating in ratings:
        uid = userid - 1 # так как нумерация пользователей и фильмов с 1
        sau = corr_pearson[uid]
        rui = rating
        ru = avg_ratings[uid]
        c += sau * (rui - ru)
        z += abs(sau)
    return (target_user_avg_rating + c / z, movieid)

In [ ]:
row = df_pivot_top.rdd.take(1)[0]

In [ ]:
row

In [ ]:
predict_rating_row(row)

In [ ]:
predicted_ratings = df_pivot_top.rdd.map(predict_rating_row).collect()

In [ ]:
predicted_ratings[:3]

In [ ]:
import pandas as pd

In [ ]:
df_predicted = pd.DataFrame(predicted_ratings, columns=['prating', 'movieid'])

In [ ]:
df_predicted.sort_values(['prating', 'movieid'], ascending=[False, True], inplace=True)

In [ ]:
df_predicted.head(n=10)

In [ ]:
pearson_top10 = [int(v) for v in df_predicted['movieid'] if v not in target_user_movies][:10]
sorted(pearson_top10)

In [ ]:
lab8result = {
    "average_film_ratings": average_film_ratings,
    "average_user_ratings": average_user_ratings,
    "completeness": completeness,
    "pearson_neighbours": sorted(top30userids),
    "pearson_top10": sorted(pearson_top10)
}

In [ ]:
with open('lab08.json', 'w') as fw:
    fw.write(json.dumps(lab8result))

In [ ]:
!head lab08.json

### **Часть 3. Базовые предикторы:**

#### 1. Глобальное среднее 𝞵 (`average_rating`) по всему датасету. `Сумма всех оценок по всем фильмам / Количество всех оценок по всем фильмам.`

In [ ]:
sum_all_ratings = df_ratings.select('rating')\
    .agg(F.sum(df_ratings.rating).alias('sum')).rdd.map(lambda row: row.sum).collect()[0]
sum_all_ratings

In [ ]:
num_all_ratings = df_ratings.count()
num_all_ratings

In [ ]:
average_rating = sum_all_ratings / num_all_ratings
average_rating

#### 2. Базовый предиктор для каждого пользователя (суммирование по фильмам, оцененным данным пользователем). Здесь I<sub>a</sub> — множество фильмов, по которым у пользователя есть рейтинги, а |I<sub>a</sub>| — их количество.

<img width="200px" src="http://data.newprolab.com/public-newprolab-com/laba08s_base_u.png">


In [ ]:
def base_predict_user(row):
    row_ = row.asDict()
    userid = row_.pop('userid')
    values = [v for v in row_.values() if v is not None]
    return (userid, 1 / (len(values) + 10) * sum([rui - average_rating for rui in values]))

In [ ]:
bp_users = df_pivot_rows.rdd.map(base_predict_user).collect()

In [ ]:
bp_users = sorted(bp_users)
bp_users[:3]

#### 3. Базовый предиктор для каждого фильма (суммирование по пользователям, поставившим оценку данному фильму). Здесь U<sub>i</sub> — множество пользователей, которые оценили данный фильм, а |U<sub>i</sub>| — их количество.

<img width="250px" src="http://data.newprolab.com/public-newprolab-com/laba08s_base_i.png">


In [ ]:
df_pivot_movies = df_ratings.groupBy("movieid").pivot("userid", range(1, num_users + 1)).sum("rating").cache()

In [ ]:
def base_predict_movie(row):
    row_ = row.asDict()
    movieid = row_.pop('movieid')
    ratings = [(int(k), v) for k,v in row_.items() if v is not None]
    return (movieid,\
            1 / (len(ratings) + 25) * sum([rui - bp_users[userid - 1][1] - average_rating for userid, rui in ratings]))

In [ ]:
r = df_pivot_movies.take(1)[0]

In [ ]:
base_predict_movie(r)

In [ ]:
bp_movies = df_pivot_movies.rdd.map(base_predict_movie).collect()

In [ ]:
bp_movies = sorted(bp_movies)
bp_movies[:3]

#### 4. Базовый предиктор для каждого пользователя и каждого фильма:

<img width="140px" src="http://data.newprolab.com/public-newprolab-com/laba08s_base_ui.png">


In [ ]:
bp_users_movies = np.matrix([[average_rating + bu + bi for km, bu in bp_movies] for ku, bi in bp_users])

In [ ]:
bp_users_movies.shape

In [ ]:
bp_users_movies[:5,:5]

### **Часть 4. Item-item CF:**

#### 1. Вычесть из всех рейтингов r<sub>ui</sub> базовый предиктор b<sub>ui</sub> из пункта 4, часть 3 (для всей таблицы рейтингов). Если рейтинга нет, то можно поставить 0.


In [ ]:
rui = np.matrix(
    [v for k,v in sorted(df_pivot_rows.rdd.map(lambda row: (row.userid, 
        [v for k,v in sorted([(int(k), v) for k,v in row.asDict().items() if k != 'userid'])])
    ).collect())])

In [ ]:
rui.shape

In [ ]:
blanks = rui == None

In [ ]:
blanks

In [ ]:
rui[rui == None] = 0

In [ ]:
rui[:5,:5]

In [ ]:
rui[1,1] == 0

In [ ]:
cf = (rui - bp_users_movies).transpose()

In [ ]:
cf[rui.transpose() == 0] = 0

In [ ]:
cf.shape

In [ ]:
cf[:5,:5]

In [ ]:
def get_csr():
    data = []
    indices = []
    indptr = [0]
    for i in range(rui_.shape[0]):
        ptr = 0
        for j in range(rui_.shape[1]):
            if rui_[i,j] == 0:
                continue
            data.append(cf_[i,j])
            indices.append(j)
            ptr += 1
        indptr.append(ptr)

In [ ]:
from scipy.sparse import csr_matrix

In [ ]:
csr = csr_matrix(cf, dtype='float')

#### 2. Найдите попарные меры близости (косинус) для всех фильмов, используя очищенные оценки из пункта 1, часть 4. Суммирование идет по всем пользователям.

<img width="350px" src="http://data.newprolab.com/public-newprolab-com/laba08s_cosine_items.png">

In [ ]:
def custom_cossim(m1, m2):
    c = cf[:,[m1,m2]].prod(axis=1).sum()
    z1 = math.sqrt(cf[:,[m1,m1]].prod(axis=1).sum())
    z2 = math.sqrt(cf[:,[m2,m2]].prod(axis=1).sum())
    return c / (z1 * z2)

In [ ]:
from sklearn.metrics.pairwise import cosine_similarity

In [ ]:
cossim = cosine_similarity(csr, csr)

In [ ]:
cossim.shape

In [ ]:
df_cos = pd.DataFrame(cossim)

In [ ]:
df_cos

#### 3. Для каждого фильма, по которому у данного пользователя не стоит рейтинг, найдите:

##### * [a] 30 ближайших фильмов-соседей для этого фильма (среди всех фильмов, а не фильмов, оценённых пользователем).

In [ ]:
target_movies = set(range(1, num_movies + 1)) - target_user_movies 

In [ ]:
target_movies = list(target_movies)

In [ ]:
len(target_user_movies )

In [ ]:
len(target_movies)

In [ ]:
def get_movie_recs(movieid):
    mid = movieid - 1
    recs = [v for k,v in sorted(zip(cossim[mid,:], range(1, num_movies + 1)), reverse=True)[:31] if v != movieid]
    return recs

In [ ]:
sorted(zip(cossim[0,:], range(1, num_movies + 1)), reverse=True)[:5]

In [ ]:
len(cossim[1,:])

In [ ]:
' '.join([str(x) for x in get_movie_recs(5)])

In [ ]:
target_movies_rec = []
for movieid in target_movies:
    target_movies_rec.append(get_movie_recs(movieid))

In [ ]:
[' '.join([str(m) for m in recs]) for recs in target_movies_rec[:3]]

##### * [b] прогноз оценки пользователя по формуле (базовый предиктор из пункта 4, часть 3).  Здесь *S(i)*- множество фильмов-соседей для фильма *i*, по которым у данного пользователя есть оценка.
<img width="300px" src="http://data.newprolab.com/public-newprolab-com/laba08s_item_item_cf.png">

     Заметим, что суммирование идет только по тем фильмам-соседям, которые оценил пользователь.

In [ ]:
def calc_target_rating(movieid, filter_neg=False):
    mid = movieid - 1
    uid = target_userid - 1
    bui = bp_users_movies[uid, mid]
    rec_id = target_movies.index(movieid)
    movies_intersection = set(target_movies_rec[rec_id]) & target_user_movies
    if len(movies_intersection) == 0:
        return (bui, movieid)
    c = 0
    z = 0
    for movieid_temp in movies_intersection:
        tid = movieid_temp - 1
        sij = cossim[mid, tid]
        if filter_neg and sij < 0:
            continue
        ruj = rui[uid, tid]
        buj = bp_users_movies[uid, tid]
        c += sij * (ruj - buj)
        z += abs(sij)
    if c == 0 or z == 0:
        return (bui, movieid)
    return (bui + c / z, movieid)

In [ ]:
target_predicts = [calc_target_rating(movieid) for movieid in target_movies]

In [ ]:
target_predicts[:3]

#### 4. Рекомендуйте пользователю 10 фильмов (`predicators_top10`) с самыми высокими оценками из пункта 3, часть 4.

In [ ]:
top_movies = sorted(target_predicts, reverse=True)[:10]
top_movies

In [ ]:
predicators_top10 = [v for k,v in top_movies]
predicators_top10

#### 5. При подсчете прогноза по формуле из пункта 3, часть 4 отфильтруйте всех соседей с отрицательной близостью.

In [ ]:
target_predicts_pos = [calc_target_rating(movieid, True) for movieid in target_movies]

In [ ]:
target_predicts_pos[:3]

#### 6. Рекомендуйте пользователю 10 фильмов (`predicators_positive_top10`) с самыми высокими оценками из пункта 5, часть 4.

In [ ]:
top_pos = sorted(target_predicts_pos,reverse=True)[:10]
top_pos

In [ ]:
predicators_positive_top10 = [v for k,v in top_pos]
predicators_positive_top10

In [ ]:
lab8s_result = {
    "average_rating": average_rating,
    "predicators_positive_top10": predicators_positive_top10,
    "predicators_top10": predicators_top10
}

In [ ]:
with open('lab08s.json', 'w') as fw:
    fw.write(json.dumps(lab8s_result))

In [ ]:
!head lab08s.json