## 1. Loading functions for this project

In [1]:
import pyspark
import pandas as pd
import numpy as np
import math
import json
import csv

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType, FloatType
from pyspark.sql.functions import col, column
from pyspark.sql.functions import expr
from pyspark.sql.functions import split
from pyspark.sql import Row
from pyspark.mllib.recommendation import ALS, Rating
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, IndexToString

## 2. Loading the dataset & data cleaning 
- 2.1 For data cleaning, we have changed the column name and changed the txt-type data into a csv-type file. 

In [3]:
# Initialize a list to hold the parsed JSON objects
# Define the column names you want to extract
columns = ["reviewerID", "asin", "overall"]

# Initialize a list to hold the parsed JSON objects
json_data = []

# Read the JSON data from the file, line by line
with open('reviews_Movies_and_TV_5.txt', 'r') as file:
    for line in file:
        # Parse the JSON data and append the selected columns to the list
        data = json.loads(line)
        json_data.append({key: data[key] for key in columns})

# Open a new CSV file for writing
with open('output.csv', 'w', newline='') as csvfile:
    # Create a CSV writer object
    csv_writer = csv.writer(csvfile)

    # Write the header row
    csv_writer.writerow(columns)

    # Write each row of data
    for item in json_data:
        csv_writer.writerow(item.values())

In [51]:
df = pd.read_csv("output.csv", header = None, names=['reviewerID','productID', 'rating'])

sampled_csv_data = df.sample(n=10000, random_state=1)

# Save the sampled data to a new CSV file
sampled_file_path = 'sample10000.csv'
sampled_csv_data.to_csv(sampled_file_path, index=False)


## 3. Create a Spark Session to use the DataFrame

In [52]:
ss=SparkSession.builder.appName("Mini Project ALS-based Recommendation Systems").getOrCreate()

In [53]:
ss.sparkContext.setCheckpointDir("~/scratch")

## 4. Creating a rating_DF and rating_RDD 

In [54]:
rating_schema = StructType([ StructField("reviewerID", StringType(), False ), \
                            StructField("productID", StringType(), False), \
                            StructField("rating", FloatType(), True ), \
                           ])

In [55]:
ratings_DF = ss.read.csv("sample1000.csv", schema=rating_schema, header=True, inferSchema=False)

In [56]:
ratings_DF.printSchema()

root
 |-- reviewerID: string (nullable = true)
 |-- productID: string (nullable = true)
 |-- rating: float (nullable = true)



In [57]:
ratings2_DF = ratings_DF.select("reviewerID","productID","rating")

In [58]:
ratings2_DF.first()

Row(reviewerID='A3OL1AX1IODBYL', productID='B001NK74AU', rating=5.0)

In [59]:
ratings2_RDD = ratings2_DF.rdd
ratings2_RDD.take(3)

[Row(reviewerID='A3OL1AX1IODBYL', productID='B001NK74AU', rating=5.0),
 Row(reviewerID='A1A1PSZU6SLW9O', productID='B00005U0JX', rating=5.0),
 Row(reviewerID='A1G6NL5AQMZDAC', productID='B000BOH8Z0', rating=3.0)]

## 5. DataCleaning 
- labeling each reviewerID and ProductID a unique integer for furture mapreduce process

In [60]:
labelIndexer= StringIndexer(inputCol="reviewerID", outputCol="reviewerID_int").fit(ratings2_DF)

In [61]:
labelIndexer

StringIndexerModel: uid=StringIndexer_a42dc29b7f99, handleInvalid=error

In [62]:
transformed_data = labelIndexer.transform(ratings2_DF)

In [63]:
transformed_data.show(4)

+--------------+----------+------+--------------+
|    reviewerID| productID|rating|reviewerID_int|
+--------------+----------+------+--------------+
|A3OL1AX1IODBYL|B001NK74AU|   5.0|          78.0|
|A1A1PSZU6SLW9O|B00005U0JX|   5.0|        1445.0|
|A1G6NL5AQMZDAC|B000BOH8Z0|   3.0|        1790.0|
|A28IP5K9LFL7AV|B001J66JQS|   5.0|        3415.0|
+--------------+----------+------+--------------+
only showing top 4 rows



In [64]:
labelIndexer= StringIndexer(inputCol="productID", outputCol="productID_int").fit(transformed_data)

In [65]:
labelIndexer

StringIndexerModel: uid=StringIndexer_da4a12210a73, handleInvalid=error

In [66]:
transformed_data_1 = labelIndexer.transform(transformed_data)

In [67]:
transformed_data_1.show(4)

+--------------+----------+------+--------------+-------------+
|    reviewerID| productID|rating|reviewerID_int|productID_int|
+--------------+----------+------+--------------+-------------+
|A3OL1AX1IODBYL|B001NK74AU|   5.0|          78.0|       1399.0|
|A1A1PSZU6SLW9O|B00005U0JX|   5.0|        1445.0|       3669.0|
|A1G6NL5AQMZDAC|B000BOH8Z0|   3.0|        1790.0|       1244.0|
|A28IP5K9LFL7AV|B001J66JQS|   5.0|        3415.0|       5719.0|
+--------------+----------+------+--------------+-------------+
only showing top 4 rows



In [68]:
data = transformed_data_1.select("reviewerID_int","productID_int","rating")

In [69]:
data.show(4)

+--------------+-------------+------+
|reviewerID_int|productID_int|rating|
+--------------+-------------+------+
|          78.0|       1399.0|   5.0|
|        1445.0|       3669.0|   5.0|
|        1790.0|       1244.0|   3.0|
|        3415.0|       5719.0|   5.0|
+--------------+-------------+------+
only showing top 4 rows



In [70]:
data.printSchema()

root
 |-- reviewerID_int: double (nullable = false)
 |-- productID_int: double (nullable = false)
 |-- rating: float (nullable = true)



## 6. Splitting the data into training, validation, and testing sets. 

In [71]:
training_RDD, validation_RDD, test_RDD = data.rdd.randomSplit([3.0,1.0,1.0], 19)

In [72]:
training_input_RDD = training_RDD.map(lambda x: (x[0], x[1]) )
validation_input_RDD = validation_RDD.map(lambda x: (x[0], x[1]) ) 
testing_input_RDD = test_RDD.map(lambda x: (x[0], x[1]) )

## 7. Training the ALS model using a randomly assigned hyperparameter.

In [73]:
model = ALS.train(training_RDD, 4, seed=17, iterations=30, lambda_=0.1)

In [74]:
training_prediction_RDD = model.predictAll(training_input_RDD)

In [75]:
training_prediction_RDD.take(4)

[Rating(user=4904, product=336, rating=4.906144581712827),
 Rating(user=1084, product=4380, rating=4.910743992547013),
 Rating(user=3764, product=557, rating=4.911909577537192),
 Rating(user=7608, product=6930, rating=3.9053015442246846)]

In [76]:
training_RDD.take(3)

[Row(reviewerID_int=78.0, productID_int=1399.0, rating=5.0),
 Row(reviewerID_int=21.0, productID_int=6941.0, rating=3.0),
 Row(reviewerID_int=1734.0, productID_int=450.0, rating=5.0)]

In [77]:
training_target_output_RDD = training_RDD.map(lambda x: ( (x['reviewerID_int'], x['productID_int']), x['rating'] ) )

In [78]:
training_target_output_RDD.take(3)

[((78.0, 1399.0), 5.0), ((21.0, 6941.0), 3.0), ((1734.0, 450.0), 5.0)]

In [79]:
training_prediction2_RDD = training_prediction_RDD.map(lambda x: (( x[0], x[1]), x[2] ))

In [80]:
training_prediction2_RDD.take(3)

[((4904, 336), 4.906144581712827),
 ((1084, 4380), 4.910743992547013),
 ((3764, 557), 4.911909577537192)]

In [81]:
training_evaluation_RDD = training_target_output_RDD.join(training_prediction2_RDD)

In [82]:
training_evaluation_RDD.take(3)

[((78.0, 1399.0), (5.0, 4.914347743893639)),
 ((21.0, 6941.0), (3.0, 2.8990734581963244)),
 ((1096.0, 2093.0), (5.0, 4.910744263848937))]

In [83]:
training_error = math.sqrt(training_evaluation_RDD.map(lambda z: (z[1][0] - z[1][1])**2).mean())

In [84]:
print(training_error)

0.0939684405038181


In [85]:
validation_prediction_RDD = model.predictAll(validation_input_RDD).map(lambda x: ( (x[0], x[1]), x[2] ) )

In [86]:
validation_prediction_RDD.take(3)

[((140, 196), -2.308392890131816),
 ((4, 248), -2.52559004420781),
 ((4, 338), -2.7648625313100474)]

In [87]:
validation_evaluation_RDD = validation_RDD.map(lambda y: ( ( y[0], y[1]), y[2] ) ).join(validation_prediction_RDD)

In [88]:
validation_evaluation_RDD.take(3)

[((638.0, 201.0), (2.0, 1.0823960309728573)),
 ((804.0, 104.0), (5.0, 0.3669750270530301)),
 ((560.0, 58.0), (5.0, -0.13069524042198544))]

In [89]:
validation_error = math.sqrt(validation_evaluation_RDD.map(lambda z: (z[1][0] - z[1][1])**2).mean())

In [90]:
print(validation_error)

4.887780122322847


## 8. Hyperparameter Tuning 

In [91]:
## Initialize a Pandas DataFrame to store evaluation results of all combination of hyper-parameter settings
hyperparams_eval_df = pd.DataFrame( columns = ['k', 'regularization', 'iterations', 'validation RMS', 'testing RMS'] )
# initialize index to the hyperparam_eval_df to 0
index =0 
# initialize lowest_error
lowest_validation_error = float('inf')
# Set up the possible hyperparameter values to be evaluated
iterations_list = [15, 30]
regularization_list = [0.1, 0.2, 0.3]
rank_list = [4, 7, 10, 13]
for k in rank_list:
    for regularization in regularization_list:
        for iterations in iterations_list:
            seed = 37
            # Construct a recommendation model using a set of hyper-parameter values and training data
            model = ALS.train(training_RDD, k, seed=17, iterations=30, lambda_=0.1)
            # Evaluate the model using evalution data
            # map the output into ( (userID, movieID), rating ) so that we can join with actual evaluation data
            # using (userID, movieID) as keys.
            validation_prediction_RDD= model.predictAll(validation_input_RDD).map(lambda x: ( (x[0], x[1]), x[2] )   )
            validation_evaluation_RDD = validation_RDD.map(lambda y: ( (y[0], y[1]), y[2] ) ).join(validation_prediction_RDD)
            # Calculate RMS error between the actual rating and predicted rating for (userID, movieID) pairs in validation dataset
            validation_error = math.sqrt(validation_evaluation_RDD.map(lambda z: (z[1][0] - z[1][1])**2).mean())
            # Save the error as a row in a pandas DataFrame
            hyperparams_eval_df.loc[index] = [k, regularization, iterations, validation_error, float('inf')]
            index = index + 1
            # Check whether the current error is the lowest
            if validation_error < lowest_validation_error:
                best_k = k
                best_regularization = regularization
                best_iterations = iterations
                best_index = index - 1
                lowest_validation_error = validation_error
print('The best rank k is ', best_k, ', regularization = ', best_regularization, ', iterations = ',\
      best_iterations, '. Validation Error =', lowest_validation_error)

The best rank k is  10 , regularization =  0.1 , iterations =  15 . Validation Error = 4.209011096705229


## 8. Evaluate the best hyperparameter combination using testing data 

In [92]:
seed = 37
model = ALS.train(training_RDD, best_k, seed=seed, iterations=15, lambda_=0.1)
testing_prediction_RDD=model.predictAll(testing_input_RDD).map(lambda x: ((x[0], x[1]), x[2]))
testing_evaluation_RDD= test_RDD.map(lambda x: ((x[0], x[1]), x[2])).join(testing_prediction_RDD)
testing_error = math.sqrt(testing_evaluation_RDD.map(lambda x: (x[1][0]-x[1][1])**2).mean())
print('The Testing Error for rank k =', best_k, ' regularization = ', best_regularization, ', iterations = ', \
      best_iterations, ' is : ', testing_error)

The Testing Error for rank k = 10  regularization =  0.1 , iterations =  15  is :  4.366579976329201


In [93]:
print(best_index)

12


In [94]:
# Store the Testing RMS in the DataFrame
hyperparams_eval_df.loc[best_index]=[best_k, best_regularization, best_iterations, lowest_validation_error, testing_error]

In [95]:
schema3= StructType([ StructField("k", FloatType(), True), \
                      StructField("regularization", FloatType(), True ), \
                      StructField("iterations", FloatType(), True), \
                      StructField("Validation RMS", FloatType(), True), \
                      StructField("Testing RMS", FloatType(), True) \
                    ])

In [96]:
HyperParams_RMS_DF = ss.createDataFrame(hyperparams_eval_df, schema3)

In [97]:
output_path = "/storage/home/wml5207/'MiniProject '/MiniProjectALSHyperParamTuning"
HyperParams_RMS_DF.write.option("header", True).csv(output_path)

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/storage/home/wml5207/'MiniProject '/MiniProjectALSHyperParamTuning already exists. Set mode as "overwrite" to overwrite the existing path.

In [None]:
ss.stop()