In [1]:
import os
import sys

os.environ["PYSPARK_SUBMIT_ARGS"] = '--num-executors 4 pyspark-shell'
os.environ["PYSPARK_PYTHON"] = '/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"] = '/usr/hdp/current/spark2-client'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.


In [2]:
import pyspark.sql.functions as f
from pyspark.sql.types import *

# Import data

In [3]:
!hadoop fs -head /labs/lab05data/ml-100k/u.data

196	242	3	881250949
186	302	3	891717742
22	377	1	878887116
244	51	2	880606923
166	346	1	886397596
298	474	4	884182806
115	265	2	881171488
253	465	5	891628467
305	451	3	886324817
6	86	3	883603013
cat: Unable to write to output stream.


In [6]:
ratings_schema = StructType([
    StructField("user_id", IntegerType()),
    StructField("item_id", IntegerType()),
    StructField("rating", IntegerType()),
    #StructField("timestamp", IntegerType()),
])

ratings = (
    spark.read
    .option("sep", "\t")
    .schema(ratings_schema)
    .format("csv")
    .load("/labs/lab05data/ml-100k/u.data")
)

In [7]:
ratings.show(5)

+-------+-------+------+
|user_id|item_id|rating|
+-------+-------+------+
|    196|    242|     3|
|    186|    302|     3|
|     22|    377|     1|
|    244|     51|     2|
|    166|    346|     1|
+-------+-------+------+
only showing top 5 rows



In [9]:
movies_schema = StructType([
    StructField("movie_id", IntegerType()),
    StructField("title", StringType()),
    StructField("release_date", StringType()),
])

movies = (
    spark.read
    .option("sep", "|")
    .schema(movies_schema)
    .format("csv")
    .load("/labs/lab05data/ml-100k/u.item")
)

In [10]:
movies.show(5)

+--------+-----------------+------------+
|movie_id|            title|release_date|
+--------+-----------------+------------+
|       1| Toy Story (1995)| 01-Jan-1995|
|       2| GoldenEye (1995)| 01-Jan-1995|
|       3|Four Rooms (1995)| 01-Jan-1995|
|       4|Get Shorty (1995)| 01-Jan-1995|
|       5|   Copycat (1995)| 01-Jan-1995|
+--------+-----------------+------------+
only showing top 5 rows



# Part 1. Dataset analysis

Найдите количество всех пользователей и количество всех фильмов в данных (общее число пользователей и фильмов в датасете).

In [9]:
total_users = ratings.select(f.col("user_id")).distinct().count()

total_users

943

In [10]:
total_movies = movies.select(f.col("movie_id")).distinct().count()

total_movies

1682

Сколько пользователь в среднем ставит рейтингов? Подсчитать `количество рейтингов` / `количество пользователей`. Поле average_user_ratings.

In [12]:
total_ratings = ratings.count()

total_ratings

100000

In [13]:
average_user_ratings = total_ratings / total_users

average_user_ratings

106.04453870625663

Сколько фильм в среднем имеет рейтингов? Подсчитать `количество рейтингов` / `количество фильмов`. Поле average_film_ratings.

In [14]:
average_film_ratings = total_ratings / total_movies

average_film_ratings

59.45303210463734

Найдите процент заполненных ячеек в данных: `количество рейтингов` / (`количество пользователей` * `количество фильмов`). Поле completeness.

In [15]:
completeness = total_ratings / total_movies / total_users

completeness

0.06304669364224533

# Part 2. User-user CF

Для каждого пользователя найдите его средний рейтинг (`сумма рейтингов пользователя` / `количество рейтингов пользователя`). Здесь `Ia` — множество фильмов, по которым у пользователя есть рейтинги `rui`. Здесь и далее `|Ia|` обозначает количество элементов в множестве `Ia`.

In [27]:
users_avg = (
    ratings
    .groupBy("user_id")
    .agg(
        f.count("rating").alias("cnt_ratings"), 
        f.sum("rating").alias("sum_ratings"),
        #f.mean("rating").alias("avg_rating_2"),
        #(f.sum("rating") / f.count("rating")).alias("avg_rating3"), 
        
    )
    .withColumn("avg_rating_user", f.col("sum_ratings") / f.col("cnt_ratings"))
    .select("user_id", "avg_rating_user")
         
)

users_avg.show(5)

+-------+------------------+
|user_id|   avg_rating_user|
+-------+------------------+
|    148|               4.0|
|    463|2.8646616541353382|
|    471|3.3870967741935485|
|    496|3.0310077519379846|
|    833| 3.056179775280899|
+-------+------------------+
only showing top 5 rows



### My user

In [14]:
user_id = 804

Посчитайте меру близости Пирсона выданного вам пользователя со всеми остальными пользователями. Обратите внимание, что корреляция Пирсона считается только на пересечении, то есть вклад вносят только фильмы, оцененные совместно (Ia, Iu — множества оцененных пользователями a и u фильмов). Корреляция с константой (ситуация, когда у пользователя все оценки одинаковые) равна нулю.

Формат: `ID пользователя`; `корреляция Пирсона`.

In [15]:
my_user_data = ratings.filter(f.col("user_id") == user_id)

my_user_data.show(5)

+-------+-------+------+
|user_id|item_id|rating|
+-------+-------+------+
|    804|     11|     4|
|    804|    546|     3|
|    804|     31|     4|
|    804|    204|     4|
|    804|     82|     5|
+-------+-------+------+
only showing top 5 rows



In [16]:
users_avg.filter(f.col("user_id") == user_id).collect()

[Row(user_id=804, cnt_ratings=332, sum_ratings=1217, avg_rating_2=3.6656626506024095, avg_rating3=3.6656626506024095, avg_rating_user=3.6656626506024095)]

In [21]:
my_user_average_rating = users_avg.filter(f.col("user_id") == user_id).collect()[0]["avg_rating_user"]

my_user_average_rating

3.6656626506024095

In [22]:
corr_matrix = (
    ratings.alias("r")
    .join(my_user_data.alias("u"), on="item_id", how="inner")
    .join(users_avg.alias("a"), on="user_id", how="left")
    .select(
        f.col("r.user_id"),
        f.col("r.item_id"),
        f.col("r.rating"),
        f.col("u.user_id").alias("my_user_id"),
        f.col("u.rating").alias("my_rating"),
        f.col("a.avg_rating_user")
    )
)

corr_matrix.show(5)

+-------+-------+------+----------+---------+------------------+
|user_id|item_id|rating|my_user_id|my_rating|   avg_rating_user|
+-------+-------+------+----------+---------+------------------+
|    298|    474|     4|       804|        4| 4.031496062992126|
|    115|    265|     2|       804|        4|3.9347826086956523|
|    305|    451|     3|       804|        2|  3.40990990990991|
|     62|    257|     2|       804|        5|3.3017241379310347|
|    200|    222|     5|       804|        5| 4.032407407407407|
+-------+-------+------+----------+---------+------------------+
only showing top 5 rows



Посчитайте поправочный коэффициент для корреляции Пирсона на нехватку данных

In [23]:
all_neighbours = (
    corr_matrix
    .filter(f.col("user_id") != user_id)
    .withColumn("pearson_a", (f.col("rating") - f.col("avg_rating_user")))
    .withColumn("pearson_u", (f.col("my_rating") - my_user_average_rating))
    .groupBy("user_id")
    .agg(
        (
            f.sum(f.col("pearson_a") * f.col("pearson_u")) /
            f.sum(f.col("pearson_a") ** 2) ** 0.5 / 
            f.sum(f.col("pearson_u") ** 2) ** 0.5
        ).alias("pearson_corr"),      
        (f.count("item_id") / 50).alias("pearson_coef")
    )
    .withColumn("pearson_coef", f.when(f.col("pearson_coef") < 1, f.col("pearson_coef")).otherwise(1))
    .withColumn("pearson_sua", f.col("pearson_coef") * f.col("pearson_corr"))
)

all_neighbours.show(5)

+-------+-------------------+------------+--------------------+
|user_id|       pearson_corr|pearson_coef|         pearson_sua|
+-------+-------------------+------------+--------------------+
|    148|0.03400975940401584|        0.88|0.029928588275533942|
|    471|0.08072405395165885|         0.3|0.024217216185497655|
|    463| 0.3346271675106021|        0.64| 0.21416138720678535|
|    496|0.17869619437114675|         1.0| 0.17869619437114675|
|    833|0.15207442669742285|         1.0| 0.15207442669742285|
+-------+-------------------+------------+--------------------+
only showing top 5 rows



Найдите 30 ближайших пользователей-соседей данного пользователя (pearson_neighbours), используя поправленную корреляцию Пирсона

In [24]:
pearson_neighbours = (
    all_neighbours
    .orderBy(f.col("pearson_sua"), ascending=False)
    .limit(30)
    .select("user_id", "pearson_sua")
)

pearson_neighbours.show(5)

+-------+------------------+
|user_id|       pearson_sua|
+-------+------------------+
|     22|0.4865992050587391|
|     57|0.4730087223914742|
|    676|0.4716426914553552|
|    851|0.4698629665905538|
|     87|0.4670373480164366|
+-------+------------------+
only showing top 5 rows



Для всех фильмов найдите прогноз оценки

Здесь `N(a)` — множество пользователей-соседей, `s(a,u)` — мера близости пользователей из предыдущих пунктов, `|s(a,u)|` — модуль меры близости.

In [29]:
predicted_ratings = (
    ratings
    .join(pearson_neighbours, "user_id", "inner")
    .join(users_avg, "user_id", "left")
    .groupBy("item_id")
    .agg(
        (
            f.sum(f.col("pearson_sua") * (f.col("rating") - f.col("avg_rating_user"))) / 
            f.sum(f.abs(f.col("pearson_sua")))
        ).alias("predicted_rating")
    )
    .withColumn("predicted_rating", my_user_average_rating + f.col("predicted_rating"))
    
    .join(my_user_data, "item_id", "left")
    .filter(f.col("user_id").isNull())
)

predicted_ratings.show(5)

+-------+------------------+-------+------+
|item_id|  predicted_rating|user_id|rating|
+-------+------------------+-------+------+
|    148| 3.537618538552194|   null|  null|
|    463| 3.779659821930994|   null|  null|
|    471|3.9096954823179924|   null|  null|
|    833|3.3859082444452375|   null|  null|
|   1088| 2.628395569857068|   null|  null|
+-------+------------------+-------+------+
only showing top 5 rows



In [53]:
ratings.join(pearson_neighbours, ["user_id", "item_id"], "inner").show(5)

+-------+-------+------+-------------------+
|user_id|item_id|rating|        pearson_sua|
+-------+-------+------+-------------------+
|    186|    302|     3|0.38564109870497687|
|     22|    377|     1| 0.4865992050587391|
|    276|    796|     1| 0.3971373433091707|
|    276|    564|     3| 0.3971373433091707|
|     99|      4|     5| 0.4234990888967132|
+-------+-------+------+-------------------+
only showing top 5 rows



In [58]:
(
    ratings
    .join(pearson_neighbours, f.col("user_id") == f.col("account_id"), "left")
    .select(
        f.col("user_id"),
        f.col("account_id"),
    )
    .show(5)
)

+-------+-------+
|user_id|user_id|
+-------+-------+
|    186|    186|
|     22|     22|
|    276|    276|
|    276|    276|
|     99|     99|
+-------+-------+
only showing top 5 rows



In [62]:
spark.stop()