# _import modules & set constants:_

In [1]:
# enable in-line MatPlotLib: this IPython line magic makes figures render
# inside the notebook's cell output
%matplotlib inline

In [2]:
# import Python modules
from __future__ import division, print_function
import numpy
import os
import pandas
import shutil
import sys

In [3]:
# hack for live printing in iPython Notebook, adapted from:
# http://stackoverflow.com/questions/29772158/make-ipython-notebook-print-in-real-time
class flushfile(object):
    """Proxy around a file-like object that flushes after every write.

    Every attribute that is not defined here is looked up on the wrapped
    object, so the proxy can stand in for it (e.g. as ``sys.stdout``).

    Parameters
    ----------
    f : file-like object providing ``write`` and ``flush``.
    """

    def __init__(self, f):
        self.f = f

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup has already failed.
        # If the missing attribute is 'f' itself (instance created without
        # __init__, e.g. while copying), stop here instead of recursing
        # forever through self.f.
        if name == 'f':
            raise AttributeError(name)
        # getattr() honours the wrapped object's own __getattr__, so a
        # flushfile wrapped around another flushfile still delegates
        # correctly; object.__getattribute__ would skip that hook.
        return getattr(self.f, name)

    def write(self, x):
        """Write ``x`` to the wrapped object and flush immediately."""
        self.f.write(x)
        self.f.flush()

    def flush(self):
        """Flush the wrapped object."""
        self.f.flush()
        
# Remember the ORIGINAL stdout only the first time this cell runs, and always
# wrap that saved stream. Re-running the cell therefore neither stacks one
# wrapper on top of another nor overwrites the saved reference with a wrapper.
if 'oldsysstdout' not in vars():
    oldsysstdout = sys.stdout
sys.stdout = flushfile(oldsysstdout)

In [5]:
# set CONSTANTS

# running on AWS EMR? (decided by whether the home directory is /home/hadoop)
AWS_EMR_MODE = os.path.expanduser('~') == '/home/hadoop'

# data locations: repo folder / URL, the movies file and
# the ten ratings files ratings01.csv ... ratings10.csv
DATA_FOLDER_NAME = 'DATA___MovieLens___20M'
DATA_REPO_URL = 'https://github.com/ChicagoBoothML/%s' % DATA_FOLDER_NAME
MOVIES_FILE_NAME = 'movies.csv'
RATINGS_FILE_NAMES = list('ratings%02d.csv' % n for n in range(1, 11))

# how many rows to display when previewing a data set
NB_EXAMPLES_TO_SHOW = 9

# seed shared by every randomized step
RANDOM_SEED = 99

# Apache Spark settings, in this order:
#   SPARK_MODE                    master to connect to
#   SPARK_HOME                    Spark installation folder
#   SPARK_DRIVER_MEMORY           memory allocated to the Spark driver process
#   SPARK_EXECUTOR_MEMORY         memory allocated to each Spark executor process
#   SPARK_DRIVER_MAX_RESULT_SIZE  maximum size of results collected back to the driver
(SPARK_MODE,
 SPARK_HOME,
 SPARK_DRIVER_MEMORY,
 SPARK_EXECUTOR_MEMORY,
 SPARK_DRIVER_MAX_RESULT_SIZE) = (
    # AWS EMR YARN cluster, default Spark folder on the EMR master node
    ('yarn-client', '/usr/lib/spark', '9g', '3g', '6g')
    if AWS_EMR_MODE else
    # single machine, Spark folder of the local installation
    ('local', '/Applications/spark-1.5.2', '5g', '1g', '3g'))

In [6]:
# install ChicagoBoothML_Helpy
# The repository is fetched over HTTPS: GitHub no longer serves the
# unauthenticated git:// protocol, so a "git+git://" URL cannot be cloned.
CHICAGOBOOTHML_HELPY_INSTALLATION_COMMAND = \
    'pip install --upgrade git+https://github.com/ChicagoBoothML/Helpy --no-dependencies'
if AWS_EMR_MODE:
    # on AWS EMR the same command is run through sudo
    helpy_install_exit_status = \
        os.system('sudo %s' % CHICAGOBOOTHML_HELPY_INSTALLATION_COMMAND)
else:
    helpy_install_exit_status = \
        os.system(CHICAGOBOOTHML_HELPY_INSTALLATION_COMMAND)

# Report a failed install instead of ignoring it. Do not abort here: a copy
# installed earlier may still satisfy the import below.
if helpy_install_exit_status:
    print('WARNING: ChicagoBoothML_Helpy installation command exited with status %s'
          % helpy_install_exit_status)

# import from package
from ChicagoBoothML_Helpy.Print import printflush

# Launch PySpark and set up SparkContext & HiveContext

In [7]:
# Guard on the objects this cell creates ("sc", "hc") rather than on the
# imported module name: if a previous run failed after "import pyspark" but
# before the contexts existed, re-running the cell now retries the set-up
# instead of skipping it and failing on the print statements below.
if 'sc' not in vars():   # set up Apache Spark environment if not yet done so

    # set environment variables for Spark
    os.environ['SPARK_HOME'] = SPARK_HOME
    os.environ['SPARK_HIVE'] = 'true'

    # enable importing of PySpark through FindSpark package
    import findspark
    findspark.init()

    # import PySpark and set up SparkContext ("sc")
    import pyspark

    # NOTE(review): Spark's configuration documentation says that in client
    # mode spark.driver.memory cannot be changed through SparkConf once the
    # driver JVM is running -- confirm this setting takes effect here.
    sc = pyspark.SparkContext(
        conf=pyspark.SparkConf()
            .setMaster(SPARK_MODE)
            .setAppName('MovieLens')   # was 'BostonHousing', left over from another notebook
            .set('spark.driver.memory', SPARK_DRIVER_MEMORY)
            .set('spark.executor.memory', SPARK_EXECUTOR_MEMORY)
            .set('spark.driver.maxResultSize', SPARK_DRIVER_MAX_RESULT_SIZE))

if 'hc' not in vars():   # set up HiveContext ("hc") on top of the SparkContext
    import pyspark
    hc = pyspark.sql.HiveContext(sc)

print('SparkContext:', sc)
print('HiveContext:', hc)

SparkContext: <pyspark.context.SparkContext object at 0x7f386cdee3d0>
HiveContext: <pyspark.sql.context.HiveContext object at 0x7f386ce04a50>


In [8]:
# imports from PySpark
from pyspark.ml.recommendation import ALS

# Download PySpark_CSV.py and put it into SparkContext

In [9]:
# download PySpark_CSV.py (the module providing csvToDataFrame, used below to
# parse the CSV files) and save it as pyspark_csv.py in the current working directory
!curl https://raw.githubusercontent.com/seahboonsiew/pyspark-csv/master/pyspark_csv.py --output pyspark_csv.py

# on AWS EMR, additionally register the downloaded file with the SparkContext
# (presumably so that executors on the other cluster nodes can import it -- confirm)
if AWS_EMR_MODE:
    sc.addPyFile('pyspark_csv.py')

# import the parser from the file just downloaded
# (assumes the current working directory is on the import path)
from pyspark_csv import csvToDataFrame

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  5493  100  5493    0     0  30641      0 --:--:-- --:--:-- --:--:-- 30687


# Download, parse & preprocess data

In [10]:
# download data
# The clone is skipped when the data folder already exists, so the cell can be
# re-run ("git clone" refuses to clone into an existing non-empty folder).
# A failed clone stops the notebook here instead of printing "done!" and
# failing later, when the data files cannot be found.
print('Cloning Data Repo... ', end='')
if os.path.isdir(DATA_FOLDER_NAME):
    print('(folder %s already exists, skipping) ' % DATA_FOLDER_NAME, end='')
elif os.system('git clone %s' % DATA_REPO_URL):
    # os.system returns a non-zero status when the command fails
    raise RuntimeError('"git clone %s" failed' % DATA_REPO_URL)
print('done!')

Cloning Data Repo... done!


In [11]:
# move data to same folder or into HDFS
print('Moving Data Files:', end='')
for file_name in [MOVIES_FILE_NAME] + RATINGS_FILE_NAMES:
    print(' %s,' % file_name, end='')
    source_path = os.path.join(DATA_FOLDER_NAME, file_name)
    if AWS_EMR_MODE:
        # -f overwrites a copy left in HDFS by a previous run; without it the
        # put fails on re-run. A non-zero exit status is reported, not ignored.
        if os.system('hadoop fs -put -f %s %s' % (source_path, file_name)):
            raise RuntimeError('could not put %s into HDFS' % source_path)
    else:
        # one portable copy call for every local platform, replacing the
        # separate Windows ("copy /y") and Unix ("yes | cp -rf") shell
        # commands; overwrites the destination and raises if the source
        # file is missing
        shutil.copy(source_path, file_name)
print(' done!')

Moving Data Files: movies.csv, ratings01.csv, ratings02.csv, ratings03.csv, ratings04.csv, ratings05.csv, ratings06.csv, ratings07.csv, ratings08.csv, ratings09.csv, ratings10.csv, done!


In [None]:
# parse the movies file into a cached data frame that is also
# queryable through SQL as the temp table "movies"
print('Parsing %s...' % MOVIES_FILE_NAME, end='')

movies_lines_rdd = sc.textFile(MOVIES_FILE_NAME)
movies_ddf = csvToDataFrame(
    sqlCtx=hc, rdd=movies_lines_rdd, columns=None, sep=',', parseDate=True)
movies_ddf.cache()
movies_ddf.registerTempTable('movies')

print(' done!\n')

# preview the first few rows
movies_ddf.show(NB_EXAMPLES_TO_SHOW)

Parsing movies.csv ...

In [11]:
# parse every ratings file and stack the results into one data frame;
# progress is reported per file, in the same style as the file-moving cell
print('Parsing Ratings Files:', end='')
ratings_ddf = None
for ratings_file_name in RATINGS_FILE_NAMES:
    print(' %s,' % ratings_file_name, end='')
    ratings_part_ddf = csvToDataFrame(
        sqlCtx=hc,
        rdd=sc.textFile(ratings_file_name),
        columns=None,
        sep=',',
        parseDate=True)
    # the first file starts the data frame, each later file is appended to it
    ratings_ddf = \
        ratings_part_ddf if ratings_ddf is None \
        else ratings_ddf.unionAll(ratings_part_ddf)
print(' done!\n')

# cache the combined ratings and expose them to SQL as the temp table "ratings"
ratings_ddf.cache()
ratings_ddf.registerTempTable('ratings')

ratings_ddf.show(NB_EXAMPLES_TO_SHOW)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
|     1|    112|   3.5|1094785740|
|     1|    151|   4.0|1094785734|
|     1|    223|   4.0|1112485573|
|     1|    253|   4.0|1112484940|
+------+-------+------+----------+
only showing top 9 rows



In [14]:
# number of ratings: total row count of the combined ratings data frame
# (all ratings files together)
ratings_ddf.count()

20000263

### _**NOTE**: the parameters below ran successfully on an AWS EMR cluster of 1 + 5 nodes of type m3.xlarge_

In [76]:
# split Train & Test sets: seeded random split with equal weights
ratings_train_ddf, ratings_test_ddf = ratings_ddf.randomSplit(
    weights=[.5, .5], seed=RANDOM_SEED)

# Build model pipelines

In [77]:
# ALS recommender set up for explicit ratings (implicitPrefs=False),
# reading users / items / ratings from the userId / movieId / rating columns
latent_factor_reccommender = ALS(**{
    # model size & optimisation
    'rank': 30,
    'maxIter': 30,
    'regParam': 1e-3,
    'nonnegative': True,
    # parallelism & checkpointing
    'numUserBlocks': 10,
    'numItemBlocks': 10,
    'checkpointInterval': 10,
    # feedback type
    'implicitPrefs': False,
    'alpha': 1.,   # only relevant for implicit preferences
    # input columns
    'userCol': 'userId',
    'itemCol': 'movieId',
    'ratingCol': 'rating',
    # reproducibility
    'seed': RANDOM_SEED,
})

# Fit model

In [78]:
# fit the recommender on the Train set only
latent_factor_rec_model = latent_factor_reccommender.fit(dataset=ratings_train_ddf)

# Make & evaluate predictions

In [79]:
# score the Test set; the raw output keeps its own variable and temp-table
# name so that "predicted_ratings" always means the filtered predictions
raw_predicted_ratings_ddf = \
    latent_factor_rec_model.transform(
        dataset=ratings_test_ddf)

raw_predicted_ratings_ddf.registerTempTable('raw_predicted_ratings')

# drop the rows whose prediction is NaN
# NOTE(review): presumably these are users / movies that never appear in the
# Train set, so the model has nothing to predict from -- confirm
predicted_ratings_ddf = hc.sql(
    """
    SELECT
        *
    FROM
        raw_predicted_ratings
    WHERE
        prediction != 'NaN'
    """)

# cache once (the data frame is reused by the evaluation cells) and
# expose it to SQL as the temp table "predicted_ratings"
predicted_ratings_ddf.cache()
predicted_ratings_ddf.registerTempTable('predicted_ratings')

predicted_ratings_ddf.show(NB_EXAMPLES_TO_SHOW)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|    32|     31|   3.0| 845962944| 2.8142872|
|  6632|     31|   3.5|1423926132| 2.4679818|
|  8232|     31|   4.0| 839840269|  4.621995|
|  9032|     31|   3.0| 934442206| 2.9327242|
| 10632|     31|   3.0|1112458785| 4.1600375|
| 23832|     31|   3.0|1094565708|  3.680944|
| 32432|     31|   4.0| 844687388|  3.444585|
| 39232|     31|   5.0| 832587925|  4.792559|
| 39432|     31|   3.5|1214953387|  3.760844|
+------+-------+------+----------+----------+
only showing top 9 rows



In [80]:
# min & max ratings for sanity-checking
# (collects every prediction value back to the driver as a Python list)
preds = \
    predicted_ratings_ddf\
    .select('prediction')\
    .rdd\
    .map(lambda row: row[0])\
    .collect()

# there are extreme ratings way out of bound -
# so the recommender is not that great yet
# (uses "preds", the list collected above; the name "p" was never defined)
min(preds), max(preds)

(-20.3768310546875, 21.41329574584961)

In [84]:
# RMSE evaluation of bounded predictions
# Predictions are clamped to [0.0, 5.0] before the squared error is taken.
# AVG yields the mean squared error in a single Spark job, so no separate
# count() pass over the data is needed for the denominator.
bounded_mean_squared_error = \
    hc.sql(
        """
        SELECT
            AVG(POW(
                (CASE
                    WHEN prediction < 0.0 THEN 0.0
                    WHEN prediction > 5.0 THEN 5.0
                    ELSE prediction
                    END) - rating, 2))
        FROM
            predicted_ratings
        """)\
    .collect()[0][0]   # single row, single column

numpy.sqrt(bounded_mean_squared_error)

0.91489142509378918

# _END!_