In [1]:
import findspark
import pyspark
from pyspark import SparkContext
from pyspark.mllib.recommendation import Rating
from pyspark.mllib.recommendation import ALS
findspark.init('/home/ubuntu/spark')

## Data Preparation

In [4]:
# check the spark mode 
# if it runs in 'local' mode, read the data directly from this EC2 instance
# otherwise, read data from HDFS
global path
if sc.master[0:5]=="local":
    path="file:/home/ubuntu/pythonwork/pythonproject"
else:
    path="hdfs://master:9000/user/hduser"

In [8]:
sc = SparkContext()
rawUserData = sc.textFile(path + "/data/u.data")

In [11]:
rawUserData.take(5)

[u'196\t242\t3\t881250949',
 u'186\t302\t3\t891717742',
 u'22\t377\t1\t878887116',
 u'244\t51\t2\t880606923',
 u'166\t346\t1\t886397596']

In [13]:
# take the first 3 columns (user_id,movie_id,rating)
rawRatings = rawUserData.map(lambda line: line.split("\t")[:3] )
rawRatings.take(5)

[[u'196', u'242', u'3'],
 [u'186', u'302', u'3'],
 [u'22', u'377', u'1'],
 [u'244', u'51', u'2'],
 [u'166', u'346', u'1']]

In [14]:
ratingsRDD = rawRatings.map(lambda x: (x[0],x[1],x[2]))
ratingsRDD .take(5)

[(u'196', u'242', u'3'),
 (u'186', u'302', u'3'),
 (u'22', u'377', u'1'),
 (u'244', u'51', u'2'),
 (u'166', u'346', u'1')]

In [15]:
numRatings = ratingsRDD.count()
numRatings

100000

In [16]:
numUsers = ratingsRDD.map(lambda x: x[0] ).distinct().count()
numUsers 

943

In [17]:
numMovies = ratingsRDD.map(lambda x: x[1]).distinct().count() 
numMovies

1682

In [18]:
ratingsRDD.persist()

PythonRDD[21] at RDD at PythonRDD.scala:48

## Train Model

In [20]:
model = ALS.train(ratingsRDD, 10, 10, 0.01)

## Apply Model for Recommendation

In [21]:
model.recommendProducts(100,5)

[Rating(user=100, product=1512, rating=6.067130785092368),
 Rating(user=100, product=1172, rating=5.818270834736916),
 Rating(user=100, product=1209, rating=5.384422489457628),
 Rating(user=100, product=1169, rating=5.275450994850957),
 Rating(user=100, product=667, rating=5.112158585255072)]

In [22]:
model.predict(100, 1141)

2.953753659361607

In [23]:
model.recommendUsers(product=200,num=5)

[Rating(user=688, product=200, rating=6.726187820590676),
 Rating(user=252, product=200, rating=6.1515887260656905),
 Rating(user=362, product=200, rating=6.119920461428741),
 Rating(user=811, product=200, rating=5.957379123533007),
 Rating(user=696, product=200, rating=5.928632589629215)]

## Show Names of Movies

In [26]:
itemRDD = sc.textFile(path+"/data/u.item")
itemRDD.count()

1682

In [27]:
movieTitle= itemRDD.map( lambda line : line.split("|")).map(lambda a: (float(a[0]),a[1])).collectAsMap()
len(movieTitle)

1682

In [28]:
movieTitle.items()[:5]

[(1.0, u'Toy Story (1995)'),
 (2.0, u'GoldenEye (1995)'),
 (3.0, u'Four Rooms (1995)'),
 (4.0, u'Get Shorty (1995)'),
 (5.0, u'Copycat (1995)')]

In [29]:
for i in range(1,6): 
    print str(i)+":"+movieTitle[i]

1:Toy Story (1995)
2:GoldenEye (1995)
3:Four Rooms (1995)
4:Get Shorty (1995)
5:Copycat (1995)


In [34]:
recommendP= model.recommendProducts(100,5) 
for p in recommendP:
    print  "for user "+ str(p[0]) + \
           "recommend ' "+ str(movieTitle[p[1]]) + \
           "' , recommendation_score : "+ str(p[2])         

for user 100 recommend ' World of Apu, The (Apur Sansar) (1959)' , recommendation_score : 6.06713078509
for user 100 recommend ' Women, The (1939)' , recommendation_score : 5.81827083474
for user 100 recommend ' Mixed Nuts (1994)' , recommendation_score : 5.38442248946
for user 100 recommend ' Fresh (1994)' , recommendation_score : 5.27545099485
for user 100 recommend ' Audrey Rose (1977)' , recommendation_score : 5.11215858526
