In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pandas as pd

In [None]:
# Build (or reuse) the SparkSession used throughout the notebook.
# NOTE(review): master "local" runs Spark with a single worker thread, so the
# executor.* settings and spark.driver.cores are presumably ignored here —
# confirm, and consider "local[*]" to use all available cores.
_spark_settings = {
    "spark.executor.memory": "14G",
    "spark.executor.cores": "8",
    "spark.driver.memory": '13G',
    "spark.driver.cores": "4",
    "spark.memory.fraction": 0.8,
    "spark.rpc.message.maxSize": "400",
    "spark.default.parallelism": "800",
    "spark.sql.shuffle.partitions": "800",
}
_builder = SparkSession.builder.master("local").appName("ALS Model")
for _key, _value in _spark_settings.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

In [None]:
# Register a checkpoint directory on the SparkContext. Iterative jobs such as ALS
# (see its checkpointInterval param) only checkpoint when this directory is set,
# which keeps the lineage of long runs (maxIter=50 below) from growing unbounded.
checkpoint_dir = '***********'
spark.sparkContext.setCheckpointDir(checkpoint_dir)

In [None]:
# Load the raw transactions: first line is the header, column types are inferred.
df = spark.read.csv('********.csv', header=True, inferSchema=True)

In [None]:
# Map source columns onto the names used by the ALS pipeline:
# the delivered quantity becomes the Rating signal and StockCode the item id.
# CustomerID and Description already carry their final names, so they are left as-is.
df = (
    df.withColumnRenamed("Quantity Delivered (No UOM)", "Rating")
      .withColumnRenamed("StockCode", "ItemID")
)

In [None]:
# Cast the ALS inputs: integer user/item ids and a float rating.
# NOTE(review): with ANSI mode off, values that cannot be cast (e.g. a non-numeric
# StockCode) become NULL here and are removed by the later not-NULL filter —
# confirm that data loss is acceptable.
for _col_name, _col_type in (
    ("CustomerID", IntegerType()),
    ("ItemID", IntegerType()),
    ("Rating", FloatType()),
):
    df = df.withColumn(_col_name, df[_col_name].cast(_col_type))

In [None]:
# Keep only what is needed downstream: the three ALS inputs plus Description for display.
dataset = df.select(["CustomerID", "ItemID", "Rating", "Description"])

In [None]:
# Drop rows whose Description contains the literal text "NW_Online".
# Column.contains() is a literal substring match; in a SQL LIKE pattern the "_"
# would be a single-character wildcard.
# Rows with a NULL Description are kept explicitly: "NULL NOT LIKE ..." evaluates to
# NULL, which filter() treats as false and would silently discard those rows.
desc_col = dataset["Description"]
NW_Filter = dataset.filter(desc_col.isNull() | ~desc_col.contains("NW_Online"))
NW_Filter.show()

In [None]:
# Keep only rows where every ALS input (user, item, rating) is present.
dataset_Filter = NW_Filter.filter(
    NW_Filter["CustomerID"].isNotNull()
    & NW_Filter["ItemID"].isNotNull()
    & NW_Filter["Rating"].isNotNull()
)
# Training frame: ALS does not use Description, so it is dropped here.
dataset = dataset_Filter.select(["CustomerID", "ItemID", "Rating"])

In [None]:
# Keep Description next to the ALS columns so results can be shown with item names.
dataset_4Columns = dataset_Filter.select(["CustomerID", "ItemID", "Rating", "Description"])
dataset_4Columns.show(n=20, truncate=False)

# Inspect the three-column training frame: sample rows, schema, summary statistics.
dataset.show()
dataset.printSchema()
dataset.describe().show()

In [None]:
# Split into train (80%) and test (20%) sets. Without a seed, randomSplit picks a
# different split on every run; a fixed seed makes the split (and the reported
# RMSE) repeatable for the same input data.
SPLIT_SEED = 42
Train, Test = dataset.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [None]:
# ALS matrix factorisation over (CustomerID, ItemID, Rating) with non-negative factors.
# coldStartStrategy="drop" removes prediction rows for users/items unseen in training,
# so the evaluator is not fed NaN predictions.
# NOTE(review): Rating holds delivered quantities, which look like implicit feedback;
# consider implicitPrefs=True if that is the intended interpretation.
als = ALS(
    userCol="CustomerID",
    itemCol="ItemID",
    ratingCol="Rating",
    coldStartStrategy="drop",
    nonnegative=True,
)

# Hyper-parameter grid. Each list holds a single value, so the grid contains exactly
# one combination and cross-validation only estimates that combination's error.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [150])
    .addGrid(als.maxIter, [50])
    .addGrid(als.regParam, [0.1])
    .build()
)

In [None]:
# Root-mean-squared error between the actual Rating and the model's prediction column.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="Rating",
    metricName="rmse",
)

In [None]:
# 2-fold cross-validation of the ALS estimator over param_grid, scored by the RMSE evaluator.
cv = CrossValidator(
    estimator=als,
    evaluator=evaluator,
    estimatorParamMaps=param_grid,
    numFolds=2,
)

In [None]:
# Run the cross-validation search on the training split; the returned
# CrossValidatorModel exposes the selected model as .bestModel.
model = cv.fit(dataset=Train)

In [None]:
# Best ALS model selected by the cross-validation search.
best_model = model.bestModel

# Persist the model. A plain save() raises if the path already exists, which breaks
# re-running the notebook; write().overwrite() replaces the existing directory.
best_model.write().overwrite().save('********')

# Load the model back from disk; the rest of the notebook uses this restored copy.
best_model = ALSModel.load('********')

In [None]:
# Score the held-out split with the restored best model and report RMSE.
predictions = best_model.transform(Test)
rmse = evaluator.evaluate(predictions)

print("RMSE =" + str(rmse))

# Winning hyper-parameters. A model restored with ALSModel.load() has no parent
# estimator (its Java parent() is None), so maxIter/regParam cannot be read from it.
# Look them up in the CrossValidatorModel's param grid instead, taking the entry with
# the best average cross-validation metric (direction given by the evaluator).
cv_metrics = model.avgMetrics
choose = max if evaluator.isLargerBetter() else min
best_index = choose(range(len(cv_metrics)), key=cv_metrics.__getitem__)
best_params = model.getEstimatorParamMaps()[best_index]

print("**Best Model**")
print("Rank:", best_model.rank)
print("MaxIter:", best_params[als.maxIter])
print("RegParam:", best_params[als.regParam])

In [None]:
# Purchase history of one customer, highest Rating first, for comparison with
# that customer's recommendations.
Filter_user = dataset_4Columns.where("CustomerID == ***** ")
Filter_user.orderBy("Rating", ascending=False).show(n=10000, truncate=False)
dataset_4Columns.show()
# Test-set predictions, ordered by customer and then by actual Rating.
predictions.orderBy("CustomerID", "Rating").show()

In [None]:
# Top-N recommendations for every user: one row per CustomerID with a
# `recommendations` array of (ItemID, rating) structs.
NUM_RECS = 20
# Cached because the frame feeds three actions below (show, toPandas, filter+show);
# otherwise the recommendation job would be recomputed for each of them.
user_recs = best_model.recommendForAllUsers(NUM_RECS).cache()
user_recs.show()
# Save to CSV (this collects every user's recommendations onto the driver).
user_recs.toPandas().to_csv('*****.csv')
# Recommendations for one customer.
recs = user_recs.filter("CustomerID == *****")
recs.show()

In [None]:
# After modeling: show one customer's recommendations in a readable form,
# one row per recommended item together with its Description.
def get_recs_for_user(recs, dataset_4Columns):
    """Display one customer's recommended items with their descriptions.

    Parameters
    ----------
    recs : pyspark.sql.DataFrame
        Recommendations for a single customer, shaped like the output of
        ``ALSModel.recommendForAllUsers``: a ``CustomerID`` column and a
        ``recommendations`` array of structs with ``ItemID`` and ``rating``.
        Only the first row is used.
    dataset_4Columns : pyspark.sql.DataFrame
        Transaction rows with at least ``ItemID`` and ``Description``; used to
        look up each recommended item's description.

    Returns
    -------
    None
        The result (columns ItemID, ratings, CustomerID, Description; highest
        rating first) is printed with ``show(20, False)``.

    Raises
    ------
    ValueError
        If ``recs`` has no rows, e.g. the customer id is not among the model's users.

    Notes
    -----
    Uses the notebook-level ``spark`` session to build a small DataFrame.
    """
    # One driver round-trip for the whole row instead of one toPandas() per column.
    row = recs.select(
        "CustomerID", "recommendations.ItemID", "recommendations.rating"
    ).first()
    if row is None:
        raise ValueError("recs has no rows: no recommendations for this customer")

    items = list(row["ItemID"])
    if not items:
        # Avoid building a Spark DataFrame from an empty pandas frame, where schema
        # inference has nothing to work from.
        print("No recommended items for CustomerID", row["CustomerID"])
        return None

    ratings_matrix = pd.DataFrame({"ItemID": items, "ratings": list(row["rating"])})
    ratings_matrix["CustomerID"] = row["CustomerID"]
    ratings_matrix_ps = spark.createDataFrame(ratings_matrix)

    # Reduce the transaction table to distinct (ItemID, Description) pairs before the
    # join, instead of joining every transaction of each item and de-duplicating after.
    item_descriptions = dataset_4Columns.select("ItemID", "Description").distinct()
    recs_with_desc = ratings_matrix_ps.join(item_descriptions, on="ItemID")
    return recs_with_desc.distinct().sort("ratings", ascending=False).show(20, False)
# Display the selected customer's recommendations together with item descriptions.
get_recs_for_user(recs=recs, dataset_4Columns=dataset_4Columns)