In [1]:
import glob
import os
import sys

# Work from the project directory so relative paths resolve against it.
os.chdir("C:/dataanalytics/python")

# Configure the environment: point SPARK_HOME at the directory where Spark is
# installed, unless the surrounding environment already defines it.
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = 'C:\\spark'

# Root path of the Spark installation
SPARK_HOME = os.environ['SPARK_HOME']

# Make the PySpark sources shipped with the installation importable.
# The py4j archive name embeds a version number (e.g. py4j-0.10.6-src.zip) that
# changes between Spark releases, so it is located with a glob instead of a
# hard-coded file name.
spark_python_dir = os.path.join(SPARK_HOME, "python")
spark_lib_dir = os.path.join(spark_python_dir, "lib")
spark_import_paths = (
    [spark_python_dir, spark_lib_dir]
    + sorted(glob.glob(os.path.join(spark_lib_dir, "py4j-*-src.zip")))
    + [os.path.join(spark_lib_dir, "pyspark.zip")]
)
for spark_import_path in spark_import_paths:
    # Every entry is pushed to the front, so the last one (pyspark.zip) ends up
    # first on sys.path. Entries already present are skipped so that re-running
    # this cell does not pile up duplicates.
    if spark_import_path not in sys.path:
        sys.path.insert(0, spark_import_path)

# These imports only resolve after sys.path has been extended above
from pyspark import SparkContext
from pyspark import SparkConf

# Optionally configure Spark: application name, a local master with 2 threads,
# and 1g of executor memory
conf = (SparkConf().setAppName("ALS").setMaster("local[2]").set("spark.executor.memory", "1g"))

# Initialize the Spark context. getOrCreate() hands back the running context if
# one exists, so re-running this cell no longer fails with
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

In [2]:
# Read the user/item/rating file (ALS input format) as an RDD of text lines
ratingsData = sc.textFile(
    r'C:\Users\jeffnerd\Desktop\spark\UserItemData.txt'
)

In [3]:
# Pull every line of the RDD back to the driver to inspect the raw
# "user,item,rating" strings (acceptable here because the file is tiny;
# prefer take(n) for anything larger).
ratingsData.collect()

['1001,9001,10',
 '1001,9002,1',
 '1001,9003,9',
 '1002,9001,3',
 '1002,9002,5',
 '1002,9003,1',
 '1002,9004,10',
 '1003,9001,2',
 '1003,9002,6',
 '1003,9003,2',
 '1003,9004,9',
 '1003,9005,10',
 '1003,9006,8',
 '1003,9007,9',
 '1004,9001,9',
 '1004,9002,2',
 '1004,9003,8',
 '1004,9004,3',
 '1004,9010,10',
 '1004,9011,9',
 '1004,9012,8',
 '1005,9001,8',
 '1005,9002,3',
 '1005,9003,7',
 '1005,9004,1',
 '1005,9010,9',
 '1005,9011,10',
 '1005,9012,9',
 '1005,9013,8',
 '1005,9014,1',
 '1005,9015,1',
 '1006,9001,7',
 '1006,9002,4',
 '1006,9003,8',
 '1006,9004,1',
 '1006,9010,7',
 '1006,9011,6',
 '1006,9012,9']

In [4]:
# Parse each comma-separated line into an (int user, int item, float rating) tuple
ratingVector = (
    ratingsData
    .map(lambda line: line.split(','))
    .map(lambda fields: (int(fields[0]), int(fields[1]), float(fields[2])))
)

In [2]:
from pyspark.sql import DataFrame, SparkSession,Row

In [3]:
# Obtain the SparkSession from the same SparkConf that configured the
# SparkContext. `conf` already carries the application name ("ALS") and the
# master ("local[2]"); builder-level appName()/master() calls placed before
# config(conf=conf) are overwritten by it, so a separate master("local") would
# only advertise a setting that never takes effect.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [7]:
# Turn the parsed rating tuples into a DataFrame with named columns
ratingsDf = spark.createDataFrame(
    ratingVector,
    schema=["user", "item", "rating"],
)

In [4]:
from pyspark.ml.evaluation import RegressionEvaluator

In [5]:
#import the ALS estimator used to build the recommendation models
from pyspark.ml.recommendation import ALS

In [20]:
# Fit an ALS model with 10 latent factors on the small user/item/rating frame.
# ALS starts from randomly initialized factors; pinning the seed makes the
# learned factors (and every prediction derived from them) repeatable across
# kernel restarts.
als = ALS(maxIter=5, rank=10, userCol="user", itemCol="item", ratingCol="rating", seed=42)
model = als.fit(ratingsDf)

In [21]:
# Inspect the learned latent-factor vector of every user, ordered by user id
# (each vector has `rank` = 10 entries).
model.userFactors.orderBy("id").collect()

[Row(id=1001, features=[0.19716785848140717, 0.7893197536468506, 0.08463260531425476, -0.09283218532800674, -0.11196012794971466, -0.20144282281398773, -0.18035586178302765, 1.4863395690917969, 0.27391910552978516, -0.5960622429847717]),
 Row(id=1002, features=[-0.7428120374679565, 1.3925304412841797, 0.14526265859603882, 0.26212969422340393, 0.7419789433479309, 0.3453725278377533, 0.02979317493736744, -0.3764108419418335, -0.1941036731004715, 0.5657660961151123]),
 Row(id=1003, features=[-1.0883636474609375, 0.8108349442481995, -0.08470693230628967, 0.058173540979623795, 0.5069043040275574, -0.08221159130334854, 0.14106173813343048, -0.22677047550678253, -0.11497081816196442, 0.8432344794273376]),
 Row(id=1004, features=[-0.17139729857444763, 1.1740148067474365, 0.01912745088338852, 0.16626419126987457, -0.1592596173286438, 0.05003264918923378, -0.25829216837882996, 1.049270510673523, 0.31132468581199646, -0.3208717107772827]),
 Row(id=1005, features=[0.025568177923560143, 0.529352188

In [23]:
# Create a test data set: (user, item) pairs to score for user 1001
testDf = spark.createDataFrame(
    [(1001, 9003), (1001, 9004), (1001, 9005)],
    schema=["user", "item"],
)

In [26]:
# Score the test pairs, bring the rows to the driver, and order them with the
# rows' default tuple ordering
predictions = model.transform(testDf).collect()
predictions.sort()
                      

In [27]:
# Display the sorted prediction rows
predictions

[Row(user=1001, item=9003, prediction=9.012858390808105),
 Row(user=1001, item=9004, prediction=-0.6469923257827759),
 Row(user=1001, item=9005, prediction=-1.9525516033172607)]

In [28]:
# Score the test pairs again, this time ordering only by the first column (user).
# The sort is stable, so rows sharing a user keep the order collect() returned.
predictions1 = model.transform(testDf).collect()
predictions1.sort(key=lambda row: row[0])
                      

In [29]:
# Display the predictions ordered by the user column only
predictions1

[Row(user=1001, item=9004, prediction=-0.6469923257827759),
 Row(user=1001, item=9005, prediction=-1.9525516033172607),
 Row(user=1001, item=9003, prediction=9.012858390808105)]

In [8]:
# Recommender system two: load the MovieLens sample ratings that ship with Spark.
# The location is derived from SPARK_HOME so it follows the configured
# installation instead of assuming Spark lives in C:\spark.
lines = spark.read.text(
    os.path.join(SPARK_HOME, "data", "mllib", "als", "sample_movielens_ratings.txt")
).rdd

In [9]:
# Pull every row back to the driver to inspect the raw "::"-delimited records
# (each Row has a single `value` field holding one line of the file).
lines.collect()

[Row(value='0::2::3::1424380312'),
 Row(value='0::3::1::1424380312'),
 Row(value='0::5::2::1424380312'),
 Row(value='0::9::4::1424380312'),
 Row(value='0::11::1::1424380312'),
 Row(value='0::12::2::1424380312'),
 Row(value='0::15::1::1424380312'),
 Row(value='0::17::1::1424380312'),
 Row(value='0::19::1::1424380312'),
 Row(value='0::21::1::1424380312'),
 Row(value='0::23::1::1424380312'),
 Row(value='0::26::3::1424380312'),
 Row(value='0::27::1::1424380312'),
 Row(value='0::28::1::1424380312'),
 Row(value='0::29::1::1424380312'),
 Row(value='0::30::1::1424380312'),
 Row(value='0::31::1::1424380312'),
 Row(value='0::34::1::1424380312'),
 Row(value='0::37::1::1424380312'),
 Row(value='0::41::2::1424380312'),
 Row(value='0::44::1::1424380312'),
 Row(value='0::45::2::1424380312'),
 Row(value='0::46::1::1424380312'),
 Row(value='0::47::1::1424380312'),
 Row(value='0::48::1::1424380312'),
 Row(value='0::50::1::1424380312'),
 Row(value='0::51::1::1424380312'),
 Row(value='0::54::1::1424380312

In [10]:
# Split each raw record on the "::" delimiter into a list of string fields
parts = lines.map(
    lambda record: record.value.split("::")
)

In [14]:
# Convert the split string fields into typed Row objects
ratingsRDD = parts.map(
    lambda fields: Row(
        userid=int(fields[0]),
        movieid=int(fields[1]),
        rating=float(fields[2]),
        timestamp=int(fields[3]),
    )
)

In [15]:
# Build the ratings DataFrame; column names and types come from the Row fields
ratings = spark.createDataFrame(data=ratingsRDD)

In [16]:
# Preview the first 10 ratings with the columns in a fixed, readable order
(
    ratings
    .select("userid", "movieid", "rating", "timestamp")
    .show(10)
)

+------+-------+------+----------+
|userid|movieid|rating| timestamp|
+------+-------+------+----------+
|     0|      2|   3.0|1424380312|
|     0|      3|   1.0|1424380312|
|     0|      5|   2.0|1424380312|
|     0|      9|   4.0|1424380312|
|     0|     11|   1.0|1424380312|
|     0|     12|   2.0|1424380312|
|     0|     15|   1.0|1424380312|
|     0|     17|   1.0|1424380312|
|     0|     19|   1.0|1424380312|
|     0|     21|   1.0|1424380312|
+------+-------+------+----------+
only showing top 10 rows



In [18]:
# Split the ratings into roughly 80% training and 20% test data.
# The seed pins the random assignment so the split, and the RMSE computed from
# it, can be reproduced on a re-run.
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)

In [19]:
# Build the ALS model on the training split.
# coldStartStrategy="drop" discards predictions for users or movies that were
# not seen during training, so the evaluation metric does not come out as NaN.
# The seed pins the random factor initialization so the fitted model is repeatable.
als = ALS(maxIter=5, regParam=0.01, userCol="userid", itemCol="movieid", ratingCol="rating",
          coldStartStrategy="drop", seed=42)
model = als.fit(training)

In [20]:
# Score the held-out test split with the fitted model
predictions = model.transform(dataset=test)

In [21]:
# Measure how far the predicted ratings are from the true ones using RMSE
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print("Root mean square error = " + str(rmse))

Root mean square error = 1.7732135500976252


In [22]:
# Top 10 movie recommendations for every user
userRecs = model.recommendForAllUsers(numItems=10)
# Top 10 user recommendations for every movie
movieRecs = model.recommendForAllItems(numUsers=10)

In [26]:
# Print the DataFrame's schema summary (column names and types), not its rows
print(movieRecs)

DataFrame[movieid: int, recommendations: array<struct<userid:int,rating:float>>]


In [30]:
# Top 10 movie recommendations for a specified set of users
# (here: 3 distinct user ids taken from the ratings)
users = (
    ratings
    .select(als.getUserCol())
    .distinct()
    .limit(3)
)
userSubsetRecs = model.recommendForUserSubset(users, numItems=10)

In [32]:
# Print the DataFrame's schema summary (column names and types), not its rows
print(userSubsetRecs)

DataFrame[userid: int, recommendations: array<struct<movieid:int,rating:float>>]


In [33]:
# Top 10 user recommendations for a specified set of movies
# (here: 3 distinct movie ids taken from the ratings)
movies = (
    ratings
    .select(als.getItemCol())
    .distinct()
    .limit(3)
)
movieSubSetRec = model.recommendForItemSubset(movies, numUsers=10)

In [34]:
# Show the recommended movies for the selected subset of users
userSubsetRecs.show()

+------+--------------------+
|userid|     recommendations|
+------+--------------------+
|    26|[[46, 6.353371], ...|
|    19|[[92, 5.135188], ...|
|    29|[[46, 4.9106455],...|
+------+--------------------+



In [35]:
# Show the recommended users for the selected subset of movies
movieSubSetRec.show()

+-------+--------------------+
|movieid|     recommendations|
+-------+--------------------+
|     65|[[23, 4.7517166],...|
|     26|[[14, 2.412052], ...|
|     29|[[7, 5.3741646], ...|
+-------+--------------------+



In [36]:
# Show the per-user movie recommendations (show() prints the first 20 rows)
userRecs.show()

+------+--------------------+
|userid|     recommendations|
+------+--------------------+
|    28|[[12, 4.930484], ...|
|    26|[[46, 6.353371], ...|
|    27|[[49, 4.500244], ...|
|    12|[[48, 5.528244], ...|
|    22|[[51, 5.29352], [...|
|     1|[[22, 3.837206], ...|
|    13|[[93, 3.8170595],...|
|     6|[[25, 4.9474735],...|
|    16|[[90, 5.005152], ...|
|     3|[[30, 6.948371], ...|
|    20|[[22, 4.6876707],...|
|     5|[[46, 4.8820963],...|
|    19|[[92, 5.135188], ...|
|    15|[[92, 3.1816573],...|
|    17|[[83, 6.996744], ...|
|     9|[[27, 4.8569717],...|
|     4|[[90, 4.79123], [...|
|     8|[[29, 5.198118], ...|
|    23|[[32, 5.0998974],...|
|     7|[[29, 5.3741646],...|
+------+--------------------+
only showing top 20 rows



In [37]:
# Show the per-movie user recommendations (show() prints the first 20 rows)
movieRecs.show()

+-------+--------------------+
|movieid|     recommendations|
+-------+--------------------+
|     31|[[23, 4.4718785],...|
|     85|[[16, 4.779093], ...|
|     65|[[23, 4.7517166],...|
|     53|[[21, 4.5583425],...|
|     78|[[23, 1.2490251],...|
|     34|[[19, 4.623765], ...|
|     81|[[28, 4.871212], ...|
|     28|[[18, 4.9915004],...|
|     76|[[14, 5.045039], ...|
|     26|[[14, 2.412052], ...|
|     27|[[9, 4.8569717], ...|
|     44|[[18, 3.844892], ...|
|     12|[[28, 4.930484], ...|
|     91|[[21, 4.1610866],...|
|     22|[[22, 5.0119066],...|
|     93|[[2, 5.0625925], ...|
|     47|[[23, 4.4135914],...|
|      1|[[11, 4.6125965],...|
|     52|[[14, 5.043913], ...|
|     13|[[3, 4.606368], [...|
+-------+--------------------+
only showing top 20 rows

