In [1]:
# Import the Rating object
from pyspark.mllib.recommendation import Rating
# Import the ALS method
from pyspark.mllib.recommendation import ALS

In [2]:
data = spark.read.csv("gs://shujiangtan_msba_uci/rating.csv", header=True, inferSchema=True)
data.show(5)

+---------+-------+--------+--------------------+
|     user|product|duration|       category name|
+---------+-------+--------+--------------------+
|100278414|     59|       8|Gift Certificates...|
|105583702|     59|       6|Gift Certificates...|
|105583702|     59|      10|Gift Certificates...|
|105583702|     59|      35|Gift Certificates...|
|105583702|     59|      15|Gift Certificates...|
+---------+-------+--------+--------------------+
only showing top 5 rows



In [3]:
# Convert your dataframe into an RDD
ratings = data.rdd.map(list)
ratings.take(5)

[[100278414, 59, 8, 'Gift Certificates & Coupons'],
 [105583702, 59, 6, 'Gift Certificates & Coupons'],
 [105583702, 59, 10, 'Gift Certificates & Coupons'],
 [105583702, 59, 35, 'Gift Certificates & Coupons'],
 [105583702, 59, 15, 'Gift Certificates & Coupons']]

In [4]:
# Convert the data into Rating objects
ratings_final = ratings.map(lambda line: Rating(int(line[0]), int(line[1]), float(line[2])))
# This is what a Rating object looks like
ratings_final.take(5)

[Rating(user=100278414, product=59, rating=8.0),
 Rating(user=105583702, product=59, rating=6.0),
 Rating(user=105583702, product=59, rating=10.0),
 Rating(user=105583702, product=59, rating=35.0),
 Rating(user=105583702, product=59, rating=15.0)]

In [9]:
# Split the data into training and test, in 80-20% ratio
training_data, test_data = ratings_final.randomSplit([0.8,0.2])
# Build the model based on the training data, with tank = 10 and iterations = 10
model = ALS.train(training_data, rank=10, iterations=10)
# Predict the model
predictions = model.predictAll(testdata_no_rating)
predictions.take(5)

[Rating(user=110055418, product=21, rating=28.499575169708187),
 Rating(user=110055418, product=21, rating=28.499575169708187),
 Rating(user=110055418, product=17, rating=24.99981139967126),
 Rating(user=95318548, product=22, rating=15.96656904137907),
 Rating(user=95318548, product=22, rating=15.96656904137907)]

In [8]:
# Drop the ratings column
testdata_no_rating = test_data.map(lambda p: (p[0],p[1]))
testdata_no_rating.take(5)

[(100278414, 59),
 (105583702, 59),
 (110727138, 59),
 (111903371, 59),
 (77626420, 59)]

In [10]:
# Prepare ratings data
rates = ratings_final.map(lambda r: ((r[0],r[1]),r[2]))
rates.take(5)

[((100278414, 59), 8.0),
 ((105583702, 59), 6.0),
 ((105583702, 59), 10.0),
 ((105583702, 59), 35.0),
 ((105583702, 59), 15.0)]

In [11]:
# Prepare predictions data
preds = predictions.map(lambda r: ((r[0],r[1]),r[2]))
preds.take(5)

[((110055418, 21), 28.499575169708187),
 ((110055418, 21), 28.499575169708187),
 ((110055418, 17), 24.99981139967126),
 ((95318548, 22), 15.96656904137907),
 ((95318548, 22), 15.96656904137907)]

In [13]:
# Join the ratings data with predictions data
rates_and_preds = rates.join(preds)
rates_and_preds.take(20)

[((111903371, 59), (11.0, 10.999930861449739)),
 ((95589169, 59), (18.0, 17.99914521627594)),
 ((101568678, 17), (66.0, 38.135414645939846)),
 ((101568678, 17), (36.0, 38.135414645939846)),
 ((101568678, 17), (36.0, 38.135414645939846)),
 ((101568678, 17), (14.0, 38.135414645939846)),
 ((101568678, 17), (27.0, 38.135414645939846)),
 ((101568678, 17), (62.0, 38.135414645939846)),
 ((101568678, 17), (62.0, 38.135414645939846)),
 ((101568678, 17), (19.0, 38.135414645939846)),
 ((101568678, 17), (19.0, 38.135414645939846)),
 ((101568678, 17), (35.0, 38.135414645939846)),
 ((101568678, 17), (38.0, 38.135414645939846)),
 ((101568678, 17), (52.0, 38.135414645939846)),
 ((104963334, 17), (20.0, 19.999994091487252)),
 ((110956695, 17), (22.0, 21.999962448491942)),
 ((110823053, 16), (38.0, 28.250143932426823)),
 ((110823053, 16), (38.0, 28.250143932426823)),
 ((110823053, 16), (38.0, 28.250143932426823)),
 ((110823053, 16), (38.0, 28.250143932426823))]

In [18]:
MSE = rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
MSE

849.1812110571841

# ALS 

In [22]:
ratings = spark.read.csv("gs://shujiangtan_msba_uci/ratings.csv", header=True, inferSchema=True)
ratings.show(5)

+---------+-------+--------+
|     user|product|duration|
+---------+-------+--------+
|100278414|     59|       8|
|105583702|     59|       6|
|105583702|     59|      10|
|105583702|     59|      35|
|105583702|     59|      15|
+---------+-------+--------+
only showing top 5 rows



In [24]:
# Count the total number of ratings in the dataset
numerator = ratings.select("duration").count()

# Count the number of distinct userIds and distinct movieIds
num_users = ratings.select("user").distinct().count()
num_products = ratings.select("product").distinct().count()

# Set the denominator equal to the number of users multiplied by the number of movies
denominator = num_users * num_products

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

The ratings dataframe is  77.91% empty.


In [26]:
# Import the requisite packages
from pyspark.sql.functions import col

# # Group data by userId, count ratings
ratings.groupBy("user").count().show(10)

+---------+-----+
|     user|count|
+---------+-----+
|110727138|    6|
|112223217|   12|
|110967225|    3|
| 91439063|    1|
|115177811|    8|
|110910759|    7|
|113142536|    1|
|109627730|    1|
| 15847884|   27|
|105179386|    9|
+---------+-----+
only showing top 10 rows



In [41]:
ratings.printSchema()

root
 |-- user: integer (nullable = true)
 |-- product: integer (nullable = true)
 |-- duration: integer (nullable = true)



In [43]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create test and train set
(train, test) = ratings.randomSplit([0.8, 0.2], seed = 1234)

# Create ALS model
als = ALS(userCol="user", itemCol="product", ratingCol="duration", nonnegative = True, implicitPrefs = False)

# Confirm that a model called "als" was created
type(als)

pyspark.ml.recommendation.ALS

In [44]:
# Import the requisite items
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.maxIter, [5, 50, 100, 200]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()
           
# Define evaluator as RMSE and print length of evaluator
RMSE = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  64


In [45]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=RMSE, numFolds=5)

# Confirm cv was built
print(cv)

CrossValidator_63caf442a4a1


In [46]:
train.take(5)

[Row(user=15847884, product=1, duration=52),
 Row(user=15847884, product=1, duration=52),
 Row(user=15847884, product=1, duration=52),
 Row(user=15847884, product=1, duration=52),
 Row(user=15847884, product=1, duration=52)]

In [48]:
ratings.filter(col("user") == 15847884).sort("rating", ascending = False).show()

AnalysisException: "cannot resolve '`rating`' given input columns: [user, product, duration];;\n'Sort ['rating DESC NULLS LAST], true\n+- Filter (user#216 = 15847884)\n   +- Relation[user#216,product#217,duration#218] csv\n"

In [47]:
#Fit cross validator to the 'train' dataset
model = cv.fit(train)

#Extract best model from the cv model above
#best_model = model.bestModel

IllegalArgumentException: 'Field "rating" does not exist.\nAvailable fields: user, product, duration, CrossValidator_63caf442a4a1_rand, prediction'

In [28]:
best_model = model.bestModel

NameError: name 'model' is not defined

In [23]:
# Print best_model
print(type(best_model))

# Complete the code below to extract the ALS model parameters
print("**Best Model**")

# Print "Rank"
print("  Rank:", best_model.getRank())

# Print "MaxIter"
print("  MaxIter:", best_model.getMaxIter())

# Print "RegParam"
print("  RegParam:", best_model.getRegParam())

NameError: name 'best_model' is not defined

In [35]:
test_predictions = best_model.transform(test)

NameError: name 'best_model' is not defined