In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, udf, row_number,avg, desc
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import Window
from pyspark.ml.feature import BucketedRandomProjectionLSH, Normalizer
import plotly.express as px
import seaborn as sns

In [2]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Data Preprocessing") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/09 18:23:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the datasets from hadoop
file_path1 = "hdfs://localhost:9000/datafolder/datazip/Books_rating.csv"
file_path2 = "hdfs://localhost:9000/datafolder/datazip/books_data.csv"

books_rating = spark.read.csv(file_path1, header=True, inferSchema=True)
books_data = spark.read.csv(file_path2, header=True, inferSchema=True)

In [5]:
# Considering only the required columns
books_rating = books_rating[['Id','Title','User_id','review/score','Price']] 
books_data = books_data[['Title','authors','publisher','categories','publishedDate']]

books_rating.describe().show()
books_data.describe().show()


24/05/09 18:23:57 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+--------------------+--------------------+-------------------+------------------+--------------------+
|summary|                  Id|               Title|            User_id|      review/score|               Price|
+-------+--------------------+--------------------+-------------------+------------------+--------------------+
|  count|             3000000|             2999792|            2437750|           2999870|              482421|
|   mean|1.0568515696607149E9|   2012.796651763537|  18.29299003322259| 1656.860421970827|  21.767951161877054|
| stddev| 1.284488524833734E9|  1536.7533549608797|  21.99284402625621|1427549.9863179324|   26.21155241772817|
|    min|          0001047604|  """ Film technique| "" Film acting """|   & Algorithms"""|              "" and|
|    max|          B0064P287I|you can do anythi...|      AZZZZW74AAX75|         thersites|: A guide to loca...|
+-------+--------------------+--------------------+-------------------+------------------+--------------

[Stage 7:>                                                          (0 + 8) / 8]

+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|summary|               Title|             authors|           publisher|          categories|       publishedDate|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  count|              212403|              181153|              139274|              171880|              186560|
|   mean|   3823.672941176471|              1578.4|             3734.75|  1983.7334777898159|   1982.702933143332|
| stddev|  10717.999589636447|  1278.7901502106834|  10193.316327911616|  142.43423125699238|   37.65620052385513|
|    min|  """ Film technique| "" ""I'm a Littl...| "" ""Skipper Ire...| "" Knox's quirky...| "" ""Cruising fo...|
|    max|you can do anythi...|” “Jeanie with th...|                펜립|�� folk art is a ...|” which is anthol...|
+-------+--------------------+--------------------+--------------------+----------



In [6]:
# Extract only the year from the publishedDate column
from pyspark.sql.functions import year
books_data = books_data.withColumn("publishedYear", year("publishedDate")).drop("publishedDate")
books_data = books_data.filter(col("publishedYear").rlike("^\d+$"))



In [7]:
# dropping duplicate rows in both the tables

count_original = books_rating.count() 
books_rating.dropDuplicates()
count_after = books_rating.count() 
d1 = count_original - count_after
print("number of duplicates in books_rating:", d1)
count_original2 = books_data.count() 
books_data.dropDuplicates()
count_after2 = books_data.count() 
d2 = count_original2 - count_after2
print("number of duplicates in books_data:", d2)

                                                                                

number of duplicates in books_rating: 0
number of duplicates in books_data: 0


In [8]:
# Create a dictionary to display the count of null values for each column
null_counts_rating = {col_name: books_rating.filter(col(col_name).isNull()).count() for col_name in books_rating.columns}
null_counts_data = {col_name: books_data.filter(col(col_name).isNull()).count() for col_name in books_data.columns}
print("Null values in books_rating dataset:", null_counts_rating)
print("Null values in books_data dataset:", null_counts_data)

                                                                                

CodeCache: size=131072Kb used=38790Kb max_used=38899Kb free=92281Kb
 bounds [0x00000001020b0000, 0x0000000104700000, 0x000000010a0b0000]
 total_blobs=14222 nmethods=13280 adapters=854
 compilation: disabled (not enough contiguous free space left)
Null values in books_rating dataset: {'Id': 0, 'Title': 208, 'User_id': 562250, 'review/score': 130, 'Price': 2517579}
Null values in books_data dataset: {'Title': 1, 'authors': 6839, 'publisher': 48179, 'categories': 15259, 'publishedYear': 0}




In [9]:
from pyspark.sql.functions import when, col, first
from pyspark.sql.window import Window
books_rating = books_rating.withColumn("Price", when(col("Price").cast("double").isNotNull(), col("Price")).otherwise(None))
books_rating.show()
window_spec = Window.partitionBy("Title").orderBy("Price")
books_rating = books_rating.withColumn("Price", first("Price", True).over(window_spec))

+----------+--------------------+--------------+------------+-----+
|        Id|               Title|       User_id|review/score|Price|
+----------+--------------------+--------------+------------+-----+
|1882931173|Its Only Art If I...| AVCGYZL8FQQTD|         4.0| NULL|
|0826414346|Dr. Seuss: Americ...|A30TK6U7DNS82R|         5.0| NULL|
|0826414346|Dr. Seuss: Americ...|A3UH4UZ4RSVO82|         5.0| NULL|
|0826414346|Dr. Seuss: Americ...|A2MVUWT453QH61|         4.0| NULL|
|0826414346|Dr. Seuss: Americ...|A22X4XUPKF66MR|         4.0| NULL|
|0826414346|Dr. Seuss: Americ...|A2F6NONFUDB6UK|         4.0| NULL|
|0826414346|Dr. Seuss: Americ...|A14OJS0VWMOSWO|         5.0| NULL|
|0826414346|Dr. Seuss: Americ...|A2RSSXTDZDUSH4|         5.0| NULL|
|0826414346|Dr. Seuss: Americ...|A25MD5I2GUIW6W|         5.0| NULL|
|0826414346|Dr. Seuss: Americ...|A3VA4XFS5WNJO3|         4.0| NULL|
|0829814000|Wonderful Worship...| AZ0IOBU20TBOP|         5.0|19.40|
|0829814000|Wonderful Worship...|A373VVEU6Z9M0N|

In [10]:
# Dropping null values

books_rating = books_rating.na.drop()
books_data = books_data.na.drop()
null_counts_rating = {col_name: books_rating.filter(col(col_name).isNull()).count() for col_name in books_rating.columns}
null_counts_data = {col_name: books_data.filter(col(col_name).isNull()).count() for col_name in books_data.columns}
print("Null values in books_rating dataset:", null_counts_rating)
print("Null values in books_data dataset:", null_counts_data)

books_rating.describe().show()
books_data.describe().show()

                                                                                

Null values in books_rating dataset: {'Id': 0, 'Title': 0, 'User_id': 0, 'review/score': 0, 'Price': 0}
Null values in books_data dataset: {'Title': 0, 'authors': 0, 'publisher': 0, 'categories': 0, 'publishedYear': 0}


                                                                                

+-------+--------------------+--------------------+--------------------+--------------------+-----------------+
|summary|                  Id|               Title|             User_id|        review/score|            Price|
+-------+--------------------+--------------------+--------------------+--------------------+-----------------+
|  count|              414180|              414180|              414180|              414180|           414180|
|   mean|1.0516446472336355E9|   1889.688679245283|                NULL|   4.239725294240715|21.62184188033609|
| stddev|1.0441895933760347E9|   124.2255524061775|                NULL|  1.2911124349740133|26.24759579737497|
|    min|          0002554232|"""Beauty Shop-Ph...|A00117421L76WVWG4...|     & Algorithms"""|             1.00|
|    max|          B000TGB9VE|www.whitbread.org...|       AZZZZW74AAX75|teach to understa...|           995.00|
+-------+--------------------+--------------------+--------------------+--------------------+-----------



In [11]:
from pyspark.sql.functions import udf, col, split, size, regexp_replace, initcap
from pyspark.sql.types import StringType
import re
from pyspark.sql.functions import year
def remove_special_characters(text):
    if text is not None:
        return re.sub(r'[^\w\s]', '', text)
    else:
        return None
remove_special_characters_udf = udf(remove_special_characters, StringType())
books_data = books_data.withColumn("categories", remove_special_characters_udf("categories"))
books_data = books_data.filter(size(split(col("categories"), " ")) == 1)
books_data = books_data.withColumn("Title", regexp_replace(col("Title"), "\\b\\s+", ""))
books_data = books_data.withColumn("Title", initcap(col("Title")))
books_rating = books_rating.withColumn("Title", initcap(col("Title")))
books_data = books_data.withColumn("categories", initcap(col("categories")))
def remove_integers_and_special_characters(text):
    if text is not None:
        return re.sub(r'[^a-zA-Z\s]', '', text)
    else:
        return None
remove_integers_and_special_characters_udf = udf(remove_integers_and_special_characters, StringType())
books_data = books_data.withColumn("categories", remove_integers_and_special_characters_udf("categories"))
books_data = books_data.filter(books_data["categories"].rlike("^[a-zA-Z\s]+$"))


In [12]:

# Renaming columns for convenience
books_rating = books_rating.withColumnRenamed("review/score", "rating")
books_rating.describe().show()



+-------+--------------------+--------------------+--------------------+--------------------+-----------------+
|summary|                  Id|               Title|             User_id|              rating|            Price|
+-------+--------------------+--------------------+--------------------+--------------------+-----------------+
|  count|              414180|              414180|              414180|              414180|           414180|
|   mean|1.0516446472336355E9|   1889.688679245283|                NULL|   4.239725294240715|21.62184188033609|
| stddev|1.0441895933760347E9|   124.2255524061775|                NULL|  1.2911124349740133|26.24759579737497|
|    min|          0002554232|"""beauty Shop-ph...|A00117421L76WVWG4...|     & Algorithms"""|             1.00|
|    max|          B000TGB9VE|Zulu Shaman: Drea...|       AZZZZW74AAX75|teach to understa...|           995.00|
+-------+--------------------+--------------------+--------------------+--------------------+-----------



In [13]:
# Count the number of ratings per user
user_id_counts = books_rating.groupBy("User_id").agg(count("*").alias("count"))
user_id_counts= user_id_counts.orderBy(user_id_counts["count"].desc())
user_id_counts.show(20)
result = user_id_counts.groupBy((col('count') / 10).cast('int').alias('range')).count().orderBy('range')
result.show()

                                                                                

+--------------+-----+
|       User_id|count|
+--------------+-----+
|A14OJS0VWMOSWO| 2105|
|   AFVQZQ8PW0L|  606|
| AG35NEEFCMQVR|  307|
|A1M8PP7MLHNBQB|  278|
|A1D2C0WDCSHUWZ|  271|
| AHD101501WCN1|  242|
|A2VKWLCNZF4ZVB|  205|
|A1NATT3PN24QWY|  200|
|A1K1JW1C5CUSUZ|  179|
|A1X8VZWTOG8IS6|  174|
|A1S3C5OFU508P3|  158|
|A3M174IC0VXOS2|  152|
|A2EDZH51XHFA9B|  147|
|A2VE83MZF98ITY|  143|
|A21NVBFIEQWDSG|  142|
|A2NJO6YE954DBH|  141|
|A2OJW07GQRNJUT|  129|
|A2F6N60Z96CAJI|  118|
|A1OX82JPAQLL60|  113|
|A281NPSIMI1C2R|  112|
+--------------+-----+
only showing top 20 rows





+-----+------+
|range| count|
+-----+------+
|    0|303448|
|    1|  1213|
|    2|   250|
|    3|   126|
|    4|    47|
|    5|    33|
|    6|    16|
|    7|    10|
|    8|     6|
|    9|     7|
|   10|     8|
|   11|     5|
|   12|     1|
|   14|     4|
|   15|     2|
|   17|     2|
|   20|     2|
|   24|     1|
|   27|     2|
|   30|     1|
+-----+------+
only showing top 20 rows



                                                                                

In [None]:
# Filter out users that has invalid user_id
user_id_counts = user_id_counts.filter(col("user_id").rlike("^[a-zA-Z0-9]+$"))
user_id_counts.show(20)

# Filter users who has given more than 50 ratings
filtered_user_ids = user_id_counts.filter(col("count") >= 1).select("User_id")
filtered_user_ids.describe().show()



In [None]:
# Join to filter the ratings
filtered_ratings = books_rating.join(filtered_user_ids, "User_id", "inner")

In [None]:
# Merging both datasets
books_merged = filtered_ratings.join(books_data, "Title", "inner")

In [None]:
# Ensure the rating column is numeric
books_final = books_merged.withColumn("rating", col("rating").cast("double"))
books_final_df = books_final.toPandas()
print(books_final_df)
books_final_df.to_csv('books_final.csv', index=False)
books_final.show()

In [None]:
# Show the contents of the DataFrame
books_final.show()

In [None]:
books_final.describe().show()

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import plotly.graph_objs as go
import pandas as pd
temp_df = books_final.groupBy("rating").count().toPandas()

# Create trace1 for bar chart
trace1 = go.Bar(
    x=temp_df['rating'],
    y=temp_df['count'],
    text = temp_df['count'],  # Labels for each bar
    marker=dict(color='rgb(255,165,0)', line=dict(color='rgb(0,0,0)', width=1.5))
)
layout_bar = go.Layout(
    template="plotly_dark",
    title='RATINGS COUNT',
    xaxis=dict(title='Rating'),
    yaxis=dict(title='Count')
)
fig_bar = go.Figure(data=[trace1], layout=layout_bar)
fig_bar.show()


In [None]:
import matplotlib.pyplot as plt
import numpy as np
import plotly.graph_objs as go
import pandas as pd

# Filter out null values
temp_df_filtered = temp_df.dropna(subset=['rating'])

def pie_plot(cnt_srs, title):
    labels = cnt_srs['rating']
    values = cnt_srs['count']
    trace = go.Pie(
        labels=labels,
        values=values,
        hoverinfo='percent+value',
        textinfo='percent',
        textposition='inside',
        hole=0.7,
        showlegend=True,
        marker=dict(
            colors=plt.cm.viridis_r(np.linspace(0, 1, len(cnt_srs))),
            line=dict(color='#000000', width=2)
        )
    )
    layout = go.Layout(
        template="plotly_dark",
        title=title
    )
    fig = go.Figure(data=[trace], layout=layout)
    return fig

fig_pie = pie_plot(temp_df_filtered, 'Rating Distribution')
fig_pie.show()

In [None]:
from pyspark.sql.functions import col, when
# Split the categories column into Fiction and Non-Fiction
genre_counts = books_final.withColumn("Fiction", when(col("categories") == "Fiction", 1).otherwise(0)) \
    .withColumn("Non-Fiction", when(col("categories") != "Fiction", 1).otherwise(0)) \
    .groupBy().sum("Fiction", "Non-Fiction") \
    .withColumnRenamed("sum(Fiction)", "Fiction") \
    .withColumnRenamed("sum(Non-Fiction)", "Non-Fiction")
genre_counts_pd = genre_counts.toPandas()
fiction_label = genre_counts_pd.iloc[0]["Fiction"]
non_fiction_label = genre_counts_pd.iloc[0]["Non-Fiction"]

trace1 = go.Bar(
    x=["Fiction", "Non-Fiction"],
    y=[fiction_label, non_fiction_label],
    text=[fiction_label, non_fiction_label],  # Labels for each bar
    marker=dict(color='rgb(255,165,0)',
                line=dict(color='rgb(0,0,0)', width=1.5))
)
layout = go.Layout(template="plotly_dark", title = 'Count of ratings among Fiction and Non-Fiction Books', xaxis=dict(title='Categories'), yaxis=dict(title='Count'))
fig = go.Figure(data=[trace1], layout=layout)
fig.show()


In [None]:
import plotly.graph_objs as go
from pyspark.sql.functions import col, when

# Split the categories column into Fiction and Non-Fiction
genre_counts = books_final.withColumn("Category", when(col("categories") == "Fiction", "Fiction").otherwise("Non-Fiction"))
genre_counts_pie = genre_counts.groupBy('Category').count()
genre_counts_pd = genre_counts_pie.toPandas()
def pie_plot(cnt_srs, title):
    labels = cnt_srs['Category']
    values = cnt_srs['count']
    trace = go.Pie(
        labels=labels,
        values=values,
        hoverinfo='percent+value',
        textinfo='percent',
        textposition='inside',
        hole=0.7,
        showlegend=True,
        marker=dict(
            colors=plt.cm.viridis_r(np.linspace(0, 1, len(cnt_srs))),
            line=dict(color='#000000', width=2)
        )
    )
    layout = go.Layout(
        template="plotly_dark",
        title=title
    )
    fig = go.Figure(data=[trace], layout=layout)
    return fig
fig_pie = pie_plot(genre_counts_pd, 'GENRE Distribution')
fig_pie.show()


In [None]:
!pip install bubbly

In [None]:
from pyspark.sql import functions as F
from plotly.subplots import make_subplots
import plotly.graph_objs as go
import pandas as pd
df1 = books_final.groupBy(['categories', 'publishedYear']).agg(F.mean('rating').alias('User Rating'), F.mean('Price').alias('Price')).toPandas()
fig = make_subplots(rows=1, cols=1)
for genre in df1['categories'].unique():
    df_genre = df1[df1['categories'] == genre]
    fig.add_trace(go.Scatter(
        x=df_genre['User Rating'],
        y=df_genre['Price'],
        mode='markers',
        marker=dict(size=10),
        name=genre
    ))
fig.update_layout(
    template="plotly_dark",
    title='Bestsellers Amazon',
    xaxis_title='User Rating',
    yaxis_title='Avg Price',
    xaxis=dict(type='log'),  # Log scale for x-axis
    showlegend=True
)
fig.show()


In [None]:
from pyspark.sql import functions as F
from plotly.subplots import make_subplots
import plotly.graph_objs as go
import pandas as pd
df1 = books_final.groupBy(['publishedYear']).agg(F.mean('rating').alias('User Rating'), F.mean('Price').alias('Price')).toPandas()
fig = make_subplots(rows=1, cols=1)
for year in df1['publishedYear'].unique():
    df_genre = df1[df1['publishedYear'] == year]
    fig.add_trace(go.Scatter(
        x=df_genre['User Rating'],
        y=df_genre['Price'],
        mode='markers',
        marker=dict(size=10),
        name=str(year)
    ))
fig.update_layout(
    template="plotly_dark",
    title='Bestsellers Amazon',
    xaxis_title='User Rating',
    yaxis_title='Avg Price',
    xaxis=dict(type='log'),  # Log scale for x-axis
    showlegend=True
)
fig.show()


In [None]:
from pyspark.sql.functions import col, when
genre_counts = books_final.withColumn("Fiction", when(col("categories") == "Fiction", 1).otherwise(0)) \
    .withColumn("Non-Fiction", when(col("categories") != "Fiction", 1).otherwise(0)) \
    .groupBy().sum("Fiction", "Non-Fiction") \
    .withColumnRenamed("sum(Fiction)", "Fiction") \
    .withColumnRenamed("sum(Non-Fiction)", "Non-Fiction")
fiction_data = books_final.filter(col("categories") == "Fiction")
non_fiction_data = books_final.filter(col("categories") == "Non Fiction")
temp_df1 = fiction_data.groupBy("rating").count().orderBy("rating").toPandas()
temp_df2 = non_fiction_data.groupBy("rating").count().orderBy("rating").toPandas()
import plotly.graph_objs as go
trace1 = go.Bar(
    x=temp_df1['rating'],
    y=temp_df1['count'],
    name="Fiction",
    marker=dict(color='rgb(249, 6, 6)',
                line=dict(color='rgb(0,0,0)', width=1.5))
)
trace2 = go.Bar(
    x=temp_df2['rating'],
    y=temp_df2['count'],
    name="Non Fiction",
    marker=dict(color='rgb(26, 118, 255)',
                line=dict(color='rgb(0,0,0)', width=1.5))
)
layout = go.Layout(
    template="plotly_dark",
    title='RATING BY GENRE',
    xaxis=dict(title='Rating'),
    yaxis=dict(title='Count')
)
fig = go.Figure(data=[trace1, trace2], layout=layout)
fig.show()


In [None]:
# Group by Author and count occurrences
top_authors = books_final.groupBy('authors').agg(F.avg('rating').alias("author_rating")) \
    .orderBy('author_rating', ascending=False).limit(10)
top_authors_pd = top_authors.toPandas()
trace1 = go.Bar(
    x=top_authors_pd['authors'],
    y=top_authors_pd['author_rating'],
    marker=dict(color='rgb(255,165,0)',
                line=dict(color='rgb(0,0,0)', width=1.5))
)
layout = go.Layout(
    template="plotly_dark",
    title='TOP 10 AUTHORS WITH HIGH AVERAGE RATING',
    xaxis=dict(title='Author', tickangle=45),
    yaxis=dict(title='COUNT')
)
fig = go.Figure(data=[trace1], layout=layout)
fig.show()


In [None]:
# Group by Author and count occurrences
top_publishers = books_final.groupBy('publisher').agg(F.avg('rating').alias("publisher_rating")) \
    .orderBy('publisher_rating', ascending=False).limit(10)
top_publishers_pd = top_publishers.toPandas()
trace1 = go.Bar(
    x=top_publishers_pd['publisher'],
    y=top_publishers_pd['publisher_rating'],
    marker=dict(color='rgb(255,165,0)',
                line=dict(color='rgb(0,0,0)', width=1.5))
)
layout = go.Layout(
    template="plotly_dark",
    title='TOP 10 PUBLISHERS WITH HIGH AVERAGE RATING',
    xaxis=dict(title='Author', tickangle=45),
    yaxis=dict(title='COUNT')
)
fig = go.Figure(data=[trace1], layout=layout)
fig.show()


In [None]:
temp_df1_year = books_final.groupBy('publishedYear').mean()
temp_df1_year_pd = temp_df1_year.toPandas()
trace1_year = go.Bar(
    x=temp_df1_year_pd['publishedYear'],
    y=temp_df1_year_pd['avg(rating)'],
    marker=dict(color='rgb(255,165,0)',
                line=dict(color='rgb(0,0,0)',width=1.5))
)
layout_year = go.Layout(
    template="plotly_dark",
    title='AVERAGE REVIEWS OVER THE YEARS',
    xaxis=dict(title='Year'),
    yaxis=dict(title='Average Reviews')
)
fig_year = go.Figure(data=[trace1_year], layout=layout_year)
fig_year.show()


In [None]:
from pyspark.sql import functions as F
import plotly.graph_objs as go
import pandas as pd

# Group by publishedYear and calculate the mean of price
temp_df_year = books_final.groupBy('publishedYear').agg(F.mean('Price').alias('avg_price'))
temp_df_year_pd = temp_df_year.toPandas()
trace1_year = go.Bar(
    x=temp_df_year_pd['publishedYear'],
    y=temp_df_year_pd['avg_price'],
    marker=dict(color='rgb(148, 103, 189)',
                line=dict(color='rgb(0,0,0)', width=1.5))
)
layout_year = go.Layout(
    template="plotly_dark",
    title='AVERAGE PRICE OVER THE YEARS',
    xaxis=dict(title='Year'),
    yaxis=dict(title='Average Price')
)
fig_year = go.Figure(data=[trace1_year], layout=layout_year)
fig_year.show()


In [None]:
books_final.show(20)

In [None]:
# Pivoting the DataFrame
book_pivot = books_final.groupBy("Title").pivot("User_id").avg("rating")

# Fill Null values with 0
book_pivot = book_pivot.na.fill(0)

# Convert pivot table to RDD of (Title, features) tuples
sparse_rdd = book_pivot.rdd.map(lambda row: (row[0], Vectors.dense(row[1:])))

# Define a schema for the RDD
schema = ["Title", "features"]

# Create a DataFrame from the RDD
sparse_matrix = spark.createDataFrame(sparse_rdd, schema)
sparse_matrix.show()

# Normalizing the features
normalizer = Normalizer(inputCol="features", outputCol="norm_features")
norm_features = normalizer.transform(sparse_matrix)

# Creating a Bucketed Random Projection LSH model
brp = BucketedRandomProjectionLSH(inputCol="norm_features", outputCol="hashes", bucketLength=1.0, numHashTables=10)
model = brp.fit(norm_features)


In [None]:
#Provide input book title from the user
user_input_title = input("Enter the title of the book: ")
user_input_title = user_input_title.title()

In [None]:
# Check if the input book title is in the DataFrame
if book_pivot.filter(col("Title") == user_input_title).count() == 0:
    print(f"Book '{user_input_title}' not found.")
else:
    # Extracting the features of the input book
    input_book_features = norm_features.filter(col("Title") == user_input_title).select("norm_features").collect()[0][0]

# Approximate k nearest neighbors of the input book
knn = model.approxNearestNeighbors(norm_features, input_book_features, 6)
# Display the recommended books with their IDs
recommended_books = knn.filter(col("Title") != user_input_title).limit(5).collect()

for book in recommended_books:
    print(book[0])