In [1]:
import os
import sys

# Ask the PySpark shell to start the JVM with the spark-csv package.
os.environ["PYSPARK_SUBMIT_ARGS"] = '--packages com.databricks:spark-csv_2.10:1.2.0 pyspark-shell'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    # Without this guard the path concatenation below fails with an opaque NoneType TypeError.
    raise EnvironmentError('SPARK_HOME is not set; point it at the Spark installation directory')
sys.path.insert(0, os.path.join(spark_home, 'python'))

# Run PySpark's interactive bootstrap in this namespace; it creates `spark` (and presumably
# `sc`, which later cells use). exec(compile(...)) is the portable form of the
# Python-2-only execfile(); at notebook top level both run in the current globals.
shell_py = os.path.join(spark_home, 'python/pyspark/shell.py')
with open(shell_py) as shell_file:
    exec(compile(shell_file.read(), shell_py, 'exec'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Python version 2.7.6 (default, Oct 26 2016 20:30:19)
SparkSession available as 'spark'.


In [2]:
# Input files (ratings in u.data, movie catalogue in u.item) and the user we recommend for.
path_data, path_item = ('/user/alexey.astafiev/movielens/' + name for name in ('u.data', 'u.item'))
my_user_id = 727

In [3]:
from pyspark.sql.types import *
from pyspark.sql.types import Row
from pyspark.sql import functions as F
import pyspark.ml.linalg as linalg

In [4]:
# Tab-separated ratings file: four nullable integer columns.
d_fields = list(StructField(column, LongType(), True)
                for column in ('user_id', 'movie_id', 'rating', 'timestamp'))

In [5]:
# Pipe-separated movie catalogue (u.item): id, four text columns, then one integer
# column per genre. 'release_date' corrects the former 'releas_date' typo; no visible
# cell reads that column by name.
u_fields = (
    [StructField('movie_id', LongType(), True)]
    + list(StructField(column, StringType(), True)
           for column in ('movie_title', 'release_date', 'video_release_date', 'IMDb_URL'))
    + list(StructField(genre, LongType(), True)
           for genre in ('unknown', 'Action', 'Adventure', 'Animation', 'Children',
                         'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy',
                         'Film_Noir', 'Horror', 'Musical', 'Mystery', 'Romance',
                         'Sci_Fi', 'Thriller', 'War', 'Western'))
)

In [6]:
# Wrap the field lists into StructType schemas for the CSV readers.
schema_d, schema_u = StructType(d_fields), StructType(u_fields)

In [7]:
# Explicit schemas (no inference); u.data is tab-separated, u.item pipe-separated.
u_data = spark.read.schema(schema_d).option('sep', '\t').csv(path_data)
u_item = spark.read.schema(schema_u).option('sep', '|').csv(path_item)

In [None]:
# Show the column names and types of the ratings DataFrame.
u_data.printSchema()

In [None]:
# Show the column names and types of the movie catalogue DataFrame.
u_item.printSchema()

In [None]:
# 1
# Number of distinct non-null users who rated something, and number of catalogue rows.
user_count = u_data.select(F.countDistinct('user_id')).first()[0]
item_count = u_item.count()

In [None]:
# Display (number of users, number of movies).
user_count, item_count

(943, 1682)

In [None]:
# Total number of rating rows in u.data.
rating_count = u_data.count()

In [None]:
# 2
# Mean number of ratings per user; float() forces true division under Python 2.
average_user_ratings = float(rating_count) / user_count

In [None]:
# Ratings per user.
average_user_ratings

106.04453870625663

In [None]:
# 3
# Mean number of ratings per movie; float() forces true division under Python 2.
average_film_ratings = float(rating_count) / item_count

In [None]:
# Ratings per movie.
average_film_ratings

59.45303210463734

In [None]:
# 4
# Fill rate of the user x movie rating matrix.
completeness = float(rating_count) / (user_count * item_count)

In [None]:
# Fraction of user x movie cells that hold a rating.
completeness

0.06304669364224531

In [None]:
# Part 2, 1
# Per-user rating sum, rating count and mean (sum / count).
respective_means = (
    u_data
    .groupBy('user_id')
    .agg(F.sum('rating').alias('sum_rating'),
         F.count('rating').alias('count_rating'))
    .withColumn('mean_rating', F.col('sum_rating') / F.col('count_rating'))
)

In [None]:
# Part 2, 2
# Attach each user's mean rating and the centred rating (rating - mean) to every row.
users_data = (
    u_data
    .join(respective_means.select('user_id', 'mean_rating'), 'user_id', 'inner')
    .withColumn('diff_r_mr', F.col('rating') - F.col('mean_rating'))
)

In [54]:
# Centred ratings of my user only. Must run before the next cell rebinds users_data.
my_user_data = users_data.filter(F.col('user_id') == my_user_id).cache()

NameError: name 'users_data' is not defined

In [None]:
# Keep every user except my_user_id. `!=` replaces `<>`, which is deprecated in
# Python 2 and a SyntaxError in Python 3.
# NOTE: this rebinds users_data, so the cell building my_user_data must run first.
users_data = users_data.where(users_data.user_id != my_user_id).cache()

In [None]:
# Pair each of my centred ratings with every other user's centred rating of the same movie.
dd = (
    my_user_data
    .select('user_id', 'movie_id', F.col('diff_r_mr').alias('my_r_mr'))
    .join(users_data.select(F.col('user_id').alias('u_user_id'),
                            'movie_id',
                            F.col('diff_r_mr').alias('u_r_mr')),
          on='movie_id', how='inner')
    .withColumn('mult', F.col('my_r_mr') * F.col('u_r_mr'))
    .cache()
)

In [None]:
# Pearson similarity between my user and every other user over co-rated movies:
# sum(a*b) / (sqrt(sum(a^2)) * sqrt(sum(b^2))) on mean-centred ratings.
PearsonSim = (
    dd
    .groupBy('u_user_id')
    .agg(F.sum('mult').alias('sum1'),
         F.sqrt(F.sum(F.col('my_r_mr') ** 2)).alias('sqrt1'),
         F.sqrt(F.sum(F.col('u_r_mr') ** 2)).alias('sqrt2'))
    .withColumn('PearsonSim', F.col('sum1') / (F.col('sqrt1') * F.col('sqrt2')))
)

In [None]:
# Part 2, (3, 4)
# Number of movies co-rated by my user and each other user.
dd_count_movie_id = dd.groupBy('u_user_id').agg(F.count('movie_id').alias('count_movie_id'))

In [None]:
# Damp similarities backed by few shared movies: coeff = min(count_movie_id / 50, 1).
PearsonSim_coeff = (
    PearsonSim
    .join(dd_count_movie_id, on='u_user_id')
    .withColumn('coeff', F.least(F.col('count_movie_id') / 50, F.lit(1.0)))
    .withColumn('PearsonSim_coeff', F.col('coeff') * F.col('PearsonSim'))
    .select('u_user_id', 'PearsonSim', 'coeff', 'PearsonSim_coeff')
)

In [None]:
# The 30 most similar users; ties are broken by ascending user id.
pearson_neighbours = (
    PearsonSim_coeff
    .select('u_user_id', 'PearsonSim_coeff')
    .orderBy(F.desc('PearsonSim_coeff'), F.asc('u_user_id'))
    .limit(30)
)

In [None]:
# Part 2, 5
# My user's mean rating, the baseline of the user-based prediction.
ra = respective_means.where(F.col('user_id') == my_user_id).select('mean_rating').first()[0]

In [None]:
# My user's mean rating.
ra

3.0372670807453415

In [None]:
# Movies (id, title) my user has not rated: left join, keep rows without a match.
no_my_user_rating = (
    u_item.select('movie_id', 'movie_title')
    .join(my_user_data, 'movie_id', 'left')
    .where(F.isnull('rating'))
    .select('movie_id', 'movie_title')
)

In [None]:
# Centred ratings of my unrated movies given by my 30 nearest neighbours only.
# NOTE: this rebinds `dd`, shadowing the user-pair frame built earlier.
dd = (
    no_my_user_rating
    .join(users_data.select('user_id', 'movie_id', 'diff_r_mr'), on='movie_id', how='left')
    .join(pearson_neighbours, F.col('user_id') == F.col('u_user_id'), how='left')
    .where(F.col('PearsonSim_coeff').isNotNull())
    .select('movie_id', 'user_id', 'diff_r_mr', 'PearsonSim_coeff')
    .cache()
)

In [None]:
# User-based prediction r_ai = ra + sum(sim * centred rating) / sum(|sim|) over my
# neighbours. Movies whose similarity mass is zero are skipped (the prediction is undefined).
pearson_top10 = (
    dd.withColumn('numerator', dd.diff_r_mr * dd.PearsonSim_coeff)
    .groupBy('movie_id')
    .agg(F.sum(F.abs(F.col('PearsonSim_coeff'))).alias('sum_abs_PearsonSim_coeff'),
         F.sum(F.col('numerator')).alias('sum_numerator'))
    .where('sum_abs_PearsonSim_coeff != 0.0')
    .withColumn('r_ai', ra + F.col('sum_numerator') / F.col('sum_abs_PearsonSim_coeff'))
    # movie_id as a secondary key: without it, limit(10) picks arbitrarily among tied r_ai.
    .orderBy(F.col('r_ai').desc(), F.col('movie_id').asc())
    .limit(10)
)

In [None]:
# Result Lab08
# Lab08 summary: mean number of ratings per movie.

average_film_ratings

59.45303210463734

In [None]:
# Lab08 summary: mean number of ratings per user.
average_user_ratings

106.04453870625663

In [None]:
# Lab08 summary: fraction of the user x movie matrix that holds a rating.
completeness

0.06304669364224531

In [None]:
# Neighbour user ids in similarity order.
pearson_neighbours_list = [row.u_user_id for row in pearson_neighbours.collect()]

KeyboardInterrupt: 

In [None]:
# Recommended movie ids in predicted-rating order.
pearson_top10_list = [row.movie_id for row in pearson_top10.collect()]

In [None]:
import json

# Write the Lab08 answers. The `with` block closes and flushes the file deterministically
# instead of leaving an open handle for the garbage collector.
with open('../lab08.json', 'w') as lab08_file:
    json.dump(
        {
            "average_film_ratings": '%.4f' % average_film_ratings,
            "average_user_ratings": '%.4f' % average_user_ratings,
            "completeness": '%.4f' % completeness,
            "pearson_neighbours": pearson_neighbours_list,
            "pearson_top10": pearson_top10_list
        },
        lab08_file
    )

In [8]:
# Lab08s

# Part 3, 1
# Global mean rating mu over all rating rows.
average_rating = u_data.agg(F.avg('rating')).first()[0]

In [10]:
# Part 3, 2
# User bias bu = sum(rating - mu) / (n_u + 10); the +10 shrinks the bias of users
# with few ratings towards 0.
bu = (
    u_data
    .withColumn('diff_r_mu', F.col('rating') - average_rating)
    .groupBy('user_id')
    .agg(F.count('movie_id').alias('count_movie_id'),
         F.sum('diff_r_mu').alias('sum_diff_r_mu'))
    .withColumn('bu', (F.lit(1) / (F.col('count_movie_id') + 10)) * F.col('sum_diff_r_mu'))
    .select('user_id', 'bu')
    .cache()
)

In [11]:
# Part 3, 3
# Item bias bi = sum(rating - bu - mu) / (n_i + 25); the +25 shrinks the bias of
# movies with few ratings towards 0.
bi = (
    u_data
    .join(bu, on='user_id', how='inner')
    .withColumn('diff_r_bu_mu', F.col('rating') - F.col('bu') - average_rating)
    .groupBy('movie_id')
    .agg(F.count('user_id').alias('count_user_id'),
         F.sum('diff_r_bu_mu').alias('sum_diff_r_bu_mu'))
    .withColumn('bi', (F.lit(1) / (F.col('count_user_id') + 25)) * F.col('sum_diff_r_bu_mu'))
    .select('movie_id', 'bi')
    .cache()
)

In [12]:
# Part 3, 4
# Every (user, movie) pair: distinct raters x full catalogue, plus a "<user>_<movie>" key.
cross_join = (
    u_data.select('user_id').distinct()
    .crossJoin(u_item.select('movie_id'))
    .withColumn('um', F.concat(F.col('user_id'), F.lit('_'), F.col('movie_id')))
)

In [13]:
# Give the ratings the same "<user>_<movie>" key as cross_join. This rebinds u_data;
# re-running it just replaces the existing 'um' column.
u_data = u_data.withColumn('um', F.concat('user_id', F.lit('_'), 'movie_id'))

In [14]:
# Baseline bui = mu + bu + bi for every (user, movie) pair, with the observed rating
# attached where one exists (null otherwise). Joining on the integer key columns instead
# of the concatenated 'um' string gives the same matches (the '_' separator makes the key
# injective) without building and shuffling string keys, and without depending on u_data.um.
bui_r = (
    cross_join
    .join(u_data.select('user_id', 'movie_id', 'rating'),
          on=['user_id', 'movie_id'], how='left')
    .select('um', 'user_id', 'movie_id', 'rating')
    .join(bu, on='user_id', how='left')
    .join(bi, on='movie_id', how='left')
    .withColumn('bui', average_rating + F.col('bu') + F.col('bi'))
    .cache()
)

In [15]:
# Keep only the columns needed downstream (drops 'um', 'bu' and 'bi').
bui_r = bui_r[['movie_id', 'user_id', 'rating', 'bui']]

In [16]:
# Part 4, 1
# Residual rating - bui for observed ratings; 0 for pairs with no rating.
clean_ratings = bui_r.select(
    'user_id',
    'movie_id',
    F.when(F.col('rating').isNotNull(), F.col('rating') - F.col('bui')).otherwise(0).alias('diff_r_bui'),
    'bui')

In [17]:
# Part 4, 2
# One row per movie with parallel lists of user ids and residuals.
# NOTE(review): both collect_list calls run in the same aggregation, which keeps the lists
# aligned in practice, but Spark does not document an ordering guarantee for collect_list.
dd3 = (
    clean_ratings
    .groupBy('movie_id')
    .agg(F.collect_list('user_id').alias('user_id_list'),
         F.collect_list('diff_r_bui').alias('diff_r_bui_list'))
    .cache()
)

In [18]:
def _udf_toSparseVector(x1, x2):
    dd = {}
    ziped = zip(x1, x2)
    for i in ziped:
        if i[1]:
            dd[i[0]-1] = i[1]
    return linalg.SparseVector(len(ziped), dd)

In [19]:
# Spark UDF wrapper returning an ML vector column.
udf_toSparseVector = F.udf(_udf_toSparseVector, returnType=linalg.VectorUDT())

In [20]:
# One sparse residual vector per movie, indexed by user_id - 1.
dd4 = dd3.select(
    F.col('movie_id'),
    udf_toSparseVector(F.col('user_id_list'), F.col('diff_r_bui_list')).alias('sparse_vector'))

In [21]:
# A crossJoin of the movie vectors proved very painful here, so all
# (movie_id, sparse_vector) rows are collected to the driver and broadcast instead.

all_vectors_bc = sc.broadcast(value=dd4.collect())

In [22]:
def _udf_cos_sim(v1):
    dd = {}
    norm_v1 = v1.norm(2)
    bc_len = len(all_vectors_bc.value)
    for i in xrange(bc_len):
        r = all_vectors_bc.value[i]
        dd[r.movie_id] = float(v1.dot(r.sparse_vector)/norm_v1/r.sparse_vector.norm(2))
    return dd

In [23]:
# Spark UDF wrapper: sparse vector -> map of movie_id to cosine similarity.
udf_cos_sim = F.udf(_udf_cos_sim, returnType=MapType(IntegerType(), FloatType()))

In [24]:
# For each movie, its cosine similarity to every movie.
cosSim_data = dd4.select('movie_id', udf_cos_sim(F.col('sparse_vector')).alias('cosSim'))

In [25]:

# Ids of catalogue movies my user has not rated (EXCEPT-distinct), collected to the driver.
no_movie_my_user_rating = (
    u_item.select('movie_id')
    .subtract(u_data.filter(F.col('user_id') == my_user_id).select('movie_id'))
    .collect()
)

In [26]:
# Plain Python list of the unrated movie ids.
no_movie_my_user_rating_list = [row.movie_id for row in no_movie_my_user_rating]

In [96]:
# Part 4, 3a
# Keep the cosine-similarity maps only for movies the user has not rated.
my_user_cosSim_with_no_rating = cosSim_data.filter(F.col('movie_id').isin(no_movie_my_user_rating_list))

In [29]:
# Return type of the top-30 UDF: a list of non-null (movie_id, cosSim) structs.
schema = ArrayType(
    StructType()
    .add("movie_id", IntegerType(), False)
    .add("cosSim", FloatType(), False)
)

In [69]:
def _udf_top30(cos_sim):
    return sorted(cos_sim.items(), key=lambda x: x[1], reverse=True)[1:31]

In [70]:
# Spark UDF wrapper returning a list of (movie_id, cosSim) structs.
udf_top30 = F.udf(_udf_top30, returnType=schema)

In [107]:
# Attach the top-ranked neighbour structs to each unrated movie; the very first entry
# is skipped on the assumption that it is the movie itself.
my_user_top30_cosSim_with_no_rating = my_user_cosSim_with_no_rating.withColumn(
    'top30', udf_top30(F.col('cosSim')))

In [108]:
# Part 4, 3b
# Python UDFs that pull the two fields out of a (movie_id, cosSim) struct.
udf_get_movie_id = F.udf(lambda neighbour: neighbour.movie_id, returnType=IntegerType())
udf_get_cosSim = F.udf(lambda neighbour: neighbour.cosSim, returnType=FloatType())

In [109]:
# Running out of variable names :( Explode each movie's neighbour list into one row per
# (movie, neighbour struct).
dd5 = my_user_top30_cosSim_with_no_rating.select(
    F.col('movie_id'), F.explode(F.col('top30')).alias('cosSim'))

In [110]:
# Split each (movie_id, cosSim) neighbour struct into columns. Native struct field access
# yields the same IntegerType/FloatType values as the Python UDFs, without shipping every
# row to a Python worker and back.
dd6 = dd5.select('movie_id',
                 F.col('cosSim').getField('movie_id').alias('n_movie_id'),
                 F.col('cosSim').getField('cosSim').alias('cosSim'))

In [137]:
# Step 5 is unnecessary if this check finds no negative cosine similarities among neighbours.
dd6.where(F.col('cosSim') < 0).orderBy('movie_id', 'n_movie_id').show(5)

+--------+----------+------+
|movie_id|n_movie_id|cosSim|
+--------+----------+------+
+--------+----------+------+



In [112]:
# Attach my residual (rating - bui) for each neighbour movie, restricted to movies I rated.
dd7 = dd6.join(
    clean_ratings
    .where((F.col('user_id') == my_user_id) & ~F.col('movie_id').isin(no_movie_my_user_rating_list))
    .select(F.col('movie_id').alias('n_movie_id'), 'diff_r_bui'),
    on='n_movie_id')

In [116]:
# Per unrated movie: sum(|cos|) as denominator and sum(cos * residual) as numerator.
dd8 = (
    dd7
    .groupBy('movie_id')
    .agg(F.sum(F.abs('cosSim')).alias('denominator'),
         F.sum(F.col('cosSim') * F.col('diff_r_bui')).alias('numerator'))
)

In [122]:
# Attach my baseline bui for each unrated movie to its aggregated sums, and cache.
# No sort before the join: join output order is unspecified, so sorting the input
# would only add a shuffle.
dd9 = dd8.join(
    clean_ratings
    .where((clean_ratings.user_id == my_user_id) & (clean_ratings.movie_id.isin(no_movie_my_user_rating_list)))
    .select('movie_id', 'bui'),
    on='movie_id').cache()

In [123]:
# Materialise the cache: count() forces dd9 (marked .cache()) to be computed.
dd9.count()

947

In [127]:
# Finally! Item-based prediction rui = bui + numerator / denominator.
rui = dd9.select('movie_id', (F.col('bui') + F.col('numerator') / F.col('denominator')).alias('rui'))

In [130]:
# Part 4, 4&5 together: the cosSim < 0 check above returned no rows, so no
# separate positive-only ranking is needed.
# Ties on rui are broken by ascending movie_id so limit(10) is reproducible.
predicators_top10 = rui.orderBy(rui.rui.desc(), rui.movie_id.asc()).limit(10)

In [134]:
# Recommended movie ids in predicted-rating order.
predicators_top10_list = [row.movie_id for row in predicators_top10.collect()]

In [135]:
# Top-10 item-based recommendations (movie ids).
predicators_top10_list

[512, 489, 1611, 945, 1123, 48, 514, 937, 64, 1278]

In [136]:
import json

# Write the Lab08s answers. The `with` block closes and flushes the file deterministically.
# Both top-10 keys share one list because no negative cosine similarities were found,
# so the positive-only ranking coincides with the plain one.
with open('../lab08s.json', 'w') as lab08s_file:
    json.dump(
        {
            "average_rating": '%.4f' % average_rating,
            "predicators_top10": predicators_top10_list,
            "predicators_positive_top10": predicators_top10_list
        },
        lab08s_file
    )