The goal of this project is to give you practice working with a distributed recommender system.

It is sufficient for this assignment to build out your application on a single node.

Adapt one of your recommendation systems to work with Apache Spark and compare the performance with your previous iteration. Consider the efficiency of the system and the added complexity of using Spark. You may complete the assignment using PySpark (Python), SparkR (R), sparklyr (R), or Scala.

Please include in your conclusion: For your given recommender system’s data,
algorithm(s), and (envisioned) implementation, at what point would you see moving to a distributed platform such as Spark becoming necessary?

This project uses data from Kaggle: https://www.kaggle.com/uciml/restaurant-data-with-consumer-ratings/data

In [1]:
!pip install pyspark
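# Show the Java version that is currently active
!java -version
# List the installed Java alternatives (this command prompts for input)
!sudo update-alternatives --config java
!java -version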

!apt-get install -y openjdk-8-jdk-headless -qq > /dev/null #install openjdk
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" #set environment variable
!java -version #check java version

openjdk version "11.0.6" 2020-01-14
OpenJDK Runtime Environment (build 11.0.6+10-post-Ubuntu-1ubuntu118.04.1)
OpenJDK 64-Bit Server VM (build 11.0.6+10-post-Ubuntu-1ubuntu118.04.1, mixed mode, sharing)
There are 2 choices for the alternative java (providing /usr/bin/java).

  Selection    Path                                            Priority   Status
------------------------------------------------------------
* 0            /usr/lib/jvm/java-11-openjdk-amd64/bin/java      1111      auto mode
  1            /usr/lib/jvm/java-11-openjdk-amd64/bin/java      1111      manual mode
  2            /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java   1081      manual mode

Press <enter> to keep the current choice[*], or type selection number: 
openjdk version "11.0.6" 2020-01-14
OpenJDK Runtime Environment (build 11.0.6+10-post-Ubuntu-1ubuntu118.04.1)
OpenJDK 64-Bit Server VM (build 11.0.6+10-post-Ubuntu-1ubuntu118.04.1, mixed mode, sharing)
openjdk version "11.0.6" 2020-01-14
OpenJDK Runtime 

In [0]:
#load libraries
import pandas as pd
import numpy as np
import random

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

# Start a local, single-node Spark session
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

In [0]:
# Schema for the ratings table: one row per (user, restaurant) rating
schema = StructType([
    StructField("userID", IntegerType(), True),
    StructField("placeID", IntegerType(), True),
    StructField("rating", IntegerType(), True),
])

#import data
restaurants = pd.read_csv('rating_final.csv')
restaurants['placeID'] = restaurants['placeID'].astype(int)

sp_df = spark.createDataFrame(restaurants, schema=schema)
data_df = sp_df.withColumn("placeID", sp_df["placeID"].cast(IntegerType()))

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# Hold out roughly 20% of the ratings for evaluation
(training, test) = sp_df.randomSplit([0.8, 0.2])

In [0]:
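# coldStartStrategy="drop" removes test rows whose user or restaurant was not
# seen during training, so the RMSE below is not NaN
als = ALS(maxIter=5, regParam=0.01, userCol="userID", itemCol="placeID", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)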

In [6]:
#https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 restaurant recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each restaurant
placeRecs = model.recommendForAllItems(10)

Root-mean-square error = 1.1716398312294891
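
The assignment asks for a performance comparison, so it may help to record how long each step takes. The helper below is a minimal sketch using only the standard library; the names `timed` and `timings` are hypothetical and not part of the original notebook, and the timed step is a toy computation standing in for a real training call.

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label, results):
    """Store the wall-clock duration of the enclosed block in results[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start

timings = {}

# Toy workload standing in for a real step such as model training
with timed("toy_step", timings):
    total = sum(i * i for i in range(100_000))

for label, seconds in timings.items():
    print(f"{label}: {seconds:.4f} s")
```

In the notebook, the same pattern could wrap `model = als.fit(training)` and the pandas baseline. Because Spark transformations are lazy, it is more informative to time an action such as `evaluator.evaluate(predictions)` than the `model.transform(test)` call on its own.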


In [7]:
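#previous method: predict the global mean training rating for every test rating

import numpy as np

# Users as rows, restaurants as columns; unrated pairs are NaN
user_matrix = restaurants.pivot(index='UserID', columns='placeID', values='rating')

# Split by user (row): roughly 80% of users for training, the rest for testing
TRAIN_SIZE = 0.80
msk = np.random.rand(len(user_matrix)) < TRAIN_SIZE

train = user_matrix[msk]
test = user_matrix[~msk]  # note: rebinds `test`, previously the Spark test DataFrame
average = train.unstack().mean()  # global mean of the observed training ratings
SE = (test - average)*(test - average)
MSE = SE.mean().mean()  # mean of the per-restaurant mean squared errors
RMSE = MSE ** (1/2)
"RMSE is " + str(RMSE)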

'RMSE is 0.7709595972198889'
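
The baseline above averages the squared errors per restaurant and then averages those column means, which weights every restaurant equally regardless of how many ratings it has. Spark's `RegressionEvaluator` instead pools all predictions into one mean. The sketch below, in plain Python on hypothetical toy ratings (not taken from `rating_final.csv`), shows that the two ways of averaging can give different RMSE values; the function names are illustrative only.

```python
import math

# Hypothetical (user, place, rating) triples, for illustration only
train = [("u1", "p1", 2), ("u1", "p2", 0), ("u2", "p1", 1), ("u3", "p2", 1)]
test = [("u4", "p1", 2), ("u4", "p2", 0), ("u5", "p1", 1)]

def global_mean(ratings):
    """Mean of all observed ratings."""
    return sum(r for _, _, r in ratings) / len(ratings)

def rmse_pooled(ratings, prediction):
    """RMSE over all ratings pooled together (one mean over every rating)."""
    squared_errors = [(r - prediction) ** 2 for _, _, r in ratings]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

def rmse_mean_of_place_means(ratings, prediction):
    """RMSE from the mean of per-place mean squared errors."""
    by_place = {}
    for _, place, r in ratings:
        by_place.setdefault(place, []).append((r - prediction) ** 2)
    place_means = [sum(v) / len(v) for v in by_place.values()]
    return math.sqrt(sum(place_means) / len(place_means))

average = global_mean(train)
pooled = rmse_pooled(test, average)
per_place = rmse_mean_of_place_means(test, average)

print("pooled RMSE:", pooled)
print("mean-of-place-means RMSE:", per_place)
```

When comparing the baseline with the ALS result, using the same averaging method and the same train/test split for both would make the two RMSE values directly comparable.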

For your given recommender system’s data, algorithm(s), and (envisioned) implementation, at what point would you see moving to a distributed platform such as Spark becoming necessary?

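Moving to Spark becomes necessary once the dataset grows beyond what a single machine can hold in memory or process in a reasonable time. Spark distributes the computation across a cluster and can cache intermediate data in memory, which makes it practical to train large recommender systems such as this one. While the ratings still load comfortably into a pandas DataFrame, as they do here, a single-node approach is sufficient.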
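
One rough way to judge when a single machine stops being enough is to estimate the memory a dense user-by-restaurant matrix, like the pivot table used in the previous method, would need. The sketch below assumes 8 bytes per cell (float64); the function name and the user and item counts are hypothetical and chosen only for illustration.

```python
def dense_matrix_bytes(n_users, n_items, bytes_per_cell=8):
    """Bytes needed for a dense n_users x n_items matrix of float64 cells."""
    return n_users * n_items * bytes_per_cell

# Hypothetical sizes, for illustration only (not taken from rating_final.csv)
small = dense_matrix_bytes(1_000, 1_000)
large = dense_matrix_bytes(1_000_000, 100_000)

print(f"small: {small / 1e6:.0f} MB")
print(f"large: {large / 1e9:.0f} GB")
```

Under these assumptions the small case needs about 8 MB, while the large case needs about 800 GB, which is well beyond the memory of a typical single machine. Sparse or long-format storage pushes that limit out, but the same estimate gives a first indication of when a distributed platform is worth the added complexity.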