In [112]:
from pyspark.ml.recommendation import ALSModel
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import os
import sys
import pickle
from pyspark.ml.recommendation import ALS



In [113]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [114]:
# Dynamic path settings
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath("__file__")))
DATA_DIR = os.path.join(BASE_DIR, "dataset")
TRAIN_TEST_SPLIT_DIR = os.path.join(BASE_DIR, "train_test_split")


In [115]:
#Training the model


In [116]:
spark = SparkSession.builder \
    .appName("RecommenderSystem") \
    .master("local[*]") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.python.worker.reuse", "false") \
    .config("spark.local.dir", "C:/tmp/spark-temp") \
    .config("spark.network.timeout", "600s") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .config("spark.ui.port", "4040") \
    .getOrCreate()

In [117]:
# Load training data
train_sales_path = os.path.join(TRAIN_TEST_SPLIT_DIR, "train_sales_data.csv")
train_df = pd.read_csv(train_sales_path)
# Ensure there are data rows in training data
if train_df.empty:
    raise ValueError("Training DataFrame is empty.")

In [118]:
# Convert interaction_type column to numeric
train_df['interaction_type'] = pd.to_numeric(train_df['interaction_type'], errors='coerce')
train_df.dropna(subset=['interaction_type'], inplace=True)

In [119]:
# Check if the dataframe is still empty after conversion and dropping NA values
if train_df.empty:
    raise ValueError("Training DataFrame is empty after converting 'interaction_type' to numeric and dropping NA values.")

In [120]:
print("Train DataFrame:\n", train_df.head(), "\n")

Train DataFrame:
    user id                        product id Interaction type  \
0      1.0  4c69b61db1fc16e7013b43fc926e502d         purchase   
1      2.0  66d49bbed043f5be260fa9f7fbff5957             view   
2      3.0  2c55cae269aebf53838484b0d7dd931a             like   
3      4.0  18018b6bc416dab347b1b7db79994afa             view   
4      5.0  e04b990e95bf73bbe6a3fa09785d7cd0             like   

            Time stamp  Unnamed: 4  user_id  product_id  interaction_type  
0  2023-10-10 08:00:00         NaN        0         905                 3  
1  2023-10-11 08:00:00         NaN        1        1178                 1  
2  2023-10-12 08:00:00         NaN        2         517                 2  
3  2023-10-13 08:00:00         NaN        3         276                 1  
4  2023-10-14 08:00:00         NaN        4        2638                 2   



In [121]:
# Convert training data to Spark DataFrame
train_data = [Row(user_id=int(row.user_id), product_id=int(row.product_id), interaction_type=float(row['interaction_type'])) for index, row in train_df.iterrows()]
train_df_spark = spark.createDataFrame(train_data)

In [122]:
# ALS modeling
als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="user_id",
    itemCol="product_id",
    ratingCol="interaction_type",
    coldStartStrategy="drop"
)

In [123]:
# Training the model
model = als.fit(train_df_spark)

In [124]:
# Upload Maps
with open(os.path.join(DATA_DIR, 'user_id_map.pkl'), 'rb') as f:
    user_id_map = pickle.load(f)

with open(os.path.join(DATA_DIR, 'product_id_map.pkl'), 'rb') as f:
    product_id_map = pickle.load(f)

In [125]:
spark = SparkSession.builder \
    .appName("RecommenderSystem") \
    .master("local[*]") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.driver.cores", "4") \
    .config("spark.python.worker.reuse", "false") \
    .config("spark.local.dir", "/tmp/spark-temp") \
    .config("spark.network.timeout", "600s") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .getOrCreate()

In [126]:
# Load test data
train_sales_path = os.path.join(TRAIN_TEST_SPLIT_DIR, "train_sales_data.csv")
test_sales_path = os.path.join(TRAIN_TEST_SPLIT_DIR, "test_sales_data.csv")
train_df = pd.read_csv(train_sales_path)
test_df = pd.read_csv(test_sales_path)

In [127]:
# Check common user and product IDs
common_user_ids = set(train_df['user_id']).intersection(set(test_df['user_id']))
common_product_ids = set(train_df['product_id']).intersection(set(test_df['product_id']))

print(f"Common user IDs: {len(common_user_ids)}")
print(f"Common product IDs: {len(common_product_ids)}")

if len(common_user_ids) == 0 or len(common_product_ids) == 0:
    raise ValueError("No common user IDs or product IDs between training and test sets.")

Common user IDs: 1148
Common product IDs: 1148


In [128]:
# Ensure there are data rows in test data
if test_df.empty:
    raise ValueError("Test DataFrame is empty.")
print("Test DataFrame:\n", test_df.head(), "\n")

Test DataFrame:
    user id                        product id Interaction type  \
0      3.0  2c55cae269aebf53838484b0d7dd931a             like   
1      7.0  40d3cd16b41970ae6872e914aecf2c8e         purchase   
2      8.0  bc178f33a04dbccefa95b165f8b56830             view   
3      9.0  cc2083338a16c3fe2f7895289d2e98fe             like   
4     14.0  82c86a4d24dce5e14303033d7b658b78             view   

            Time stamp  Unnamed: 4  user_id  product_id  interaction_type  
0  2023-10-12 08:00:00         NaN        2         517                 2  
1  2023-10-16 08:00:00         NaN        6         759                 3  
2  2023-10-17 08:00:00         NaN        7        2235                 1  
3  2023-10-18 08:00:00         NaN        8        2405                 2  
4  2023-10-23 08:00:00         NaN       13        1510                 1   



In [129]:
# Convert interaction_type column to numeric
test_df['interaction_type'] = pd.to_numeric(test_df['interaction_type'], errors='coerce')
test_df.dropna(subset=['interaction_type'], inplace=True)

In [130]:
# Check if the dataframe is still empty after conversion and dropping NA values
if test_df.empty:
    raise ValueError("Test DataFrame is empty after converting 'interaction_type' to numeric and dropping NA values.")

In [131]:
# Convert test data to Spark DataFrame
test_data = [Row(user_id=int(row.user_id), product_id=int(row.product_id), interaction_type=float(row['interaction_type'])) for index, row in test_df.iterrows()]
test_df_spark = spark.createDataFrame(test_data)

In [132]:
print("Test DataFrame:\n", test_df_spark.head(), "\n")

Test DataFrame:
 Row(user_id=2, product_id=517, interaction_type=2.0) 



In [133]:
# Load the trained model using Spark's load method
#model_path = os.path.join(BASE_DIR, "als_model")
#model = ALSModel.load(model_path)

In [134]:
# Making predictions on test data
predictions = model.transform(test_df_spark)

In [135]:
if predictions.head(1) == []:
    raise ValueError("Predictions DataFrame is empty after model.transform(). Ensure your test data has sufficient entries.")

In [136]:
predictions.show()

+-------+----------+----------------+----------+
|user_id|product_id|interaction_type|prediction|
+-------+----------+----------------+----------+
|    148|       386|             2.0| 1.9074911|
|     85|      2024|             1.0| 0.8998625|
|     65|      1527|             1.0| 0.8998625|
|     53|      1350|             1.0| 0.8998626|
|     78|      1710|             2.0|  1.907491|
|    101|      2774|             1.0| 0.8998625|
|    115|      1670|             3.0| 2.9219954|
|     81|      1758|             1.0| 0.8998624|
|     28|      2980|             2.0|  1.907491|
|    183|        40|             2.0| 1.9074911|
|     76|      1324|             2.0|  1.907491|
|     26|       589|             3.0| 2.9219952|
|    159|      2387|             1.0| 0.8998625|
|    192|      2163|             2.0| 1.9074913|
|    236|      1973|             1.0| 0.8998625|
|     91|      2580|             1.0| 0.8998625|
|    222|      2044|             1.0|0.89986247|
|    128|      1700|

In [137]:
# Evaluation metrics
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="interaction_type", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 0.09117348511795605


In [138]:
# Getting recommendations for users
user_recs = model.recommendForAllUsers(10)
user_recs.show()
user_recs = model.recommendForAllUsers(100).toPandas()
print(user_recs.head(1))


+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|     12|[{1842, 2.1329694...|
|     22|[{1515, 2.1775942...|
|     26|[{589, 2.9219952}...|
|     27|[{2142, 1.4505175...|
|     28|[{2527, 2.2592998...|
|     31|[{1778, 1.4231818...|
|     34|[{545, 2.0368469}...|
|     47|[{2263, 1.4468113...|
|     53|[{1390, 1.4645165...|
|     65|[{2406, 1.3001112...|
|     76|[{1201, 2.1642861...|
|     78|[{2998, 2.1832957...|
|     81|[{921, 1.3574818}...|
|     85|[{622, 1.5659992}...|
|     91|[{2612, 1.36672},...|
|    101|[{396, 1.514143},...|
|    103|[{1829, 2.9219954...|
|    115|[{1670, 2.9219954...|
|    122|[{2091, 1.6130909...|
|    126|[{1940, 2.0424297...|
+-------+--------------------+
only showing top 20 rows

   user_id                                    recommendations
0       28  [(2527, 2.2592997550964355), (2117, 1.99739861...


In [139]:
from py2neo import Graph, Node, Relationship

In [140]:
# Connecting to the Neo4j database
graph = Graph("bolt://localhost:7687", auth=("neo4j", "PaVi@9731"))

In [None]:
# Add Recommendations to Neo4j
for index, row in user_recs.iterrows():
    original_user_id = user_id_map[row['user_id']]
    user_node = graph.nodes.match("Customer", id=original_user_id).first()
 
    recommendations = row['recommendations']
    for rec in recommendations:
        original_product_id = product_id_map[rec['product_id']]
        score = rec['rating']
        product_node = graph.nodes.match("Product", id=original_product_id).first()
        
        if user_node and product_node:
            recommendation = Relationship(user_node, "RECOMMENDED", product_node, score=score)
          
            graph.create(recommendation)
         

(_28:Customer {age: 54, category: 'Accessories', color: 'Gray', discount_applied: 'Yes', frequency: 'Every 3 Months', gender: 'Male', id: 29, item_purchased: 'Handbag', location: 'North Carolina', payment_method: 'PayPal', previous_purchases: 41, promo_code_used: 'Yes', purchase_amount: 94, review_rating: 4.4, season: 'Fall', shipping_type: 'Free Shipping', size: 'M', subscription_status: 'Yes'})
(_6295:Product {about_product: 'Make sure this fits by entering your model number. | Trusted brand delivers convenience and ease of use | Kids love to paint, can be used on almost any surface Great for arts and crafts and school projects | Washability makes fun with colors and clean-up simpler than ever | Painting develops child\u2019s fine motor skills and teaches color mixing and blending | Painting is enjoyed by children of all ages', asin: nan, brand: nan, category: 'Toys & Games | Arts & Crafts | Drawing & Painting Supplies | Paints | Watercolor Paint', color: nan, dimensions: nan, direct

In [None]:
   print( recommendation )

In [None]:
print(product_node)