In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [3]:
spark = SparkSession\
    .builder\
    .appName("Recommendation_Systems")\
    .getOrCreate()
    
sqlContext = SQLContext(spark)

#### 1. Recommendation

Here I use 3 datasets -

  (i) Movie ratings

In [4]:
# Reading the data
set1 = spark.read.csv("recommendation_system_train.csv", header=True)

In [5]:
# Looking at the first five rows of the Dataframe
set1.show(5)

# Checking the dimensions of the Dataframe
print("Number of rows = " + str(set1.count()))
print("Number of columns = " + str(len(set1.columns)))

# Column names
set1.columns

+----+-----+------+---------+
|user|movie|rating|       id|
+----+-----+------+---------+
|2783| 1253|     5|2783_1253|
|2783|  589|     5| 2783_589|
|2783| 1270|     4|2783_1270|
|2783| 1274|     4|2783_1274|
|2783|  741|     5| 2783_741|
+----+-----+------+---------+
only showing top 5 rows

Number of rows = 500100
Number of columns = 4


['user', 'movie', 'rating', 'id']

In [6]:
# Summary statistics of rating
set1.describe('rating').show()

# Number of distinct users in the dataset
set1.select('user').distinct().count()

+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|            500100|
|   mean|3.6022235552889423|
| stddev| 1.114687505188166|
|    min|                 1|
|    max|                 5|
+-------+------------------+



3255

In [7]:
# read.csv reads everything into string format
# Pre-processing the data to match the required format
set1 = set1.withColumn("user", set1["user"].cast("double"))
set1 = set1.withColumn("movie", set1["movie"].cast("double"))
set1 = set1.withColumn("rating", set1["rating"].cast("double"))

# Splitting the data into train (75%) and test (25%)
(train1, test1) = set1.randomSplit([0.75,0.25], seed=0)

In [8]:
# Fit the recommendation model on training set
als1 = ALS(maxIter=5, regParam=0.05, userCol="user", itemCol="movie", ratingCol="rating",
          coldStartStrategy="drop")
model1 = als1.fit(train1)

In [9]:
# Model evaluator on the test data
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Prediction and model evaluation
predict1 = model1.transform(test1)
rmse1 = evaluator.evaluate(predict1)
print("Root mean-squared error = " + str(rmse1))

Root mean-squared error = 0.8878323656201714


In [10]:
# Generate top 3 movie recommendations for each user
userRecs1 = model1.recommendForAllUsers(3)
userRecs1.show(5)

# Generate top 3 user recommendations for each movie
movieRecs1 = model1.recommendForAllItems(3)
movieRecs1.show(5)

+----+--------------------+
|user|     recommendations|
+----+--------------------+
|4900|[[2493, 6.0862775...|
|5300|[[1570, 6.615103]...|
|4101|[[2834, 6.4760494...|
|5803|[[2128, 5.557115]...|
|3794|[[3720, 4.901862]...|
+----+--------------------+
only showing top 5 rows

+-----+--------------------+
|movie|     recommendations|
+-----+--------------------+
| 1580|[[3902, 5.275666]...|
|  471|[[2988, 6.0639215...|
| 1591|[[2867, 4.310448]...|
| 1342|[[4801, 5.0251055...|
| 2122|[[5670, 5.182676]...|
+-----+--------------------+
only showing top 5 rows



In [11]:
# Generate top 3 movie recommendations for a specified user
user1 = set1.select(als1.getUserCol()).distinct().limit(1)
userSubsetRecs1 = model1.recommendForUserSubset(user1, 3)
userSubsetRecs1.show()

# Generate top 3 user recommendations for a specified movie
movie1 = set1.select(als1.getItemCol()).distinct().limit(1)
movieSubSetRecs1 = model1.recommendForItemSubset(movie1, 3)
movieSubSetRecs1.show()

+----+--------------------+
|user|     recommendations|
+----+--------------------+
|2815|[[2197, 5.0955935...|
+----+--------------------+

+-----+--------------------+
|movie|     recommendations|
+-----+--------------------+
| 2734|[[5756, 4.7449493...|
+-----+--------------------+



(ii) Musical Instruments Ratings

In [12]:
# Read the data
set2 = spark.read.csv("ratings_Musical_Instruments.csv")

In [13]:
# Looking at the first five rows of the Dataframe
set2.show(5)

# Checking the dimensions of the Dataframe
print("Number of rows = " + str(set2.count()))
print("Number of columns = " + str(len(set2.columns)))

# Column names
set2.columns

+--------------+----------+---+----------+
|           _c0|       _c1|_c2|       _c3|
+--------------+----------+---+----------+
|A1YS9MDZP93857|0006428320|3.0|1394496000|
|A3TS466QBAWB9D|0014072149|5.0|1370476800|
|A3BUDYITWUSIS7|0041291905|5.0|1381708800|
|A19K10Z0D2NTZK|0041913574|5.0|1285200000|
|A14X336IB4JD89|0201891859|1.0|1350432000|
+--------------+----------+---+----------+
only showing top 5 rows

Number of rows = 500176
Number of columns = 4


['_c0', '_c1', '_c2', '_c3']

In [14]:
# Processing the Dataframe
# Rename the columns
set2 = set2.withColumnRenamed("_c0", "user")
set2 = set2.withColumnRenamed("_c1", "item")
set2 = set2.withColumnRenamed("_c2", "rating")
set2 = set2.withColumnRenamed("_c3", "timestamp")
# Change the column types to match the required format
set2 = set2.withColumn("item", set2["item"].cast("integer"))
set2 = set2.withColumn("rating", set2["rating"].cast("double"))

# Converting userID to numeric
stringIndexer = StringIndexer(inputCol="user", outputCol="userID")
model = stringIndexer.fit(set2)
indexed = model.transform(set2)
indexed = indexed.drop("user")
indexed.show(5)

# Dropping the rows with NULL values
indexed = indexed.filter(indexed.item. isNotNull())
indexed = indexed.filter(indexed.rating. isNotNull())
indexed = indexed.filter(indexed.userID. isNotNull())

+---------+------+----------+--------+
|     item|rating| timestamp|  userID|
+---------+------+----------+--------+
|  6428320|   3.0|1394496000|  5092.0|
| 14072149|   5.0|1370476800| 15198.0|
| 41291905|   5.0|1381708800| 50865.0|
| 41913574|   5.0|1285200000|309908.0|
|201891859|   1.0|1350432000| 56317.0|
+---------+------+----------+--------+
only showing top 5 rows



In [15]:
# Splitting the data into train (75%) and test (25%)
(train2, test2) = indexed.randomSplit([0.75,0.25], seed=0)

# Fit the recommendation model on training set
als2 = ALS(maxIter=5, regParam=0.05, userCol="userID", itemCol="item", ratingCol="rating",
          coldStartStrategy="drop")
model2 = als2.fit(train2)

# Prediction and model evaluation
predict2 = model2.transform(test2)
rmse2 = evaluator.evaluate(predict2)
print("Root mean-squared error = " + str(rmse2))

Root mean-squared error = 3.905666118018867


In [16]:
# Generate top 3 musical instruments recommendations for each user
userRecs2 = model2.recommendForAllUsers(3)
userRecs2.show(5)

# Generate top 3 user recommendations for each musical instrument
musicRecs2 = model2.recommendForAllItems(3)
musicRecs2.show(5)

+------+--------------------+
|userID|     recommendations|
+------+--------------------+
|245390|[[634061801, 14.5...|
| 68202|[[14072149, 4.351...|
|131213|[[634061801, 5.81...|
| 63964|[[634061801, 14.5...|
|194974|[[634061801, 8.72...|
+------+--------------------+
only showing top 5 rows

+----------+--------------------+
|      item|     recommendations|
+----------+--------------------+
|1933098465|[[166, 6.145127],...|
| 634029355|[[285214, 4.98478...|
|1423465180|[[278066, 3.98176...|
| 739046500|[[79730, 4.975225...|
| 767851013|[[35421, 4.961478...|
+----------+--------------------+
only showing top 5 rows



(iii) Restaurant Data with Consumer Ratings

In [17]:
# Read the data
# No pre-processing is required for data structure
set3 = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('rating_final.csv')

# Looking at the first five rows of the Dataframe
set3.show(5)

# Checking the dimensions of the Dataframe
print("Number of rows = " + str(set3.count()))
print("Number of columns = " + str(len(set3.columns)))

# Column names
set3.columns

+------+-------+------+-----------+--------------+
|userID|placeID|rating|food_rating|service_rating|
+------+-------+------+-----------+--------------+
| U1077| 135085|     2|          2|             2|
| U1077| 135038|     2|          2|             1|
| U1077| 132825|     2|          2|             2|
| U1077| 135060|     1|          2|             2|
| U1068| 135104|     1|          1|             2|
+------+-------+------+-----------+--------------+
only showing top 5 rows

Number of rows = 1161
Number of columns = 5


['userID', 'placeID', 'rating', 'food_rating', 'service_rating']

In [18]:
# Processing the Dataframe
# Mapping userID to numeric type
stringIndexer = StringIndexer(inputCol="userID", outputCol="user")
model = stringIndexer.fit(set3)
indexed3 = model.transform(set3)
indexed3 = indexed3.drop("userID")
indexed3.show(5)

+-------+------+-----------+--------------+-----+
|placeID|rating|food_rating|service_rating| user|
+-------+------+-----------+--------------+-----+
| 135085|     2|          2|             2|112.0|
| 135038|     2|          2|             1|112.0|
| 132825|     2|          2|             2|112.0|
| 135060|     1|          2|             2|112.0|
| 135104|     1|          1|             2| 81.0|
+-------+------+-----------+--------------+-----+
only showing top 5 rows



In [19]:
# Splitting the data into train (75%) and test (25%)
(train3, test3) = indexed3.randomSplit([0.75,0.25], seed=0)

# Fit the recommendation model on training set
als3 = ALS(maxIter=5, regParam=0.02, userCol="user", itemCol="placeID", ratingCol="rating",
          coldStartStrategy="drop")
model3 = als3.fit(train3)

# Prediction and model evaluation
predict3 = model3.transform(test3)
rmse3 = evaluator.evaluate(predict3)
print("Root mean-squared error = " + str(rmse3))

Root mean-squared error = 0.931323092008272


In [20]:
# Generate top 3 restaurant recommendations for each user
userRecs3 = model3.recommendForAllUsers(3)
userRecs3.show(5)

# Generate top 3 user recommendations for each restaurant
foodRecs3 = model3.recommendForAllItems(3)
foodRecs3.show(5)

+----+--------------------+
|user|     recommendations|
+----+--------------------+
|  31|[[135039, 1.87082...|
|  85|[[134986, 1.98203...|
| 137|[[135013, 1.70137...|
|  65|[[135057, 2.01050...|
|  53|[[135034, 2.45141...|
+----+--------------------+
only showing top 5 rows

+-------+--------------------+
|placeID|     recommendations|
+-------+--------------------+
| 135000|[[11, 2.2330902],...|
| 135027|[[52, 1.9609578],...|
| 135066|[[46, 2.0773256],...|
| 132663|[[48, 1.0813923],...|
| 135108|[[29, 2.020702], ...|
+-------+--------------------+
only showing top 5 rows

