In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .appName("AlsModel") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()


25/07/22 18:16:24 WARN Utils: Your hostname, vaibhavi-HP-Laptop-15-fd0xxx resolves to a loopback address: 127.0.1.1; using 192.168.0.128 instead (on interface wlo1)
25/07/22 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/22 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/07/22 18:16:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/07/22 18:16:25 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
df = spark.read.parquet("/home/vaibhavi/spark-ml-venv/ml_project/preprocessing/output/als/*")

In [4]:
df.head()

Row(User_id='A01254073JW8SSTKH6AIB', Id='0451521196', rating=5.0)

In [5]:
df.count()

2380346

In [6]:
df.printSchema()

root
 |-- User_id: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- rating: double (nullable = true)



In [7]:
# checking the null values in th df
from pyspark.sql.functions import col, sum

null_counts = df.select([
    sum(col(c).isNull().cast("int")).alias(c) for c in df.columns
])

null_counts.show()

+-------+---+------+
|User_id| Id|rating|
+-------+---+------+
|      0|  0|     0|
+-------+---+------+



In [8]:
# as we can see there are no null values, but the als model needs only numeric data for the clculation, so converting all the user-id and id into numeric:
from pyspark.ml.feature import StringIndexer

# String index userId and itemId (which may both be strings)
user_indexer = StringIndexer(inputCol="User_id", outputCol="userIdx", handleInvalid="skip")


# Fit and transform
df_indexed = user_indexer.fit(df).transform(df)

# Cast to int
df_ready = df_indexed.select(
    col("userIdx").cast("int").alias("user"),
    col("Id").cast("int").alias("id"),
    col("rating").cast("float")
)


                                                                                

In [9]:
df_ready.head()

25/07/21 15:05:39 WARN DAGScheduler: Broadcasting large task binary with size 38.1 MiB
                                                                                

Row(user=9323, id=451521196, rating=5.0)

In [10]:
df_ready.count()

25/07/21 15:05:45 WARN DAGScheduler: Broadcasting large task binary with size 33.4 MiB
                                                                                

2380346

In [11]:
spark.sparkContext.getConf().get("spark.executor.memory")


'4g'

In [12]:
df_ready.printSchema()

root
 |-- user: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- rating: float (nullable = true)



In [None]:
#checking this to see the manageability of the data : 
print(df_ready.select("user").distinct().count())
print(df_ready.select("id").distinct().count())


25/07/21 15:12:12 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 15:12:20 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
                                                                                

1004229


25/07/21 15:12:24 WARN DAGScheduler: Broadcasting large task binary with size 33.4 MiB
25/07/21 15:12:30 WARN DAGScheduler: Broadcasting large task binary with size 33.4 MiB


122602


                                                                                

In [21]:
df_ready.filter("user IS NULL OR id IS NULL OR rating IS NULL").count()


25/07/21 15:19:03 WARN DAGScheduler: Broadcasting large task binary with size 38.1 MiB
                                                                                

1345319

In [27]:
from  pyspark.sql.functions import expr

In [31]:
df_ready.selectExpr(
    "count(*) as total",
    "sum(CASE WHEN user IS NULL THEN 1 ELSE 0 END) as user_nulls",
    "sum(CASE WHEN id IS NULL THEN 1 ELSE 0 END) as book_nulls",
    "sum(CASE WHEN rating IS NULL THEN 1 ELSE 0 END) as rating_nulls"
).show()


25/07/21 15:25:01 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB

+-------+----------+----------+------------+
|  total|user_nulls|book_nulls|rating_nulls|
+-------+----------+----------+------------+
|2380346|         0|   1345319|           0|
+-------+----------+----------+------------+



                                                                                

In [9]:
#df_ready 's id coulumn has a lot of null values! removing them -- # making the traning and testing sets:

df_ready = df_ready.filter(df_ready["id"].isNotNull())


In [33]:
(training_data, test_data) = df_ready.randomSplit([0.8, 0.2], seed=42)

training_data.count()

25/07/21 15:29:03 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
                                                                                

828407

In [34]:
# making the als model :
from pyspark.ml.recommendation import ALS

als = ALS(
    userCol="user",
    itemCol="id",
    ratingCol="rating",  # Or `interaction_count` if implicit
    nonnegative=True,
    coldStartStrategy="drop",
    implicitPrefs=False,
    rank=10,
    maxIter=10,
    regParam=0.1
)

als_model = als.fit(training_data)


25/07/21 15:29:40 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 15:29:42 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 15:29:48 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 15:29:55 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 15:29:59 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 15:30:05 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 15:30:09 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 15:30:12 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 15:30:16 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 15:30:19 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 15:30:23 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 15:30:26 WARN DAGScheduler: Broadc

In [38]:
training_data.select("user").distinct().count()

25/07/21 15:48:35 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 15:48:40 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
                                                                                

517890

In [10]:
# Step 1: Get top 1000 users with most reviews
top_users_df = df_ready.groupBy("user").count().orderBy("count", ascending=False).limit(1000)



In [40]:

# Step 2: Filter ALS recommendations to only these users
from pyspark.sql.functions import col
user_recs = als_model.recommendForUserSubset(top_users_df, 10)  # top 10 books

In [42]:
user_recs.show(truncate=False)

25/07/21 15:59:47 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 15:59:55 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 15:59:58 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 15:59:59 WARN DAGScheduler: Broadcasting large task binary with size 38.3 MiB
25/07/21 16:00:35 WARN DAGScheduler: Broadcasting large task binary with size 38.3 MiB
[Stage 260:>                                                        (0 + 1) / 1]

+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user|recommendations                                                                                                                                                                                                                                  |
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1   |[{738530204, 150.38947}, {1592862578, 7.5992494}, {1884778151, 7.4905157}, {373122241, 7.4680004}, {1861761767, 7.4159055}, {1411610628, 7.403044}, {252023285, 7.368531}, {820701785, 7.3497825}, {873416376, 7.3117695}, {664223591, 7.310601}]|
|3  

                                                                                

In [43]:
predictions = als_model.transform(test_data)


In [44]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")


25/07/21 16:05:00 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 16:05:06 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 16:05:09 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 16:05:13 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/21 16:05:20 WARN DAGScheduler: Broadcasting large task binary with size 38.3 MiB
25/07/21 16:05:23 WARN DAGScheduler: Broadcasting large task binary with size 38.3 MiB
[Stage 342:>                                                        (0 + 2) / 2]

Root-mean-square error = 1.3773746870659802


                                                                                

In [11]:
top_users_df.show()

25/07/22 18:18:47 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/22 18:18:58 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
[Stage 13:>                                                         (0 + 6) / 6]

+----+-----+
|user|count|
+----+-----+
|   0| 3889|
|   1| 2155|
|   2|  611|
|   3|  610|
|   4|  559|
|  21|  508|
|  12|  495|
|  10|  449|
|   5|  436|
|   8|  399|
|  16|  396|
|  17|  389|
|  33|  367|
|   6|  357|
|   9|  332|
|  55|  318|
|  18|  317|
|  23|  309|
|  50|  307|
|  39|  304|
+----+-----+
only showing top 20 rows



                                                                                

In [13]:
top_users_df.count()

25/07/22 18:20:06 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/22 18:20:17 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/22 18:20:20 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
                                                                                

1000

In [14]:
top_users_df.write.mode("overwrite").parquet("output/top_users.parquet")


25/07/22 18:24:00 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/22 18:24:08 WARN DAGScheduler: Broadcasting large task binary with size 38.2 MiB
25/07/22 18:24:11 WARN DAGScheduler: Broadcasting large task binary with size 38.4 MiB
                                                                                

In [15]:
spark.stop()