In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [1]:
import os
import glob
import pandas as pd

In [2]:
shared_path = "/content/drive/My Drive/Big Data A3"  # For Shared drives

# Check the folder exists
if not os.path.exists(shared_path):
    raise FileNotFoundError("Couldn't find 'Big Data A3' — check your shared drive path")


files = glob.glob(shared_path + "/*.parquet")
print(files)

['/content/drive/My Drive/Big Data A3/Gift_Cards_Merged.parquet', '/content/drive/My Drive/Big Data A3/Digital_Music_Merged.parquet', '/content/drive/My Drive/Big Data A3/Health_and_Personal_Care_Merged.parquet', '/content/drive/My Drive/Big Data A3/All_Beauty_Merged.parquet', '/content/drive/My Drive/Big Data A3/Amazon_Fashion_Merged.parquet', '/content/drive/My Drive/Big Data A3/Appliances_Merged.parquet', '/content/drive/My Drive/Big Data A3/Cell_Phones_Merged.parquet', '/content/drive/My Drive/Big Data A3/Health_and_Household_Merged.parquet', '/content/drive/My Drive/Big Data A3/Baby_Products_Merged.parquet', '/content/drive/My Drive/Big Data A3/Arts_Crafts_and_Sewing_Merged.parquet', '/content/drive/My Drive/Big Data A3/CDs_and_Vinyl_Merged.parquet', '/content/drive/My Drive/Big Data A3/Kindle_Store_Merged.parquet', '/content/drive/My Drive/Big Data A3/Industrial_and_Scientific_Merged.parquet', '/content/drive/My Drive/Big Data A3/Magazine_Subscriptions.parquet', '/content/drive/M

In [3]:
parquet_paths = [
    '/content/drive/My Drive/Big Data A3/Gift_Cards_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Digital_Music_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Health_and_Personal_Care_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/All_Beauty_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Amazon_Fashion_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Appliances_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Cell_Phones_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Health_and_Household_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Baby_Products_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Arts_Crafts_and_Sewing_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/CDs_and_Vinyl_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Kindle_Store_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Industrial_and_Scientific_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Magazine_Subscriptions.parquet',
    '/content/drive/My Drive/Big Data A3/Musical_Instruments_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Office_Products_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Movies_and_TV_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Patio_Lawn_and_Garden_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Handmade_Products_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Beauty_and_Personal_Care_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Subscription_Boxes_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Grocery_and_Gourmet_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Pet_Supplies_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Software_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Electronics_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Sports_and_Outdoors_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Automotive_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Video_Games_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Tools_and_Home_Improvement_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Toys_and_Games_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Clothes_Shoes_and_Jewelry_Merged.parquet',
    '/content/drive/My Drive/Big Data A3/Home_and_Kitchen_Merged.parquet'
]

In [5]:
!pip install implicit



In [6]:
import dask.dataframe as dd
import pandas as pd
from implicit import als
from sklearn.metrics import mean_squared_error
import numpy as np
from scipy.sparse import csr_matrix
import random

In [7]:
# Load only necessary columns from each parquet
# Properly load and concatenate into a single Dask DataFrame
df = dd.concat([dd.read_parquet(path, columns=['user_id', 'asin', 'rating']) for path in parquet_paths])

In [8]:
# Step 1: Define a function to filter each partition
def filter_users_partition(partition):
    user_counts = partition['user_id'].value_counts()
    valid_users = user_counts[user_counts >= 5].index
    return partition[partition['user_id'].isin(valid_users)]

# Step 2: Apply it to each partition
df_filtered = df.map_partitions(filter_users_partition)

In [9]:
# Step 3: Sample a fraction of each partition
df_sample = df_filtered.map_partitions(lambda p: p.sample(frac=0.05, random_state=42))

# Step 4: Convert to Pandas (safe because it's a small subset now)
df_sample_pd = df_sample.compute()

In [10]:
!pip install pyspark



In [11]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col
import random

In [12]:
# Start Spark session
spark = SparkSession.builder \
    .appName("ALS Recommender") \
    .getOrCreate()

In [13]:
# Ensure columns are of the correct type
df_sample_pd['user_id'] = df_sample_pd['user_id'].astype('category').cat.codes
df_sample_pd['asin'] = df_sample_pd['asin'].astype('category').cat.codes
df_sample_pd['rating'] = df_sample_pd['rating'].astype(float)

# Convert to Spark DataFrame
spark_df = spark.createDataFrame(df_sample_pd[['user_id', 'asin', 'rating']])

In [14]:
(train_data, test_data) = spark_df.randomSplit([0.8, 0.2], seed=42)

In [15]:
als = ALS(
    userCol="user_id",
    itemCol="asin",
    ratingCol="rating",
    coldStartStrategy="drop",  # drop NaNs during evaluation
    nonnegative=True
)

model = als.fit(train_data)

In [16]:
predictions = model.transform(test_data)

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)
print(f"Root-Mean-Square Error = {rmse:.4f}")

Root-Mean-Square Error = 2.1003


In [18]:
from pyspark.sql.functions import lit

In [19]:
# Get 3 random users from the test set
user_ids = test_data.select('user_id').distinct().sample(False, 0.01, seed=42).limit(3)
user_ids_list = [row['user_id'] for row in user_ids.collect()]

# Create DataFrame of all items for each selected user
all_items = spark_df.select('asin').distinct()
for user_id in user_ids_list:
    user_df = all_items.withColumn("user_id", lit(user_id))
    user_recs = model.transform(user_df)
    top_5 = user_recs.orderBy('prediction', ascending=False).limit(5)
    print(f"\nTop 5 recommendations for user {user_id}:")
    top_5.select('asin', 'prediction').show()


Top 5 recommendations for user 328361:
+-------+----------+
|   asin|prediction|
+-------+----------+
|2531371|  5.390152|
|1403963|  5.361312|
|  43661| 5.3598437|
| 351135|   5.23058|
| 709378|  5.216456|
+-------+----------+


Top 5 recommendations for user 264386:
+----+----------+
|asin|prediction|
+----+----------+
+----+----------+


Top 5 recommendations for user 479704:
+-------+----------+
|   asin|prediction|
+-------+----------+
|2348812|   7.46769|
| 514036| 7.3918624|
| 678906|  7.351141|
|1663669| 7.3138204|
|1929344| 7.2960505|
+-------+----------+

