[Original post](http://www.learnbymarketing.com/644/recsys-pyspark-als/)

In [1]:
import os
import sys

import glob

# Location of the local Spark installation. A SPARK_HOME that is already set
# in the environment takes precedence; the literal is only the fallback.
spark_path = os.environ.get('SPARK_HOME', "/Users/flavio.clesio/Documents/spark-2.1.0")

# Point both variables at the same directory, as the original setup did.
os.environ['SPARK_HOME'] = spark_path
os.environ['HADOOP_HOME'] = spark_path

spark_python = os.path.join(spark_path, "python")
spark_lib = os.path.join(spark_python, "lib")

# The py4j archive name carries a version that must match the Spark release.
# Pick up whatever archive ships with this installation and fall back to the
# name used by Spark 2.1.0 when none is found.
py4j_zips = sorted(glob.glob(os.path.join(spark_lib, "py4j-*-src.zip")))
py4j_zip = py4j_zips[-1] if py4j_zips else os.path.join(spark_lib, "py4j-0.10.4-src.zip")

# Make the PySpark sources importable from this kernel. The membership check
# keeps sys.path free of duplicates when the cell is executed more than once.
for entry in (
    os.path.join(spark_path, "bin"),
    spark_python,
    os.path.join(spark_python, "pyspark", ""),
    spark_lib,
    os.path.join(spark_lib, "pyspark.zip"),
    py4j_zip,
):
    if entry not in sys.path:
        sys.path.append(entry)

In [2]:
from pyspark import SparkContext
from pyspark import SparkConf

In [3]:
# Spark configuration: local master, application name, 1 GB executor memory.
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("My app")
conf.set("spark.executor.memory", "1g")

In [4]:
# Reuse the running SparkContext if this kernel already has one; constructing
# a second context directly raises an error, which made this cell fail when
# it was executed again without restarting the kernel.
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
# Display the SparkContext created above.
sc

In [5]:
# Directory that holds the extracted MovieLens 100k files.
ROOT_PATH = '/Users/flavio.clesio/Downloads/ml-100k'

In [6]:
# Read the raw ratings file as an RDD of text lines.
movielens = sc.textFile("{0}/u.data".format(ROOT_PATH))

[Source](http://files.grouplens.org/datasets/movielens/ml-100k-README.txt)


u.data     -- The full u data set, 100000 ratings by 943 users on 1682 items.
              Each user has rated at least 20 movies.  Users and items are
              numbered consecutively from 1.  The data is randomly
              ordered. This is a tab separated list of 
	         user id | item id | rating | timestamp. 

In [7]:
# Peek at the first raw line: a single tab-separated string.
movielens.first()

u'196\t242\t3\t881250949'

In [8]:
# Number of lines in u.data (one rating per line).
movielens.count()

100000

In [9]:
# Split every raw line on tabs. Per the MovieLens README the resulting
# fields are: user id, item id, rating, timestamp (all still strings here).
clean_data = movielens.map(lambda line: line.split('\t'))

In [10]:
# Show the first 10 parsed records (lists of string fields).
clean_data.take(10)

[[u'196', u'242', u'3', u'881250949'],
 [u'186', u'302', u'3', u'891717742'],
 [u'22', u'377', u'1', u'878887116'],
 [u'244', u'51', u'2', u'880606923'],
 [u'166', u'346', u'1', u'886397596'],
 [u'298', u'474', u'4', u'884182806'],
 [u'115', u'265', u'2', u'881171488'],
 [u'253', u'465', u'5', u'891628467'],
 [u'305', u'451', u'3', u'886324817'],
 [u'6', u'86', u'3', u'883603013']]

In [11]:
# Example: pull only the rating column (third field) into its own RDD of ints.
# rate.first() is 3
rate = clean_data.map(lambda fields: int(fields[2]))

In [12]:
rate.mean() #Average of all ratings in the file: 3.52986

3.529859999999947

In [13]:
# Pull only the user id column (first field) into its own RDD of ints.
users = clean_data.map(lambda fields: int(fields[0]))

In [14]:
users.distinct().count() #943 distinct users, matching the README

943

In [15]:
# A separate RDD variable is optional: the item id column (second field)
# can be mapped, de-duplicated and counted in one chained expression.
# There are 1,682 distinct movies.
clean_data.map(lambda fields: int(fields[1])).distinct().count()

1682

In [16]:
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.recommendation import MatrixFactorizationModel
from pyspark.mllib.recommendation import Rating

In [17]:
# ALS expects an RDD of Rating objects, i.e. (user, product, rating) triples.
# First step: split each raw line into its tab-separated fields.
mls = movielens.map(lambda line: line.split('\t'))

In [18]:
# Convert the string fields into typed Rating records; the timestamp is dropped.
ratings = mls.map(
    lambda fields: Rating(int(fields[0]), int(fields[1]), float(fields[2]))
)

In [19]:
# Random 80/20 split into training and test sets; the fixed seed keeps the
# split the same from run to run.
train, test = ratings.randomSplit(weights=[0.8, 0.2], seed=7856)

In [20]:
# Size of the training split. A single pre-formatted argument keeps this line
# valid both as a Python 2 print statement and as a Python 3 print() call.
print('The number of training instances is: %d' % train.count())

The number of traning instances is: 79909


In [21]:
# Size of the test split (the label previously said "training" by mistake).
# A single pre-formatted argument keeps this line valid both as a Python 2
# print statement and as a Python 3 print() call.
print('The number of test instances is: %d' % test.count())

The number of traning instances is: 20091


In [22]:
# Cache the training RDD: it is read again by ALS.train and by the
# training-set evaluation further down, so caching speeds those steps up.
train.cache()

PythonRDD[18] at RDD at PythonRDD.scala:48

In [23]:
# Cache the test RDD as well; it is read more than once during evaluation.
test.cache()

PythonRDD[19] at RDD at PythonRDD.scala:48

In [24]:
# ALS hyper-parameters:
#   rank          - number of latent factors learned per user / product
#   numIterations - number of times the ALS process is repeated
rank, numIterations = 5, 10

In [25]:
# Fit the ALS matrix-factorization model on the training split.
# A fixed seed makes the learned factors, and every metric derived from them,
# repeatable across runs; without it the recorded numbers drift between runs.
model = ALS.train(train, rank, numIterations, seed=7856)

In [26]:
# Examine the latent features for one product.
# The result is a (product id, array of latent factors) pair.
model.productFeatures().first()

(1,
 array('d', [0.18627141416072845, 0.35220956802368164, 0.4609048068523407, -2.304429054260254, -0.06592054665088654]))

In [27]:
# Examine the latent features for one user.
# The result is a (user id, array of latent factors) pair.
model.userFeatures().first()

(1,
 array('d', [0.23921744525432587, 0.4624086618423462, 1.2235586643218994, -1.3425638675689697, 0.19017109274864197]))

In [28]:
# For product 242, list the 100 users with the highest predicted rating.
model.recommendUsers(product=242, num=100)

[Rating(user=169, product=242, rating=5.906993115535968),
 Rating(user=97, product=242, rating=5.8877262624830475),
 Rating(user=443, product=242, rating=5.615114079579772),
 Rating(user=895, product=242, rating=5.490619320476549),
 Rating(user=353, product=242, rating=5.388828767130553),
 Rating(user=98, product=242, rating=5.357527553816799),
 Rating(user=4, product=242, rating=5.271224238730753),
 Rating(user=34, product=242, rating=5.237695136852757),
 Rating(user=583, product=242, rating=5.20319254876104),
 Rating(user=511, product=242, rating=5.176521175388039),
 Rating(user=22, product=242, rating=5.173710902830484),
 Rating(user=270, product=242, rating=5.172565262002639),
 Rating(user=165, product=242, rating=5.162151298220143),
 Rating(user=770, product=242, rating=5.157765735022165),
 Rating(user=240, product=242, rating=5.1574108425414895),
 Rating(user=888, product=242, rating=5.151970502353445),
 Rating(user=180, product=242, rating=5.1426624031315376),
 Rating(user=842, 

In [29]:
# For user 196, list the 10 products with the highest predicted rating.
model.recommendProducts(user=196, num=10)

[Rating(user=196, product=593, rating=6.984630282120642),
 Rating(user=196, product=1664, rating=6.325721017278127),
 Rating(user=196, product=361, rating=5.85662119959493),
 Rating(user=196, product=867, rating=5.751188326142186),
 Rating(user=196, product=1426, rating=5.739967376648849),
 Rating(user=196, product=1207, rating=5.650164178615768),
 Rating(user=196, product=1166, rating=5.444729524362961),
 Rating(user=196, product=1155, rating=5.44236415401739),
 Rating(user=196, product=1643, rating=5.363915512556903),
 Rating(user=196, product=1594, rating=5.271297493439602)]

In [30]:
# Predicted rating of one product (242) for one user (196).
model.predict(user=196, product=242)

3.8662024781956785

In [31]:
# Batch prediction, step 1: reduce every training Rating to the
# (user, product) pair that predictAll takes as input.
pred_input = train.map(lambda r: (r.user, r.product))

In [32]:
# Batch prediction, step 2: score every (user, product) pair at once.
# The result is an RDD of Rating(user, product, predicted rating).
pred = model.predictAll(user_product=pred_input)

In [33]:
# Performance estimate on the training set.
# Re-key both RDDs as ((user, product), rating) so they can be joined.
true_reorg = train.map(lambda r: ((r.user, r.product), r.rating))
pred_reorg = pred.map(lambda r: ((r.user, r.product), r.rating))

In [34]:
# Join on the (user, product) key: each value becomes (true rating, predicted rating).
true_pred = true_reorg.join(other=pred_reorg)

In [35]:
#Need to be able to square root the Mean-Squared Error
from math import sqrt

In [36]:
# Mean squared error between true and predicted ratings on the training set.
MSE = true_pred.values().map(lambda pair: (pair[0] - pair[1]) ** 2).mean()
# Root of the MSE. Recorded runs gave roughly 0.76-0.77; the exact value has
# differed between runs of this notebook.
RMSE = sqrt(MSE)

In [37]:
# Test-set evaluation: the same steps as the training-set estimate, applied to
# the held-out split.
# The re-keyed predictions get their own name (pred_test_reorg) instead of
# overwriting pred_reorg, so the training-set join cell still sees the
# training predictions if it is executed again after this one.
test_input = test.map(lambda x: (x[0], x[1]))
pred_test = model.predictAll(test_input)
test_reorg = test.map(lambda x: ((x[0], x[1]), x[2]))
pred_test_reorg = pred_test.map(lambda x: ((x[0], x[1]), x[2]))
# Each joined value is (true rating, predicted rating).
test_pred = test_reorg.join(pred_test_reorg)
test_MSE = test_pred.map(lambda r: (r[1][0] - r[1][1])**2).mean()
test_RMSE = sqrt(test_MSE)  # a recorded run gave 1.0145549956596238

In [38]:
#If you're happy, save your model! (uncomment the two lines below to run)
#model.save(sc,ROOT_PATH + "/ml-model")
#sameModel = MatrixFactorizationModel.load(sc, ROOT_PATH + "/ml-model")


In [43]:
# Display the training-set RMSE computed earlier.
RMSE

0.772560452603972

- [Alternating Least Squares for Low-Rank Matrix Reconstruction](https://arxiv.org/pdf/1206.2493.pdf)
- [Amazon.com Recommendations Item-to-Item Collaborative Filtering](http://www.cin.ufpe.br/~idal/rs/Amazon-Recommendations.pdf)
- [Matrix Factorization Techniques for Recommender Systems](https://datajobs.com/data-science-repo/Recommender-Systems-%5BNetflix%5D.pdf)
- [Collaborative Filtering for Implicit Datasets](http://yifanhu.net/PUB/cf.pdf)
- [Contextual Recommendation (long-term and short-term memory for recos)](http://eprints.dcs.warwick.ac.uk/676/1/am-contextual-recommendation-07.pdf)
- [Matrix Factorization: A Simple Tutorial and Implementation in Python](http://www.quuxlabs.com/blog/2010/09/matrix-factorization-a-simple-tutorial-and-implementation-in-python/)
- [Collaborative Filtering with Spark](https://www.slideshare.net/MrChrisJohnson/collaborative-filtering-with-spark)
