In [1]:
import findspark
findspark.init("C:\spark")

from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Clean Data") \
    .getOrCreate()

# Load the CSV file
df = spark.read.csv("D:\Crack\Book_rating.csv", header=True, inferSchema=True)

# Define the list of columns you want to keep
columns_to_keep = ["id", "User_id", "review_score"]

# Select only the columns you want to keep
df_cleaned = df.select(columns_to_keep)

# Drop rows with missing User_id or missing review_score
df_cleaned = df_cleaned.dropna(subset=["User_id", "review_score", 'id'])

# Show the first few rows of the cleaned DataFrame
df_cleaned.show()


+----------+--------------+------------+
|        id|       User_id|review_score|
+----------+--------------+------------+
|1882931173| AVCGYZL8FQQTD|           4|
|0595344550| ACO23CG8K8T77|           5|
|0595344550|A3OS2QHEH495TD|           1|
|0595344550|A3OZDTEEAF8GS9|           1|
|0802841899| ANX3DDV12ZRRU|           4|
|0802841899|A2H2LORTA5EZY2|           4|
|B0007FIF28|A2GERYVE64DIPL|           3|
|B000JINSBG|A15A5KPP3AL76U|           5|
|0918973031|A1X1CW1GXKC50V|           5|
|0918973031| A309DQ3THGNXD|           5|
|0974289108|A1KZ0RDJZQSY4O|           3|
|0974289108|A3AJA5ADM3Q8LM|           5|
|B000NKGYMK|A258YNWJW2264M|           3|
|B000NKGYMK|A2WY5VMJQ0MM1A|           5|
|B000NKGYMK| A7IA8CTTSQ7A4|           3|
|0789480662|A2GA412HQHN8WV|           5|
|0789480662|A35Z7FIHBSCHKR|           4|
|0789480662|A13HDF4J03LQ81|           4|
|B0000CJHIO|A1UHTWM53B5KM1|           4|
|B0000CJHIO|A22PE7W18KPDE2|           5|
+----------+--------------+------------+
only showing top

In [21]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, abs, hash

# Filter out rows with missing User_id
df_cleaned = df_cleaned.filter(df_cleaned["User_id"].isNotNull())

# Filter out rows with missing id
df_cleaned = df_cleaned.filter(df_cleaned["id"].isNotNull())

# Filter out rows with missing review_score
df_cleaned = df_cleaned.filter(df_cleaned["review_score"].isNotNull())

# Convert User_id to a numeric type using a hash function
df_cleaned = df_cleaned.withColumn("User_id", (abs(hash(col("User_id"))) % (10 ** 8)).cast("integer"))

# Generate unique numeric IDs for 'id' 
df_cleaned = df_cleaned.withColumn("id_numeric", (abs(hash(col("id"))) % (10 ** 8)).cast("integer"))

# Convert id to a numeric type
df_cleaned = df_cleaned.withColumn("review_score", df_cleaned["review_score"].cast("float"))

# Build the ALS model
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="User_id",
    itemCol="id_numeric",
    ratingCol="review_score",
    coldStartStrategy="drop"
)

# Split the data into training and test sets
(training, test) = df_cleaned.randomSplit([0.8, 0.2])

# Fit the ALS model on the training data
model = als.fit(training)

# Generate predictions on the test data
predictions = model.transform(test)

# Evaluate the model
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="review_score",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")




Root Mean Squared Error (RMSE) = 2.9929781215420115
