In [None]:
import os
def setup_my_environment():
    """Placeholder hook for additional, user-specific environment setup.

    The function currently performs no configuration. The previous body only
    re-imported ``os`` into its own local scope, which had no effect outside
    the function, so that redundant import was removed.

    Returns:
        None
    """
    return None
    
def setenv(var, val):
    """Set the environment variable ``var`` to ``val`` for this process.

    Args:
        var: Name of the environment variable.
        val: String value to assign; any existing value is overwritten.
    """
    os.environ.update({var: val})

def prepend_path(var, val):
    """Put ``val`` in front of the colon-separated list stored in ``var``.

    Args:
        var: Name of a path-like environment variable (e.g. ``PATH``).
        val: Entry to place at the front of the list.

    When ``var`` is unset or empty the result is just ``val``. The previous
    implementation produced ``val + ":"`` in that case, leaving a trailing
    empty entry; for ``PATH`` and ``CPATH`` an empty entry is interpreted as
    the current working directory.
    NOTE(review): some ``man`` implementations give a trailing colon in
    ``MANPATH`` a special meaning (append the default search path) - confirm
    whether that matters for this environment.
    """
    old_val = os.environ.get(var, '')
    # Only join with ":" when there is an existing value to keep.
    os.environ[var] = val + ":" + old_val if old_val else val
def setup_java():
    """Expose the Java 8 installation through the process environment.

    Sets ``JAVA_PATH`` and ``JAVA_HOME`` to the package root and prepends the
    package's ``bin``, ``man`` and ``include`` directories to ``PATH``,
    ``MANPATH`` and ``CPATH`` respectively.
    """
    java_root = '/ichec/packages/java/8'

    for home_var in ('JAVA_PATH', 'JAVA_HOME'):
        setenv(home_var, java_root)

    search_paths = (
        ('PATH', '/bin'),
        ('MANPATH', '/man'),
        ('CPATH', '/include'),
    )
    for path_var, subdir in search_paths:
        prepend_path(path_var, java_root + subdir)
def setup_spark():
    """Expose the Spark 2.3.3 distribution through the process environment.

    Sets ``SPARK_DIST_CLASSPATH`` to the distribution directory and prepends
    its ``bin`` directory to ``PATH``.

    The previous version concatenated the package root and the distribution
    directory name without a ``/`` separator, and repeated the package root
    twice when building the ``PATH`` entry, so neither value named an
    existing location.
    NOTE(review): the layout ``<PKG_ROOT>/spark-2.3.3-bin-kay-spark/bin`` is
    inferred from the original strings - confirm against the installation.
    """
    PKG_ROOT = '/ichec/packages/spark/2.3.3/kay/spark-2.3.3'
    dist_dir = PKG_ROOT + '/spark-2.3.3-bin-kay-spark'
    setenv('SPARK_DIST_CLASSPATH', dist_dir)
    prepend_path('PATH', dist_dir + '/bin')
          
# Configure the process environment before the pyspark imports below.
# The hooks run in this fixed order: Java, then Spark, then the user hook.
for configure in (setup_java, setup_spark, setup_my_environment):
    configure()
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession


# Build (or reuse) the Spark session used by the whole notebook.
# Two configuration keys were misspelled and therefore silently ignored:
#   "spark.executor.instance"        -> "spark.executor.instances"
#   "spark.debug.maxToStringsFields" -> "spark.debug.maxToStringFields"
spark = (
    SparkSession
    .builder
    .config("spark.executor.memory", "35g")
    .config("spark.executor.cores", "3")
    .config("spark.driver.memory", "100g")
    .config("spark.executor.instances", "12")
    .config("spark.sql.shuffle.partitions", "128")
    # Cross joins are enabled explicitly for the whole session.
    .config("spark.sql.crossJoin.enabled", "true")
    .config("spark.debug.maxToStringFields", "100")
    .appName("AmazonCrossSelling")
    .getOrCreate()
)


In [None]:
# Load the two review files; df1 is All_Beauty and df2 is AMAZON_FASHION.
review_files = (
    "/ichec/work/mucom001c/Amazon/review/All_Beauty.json",
    "/ichec/work/mucom001c/Amazon/review/AMAZON_FASHION.json",
)
df1, df2 = [spark.read.json(review_file) for review_file in review_files]

In [None]:
# Combine both review sets with a full outer join on every shared column,
# then discard the "style" column.
join_keys = [
    "asin",
    "image",
    "overall",
    "reviewText",
    "reviewTime",
    "reviewerID",
    "reviewerName",
    "summary",
    "unixReviewTime",
    "verified",
    "vote",
]
df = df1.join(df2, on=join_keys, how="full_outer").drop("style")
df.show(5)

df.dtypes

In [None]:
import pyspark.sql.functions as F
def _null_counts(frame):
    """Return a one-row frame holding the number of nulls in each column."""
    per_column = [
        F.count(F.when(F.col(name).isNull(), name)).alias(name)
        for name in frame.columns
    ]
    return frame.select(per_column)


# Null counts before cleaning.
_null_counts(df).show()

# Missing votes become '1'; 'verified' becomes 0 for 'false' and 1 otherwise.
df = df.fillna({'vote': '1'})
verified_flag = F.when(df.verified == 'false', 0).otherwise(1)
df = df.withColumn('verified', verified_flag)

# Null counts after cleaning.
_null_counts(df).show()

In [None]:
# Per (reviewerID, asin, verified) group: mean rating, review count and
# summed votes, joined back onto the per-review columns.
feature_group = ["reviewerID", "asin", "verified"]
grouped_reviews = df.groupby(feature_group)

df_avg = grouped_reviews.agg(F.mean("overall").alias("Average"))
df_count = grouped_reviews.count()
df_vote = grouped_reviews.agg(F.sum("vote").alias("Total_Votes"))
df_new = df.select('asin', 'overall', 'reviewerID', 'verified', 'vote')

df_final = (
    df_avg
    .join(df_count, feature_group)
    .join(df_vote, feature_group)
    .join(df_new, feature_group)
)
df_final.dtypes

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
# Encode the string identifiers as numeric indices (asin first, then reviewerID).
asinindexer = StringIndexer(inputCol="asin", outputCol="asin_index")
revIDindexer = StringIndexer(inputCol="reviewerID", outputCol="reviewerID_index")
pipeline = Pipeline(stages=[asinindexer, revIDindexer])

fitted_indexers = pipeline.fit(df_final)
df_final = fitted_indexers.transform(df_final)

In [None]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def _first_component(vector):
    """Return the first element of ``vector`` as a float rounded to 3 places."""
    return round(float(list(vector)[0]), 3)


unlist = udf(_first_component, DoubleType())

# Min-max scale each numeric column on its own: wrap it in a one-element
# vector, scale it, unpack the scaled value back to a double and drop the
# temporary vector column.
for column_name in ["reviewerID_index", "asin_index", "count", "Average", "Total_Votes"]:
    vector_col = column_name + "_Vect"
    scaled_col = column_name + "_Scaled"
    assembler = VectorAssembler(inputCols=[column_name], outputCol=vector_col, handleInvalid="skip")
    scaler = MinMaxScaler(inputCol=vector_col, outputCol=scaled_col)
    pipeline = Pipeline(stages=[assembler, scaler])
    scaled_frame = pipeline.fit(df_final).transform(df_final)
    df_final = scaled_frame.withColumn(scaled_col, unlist(scaled_col)).drop(vector_col)


In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import ClusteringEvaluator
# Assemble the scaled columns into one "features" vector for clustering,
# then drop the unscaled helper columns.
scaled_feature_cols = [
    "reviewerID_index_Scaled",
    "asin_index_Scaled",
    "count_Scaled",
    "Average_Scaled",
    "Total_Votes_Scaled",
]
vecAssembler = VectorAssembler(
    inputCols=scaled_feature_cols,
    outputCol="features",
    handleInvalid="skip",
)
new_df = vecAssembler.transform(df_final).drop(
    'count', 'Total_Votes', 'reviewerID_index', 'asin_index'
)


In [None]:
# Elbow method: fit k-means for k = 2..9 and record the within-set sum of
# squared errors (WSSSE) for each k.
wcss = []       # WSSSE per fitted model, aligned with list_k
list_k1 = []    # the k values actually fitted so far
list_k = list(range(2, 10))
for k in list_k:
    kmeans = KMeans(maxIter = 3).setK(k).setSeed(1)
    model_kmeans = kmeans.fit(new_df)
    cost = model_kmeans.computeCost(new_df)
    # Record the current k (previously the whole list_k was appended each time).
    list_k1.append(k)
    wcss.append(cost)
    # Report only this iteration's cost (previously the whole list was printed).
    print("Within Set Sum of Squared Errors = " + str(cost))
    print("Value of k = " + str(k))

In [None]:
import matplotlib.pyplot as plt
# Elbow plot: WSSSE against the number of clusters.
fig, ax = plt.subplots(figsize=(6, 6))
ax.plot(list_k, wcss, marker='o')
ax.set_xlabel(r'Number of clusters *k*')
ax.set_ylabel('Sum of squared distance')
ax.set_title('Elbow Method')
plt.show()

In [None]:
# Final k-means model: 5 clusters, 5 iterations, fixed seed; predictions are
# written to the "cluster_prediction" column.
kmeans = KMeans(k=5, maxIter=5, seed=1, predictionCol="cluster_prediction")
model_kmeans = kmeans.fit(new_df)

In [None]:
# Assign every row to a cluster, then score the clustering with the
# silhouette measure.
df_predictions = model_kmeans.transform(new_df)
evaluator_kmeans = ClusteringEvaluator(predictionCol="cluster_prediction")
silhouette_kmeans = evaluator_kmeans.evaluate(df_predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette_kmeans))

In [None]:
# Prompt for the reviewer ID; the value is kept in revID as typed.
revID = input("Enter your reviewerID:")

In [None]:
# Distinct cluster assignments of the rows belonging to the entered reviewer.
reviewer_rows = df_predictions.where(df_predictions['reviewerID'] == revID)
cluster = reviewer_rows.select('cluster_prediction').distinct()

In [None]:
# Register the clustered rows as a temp view so they can be queried with SQL.
df_predictions.createOrReplaceTempView("KmeansAmazonDataset")

In [None]:
# Products with an average rating of at least 4 that fall in the reviewer's
# cluster. `cluster.first()` returns None when the lookup produced no rows,
# so fail with a clear message instead of an AttributeError on None.
cluster_row = cluster.first()
if cluster_row is None:
    raise ValueError("No cluster found for reviewerID {!r}".format(revID))
df3 = spark.sql("SELECT asin from KmeansAmazonDataset where Average >= 4 and cluster_prediction = '{0}'".format(cluster_row.cluster_prediction))

In [None]:
#starting Naive Bayes
# Creating Label
# NOTE(review): the wildcard import below rebinds notebook-level names such as
# `round`, `sum`, `min` and `max` to Spark column functions - consider
# replacing it with explicit imports once all users of those names are known.
from pyspark.sql.functions import *
from pyspark.sql.functions import when

# label = 1.0 for ratings in [4, 5], 0.0 for ratings in [0, 3]; ratings that
# match neither range get a null label.
overall_rating = col("overall")
is_positive = overall_rating.between(4, 5)
is_negative = overall_rating.between(0, 3)
df_final = df_final.withColumn('label', when(is_positive, 1.0).when(is_negative, 0.0))

In [None]:
# 60/40 train/test split. A fixed seed (matching the seed used for k-means)
# keeps the split the same across re-runs.
(training_df,test_df)=df_final.randomSplit([0.6, 0.4], seed=1)

In [None]:
#Naive Bayes algorithm for recommendation of products cross validation with various smoothing values.
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# ParamGridBuilder and CrossValidator are used in this cell, but their only
# import was in a later cell, so a fresh top-to-bottom run raised NameError.
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Features are the two index columns; the classifier is multinomial Naive Bayes.
vecAssembler_NB = VectorAssembler(inputCols=["asin_index", "reviewerID_index"], outputCol="features")
nb = NaiveBayes(modelType="multinomial")
# Candidate smoothing values explored by the cross-validator.
paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0]).build()
pipeline_NB = Pipeline(stages=[vecAssembler_NB, nb])
# Re-split the data 60/40; seeded so the split is the same across re-runs.
(training_df,test_df)=df_final.randomSplit([0.6, 0.4], seed=1)
cvEvaluator = MulticlassClassificationEvaluator(labelCol = "label", predictionCol= "prediction", metricName="accuracy")
# 4-fold cross-validation selecting the smoothing value by accuracy.
cv = CrossValidator(estimator=pipeline_NB, estimatorParamMaps=paramGrid, evaluator=cvEvaluator,numFolds = 4)
NB_Model = cv.fit(training_df)
NB_Predictions = NB_Model.transform(test_df)

In [None]:
#Naive Bayes without Cross-Validation
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col
# Single Naive Bayes fit with fixed smoothing; the cell's value is the
# accuracy on the test split.
vecAssembler_NB = VectorAssembler(
    inputCols=["asin_index", "reviewerID_index"],
    outputCol="features",
    handleInvalid="skip",
)
nb = NaiveBayes(smoothing=0.5)
pipeline_NB = Pipeline(stages=[vecAssembler_NB, nb])

model = pipeline_NB.fit(training_df)
predictions = model.transform(test_df)

evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
evaluator.evaluate(predictions)

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics
# Build the confusion matrix from the (prediction, label) pairs of the
# `predictions` frame, using the RDD-based MulticlassMetrics API.
prediction_evaluation = predictions.select("prediction", "label").rdd
metrics = MulticlassMetrics(prediction_evaluation)
print("*****Printing Confusion matrix***** ")
print(metrics.confusionMatrix())

In [None]:
# Prediction rows for the entered reviewer; the product column is exposed as "item".
reviewer_predictions = predictions.where(predictions['reviewerID'] == revID)
df4 = reviewer_predictions.select(
    "reviewerID",
    col("asin").alias("item"),
    "label",
    "overall",
    "prediction",
)

### Items recommended for you

In [None]:
# Show up to five distinct product IDs (asin) from df3.
print("Printing first five recommendations from the list using k means and Naive Bayes")
df3.select('asin').distinct().show(5)

### ALS method for recommending items

In [None]:
# recommendation using ALS method
# Input for ALS: product, reviewer and the average rating column.
df_ALS = new_df.select(new_df['asin'],new_df['reviewerID'],new_df['Average'])
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
# ALS needs numeric user/item ids, so index both string identifiers.
revIDindexer = StringIndexer(inputCol="reviewerID", outputCol="reviewerID_index")
asinindexer = StringIndexer(inputCol="asin",outputCol="asin_index")
# Pipeline stages are an ordered sequence; the previous code passed a set
# literal ({...}), which has no defined order. Use a list instead.
pipeline = Pipeline(stages=[asinindexer, revIDindexer])
transformed = pipeline.fit(df_ALS).transform(df_ALS)
# 60/40 split, seeded so the split is the same across re-runs.
(training_df,test_df)=transformed.randomSplit([0.6, 0.4], seed=1)

In [None]:
#finding best model for ALS method using average
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

# ALS estimator on the indexed ids, predicting the "Average" rating.
als_avg = ALS(
    userCol="reviewerID_index",
    itemCol="asin_index",
    ratingCol="Average",
    coldStartStrategy="drop",
    nonnegative=True,
)
als_avg = als_avg.setPredictionCol("ALS_prediction_avg")

# Hyper-parameter grid: 2 ranks x 1 maxIter x 3 regParams.
param_grid = (
    ParamGridBuilder()
    .addGrid(als_avg.rank, [100, 150])
    .addGrid(als_avg.maxIter, [5])
    .addGrid(als_avg.regParam, [0.09, 0.01, 0.5])
    .build()
)

# 3-fold cross-validation scored by RMSE against "Average".
evaluator_ALS_avg = RegressionEvaluator(
    metricName="rmse",
    labelCol="Average",
    predictionCol="ALS_prediction_avg",
)
cv = CrossValidator(
    estimator=als_avg,
    estimatorParamMaps=param_grid,
    evaluator=evaluator_ALS_avg,
    numFolds=3,
)
ALS_model_CV_avg = cv.fit(training_df)
ALS_model_avg = ALS_model_CV_avg.bestModel

# Score the best model on the held-out split.
ALS_predictions_avg = ALS_model_avg.transform(test_df)
rmse_ALS_avg = evaluator_ALS_avg.evaluate(ALS_predictions_avg)
print("RMSE considering Average in ALS="+str(rmse_ALS_avg))

# Print evaluation metrics and model parameters
# maxIter and regParam are read from the underlying Java model's parent.
print("**Best Model**")
print("RMSE = ", rmse_ALS_avg)
print(" Rank: ", ALS_model_avg.rank)
print(" MaxIter: ", ALS_model_avg._java_obj.parent().getMaxIter())
print(" RegParam: ", ALS_model_avg._java_obj.parent().getRegParam())


In [None]:
# Mean absolute error of the ALS predictions. Dedicated names are used so
# that the RMSE evaluator and the RMSE value are not overwritten by MAE
# (previously `rmse_ALS_avg` ended up holding the MAE).
evaluator_ALS_mae = RegressionEvaluator(metricName="mae", labelCol="Average", predictionCol="ALS_prediction_avg")
mae_ALS_avg = evaluator_ALS_mae.evaluate(ALS_predictions_avg)
mae_ALS_avg

In [None]:
user_recs_avg=ALS_model_avg.recommendForAllUsers(5)

In [None]:
from pyspark.ml.feature import IndexToString
from pyspark.ml.feature import StringIndexerModel
# Fit the two indexers on df_ALS and keep their label lists so that numeric
# indices can be translated back into reviewer IDs and product IDs.
# NOTE(review): this fits the indexers again rather than reusing an already
# fitted model - presumably the resulting index order matches the one the ALS
# model was trained with; confirm.
user_labels = revIDindexer.fit(df_ALS).labels
product_labels = asinindexer.fit(df_ALS).labels
# Transformer mapping "reviewerID_index" back to a string column "reviewerId".
user_id_to_label = IndexToString(inputCol="reviewerID_index", outputCol="reviewerId", labels=user_labels)
# Number of recommendation slots decoded per user.
n = 5
# Literal array column of all product labels, indexable by asin_index.
product_labels_ =F.array(*[F.lit(x) for x in product_labels])
# Column expression: for each of the n slots, a struct with the product label
# looked up via that slot's asin_index ("asin") and the slot's "rating".
recommendations = F.array(*[F.struct(product_labels_[F.col("recommendations")[i]["asin_index"]].alias("asin"),F.col("recommendations")[i]["rating"].alias("rating")) for i in range(n)])

In [None]:
# Add the reviewer-ID string column, then replace the "recommendations"
# column with the decoded (asin, rating) structs.
final_recom_avg = (
    user_id_to_label
    .transform(user_recs_avg)
    .withColumn("recommendations", recommendations)
)
final_recom_avg.show(3)

### Items recommended for you

In [None]:
# Show the ALS recommendations for the entered reviewer, untruncated.
reviewer_recs = final_recom_avg.filter(final_recom_avg['reviewerID'] == revID)
reviewer_recs.select('recommendations').show(2, truncate=False)