In [7]:
from pyspark.sql import SparkSession

In [8]:
# Read data
spark = SparkSession.builder.appName("NMF rec sys").getOrCreate()
product_rating = spark.read.csv("../data/Ratings.csv", header=True, inferSchema=True)

product_rating.show()

+--------------------+--------------------+------+
|              userId|           productId|rating|
+--------------------+--------------------+------+
|65362c3195e84456c...|65363731f7a5f28d5...|     4|
|65362c3195e84456c...|65363731262e9f91d...|     3|
|65362c3195e84456c...|65363731c193c5580...|     1|
|65362c3195e84456c...|653637316ac4c08ab...|     4|
|65362c3195e84456c...|65363731f0f3e986c...|     1|
|65362c3195e84456c...|653637315c9280c97...|     3|
|65362c3195e84456c...|65363731403de5e9f...|     3|
|65362c3195e84456c...|65363731a7165c457...|     5|
|65362c3195e84456c...|653637313feea7faa...|     1|
|65362c3195e84456c...|65363731e90db023f...|     5|
|65362c3195e84456c...|6536373152560a326...|     5|
|65362c3195e84456c...|65363731f022c5c7b...|     3|
|65362c3195e84456c...|65363731a190e56d3...|     1|
|65362c3195e84456c...|65363731a69e3e141...|     2|
|65362c3195e84456c...|6536373107d8bbebf...|     3|
|65362c3195e84456c...|65363731375072736...|     3|
|65362c3195e84456c...|65363731a

In [9]:
df_product = spark.read.csv('../data/Products.csv', header = True)

# Assuming you have a DataFrame named df and you want to rename column 'oldColumnName' to 'newColumnName'
df_product = df_product.withColumnRenamed('id', 'productId')

In [10]:
merged_df = product_rating.join(df_product, 'productId', 'inner')
merged_df.show(5)

+--------------------+--------------------+------+--------------------+-----+
|           productId|              userId|rating|               title|price|
+--------------------+--------------------+------+--------------------+-----+
|65363731f7a5f28d5...|65362c3195e84456c...|     4| Lenovo ThinkPad 16P|  114|
|65363731262e9f91d...|65362c3195e84456c...|     3|  Camera Sony ZV-E10|  600|
|65363731c193c5580...|65362c3195e84456c...|     1|Apple Watch Serie...|  704|
|653637316ac4c08ab...|65362c3195e84456c...|     4|Samsung Galaxy S2...|  236|
|65363731f0f3e986c...|65362c3195e84456c...|     1|      HP Probook 440|  790|
+--------------------+--------------------+------+--------------------+-----+
only showing top 5 rows



In [11]:
# check null for all cols:
from pyspark.sql.functions import *
merged_df.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in merged_df.columns]
    ).show()

merged_df.show(5)
merged_df.printSchema()

+---------+------+------+-----+-----+
|productId|userId|rating|title|price|
+---------+------+------+-----+-----+
|        0|     0|     0|    0|    0|
+---------+------+------+-----+-----+

+--------------------+--------------------+------+--------------------+-----+
|           productId|              userId|rating|               title|price|
+--------------------+--------------------+------+--------------------+-----+
|65363731f7a5f28d5...|65362c3195e84456c...|     4| Lenovo ThinkPad 16P|  114|
|65363731262e9f91d...|65362c3195e84456c...|     3|  Camera Sony ZV-E10|  600|
|65363731c193c5580...|65362c3195e84456c...|     1|Apple Watch Serie...|  704|
|653637316ac4c08ab...|65362c3195e84456c...|     4|Samsung Galaxy S2...|  236|
|65363731f0f3e986c...|65362c3195e84456c...|     1|      HP Probook 440|  790|
+--------------------+--------------------+------+--------------------+-----+
only showing top 5 rows

root
 |-- productId: string (nullable = true)
 |-- userId: string (nullable = true

In [12]:
import concurrent.futures
import numpy as np
from surprise import Reader, Dataset, accuracy, NMF
from pyspark.sql import SparkSession

def generate_sequential_list_float(start, end, increment):
    sequential_list = list(np.arange(start, end, increment))
    return sequential_list

def generate_sequential_list(start, end, increment):
    sequential_list = list(range(start, end, increment))
    return sequential_list

def calculate_rmse(train_value, k_values, merged_df):
    df_train, df_test = merged_df.randomSplit([train_value, 1-train_value], seed=96)
    df_train_pandas = df_train.toPandas()
    df_test_pandas = df_test.toPandas()

    reader = Reader(rating_scale=(1, 5))
    data_train = Dataset.load_from_df(df_train_pandas[['userId', 'productId', 'rating']], reader)
    data_test = Dataset.load_from_df(df_test_pandas[['userId', 'productId', 'rating']], reader)

    trainset = data_train.build_full_trainset()
    testset = data_test.build_full_trainset().build_testset()

    rmse_values = []
    for k in k_values:
        algo = NMF(n_factors=15, n_epochs=k, biased=True)
        algo.fit(trainset)
        knn_predictions = algo.test(testset)

        rmse = accuracy.rmse(knn_predictions)
        rmse_values.append(rmse)

    return rmse_values

# ... (other setup code)

# Specify the start, end, and increment for training rate
transtart = 0.7
tranend = 0.8
incre = 0.1

# Generate the sequential list for training rate
train_values = generate_sequential_list_float(transtart, tranend, incre)

# Specify the start, end, and increment for k values
start_k = 1
end_k = 20
increment_k = 10

# Generate the sequential list for k values
k_values = generate_sequential_list(start_k, end_k, increment_k)

# Initialize a 2D array to store RMSE values
arr = np.zeros((len(train_values), len(k_values)))

# Use 10 threads to parallelize the computation for different training rates
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    futures = [executor.submit(calculate_rmse, t, k_values, merged_df) for t in train_values]

    for i, future in enumerate(concurrent.futures.as_completed(futures)):
        rmse_values = future.result()
        arr[i, :] = rmse_values

# Find the indices of the minimum RMSE value in the array
min_rmse_idx = np.unravel_index(np.argmin(arr), arr.shape)

# Get the corresponding values
best_train_value = train_values[min_rmse_idx[0]]
best_k_value = k_values[min_rmse_idx[1]]
best_rmse = arr[min_rmse_idx]

print("Best Training Rate:", best_train_value)
print("Best n_epochs:", best_k_value)
print("Best RMSE:", best_rmse)


RMSE: 2.2553
RMSE: 2.2367
RMSE: 1.6646
RMSE: 1.4318
Best Training Rate: 0.7999999999999999
Best n_epochs: 11
Best RMSE: 1.4318490764900942
