In [3]:
# Load the datasets
file_path1 = "Books_rating.csv"
file_path2 = "books_data.csv"

books_rating = spark.read.csv(file_path1, header=True, inferSchema=True)
books_data = spark.read.csv(file_path2, header=True, inferSchema=True)

                                                                                

In [4]:
# Considering only the required columns
# books_rating = books_rating[['Id','Title','User_id','review/score']]     ##### price not considered
books_rating = books_rating[['Id','Title','User_id','review/score','Price']] 
books_data = books_data[['Title','authors','publisher','categories','publishedDate']]

books_rating.describe().show()
books_data.describe().show()


24/05/09 13:40:43 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+--------------------+--------------------+-------------------+------------------+--------------------+
|summary|                  Id|               Title|            User_id|      review/score|               Price|
+-------+--------------------+--------------------+-------------------+------------------+--------------------+
|  count|             3000000|             2999792|            2437750|           2999870|              482421|
|   mean|1.0568515696607149E9|   2012.796651763537|  18.29299003322259| 1656.860421970827|  21.767951161877054|
| stddev| 1.284488524833734E9|  1536.7533549608797|  21.99284402625621|1427549.9863179324|   26.21155241772817|
|    min|          0001047604|  """ Film technique| "" Film acting """|   & Algorithms"""|              "" and|
|    max|          B0064P287I|you can do anythi...|      AZZZZW74AAX75|         thersites|: A guide to loca...|
+-------+--------------------+--------------------+-------------------+------------------+--------------

[Stage 7:>                                                          (0 + 8) / 8]

+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|summary|               Title|             authors|           publisher|          categories|       publishedDate|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  count|              212403|              181153|              139274|              171880|              186560|
|   mean|   3823.672941176471|              1578.4|             3734.75|  1983.7334777898159|   1982.702933143332|
| stddev|  10717.999589636447|  1278.7901502106834|  10193.316327911616|  142.43423125699238|   37.65620052385513|
|    min|  """ Film technique| "" ""I'm a Littl...| "" ""Skipper Ire...| "" Knox's quirky...| "" ""Cruising fo...|
|    max|you can do anythi...|” “Jeanie with th...|                펜립|�� folk art is a ...|” which is anthol...|
+-------+--------------------+--------------------+--------------------+----------

                                                                                

In [5]:
# Extract only the year from the publishedDate column
from pyspark.sql.functions import year
books_data = books_data.withColumn("publishedYear", year("publishedDate")).drop("publishedDate")
# Assuming `books_data` is your DataFrame containing the "publishedYear" column
books_data = books_data.filter(col("publishedYear").rlike("^\d+$"))

In [6]:
# dropping duplicate rows in both the tables

count_original = books_rating.count() 
books_rating.dropDuplicates()
count_after = books_rating.count() 
d1 = count_original - count_after
print("number of duplicates in books_rating:", d1)

count_original2 = books_data.count() 
books_data.dropDuplicates()
count_after2 = books_data.count() 
d2 = count_original2 - count_after2
print("number of duplicates in books_data:", d2)

                                                                                

number of duplicates in books_rating: 0
number of duplicates in books_data: 0


In [7]:
# Create a dictionary to display the count of null values for each column
null_counts_rating = {col_name: books_rating.filter(col(col_name).isNull()).count() for col_name in books_rating.columns}
null_counts_data = {col_name: books_data.filter(col(col_name).isNull()).count() for col_name in books_data.columns}

# Printing the count of null values
print("Null values in books_rating dataset:", null_counts_rating)
print("Null values in books_data dataset:", null_counts_data)

                                                                                

CodeCache: size=131072Kb used=39115Kb max_used=39133Kb free=91956Kb
 bounds [0x0000000106458000, 0x0000000108ad8000, 0x000000010e458000]
 total_blobs=14195 nmethods=13252 adapters=854
 compilation: disabled (not enough contiguous free space left)
Null values in books_rating dataset: {'Id': 0, 'Title': 208, 'User_id': 562250, 'review/score': 130, 'Price': 2517579}
Null values in books_data dataset: {'Title': 1, 'authors': 6839, 'publisher': 48179, 'categories': 15259, 'publishedYear': 0}




In [8]:
from pyspark.sql.functions import when, col, first
from pyspark.sql.window import Window

# Replace non-numeric values with None in the price column
books_rating = books_rating.withColumn("Price", when(col("Price").cast("double").isNotNull(), col("Price")).otherwise(None))

# Show the cleaned DataFrame
books_rating.show()

# Define a window specification to partition by the book and order by the price
window_spec = Window.partitionBy("Title").orderBy("Price")

# Fill the missing values in the price column with the first non-null value within the window
books_rating = books_rating.withColumn("Price", first("Price", True).over(window_spec))


# null_counts_price = {col_name: books_rating.filter(col(col_name).isNull()).count() for col_name in books_rating.columns}

# # Printing the count of null values
# print("Null values in books_rating dataset:", null_counts_price)

+----------+--------------------+--------------+------------+-----+
|        Id|               Title|       User_id|review/score|Price|
+----------+--------------------+--------------+------------+-----+
|1882931173|Its Only Art If I...| AVCGYZL8FQQTD|         4.0| NULL|
|0826414346|Dr. Seuss: Americ...|A30TK6U7DNS82R|         5.0| NULL|
|0826414346|Dr. Seuss: Americ...|A3UH4UZ4RSVO82|         5.0| NULL|
|0826414346|Dr. Seuss: Americ...|A2MVUWT453QH61|         4.0| NULL|
|0826414346|Dr. Seuss: Americ...|A22X4XUPKF66MR|         4.0| NULL|
|0826414346|Dr. Seuss: Americ...|A2F6NONFUDB6UK|         4.0| NULL|
|0826414346|Dr. Seuss: Americ...|A14OJS0VWMOSWO|         5.0| NULL|
|0826414346|Dr. Seuss: Americ...|A2RSSXTDZDUSH4|         5.0| NULL|
|0826414346|Dr. Seuss: Americ...|A25MD5I2GUIW6W|         5.0| NULL|
|0826414346|Dr. Seuss: Americ...|A3VA4XFS5WNJO3|         4.0| NULL|
|0829814000|Wonderful Worship...| AZ0IOBU20TBOP|         5.0|19.40|
|0829814000|Wonderful Worship...|A373VVEU6Z9M0N|

In [9]:
# Dropping null values

books_rating = books_rating.na.drop()
books_data = books_data.na.drop()

# Create a dictionary to display the count of null values for each column
null_counts_rating = {col_name: books_rating.filter(col(col_name).isNull()).count() for col_name in books_rating.columns}
null_counts_data = {col_name: books_data.filter(col(col_name).isNull()).count() for col_name in books_data.columns}

# Printing the count of null values
print("Null values in books_rating dataset:", null_counts_rating)
print("Null values in books_data dataset:", null_counts_data)

books_rating.describe().show()
books_data.describe().show()

                                                                                

Null values in books_rating dataset: {'Id': 0, 'Title': 0, 'User_id': 0, 'review/score': 0, 'Price': 0}
Null values in books_data dataset: {'Title': 0, 'authors': 0, 'publisher': 0, 'categories': 0, 'publishedYear': 0}


                                                                                

+-------+--------------------+--------------------+--------------------+--------------------+-----------------+
|summary|                  Id|               Title|             User_id|        review/score|            Price|
+-------+--------------------+--------------------+--------------------+--------------------+-----------------+
|  count|              414180|              414180|              414180|              414180|           414180|
|   mean|1.0516446472336355E9|   1889.688679245283|                NULL|   4.239725294240715|21.62184188033609|
| stddev|1.0441895933760347E9|   124.2255524061775|                NULL|  1.2911124349740133|26.24759579737497|
|    min|          0002554232|"""Beauty Shop-Ph...|A00117421L76WVWG4...|     & Algorithms"""|             1.00|
|    max|          B000TGB9VE|www.whitbread.org...|       AZZZZW74AAX75|teach to understa...|           995.00|
+-------+--------------------+--------------------+--------------------+--------------------+-----------



In [10]:
from pyspark.sql.functions import udf, col, split, size, regexp_replace, initcap
from pyspark.sql.types import StringType
import re

# Define the UDF to remove special characters
def remove_special_characters(text):
    if text is not None:
        return re.sub(r'[^\w\s]', '', text)
    else:
        return None

# Register the UDF
remove_special_characters_udf = udf(remove_special_characters, StringType())

# Apply the UDF to remove special characters from the categories column in the books_data DataFrame
books_data = books_data.withColumn("categories", remove_special_characters_udf("categories"))

# Filter out rows where categories column has more than one word
books_data = books_data.filter(size(split(col("categories"), " ")) == 1)

# Remove extra spaces at the beginning of words in titles
books_data = books_data.withColumn("Title", regexp_replace(col("Title"), "\\b\\s+", ""))

# Capitalize each word in the titles
books_data = books_data.withColumn("Title", initcap(col("Title")))
books_rating = books_rating.withColumn("Title", initcap(col("Title")))
books_data = books_data.withColumn("categories", initcap(col("categories")))

# Define a function to remove integers and special characters from categories
def remove_integers_and_special_characters(text):
    if text is not None:
        return re.sub(r'[^a-zA-Z\s]', '', text)
    else:
        return None

# Register the UDF
remove_integers_and_special_characters_udf = udf(remove_integers_and_special_characters, StringType())

# Apply the UDF to remove integers and special characters from the categories column
books_data = books_data.withColumn("categories", remove_integers_and_special_characters_udf("categories"))

# Filter out rows where categories column contains non-alphabetic characters
books_data = books_data.filter(books_data["categories"].rlike("^[a-zA-Z\s]+$"))

In [11]:

# Renaming columns for convenience
books_rating = books_rating.withColumnRenamed("review/score", "rating")

# Show the DataFrame
books_rating.describe().show()



+-------+--------------------+--------------------+--------------------+--------------------+-----------------+
|summary|                  Id|               Title|             User_id|              rating|            Price|
+-------+--------------------+--------------------+--------------------+--------------------+-----------------+
|  count|              414180|              414180|              414180|              414180|           414180|
|   mean|1.0516446472336355E9|   1889.688679245283|                NULL|   4.239725294240715|21.62184188033609|
| stddev|1.0441895933760347E9|   124.2255524061775|                NULL|  1.2911124349740133|26.24759579737497|
|    min|          0002554232|"""beauty Shop-ph...|A00117421L76WVWG4...|     & Algorithms"""|             1.00|
|    max|          B000TGB9VE|Zulu Shaman: Drea...|       AZZZZW74AAX75|teach to understa...|           995.00|
+-------+--------------------+--------------------+--------------------+--------------------+-----------



In [12]:
# Count the number of ratings per user
user_id_counts = books_rating.groupBy("User_id").agg(count("*").alias("count"))
user_id_counts= user_id_counts.orderBy(user_id_counts["count"].desc())
user_id_counts.show(20)

result = user_id_counts.groupBy((col('count') / 10).cast('int').alias('range')).count().orderBy('range')

result.show()

                                                                                

+--------------+-----+
|       User_id|count|
+--------------+-----+
|A14OJS0VWMOSWO| 2105|
|   AFVQZQ8PW0L|  606|
| AG35NEEFCMQVR|  307|
|A1M8PP7MLHNBQB|  278|
|A1D2C0WDCSHUWZ|  271|
| AHD101501WCN1|  242|
|A2VKWLCNZF4ZVB|  205|
|A1NATT3PN24QWY|  200|
|A1K1JW1C5CUSUZ|  179|
|A1X8VZWTOG8IS6|  174|
|A1S3C5OFU508P3|  158|
|A3M174IC0VXOS2|  152|
|A2EDZH51XHFA9B|  147|
|A2VE83MZF98ITY|  143|
|A21NVBFIEQWDSG|  142|
|A2NJO6YE954DBH|  141|
|A2OJW07GQRNJUT|  129|
|A2F6N60Z96CAJI|  118|
|A1OX82JPAQLL60|  113|
|A281NPSIMI1C2R|  112|
+--------------+-----+
only showing top 20 rows





+-----+------+
|range| count|
+-----+------+
|    0|303448|
|    1|  1213|
|    2|   250|
|    3|   126|
|    4|    47|
|    5|    33|
|    6|    16|
|    7|    10|
|    8|     6|
|    9|     7|
|   10|     8|
|   11|     5|
|   12|     1|
|   14|     4|
|   15|     2|
|   17|     2|
|   20|     2|
|   24|     1|
|   27|     2|
|   30|     1|
+-----+------+
only showing top 20 rows



                                                                                

In [13]:
# Filter out users that has invalid user_id

user_id_counts = user_id_counts.filter(col("user_id").rlike("^[a-zA-Z0-9]+$"))
user_id_counts.show(20)

# Filter users who has given more than 50 ratings

filtered_user_ids = user_id_counts.filter(col("count") >= 1).select("User_id")

filtered_user_ids.describe().show()

                                                                                

+--------------+-----+
|       User_id|count|
+--------------+-----+
|A14OJS0VWMOSWO| 2105|
|   AFVQZQ8PW0L|  606|
| AG35NEEFCMQVR|  307|
|A1M8PP7MLHNBQB|  278|
|A1D2C0WDCSHUWZ|  271|
| AHD101501WCN1|  242|
|A2VKWLCNZF4ZVB|  205|
|A1NATT3PN24QWY|  200|
|A1K1JW1C5CUSUZ|  179|
|A1X8VZWTOG8IS6|  174|
|A1S3C5OFU508P3|  158|
|A3M174IC0VXOS2|  152|
|A2EDZH51XHFA9B|  147|
|A2VE83MZF98ITY|  143|
|A21NVBFIEQWDSG|  142|
|A2NJO6YE954DBH|  141|
|A2OJW07GQRNJUT|  129|
|A2F6N60Z96CAJI|  118|
|A1OX82JPAQLL60|  113|
|A281NPSIMI1C2R|  112|
+--------------+-----+
only showing top 20 rows



[Stage 147:>                                                        (0 + 1) / 1]

+-------+--------------------+
|summary|             User_id|
+-------+--------------------+
|  count|              305186|
|   mean|                NULL|
| stddev|                NULL|
|    min|A00117421L76WVWG4...|
|    max|       AZZZZW74AAX75|
+-------+--------------------+



                                                                                

In [14]:
# Join to filter the ratings
filtered_ratings = books_rating.join(filtered_user_ids, "User_id", "inner")

In [15]:
# Merging both datasets
books_merged = filtered_ratings.join(books_data, "Title", "inner")

In [16]:
# Ensure the rating column is numeric
books_final = books_merged.withColumn("rating", col("rating").cast("double"))
books_final_df = books_final.toPandas()

# Show the Pandas DataFrame
print(books_final_df)

# Save Pandas DataFrame to CSV file
books_final_df.to_csv('books_final.csv', index=False)
books_final.show()

                                                                                

            Title         User_id          Id  rating  Price  \
0         Herland  A101DG7P9E26PW  1419123548     4.0  17.12   
1         Herland  A101DG7P9E26PW  1421810182     4.0  17.12   
2      Spellbound  A10VOEBL5S337W  014250193X     4.0   5.99   
3       Plainsong  A111DVWFAZPOO1  0375705856     1.0  10.20   
4        Hannibal  A11ZDEVYIMC6AI  0440224675     2.0   7.67   
...           ...             ...         ...     ...    ...   
6595  Bittersweet   AY55R37A4ZOMG  0440224845     5.0   7.99   
6596  Bittersweet   AY55R37A4ZOMG  0440224845     5.0   7.99   
6597    Plainsong   AY8OPSBAM5Q0K  0375705856     5.0  10.20   
6598    Plainsong   AYLQRKADT9XRF  0375705856     1.0  10.20   
6599     Restoree   AYNX52MQO3IHB  0345351878     5.0   7.99   

                           authors        publisher  categories  publishedYear  
0     ['Charlotte Perkins Gilman']  Xist Publishing     Fiction           2015  
1     ['Charlotte Perkins Gilman']  Xist Publishing     Fiction      



+-----------------+--------------+----------+------+-----+--------------------+--------------------+----------+-------------+
|            Title|       User_id|        Id|rating|Price|             authors|           publisher|categories|publishedYear|
+-----------------+--------------+----------+------+-----+--------------------+--------------------+----------+-------------+
|          Herland|A101DG7P9E26PW|1419123548|   4.0|17.12|['Charlotte Perki...|     Xist Publishing|   Fiction|         2015|
|          Herland|A101DG7P9E26PW|1421810182|   4.0|17.12|['Charlotte Perki...|     Xist Publishing|   Fiction|         2015|
|       Spellbound|A10VOEBL5S337W|014250193X|   4.0| 5.99|  ['James Essinger']|               Delta|   History|         2007|
|        Plainsong|A111DVWFAZPOO1|0375705856|   1.0|10.20|      ['Kent Haruf']|             Vintage|   Fiction|         2001|
|         Hannibal|A11ZDEVYIMC6AI|0440224675|   2.0| 7.67|   ['Thomas Harris']|                Dell|   Fiction|       



In [17]:
# Extract books that have received more than 50 ratings

# number_rating = books_merged.groupBy("Title").agg(count("rating").alias("number_of_ratings"))
# number_rating= number_rating.orderBy(number_rating["number_of_ratings"].desc())
# number_rating.show(20)

# result1 = number_rating.groupBy((col('number_of_ratings') / 10).cast('int').alias('range')).count().orderBy('range')

# result1.show()

In [18]:
# # Filter titles with at least 50 ratings
# number_rating = number_rating.filter(col("number_of_ratings") >= 25).select("Title")


In [19]:
# # Join filtered number of ratings with the original DataFrame
# books_filtered_nr = books_merged.join(number_rating, "Title", "inner")
# books_filtered_nr.show()
# books_final = books_filtered_nr

In [20]:
# Drop duplicates based on user_id and title
#window_spec = Window.partitionBy(books_filtered_nr['User_id'], books_filtered_nr['Title']).orderBy("User_id")
#books_final = books_filtered_nr.withColumn("rn", row_number().over(window_spec)).filter(col("rn") == 1).drop("rn")

In [21]:
# Show the contents of the DataFrame

books_final.show()



+-----------------+--------------+----------+------+-----+--------------------+--------------------+----------+-------------+
|            Title|       User_id|        Id|rating|Price|             authors|           publisher|categories|publishedYear|
+-----------------+--------------+----------+------+-----+--------------------+--------------------+----------+-------------+
|          Herland|A101DG7P9E26PW|1419123548|   4.0|17.12|['Charlotte Perki...|     Xist Publishing|   Fiction|         2015|
|          Herland|A101DG7P9E26PW|1421810182|   4.0|17.12|['Charlotte Perki...|     Xist Publishing|   Fiction|         2015|
|       Spellbound|A10VOEBL5S337W|014250193X|   4.0| 5.99|  ['James Essinger']|               Delta|   History|         2007|
|        Plainsong|A111DVWFAZPOO1|0375705856|   1.0|10.20|      ['Kent Haruf']|             Vintage|   Fiction|         2001|
|         Hannibal|A11ZDEVYIMC6AI|0440224675|   2.0| 7.67|   ['Thomas Harris']|                Dell|   Fiction|       



In [22]:
# books_final.write.csv("/Users/soumitra7/Desktop/603", header=True, mode="overwrite")

In [23]:
books_final.describe().show()

                                                                                

+-------+-----+--------------+--------------------+------------------+------------------+--------------------+----------------+----------+-----------------+
|summary|Title|       User_id|                  Id|            rating|             Price|             authors|       publisher|categories|    publishedYear|
+-------+-----+--------------+--------------------+------------------+------------------+--------------------+----------------+----------+-----------------+
|  count| 6600|          6600|                6600|              6571|              6600|                6600|            6600|      6600|             6600|
|   mean| NULL|          NULL| 6.845429041481897E8| 3.721503576320195|18.180798484848523|                NULL|            NULL|      NULL|2007.634696969697|
| stddev| NULL|          NULL|4.6595010557118016E8|1.4230303776831768|55.546915116959966|                NULL|            NULL|      NULL|6.783271168969406|
|    min|  51a|A100I0T791DIKS|          0007104022|       

In [63]:
import matplotlib.pyplot as plt
import numpy as np
import plotly.graph_objs as go
import pandas as pd

# Assuming data is your PySpark DataFrame

# Convert PySpark DataFrame to Pandas DataFrame
temp_df = books_final.groupBy("rating").count().toPandas()

# Create trace1 for bar chart
trace1 = go.Bar(
    x=temp_df['rating'],
    y=temp_df['count'],
    text = temp_df['count'],  # Labels for each bar
    marker=dict(color='rgb(255,165,0)', line=dict(color='rgb(0,0,0)', width=1.5))
)

# Layout for bar chart
layout_bar = go.Layout(
    template="plotly_dark",
    title='RATINGS COUNT',
    xaxis=dict(title='Rating'),
    yaxis=dict(title='Count')
)

# Create figure for bar chart
fig_bar = go.Figure(data=[trace1], layout=layout_bar)
fig_bar.show()


                                                                                

In [62]:
import matplotlib.pyplot as plt
import numpy as np
import plotly.graph_objs as go
import pandas as pd

def pie_plot(cnt_srs, title):
    labels = cnt_srs['rating']
    values = cnt_srs['count']
    trace = go.Pie(
        labels=labels,
        values=values,
        hoverinfo='percent+value',
        textinfo='percent',
        textposition='inside',
        hole=0.7,
        showlegend=True,
        marker=dict(
            colors=plt.cm.viridis_r(np.linspace(0, 1, len(cnt_srs))),
            line=dict(color='#000000', width=2)
        )
    )
    layout = go.Layout(
        template="plotly_dark",
        title=title
    )
    fig = go.Figure(data=[trace], layout=layout)
    return fig

# Plot pie chart
fig_pie = pie_plot(temp_df, 'Rating Distribution')
fig_pie.show()


In [79]:
from pyspark.sql.functions import col, when

# Split the categories column into Fiction and Non-Fiction
genre_counts = books_final.withColumn("Fiction", when(col("categories") == "Fiction", 1).otherwise(0)) \
    .withColumn("Non-Fiction", when(col("categories") != "Fiction", 1).otherwise(0)) \
    .groupBy().sum("Fiction", "Non-Fiction") \
    .withColumnRenamed("sum(Fiction)", "Fiction") \
    .withColumnRenamed("sum(Non-Fiction)", "Non-Fiction")

# Convert the PySpark DataFrame to Pandas
genre_counts_pd = genre_counts.toPandas()

# Extract values for labels
fiction_label = genre_counts_pd.iloc[0]["Fiction"]
non_fiction_label = genre_counts_pd.iloc[0]["Non-Fiction"]

# Create the bar plot with Plotly
trace1 = go.Bar(
    x=["Fiction", "Non-Fiction"],
    y=[fiction_label, non_fiction_label],
    text=[fiction_label, non_fiction_label],  # Labels for each bar
    marker=dict(color='rgb(255,165,0)',
                line=dict(color='rgb(0,0,0)', width=1.5))
)
layout = go.Layout(template="plotly_dark", title = 'Count of ratings among Fiction and Non-Fiction Books', xaxis=dict(title='Categories'), yaxis=dict(title='Count'))
fig = go.Figure(data=[trace1], layout=layout)
fig.show()


                                                                                

In [78]:
import plotly.graph_objs as go
from pyspark.sql.functions import col, when

# Split the categories column into Fiction and Non-Fiction
genre_counts = books_final.withColumn("Category", when(col("categories") == "Fiction", "Fiction").otherwise("Non-Fiction"))

# Compute the count of each genre
genre_counts_pie = genre_counts.groupBy('Category').count()

# Convert the PySpark DataFrame to Pandas
genre_counts_pd = genre_counts_pie.toPandas()

# Define the pie plot function
def pie_plot(cnt_srs, title):
    labels = cnt_srs['Category']
    values = cnt_srs['count']
    trace = go.Pie(
        labels=labels,
        values=values,
        hoverinfo='percent+value',
        textinfo='percent',
        textposition='inside',
        hole=0.7,
        showlegend=True,
        marker=dict(
            colors=plt.cm.viridis_r(np.linspace(0, 1, len(cnt_srs))),
            line=dict(color='#000000', width=2)
        )
    )
    layout = go.Layout(
        template="plotly_dark",
        title=title
    )
    fig = go.Figure(data=[trace], layout=layout)
    return fig

# Plot pie chart
fig_pie = pie_plot(genre_counts_pd, 'GENRE Distribution')
fig_pie.show()


24/05/09 15:14:46 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

In [83]:
!pip install bubbly

[33mDEPRECATION: Loading egg at /Users/soumitra7/anaconda3/lib/python3.11/site-packages/pyBWMD-0.0.1-py3.11.egg is deprecated. pip 23.3 will enforce this behaviour change. A possible replacement is to use pip for package installation..[0m[33m
[0mCollecting bubbly
  Downloading bubbly-1.0.2.tar.gz (5.1 kB)
  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: bubbly
  Building wheel for bubbly (setup.py) ... [?25ldone
[?25h  Created wheel for bubbly: filename=bubbly-1.0.2-py3-none-any.whl size=5415 sha256=c800351303ddd4f40a679415fea7ed0e9bb7d351a60e867fa9d4795b0759a125
  Stored in directory: /Users/soumitra7/Library/Caches/pip/wheels/83/9a/47/387700a4f2acbdf0b606f3a389913db17982d52e25ee5f9187
Successfully built bubbly
Installing collected packages: bubbly
Successfully installed bubbly-1.0.2


In [96]:
from pyspark.sql import functions as F
from plotly.subplots import make_subplots
import plotly.graph_objs as go
import pandas as pd

# Group by Genre and Year and calculate the mean
df1 = books_final.groupBy(['categories', 'publishedYear']).agg(F.mean('rating').alias('User Rating'), F.mean('Price').alias('Price')).toPandas()

# Create a bubble plot using Plotly
fig = make_subplots(rows=1, cols=1)

# Add traces for each genre
for genre in df1['categories'].unique():
    df_genre = df1[df1['categories'] == genre]
    fig.add_trace(go.Scatter(
        x=df_genre['User Rating'],
        y=df_genre['Price'],
        mode='markers',
        marker=dict(size=10),
        name=genre
    ))

# Update layout
fig.update_layout(
    template="plotly_dark",
    title='Bestsellers Amazon',
    xaxis_title='User Rating',
    yaxis_title='Avg Price',
    xaxis=dict(type='log'),  # Log scale for x-axis
    showlegend=True
)

# Show plot
fig.show()


24/05/09 16:01:46 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

In [100]:
from pyspark.sql import functions as F
from plotly.subplots import make_subplots
import plotly.graph_objs as go
import pandas as pd

# Group by Genre and Year and calculate the mean
df1 = books_final.groupBy(['publishedYear']).agg(F.mean('rating').alias('User Rating'), F.mean('Price').alias('Price')).toPandas()

# Create a bubble plot using Plotly
fig = make_subplots(rows=1, cols=1)

# Add traces for each genre
for year in df1['publishedYear'].unique():
    df_genre = df1[df1['publishedYear'] == year]
    fig.add_trace(go.Scatter(
        x=df_genre['User Rating'],
        y=df_genre['Price'],
        mode='markers',
        marker=dict(size=10),
        name=str(year)
    ))

# Update layout
fig.update_layout(
    template="plotly_dark",
    title='Bestsellers Amazon',
    xaxis_title='User Rating',
    yaxis_title='Avg Price',
    xaxis=dict(type='log'),  # Log scale for x-axis
    showlegend=True
)

# Show plot
fig.show()


                                                                                

In [27]:
from pyspark.sql.functions import col
# Split the categories column into Fiction and Non-Fiction
genre_counts = books_final.withColumn("Fiction", when(col("categories") == "Fiction", 1).otherwise(0)) \
    .withColumn("Non-Fiction", when(col("categories") != "Fiction", 1).otherwise(0)) \
    .groupBy().sum("Fiction", "Non-Fiction") \
    .withColumnRenamed("sum(Fiction)", "Fiction") \
    .withColumnRenamed("sum(Non-Fiction)", "Non-Fiction")

# Filter data for Fiction and Non-Fiction genres
fiction_data = books_final.filter(col("categories") == "Fiction")
non_fiction_data = books_final.filter(col("categories") == "Non Fiction")

# Count the occurrences of each user rating for Fiction and Non-Fiction genres
temp_df1 = fiction_data.groupBy("rating").count().orderBy("rating").toPandas()
temp_df2 = non_fiction_data.groupBy("rating").count().orderBy("rating").toPandas()

# Plotly imports
import plotly.graph_objs as go

# Create trace1 for Fiction
trace1 = go.Bar(
    x=temp_df1['rating'],
    y=temp_df1['count'],
    name="Fiction",
    marker=dict(color='rgb(249, 6, 6)',
                line=dict(color='rgb(0,0,0)', width=1.5))
)

# Create trace2 for Non Fiction
trace3 = go.Bar(
    x=temp_df2['rating'],
    y=temp_df2['count'],
    name="Non Fiction",
    marker=dict(color='rgb(26, 118, 255)',
                line=dict(color='rgb(0,0,0)', width=1.5))
)

# Define layout
layout = go.Layout(
    template="plotly_dark",
    title='RATING BY GENRE',
    xaxis=dict(title='Rating'),
    yaxis=dict(title='Count')
)

# Create figure and plot
fig = go.Figure(data=[trace1, trace3], layout=layout)
fig.show()


[Stage 309:>(20 + 2) / 22][Stage 310:==>(7 + 1) / 8][Stage 312:>  (0 + 5) / 9]8]

In [28]:
# Group by Author and count occurrences
top_authors = books_final.groupBy('authors').count() \
    .orderBy('count', ascending=False).limit(10)

# Convert the result to Pandas DataFrame for visualization
top_authors_pd = top_authors.toPandas()

# Create trace1
trace1 = go.Bar(
    x=top_authors_pd['authors'],
    y=top_authors_pd['count'],
    marker=dict(color='rgb(255,165,0)',
                line=dict(color='rgb(0,0,0)', width=1.5))
)

# Define layout
layout = go.Layout(
    template="plotly_dark",
    title='TOP 10 AUTHORS WITH MOST BESTSELLERS',
    xaxis=dict(title='Author', tickangle=45),
    yaxis=dict(title='COUNT')
)

# Create figure and plot
fig = go.Figure(data=[trace1], layout=layout)
fig.show()


                                                                                

In [41]:



# Group by Name and calculate the mean reviews
top_reviews_books = books_final.groupBy('Title').mean('rating') \
    .orderBy('avg(rating)', ascending=False).limit(10)

# Convert the result to Pandas DataFrame for visualization
top_reviews_books_pd = top_reviews_books.toPandas()

# Create trace1
trace1 = go.Bar(
    x=top_reviews_books_pd['Title'],
    y=top_reviews_books_pd['avg(rating)'],
    marker=dict(color='rgb(255,165,0)',
                line=dict(color='rgb(0,0,0)', width=1.5))
)

# Define layout
layout = go.Layout(
    template="plotly_dark",
    title='TOP 10 BESTSELLERS WITH HIGHEST AVERAGE REVIEWS',
    xaxis=dict(title='Book', tickangle=45),
    yaxis=dict(title='Average Reviews')
)

# Create figure and plot
fig = go.Figure(data=[trace1], layout=layout)
fig.show()


                                                                                

In [30]:
temp_df1_year = books_final.groupBy('publishedYear').mean()

# Convert the result to Pandas DataFrame for visualization
temp_df1_year_pd = temp_df1_year.toPandas()

# create trace1 for average reviews over the years
trace1_year = go.Bar(
    x=temp_df1_year_pd['publishedYear'],
    y=temp_df1_year_pd['avg(rating)'],
    marker=dict(color='rgb(255,165,0)',
                line=dict(color='rgb(0,0,0)',width=1.5))
)
layout_year = go.Layout(
    template="plotly_dark",
    title='AVERAGE REVIEWS OVER THE YEARS',
    xaxis=dict(title='Year'),
    yaxis=dict(title='Average Reviews')
)
fig_year = go.Figure(data=[trace1_year], layout=layout_year)
fig_year.show()


24/05/09 13:45:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

In [51]:
temp_df1_year.show()

[Stage 446:>                                                        (0 + 8) / 8]

+-------------+------------------+------------------+
|publishedYear|       avg(rating)|avg(publishedYear)|
+-------------+------------------+------------------+
|         1990| 4.333333333333333|            1990.0|
|         2003|3.8871473354231973|            2003.0|
|         2007| 3.914798206278027|            2007.0|
|         2018| 4.129310344827586|            2018.0|
|         2015| 3.909502262443439|            2015.0|
|         2006|  4.06280193236715|            2006.0|
|         2022|3.7983193277310923|            2022.0|
|         2013| 3.957317073170732|            2013.0|
|         1988| 4.464516129032258|            1988.0|
|         1994|3.6904761904761907|            1994.0|
|         2014|            4.3125|            2014.0|
|         2019|3.9272727272727272|            2019.0|
|         2004| 4.504424778761062|            2004.0|
|         1991|3.4545454545454546|            1991.0|
|         1989|              4.75|            1989.0|
|         1996|          3.5

                                                                                

In [None]:
# Pivoting the DataFrame
book_pivot = books_final.groupBy("Title").pivot("User_id").avg("rating")

# Fill NaN values with 0
book_pivot = book_pivot.na.fill(0)

# Convert pivot table to RDD of (Title, features) tuples
sparse_rdd = book_pivot.rdd.map(lambda row: (row[0], Vectors.dense(row[1:])))

# Define a schema for the RDD
schema = ["Title", "features"]

# Create a DataFrame from the RDD
sparse_matrix = spark.createDataFrame(sparse_rdd, schema)
sparse_matrix.show()

# Normalizing the features
normalizer = Normalizer(inputCol="features", outputCol="norm_features")
norm_features = normalizer.transform(sparse_matrix)

# Creating a Bucketed Random Projection LSH model
brp = BucketedRandomProjectionLSH(inputCol="norm_features", outputCol="hashes", bucketLength=1.0, numHashTables=10)
model = brp.fit(norm_features)


In [None]:
# # Provide input book title from the user
user_input_title = input("Enter the title of the book: ")

# Capitalize each word in the input title
user_input_title = user_input_title.title()

In [None]:
# Check if the input book title is in the DataFrame
if book_pivot.filter(col("Title") == user_input_title).count() == 0:
    print(f"Book '{user_input_title}' not found.")
else:
    # Extracting the features of the input book
    input_book_features = norm_features.filter(col("Title") == user_input_title).select("norm_features").collect()[0][0]

# Approximate k nearest neighbors of the input book
knn = model.approxNearestNeighbors(norm_features, input_book_features, 6)
# Display the recommended books with their IDs
recommended_books = knn.filter(col("Title") != user_input_title).limit(5).collect()

for book in recommended_books:
    print(book[0])