This code uses the training, validation and test sets created on the notebook: Implicit_Rating_Calculation_category.ipynb to create an Collaborative Filtering (CF) model using Alternating Least Squares (ALS) from Pyspark.

In [1]:
# Loading needed pyspark libraries
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.feature import StringIndexer

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1747,application_1570493391423_71703,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Initializing spark context and setting up checkpoint directory
sc = spark.sparkContext
sc.setCheckpointDir("s3a://myaws-capstone-bucket/data/modeling/checkpointdir")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Reading train, validation and test datasets for implicit_cat data
train_file ='s3://myaws-capstone-bucket/data/modeling/input/implicit_cat_rating_train.csv'
df_train =spark.read.option("header","true").csv(train_file)

val_train_file ='s3://myaws-capstone-bucket/data/modeling/input/implicit_cat_rating_val_train.csv'
df_train_val =spark.read.option("header","true").csv(val_train_file)

test_file ='s3://myaws-capstone-bucket/data/modeling/input/implicit_cat_rating_test.csv'
df_test =spark.read.option("header","true").csv(test_file)

val_test_file ='s3://myaws-capstone-bucket/data/modeling/input/implicit_cat_rating_val_test.csv'
df_test_val =spark.read.option("header","true").csv(val_test_file)

df_train.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+--------------------+-------------------+---------------+-----+
|  user_id|            category|        category_id|implicit_rating|catID|
+---------+--------------------+-------------------+---------------+-----+
|512823699|22327321097960168...|2232732109796016868|              2|  803|
|543131199|22327321350541151...|2232732135054115162|              2|  916|
|519321120|20530135556318826...|2053013555631882655|              2|  107|
|567347471|22327321010634757...|2232732101063475749|              2|  725|
|566466570|20530135563113599...|2053013556311359947|              2|  126|
|513413314|20530135616632921...|2053013561663292159|              1|  274|
|616301238|22327320930775207...|2232732093077520756|              2|  668|
|642449975|20530135531656317...|2053013553165631753|              3|   40|
|572148514|20530135659834255...|2053013565983425517|              2|  375|
|537109818|22327320922973801...|2232732092297380188|              2|  662|
|515850364|20530135660337

In [4]:
# Splitting product column to obtain numeric id since PySpark expects all columns for model to be integer type

# Transforming user_id and catID and rating to integer types since PySpark expects all columns for model to be integer type
df_train = df_train.withColumn("user_id", df_train["user_id"].cast(IntegerType()))
df_train = df_train.withColumn("catID", df_train["catID"].cast(IntegerType()))
df_train = df_train.withColumn("implicit_rating", df_train["implicit_rating"].cast(IntegerType()))

df_train.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+--------------------+-------------------+---------------+-----+
|  user_id|            category|        category_id|implicit_rating|catID|
+---------+--------------------+-------------------+---------------+-----+
|514958895|20530135530565798...|2053013553056579841|              2|   36|
|555710084|22327320930775207...|2232732093077520756|              3|  668|
|608957617|22327320953927766...|2232732095392776612|              1|  679|
|558103411|20530135659834255...|2053013565983425517|              2|  375|
|534158155|20530135602372287...|2053013560237228724|              2|  241|
|625373582|22327320930775207...|2232732093077520756|              2|  668|
|642802709|22327320930775207...|2232732093077520756|              2|  668|
|597183225|20530135638359417...|2053013563835941749|              2|  328|
|560866052|20530135552627838...|2053013555262783879|              2|   97|
|566739264|22327320930775207...|2232732093077520756|              2|  668|
|517640654|22327321024140

In [5]:
# Checking type of columns
df_train.dtypes

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('user_id', 'int'), ('category', 'string'), ('category_id', 'string'), ('implicit_rating', 'int'), ('catID', 'int')]

In [6]:
# Repeating process for validation and testing dataframes


# Transforming user_id and catID and rating to integer types since PySpark expects all columns for model to be integer type
df_train_val = df_train_val.withColumn("user_id", df_train_val["user_id"].cast(IntegerType()))
df_train_val = df_train_val.withColumn("catID", df_train_val["catID"].cast(IntegerType()))
df_train_val = df_train_val.withColumn("implicit_rating", df_train_val["implicit_rating"].cast(IntegerType()))

# Transforming user_id and catID and rating to integer types since PySpark expects all columns for model to be integer type
df_test = df_test.withColumn("user_id", df_test["user_id"].cast(IntegerType()))
df_test = df_test.withColumn("catID", df_test["catID"].cast(IntegerType()))
df_test = df_test.withColumn("implicit_rating", df_test["implicit_rating"].cast(IntegerType()))

# Transforming user_id and catID, and rating to integer types since PySpark expects all columns for model to be integer type
df_test_val = df_test_val.withColumn("user_id", df_test_val["user_id"].cast(IntegerType()))
df_test_val = df_test_val.withColumn("catID", df_test_val["catID"].cast(IntegerType()))
df_test_val = df_test_val.withColumn("implicit_rating", df_test_val["implicit_rating"].cast(IntegerType()))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
df_train_val.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+--------------------+-------------------+---------------+-----+
|  user_id|            category|        category_id|implicit_rating|catID|
+---------+--------------------+-------------------+---------------+-----+
|643746492|20530135661428090...|2053013566142809077|              2|  378|
|568638108|20530135556318826...|2053013555631882655|              2|  107|
|575511213|20530135546588040...|2053013554658804075|              2|   84|
|590989339|20530135553215041...|2053013555321504139|              1|   99|
|600770607|20530135605308300...|2053013560530830019|              2|  249|
|515252388|22327320930775207...|2232732093077520756|              2|  668|
|619320693|22327320790098248...|2232732079009824823|              2|  602|
|512405566|20530135550950117...|2053013555095011711|              1|   93|
|524877776|22327320930775207...|2232732093077520756|              2|  668|
|596125073|22327320735740077...|2232732073574007777|              1|  594|
|545955421|22327320930775

In [8]:
# Building a generic ALS CF model 
als = ALS(userCol="user_id", itemCol="catID", ratingCol="implicit_rating",
          coldStartStrategy="drop", nonnegative = True, implicitPrefs = False)

# Performing hyperparameter tuning with ParamGrid that will try for each of the different parameter combinations
param_grid = ParamGridBuilder().addGrid(als.rank, [1, 5, 10]).addGrid(als.maxIter, [5, 10, 20]).addGrid(als.regParam, [0.05, 0.1, 0.5]).build()

# Evaluator to evaluate the performance of the model
evaluator = RegressionEvaluator(metricName="rmse", labelCol="implicit_rating", predictionCol="prediction")

# Using CrossValidator to perform cross validation 
cv = CrossValidator(estimator = als,
estimatorParamMaps = param_grid,
evaluator = evaluator,
numFolds = 5)# Performing 5 folds

# Running the CV on the validation training dataset
model = cv.fit(df_train_val)

# Extracting the best model from cross validation
best_model = model.bestModel

# Make predictions on the validation test set and evaluate using RMSE
predictions = best_model.transform(df_test_val)
rmse = evaluator.evaluate(predictions)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Obtaining the evaluation metrics and model parameters
print ("Best Model Parameters")
print ("RMSE: " + str(rmse))
print (" Rank: " + str(best_model.rank))
print (" Max Iter: " + str(best_model._java_obj.parent().getMaxIter()))
print (" RegParam: " + str(best_model._java_obj.parent().getRegParam())) 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Best Model Parameters
RMSE: 0.7171618259224922
 Rank: 1
 Max Iter: 20
 RegParam: 0.1

In [10]:
# Building the recommendation model using ALS best parameter model
als = ALS(maxIter=20, rank= 1, regParam=0.1, implicitPrefs=False,coldStartStrategy="drop",
          userCol='user_id', itemCol='catID', ratingCol='implicit_rating')

model = als.fit(df_train)
predictions = model.transform(df_test)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Root-mean-square error = 0.6956542505471972

In [11]:
# Generating top 10 recommendations for each user
recommendations = model.recommendForAllUsers(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Viewing recommendations df schema to know which columns to select when formatting the df
recommendations.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- user_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- catID: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

In [13]:
# Formatting results
recommendations_output = recommendations.select('user_id',F.posexplode('recommendations')).select('user_id','col.catID','pos','col.rating').checkpoint()
recommendations_output.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+-----+---+---------+
|  user_id|catID|pos|   rating|
+---------+-----+---+---------+
|468371772|  416|  0|3.1851895|
|468371772|  570|  1|2.8981311|
|468371772|  668|  2|2.3996842|
|468371772|  577|  3| 2.312127|
|468371772|  742|  4|2.2705405|
|468371772|  117|  5|2.2208114|
|468371772|  662|  6|2.1889753|
|468371772|  714|  7|2.1717632|
|468371772|  725|  8|2.1682422|
|468371772|  657|  9|2.1342862|
|512372691|  416|  0|2.4433029|
|512372691|  570|  1|2.2231054|
|512372691|  668|  2|1.8407556|
|512372691|  577|  3|1.7735921|
|512372691|  742|  4|1.7416917|
|512372691|  117|  5|1.7035456|
|512372691|  662|  6|1.6791246|
|512372691|  714|  7|1.6659214|
|512372691|  725|  8|1.6632206|
|512372691|  657|  9|1.6371735|
+---------+-----+---+---------+
only showing top 20 rows

In [14]:
# Saving results to S3 - Using coalesce function to merge all the small files Spark creates into one so that there is only one final file
recommendations_output.coalesce(1).write.option("quote","\u0000").csv("s3a://myaws-capstone-bucket/data/modeling/ALS/results",mode="overwrite",sep=',')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…