# project for the course: Big Data Mining

## Preliminary data processing

In [1]:
%pprint
# imports for pyspark
import findspark
findspark.init()
import pyspark

# other imports
from pyspark.sql.types import StructType, StructField 
from pyspark.sql.types import DoubleType, IntegerType 
from pyspark.ml.feature import VectorAssembler 
from pyspark.sql import SQLContext 
from pyspark.ml.linalg import VectorUDT,Vectors
from pyspark import SparkContext
sc = SparkContext()
import pyspark.sql.functions as sqlf
import re 
import unicodedata
import math
from pyspark.sql.types import StringType
sqlContext = SQLContext(sc)
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

Pretty printing has been turned OFF


In [2]:
# first define the user targetted by recommender system!
user = '1'

# define the hyperparameters
w1 = 0.25
w2 = 0.25
w3 = 0.25
w4 = 0.25
alpha = 0.0005
delta = 0.05

In [3]:
# functions to clean the data
future_pattern = re.compile("""([^,"]+|"[^"]+")(?=,|$)""")

def parseCSV(line):
    return future_pattern.findall(line) 

In [4]:
# loading the file rating.csv
path_data = "/home/romain/movie_small"
rt = sc.textFile(path_data+"/ratings.csv").map(parseCSV)
rt.take(5)

[['userId', 'movieId', 'rating', 'timestamp'], ['1', '1', '4.0', '964982703'], ['1', '3', '4.0', '964981247'], ['1', '6', '4.0', '964982224'], ['1', '47', '5.0', '964983815']]

In [5]:
# loading the file movies.csv
mv = sc.textFile(path_data+"/movies.csv").map(parseCSV) 
# show
mv.take(5)

[['movieId', 'title', 'genres'], ['1', 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy'], ['2', 'Jumanji (1995)', 'Adventure|Children|Fantasy'], ['3', 'Grumpier Old Men (1995)', 'Comedy|Romance'], ['4', 'Waiting to Exhale (1995)', 'Comedy|Drama|Romance']]

In [6]:
# obtain a clean movie file rdd

# starting to clean and transform the movie file
mv2 = mv.filter(lambda x: x[0]!="movieId").map(lambda x: (str(x[0]), str(x[1]), str(x[2])))
# split movie titles and years
mv3 = mv2.map(lambda x: [x[0],x[1].rsplit("(",1)[0].strip(),x[1].split("(")[-1].split(")")[0],x[2]])
# filter for anomalies
mv4 = mv3.filter(lambda x: x[2].isdigit()).map(lambda x: [x[0], x[1], int(x[2]), x[3]])
# eliminate rows for which "no genres" is listed
mv5 = mv4.filter(lambda x: "genre" not in x[3])
# display loss
print(str(mv4.count() - mv5.count()) + " movies have been removed from the data set because they have no genres listed.")
# show
mv5.take(5) 

25 movies have been removed from the data set because they have no genres listed.


[['1', 'Toy Story', 1995, 'Adventure|Animation|Children|Comedy|Fantasy'], ['2', 'Jumanji', 1995, 'Adventure|Children|Fantasy'], ['3', 'Grumpier Old Men', 1995, 'Comedy|Romance'], ['4', 'Waiting to Exhale', 1995, 'Comedy|Drama|Romance'], ['5', 'Father of the Bride Part II', 1995, 'Comedy']]

## Part 1: a similarity-based model

In [7]:
# now, with a clean basis, start transforming for our purpose

# function to convert the genres column to binaries
def convert_binary(genres):
    binaries = [0] * 18
    if "Action" in genres:
        binaries[0] = 1
    if "Adventure" in genres:
        binaries[1] = 1
    if "Animation" in genres:
        binaries[2] = 1
    if "Children" in genres:
        binaries[3] = 1
    if "Comedy" in genres:
        binaries[4] = 1
    if "Crime" in genres:
        binaries[5] = 1
    if "Documentary" in genres:
        binaries[6] = 1
    if "Drama" in genres:
        binaries[7] = 1
    if "Fantasy" in genres:
        binaries[8] = 1
    if "Film-Noir" in genres:
        binaries[9] = 1
    if "Horror" in genres:
        binaries[10] = 1
    if "Musical" in genres:
        binaries[11] = 1
    if "Mystery" in genres:
        binaries[12] = 1
    if "Romance" in genres:
        binaries[13] = 1
    if "Sci" in genres:
        binaries[14] = 1
    if "Thriller" in genres:
        binaries[15] = 1
    if "War" in genres:
        binaries[16] = 1
    if "Western" in genres:
        binaries[17] = 1
    return binaries
        
# apply to movies, get rid of titles
mv6 = mv5.map(lambda x: [x[0], (x[2], convert_binary(x[3]))])
# show
mv6.take(5) 

[['1', (1995, [0, 1, 1, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0])], ['2', (1995, [0, 1, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0])], ['3', (1995, [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0])], ['4', (1995, [0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0])], ['5', (1995, [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])]]

In [8]:
# obtain the total number of ratings for each movie
rt2 = rt.filter(lambda x: x[0]!="userId").map(lambda x: (str(x[0]), str(x[1]), float(x[2]), int(x[3])))
rt3 = rt2.map(lambda x: [x[1], 1]).reduceByKey(lambda a,b: a+b).sortBy(lambda x: int(x[0]))
# obtain the ratings for each movie
rt4 = rt2.map(lambda x: [x[1], x[2]]).reduceByKey(lambda a,b: a+b)
# obtain rdd with total number of ratings and average rating for each movie
rt5 = rt4.join(rt3).map(lambda x: [x[0], (x[1][1], round(x[1][0]/x[1][1],4))])
# show
rt3.take(5)

[('1', 215), ('2', 110), ('3', 52), ('4', 7), ('5', 49)]

In [9]:
# join rt5 with mv6: obtain rdd with 5 columns
# id, associated to (year, average rating, number of ratings, genre)
mv7 = rt5.join(mv6).map(lambda x: [x[0], (x[1][1][0], x[1][0][1], x[1][0][0], x[1][1][1])])
mv7.take(2)

[['70', (1996, 3.5091, 55, [1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0])], ['441', (1993, 3.9286, 42, [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])]]

## Focus on user preferences

In [10]:
# obtain the list of movies rated by user
mr = rt.filter(lambda x: x[0]==user).map(lambda x: (x[1], x[2]))
# join with movies
mr2 = mv7.join(mr).map(lambda x: [x[1][0][0], float(x[1][1][0]), x[1][0][2], x[1][0][3]])
# show
mr2.take(5)

[[1993, 4.0, 42, [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]], [1996, 4.0, 36, [0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]], [1975, 5.0, 136, [0, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]], [1998, 4.0, 31, [0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0]], [1990, 4.0, 88, [0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1]]]

In [11]:
# for convenience, also obtain a display of movies rated by user
mr3 = rt2.map(lambda x: [x[0], x[1]]).filter(lambda x: x[0]==user).map(lambda x:  (x[1],x[0])).join(mv5.map(lambda x: (x[0],[x[1], x[2], x[3]]))).map(lambda x: [x[1][1][0], x[1][1][1], x[1][1][2]])
# show
mr3.take(20)

for i in mr3.take(20):
    print(str(i[0])+ "\t \t" + str(i[1]) + "\t \t" + str(i[2]))
    

"Usual Suspects, The	 	1995	 	Crime|Mystery|Thriller
From Dusk Till Dawn	 	1996	 	Action|Comedy|Horror|Thriller
Braveheart	 	1995	 	Action|Drama|War
Canadian Bacon	 	1995	 	Comedy|War
Billy Madison	 	1995	 	Comedy
Tommy Boy	 	1995	 	Comedy
Forrest Gump	 	1994	 	Comedy|Drama|Romance|War
Dazed and Confused	 	1993	 	Comedy
"Three Musketeers, The	 	1993	 	Action|Adventure|Comedy|Romance
Tombstone	 	1993	 	Action|Drama|Western
Dances with Wolves	 	1990	 	Adventure|Drama|Western
Pinocchio	 	1940	 	Animation|Children|Fantasy|Musical
Fargo	 	1996	 	Comedy|Crime|Drama|Thriller
Mission: Impossible	 	1996	 	Action|Adventure|Mystery|Thriller
"Rock, The	 	1996	 	Action|Adventure|Thriller
Twister	 	1996	 	Action|Adventure|Romance|Thriller
"Ghost and Mrs. Muir, The	 	1947	 	Drama|Fantasy|Romance
Escape to Witch Mountain	 	1975	 	Adventure|Children|Fantasy
"Three Caballeros, The	 	1945	 	Animation|Children|Musical
"Sword in the Stone, The	 	1963	 	Animation|Children|Fantasy|Musical


In [12]:
# for later, define all movies not rated by the user
# all movies rated by the user
temp1 = rt.filter(lambda x: x[0]==user).map(lambda x: (x[1], 'whatever')).groupByKey().map(lambda x: (x[0])).sortBy(lambda x: int(x))
# all the movies
temp2 = rt.filter(lambda x: x[0]!="userId").map(lambda x: (x[1], 'whatever')).groupByKey().map(lambda x: (x[0])).sortBy(lambda x: int(x))
# all movies - movies rated = all movies not rated
temp3 = temp2.subtract(temp1).sortBy(lambda x: int(x))
temp3.take(10)
# add a second entry to allow for later join
temp4 = temp3.map(lambda x: [x, 'whatever'])

In [13]:
# define function to aggregate genres
def aggregate_genre(binaries1, binaries2):
    cum_binaries = [0] * 18
    for i in range (18):
        cum_binaries[i] = binaries1[i] + binaries2[i]
    return cum_binaries

# define the individual characteristics of user:
el1 = int(round(mr2.map(lambda x: x[0]).reduce(lambda a,b: a+b)/mr2.count()))
el2 = round(mr2.map(lambda x: x[1]).reduce(lambda a,b: a+b)/mr2.count(),2)
el3 = int(round(mr2.map(lambda x: x[2]).reduce(lambda a,b: a+b)/mr2.count()))
el4 = [round(x/mr2.count(),2) for x in  mr2.map(lambda x: x[3]).reduce(aggregate_genre)]
# now obtain the benchmark vector for user:
bv = sc.parallelize([(el1, el2, el3, el4)])
bv.collect()

[(1984, 4.37, 70, [0.39, 0.37, 0.12, 0.18, 0.36, 0.19, 0.0, 0.29, 0.2, 0.0, 0.07, 0.09, 0.08, 0.11, 0.17, 0.24, 0.09, 0.03])]

## Measures of similarities with other movies

In [14]:
# define first the z function
def z(val, scale):
    return math.atan(scale*val)/(math.pi/2)

# define the genre proximity function
def gp(genre1, genre2):
    cum = 0
    for i in range (18):
        cum += genre1[i]*genre2[i]
    return cum / genre1.count(1)

In [15]:
# exemple value of z function
print(round(z(100,0.05),3))

0.874


In [16]:
# then convert mv7 to a format similar to bv (with movie id additionally)
mv8 = mv7.map(lambda x: [x[0], x[1][0], x[1][1], x[1][2], x[1][3]])
# run a cartesian with the benchmark to evaluate the metrics with every movie
mv9 = bv.cartesian(mv8)
# reorganize
mv10 = mv9.map(lambda x: [x[1][0], (x[1][1], x[0][0]), (x[1][2], x[0][1]), (x[1][3], x[0][2]), (x[1][4], x[0][3])])
# show
mv10.take(1)

[['70', (1996, 1984), (3.5091, 4.37), (55, 70), ([1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0], [0.39, 0.37, 0.12, 0.18, 0.36, 0.19, 0.0, 0.29, 0.2, 0.0, 0.07, 0.09, 0.08, 0.11, 0.17, 0.24, 0.09, 0.03])]]

In [17]:
# now run the maps to get a measure of proximity
mv11 = mv10.map(lambda x: [x[0], 1 - z(abs(x[1][0] - x[1][1]), alpha) , 1 - abs(x[2][0] - x[2][1]) / 5, 1 - z(abs(x[3][0] - x[3][1]), delta), gp(x[4][0], x[4][1])])
mv12 = mv11.map(lambda x: [x[0], w1*x[1] + w2*x[2] + w3*x[3] + w4*x[4]])
# remove movies rated by user
mv13 = temp4.join(mv12).map(lambda x: (x[0], x[1][1]))
# recover remaining movies and sort descending
mv14 = mv5.map(lambda x: (x[0], [x[1], x[2], x[3]]))
mv15 = mv14.join(mv13).map(lambda x: [round(x[1][1],3), x[1][0][0], x[1][0][1], x[1][0][2]]).sortBy(lambda x: -x[0])

# display recommandations
for i in mv15.take(20):
    print(str(i[0])+ "\t" + str(i[1]) + "\t" + str(i[2]) + "\t" + str(i[3]))

0.794	In the Line of Fire	1993	Action|Thriller
0.792	"Avengers, The	2012	Action|Adventure|Sci-Fi|IMAX
0.791	This Is Spinal Tap	1984	Comedy
0.788	"Fish Called Wanda, A	1988	Comedy|Crime
0.787	"Good, the Bad and the Ugly, The (Buono, il brutto, il cattivo, Il)	1966	Action|Adventure|Western
0.785	Traffic	2000	Crime|Drama|Thriller
0.78	"Royal Tenenbaums, The	2001	Comedy|Drama
0.78	Harry Potter and the Goblet of Fire	2005	Adventure|Fantasy|Thriller|IMAX
0.779	Erin Brockovich	2000	Drama
0.778	"American President, The	1995	Comedy|Drama|Romance
0.778	Django Unchained	2012	Action|Drama|Western
0.777	Scarface	1983	Action|Crime|Drama
0.772	City of God (Cidade de Deus)	2002	Action|Adventure|Crime|Drama|Thriller
0.769	Die Hard 2	1990	Action|Adventure|Thriller
0.769	Robin Hood: Men in Tights	1993	Comedy
0.769	Pirates of the Caribbean: Dead Man's Chest	2006	Action|Adventure|Fantasy
0.768	Mary Poppins	1964	Children|Comedy|Fantasy|Musical
0.767	Amadeus	1984	Drama
0.764	Lost in Translation	2003	Comedy|D

## Part 2: a simple linear regression model

In [18]:
# obtain the forecast set (the set with the features but no predicted value)

In [19]:
# obtain square rating for each movie
rt6 = rt2.map(lambda x: [x[1], x[2]**2]).reduceByKey(lambda a,b: a+b)
# obtain exp(standard deviation) for each movie
rt7 = rt6.join(rt5).map(lambda x: [x[0], (x[1][0]/x[1][1][0], x[1][1][1]**2)]).map(lambda x: (x[0], (x[1][0]-x[1][1])))
# show
rt7.take(5)

[('70', 1.013489917272727), ('441', 0.839911563809526), ('648', 0.807532234567903), ('1024', 0.8886444433333338), ('1060', 1.1708530844444454)]

In [20]:
# redefine year features
mv16 = mv5.map(lambda x: (x[0], x[2] - 2000))
# show
mv16.take(3)

[('1', -5), ('2', -5), ('3', -5)]

In [21]:
# for each movie, gather average rating, log(num rating), exp(sd), year, and 18 dummy features
rt8 = mv7.join(rt7).join(mv16).map(lambda x: (x[0], [x[1][0][0][1], round(math.log(x[1][0][0][2]),4), round(x[1][0][1],4), x[1][1], x[1][0][0][3][0], x[1][0][0][3][1], x[1][0][0][3][2], x[1][0][0][3][3], x[1][0][0][3][4], x[1][0][0][3][5], x[1][0][0][3][6], x[1][0][0][3][7], x[1][0][0][3][8], x[1][0][0][3][9], x[1][0][0][3][10], x[1][0][0][3][11], x[1][0][0][3][12], x[1][0][0][3][13], x[1][0][0][3][14], x[1][0][0][3][15], x[1][0][0][3][16], x[1][0][0][3][17]]))
# show
rt8.take(3)

[('70', [3.5091, 4.0073, 1.0135, -4, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0]), ('1220', [3.8095, 4.4308, 0.821, -20, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]), ('1805', [3.0161, 3.434, 1.0402, -2, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0])]

In [22]:
# convert prediction set to dataframe
ps1 = rt8.map(lambda x: [x[0], x[1][0], x[1][1], x[1][2], x[1][3], x[1][4], x[1][5], x[1][6], x[1][7], x[1][8], x[1][9], x[1][10], x[1][11], x[1][12], x[1][13], x[1][14], x[1][15], x[1][16], x[1][17], x[1][18], x[1][19], x[1][20], x[1][21]])
ps2 = ps1.toDF(['movieId','ar','nr','sdr','year','g1','g2','g3','g4','g5','g6','g7','g8','g9','g10','g11','g12','g13','g14','g15','g16','g17','g18'])
# show
ps2.show(5)
# convert to format compatible with ml
vectorAssembler = VectorAssembler(inputCols = ['ar','nr','sdr','year','g1','g2','g3','g4','g5','g6','g7','g8','g9','g10','g11','g12','g13','g14','g15','g16','g17','g18'], outputCol = 'features')
ps3 = vectorAssembler.transform(ps2)
ps4 = ps3.select(['movieId','features'])
ps4.show(5)


+-------+------+------+------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|movieId|    ar|    nr|   sdr|year| g1| g2| g3| g4| g5| g6| g7| g8| g9|g10|g11|g12|g13|g14|g15|g16|g17|g18|
+-------+------+------+------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|     70|3.5091|4.0073|1.0135|  -4|  1|  0|  0|  0|  1|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  1|  0|  0|
|   1220|3.8095|4.4308| 0.821| -20|  1|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|
|   1805|3.0161| 3.434|1.0402|  -2|  0|  0|  0|  0|  0|  1|  0|  1|  0|  0|  0|  0|  1|  0|  0|  1|  0|  0|
|   2078|3.8302|3.9703|0.7164| -33|  0|  0|  1|  1|  1|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|
|   3508|  4.25|2.8904|0.3958| -24|  1|  1|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  1|  0|  1|
+-------+------+------+------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
only showing top 5 rows

+--

In [23]:
# now obtain the training set for the user
# it is basically the forecast set restricted to the movies rated by the user
# where we now have a predicted value, using the user rating for this specific movie

In [24]:
# gather user's result in a single rdd where the first column is explained variable rating, and the others are the regressors
rt9 = mr.join(rt8).map(lambda x: (float(x[1][0]),x[1][1][0],x[1][1][1],x[1][1][2],x[1][1][3],x[1][1][4],x[1][1][5],x[1][1][6],x[1][1][7],x[1][1][8],x[1][1][9],x[1][1][10],x[1][1][11],x[1][1][12],x[1][1][13],x[1][1][14],x[1][1][15],x[1][1][16],x[1][1][17],x[1][1][18],x[1][1][19],x[1][1][20],x[1][1][21]))
# show
rt9.take(3)

[(5.0, 4.2377, 5.3181, 0.6387, -5, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0), (5.0, 3.3265, 3.8918, 1.1079, -5, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), (4.0, 3.9286, 3.7377, 0.8399, -7, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)]

In [25]:
# convert to dataframe
ts = rt9.toDF(['rating','ar','nr','sdr','year','g1','g2','g3','g4','g5','g6','g7','g8','g9','g10','g11','g12','g13','g14','g15','g16','g17','g18'])
# show
ts.show(5)

+------+------+------+------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|rating|    ar|    nr|   sdr|year| g1| g2| g3| g4| g5| g6| g7| g8| g9|g10|g11|g12|g13|g14|g15|g16|g17|g18|
+------+------+------+------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|   5.0|4.2377|5.3181|0.6387|  -5|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  1|  0|  0|  1|  0|  0|
|   5.0|3.3265|3.8918|1.1079|  -5|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|   4.0|3.9286|3.7377|0.8399|  -7|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|   4.0|3.2623|4.1109|0.7099|  -7|  1|  1|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|
|   5.0|3.8154|4.1744|0.9966|  -7|  1|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|
+------+------+------+------+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
only showing top 5 rows



In [26]:
# convert to format compatible with ml
vectorAssembler = VectorAssembler(inputCols = ['ar','nr','sdr','year','g1','g2','g3','g4','g5','g6','g7','g8','g9','g10','g11','g12','g13','g14','g15','g16','g17','g18'], outputCol = 'features')
ts1 = vectorAssembler.transform(ts)
ts1 = ts1.select(['rating','features'])
ts1.show(5)

+------+--------------------+
|rating|            features|
+------+--------------------+
|   5.0|(22,[0,1,2,3,9,16...|
|   5.0|(22,[0,1,2,3,8],[...|
|   4.0|(22,[0,1,2,3,8],[...|
|   4.0|(22,[0,1,2,3,4,5,...|
|   5.0|(22,[0,1,2,3,4,11...|
+------+--------------------+
only showing top 5 rows



In [27]:
# create linear regression
lr = LinearRegression(featuresCol='features',labelCol='rating',predictionCol='prediction')
# train
model = lr.fit(ts1)
# obtain prediction for prediction set
rating_predictions = model.transform(ps4)
ps5 = rating_predictions.select("movieId","prediction","features")
# show
ps5.sort('prediction', ascending=False).show(5)

+-------+-----------------+--------------------+
|movieId|       prediction|            features|
+-------+-----------------+--------------------+
|  32892|6.896993514812584|(22,[0,1,2,3,11,2...|
|   2068|6.838854832899267|(22,[0,1,2,3,11,1...|
|    484|6.584029584480806|(22,[0,1,2,3,5,7]...|
|  26171|6.439741589808314|(22,[0,1,2,3,8],[...|
|  25825|6.417288349375511|(22,[0,1,2,3,11,1...|
+-------+-----------------+--------------------+
only showing top 5 rows



In [28]:
# display regression coefficients
print("beta_1  =  " + str(round(model.coefficients[0], 3)) + '       ' + "delta_5 =  " + str(round(model.coefficients[8], 3)) + '       ' + "delta_13 =  " + str(round(model.coefficients[16], 3)))
print("beta_2  = " + str(round(model.coefficients[1], 3)) + '        ' + "delta_6 =  " + str(round(model.coefficients[9], 3)) + '       ' + "delta_14 = " + str(round(model.coefficients[17], 3)))
print("beta_3  =  " + str(round(model.coefficients[2], 3)) + '       ' + "delta_7 =   " + str(round(model.coefficients[10], 3)) + '         ' + "delta_15 = " + str(round(model.coefficients[18], 3)))
print("beta_4  = " + str(round(model.coefficients[3], 3)) + '       ' + "delta_8 =  " + str(round(model.coefficients[11], 3)) + '       ' + "delta_16 = " + str(round(model.coefficients[19], 3)))
print("delta_1 = " + str(round(model.coefficients[4], 3)) + '       ' + "delta_9 =  " + str(round(model.coefficients[12], 3)) + '       ' + "delta_17 = " + str(round(model.coefficients[20], 3)))
print("delta_2 =  " + str(round(model.coefficients[5], 3)) + '       ' + "delta_10 =  " + str(round(model.coefficients[13], 3)) + '       ' + "delta_18 = " + str(round(model.coefficients[21], 3)))
print("delta_3 =  " + str(round(model.coefficients[6], 3)) + '       ' + "delta_11 = " + str(round(model.coefficients[14], 3)))
print("delta_4 =  " + str(round(model.coefficients[7], 3)) + '       ' + "delta_12 = " + str(round(model.coefficients[15], 3)))

beta_1  =  0.919       delta_5 =  -0.107       delta_13 =  0.066
beta_2  = -0.08        delta_6 =  -0.011       delta_14 = -0.173
beta_3  =  0.685       delta_7 =   0.0         delta_15 = -0.2
beta_4  = -0.006       delta_8 =  -0.114       delta_16 = -0.176
delta_1 = -0.007       delta_9 =  -0.144       delta_17 = -0.131
delta_2 =  0.015       delta_10 =  0.277       delta_18 = -0.195
delta_3 =  0.065       delta_11 = -0.915
delta_4 =  0.103       delta_12 = -0.093


In [29]:
# convert mv5 rdd to dataframe
mv17 = mv5.toDF(['movieId', 'title', 'year', 'genres'])
# join with the results
mv18 = mv17.join(ps5, mv17.movieId == ps5.movieId).select("prediction", "title", "year", "genres").sort('prediction', ascending=False)
mv18.show(20, truncate = False)

+-----------------+-----------------------------------------------------------+----+----------------------------+
|prediction       |title                                                      |year|genres                      |
+-----------------+-----------------------------------------------------------+----+----------------------------+
|6.896993514812584|Ivan's Childhood (a.k.a. My Name is Ivan) (Ivanovo detstvo)|1962|Drama|War                   |
|6.838854832899267|Fanny and Alexander (Fanny och Alexander)                  |1982|Drama|Fantasy|Mystery       |
|6.584029584480806|Lassie                                                     |1994|Adventure|Children          |
|6.439741589808314|Play Time (a.k.a. Playtime)                                |1967|Comedy                      |
|6.417288349375511|Fury                                                       |1936|Drama|Film-Noir             |
|6.40235945681551 |"Zed & Two Noughts, A                                      |1985|Dram