In [1]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.sql.functions import udf, col, first, when
from pyspark.sql.types import IntegerType
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *
import numpy as np
import pandas as pd
from pyspark.sql.functions import udf
from pandas import pivot_table

In [2]:
# Explicit schema for the reviews CSV: 25 nullable columns, all strings except
# the integer-typed id, ean, manufacturerNumber, rating and upc.
# The only visible uses of this schema are the disabled `schema=sc` reads in
# the loading cell.
# NOTE(review): on Databricks `sc` is presumably the predefined SparkContext,
# which this assignment would shadow -- confirm before relying on `sc` later.
# NOTE(review): ean/upc are 32-bit integers here; barcode values presumably
# exceed that range -- confirm before enabling this schema.
sc = (
    StructType()
    .add("id", IntegerType(), True)
    .add("Brand", StringType(), True)
    .add("Categories", StringType(), True)
    .add("dateAdded", StringType(), True)
    .add("dateUpdated", StringType(), True)
    .add("ean", IntegerType(), True)
    .add("keys", StringType(), True)
    .add("manufacturer", StringType(), True)
    .add("manufacturerNumber", IntegerType(), True)
    .add("manufacturerName", StringType(), True)
    .add("reviewDate", StringType(), True)
    .add("reviewDateAdded", StringType(), True)
    .add("dateSeen", StringType(), True)
    .add("didPurchase", StringType(), True)
    .add("doRecommended", StringType(), True)
    .add("reviewId", StringType(), True)
    .add("reviewHelpful", StringType(), True)
    .add("rating", IntegerType(), True)
    .add("sourceURL", StringType(), True)
    .add("text", StringType(), True)
    .add("title", StringType(), True)
    .add("userCity", StringType(), True)
    .add("userProvince", StringType(), True)
    .add("username", StringType(), True)
    .add("upc", IntegerType(), True)
)

In [3]:
# Load the reviews CSV twice: once as a Spark DataFrame (first row is the
# header, column types inferred) and once as a pandas DataFrame decoded as
# ISO-8859-1 from the '/dbfs/...' path.
spark_df = (
    sqlContext.read
    .format('csv')
    .options(header='true', inferSchema=True)
    .load('/FileStore/tables/ecommerce.csv')
)
# Disabled alternatives that read with the explicit schema `sc`:
#spark_df = spark.read.csv('/FileStore/tables/', header=True, schema=sc);
#data = spark.read.csv('/FileStore/tables/newdata.csv', header=True, schema=sc)

data = pd.read_csv(
    '/dbfs/FileStore/tables/ecommerce.csv',
    encoding="ISO-8859-1",
)

In [4]:
# Render the freshly loaded Spark DataFrame for a visual sanity check.
# display() is not defined in this notebook -- presumably the Databricks built-in.
display(spark_df)

In [5]:
# Print the DataFrame object itself (its text representation), not its rows.
print(spark_df)

In [6]:
# didPurchase           38845
# doRecommend           10571
# reviews_id            38845
# reviews.numHelpful    38496
# rating                    0
# sourceURLs                0
# text                     34
# title                   473
# userCity                  0
# userProvince          70554
# username                 54
# upc                       2
# spark_df = spark_df.drop('ean')
# spark_df = spark_df.drop('manufacturerNumber')
# spark_df = spark_df.drop('didPurchase')
# spark_df = spark_df.drop('doRecommend')
# spark_df = spark_df.drop('reviews_id')
# spark_df = spark_df.drop('reviews.numHelpful')
# spark_df = spark_df.drop('title')
# spark_df = spark_df.drop('text')
# spark_df = spark_df.drop('userProvince')
# spark_df = spark_df.drop('dateAdded.1')
# spark_df = spark_df.drop('reviews.date')
# spark_df = spark_df.drop('upc')

In [7]:
# Re-type 'rating' as an integer column, replacing the column of the same name.
rating_as_int = spark_df["rating"].cast(IntegerType())
spark_df = spark_df.withColumn("rating", rating_as_int)


In [8]:
# Keep only the three columns used from here on: rating, username and id.
spark_df = spark_df.select('rating', 'username', 'id')

In [9]:
# Replace missing ratings with 3.
# NOTE(review): the replacement is passed as the string '3' although 'rating'
# was cast to an integer column above; presumably Spark casts it -- confirm.
spark_df = spark_df.na.fill({'rating': '3'})

In [10]:
# Show the column names and types after the cast, projection and null fill above.
spark_df.printSchema()

In [11]:
# Convert the three-column Spark DataFrame to pandas; the hashing and pivot
# steps below operate on this pandas copy.
df_pandas = spark_df.toPandas()

In [12]:
# Map any value to a stable integer code in the range [0, 10**9).
# The same value always yields the same code -- across cells, kernel restarts
# and machines -- because the code comes from a SHA-256 digest rather than the
# built-in hash(), which is salted per process for strings.
# Codes are NOT guaranteed unique: the digest is reduced modulo 10**9, so two
# different values can (rarely) receive the same code.
def get_hash(x):
  """Return a deterministic integer code below 10**9 for ``x``.

  Args:
    x: Any value. It is converted with ``str(x)`` before hashing, so values
      with the same string form (for example ``42`` and ``"42"``) share a code.

  Returns:
    int: ``SHA-256(str(x)) mod 10**9``, always in ``[0, 10**9)``.
  """
  import hashlib  # local import keeps this cell runnable on its own

  # backslashreplace keeps encoding total (no UnicodeEncodeError on lone
  # surrogates) without merging distinct strings the way "replace" would.
  encoded = str(x).encode("utf-8", "backslashreplace")
  return int(hashlib.sha256(encoded).hexdigest(), 16) % 10**9
# Swap the raw 'username' and 'id' values for the integer codes produced by
# get_hash; equal values receive equal codes. Any other column can be encoded
# the same way.
# NOTE(review): codes are reduced modulo 10**9, so distinct values may collide.
for column_name in ('username', 'id'):
  df_pandas[column_name] = df_pandas[column_name].apply(get_hash)

In [13]:
# Preview the encoded usernames of the first five rows. Rows 1 and 2 show the
# same code because their original username strings were identical.
df_pandas['username'].head()

In [14]:
# Inspect the rating column of the pandas copy.
df_pandas['rating']


In [15]:
## ALS takes user-by-item preferences (ratings) as input and recommends items to
## each user. We use the user id, the product id and the rating as the inputs to
## the ALS algorithm. ALS considers the similarity between users' tastes for
## products as well as each user's past purchases.


In [16]:
# Build the (user, product, rating) triples that feed ALS.
#
# `table` is a user x product pivot: rows are the encoded usernames, columns
# are the encoded product ids, and each cell is that user's mean rating for
# the product (0 where the user has no rating for it).
table = df_pandas.pivot_table(index='username', values='rating', columns='id', aggfunc='mean')
table = table.fillna(0)

stockCodes = table.columns[:]
print(len(stockCodes))
print(table.columns)

stockCode_list = data['id'].unique()
print(len(stockCode_list))

# Product codes in pivot-column order. A product's position in this list is
# the integer 'product' id stored in the triples below.
product_list = list(table.columns)

# Visit every (user, product) cell and keep the ones whose rating, truncated
# to a whole number, is at least 1.
# Series.items() is used because Series.iteritems() was removed in pandas 2.0.
# This walks users x products cells, so it can take some time on large data.
user_rating_array = []
for customer_id, row in table.iterrows():
  for product_id, (_, value) in enumerate(row.items()):
    rating = int(value)
    if rating > 0:
      user_rating_array.append((int(customer_id), product_id, rating))

# Report the size and a short preview rather than dumping every triple.
print(len(user_rating_array))
print(user_rating_array[:10])

In [17]:
# Turn the collected triples into a DataFrame and persist them as CSV.
# A later cell reads '/FileStore/tables/user_rating.csv' with Spark --
# presumably this same file seen through DBFS.
df_user_rating = pd.DataFrame(user_rating_array, columns=['user', 'product', 'rating'])
print("pandas dataframe created")
# index=False keeps the pandas row index out of the file; without it the index
# is written as an unnamed first column that is read back as a data column.
df_user_rating.to_csv('/dbfs/FileStore/tables/user_rating.csv', index=False)

In [18]:
### Creating the final CSV, user_rating, which has the user id, rating and product id for the ALS algorithm

In [19]:
print("creating user_ratings dataframe")
# Disabled alternative that builds the frame from the in-memory triples:
# user_ratings = sqlContext.createDataFrame(user_rating_array, ['user', 'product', 'rating']).collect()

# Read the saved ratings CSV back into Spark: first row is the header and the
# column types are inferred.
csv_reader = sqlContext.read.format('csv').options(header='true', inferSchema='true')
user_ratings = csv_reader.load('/FileStore/tables/user_rating.csv')

In [20]:
# Print a preview of the ratings loaded from the CSV.
user_ratings.show()

In [21]:
## Dividing the data into training and test sets.
## Using the recommendForAllUsers method of the ALS model, we generate the products
## that are recommended to each user. Using the recommendForAllItems method,
## we get the recommended users for each product.

In [22]:
# Fixed seed shared by the train/test split and the ALS initialisation so the
# RMSE and the recommendations are reproducible from run to run.
SEED = 42

# 80/20 split of the ratings into training and test sets.
(training, test) = user_ratings.randomSplit([0.8, 0.2], seed=SEED)

# Make sure the item id column is integer-typed in both splits.
training = training.withColumn("product", training["product"].cast(IntegerType()))
test = test.withColumn("product", test["product"].cast(IntegerType()))

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(rank=10, maxIter=30, regParam=0.3, userCol="user", itemCol="product", ratingCol="rating",
          coldStartStrategy="drop", seed=SEED)
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 product recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each product
prodRecs = model.recommendForAllItems(10)

In [23]:
# Peek at the first 10 rows of the training split.
training.head(10)

In [24]:
## A function is created which takes a userId as input and gives the top 10
## products recommended to that user and shows the user's actual purchases. It also
## shows the predicted rating for each product, which nearly matches the actual
## rating.


In [25]:
# Pull the precomputed recommendations for one user id and print the
# collected rows.
user_test = 33739
is_test_user = col('user') == user_test
test_user_recs = userRecs.filter(is_test_user).collect()
print(test_user_recs)