In [None]:
import os
import sys

# Environment for the PySpark launcher; these must be set before shell.py is executed below.
os.environ["PYSPARK_SUBMIT_ARGS"]='--conf spark.sql.catalogImplementation=in-memory pyspark-shell'
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

# Make the bundled pyspark and py4j packages importable from the Spark installation.
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))

# Read the bootstrap script through a context manager so the file handle is closed
# deterministically, then execute it in the notebook namespace.
# Later cells use `spark` without defining it; it is expected to come from this script.
with open(os.path.join(spark_home, 'python/pyspark/shell.py')) as shell_script:
    shell_source = shell_script.read()
exec(shell_source)

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf, desc, lit
from pyspark.sql.types import *
# These were fused on one line, which is a SyntaxError; they must be separate statements.
from math import sqrt
import math
import pyarrow as pa
import pyarrow.parquet as pq

In [None]:
import pandas as pd

In [None]:
!curl -LO http://files.grouplens.org/datasets/movielens/ml-100k.zip
!unzip ml-100k.zip

In [None]:
# Load the raw ratings before renaming columns: `u_data` was previously used here without
# ever being assigned, so the cell failed on a fresh kernel.
# NOTE(review): the path mirrors the './u.item' read in the next cell and the tab separator
# follows the MovieLens 100k README — confirm both against where the files actually live.
u_data = spark.read.csv('./u.data', sep='\t').selectExpr(
    "_c0 as user_id", "_c1 as item_id", "_c2 as rating",  "_c3 as timestamp")

In [None]:
# Movie catalogue, read as a pipe-delimited CSV.
u_item = spark.read.option('sep', '|').csv('./u.item')

In [None]:
# Keep only the first two positional columns, under descriptive names.
u_item = u_item.select(
    col('_c0').alias('movie_id'),
    col('_c1').alias('movie_title'),
)

In [None]:
# Mean rating per user over every rating row of that user.
average_rating = (
    u_data
    .groupBy('user_id')
    .agg(F.avg('rating').alias('avg_rating'))
)

In [None]:
# Mean rating of the target user, pulled to the driver as a plain Python value.
# NOTE(review): `target_id` is not assigned in any cell visible here — confirm where it is set.
target_avg_rows = average_rating.where(average_rating['user_id'] == target_id).take(1)
average_rating_target_value = target_avg_rows[0]['avg_rating']

In [None]:
# Ratings given by the target user: one row per item, with the rating renamed to `target_rating`.
target_user_ratings = (
    u_data
    .where(u_data.user_id == target_id)
    .select(u_data.item_id, u_data.rating.alias('target_rating'))
)

In [None]:
# Distinct item ids rated by the target user, collected to the driver as a Python list.
target_item_rows = target_user_ratings.select(target_user_ratings.item_id).distinct().collect()
target_items = [item_row.item_id for item_row in target_item_rows]

In [None]:
# Flag every rating row with 1 when its item was also rated by the target user, else 0.
in_target_flag = F.when(u_data.item_id.isin(target_items), 1).otherwise(0)
u_data = u_data.select(
    'user_id',
    'item_id',
    'rating',
    in_target_flag.alias('is_in_target'),
)

In [None]:
# Ratings restricted to items the target user has rated as well.
u_data_common = u_data.where(u_data['is_in_target'] == 1)

In [None]:
# Attach the target user's rating of the same item to every common rating row.
# Both frames derive from the same `u_data`, so an expression condition such as
# `left.item_id == right.item_id` and `df.col` selections rely on Spark's self-join
# auto-resolution. Joining on the column name and selecting by name is unambiguous and
# yields the same four columns.
u_data_common = (
    u_data_common
    .join(target_user_ratings, on='item_id', how='inner')
    .select('user_id', 'item_id', 'rating', 'target_rating')
)

In [None]:
# Common ratings enriched with each user's mean rating and the target user's mean as a constant column.
ratings_with_means = u_data_common.join(average_rating, on='user_id')
s = ratings_with_means.withColumn('avg_rating_target', lit(average_rating_target_value))

In [None]:
# Aggregate expression: sum of products of the target's and the user's deviations from their means.
target_deviation = s['target_rating'] - s['avg_rating_target']
user_deviation = s['rating'] - s['avg_rating']
covariance = F.sum(target_deviation * user_deviation)

In [None]:
# Aggregate expression: square root of the summed squared deviations of the target's ratings.
target_deviation = s['target_rating'] - s['avg_rating_target']
sigma_target = F.sqrt(F.sum(target_deviation * target_deviation))

In [None]:
# Aggregate expression: square root of the summed squared deviations of the user's ratings.
user_deviation = s['rating'] - s['avg_rating']
sigma_user = F.sqrt(F.sum(user_deviation * user_deviation))

In [None]:
# Per-user correlation with the target: covariance divided by both deviation norms.
corr_with_target = (covariance / sigma_target / sigma_user).alias('corr_with_target')
pearson_df = s.groupBy('user_id').agg(corr_with_target)

In [None]:
# Number of items each user shares with the target user (sum of the 0/1 flag).
intersection_power = (
    u_data
    .groupBy('user_id')
    .agg(F.sum(F.col('is_in_target')).alias('intersection_power'))
)

In [None]:
def calculate_missing_data_coeff(intersection_power, threshold=50):
    """Return a damping coefficient in [0, 1] based on the size of a rating overlap.

    The coefficient grows linearly with ``intersection_power`` and is capped at 1.0
    once the overlap reaches ``threshold``.

    Args:
        intersection_power: Number of commonly rated items. ``None`` (how a Spark
            null reaches a Python UDF) and NaN are both treated as "no overlap".
        threshold: Overlap size at which the coefficient saturates at 1.0.
            Defaults to 50, the value previously hard-coded. Must be non-zero.

    Returns:
        float: ``min(intersection_power / threshold, 1.0)``, or 0.0 when the input
        is ``None`` or NaN.
    """
    # A null input used to raise TypeError inside the UDF and fail the whole Spark job.
    if intersection_power is None:
        return 0.0
    result = min(intersection_power * 1.0 / threshold, 1.0)
    if math.isnan(result):
        return 0.0
    return result

In [None]:
# Spark UDF wrapper around the coefficient function, returning a double column.
calculate_missing_data_coeff_udf = F.udf(calculate_missing_data_coeff, DoubleType())


In [None]:

# Damping coefficient per user, derived from that user's overlap with the target.
coeff_column = calculate_missing_data_coeff_udf(col('intersection_power')).alias('coeff')
data_miss_coeffs = intersection_power.select('user_id', coeff_column)

In [None]:

# Correlation scaled by the overlap coefficient, one row per user.
adjusted_corr = (data_miss_coeffs.coeff * pearson_df.corr_with_target).alias('adjusted_corr')
pearson_adjusted = (
    pearson_df
    .join(data_miss_coeffs, on='user_id')
    .select('user_id', adjusted_corr)
)

In [None]:
# The 30 users most correlated with the target user.
# The target is excluded BEFORE truncating. Taking 31 rows and dropping the target
# afterwards only gives 30 neighbours when the target happens to be in that top 31
# (e.g. not when its own adjusted correlation is damped below 1 or is null).
pearson_neighbours = (
    pearson_adjusted
    .filter(pearson_adjusted['user_id'] != target_id)
    .sort(desc('adjusted_corr'))
    .limit(30)
)

In [None]:
# Movies the target user has not rated: flag each movie, keep the flagged ones, drop the flag.
is_unrated = F.when(u_item.movie_id.isin(target_items), 0).otherwise(1).alias('is_unrated')
unrated_films = (
    u_item
    .select('movie_id', 'movie_title', is_unrated)
    .where('is_unrated = 1')
    .select('movie_id', 'movie_title')
)

In [None]:
# Display the target user's row from the per-user averages.
average_rating.where(average_rating['user_id'] == target_id).take(1)[0]

In [None]:
# Ratings that the neighbours gave to movies the target user has not rated, together with
# each neighbour's mean rating, adjusted correlation, and the target's mean rating.
# The joins on `user_id` use the column name so the result carries a single `user_id`.
# The earlier expression join with `average_rating` left two `user_id` columns in the
# frame, which made the following `on='user_id'` join ambiguous.
t = (
    unrated_films
    .join(u_data, on=u_data['item_id'] == unrated_films['movie_id'])  # ratings of unrated movies
    .join(average_rating, on='user_id')      # average rating of each rating's author
    .join(pearson_neighbours, on='user_id')  # keep only ratings authored by neighbours
    .select(
        'movie_id',
        'movie_title',
        col('user_id').alias('neighbour_id'),
        'rating',
        col('adjusted_corr').alias('corr'),
        'avg_rating',
    )
    .withColumn('avg_rating_target', lit(average_rating_target_value))
)

In [None]:

# Predicted rating per movie: the target's mean plus the correlation-weighted mean of
# the neighbours' deviations from their own means. movie_id is cast to an integer.
weighted_deviation = F.sum(t['corr'] * (t['rating'] - t['avg_rating']))
corr_total = F.sum(t['corr'])
rating_prediction = (
    F.first(t['avg_rating_target']) + weighted_deviation / corr_total
).alias('rating_prediction')
r = (
    t.groupBy('movie_id')
    .agg(rating_prediction)
    .withColumn('movie_id', t['movie_id'].cast(IntegerType()))
)

In [None]:
# Ten best predictions: highest predicted rating first, ties broken by ascending movie_id.
pearson_top10 = r.orderBy(
    F.col('rating_prediction').desc(),
    F.col('movie_id').asc(),
).limit(10)

In [None]:
# Bring the final results to the driver as plain Python / pandas objects.
user_user_advise = pearson_top10.toPandas()
neighbours = [int(row.user_id) for row in pearson_neighbours.select('user_id').collect()]
advise = [row.movie_id for row in pearson_top10.select('movie_id').collect()]

# Only results computed in this notebook are included. The former entries
# 'average_film_ratings', 'average_user_ratings' and 'completeness' referenced names
# that are not assigned in any visible cell and raised NameError on a fresh run.
# NOTE(review): restore them here if those values are computed elsewhere.
d = {
    'pearson_neighbours': neighbours,
    'pearson_top10': advise
}

In [None]:
# Final answer: neighbour ids and the recommended movie ids.
data = dict(pearson_neighbours=neighbours, pearson_top10=advise)

In [None]:
# Persist the results as Parquet.
# `neighbours` and `advise` have different lengths, so they cannot be columns of one
# row-aligned frame, and the former 3-element index matched neither. Each list is
# therefore stored whole, as a list-valued cell in a single-row frame.
df = pd.DataFrame({'pearson_neighbours': [neighbours], 'pearson_top10': [advise]})
table = pa.Table.from_pandas(df)
pq.write_table(table, 'example.parquet')