#### Import Libraries

The following cells import the libraries needed to run this notebook.

In [1]:
import pandas as pd

import numpy as np

from matplotlib import pyplot as plt
%matplotlib inline
import seaborn as sns


import os

from sklearn.preprocessing import LabelEncoder

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.recommendation import ALS

from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import pyspark
import tempfile

# Start (or reuse) a local Spark session named 'capstone' that uses every available core.
spark = SparkSession.builder.master('local[*]').appName('capstone').getOrCreate()

# ALS ignores its checkpointInterval unless the SparkContext has a checkpoint directory.
# The maxIter=25 fit recorded later in this notebook killed the JVM; presumably the
# un-checkpointed lineage of a long ALS run is the cause (TODO confirm), so give Spark
# a place to truncate it.
spark.sparkContext.setCheckpointDir(
    os.path.join(tempfile.gettempdir(), 'capstone_als_checkpoints')
)

# from pyspark.sql.types import IntegerType
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.types import *

In [3]:
# df_pd= pd.read_csv('../../data/final/comb_df_clean.csv')


In [4]:
# setting a schema to let spark know what data types to import the columns as from the csv

# schema = StructType([StructField("genre", StringType(), True),StructField("title", StringType(), True), 
#                      StructField("author", StringType(), True), StructField("pages", IntegerType(), True), 
#                      StructField("average_rating", FloatType(), True), StructField("book_description", StringType(), True), 
#                      StructField("user_unique_key", IntegerType(), True),StructField("number_of_stars", IntegerType(), True), 
#                      StructField("book_unique_key", IntegerType(), True), StructField("review_text", StringType(), True)])

# df_sp = spark.createDataFrame(df_pd, schema=schema)

In [3]:
# Ratings CSV, addressed relative to the notebook's working directory.
# Presumably holds exactly the three integer columns named in the Spark schema -- TODO confirm.
ratings_csv_path = '../../data/final/comb_df_sm.csv'
df_pd = pd.read_csv(ratings_csv_path)

In [4]:
# All three columns are nullable integers, so build the fields in one pass.
# NOTE(review): with an explicit schema Spark appears to assign these names by column
# position rather than by the pandas header -- confirm the CSV column order matches.
rating_columns = ["user_unique_key", "number_of_stars", "book_unique_key"]
schema = StructType(
    [StructField(column_name, IntegerType(), True) for column_name in rating_columns]
)

# Convert the pandas frame into a Spark DataFrame typed by the schema above.
df_sp = spark.createDataFrame(df_pd, schema=schema)

In [7]:
# Print the column names, types and nullability of the Spark DataFrame as a sanity check.
df_sp.printSchema()

root
 |-- user_unique_key: integer (nullable = true)
 |-- number_of_stars: integer (nullable = true)
 |-- book_unique_key: integer (nullable = true)



In [8]:
# Bare expression: displays the DataFrame's repr (column names and types only, no rows).
df_sp

DataFrame[user_unique_key: int, number_of_stars: int, book_unique_key: int]

In [5]:
# Sparsity = percentage of the user x book grid that has no rating.

# Distinct users and books give the grid's two dimensions.
users = df_sp.select("user_unique_key").distinct().count()
books = df_sp.select("book_unique_key").distinct().count()

# Number of rating rows actually present.
stars_num = df_sp.select("number_of_stars").count()

# Every (user, book) pair is a cell that could hold a rating.
denominator = users * books

sparsity = 100 * (1.0 - float(stars_num) / denominator)
print("The ratings dataframe is ", f"{sparsity:.2f}" + "% empty.")

The ratings dataframe is  99.60% empty.


In [6]:
# Randomly split the ratings with 0.8 / 0.2 weights into train and test; the seed is fixed
# so the split can be reproduced.
train, test = df_sp.randomSplit([0.8, 0.2], seed= 2020)

#### First Simple Model (FSM)

In [6]:
# First simple model: ALS with the library's default hyperparameters, used as a baseline.
# coldStartStrategy='drop' discards predictions for users/books unseen during training,
# so the evaluation metrics are not polluted by NaN predictions.
# The seed is pinned to the same value as the later models; without it the factor
# initialisation (and therefore the baseline RMSE/MAE) can differ between kernel sessions.
als = ALS(userCol='user_unique_key', itemCol='book_unique_key',
          ratingCol='number_of_stars',
          coldStartStrategy='drop', seed=2020)

# Fit the baseline on the training split.
model_fsm = als.fit(train)


In [7]:
# Apply the baseline model to the test split; the result gains a 'prediction' column.
predictions = model_fsm.transform(test)

In [8]:
# Root-mean-squared error between the actual star ratings and the predictions.
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='number_of_stars',
    predictionCol='prediction',
)
rmse = evaluator.evaluate(predictions)
print(f'The RMSE is= {rmse}')

The RMSE is= 1.5986875907003517


In [9]:
# Mean absolute error between the actual star ratings and the predictions.
evaluator2 = RegressionEvaluator(
    metricName='mae',
    labelCol='number_of_stars',
    predictionCol='prediction',
)
mae = evaluator2.evaluate(predictions)
print(f'The MAE is = {mae}')

The MAE is = 1.2244768512854267


In [10]:
# Model 2: a small configuration (rank 4, 5 iterations) with a fixed seed.
als2 = ALS(
    userCol='user_unique_key',
    itemCol='book_unique_key',
    ratingCol='number_of_stars',
    rank=4,
    maxIter=5,
    coldStartStrategy='drop',
    seed=2020,
)

# Train on the training split, score the test split, and preview the predictions.
model_2 = als2.fit(train)
predictions2 = model_2.transform(test)
predictions2.show()

+---------------+---------------+---------------+----------+
|user_unique_key|number_of_stars|book_unique_key|prediction|
+---------------+---------------+---------------+----------+
|          15108|              4|            148|  4.440096|
|          23425|              5|            148| 2.3717651|
|           1114|              4|            148| 1.5453693|
|          22818|              5|            148|  4.038414|
|          19906|              5|            148| 3.7037847|
|          24328|              4|            148|   3.44603|
|          19732|              4|            148| 3.6605346|
|           6728|              4|            148| 2.8172495|
|          19599|              4|            148| 3.8714411|
|          13901|              5|            148| 3.5737967|
|          28773|              5|            148| 3.1483269|
|          21987|              3|            148| 3.8977916|
|          16454|              3|            148| 4.5940084|
|          14344|       

In [13]:
# RMSE of Model 2 on the test split.
rmse2 = evaluator.evaluate(predictions2)
print(f'The RMSE for Model 2 is= {rmse2}')

The RMSE for Model 2 is= 1.9110262281206258


In [14]:
# MAE of Model 2 on the test split.
mae2 = evaluator2.evaluate(predictions2)
print(f'The MAE for Model 2 is = {mae2}')

The MAE for Model 2 is = 1.3487552461758907


Model 2 did worse than the first model, so next I will set `maxIter` and `rank` at or above ALS's default values of 10 for each.

In [16]:
# Model 3: rank 10 with 15 iterations, same seed as the other tuned models.
als3 = ALS(
    userCol='user_unique_key',
    itemCol='book_unique_key',
    ratingCol='number_of_stars',
    rank=10,
    maxIter=15,
    coldStartStrategy='drop',
    seed=2020,
)

# Fit on the training split and predict the held-out ratings.
model_3 = als3.fit(train)
predictions3 = model_3.transform(test)


In [17]:
# Score Model 3 on the test split with both metrics.
rmse3 = evaluator.evaluate(predictions3)
print(f'The RMSE for Model 3 is= {rmse3}')
mae3 = evaluator2.evaluate(predictions3)
print(f'The MAE for Model 3 is = {mae3}')

The RMSE for Model 3 is= 1.3476289986585732
The MAE for Model 3 is = 1.04600046428953


In [19]:
# Model 4: keep 15 iterations but raise the rank to 15.
als4 = ALS(
    userCol='user_unique_key',
    itemCol='book_unique_key',
    ratingCol='number_of_stars',
    rank=15,
    maxIter=15,
    coldStartStrategy='drop',
    seed=2020,
)

# Fit on the training split and predict the held-out ratings.
model_4 = als4.fit(train)
predictions4 = model_4.transform(test)

In [20]:
# Score Model 4 on the test split with both metrics.
rmse4 = evaluator.evaluate(predictions4)
print(f'The RMSE for Model 4 is= {rmse4}')
mae4 = evaluator2.evaluate(predictions4)
print(f'The MAE for Model 4 is = {mae4}')

The RMSE for Model 4 is= 1.3560000712066078
The MAE for Model 4 is = 1.0610425236177448


In [21]:
# Model 5: back to rank 10, with the iteration count raised to 20.
als5 = ALS(
    userCol='user_unique_key',
    itemCol='book_unique_key',
    ratingCol='number_of_stars',
    rank=10,
    maxIter=20,
    coldStartStrategy='drop',
    seed=2020,
)

# Fit on the training split and predict the held-out ratings.
model_5 = als5.fit(train)
predictions5 = model_5.transform(test)

In [22]:
# Score Model 5 on the test split with both metrics.
rmse5 = evaluator.evaluate(predictions5)
print(f'The RMSE for Model 5 is= {rmse5}')
mae5 = evaluator2.evaluate(predictions5)
print(f'The MAE for Model 5 is = {mae5}')

The RMSE for Model 5 is= 1.269203298147445
The MAE for Model 5 is = 0.9883590474245254


In [8]:
# Model 6: rank 10 with 25 iterations.
# NOTE(review): in the recorded run this fit ended with a Py4J error because the JVM
# stopped responding; the cause is not visible in the captured traceback.
als6 = ALS(
    userCol='user_unique_key',
    itemCol='book_unique_key',
    ratingCol='number_of_stars',
    rank=10,
    maxIter=25,
    coldStartStrategy='drop',
    seed=2020,
)

# Fit on the training split and predict the held-out ratings.
model_6 = als6.fit(train)
predictions6 = model_6.transform(test)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/opt/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1211, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while receiving


Py4JError: org does not exist in the JVM

In [23]:
# Score Model 6 on the test split with both metrics (requires the Model 6 fit to have succeeded).
rmse6 = evaluator.evaluate(predictions6)
print(f'The RMSE for Model 6 is= {rmse6}')
mae6 = evaluator2.evaluate(predictions6)
print(f'The MAE for Model 6 is = {mae6}')

NameError: name 'predictions6' is not defined

In [11]:
# Rebuild the train/test split with the same weights and seed as the split made before the first model.
(train, test) = df_sp.randomSplit([0.8, 0.2], seed= 2020)

In [9]:
# Base ALS estimator for the cross-validated grid search.
# nonnegative=True constrains the learned factors to be non-negative, and
# implicitPrefs=False treats number_of_stars as explicit ratings.
# The seed is pinned (same value as the hand-tuned models) so that every candidate in the
# grid starts from the same initialisation and the search can be reproduced.
als_model_reg = ALS(userCol='user_unique_key', itemCol='book_unique_key', ratingCol='number_of_stars',
                    nonnegative=True, implicitPrefs=False,
                    coldStartStrategy='drop', seed=2020)



ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:63844)
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:63844)
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Trac

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:63844)

In [10]:
# pred = model_reg.transform(test)

In [13]:
# RMSE evaluator used by the cross-validator to rank hyperparameter combinations.
evaluator_reg = RegressionEvaluator(
    metricName='rmse',
    labelCol='number_of_stars',
    predictionCol='prediction',
)

In [9]:
# Empty builder only; `params` is rebound to the fully built grid in the following cell.
params=ParamGridBuilder()

In [14]:
# Hyperparameter grid: 3 regParam x 3 rank x 3 maxIter values = 27 candidate settings.
params = (
    ParamGridBuilder()
    .addGrid(als_model_reg.regParam, [0.01, 0.1, 1.5])
    .addGrid(als_model_reg.rank, [4, 10, 30])
    .addGrid(als_model_reg.maxIter, [5, 15, 30])
    .build()
)
    


In [10]:
# Unconfigured CrossValidator; `cv` is rebound to the configured instance in the following cell.
cv= CrossValidator()

In [15]:

# 5-fold cross-validation over the ALS grid, scored by the RMSE evaluator.
cv = CrossValidator(
    estimator=als_model_reg,
    estimatorParamMaps=params,
    evaluator=evaluator_reg,
    numFolds=5,
)

In [14]:
# Show the cross-validator's identifier. The object is bound to `cv`; the name `crossval`
# is never defined in this notebook and would raise NameError on a fresh run.
print(cv)

CrossValidator_508cc021c8aa


In [16]:
# Run the cross-validated grid search on the training split.
# In the recorded run this call failed with Py4J connection errors (see the output below).
model= cv.fit(train)  
#type(crossval)

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 62644)
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/anaconda3/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/anaconda3/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/anaconda3/lib/python3.8/socketserver.py", line 720, in __init__
    self.handle()
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/accumulators.py", line 268, in handle
    poll(accum_updates)
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/accumulators.py", line 245, in accum_updates
  

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:62631)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:62631)
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:62631)
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Trac

In [None]:
# Take the best-scoring model from the fitted cross-validator. The fitted object is bound
# to `model`; `modelv` is not defined anywhere in this notebook.
best_model = model.bestModel

In [None]:
#Fit cross validator to the 'train' dataset
# NOTE(review): an earlier cell already assigns `model = cv.fit(train)`; running this cell
# as well repeats the whole grid search.
model = cv.fit(train)

#Extract best model from the cv model above
best_model = model.bestModel

In [None]:
# Report the hyperparameters of the winning ALS model.
# ALSModel exposes `rank` as a property but has no getRank/getMaxIter/getRegParam getters;
# maxIter and regParam live on the estimator that produced the model, which is reachable
# through the underlying Java object's parent().
best_estimator = best_model._java_obj.parent()

# Print "Rank"
print(" The best model Rank is: ", best_model.rank)

print(" The best model MaxIter is:", best_estimator.getMaxIter())


print("  RegParam:", best_estimator.getRegParam())

## Evaluation

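Cross validation is a resampling technique for estimating how well a model generalizes: the training data is split into k folds, the model is trained on k−1 of them and scored on the remaining fold, and this is repeated so that each fold serves once as the validation set; the k scores are then averaged.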

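In the context of this project it means that each ALS hyperparameter combination in the grid (`regParam`, `rank`, `maxIter`) is scored by its average RMSE over 5 folds of the training ratings, and the combination with the lowest average RMSE becomes `best_model`.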

In [None]:
# Close the Spark session once all modelling cells have finished.

spark.stop()