Copyright (c) Microsoft Corporation. 
Licensed under the MIT license. 
## Model Training Script for Synapse-AI-Retail-Recommender  
Model Author (Data Scientist): Xiaoyong Zhu  
  
This script is adapted from the full Model Training script found in `4. ML Model Building`. It is a slimmed-down version that contains only the operations required to produce a model that the Model Deployment Process and the RecommendationRefresh notebook can consume.

In [None]:
# Print the version string of the Python interpreter running this notebook.
import sys
print(sys.version)

In [None]:
# import libraries
import matplotlib.pyplot as plt
from datetime import datetime
from dateutil import parser
from pyspark.sql.functions import unix_timestamp

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.feature import RFormula
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

import azureml.core
from azureml.core import Workspace
from azureml.core.authentication import ServicePrincipalAuthentication
from azureml.core.run import Run
from azureml.core.experiment import Experiment
from azureml.core.model import Model
import os
import shutil
from shutil import rmtree
import json
import pprint

## Connect To Azure Machine Learning Workspace Using Service Principal

In [None]:
# Azure identifiers and service principal credentials used below.
# These are blank placeholders: fill them in before running, and avoid
# committing real secrets to source control.
subscription_id = ''
workspace_name = ''
tenant_id = ''
service_principal_id = ''
service_principal_password = ''

# Service Principal Authentication (tenant ID, client ID, client secret)
sp = ServicePrincipalAuthentication(
    tenant_id=tenant_id,
    service_principal_id=service_principal_id,
    service_principal_password=service_principal_password,
)

# Connect to your Azure Machine Learning Workspace using the Service Principal
ws = Workspace.get(
    name=workspace_name,
    auth=sp,
    subscription_id=subscription_id,
)

# Print the attributes of the resolved workspace, one per line.
workspace_summary = [
    'Workspace name: ' + ws.name,
    'Azure region: ' + ws.location,
    'Subscription id: ' + ws.subscription_id,
    'Resource group: ' + ws.resource_group,
]
print(*workspace_summary, sep='\n')

## Data Ingestion
Read the Spark table into a Spark DataFrame.

In [None]:
# Load the cleaned dataset table into a DataFrame.
source_table = "retailaidb.cleaned_dataset"
df = spark.read.table(source_table)

# Tell the Spark context which directory to use for checkpoints.
checkpoint_dir = 'checkpoint/'
spark.sparkContext.setCheckpointDir(checkpoint_dir)

In [None]:
# Keep only Electronics items.
# A truncated copy of category_code is stored in category_code_new (later
# cells group on it), and rows are kept when that value is 'electronics'.
# NOTE(review): Spark substring positions are 1-based; a start of 0 is
# presumably treated like 1 here — confirm.
category_prefix = df['category_code'].substr(0, 11)
df = (
    df.withColumn('category_code_new', category_prefix)
    .filter("category_code_new = 'electronics'")
)

In [None]:
# Count rows per truncated category, keep only the top 5 categories by count,
# and expose the category under the name category_code_tmp.
category_counts = df.groupBy('category_code_new').count()
top_category = (
    category_counts
    .sort('count', ascending=False)
    .limit(5)
    .withColumnRenamed("category_code_new", "category_code_tmp")
)

In [None]:
# Row count for every (product_id, category_code) pair, largest count first.
product_category_counts = df.groupBy('product_id', "category_code").count()
item_to_save = product_category_counts.sort('count', ascending=False)

In [None]:
# Keep at most 20 product rows whose category_code equals a top category.
# NOTE(review): category_code_tmp comes from the truncated category_code_new
# column while category_code is the untruncated value, so this exact-equality
# join may match few or no rows — confirm the intended join key.
category_match = top_category.category_code_tmp == item_to_save.category_code
item_to_save = item_to_save.join(top_category, category_match).limit(20)

In [None]:
# Restrict the data to frequently occurring products and users, then reduce
# the "view" events to one count per (product_id, user_id) pair.

# Per-product and per-user row counts, thresholded and sorted descending.
# NOTE(review): both counts are taken before the event_type == "view" filter
# below, so they include every event type — confirm that the thresholds are
# not meant to apply to views/clicks only.
product_count = (
    df.groupBy('product_id').count()
    .filter("count >= 30000")  # products with at least 30000 rows
    .orderBy('count', ascending=False)
)
user_count = (
    df.groupBy('user_id').count()
    .filter("count >= 200")  # users with at least 200 rows
    .orderBy('count', ascending=False)
)

# Join against the qualifying products. The key is renamed first so that the
# only column called product_id in the result is the one from product_count.
events_by_product = df.withColumnRenamed("product_id", "product_id_tmp")
raw_df = events_by_product.join(
    product_count,
    events_by_product.product_id_tmp == product_count.product_id,
)

# Same pattern for the qualifying users.
events_by_user = raw_df.withColumnRenamed("user_id", "user_id_tmp")
raw_df = events_by_user.join(
    user_count,
    events_by_user.user_id_tmp == user_count.user_id,
)

# Keep view events, drop the columns listed below, and count rows per
# (product_id, user_id) pair.
view_events = raw_df.filter(raw_df.event_type == "view")
view_events = view_events.drop(
    "event_time", "category_code", "user_session", "price", "brand", "category_id"
)
df = view_events.groupBy(view_events.product_id, view_events.user_id).count()

In [None]:
# Save the aggregated counts as a table for further use, overwriting any
# existing table with the same name.
target_table = "retailaidb.cleaned_dataset_electronics"
df.write.mode("overwrite").saveAsTable(target_table)

In [None]:
# Cast both id columns to IntegerType; these are the columns the ALS
# recommender in the training cell uses as userCol / itemCol.
for id_column in ("user_id", "product_id"):
    df = df.withColumn(id_column, df[id_column].cast(IntegerType()))

In [None]:
# Split the data into training (75%) and test (25%) datasets.
# A fixed seed (same value as random_seed_val in the training cell) is passed
# so that repeated runs over the same input are intended to produce the same
# split, which keeps the logged RMSE comparable between runs.
split_seed = 12345
train, test = df.randomSplit([0.75, 0.25], seed=split_seed)

In [None]:
# Show the current working directory as the cell output.
os.getcwd()

In [None]:
# Train an ALS recommender on the training split, evaluate it on the test
# split, and record hyperparameters and RMSE in an Azure ML experiment run.

# define variables for experiment, model name, file path, seed value
experiment_name = 'retail_ai_experiment'
model_name = 'retailai_recommendation_model.pkl'
# NOTE: model_path is not used in this cell; the model is saved below under
# the relative path "retailai_recommendation_model".
model_path = os.path.join(os.getcwd(), model_name)
random_seed_val = 12345

# start a training run by defining an experiment
experiment = Experiment(workspace = ws, name = experiment_name)
run = experiment.start_logging()

try:
    # create an ALS recommender; the seed is passed so that training uses
    # the seed value defined above instead of leaving it unused
    maxIter = 40
    regParam = 0.20
    rank = 25
    rec = ALS(
        maxIter = maxIter,
        regParam = regParam,
        rank = rank,
        implicitPrefs = True,
        userCol = 'user_id',
        itemCol = 'product_id',
        ratingCol = 'count',
        nonnegative = True,
        coldStartStrategy = 'drop',
        seed = random_seed_val,
    )

    # fit the model on train set
    rec_model = rec.fit(train)
    # making predictions on test set
    predicted_ratings = rec_model.transform(test)

    # create Regressor evaluator object for measuring accuracy
    evaluator = RegressionEvaluator(metricName = 'rmse', predictionCol = 'prediction', labelCol = 'count')
    # apply the RE on predictions dataframe to calculate RMSE
    rmse = evaluator.evaluate(predicted_ratings)

    # log hyperparameters and evaluation metrics to Azure ML
    run.log('maxIter', maxIter)
    run.log('regParam', regParam)
    run.log('rank', rank)
    run.log('RMSE', rmse)
    run.log_list('columns', train.columns)

    # save model
    rec_model.write().overwrite().save("retailai_recommendation_model")

    # Declare run completed
    run.complete()
except Exception as training_error:
    # Mark the run as failed so it is not left open in Azure ML, then
    # re-raise so the notebook cell still reports the original error.
    run.fail(error_details = training_error)
    raise

run_id = run.id
print("run id:", run.id)

predicted_ratings.printSchema()

In [None]:
# view current run in Azure ML
# (the bare expression below makes the run object this cell's output)
run

In [None]:
# Query the metrics tracked for this run (recursive = True) and pretty-print them.
tracked_metrics = run.get_metrics(recursive = True)
pprint.pprint(tracked_metrics)

In [None]:
# Add an 'err' column holding the absolute difference between the predicted
# value and the observed count.
# abs here is the name brought in by the wildcard import from
# pyspark.sql.functions, which shadows Python's builtin abs.
prediction_gap = predicted_ratings["prediction"] - predicted_ratings["count"]
predicted_ratings_witherr = predicted_ratings.withColumn('err', abs(prediction_gap))