#DATA643 - Final Project
### Prashanth Padebettu, Parshu Rath, Adejare Windokun, Xingjia Wu
##### Summer 2016 
##### Instructor: Andrew Catlin

## Objectives

The main objective of this project is to build a context-aware recommender system using the Movielens dataset on Apache Spark cluster system. The goal is to incorporate contextual information (such as Time) in order to deliver not just movie recommendations but also unique user experiences shaped by user context, and take advantage of the distributed computing power to handle processing of large scale data and thus increasing relevance of the recommendation models to the users by reducing latency.

### Description of workflows:
1. Loading the data - the data was loaded by importing the 3 files and merged into a Pandas Dataframe.

2. Data exploration - The dataframe was converted to a Spark DataFrame object that allowed us to perform data using SQL.

3. Data visualization - Dataframe into a Resilient Distributed Dataset (RDD) to demonstrate how visualizations can be performed on large datasets.

4. Machine Learning, Recommendations and Predictions were performed using the Spark MLlib library with the Alternating Least Squares method. Models were evaluated based on RMSE. 

5. Contextual pre-filtering technique was enabled before training and testing the models. Contexts included the day of the week and time of the day that the user wished to watch a movie

6. Add new user ratings and re-train the model with combined dataset. Re-run the model to make recommendations and predict individual ratings

7. We also compared the results of using the 1,000,000 records as compared to using a much smaller set of records 100,000

In [5]:
# Required Python Packages
from time import time

tstart = time()

from math import sqrt
import pandas as pd
import numpy as np
from scipy import spatial
import scipy 
import matplotlib.pyplot as plt
import datetime
import os
from operator import add

### Create dataframe from 1M movielens data

The Movielens dataset is available at http://grouplens.org/datasets/movielens/, and consist of three text files containing data on 1 million ratings from 6000 users on 4000 movies, and was released in 2003 (movielens.com).

The files include the following:

1. Ratings.dat consisting of userid, movieid, timestamp and ratings (between 1 to 5)

2. Users.dat consisting of demographic data on the users

3. Movies.dat consisting of movieid, movietitle and genres of movies

These 3 files where download and unzipped and stored in Github for use in the application

In [8]:
# loading 1,000, 000 Movie lens data directly from github
path = "https://raw.githubusercontent.com/ppadebettu/CUNY/Master/IS_643_Recommender_Systems/Final_Project/Data/"
movies_fname = 'movies.dat'
url = path + movies_fname
pd_movies = pd.read_csv(url, sep = '::', engine='python', header = None, na_values='NaN', usecols = [0,1], 
                         names = ['movieid', 'movietitle'])

users_fname = 'users.dat'
url = path + users_fname
pd_users = pd.read_csv(url, sep = "::" , engine='python', header = None, na_values='NaN',
                        names = ['userid', 'gender', 'age', 'occupation', 'zipcode'])

ratings_fname = 'ratings.dat'
url = path + ratings_fname

pd_ratings = pd.read_csv(url, sep = "::" , engine='python', header = None, na_values='NaN', 
                        names = ['userid', 'movieid', 'rating', 'timestamp'])

# Merge three dataframe into one
result = pd.merge(pd_ratings,pd_movies, on = ['movieid'] )
movielens = pd.merge(result,pd_users, on = ['userid'] )


# Code to add the new columns that will contain the context data that is obtained from the timestamp
def fdate(x):   
    return datetime.datetime.fromtimestamp(
        int(str(x['timestamp']))).strftime('%Y-%m-%d') 

def ftime(x):   
    return datetime.datetime.fromtimestamp(
        int(str(x['timestamp']))).strftime('%H:%M:%S') 

def fweekday(x):   
    
    if (datetime.datetime.fromtimestamp(int(str(x['timestamp']))).weekday() >= 4):
        return 'Weekend'
    else:
        return 'Weekday'
    
def fagegroup(x):   
    
    if (x['age'] >= 45):
        return '45+'
    
    elif (x['age'] >= 30):
        return '30-44'
    
    elif (x['age'] >= 19):
        return '19-29'
    else:
        return 'below 18' 
    
def ftimeofday(x): 
    
    t = datetime.datetime.fromtimestamp(int(str(x['timestamp']))).strftime('%H:%M:%S')
    
    if (t >= '23:00:00'):
        return 'night'
    
    elif (t >= '18:00:00'):
        return 'evening'
    
    elif (t >= '12:00:00'):
        return 'afternoon'
    
    elif (t >= '08:00:00'):
        return 'morning'
    
    else:
        return 'night'
    
def flocation(x):   
    
    start = datetime.datetime.strptime(x['date'], '%Y-%m-%d')
    end = datetime.datetime.strptime(x['releasedate'], '%d-%b-%Y')
    
    if start - end >= datetime.timedelta(180):
        return 'home'
    else:
        return 'theater'       
    
    
movielens['date'] = movielens.apply(fdate, axis=1)
movielens['time'] = movielens.apply(ftime, axis=1)
movielens['weekday'] = movielens.apply(fweekday, axis=1)
movielens['timeofday'] = movielens.apply(ftimeofday, axis=1)
   


__Movielens Dataset Dimension (rowsXcolumns)__

In [10]:
movielens.shape

In [11]:
#movielens.head()
movielens.iloc[np.random.random_integers(1000209, size=5)]

__Create CSV and Save to Local Databricks Cloud (optional)__

In [13]:
# dbutils.fs.mkdirs("/moviedat/")
# display(dbutils.fs.ls("dbfs:/moviedat/"))
# #Remove the file if exists
# dbutils.fs.rm("dbfs:/moviedat/movielens.csv")
# #Save to csv file
# movielens.to_csv("/dbfs/moviedat/movielens.csv", header=True, index=False) # 93M
# display(dbutils.fs.ls("dbfs:/moviedat/"))
# # #Load csv file as Spark-RDD
# #movie_rdd = sc.textFile("dbfs:/movielens.csv")
# # header=movie1.first()
# # movie_rdd_split = movie1.filter(lambda x: x != header).map(lambda l: l.split(","))
# # movie_rdd_split.take(5)
# #Load csv file as Spark-SQLdf
# sqlCtx = sqlContext.getOrCreate(sc)
# movie_df = sqlCtx.read.format('com.databricks.spark.csv').options(header='true').load("dbfs:/movielens.csv")

### Data Exploration and Visualization

#### - Create Spark DataFrame

In [16]:
sqlCtx = sqlContext.getOrCreate(sc)
movie_df = sqlCtx.createDataFrame(movielens)

In [17]:
#movie_df.show(5)

#### - Data exploration

In [19]:
# Register as temp table in sqlContext
movie_df.registerTempTable("ratings_data_all")

- __Summary of data__

In [21]:
#Display summary from the table
movie_df.select("rating", "age").describe().show()

- __Top 10 users with most ratings__

In [23]:
#SQL can be run over DataFrames that have been registered as a table. 
#Top 10 users with most ratings
sqlContext.createDataFrame(sqlContext.sql("SELECT userid, COUNT(*) AS ratings_count FROM ratings_data_all GROUP BY userid \
ORDER BY ratings_count DESC LIMIT 10").collect()).show()

- __Number of movies by weekday and weekend__

In [25]:
# Number of movies by weekday and weekend
sqlContext.createDataFrame(sqlContext.sql("SELECT weekday, COUNT(distinct(movieid)) AS movie_count FROM ratings_data_all GROUP BY weekday").collect()).show()

- __Number of movies watched by time of day__

In [27]:
# Number of movies watched by time of day
sqlContext.createDataFrame(sqlContext.sql("SELECT timeofday, COUNT(distinct(movieid)) AS movie_count FROM ratings_data_all GROUP BY timeofday").collect()).show()

####- Visualization

In [29]:
# select needed columns
m_v = movie_df.map(lambda x: (x[0], x[2], x[4], x[5], x[11], x[12]))
sqlContext.createDataFrame(m_v.take(3), schema=('userid', 'rating', 'movietitle', 'genger', 'weekday', 'timeogday')).show()
#schema=('userid', 'rating', 'movietitle', 'genger', 'weekday', 'timeogday')


In [30]:
# Function to show histogram
def histogram(movies):
    count = map(lambda x: x[0], movies)
    movietitle = map(lambda x: x[1], movies)
    plt.barh(range(len(count)), count, color = 'blue')
    plt.yticks(range(len(count)), movietitle)
    f = plt.gcf()
    f.set_size_inches(16, 10)
    display(f)


- __Most watched movies__

In [32]:
x = m_v.map(lambda x: (x[2]))
x = x.map(lambda w: (w, 1))
x = x.reduceByKey(add)
x = x.map(lambda w: (w[1], w[0])).sortByKey(False)

In [33]:
# Most Watched Movies
histogram(x.take(5))

- __Ratings for movies__

In [35]:
# Bar Chart of Ratings for Moviesfig, ax = plt.subplots()

fig, ax = plt.subplots()
plt.hist(m_v.map(lambda x: (x[1])).collect(), facecolor='green', alpha=0.5)
display(fig)


- __Ratings by user__

In [37]:
# Ratings by User

x = m_v.map(lambda x: (x[0]))
x = x.map(lambda w : (w, 1))
x = x.reduceByKey(add)
x = x.map(lambda w: (w[1], w[0])).sortByKey(False)

x.take(5)
histogram(x.take(25))

- __Movie watching (weekday and weekend)__

In [39]:
x = m_v.map(lambda x: (x[4]))
x = x.map(lambda w: (w, 1))
x = x.reduceByKey(add)
x = x.map(lambda w: (w[1], w[0])).sortByKey(False)
# The slices will be ordered and plotted counter-clockwise.
labels = x.map(lambda w: w[1]).collect()
sizes =  x.map(lambda w: w[0]).collect()
colors = ['yellowgreen', 'gold']
explode = (0, 0.1,)  # only "explode" the 2nd slice (i.e. 'Hogs')

fig, ax = plt.subplots()
plt.pie(sizes, explode=explode, labels=labels, colors=colors,
        autopct='%1.1f%%', shadow=True, startangle=90)
# Set aspect ratio to be equal so that pie is drawn as a circle.
plt.axis('equal')
display(fig)

- __Movie watching (Time of the day)__

In [41]:
x = m_v.map(lambda x: (x[5]))
x = x.map(lambda w: (w, 1))
x = x.reduceByKey(add)
x = x.map(lambda w: (w[1], w[0])).sortByKey(False)

# The slices will be ordered and plotted counter-clockwise.
labels = x.map(lambda w: w[1]).collect()
sizes =  x.map(lambda w: w[0]).collect()
colors = ['yellowgreen', 'gold', 'lightskyblue', 'lightcoral']
explode = (0, 0.1, 0, 0)  # only "explode" the 2nd slice (i.e. 'Hogs')

fig, ax = plt.subplots()
plt.pie(sizes, explode=explode, labels=labels, colors=colors, autopct='%1.1f%%', shadow=True, startangle=90)
#Set aspect ratio to be equal so that pie is drawn as a circle.
plt.axis('equal')
display(fig)

### Recommendation

__Save movie titles__

In [44]:
#Get movie titles
movies_titles = movielens[['movieid', 'movietitle']]
movies_titles =movies_titles.drop_duplicates()
print movies_titles.shape
movies_titles.head()

In [45]:
# Convert to RDD
#Get movie titles
movies = sqlCtx.createDataFrame(movies_titles)
print movies.take(3)
m = movies.map(lambda x: (int(x[0]),x[1]))


#### - Create RDD

In [47]:
movie_RDD = movie_df.rdd

In [48]:
#movie_RDD.take(3)
sqlContext.createDataFrame(movie_RDD.take(3)).show()

#### - Context prefiltering

__Prefilter with "Weekday" and "evening" as our example__

In [51]:
# Prefilter
prefilter = movie_RDD.filter(lambda x: x[11]=='Weekday' and x[12]=='evening')

In [52]:
#prefilter.take(3)
sqlContext.createDataFrame(prefilter.take(3)).show()


In [53]:
prefilter.count()

#### - Data split into Training, Validation and Test RDD

In [55]:
# Get only userid, movieid and rating
movie_rdd = prefilter.map(lambda x: (x[0], x[1], int(x[2])))

In [56]:
movie_rdd.take(3)

In [57]:
#Split data into training and test datasets
training_RDD, validation_RDD, test_RDD = movie_rdd.randomSplit([6, 2, 2], seed=0L)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [58]:
#test_for_predict_RDD.take(5)
training_RDD.take(3)

####- Collaborative Filtering implementation by using Alternating Least Squares

(Partially adapted from: https://github.com/jadianes/spark-movie-lens/blob/master/notebooks/building-recommender.ipynb )

Spark MLlib library for Machine Learning provides a Collaborative Filtering implementation by using Alternating Least Squares. <br>

The implementation in MLlib has the following parameters:<br>

numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).<br>
rank is the number of latent factors in the model.<br>
iterations is the number of iterations to run.<br>
lambda specifies the regularization parameter in ALS.<br>
implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.<br>
alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.

In [61]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5L
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    ratesAndpreds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(ratesAndpreds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, round(error,4))
    if error < min_error:
        min_error = error
        best_rank = rank

print 'The best model was trained with rank %s' % best_rank

In [62]:
#Let's check our predictions
predictions.take(3)

In [63]:
#Let's compare predictions vs actuals (ratings)
#ratesAndpreds.take(10)
x = ratesAndpreds.map(lambda w: (w[0][0], w[0][1], w[1][0], round(w[1][1], 2)))
#x.take(5)
sqlContext.createDataFrame(x.take(10), schema=('userid', 'movieid', 'actual', 'predicted')).show()


In [64]:
#Let's test the selected model
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndpreds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(ratesAndpreds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print 'For testing data (from 1M data) the RMSE is %s' % (round(error,4))

### Add new user into dataset and test recommendation model

In [66]:
#Create a list of ratings for a new user (998)
new_user_ID_1 = 70000

# The format of each line is (userID, movieID, rating)
new_user_ratings_1 = [
 (70000, 242, 4), # Kolya (1996)
 (70000, 51, 3),  # Legends of the Fall (1994)
 (70000, 465, 1),  # Jungle Book, The (1994)
 (70000 , 86, 2), # Remains of the Day, The (1993)
 (70000, 222, 3), # Star Trek: First Contact (1996)
 (70000, 274, 4), # Sabrina (1995)
 (70000, 1042, 3),  # Just Cause (1995)
 (70000, 1184, 3), # Endless Summer 2, The (1994)
 (70000, 265, 2), # Hunt for Red October, The (1990)
 (70000, 302, 3) # L.A. Confidential (1997)
]
new_user_ratings_RDD_1 = sc.parallelize(new_user_ratings_1)
print 'New user ratings: %s' % new_user_ratings_RDD_1.take(10)

In [67]:
# Merge new user ratings to the existing RDD
data_with_new_ratings_RDD = movie_rdd.union(new_user_ratings_RDD_1)

In [68]:
#Train the ALS model using new dataset and all the parameters we selected before
from time import time

t0 = time()
new_ratings_model = ALS.train(data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print "New model trained in %s seconds" % round(tt,3)

####- Getting recommendations

In [70]:
#Getting top recommendations
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings_1) # get just movie IDs
# keep just those not on the ID list
new_user_unrated_movies_RDD = movie_rdd.filter(lambda x: x[0] not in new_user_ratings_ids)\
                               .map(lambda x:(new_user_ID_1, x[0]))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [71]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
# Use distinct() here
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.distinct().map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_RDD.take(3)

In [72]:
#Get movie titles
movies_titles = movie_RDD.map(lambda x: (int(x[1]),x[4]))
movies_titles.take(3)

In [73]:
#Merge movie titles and recommendations for the new user so that the results are meaningful
#movies_titles = ratings_data.map(lambda x: (int(x[0]),x[1]))
new_user_recommendations_rating_title_RDD = new_user_recommendations_rating_RDD.join(movies_titles)
new_user_recommendations_rating_title_RDD.distinct().sortBy(lambda x: x[1][0], ascending=False).take(3)

### Sampling data and comparison between 1M and 100K

Collaborative context filtering ALS model was also built with the data from 100K ratings data obtained fromm the 1M dataset. Prefilter was applied on the 1M dataset and 1/10th of the filtered data (approximately from 100K ratings data dfrom the 1M dartaset) was used to build an ALS model. RMSE of this model was evaluated and compared with the 1M filtered data.

In [75]:
#Sample 1/10 of data (~100K) of the context filtered data
movie_rdd100 = movie_rdd.sample(False, 0.1, 3045)

print "Total context filtered movies from 1M data set: %s \
\nOne-tenth of the filtered movies:  %s" % (movie_rdd.count(), movie_rdd100.count())
movie_rdd100.take(3)

In [76]:
#Split data into training and test datasets
training_RDD100, validation_RDD100, test_RDD100 = movie_rdd100.randomSplit([6, 2, 2], seed=0L)
validation_for_predict_RDD100 = validation_RDD100.map(lambda x: (x[0], x[1]))
test_for_predict_RDD100 = test_RDD100.map(lambda x: (x[0], x[1]))

#Collaborative Filtering with ALS
seed = 5L
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02
min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model100 = ALS.train(training_RDD100, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions100 = model100.predictAll(validation_for_predict_RDD100).map(lambda r: ((r[0], r[1]), r[2]))
    ratesAndpreds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions100)
    error100 = math.sqrt(ratesAndpreds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error100
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, round(error100, 4))
    if error < min_error:
        min_error = error100
        best_rank100 = rank
print 'The best model was trained with rank %s' % best_rank100


In [77]:
#Test the selected model with test set data
predictions100 = model100.predictAll(test_for_predict_RDD100).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndpreds100 = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions100)
error = math.sqrt(ratesAndpreds100.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print 'For testing data (from 100 K) the RMSE is %s' % (round(error,4))


####Model comparison

The 100K dataset had the filtered data with 18882 ratings. ALS model built based on this dataset had an RMSE of 1.405 and the test set RMSE of 1.328.  On the other hand, the ALS model built with all the filtered data (190150 ratings) from the 1M dataset had an RMSE of 0.916 and the test set RMSE of 0.918. This shows that the model improves as the number of input ratings is increased.

### Discussion

Large datasets become increasing difficult to process in a timely manner using standard data analytic techniques. Due to this, specialized software and hardware have been developed to allow this processing to occur in a much faster time.

Distributed computing which allows data to be distributed to different clusters and similarly processed by different clusters has been found to vastly reduce processing times. 

The Spark engine on Python, which we chose is one of such applications and it allows the programmer to work with a programing language that is familiar to them (Python in this case), while the implementation of the clustering, data sharing and computation is hidden from them.

Spark can be run on Java, Python, R and Scala, but for this exercise we chose Python. Spark can also be run locally on a single computer, on a virtual machine using Linux, or in the cloud, either as a hosted machine (AWS) or as a hosted platform (DataBricks).

__Problems and Issues__  

_a)	Local Machine_   

We attempted to use Pyspark (Python implementation of Spark) on our local machines after installing Spark and Java. However, the processing was unstable and we kept on getting multiple errors. Pyspark was impossible to run with a dataset that contained more than 100,000 records. In addition, each local machine had a different version of Python, Java and Spark installed with different configurations which made it difficult to troubleshoot.   
<br>

_b)	AWS_   

We attempted to use AWS. AWS was also unstable, probably due to environment configuration issues.We kept on getting multiple errors with configurations and loss of connectivity. In the EC2 instance, we were able to setup a master and two slave nodes and after much effort and Linux configurations, we were able to connect from Jupyter on our local machine to the spark instance in AWS. The stability of the EC2 instance particularly the spark nodes were not sustainable, as the code that ran fine in one instance wiould give JVM memory issues in successive runs. Some of the stage tasks would try to finish up even as the worker tasks are still completing their work.  Majority of the issues were related to socket exceptions and JVM memory issues. This prevented us from focusing on the actual code as most of the time were lost to troubleshooting the configuration and memory issues. In addition to that, some of the group members (2) were not able to get AWS up and running at all.

_c)	Virtual Machine using Ubuntu 16.04_   

We installed a Virtual Machine using Oracle Virtual Box, with Ubuntu 16.04, Anaconda, and Spark, and this allowed us to use Jupyter as the front end. This worked perfectly and we were able to run our 1,000,000 record dataset without any issues at all. 

However, the Virtual Machine does require some technical expertise, and not all of our computers had the necessary hardware to allow installation of the Virtual Machine.

_d)	DataBricks_   

Databricks (https://databricks.com) is a fully hosted Apache Spark Cluster that allows users to run Spark in Jupyter on the cloud without having to install anything on their local machines. Databricks provides a cluster of 6GB to each user for free. We decided to use this as it allowed all of us to have a single well defined environment for programming, code testing and more importantly for troubleshooting. One problem we noticed with the databricks is that, the created cluster terminates automatically after one hour of inactivity which can lead to excessive loss of time in reloading the dataset into the clusters.

In summary we used DataBricks which is a hosted Apache Spark cloud based platform for our project due to its ease of use ans environment stability. We did do some of the initial programing work on our local spark cluster, AWS and on the Virtual Ubuntu Server, but transferred all this to DataBricks for the final deployment

### Conclusion

Recommender systems are becoming ubiquitous due to the large amount of possible choices, the increase in the ability to capture both implicit and explicit data about users and items and computing power. However, the large amount of data that is collected makes it very difficult to analyze this data in a timely manner. Distributed data storage and computing (cluster computing) has come to the rescue and allows large datasets to be computed much more efficiently. Spark allows the use of such cluster by allowing a similar user-friendly interface (Python in this instance) while it hides and takes care of the complexities of the cluster computing.

However, as we found out this does come with a price, systems setup to run cluster computing have to be finely tuned in terms of software, hardware, memory, etc and they are not easy to setup or manage. We were not able to use AWS for our project. While Linux machines are probably easier to setup and use, they are not suitable for teams as Linux machines are not that common.

Hosted services such as DataBricks are probably preferred as they are already setup, do not require an installation, and they take care of all the management and configuration issues (zero management). They also allow the user to ?Launch, dynamically scale up or down, and terminate clusters with just a few clicks?

In conclusion, our project demonstrates the use of a large dataset to recommend and predict movies for users. We used Spark on Python as our Programming Language, and used a hosted (DataBricks) cluster environment to run our program.

In [81]:
tend = time() - tstart
print "Total computation time: %s" %tend