In [1]:
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import pylab
import json
import numpy as np

from pyspark.sql import Row
from datetime import datetime
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

## Using Spark to process the data and build the model

In [3]:
sc =SparkContext()
sqlContext = SQLContext(sc)

### Loading Training Data

In [8]:
reviews = sc.textFile('../Filter_Data/yelp.train.rating').map(lambda l: l.split("\t"))
training = reviews.map(lambda l: Rating(int(l[0]),int(l[1]),float(l[2])))
training.take(5)

[Rating(user=0, product=0, rating=3.0),
 Rating(user=0, product=1, rating=3.0),
 Rating(user=0, product=2, rating=4.0),
 Rating(user=0, product=3, rating=4.0),
 Rating(user=0, product=4, rating=3.0)]

### Parameters for ALS
Tuned over several runs

In [14]:
rank = 100
numIterations = 20
model = ALS.train(training, rank, numIterations)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60125)
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 852, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 990, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:60125)

Running Predictions and calculating MSE on Training Data 

In [7]:
testdata = training.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))

In [9]:
ratesAndPreds = reviews.map(lambda l: ((int(l[0]),int(l[1])),int(l[2]))).join(predictions)
ratesAndPreds.take(5)

[((0, 8), (3, 3.035057897333798)),
 ((0, 26), (4, 3.986302984546862)),
 ((0, 28), (4, 4.012686001305583)),
 ((0, 34), (4, 4.007371726733274)),
 ((0, 36), (4, 3.98576594915043))]

In [10]:
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error On Training Data= " + str(MSE))

Mean Squared Error On Training Data= 0.0027320870365812764


### Loading Test Data
Running Predictions and calculating MSE on Test Data 

In [23]:
test_reviews = sc.textFile('../Filter_Data/yelp.test.rating').map(lambda l: l.split("\t")).map(lambda l: ((int(l[0]),int(l[1])),int(l[2])))
testdata = test_reviews.map(lambda p: (p[0][0], p[0][1]))

In [None]:
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = test_reviews.join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error On Test Data = " + str(MSE))

In [12]:
from pyspark.ml.recommendation import ALS

als = ALS(userCol = "user_id", itemCol ="buss_id", ratingCol= "rating")

rank = [100]
maxIter = [20]
reg =  [.15]
param_grid = ParamGridBuilder().addGrid(als.rank, rank).addGrid(als.maxIter, maxIter).addGrid(als.regParam, reg).build()

In [13]:
evaluator = RegressionEvaluator(metricName= "rmse", labelCol= "rating", predictionCol= "prediction")

cv = TrainValidationSplit(estimator= als, estimatorParamMaps = param_grid, evaluator= evaluator)

In [16]:
#model = cv.fit(reviews.toDF("user_id", "buss_id", "rating")
training = sqlContext.createDataFrame(reviews.map(lambda x: Row(user_id=int(x[0]), buss_id=int(x[1]), rating=int(x[2]))))

In [17]:
model = cv.fit(training)

In [18]:
best_model = model.bestModel

In [31]:
testdata = sqlContext.createDataFrame(test_reviews.map(lambda x: Row(user_id=x[0][0], buss_id=x[0][1], rating = x[1])))
predictions = best_model.transform(testdata)
evaluator = RegressionEvaluator(metricName= "rmse", labelCol= "rating", predictionCol= "prediction")
rmse = evaluator.evaluate(predictions)
print("RMSE with test data and regularization is = ", rmse)

RMSE with test data is =  nan


In [32]:
predictions.take(5)

[Row(buss_id=148, rating=4, user_id=12905, prediction=3.2213430404663086),
 Row(buss_id=148, rating=5, user_id=17364, prediction=4.357882022857666),
 Row(buss_id=463, rating=3, user_id=12751, prediction=4.343938827514648),
 Row(buss_id=463, rating=5, user_id=10897, prediction=4.0205078125),
 Row(buss_id=463, rating=5, user_id=13658, prediction=3.950611114501953)]

In [33]:
testdata.take(5)

[Row(buss_id=43, rating=5, user_id=0),
 Row(buss_id=72, rating=5, user_id=1),
 Row(buss_id=127, rating=4, user_id=2),
 Row(buss_id=151, rating=5, user_id=3),
 Row(buss_id=411, rating=4, user_id=4)]

In [36]:
evaluator = RegressionEvaluator(metricName= "rmse", labelCol= "rating", predictionCol= "prediction")
rmse = evaluator.evaluate(predictions)
rmse

nan

In [50]:
test_b = set(testdata.select(testdata.buss_id).distinct().rdd.map(lambda r: r[0]).collect())

In [53]:
train_b = set(training.select(training.buss_id).distinct().rdd.map(lambda r: r[0]).collect())

In [55]:
diff = test_b.difference(train_b)

234

In [56]:
testdata.filter(testdata.buss_id not in diff).count()

TypeError: unhashable type: 'Column'