In [16]:
# libraries
from pyspark.sql import SparkSession,Row,functions
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import lit


spark = SparkSession.builder.appName('movierecommendation').getOrCreate()

In [27]:
# moviedict - movieid:moviename
moviedict = {}
with open('ml-100k/u.item',encoding='ISO-8859-1') as f: 
    for line in f:     
        fields = line.split('|')    
        #print(fields[1])
        moviedict[int(fields[0])] = fields[1]                                             

In [28]:
moviedict

{1: 'Toy Story (1995)',
 2: 'GoldenEye (1995)',
 3: 'Four Rooms (1995)',
 4: 'Get Shorty (1995)',
 5: 'Copycat (1995)',
 6: 'Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)',
 7: 'Twelve Monkeys (1995)',
 8: 'Babe (1995)',
 9: 'Dead Man Walking (1995)',
 10: 'Richard III (1995)',
 11: 'Seven (Se7en) (1995)',
 12: 'Usual Suspects, The (1995)',
 13: 'Mighty Aphrodite (1995)',
 14: 'Postino, Il (1994)',
 15: "Mr. Holland's Opus (1995)",
 16: 'French Twist (Gazon maudit) (1995)',
 17: 'From Dusk Till Dawn (1996)',
 18: 'White Balloon, The (1995)',
 19: "Antonia's Line (1995)",
 20: 'Angels and Insects (1995)',
 21: 'Muppet Treasure Island (1996)',
 22: 'Braveheart (1995)',
 23: 'Taxi Driver (1976)',
 24: 'Rumble in the Bronx (1995)',
 25: 'Birdcage, The (1996)',
 26: 'Brothers McMullen, The (1995)',
 27: 'Bad Boys (1995)',
 28: 'Apollo 13 (1995)',
 29: 'Batman Forever (1995)',
 30: 'Belle de jour (1967)',
 31: 'Crimson Tide (1995)',
 32: 'Crumb (1994)',
 33: 'Desperado (1995)',
 34: '

In [29]:
# userid movieid rating timestamp
moviedata = spark.sparkContext.textFile('ml-100k/u.data')

In [30]:
moviedata.collect()

['0\t50\t5\t881250949',
 '0\t172\t5\t881250949',
 '0\t133\t1\t881250949',
 '196\t242\t3\t881250949',
 '186\t302\t3\t891717742',
 '22\t377\t1\t878887116',
 '244\t51\t2\t880606923',
 '166\t346\t1\t886397596',
 '298\t474\t4\t884182806',
 '115\t265\t2\t881171488',
 '253\t465\t5\t891628467',
 '305\t451\t3\t886324817',
 '6\t86\t3\t883603013',
 '62\t257\t2\t879372434',
 '286\t1014\t5\t879781125',
 '200\t222\t5\t876042340',
 '210\t40\t3\t891035994',
 '224\t29\t3\t888104457',
 '303\t785\t3\t879485318',
 '122\t387\t5\t879270459',
 '194\t274\t2\t879539794',
 '291\t1042\t4\t874834944',
 '234\t1184\t2\t892079237',
 '119\t392\t4\t886176814',
 '167\t486\t4\t892738452',
 '299\t144\t4\t877881320',
 '291\t118\t2\t874833878',
 '308\t1\t4\t887736532',
 '95\t546\t2\t879196566',
 '38\t95\t5\t892430094',
 '102\t768\t2\t883748450',
 '63\t277\t4\t875747401',
 '160\t234\t5\t876861185',
 '50\t246\t3\t877052329',
 '301\t98\t4\t882075827',
 '225\t193\t4\t879539727',
 '290\t88\t4\t880731963',
 '97\t194\t3\t88423886

In [77]:
def parsemoviedata(line):
    
    fields = line.split()
    return Row(userid = int(fields[0]),movieid = int(fields[1]),rating = float(fields[2]))

In [78]:
movieparsed = moviedata.map(parsemoviedata)

In [79]:
movieparsed.take(5)

[Row(movieid=50, rating=5.0, userid=0),
 Row(movieid=172, rating=5.0, userid=0),
 Row(movieid=133, rating=1.0, userid=0),
 Row(movieid=242, rating=3.0, userid=196),
 Row(movieid=302, rating=3.0, userid=186)]

In [80]:
df = spark.createDataFrame(movieparsed)

In [81]:
type(df)

pyspark.sql.dataframe.DataFrame

In [82]:
als = ALS(maxIter=5,regParam=0.01,userCol='userid',itemCol='movieid',ratingCol='rating')

In [91]:
model = als.fit(df)

In [98]:
moviecount = df.groupBy('movieid').count().filter('count>100')
popularmovies = moviecount.select('movieid').withColumn('userid',lit(0))

In [99]:
moviecount.collect()

[Row(movieid=474, count=194),
 Row(movieid=29, count=114),
 Row(movieid=65, count=115),
 Row(movieid=191, count=276),
 Row(movieid=418, count=129),
 Row(movieid=222, count=365),
 Row(movieid=293, count=147),
 Row(movieid=270, count=136),
 Row(movieid=367, count=170),
 Row(movieid=705, count=137),
 Row(movieid=243, count=132),
 Row(movieid=54, count=104),
 Row(movieid=926, count=101),
 Row(movieid=385, count=208),
 Row(movieid=241, count=128),
 Row(movieid=237, count=384),
 Row(movieid=347, count=137),
 Row(movieid=588, count=202),
 Row(movieid=198, count=127),
 Row(movieid=22, count=297),
 Row(movieid=196, count=251),
 Row(movieid=427, count=219),
 Row(movieid=77, count=151),
 Row(movieid=7, count=392),
 Row(movieid=184, count=116),
 Row(movieid=188, count=170),
 Row(movieid=274, count=190),
 Row(movieid=202, count=280),
 Row(movieid=228, count=244),
 Row(movieid=325, count=128),
 Row(movieid=50, count=584),
 Row(movieid=94, count=137),
 Row(movieid=421, count=106),
 Row(movieid=264, c

In [100]:
popularmovies.collect()

[Row(movieid=474, userid=0),
 Row(movieid=29, userid=0),
 Row(movieid=65, userid=0),
 Row(movieid=191, userid=0),
 Row(movieid=418, userid=0),
 Row(movieid=222, userid=0),
 Row(movieid=293, userid=0),
 Row(movieid=270, userid=0),
 Row(movieid=367, userid=0),
 Row(movieid=705, userid=0),
 Row(movieid=243, userid=0),
 Row(movieid=54, userid=0),
 Row(movieid=926, userid=0),
 Row(movieid=385, userid=0),
 Row(movieid=241, userid=0),
 Row(movieid=237, userid=0),
 Row(movieid=347, userid=0),
 Row(movieid=588, userid=0),
 Row(movieid=198, userid=0),
 Row(movieid=22, userid=0),
 Row(movieid=196, userid=0),
 Row(movieid=427, userid=0),
 Row(movieid=77, userid=0),
 Row(movieid=7, userid=0),
 Row(movieid=184, userid=0),
 Row(movieid=188, userid=0),
 Row(movieid=274, userid=0),
 Row(movieid=202, userid=0),
 Row(movieid=228, userid=0),
 Row(movieid=325, userid=0),
 Row(movieid=50, userid=0),
 Row(movieid=94, userid=0),
 Row(movieid=421, userid=0),
 Row(movieid=264, userid=0),
 Row(movieid=229, useri

In [101]:
recommendation = model.transform(popularmovies)

In [102]:
# to remove cartesian product error
spark.conf.set('spark.sql.crossJoin.enabled','true')

top10 = recommendation.sort(recommendation.prediction.desc()).take(10)

In [103]:
top10

[Row(movieid=184, userid=0, prediction=5.876082420349121),
 Row(movieid=228, userid=0, prediction=5.709455490112305),
 Row(movieid=169, userid=0, prediction=5.290432453155518),
 Row(movieid=455, userid=0, prediction=5.211927890777588),
 Row(movieid=173, userid=0, prediction=5.2040934562683105),
 Row(movieid=96, userid=0, prediction=5.125892162322998),
 Row(movieid=431, userid=0, prediction=5.011352062225342),
 Row(movieid=50, userid=0, prediction=4.980900764465332),
 Row(movieid=172, userid=0, prediction=4.972667694091797),
 Row(movieid=174, userid=0, prediction=4.941874027252197)]

In [104]:
for recommend in top10:
    print(moviedict[recommend['movieid']],recommend['prediction'])

Army of Darkness (1993) 5.876082420349121
Star Trek: The Wrath of Khan (1982) 5.709455490112305
Wrong Trousers, The (1993) 5.290432453155518
Jackie Chan's First Strike (1996) 5.211927890777588
Princess Bride, The (1987) 5.2040934562683105
Terminator 2: Judgment Day (1991) 5.125892162322998
Highlander (1986) 5.011352062225342
Star Wars (1977) 4.980900764465332
Empire Strikes Back, The (1980) 4.972667694091797
Raiders of the Lost Ark (1981) 4.941874027252197
