In [2]:
import os
import sys

# Flip to False to build the contexts by hand (else branch) instead of
# running pyspark's own shell.py bootstrap.
USE_PYSPARK_SHELL = True

if USE_PYSPARK_SHELL:
    # Request the spark-csv package; set before shell.py is executed below.
    os.environ["PYSPARK_SUBMIT_ARGS"]='--packages com.databricks:spark-csv_2.10:1.3.0 pyspark-shell'
    # execfile is Python 2 only; it runs shell.py inside this namespace.
    # NOTE(review): later cells use `sc` and `spark`, which are presumably
    # defined by shell.py — confirm for the installed Spark version.
    execfile(os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py'))
else:
    # Manual bootstrap against a fixed local Spark install.
    os.environ['SPARK_HOME'] = '/usr/lib/spark'
    sys.path.insert(0, '/usr/lib/spark/python/lib/py4j-0.9-src.zip')
    sys.path.insert(0, '/usr/lib/spark/python/')
    sys.path.insert(0, '/usr/local/lib64/python2.7/site-packages')
    sys.path.insert(0,'/usr/local/lib/python2.7/site-packages')

    from pyspark import SparkContext
    from pyspark.sql import SQLContext, HiveContext

    # Reuse a context already running in this kernel, otherwise start one.
    # Unlike the former bare `except`, a genuine start-up failure is raised
    # here instead of surfacing later as a NameError on `sc`.
    sc = SparkContext.getOrCreate()
    sqlc = SQLContext(sc)
    spark = sqlc

In [4]:
%pylab inline

Populating the interactive namespace from numpy and matplotlib


In [5]:
import seaborn as sns

In [6]:
import numpy as np
import pandas as pd
import time
import json

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, MapType
import pyspark.sql.functions as F

In [7]:
from operator import add
from pyspark.sql.types import *

In [8]:
from pyspark.mllib.recommendation import Rating
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel
from datetime import datetime

# Load data

In [10]:
# Read the cached training set (parquet) into a DataFrame.
strain_predictions = spark.read.parquet("lab_12/cache/full_train_bp")


In [11]:
# Column pruning is left disabled, so the n_* and bp_* columns stay on the frame.
#strain_predictions = strain_predictions.select("userId", "movieId", "rating")
# Preview two rows of the training frame.
strain_predictions.show(2)

+------+-------+------+--------------+--------------+-------------------+-------------------+
|userId|movieId|rating|n_user_ratings|n_item_ratings|bp_predicted_rating|             bp_err|
+------+-------+------+--------------+--------------+-------------------+-------------------+
| 20833|     31|   4.5|           425|           161|  3.787585287229863|  0.712414712770137|
|201833|     31|   3.5|            23|           161|  4.136999434381541|-0.6369994343815408|
+------+-------+------+--------------+--------------+-------------------+-------------------+
only showing top 2 rows



In [12]:
# Read the cached test set (parquet) into a DataFrame.
stest_predictions = spark.read.parquet("lab_12/cache/full_test_bp")

In [13]:
# Preview two rows of the test frame.
stest_predictions.show(2)

+------+-------+--------------+--------------+-------------------+
|userId|movieId|n_user_ratings|n_item_ratings|bp_predicted_rating|
+------+-------+--------------+--------------+-------------------+
|223036|     31|          1376|           161|  3.548151868862599|
|  1051|     31|          1108|           161| 2.1218463332398656|
+------+-------+--------------+--------------+-------------------+
only showing top 2 rows



# Calculate normalised user ratings

## avg user rating with normalisation

In [14]:
# Global mean of all training ratings; used below as the smoothing prior for
# per-user averages and as the fallback for users absent from training.
mean_row = strain_predictions.agg({"rating": "mean"}).collect()[0]
avg_rating = mean_row[0]
print(avg_rating)

3.52186384662


In [15]:
# Expose the training frame to Spark SQL under the name "strain".
strain_predictions.registerTempTable("strain")

In [16]:
# Smoothed per-user mean rating: every user gets one extra pseudo-rating equal
# to the global mean, i.e. (sum_rating + avg_rating) / (n_ratings + 1).
# `!r` interpolates the full-precision repr of the float. A plain `{avg_rating}`
# goes through str(), which on Python 2 keeps only 12 significant digits
# (compare the printed 3.52186384662), so the SQL would see a truncated mean.
buser = spark.sql("""
      select
          userId,
          (sum_rating + cast({avg_rating!r} as double)) / (n_ratings + 1.0) as user_avg_rating
      from (    
                select 
                    userId,
                    sum(rating) as sum_rating,
                    count(*) as n_ratings
                from strain
                group by userId
      ) as x          
""".format(avg_rating=avg_rating))

In [17]:
# Spot-check two of the smoothed per-user averages.
buser.show(2)

+------+------------------+
|userId|   user_avg_rating|
+------+------------------+
| 15831|2.9065248364588783|
| 40631|3.4174071477149424|
+------+------------------+
only showing top 2 rows



In [18]:
# Mark buser for caching; it is joined against both the training and the
# test table in later cells.
buser.cache()

DataFrame[userId: int, user_avg_rating: double]

In [19]:
# Register buser as a temp table so the following SQL can join against it.
buser.registerTempTable("buser")

In [20]:
# Centre every training rating on its user's smoothed mean:
#   user_norm_rating = rating - user_avg_rating
# Inner join on userId; only userId, movieId and the centred rating are kept.
q = spark.sql("""
       select 
           t.userId,
           t.movieId,
           (t.rating - u.user_avg_rating) as user_norm_rating
       from strain t
       join buser u on u.userId = t.userId 
""")

In [21]:
# Preview two rows of the mean-centred training ratings.
q.show(2)

+------+-------+-------------------+
|userId|movieId|   user_norm_rating|
+------+-------+-------------------+
|    31|   5681| 1.0444607342445456|
|    31|  11482|0.04446073424454555|
+------+-------+-------------------+
only showing top 2 rows



In [22]:
# Rebind strain_predictions to the mean-centred frame.
# NOTE(review): from here on the frame has no `rating` column, so re-running
# the earlier avg_rating / buser cells without first reloading the parquet
# would presumably fail — the notebook is not safe to re-run out of order.
strain_predictions = q

In [23]:
# Re-point the "strain" temp table at the mean-centred frame (it previously
# exposed the raw training frame).
strain_predictions.registerTempTable("strain")

In [24]:
# Mark the mean-centred training frame for caching.
strain_predictions.cache()

DataFrame[userId: int, movieId: int, user_norm_rating: double]

## Add the average user rating to every record in the test (submission) set

In [25]:
# Expose the test frame to Spark SQL under the name "stest".
stest_predictions.registerTempTable("stest")

In [26]:
# Attach each test pair's smoothed user average. The left join keeps every
# test row; users with no row in buser fall back to the global mean.
# `!r` interpolates the full-precision repr of the float. A plain `{avg_rating}`
# goes through str(), which on Python 2 keeps only 12 significant digits.
q = spark.sql("""
           select
              t.userId,
              t.movieId,
              coalesce(u.user_avg_rating, cast({avg_rating!r} as double)) as user_avg_rating
           from stest t
           left join buser u on t.userId = u.userId 
           
""".format(avg_rating=avg_rating))

In [27]:
# Preview two test pairs with their user averages.
q.show(2)

+------+-------+------------------+
|userId|movieId|   user_avg_rating|
+------+-------+------------------+
|223036|     31| 3.181933089213232|
|  1051|     31|1.7502451432341026|
+------+-------+------------------+
only showing top 2 rows



In [28]:
# Rebind stest_predictions to the (userId, movieId, user_avg_rating) frame.
# The bp_* columns dropped here are reloaded from parquet in a later cell.
stest_predictions = q

In [29]:
# Re-point the "stest" temp table at the frame that carries user_avg_rating.
stest_predictions.registerTempTable("stest")

In [30]:
# Mark the test frame with user averages for caching.
stest_predictions.cache()

DataFrame[userId: int, movieId: int, user_avg_rating: double]

In [31]:
# Drop buser from the cache now that both joins against it have been defined.
# NOTE(review): the two frames built from buser have so far only been cache()d
# and previewed with show(2); their full count() runs in the following cells.
# If cache() is lazy (as is usual in Spark), buser may be recomputed there —
# consider unpersisting only after those counts. Confirm.
buser.unpersist()

DataFrame[userId: int, user_avg_rating: double]

In [32]:
# Row count of the test frame.
stest_predictions.count()

10531564

In [33]:
# Row count of the mean-centred training frame.
strain_predictions.count()

10531564

# Load similarity matrix

In [34]:
def convert_u2u_tupple(line):
    """Parse one comma-separated similarity record.

    line: text of the form "user1,user2,sim,sim_rank"; any fields after the
        fourth are ignored.

    Returns a tuple (int user1, int user2, float sim, int sim_rank).
    Raises IndexError when fewer than four fields are present and ValueError
    when a field cannot be converted.
    """
    fields = line.split(',')
    user1 = int(fields[0])
    user2 = int(fields[1])
    sim = float(fields[2])
    sim_rank = int(fields[3])
    return (user1, user2, sim, sim_rank)

def load_u2u_sim(path="lab_12/u2u_top100to200.csv"):
    """Load the user-to-user similarity CSV into a Spark DataFrame.

    Lines starting with "user" are dropped (presumably the header row); every
    remaining line is parsed by convert_u2u_tupple.

    path: location of the CSV, read with sc.textFile. Defaults to the file the
        notebook originally hard-coded.

    Returns a DataFrame with columns user1 (int), user2 (int), sim (float)
    and sim_rank (int).
    """
    schema = StructType([StructField("user1", IntegerType()),
                         StructField("user2", IntegerType()),
                         StructField("sim", FloatType()),
                         StructField("sim_rank", IntegerType())])
    rows = sc.textFile(path)\
             .filter(lambda x: not x.startswith("user"))\
             .map(convert_u2u_tupple)
    # The former trailing `return raw` was unreachable and has been removed.
    return rows.toDF(schema=schema)

In [35]:
# Build the user-to-user similarity DataFrame from the CSV.
u2u = load_u2u_sim()

In [36]:
# Preview two similarity rows.
u2u.show(2)

+-----+------+----------+--------+
|user1| user2|       sim|sim_rank|
+-----+------+----------+--------+
|    1|179650|0.93893325|       0|
|    1|227097|0.93850267|       1|
+-----+------+----------+--------+
only showing top 2 rows



In [37]:
# Mark the similarity frame for caching.
u2u.cache()

DataFrame[user1: int, user2: int, sim: float, sim_rank: int]

In [38]:
# Expose the similarity frame to Spark SQL under the name "u2u_sim".
u2u.registerTempTable("u2u_sim")

# Make predictions on test

In [39]:
# User-to-user prediction for each (userId, movieId) pair of the test table:
#   predicted_rating = the user's own smoothed mean
#                    + similarity-weighted mean of the neighbours'
#                      mean-centred ratings of that movie.
# avg(pr.user_avg_rating) only carries the per-user constant through the
# group by. Both joins are inner joins, so a pair for which no retained
# neighbour rated the movie produces no row here.
# NOTE(review): the sim_rank values in the u2u preview start at 0, so
# `<= 100` keeps 101 neighbours if ranks are 0-based throughout — confirm
# whether `< 100` was intended.
# NOTE(review): the denominator is sum(s.sim), not sum(abs(s.sim)); that is
# only safe if every retained similarity is positive — confirm.
q = spark.sql("""
        select
          pr.userId,
          pr.movieId,
          avg(pr.user_avg_rating) + sum(kr.user_norm_rating * s.sim) /  sum(s.sim)  as predicted_rating,
          count(*) as n_used_ratings
        from stest pr
        join u2u_sim s on pr.userId = s.user1 and s.sim_rank <= 100
        join strain kr on pr.movieId = kr.movieId and kr.userId = s.user2
        group by pr.userId, pr.movieId
""")

In [40]:
# Preview two u2u predictions with the number of neighbour ratings used.
q.show(2)

+------+-------+------------------+--------------+
|userId|movieId|  predicted_rating|n_used_ratings|
+------+-------+------------------+--------------+
| 54727|     48|3.4749070004135874|            23|
| 69727|     48|3.9890742730391455|            28|
+------+-------+------------------+--------------+
only showing top 2 rows



In [41]:
# Keep the prediction frame under a descriptive name; `q` is a scratch name
# that other cells reassign.
predicted_on_test = q

In [42]:
# Expose the predictions to Spark SQL under the name "predicted_test".
predicted_on_test.registerTempTable("predicted_test")

In [43]:
# Mark the prediction frame for caching before its inputs are released.
predicted_on_test.cache()

DataFrame[userId: int, movieId: int, predicted_rating: double, n_used_ratings: bigint]

In [44]:
# Count the (userId, movieId) pairs that received a u2u prediction.
predicted_on_test.count()

9646862

### clear caches

In [45]:
# Release the cached inputs of the prediction query; predicted_on_test was
# cached and counted in the preceding cells.
u2u.unpersist()
strain_predictions.unpersist()
stest_predictions.unpersist()

DataFrame[userId: int, movieId: int, user_avg_rating: double]

## Load the bp predictions once again and join them in for the pairs that got no u2u prediction

In [46]:
# Reload the raw test set so the n_* and bp_* columns are available again.
stest_predictions = spark.read.parquet("lab_12/cache/full_test_bp")

In [47]:
# Re-point the "stest" temp table at the reloaded test frame.
stest_predictions.registerTempTable("stest")

In [48]:
# Left-join the u2u predictions onto the full test set so every test row is kept.
# Rows without a u2u prediction fall back to bp_predicted_rating and get
# n_used_user_ratings = 0 and has_u2u_rating = 0.
eval_data = spark.sql("""
        select
          tbase.*,
          coalesce(pt.n_used_ratings, 0) as n_used_user_ratings,
          coalesce(predicted_rating, bp_predicted_rating) as predicted_u2u_rating,
          if(pt.n_used_ratings is not null, 1, 0) as has_u2u_rating
        from stest tbase
        left join predicted_test  pt on tbase.userId = pt.userId and tbase.movieId = pt.movieId
""")

In [49]:
# Mark the combined frame for caching.
eval_data.cache()

DataFrame[userId: int, movieId: int, n_user_ratings: bigint, n_item_ratings: bigint, bp_predicted_rating: double, n_used_user_ratings: bigint, predicted_u2u_rating: double, has_u2u_rating: int]

In [50]:
# Preview two rows of the combined frame.
eval_data.show(2)

+------+-------+--------------+--------------+-------------------+-------------------+--------------------+--------------+
|userId|movieId|n_user_ratings|n_item_ratings|bp_predicted_rating|n_used_user_ratings|predicted_u2u_rating|has_u2u_rating|
+------+-------+--------------+--------------+-------------------+-------------------+--------------------+--------------+
|    28|   2111|           335|            37| 3.1485874853613227|                  0|  3.1485874853613227|             0|
|    30|  22037|            28|         32123| 3.1463153925797798|                 35|   3.886345295450649|             1|
+------+-------+--------------+--------------+-------------------+-------------------+--------------------+--------------+
only showing top 2 rows



# Save for ensemble checks

In [51]:
# Reduce the frame to 8 partitions; this rebinds eval_data to the coalesced frame.
eval_data = eval_data.coalesce(8)

In [52]:
#eval_data.write.parquet("lab_12/cache/full_test_bp_u2u_top100", mode='overwrite')

# Export as csv for submission

In [53]:
def cut_dot5to5(x):
    """Clamp a rating to the closed interval [0.5, 5].

    Values below 0.5 become 0.5, values above 5 become 5.0, and anything in
    between (NaN included) is returned unchanged.
    """
    return min(max(x, .5), 5.0)

In [54]:
# Pull the submission columns into a pandas DataFrame, renaming the
# prediction column to `rating`.
res_pd = eval_data.select("userId", "movieId", F.col("predicted_u2u_rating").alias("rating")).toPandas()

In [55]:
# Order the rows by userId, then movieId.
res_pd = res_pd.sort_values(by=["userId", "movieId"])

In [56]:
# Check the row count against the test-set count reported earlier.
res_pd.shape

(10531564, 3)

In [57]:
# Clamp predictions to [0.5, 5.0]. Series.clip applies the same bounds as
# cut_dot5to5 in one vectorised pass instead of one Python call per row
# (the frame has roughly 10.5M rows); NaN values are left as NaN either way.
res_pd['rating'] = res_pd['rating'].clip(lower=.5, upper=5.0)

In [58]:
# Summary statistics of the clamped ratings; min and max should sit on the bounds.
res_pd['rating'].describe()

count    1.053156e+07
mean     3.551392e+00
std      7.726598e-01
min      5.000000e-01
25%      3.113202e+00
50%      3.639083e+00
75%      4.092681e+00
max      5.000000e+00
Name: rating, dtype: float64

In [59]:
#res_pd.to_csv("lab_12/res/full_test_bp_u2u_top100.csv", index=False)

# get rating stats

In [61]:
#q = spark.read.parquet("lab_12/cache/full_test_bp_u2u_top100")


In [62]:
#q.show(2)

In [63]:
#q.count()

In [64]:
#z = q.select("userId", "movieId", "n_user_ratings", "n_item_ratings")

In [65]:
#zpd = z.toPandas()

In [None]:
#zpd.to_csv("/data/home/taras.svirsky/lab12/res/test_subm_rating_counts.csv")