## Import modules and create Spark session


In [1]:
#import module
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
#create session
appName = "Recommender system in Spark"
spark = SparkSession \
    .builder \
    .appName(appName) \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## Read file into dataFrame


5.	Load the u.data, and u.user files into Apache Spark as DataFrames named df_udata and df_uuser

In [2]:
#read file into dataFrame using automatically inferred schema
df_udata = spark.read.csv('D:/New folder (3)/movielens/u_data.csv', inferSchema=True, header=True)
df_uuser = spark.read.csv('D:/New folder (3)/movielens/u_user.csv', inferSchema=True, header=True)
df_uuser = df_uuser.selectExpr('user_id as userid','age','gender','occupation','zip_code')

In [3]:
df_udata.show(2)

+-------+-------+------+---------+
|user_id|item_id|rating|timestamp|
+-------+-------+------+---------+
|    196|    242|     3|881250949|
|    186|    302|     3|891717742|
+-------+-------+------+---------+
only showing top 2 rows



In [4]:
df_udata.describe()

DataFrame[summary: string, user_id: string, item_id: string, rating: string, timestamp: string]

In [5]:
df_uuser.show(2)

+------+---+------+----------+--------+
|userid|age|gender|occupation|zip_code|
+------+---+------+----------+--------+
|     1| 24|     M|technician|   85711|
|     2| 53|     F|     other|   94043|
+------+---+------+----------+--------+
only showing top 2 rows



a.	How many unique occupations

In [6]:
df_uuser.select("occupation");

unique= df_uuser.select('occupation').distinct()

unique.orderBy('occupation').show(100)
unique.orderBy('occupation').count()

+-------------+
|   occupation|
+-------------+
|administrator|
|       artist|
|       doctor|
|     educator|
|     engineer|
|entertainment|
|    executive|
|   healthcare|
|    homemaker|
|       lawyer|
|    librarian|
|    marketing|
|         none|
|        other|
|   programmer|
|      retired|
|     salesman|
|    scientist|
|      student|
|   technician|
|       writer|
+-------------+



21

what is the frequency of each occupation?

In [7]:
freq = df_uuser.groupBy('occupation').count()
freq = freq.selectExpr('occupation as Occupation', 'count as Frequency')
freq = freq.orderBy('Frequency').show(100)

+-------------+---------+
|   Occupation|Frequency|
+-------------+---------+
|    homemaker|        7|
|       doctor|        7|
|         none|        9|
|       lawyer|       12|
|     salesman|       12|
|      retired|       14|
|   healthcare|       16|
|entertainment|       18|
|    marketing|       26|
|   technician|       27|
|       artist|       28|
|    scientist|       31|
|    executive|       32|
|       writer|       45|
|    librarian|       51|
|   programmer|       66|
|     engineer|       67|
|administrator|       79|
|     educator|       95|
|        other|      105|
|      student|      196|
+-------------+---------+



# Find the number of recommendations corresponding to each occupation. 

In [8]:
Recommendation = df_udata.join(df_uuser,df_udata.user_id == df_uuser.userid)
Recommendation = Recommendation.select('user_id','item_id','gender','age','rating','occupation','zip_code')
Recommendation.describe()

DataFrame[summary: string, user_id: string, item_id: string, gender: string, age: string, rating: string, occupation: string, zip_code: string]

## Data preparation

In [9]:
#use only column data of "userId", "movieId", dan "rating"
data = Recommendation.select("user_id", "item_id", "rating")

#divide data, 70% for training and 30% for testing
splits = data.randomSplit([0.7, 0.3])
train = splits[0].withColumnRenamed("rating", "label")
test = splits[1].withColumnRenamed("rating", "trueLabel")

#calculate number of rows
train_rows = train.count()
test_rows = test.count()
print ("number of training data rows:", train_rows, ", number of testing data rows:", test_rows)

number of training data rows: 69818 , number of testing data rows: 30182


## Define model and train it

In [10]:
#define ALS (Alternating Least Square) as our recommender system
als = ALS(maxIter=19, regParam=0.01, userCol="user_id",itemCol="item_id", ratingCol="label")
#train our ALS model
model = als.fit(train)
model =model.setColdStartStrategy("drop");
print("Training is done!")

Training is done!


## Predict testing data

In [11]:
prediction = model.transform(test)
# prediction.show()
# prediction = predictions.selectExpr("user_id as userid","item_id","trueLable","predictions")
# print("testing is done!")
# prediction = prediction.selectExpr('user_id as id','item_id','trueLabel','prediction').show()

In [12]:
prediction.join(df_udata, "item_id")#.show(n=5, truncate=False)

# .select("user_id", "occupation", "gender").show(n=10, truncate=False)


DataFrame[item_id: int, user_id: int, trueLabel: int, prediction: float, user_id: int, rating: int, timestamp: int]

## Evaluate the accuracy of our model

In [13]:
#import RegressionEvaluator since we also want to calculate RMSE (Root Mean Square Error)
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
model =model.setColdStartStrategy("drop")
rmse = evaluator.evaluate(prediction)
print ("Root Mean Square Error (RMSE):", rmse)

Root Mean Square Error (RMSE): 1.1414492988449596


In [14]:
prediction.count()
a = prediction.count()
print("number of original data rows: ", a)
#drop rows with any missing data
cleanPred = prediction.dropna(how="any", subset=["prediction"])
b = cleanPred.count()
print("number of rows after dropping data with missing value: ", b)
print("number of missing data: ", a-b)

number of original data rows:  30123
number of rows after dropping data with missing value:  30123
number of missing data:  0


In [15]:
rmse = evaluator.evaluate(cleanPred)
print ("Root Mean Square Error (RMSE):", rmse)

Root Mean Square Error (RMSE): 1.1414492988449607


In [16]:
user1 = test.filter(test['user_id'] == 1).select(['user_id','item_id'])

In [17]:
rec = model.transform(user1)
rec.orderBy('prediction',ascending = False).show(10)

+-------+-------+----------+
|user_id|item_id|prediction|
+-------+-------+----------+
|      1|     53| 5.7811384|
|      1|    152| 5.2109337|
|      1|    250| 5.0954776|
|      1|    168|   5.08208|
|      1|     92| 5.0252438|
|      1|    224|  4.956689|
|      1|    115| 4.7802987|
|      1|    156|  4.759315|
|      1|     59|  4.702451|
|      1|    268|  4.625274|
+-------+-------+----------+
only showing top 10 rows



In [18]:
UserRecommendation = model.recommendForAllUsers(5)



In [19]:
# UserRecommendation.show();

In [20]:
UserRecommendation.printSchema()

root
 |-- user_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [21]:
UserRecommendationfinal=UserRecommendation.selectExpr('user_id as user_id','recommendations.item_id as Recommended_Movies')
# .show(30,False)
UserRecommendationfinal.describe();


In [22]:
df_uuser= df_uuser.selectExpr('userid AS user_id','occupation','age')

In [23]:
final = (df_uuser.join(UserRecommendationfinal,['user_id'],'inner'))

# .selectExpr('user_id','recommendations.item_id as Recommended_Movies').show(30,False)

In [35]:
final.selectExpr('user_id','occupation','Recommended_Movies as Recommended_Movies_IDs').show(30,False)

+-------+-------------+------------------------------+
|user_id|occupation   |Recommended_Movies_IDs        |
+-------+-------------+------------------------------+
|1      |technician   |[880, 1093, 1184, 1069, 1311] |
|3      |writer       |[6, 953, 1311, 458, 695]      |
|5      |other        |[253, 1094, 899, 1589, 834]   |
|6      |executive    |[1368, 1204, 1643, 320, 865]  |
|12     |other        |[1159, 960, 1456, 915, 502]   |
|13     |educator     |[574, 974, 557, 837, 955]     |
|16     |entertainment|[865, 630, 1107, 530, 1066]   |
|19     |librarian    |[601, 1204, 497, 493, 644]    |
|20     |homemaker    |[989, 1297, 580, 1213, 1389]  |
|22     |writer       |[989, 422, 130, 1085, 1120]   |
|26     |engineer     |[1643, 1598, 64, 1169, 605]   |
|27     |librarian    |[962, 634, 74, 276, 6]        |
|28     |writer       |[1589, 1169, 1269, 1483, 1218]|
|31     |artist       |[1205, 1005, 916, 1065, 962]  |
|34     |administrator|[1643, 1176, 865, 1166, 1134] |
|40     |s

In [36]:
 from pyspark.sql.functions import *
# final2 = final.withColumn("col5", final["Recommended_Movies"].getItem(1)).withColumn("Recommended_Movies", final["Recommended_Movies"].getItem(0))
final3 = final.withColumn("colcounter", size(final["Recommended_Movies"]))
final3.printSchema()
final3.show()

root
 |-- user_id: integer (nullable = true)
 |-- occupation: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Recommended_Movies: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- colcounter: integer (nullable = false)

+-------+-------------+---+--------------------+----------+
|user_id|   occupation|age|  Recommended_Movies|colcounter|
+-------+-------------+---+--------------------+----------+
|      1|   technician| 24|[880, 1093, 1184,...|         5|
|      3|       writer| 23|[6, 953, 1311, 45...|         5|
|      5|        other| 33|[253, 1094, 899, ...|         5|
|      6|    executive| 42|[1368, 1204, 1643...|         5|
|     12|        other| 28|[1159, 960, 1456,...|         5|
|     13|     educator| 47|[574, 974, 557, 8...|         5|
|     16|entertainment| 21|[865, 630, 1107, ...|         5|
|     19|    librarian| 40|[601, 1204, 497, ...|         5|
|     20|    homemaker| 42|[989, 1297, 580, ...|         5|
|     22|     

In [38]:
final3 = final3.groupBy("occupation").agg(count("colcounter").alias('totalNumberOfMovies'))
# .alias("totalNumberOfRecommendadMovies")


AnalysisException: cannot resolve 'colcounter' given input columns: [occupation, totalNumberOfMovies];
'Aggregate [occupation#43], [occupation#43, count('colcounter) AS totalNumberOfMovies#4155]
+- Aggregate [occupation#43], [occupation#43, count(colcounter#3692) AS totalNumberOfMovies#4136L]
   +- Project [user_id#1888, occupation#43, age#41, Recommended_Movies#1763, size(Recommended_Movies#1763, true) AS colcounter#3692]
      +- Project [user_id#1888, occupation#43, age#41, Recommended_Movies#1763]
         +- Join Inner, (user_id#1888 = user_id#1762)
            :- Project [userid#50 AS user_id#1888, occupation#43, age#41]
            :  +- Project [user_id#40 AS userid#50, age#41, gender#42, occupation#43, zip_code#44]
            :     +- Relation [user_id#40,age#41,gender#42,occupation#43,zip_code#44] csv
            +- Project [user_id#1758 AS user_id#1762, recommendations#1759.item_id AS Recommended_Movies#1763]
               +- Project [id#1754 AS user_id#1758, cast(recommendations#1755 as array<struct<item_id:int,rating:float>>) AS recommendations#1759]
                  +- Project [key#1748 AS id#1754, TopByKeyAggregator(scala.Tuple3)#1753 AS recommendations#1755]
                     +- Aggregate [value#1740], [value#1740 AS key#1748, topbykeyaggregator(org.apache.spark.ml.recommendation.TopByKeyAggregator@358934b9, Some(newInstance(class scala.Tuple3)), Some(class scala.Tuple3), Some(StructType(StructField(_1,IntegerType,false), StructField(_2,IntegerType,false), StructField(_3,FloatType,false))), encodeusingserializer(input[0, java.lang.Object, true], true), decodeusingserializer(input[0, binary, true], org.apache.spark.util.BoundedPriorityQueue, true), mapobjects(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 37), if (isnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 37))) null else named_struct(_1, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 37))._1, _2, knownnotnull(lambdavariable(MapObject, ObjectType(class scala.Tuple2), true, 37))._2), input[0, [Lscala.Tuple2;, true], None), ArrayType(StructType(StructField(_1,IntegerType,false), StructField(_2,FloatType,false)),true), true, 0, 0) AS TopByKeyAggregator(scala.Tuple3)#1753]
                        +- AppendColumns org.apache.spark.ml.recommendation.ALSModel$$Lambda$4180/1069984848@33e6f3fa, class scala.Tuple3, [StructField(_1,IntegerType,false), StructField(_2,IntegerType,false), StructField(_3,FloatType,false)], newInstance(class scala.Tuple3), [input[0, int, false] AS value#1740]
                           +- SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple3, true]))._1 AS _1#1729, knownnotnull(assertnotnull(input[0, scala.Tuple3, true]))._2 AS _2#1730, knownnotnull(assertnotnull(input[0, scala.Tuple3, true]))._3 AS _3#1731]
                              +- MapPartitions org.apache.spark.ml.recommendation.ALSModel$$Lambda$4178/290297471@16c2b579, obj#1728: scala.Tuple3
                                 +- DeserializeToObject newInstance(class scala.Tuple4), obj#1727: scala.Tuple4
                                    +- Join Cross
                                       :- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(IntegerType,false), fromPrimitiveArray, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#1698, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#1699]
                                       :  +- MapPartitions org.apache.spark.ml.recommendation.ALSModel$$Lambda$4175/1183495715@bee075c, obj#1697: scala.Tuple2
                                       :     +- DeserializeToObject newInstance(class scala.Tuple2), obj#1696: scala.Tuple2
                                       :        +- Project [_1#1497 AS id#1502, _2#1498 AS features#1503]
                                       :           +- SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#1497, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#1498]
                                       :              +- ExternalRDD [obj#1496]
                                       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(IntegerType,false), fromPrimitiveArray, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#1709, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#1710]
                                          +- MapPartitions org.apache.spark.ml.recommendation.ALSModel$$Lambda$4175/1183495715@7ac1b8f2, obj#1708: scala.Tuple2
                                             +- DeserializeToObject newInstance(class scala.Tuple2), obj#1707: scala.Tuple2
                                                +- Project [_1#1509 AS id#1514, _2#1510 AS features#1515]
                                                   +- SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#1509, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#1510]
                                                      +- ExternalRDD [obj#1508]


In [44]:
final3.orderBy('occupation').show(30,False)
unique= final3.select('occupation').distinct()
print('Total Unique Number of occupation  --->', unique.orderBy('occupation').count())

+-------------+-------------------+
|occupation   |totalNumberOfMovies|
+-------------+-------------------+
|administrator|79                 |
|artist       |28                 |
|doctor       |7                  |
|educator     |95                 |
|engineer     |67                 |
|entertainment|18                 |
|executive    |32                 |
|healthcare   |16                 |
|homemaker    |7                  |
|lawyer       |12                 |
|librarian    |51                 |
|marketing    |26                 |
|none         |9                  |
|other        |105                |
|programmer   |66                 |
|retired      |14                 |
|salesman     |12                 |
|scientist    |31                 |
|student      |196                |
|technician   |27                 |
|writer       |45                 |
+-------------+-------------------+

Total Unique Number of occupation  ---> 21
