# Installing Pyspark and Creating Spark Session:

In [1]:
# Install PySpark
!pip install pyspark
import pyspark

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("BookRecommendationSystem") \
    .getOrCreate()




# Loading the Big Data sets:

### Book Name and ID with their Average Ratings:

In [2]:
# Define the columns you want to select
selected_columns = ["Id", "Name"]

# Define the file names url from the google drive
file_names = [
    "/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book1-100k.csv",
    "/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book1000k-1100k.csv",
    "/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book100k-200k.csv",
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book1100k-1200k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book1200k-1300k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book1300k-1400k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book1400k-1500k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book1500k-1600k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book1600k-1700k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book1800k-1900k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book1700k-1800k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book1900k-2000k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book2000k-3000k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book200k-300k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book3000k-4000k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book300k-400k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book4000k-5000k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book400k-500k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book500k-600k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book600k-700k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book700k-800k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book800k-900k.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/book900k-1000k.csv',
    # Add other file names url  here

]

# Load each CSV file into separate DataFrames
dataframes = {}
for i, file_name in enumerate(file_names, 1):
    df = spark.read.csv(file_name, header=True)  # Assuming CSV files have a header
    # Select the required columns
    df_selected = df.select(selected_columns)
    # Create the DataFrame name as df1, df2, df3, etc.
    df_name = f"df{i}"
    # Assign the DataFrame to a dictionary with key as DataFrame name
    dataframes[df_name] = df_selected

# Access the DataFrames using keys like dataframes["df1"], dataframes["df2"], etc.


In [3]:
# Access individual DataFrames using keys
df23 = dataframes["df23"]
# and so on...

# Show the contents of individual DataFrames
df23.show()
# and so on...


+------+--------------------+
|    Id|                Name|
+------+--------------------+
|900000| Cabinet 03: Weather|
|900001|Workouts in a Bin...|
|900002|Las 120 jornadas ...|
|900004|The Traveler: An ...|
|900006|      Vertical Smile|
|900009| The Best Laid Plans|
|900012|Morning, Noon & N...|
|900013|Sex, Botany And E...|
|900015|              Memory|
|900017|Pandora's Breeche...|
|900018|Scientists Anonym...|
|900022|All Under Heaven:...|
|900024|Confessor (Herbie...|
|900025|           Confessor|
|900029|      The Naked Face|
|900030|Are You Afraid Of...|
|900036|The World's Stupi...|
|900038|The Secret of the...|
|900041|The Book of Stran...|
|900043| The House You Build|
+------+--------------------+
only showing top 20 rows



In [4]:
from functools import reduce

# Combine all DataFrames into one DataFrame using union
combined_df = reduce(lambda df1, df2: df1.union(df2), dataframes.values())

# Show the combined DataFrame
combined_df.show()


+---+--------------------+
| Id|                Name|
+---+--------------------+
|  1|Harry Potter and ...|
|  2|Harry Potter and ...|
|  3|Harry Potter and ...|
|  4|Harry Potter and ...|
|  5|Harry Potter and ...|
|  6|Harry Potter and ...|
|  8|Harry Potter Boxe...|
|  9|"Unauthorized Har...|
| 10|Harry Potter Coll...|
| 12|The Ultimate Hitc...|
| 13|The Ultimate Hitc...|
| 14|The Hitchhiker's ...|
| 18|The Ultimate Hitc...|
| 21|A Short History o...|
| 22|Bill Bryson's Afr...|
| 23|Bryson's Dictiona...|
| 24|In a Sunburned Co...|
| 25|I'm a Stranger He...|
| 26|The Lost Continen...|
| 27|Neither Here nor ...|
+---+--------------------+
only showing top 20 rows



In [5]:
# Show the number of rows in the combined DataFrame
print("Number of rows in the combined DataFrame:", combined_df.count())

Number of rows in the combined DataFrame: 1886019


In [6]:
# Show the data types of each column in the combined DataFrame
combined_df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)



### User ID and their individual Ratings:

In [7]:
# Define the file names for user ratings
user_ratings_files = [
    "/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/user_rating_0_to_1000.csv",
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/user_rating_1000_to_2000.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/user_rating_2000_to_3000.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/user_rating_3000_to_4000.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/user_rating_5000_to_6000.csv',
    '/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/user_rating_6000_to_11000.csv',
    "/content/drive/MyDrive/Data Science Projects/Book Recommendation System/Datasets/user_rating_4000_to_5000.csv"
    # Add all file names url here
]

# Load each CSV file containing user ratings into separate DataFrames
user_ratings_dataframes = {}
for i, file_name in enumerate(user_ratings_files, 1):
    df = spark.read.csv(file_name, header=True)  # Assuming CSV files have a header
    # Extract the DataFrame name from the file name
    df_name = f"user_ratings_df{i}"
    # Assign the DataFrame to a dictionary with key as DataFrame name
    user_ratings_dataframes[df_name] = df

# Access the user ratings DataFrames using keys like user_ratings_dataframes["user_ratings_df1"], user_ratings_dataframes["user_ratings_df2"], etc.


In [8]:
# Access individual user ratings DataFrames using keys
user_ratings_df1 = user_ratings_dataframes["user_ratings_df1"]
user_ratings_df2 = user_ratings_dataframes["user_ratings_df2"]
# and so on...

# Show the contents of individual user ratings DataFrames
user_ratings_df1.show()
user_ratings_df2.show()
# and so on...


+---+--------------------+---------------+
| ID|                Name|         Rating|
+---+--------------------+---------------+
|  1|Agile Web Develop...| it was amazing|
|  1|The Restaurant at...| it was amazing|
|  1|          Siddhartha| it was amazing|
|  1|The Clock of the ...|really liked it|
|  1|Ready Player One ...|really liked it|
|  1|The Hunger Games ...| it was amazing|
|  1|The Clue in the E...| it was amazing|
|  1|The Authoritative...| it was amazing|
|  1|The Clue of the B...| it was amazing|
|  1|The Clue of the H...| it was amazing|
|  1|The Clue of the S...| it was amazing|
|  1|The Return of the...| it was amazing|
|  1|The Name of the Rose|       liked it|
|  1|Blue Mars (Mars T...|       liked it|
|  1|Give and Take: A ...| it was amazing|
|  1|Mindset: The New ...|really liked it|
|  1|Bad Blood: Secret...|really liked it|
|  1|Dark Apprentice (...|       liked it|
|  1|A Short History o...| it was amazing|
|  1|The Mystery of th...| it was amazing|
+---+------

In [9]:
# Combine all user ratings DataFrames into one DataFrame using union
combined_user_ratings_df = reduce(lambda df1, df2: df1.union(df2), user_ratings_dataframes.values())

# Show the combined DataFrame
combined_user_ratings_df.show()


+---+--------------------+---------------+
| ID|                Name|         Rating|
+---+--------------------+---------------+
|  1|Agile Web Develop...| it was amazing|
|  1|The Restaurant at...| it was amazing|
|  1|          Siddhartha| it was amazing|
|  1|The Clock of the ...|really liked it|
|  1|Ready Player One ...|really liked it|
|  1|The Hunger Games ...| it was amazing|
|  1|The Clue in the E...| it was amazing|
|  1|The Authoritative...| it was amazing|
|  1|The Clue of the B...| it was amazing|
|  1|The Clue of the H...| it was amazing|
|  1|The Clue of the S...| it was amazing|
|  1|The Return of the...| it was amazing|
|  1|The Name of the Rose|       liked it|
|  1|Blue Mars (Mars T...|       liked it|
|  1|Give and Take: A ...| it was amazing|
|  1|Mindset: The New ...|really liked it|
|  1|Bad Blood: Secret...|really liked it|
|  1|Dark Apprentice (...|       liked it|
|  1|A Short History o...| it was amazing|
|  1|The Mystery of th...| it was amazing|
+---+------

In [10]:
# Show the number of rows in the combined user ratings DataFrame
print("Number of rows in the combined user ratings DataFrame:", combined_user_ratings_df.count())

# Show the data types of each column in the combined user ratings DataFrame
combined_user_ratings_df.printSchema()


Number of rows in the combined user ratings DataFrame: 362596
root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Rating: string (nullable = true)



# Data Preprocessing:

In [11]:
# Rename the 'Id' column to 'Book ID' in the combined DataFrame of books
combined_df = combined_df.withColumnRenamed("Id", "Book ID")
# Rename the 'ID' column to 'User ID' in the combined DataFrame of user ratings
combined_user_ratings_df = combined_user_ratings_df.withColumnRenamed("ID", "User ID")


In [16]:
# Count the occurrences of each unique value in the 'Rating' column
rating_counts = combined_user_ratings_df.groupBy("Rating").count()

# Show the unique values and their counts in the 'Rating' column
rating_counts.show()


+--------------------+------+
|              Rating| count|
+--------------------+------+
|     did not like it|  7806|
|     really liked it|132779|
| Mr. Feynman!"": ...|     1|
|            Proverbs|     1|
|                #1)"|    35|
|            liked it| 96021|
| en el país de lo...|     1|
|           it was ok| 28806|
|          1931-1932"|    19|
|      it was amazing| 92313|
|This user doesn't...|  4765|
| Why We Say the T...|     1|
|     "" the Vibrator|     1|
| But"" Thinking a...|     2|
|                #2)"|     2|
| Vol. 2: A Seth B...|     1|
|                #3)"|     3|
|    African American|     6|
|               #12)"|     1|
|                #5)"|     1|
+--------------------+------+
only showing top 20 rows



*** Note: *** Here, it looks like the ratings given by users are full of inconsistent values. We need to clean these. We are going to replace the texts with reasonable numerical values (on a scale of 1-10, where, 1=did not like, 10=really liked and 0 = did not read):

really liked it = 10

did not like it = 1

 #6)" = 6

 #3)" = 3

liked it = 8
 #7)" = 7

it was ok = 5

it was amazing = 9

 #4)" = 4

 #1)" = 1


 #2)" = 2

 #5)" = 5

 Anything else will be considered = 5 (average value)

In [21]:
from pyspark.sql.functions import when

# Define the mapping from text ratings to numerical values
rating_mapping = { "really liked it": 10,
    "did not like it": 1,
    "#6\"": 6,
    "#3\"": 3,
    "liked it": 8,
    "#7\"": 7,
    "it was ok": 5,
    "it was amazing": 9,
    "#4\"": 4,
    "#1\"": 1,
    "#2\"": 2,
    "#5\"": 5
}

# Define the default numerical value for anything else
default_rating = 0  # Considered as unknown or null

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Define a user-defined function to apply the mapping to each value in the 'Rating' column
def map_rating(rating):
    if rating in rating_mapping:
        return rating_mapping[rating]
    else:
        return default_rating

# Register the user-defined function
map_rating_udf = udf(map_rating, IntegerType())

# Apply the user-defined function to the 'Rating' column
cleaned_ratings_df = combined_user_ratings_df.withColumn("Rating", map_rating_udf(combined_user_ratings_df["Rating"]))

# Show the updated DataFrame
cleaned_ratings_df.show()


+-------+--------------------+------+
|User ID|                Name|Rating|
+-------+--------------------+------+
|      1|Agile Web Develop...|     9|
|      1|The Restaurant at...|     9|
|      1|          Siddhartha|     9|
|      1|The Clock of the ...|    10|
|      1|Ready Player One ...|    10|
|      1|The Hunger Games ...|     9|
|      1|The Clue in the E...|     9|
|      1|The Authoritative...|     9|
|      1|The Clue of the B...|     9|
|      1|The Clue of the H...|     9|
|      1|The Clue of the S...|     9|
|      1|The Return of the...|     9|
|      1|The Name of the Rose|     8|
|      1|Blue Mars (Mars T...|     8|
|      1|Give and Take: A ...|     9|
|      1|Mindset: The New ...|    10|
|      1|Bad Blood: Secret...|    10|
|      1|Dark Apprentice (...|     8|
|      1|A Short History o...|     9|
|      1|The Mystery of th...|     9|
+-------+--------------------+------+
only showing top 20 rows



In [22]:
# Count the occurrences of each unique value in the 'Rating' column
rating_counts = cleaned_ratings_df.groupBy("Rating").count()

# Show the unique values and their counts in the 'Rating' column
rating_counts.show()


+------+------+
|Rating| count|
+------+------+
|     1|  7806|
|     5| 28806|
|     9| 92313|
|     8| 96021|
|    10|132779|
|     0|  4871|
+------+------+



### Joining the two dataframes to create a final dataframe:

In [34]:
from pyspark.sql.functions import col

# Perform an inner join on the 'Name' column
final_df = cleaned_ratings_df.join(combined_df, combined_df["Name"] == combined_user_ratings_df["Name"], "left").drop(combined_df["Name"])

# Show the final DataFrame
final_df.show()


+-------+--------------------+------+-------+
|User ID|                Name|Rating|Book ID|
+-------+--------------------+------+-------+
|   8693|28 Barbary Lane: ...|     5|  16264|
|      1|Agile Web Develop...|     9|     45|
|   8698|Angry White Pyjam...|     8| 198051|
|      1|Bad Blood: Secret...|    10|   NULL|
|      1|Blue Mars (Mars T...|     8|  41131|
|   8693|Children of God (...|     8|  16948|
|   9164|Children of God (...|     8|  16948|
|   6675|Dahlia Season: St...|    10|1243755|
|      1|Dark Apprentice (...|     8|1146551|
|      1|Dark Apprentice (...|     8|2671637|
|   8702|Doing Harm: The T...|     9|   NULL|
|      1|Give and Take: A ...|     9|   NULL|
|   8706|Hell Is a Very Sm...|     9|   NULL|
|   8706|  Here Come The Dogs|     9|   NULL|
|      1|Mindset: The New ...|    10|   NULL|
|   6675|No One Belongs He...|    10| 113429|
|   6675|No One Belongs He...|    10|1561322|
|   6675|No One Belongs He...|    10|2066751|
|   6765|No One Belongs He...|    

In [26]:
# Count the number of rows in the final DataFrame
row_count = final_df.count()

# Print the number of rows
print("Number of rows in the final DataFrame:", row_count)


Number of rows in the final DataFrame: 1184215


### Checking and Handling Missing Values :

In [27]:
from pyspark.sql.functions import col

# Define a dictionary to store the count of null values for each column
null_counts = {}

# Iterate over each column and count the number of null values

for col_name in final_df.columns:
    # Count the number of null values in the column
    null_count = final_df.filter(col(col_name).isNull()).count()
    # Store the null count in the dictionary
    null_counts[col_name] = null_count

# Print the count of null values for each column
for col_name, count in null_counts.items():
    print(f"Column '{col_name}': {count} null values")


Column 'User ID': 0 null values
Column 'Name': 0 null values
Column 'Rating': 0 null values
Column 'Book ID': 130992 null values


*** Note***: Here, there are 130992 null values in Book ID. I will Try to create unique random values for these missing Book IDs.
I used the following code snippet:

```python
from pyspark.sql.functions import col, lit

# Step 1: Create a new DataFrame with only "Name" and "Book ID" columns
name_book_id_df = final_df.select("Name", "Book ID")

# Step 2: Filter out rows where "Book ID" is not null
filtered_df = name_book_id_df.filter(col("Book ID").isNotNull())

# Step 3: Find the max Book ID
max_book_id_row = filtered_df.agg({"Book ID": "max"}).first()
max_book_id = max_book_id_row[0] if max_book_id_row[0] else 0  # Get the max Book ID or 0 if it's null

# Step 4: Iterate over unique "Name" values and assign unique Book ID to each missing value
unique_names = filtered_df.select("Name").distinct().collect()
for name_row in unique_names:
    name = name_row["Name"]
    if not name_book_id_df.filter((col("Name") == name) & (col("Book ID").isNotNull())).count():
        # Assign a unique Book ID to the name
        max_book_id += 1
        name_book_id_df = name_book_id_df.withColumn("Book ID",
                                                     when(col("Name") == name, lit(max_book_id)).otherwise(col("Book ID")))

# Join the original final_df with the modified name_book_id_df to fill in the missing Book IDs
final_df = final_df.join(name_book_id_df, "Name", "left")

# Show the updated final DataFrame
final_df.show()
```

*** Note: *** But it seems this is taking too long with my Limited Resources. So i am going to delete these rows with missing data.

In [43]:
# Remove rows with null values in the "Book ID" column
final_df = final_df.na.drop(subset=["Book ID"])


In [44]:
from pyspark.sql.functions import col

# Define a dictionary to store the count of null values for each column
null_counts = {}

# Iterate over each column and count the number of null values

for col_name in final_df.columns:
    # Count the number of null values in the column
    null_count = final_df.filter(col(col_name).isNull()).count()
    # Store the null count in the dictionary
    null_counts[col_name] = null_count

# Print the count of null values for each column
for col_name, count in null_counts.items():
    print(f"Column '{col_name}': {count} null values")


Column 'User ID': 0 null values
Column 'Name': 0 null values
Column 'Rating': 0 null values
Column 'Book ID': 0 null values


# Creating the Collaborative Filtering Recommendation Model:

The Alternating Least Squares (ALS) algorithm is a collaborative filtering algorithm used for recommendation systems. It is particularly useful for large-scale collaborative filtering problems, especially when dealing with sparse datasets. Here's an explanation of how ALS works:

1. **Matrix Factorization**: ALS is based on the idea of matrix factorization. In a recommendation system context, we represent the user-item interaction matrix as a sparse matrix where rows correspond to users and columns correspond to items (e.g., movies, products). The goal is to decompose this matrix into two lower-rank matrices: one that represents users and their latent features, and the other represents items and their latent features.

2. **Objective Function**: ALS aims to minimize the difference between the actual ratings given by users and the predicted ratings obtained from the matrix factorization. This is typically achieved by minimizing the sum of squared errors (hence, "Least Squares"). The objective function is optimized iteratively using alternating least squares, where one set of variables (either user or item latent features) is fixed while the other set is optimized.

3. **Alternating Least Squares (ALS)**: The ALS algorithm alternates between two steps:
   - **Fixing User Latent Features**: Given the current item latent features, the user latent features are updated by solving a least squares problem. This step aims to find the optimal representation of each user in the latent feature space.
   - **Fixing Item Latent Features**: Given the updated user latent features, the item latent features are updated by solving another least squares problem. This step aims to find the optimal representation of each item in the latent feature space.
   These steps are repeated iteratively until convergence or until a stopping criterion is met.

4. **Cold Start Handling**: ALS includes a "cold start" strategy to handle new users or items for which no rating data is available. This strategy typically involves either dropping or providing default values for such users/items during training or prediction.

5. **Regularization**: ALS often incorporates regularization techniques to prevent overfitting during training. Regularization helps to generalize the learned latent features and avoid fitting the noise in the data too closely.

6. **Scalability**: ALS is highly parallelizable and scalable, making it suitable for large-scale recommendation systems. It can be efficiently implemented in distributed computing frameworks like Apache Spark.

Overall, ALS is a popular and effective algorithm for collaborative filtering-based recommendation systems, offering good performance and scalability for large datasets.

In [52]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# Split dataset into training and test sets
(train_data, test_data) = final_df.randomSplit([0.8, 0.2], seed=123)

# Define StringIndexer for User ID column
user_indexer = StringIndexer(inputCol="User ID", outputCol="user_index",handleInvalid="keep")

# Fit StringIndexer to training data
user_indexer_model = user_indexer.fit(train_data)

# Transform training data to add user_index column
train_data_indexed = user_indexer_model.transform(train_data)

# Transform test data using the fitted StringIndexer for User ID
test_data_indexed = user_indexer_model.transform(test_data)

# Define StringIndexer for Book ID column
book_indexer = StringIndexer(inputCol="Book ID", outputCol="book_index", handleInvalid="keep")

# Fit StringIndexer to training data
book_indexer_model = book_indexer.fit(train_data_indexed)

# Transform training data to add book_index column
train_data_indexed = book_indexer_model.transform(train_data_indexed)

# Transform test data using the fitted StringIndexer for Book ID
test_data_indexed = book_indexer_model.transform(test_data_indexed)

# Define ALS model using the indexed user and book columns
als = ALS(userCol="user_index", itemCol="book_index", ratingCol="Rating", coldStartStrategy="drop")

# Train ALS model on training data
model = als.fit(train_data_indexed)

# Generate predictions on test data
predictions = model.transform(test_data_indexed)

# Evaluate the model using Root Mean Squared Error (RMSE)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE):", rmse)


Root Mean Squared Error (RMSE): 1.5533601663969065


### Looks like our model performs superbly well with a very low Root Mean Square Value.

In [55]:
# Generate top 'n' recommendations for each user
# let's find top 10 recommendations for all the users
user_recommendations = model.recommendForAllUsers(10)
user_recommendations.show(truncate=False)


+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_index|recommendations                                                                                                                                                                                                 |
+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1         |[{22892, 13.214581}, {20493, 12.391882}, {29573, 11.795004}, {91615, 11.760404}, {90143, 11.760404}, {89438, 11.760404}, {78561, 11.760404}, {59585, 11.760404}, {53870, 11.760404}, {51413, 11.760404}]        |
|3         |[{15902, 11.077564}, {22489, 10.94491}, {30292, 10.8600445}, {27399, 10.8600445}, {12530, 10.822273}

## Seeing Recommendations for a specific User ID:

We Want to Recommend Top 10 Books to Specific Users that The user is predicted to Give a rating greater than or equal to 9.

In [57]:
specific_user_index = 37  # Replace 123 with the desired user_index

# Filter the DataFrame to get recommendations for the specific user_index
specific_user_recommendations = user_recommendations.filter(user_recommendations.user_index == specific_user_index)

from pyspark.sql import functions as F
# Apply an additional filter to show recommendations with ratings greater than or equal to 9
specific_user_recommendations = specific_user_recommendations.withColumn("recommendations",
                                                                          F.expr("filter(recommendations, x -> x.rating >= 9)"))

# Show the recommendations for the specific user_index
specific_user_recommendations.show(truncate=False)


+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_index|recommendations                                                                                                                                                                                         |
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|37        |[{31167, 12.418376}, {91615, 11.929238}, {90143, 11.929238}, {89438, 11.929238}, {78561, 11.929238}, {59585, 11.929238}, {53870, 11.929238}, {51413, 11.929238}, {50619, 11.929238}, {22892, 11.908589}]|
+----------+------------------------------------------------------------------------------------------------------------------------------------