In [None]:
import pandas as pd
import numpy as np
from google.colab import drive
# Mount Google Drive at /content/drive; the dataset and model paths used in this notebook live under that mount point.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Recommendation using Spark ALS

This step initialises a Spark session so that PySpark functions can be used.

In [None]:
!pip install findspark



In [None]:
!pip install pyspark py4j



In [None]:
import numpy as np
import json
import findspark

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [None]:
# Directory (on the mounted Drive) that holds the monthly sampled event files.
file_path_prefix = "/content/drive/MyDrive/datasets/sampled-data/"

# One CSV per month, October 2019 through April 2020, in chronological order.
_MONTH_TAGS = (
    "2019-Oct",
    "2019-Nov",
    "2019-Dec",
    "2020-Jan",
    "2020-Feb",
    "2020-Mar",
    "2020-Apr",
)

filename_list = [f"filtered-sampled-cleaned-{month_tag}.csv" for month_tag in _MONTH_TAGS]

# Keep the individual per-file names available alongside the list.
(
    file1_name,
    file2_name,
    file3_name,
    file4_name,
    file5_name,
    file6_name,
    file7_name,
) = filename_list

In [None]:
# Create the Spark session for this notebook, or reuse one that already exists
# (getOrCreate), and display it as the cell output.
session_builder = SparkSession.builder.appName("PySpark ALS")
spark = session_builder.getOrCreate()
spark

In [None]:
# Schema applied when the ratings table is handed to Spark:
# three nullable integer columns, in this order.
customSchema = T.StructType([
    T.StructField(column_name, T.IntegerType(), True)
    for column_name in ("user_id", "product_id", "rating")
])

# Implement Matrix Factorization using SparkALS

This is where matrix factorization is implemented to obtain the user embeddings and product embeddings in a 10-dimensional space.

In [None]:
from pyspark.ml.recommendation import ALS

In [None]:
# Configure the ALS (alternating least squares) estimator.
#  - rank is spelled out so the embedding size is visible here; 10 is also the
#    ALS default, so the factor dimensionality is unchanged.
#  - seed is fixed so that repeated fits on the same data start from the same
#    random initialisation instead of depending on the library default.
#  - coldStartStrategy="drop" discards prediction rows for users/products
#    that the fitted model has no factors for.
als = ALS(
    rank = 10,
    maxIter = 5,
    regParam = 0.01,
    userCol = "user_id",
    itemCol = "product_id",
    ratingCol = "rating",
    coldStartStrategy = "drop",
    seed = 123,
)

Extract the ratings, grouped by user and product.

In [None]:
chunk_size = 1000000

# Only these columns are needed to build the (user, product, rating) table,
# so they are the only ones read from each CSV.
columns_to_load = ["product_id", "user_id", "rating"]

# Chunks are collected here and concatenated once at the end; concatenating
# inside the loop would re-copy everything loaded so far on every iteration.
loaded_chunks = []

for filename in filename_list:

  # load dataset in chunks, restricted to the columns listed above
  chunks = pd.read_csv(
      file_path_prefix + filename,
      usecols=columns_to_load,
      chunksize=chunk_size,
  )
  print(f"reading {filename}", end="")

  loaded_chunks.extend(chunks)

  print(f"\n{filename} completed")

# pd.concat raises on an empty list, so fall back to an empty frame with the
# expected columns when nothing was read.
if loaded_chunks:
  user_product_rating_df = pd.concat(loaded_chunks, ignore_index=True)
else:
  user_product_rating_df = pd.DataFrame(columns=columns_to_load)

reading filtered-sampled-cleaned-2019-Oct.csv
filtered-sampled-cleaned-2019-Oct.csv completed
reading filtered-sampled-cleaned-2019-Nov.csv
filtered-sampled-cleaned-2019-Nov.csv completed
reading filtered-sampled-cleaned-2019-Dec.csv
filtered-sampled-cleaned-2019-Dec.csv completed
reading filtered-sampled-cleaned-2020-Jan.csv
filtered-sampled-cleaned-2020-Jan.csv completed
reading filtered-sampled-cleaned-2020-Feb.csv
filtered-sampled-cleaned-2020-Feb.csv completed
reading filtered-sampled-cleaned-2020-Mar.csv
filtered-sampled-cleaned-2020-Mar.csv completed
reading filtered-sampled-cleaned-2020-Apr.csv
filtered-sampled-cleaned-2020-Apr.csv completed


In [None]:
# Collapse the table to one row per (user_id, product_id) pair; the pair's
# rating becomes the sum of all ratings recorded for it.
rating_group_keys = ['user_id', 'product_id']
ratings_by_user_product = user_product_rating_df.groupby(rating_group_keys, as_index=False)
user_product_rating_df = ratings_by_user_product['rating'].sum()

In [None]:
# # normalise
# global_min = user_product_rating_df["rating"].min()
# global_max = user_product_rating_df["rating"].max()
# print(f"min: {global_min}, max: {global_max}")

min: 1, max: 253


In [None]:
# user_product_rating_df["rating"] = user_product_rating_df["rating"].map(lambda x:
#  (x - global_min)/(global_max - global_min)
# )

In [None]:
# Display the aggregated (user_id, product_id, rating) table.
user_product_rating_df

Unnamed: 0,user_id,product_id,rating
0,12511517,100042492,1
1,29515875,1801638,1
2,31198833,1003549,1
3,34526405,18600003,1
4,34916060,12600007,1
...,...,...,...
12670555,649774322,100143628,1
12670556,649774322,100143638,1
12670557,649774789,100142959,1
12670558,649774866,45400088,1


In [None]:
# user_product_rating_df.to_csv(f"{file_path_prefix}user-product-rating.csv", index=False, header=True)

In [None]:
# print(f"{file_path_prefix}user-product-rating.csv")

/content/drive/MyDrive/datasets/sampled-data/user-product-rating.csv


In [None]:
# Convert the pandas ratings table into a Spark DataFrame, applying customSchema
# (integer user_id, product_id and rating columns).
spark_df = spark.createDataFrame(user_product_rating_df, schema=customSchema)

In [None]:
# Fit the configured ALS estimator on the full ratings table; the fitted model
# exposes the learned user and item factors used in the following sections.
model = als.fit(spark_df)

# Store User Embedding

The calculated user embeddings are exported to a Parquet (`.parquet`) file.

In [None]:
# Print the first 5 rows of the user-factor table (columns: id, features).
model.userFactors.show(5)

+---------+--------------------+
|       id|            features|
+---------+--------------------+
| 34916060|[0.32710588, 0.00...|
| 62336140|[0.07852814, -0.3...|
|104397540|[-0.15077847, -0....|
|106416780|[0.36028907, -0.0...|
|110773810|[-0.07616552, 0.0...|
+---------+--------------------+
only showing top 5 rows



In [None]:
# Number of rows in the user-factor table.
model.userFactors.count()

4175116

In [None]:
# Persist the user factors as Parquet. mode("overwrite") makes the cell
# re-runnable: with the default save mode the write raises an error as soon as
# the target path already exists from a previous run.
model.userFactors.write.mode("overwrite").parquet("/content/drive/MyDrive/spark-model/userFactors.parquet")

In [None]:
# Read the user-factor Parquet export back and print its schema and first
# 5 rows to confirm the export is readable.
parquet_user = spark.read.parquet("/content/drive/MyDrive/spark-model/userFactors.parquet")
parquet_user.printSchema()
parquet_user.show(5)

# Store Item Embedding

The item embeddings are also exported to a Parquet (`.parquet`) file.

In [None]:
# Print the first 5 rows of the item-factor table (columns: id, features).
model.itemFactors.show(5)

+-------+--------------------+
|     id|            features|
+-------+--------------------+
|1001440|[0.014171817, 0.3...|
|1001820|[0.150131, 0.2934...|
|1002100|[-0.18513426, -0....|
|1002310|[-0.097115144, 0....|
|1002460|[0.054038845, 0.6...|
+-------+--------------------+
only showing top 5 rows



In [None]:
# Number of rows in the item-factor table.
model.itemFactors.count()

209271

In [None]:
# Also export the item factors as CSV by collecting them into pandas first.
# NOTE(review): the list-valued `features` column ends up as text in the CSV;
# the loading section of this notebook parses it back with json.loads.
model.itemFactors.toPandas().to_csv("/content/drive/MyDrive/spark-model/spark-item-embedding.csv", index=False, header=True)

In [None]:
# Persist the item factors as Parquet. mode("overwrite") makes the cell
# re-runnable: with the default save mode the write raises an error as soon as
# the target path already exists from a previous run.
model.itemFactors.write.mode("overwrite").parquet("/content/drive/MyDrive/spark-model/itemFactors.parquet")

In [None]:
# Read the item-factor Parquet export back and print its schema and first
# 5 rows to confirm the export is readable.
parquet_item = spark.read.parquet("/content/drive/MyDrive/spark-model/itemFactors.parquet")
parquet_item.printSchema()
parquet_item.show(5)

# Calculate the Top 5 favourite Products of a User
**steps:**
1. get the selected user embedding
2. calculate the cosine similarity between that user embedding and each of the product embeddings
3. filter out the purchased products
4. pick the top N products

# Loading user and item embedding data

In [None]:
# Load the product (item) embeddings from the CSV export. The `features`
# column is still text at this point; a later cell parses it with json.loads.
df_product_embedding = pd.read_csv("/content/drive/MyDrive/spark-model/spark-item-embedding.csv")

In [None]:
# Load the user and item factor tables from their Parquet exports.
parquet_user = spark.read.parquet("/content/drive/MyDrive/spark-model/userFactors.parquet")
parquet_item = spark.read.parquet("/content/drive/MyDrive/spark-model/itemFactors.parquet")

In [None]:
def _parse_embedding(raw_features):
    """Return one product embedding as a NumPy array.

    Values read from the CSV are JSON-style list strings and are decoded with
    json.loads. Values that are no longer strings (the cell was already run
    once) are passed through np.asarray, so re-running the cell does not raise.
    """
    if isinstance(raw_features, str):
        return np.array(json.loads(raw_features))
    return np.asarray(raw_features)


# Turn the text `features` column into numeric vectors.
df_product_embedding["features"] = df_product_embedding["features"].map(_parse_embedding)

Select a User

In [None]:
# Load the purchase history table; read_csv already returns a DataFrame.
purchase_df = pd.read_csv("/content/drive/MyDrive/datasets/sampled-data/purchase_history.csv")

In [None]:
# Id of the user for whom recommendations are computed in the cells that follow.
selected_user_id = 567950899

In [None]:
# Pull the selected user's row out of the user-factor table and collect it
# into pandas; the frame is empty when the user has no stored factors.
matches_selected_user = F.col("id") == selected_user_id
selected_user_embedding = parquet_user.where(matches_selected_user).toPandas()
selected_user_embedding

Unnamed: 0,id,features
0,567950899,"[1.7456978559494019, -0.37916985154151917, -1...."


In [None]:
# Rows of the purchase history that belong to the selected user.
is_selected_user = purchase_df["user_id"] == selected_user_id
purchased_by_user_df = purchase_df.loc[is_selected_user]
purchased_by_user_df

Unnamed: 0,event_time,product_id,user_id
118628,2019-11-30 17:18:17 UTC,1002544,567950899


# Calculate the Similarity between User Embedding and Product Embeddings

In this step, the `distance` module from SciPy is used to calculate the cosine similarity between the user embedding and each item embedding in the 10-dimensional space.

In [None]:
from scipy.spatial import distance

In [None]:
# Cosine similarity between the selected user's embedding and every product
# embedding (distance.cosine returns a distance, so similarity = 1 - distance).

# Fail with a clear message when the user has no embedding, instead of the
# out-of-bounds IndexError that .iloc[0] raises on an empty frame.
if selected_user_embedding.empty:
    raise ValueError(f"no user embedding found for user_id={selected_user_id}")

# The user vector is the same for every product, so look it up once rather
# than on each row visited by apply.
selected_user_vector = selected_user_embedding.iloc[0]["features"]

df_product_embedding["similarity"] = df_product_embedding["features"].apply(
    lambda product_vector: 1 - distance.cosine(selected_user_vector, product_vector)
)

In [None]:
# Show the first 5 product rows, including the newly added similarity column.
df_product_embedding.head(5)

Unnamed: 0.1,Unnamed: 0,id,features,similarity
0,0,1001440,"[-0.7812828421592712, 0.2694493234157562, 0.01...",0.237662
1,1,1001820,"[-0.3348201513290405, 0.10412280261516571, -0....",0.273127
2,2,1002100,"[0.0010078996419906616, -0.13538743555545807, ...",0.817165
3,3,1002310,"[-0.13170188665390015, 0.19735312461853027, -0...",-0.008945
4,4,1002460,"[0.4232328236103058, 0.4985388219356537, -0.37...",-0.192992


# Pick the Top N Products

The similarity scores calculated in the previous step are filtered to exclude purchased products and then sorted in descending order.

In [None]:
# Load purchases and collect the product ids recorded for the selected user,
# so they can be excluded from the recommendations.
# NOTE(review): this rebinds purchase_df from a different file than the
# earlier purchase_history.csv load — confirm which file is intended.
purchase_df = pd.read_csv("/content/drive/MyDrive/purchase-data.csv")
bought_by_selected_user = purchase_df["user_id"] == selected_user_id
purchased_by_user_list = purchase_df.loc[bought_by_selected_user, "product_id"].tolist()

In [None]:
# Exclude the products in purchased_by_user_list, rank the remainder by
# similarity (highest first) and keep the top 50 ids with their scores.
not_purchased_mask = ~df_product_embedding["id"].isin(purchased_by_user_list)
ranked_candidates = df_product_embedding[not_purchased_mask].sort_values(
    by="similarity", ascending=False
)
df_target_product_id = ranked_candidates.head(50)[["id", "similarity"]]

In [None]:
# Show the 5 highest-scoring candidate products.
df_target_product_id.head(5)

Unnamed: 0,id,similarity
38333,100116101,0.955521
71182,26405343,0.937367
189186,2600939,0.93102
101598,100129684,0.929368
142331,100097646,0.927199


# Print All Details of the top N recommended products

In [None]:
# Load the product catalogue; read_csv already returns a DataFrame.
products_df = pd.read_csv("/content/drive/MyDrive/datasets/sampled-data/sampled-product-dataset.csv")

In [None]:
# Attach catalogue details to each recommended product (inner join of the
# recommendation ids against the catalogue's product_id) and display the
# descriptive columns next to the similarity score.
detail_columns = ["product_id", "category_code", "brand", "price", "similarity"]
recommended_with_details = df_target_product_id.merge(
    products_df,
    left_on="id",
    right_on="product_id",
)
recommended_with_details[detail_columns]

Unnamed: 0,product_id,category_code,brand,price,similarity
0,100116101,construction.components.faucet,sokolov,155.91,0.955521
1,26405343,computers.peripherals.printer,robertobravo,1582.13,0.937367
2,2600939,appliances.kitchen.refrigerators,gefest,229.32,0.93102
3,100129684,medicine.tools.tonometer,witerra,12.1,0.929368
4,100097646,sport.bicycle,defacto,10.27,0.927199
5,100010405,furniture.living_room.chair,ikea,303.77,0.925281
6,100083459,furniture.kitchen.table,totolink,24.75,0.922676
7,100202210,electronics.clocks,certina,731.04,0.922311
8,2900583,appliances.kitchen.microwave,electrolux,154.42,0.922278
9,14000417,appliances.personal.massager,cersanit,316.85,0.922145


In [None]:
# Gather every raw event row of the selected user across all monthly files.
chunk_size = 1000000

# Matching slices are collected and concatenated once at the end instead of
# growing a DataFrame with pd.concat on every chunk.
matching_rows = []
for filename in filename_list:
  chunks = pd.read_csv(file_path_prefix + filename, chunksize=chunk_size)
  print(f"reading {filename} ...")

  for chunk in chunks:
    matching_rows.append(chunk[chunk['user_id'] == selected_user_id])

# pd.concat raises on an empty list, so keep the empty-frame result the
# original produced when no file was read.
if matching_rows:
  selected_user_data = pd.concat(matching_rows, ignore_index=True)
else:
  selected_user_data = pd.DataFrame()

selected_user_data

reading filtered-sampled-cleaned-2019-Oct.csv ...
reading filtered-sampled-cleaned-2019-Nov.csv ...
reading filtered-sampled-cleaned-2019-Dec.csv ...
reading filtered-sampled-cleaned-2020-Jan.csv ...
reading filtered-sampled-cleaned-2020-Feb.csv ...
reading filtered-sampled-cleaned-2020-Mar.csv ...
reading filtered-sampled-cleaned-2020-Apr.csv ...


Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session,rating
0,2019-11-05 19:43:08 UTC,view,1002540,2053013555631882655,electronics.smartphone,apple,469.77,567950899,daf127f1-498d-496d-94d8-736e1bc324f8,1
1,2019-11-14 20:50:00 UTC,cart,1004858,2053013555631882655,electronics.smartphone,samsung,126.0,567950899,7f7122a9-4ed0-4563-8e17-8532e3c0f627,2
2,2019-11-14 20:52:15 UTC,view,1002544,2053013555631882655,electronics.smartphone,apple,458.1,567950899,7f7122a9-4ed0-4563-8e17-8532e3c0f627,1
3,2019-11-25 01:20:09 UTC,view,1002542,2053013555631882655,electronics.smartphone,apple,488.51,567950899,93222d73-2da8-411a-94cc-ce24129d0cdc,1
4,2019-11-24 13:19:13 UTC,view,1307401,2053013558920217191,computers.notebook,asus,295.76,567950899,f0e66679-efa5-4eea-afd6-cda52ee17f61,1
5,2019-11-24 13:18:29 UTC,view,1307136,2053013558920217191,computers.notebook,acer,514.27,567950899,f0e66679-efa5-4eea-afd6-cda52ee17f61,1
6,2019-11-27 12:25:32 UTC,cart,1004856,2053013555631882655,electronics.smartphone,samsung,125.85,567950899,bd0846ca-5f88-4eaa-8f5f-37a3457344a2,2
7,2019-11-30 17:18:17 UTC,purchase,1002544,2053013555631882655,electronics.smartphone,apple,457.92,567950899,a1cf761a-8929-4604-b34c-e63b58de0f28,3
8,2019-11-30 17:14:15 UTC,view,1003317,2053013555631882655,electronics.smartphone,apple,1003.89,567950899,a1cf761a-8929-4604-b34c-e63b58de0f28,1
9,2019-11-30 17:17:54 UTC,cart,1002544,2053013555631882655,electronics.smartphone,apple,457.92,567950899,a1cf761a-8929-4604-b34c-e63b58de0f28,2


# Evaluation of Pyspark Matrix Factorization

This step evaluates the performance of the matrix factorization model on a held-out test split.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Hold out 20% of the ratings for testing; seed=123 fixes the random split.
(train_data, test_data) = spark_df.randomSplit([0.8, 0.2], seed=123)

# Fit into a separate name so that `model` — the model fitted on the full
# ratings table, whose factors were exported above — is not overwritten by
# this train-only fit.
eval_model = als.fit(train_data)

# Predict ratings for the held-out rows. `als` is configured with
# coldStartStrategy="drop", so rows whose user or product was not seen during
# training are dropped rather than producing NaN predictions.
predictions = eval_model.transform(test_data)

# Root-mean-squared error between the actual and the predicted rating.
evaluator = RegressionEvaluator(
    labelCol="rating", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)

In [None]:
# Report the RMSE measured on the held-out test split.
print("Root Mean Squared Error (RMSE):", rmse)

Root Mean Squared Error (RMSE): 2.09552380673
