## Create A spark Session

In [109]:
from  pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.functions import countDistinct
from pyspark.sql import functions as f

In [110]:
spark = SparkSession.builder.appName("My_Spark_Project")\
.config("spark.memory.offHeap. enabled", "true") . config("spark.memory.offHeap. size", "30g") \
.getOrCreate()

## Data Wrangling
 **Tip**: In this section of the report, you will load in the data, check for cleanliness, and then trim and clean your dataset for analysis. Make sure that you document your steps carefully and justify your cleaning decisions. We will start with addressing General properities about the dataset.


In [111]:
file_path = r"C:\Users\abdel\Downloads\books_data.csv (1)\books_data.csv"
# Read the CSV file into a DataFrame
books_df = spark.read.csv(file_path, header=True, inferSchema=True)


### General Properties

> **Show Books Dataset**

In [112]:
books_df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|               Title|         description|             authors|               image|         previewLink|           publisher|       publishedDate|            infoLink|          categories|        ratingsCount|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Its Only Art If I...|                null|    ['Julie Strain']|http://books.goog...|http://books.goog...|                null|                1996|http://books.goog...|['Comics & Graphi...|                null|
|Dr. Seuss: Americ...|"Philip Nel takes...| like that of Lew...| has changed lang...| giving us new wo...| inspiring artist...|      ['Philip Nel']|http

In [113]:
books_data_df.printSchema()
print("Count of dataframe:",books_df.count())

root
 |-- Title: string (nullable = true)
 |-- description: string (nullable = false)
 |-- authors: string (nullable = false)
 |-- image: string (nullable = false)
 |-- previewLink: string (nullable = false)
 |-- publisher: string (nullable = true)
 |-- publishedDate: string (nullable = true)
 |-- infoLink: string (nullable = false)
 |-- categories: string (nullable = true)
 |-- ratingsCount: string (nullable = true)

Count of dataframe: 212404


In [114]:
books_df.describe().show()

+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|summary|               Title|         description|             authors|               image|         previewLink|           publisher|       publishedDate|            infoLink|          categories|        ratingsCount|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  count|              212403|              144047|              181153|              161213|              188349|              139274|              186560|              188103|              171880|               63852|
|   mean|   3823.672941176471|  1.4285714285714286|              1578.4|              1184.0|            Infinity|      

> **Apply ETL => Transform Step** 

In [115]:
from pyspark.sql.functions import col
# Assuming books_data_df is your DataFrame
books_df = books_df.withColumn("ratingsCount", col("ratingsCount").cast("float"))
# Check the schema to verify the data type conversion
books_df.printSchema()

root
 |-- Title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- image: string (nullable = true)
 |-- previewLink: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- publishedDate: string (nullable = true)
 |-- infoLink: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- ratingsCount: float (nullable = true)



In [116]:
books_df = books_df.withColumnRenamed('previewLink', 'preview_Link') \
                     .withColumnRenamed('infoLink', 'info_Link')

print (books_df)

DataFrame[Title: string, description: string, authors: string, image: string, preview_Link: string, publisher: string, publishedDate: string, info_Link: string, categories: string, ratingsCount: float]


## Data Cleaning

> **Check Null values**

In [120]:
columns_to_check = books_df.columns
# Count null values in each column
null_counts = books_df.agg(*[spark_sum(col(c).isNull().cast("int")).alias(c + '_null_count') for c in columns_to_check])

# Collect the result as a single row
null_counts_single_row = null_counts.collect()[0]

# Show the null counts
for col_name in columns_to_check:
    print(f"Null count in '{col_name}': {null_counts_single_row[col_name + '_null_count']}")

Null count in 'Title': 1
Null count in 'description': 68357
Null count in 'authors': 31251
Null count in 'image': 51191
Null count in 'preview_Link': 24055
Null count in 'publisher': 73130
Null count in 'publishedDate': 25844
Null count in 'info_Link': 24301
Null count in 'categories': 40524
Null count in 'ratingsCount': 168972


In [121]:
# List of columns to find most frequent values
cols_to_check = ["Title", "description", "authors", "image", "previewLink", "publisher", "publishedDate", "infoLink", "categories", "ratingsCount"]

# Find the most frequent value in each column
most_frequent_values = []
for col_name in cols_to_check:
    mode_value = books_data_df.groupBy(col_name).count().orderBy(col("count").desc()).select(col_name).first()[0]
    most_frequent_values.append((col_name, mode_value))

# Print the most frequent value in each column
for col_name, value in most_frequent_values:
    print(f"Most frequent value in column '{col_name}': {value}")

Most frequent value in column 'Title': """Mom
Most frequent value in column 'description': Not Available
Most frequent value in column 'authors': Unknown
Most frequent value in column 'image': Not Available
Most frequent value in column 'previewLink': Not Available
Most frequent value in column 'publisher': None
Most frequent value in column 'publishedDate': None
Most frequent value in column 'infoLink': Not Available
Most frequent value in column 'categories': None
Most frequent value in column 'ratingsCount': None


> **Fill Null values**

* Fill numerical col with mean

In [122]:
from pyspark.sql.functions import col, count, when
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['ratingsCount'], outputCols=['ratingsCount']).setStrategy("mean")
books_df = imputer.fit(books_df).transform(books_df)

In [123]:
# Fill missing values with a default value
default_value = "Unknown"
books_df = books_df.fillna(default_value, subset=[ 'publisher','authors','categories'])
default_value2 = "Not Available"
books_df = books_df.fillna(default_value2, subset=['image', 'preview_Link', 'info_Link', 'description'])

In [124]:
from pyspark.sql.functions import substring, col
books_df = books_df.withColumn('Year', substring(col('publishedDate'), 1, 4))

> **Drop Null values in these coloumns**

In [125]:
books_df= books_df.dropna(subset=['Title'])
books_df = books_df.dropna(subset=['Year'])

In [126]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
total_rows = books_df.count()
null_percentages = []
for col_name in books_df.columns:
    null_count = books_df.where(col(col_name).isNull()).count()
    null_percentage = (null_count / total_rows) * 100
    null_percentages.append((col_name, null_percentage))

# Print null percentages
for col_name, percentage in null_percentages:
    print(f"Null percentage in column '{col_name}': {percentage:.2f}%")

Null percentage in column 'Title': 0.00%
Null percentage in column 'description': 0.00%
Null percentage in column 'authors': 0.00%
Null percentage in column 'image': 0.00%
Null percentage in column 'preview_Link': 0.00%
Null percentage in column 'publisher': 0.00%
Null percentage in column 'publishedDate': 0.00%
Null percentage in column 'info_Link': 0.00%
Null percentage in column 'categories': 0.00%
Null percentage in column 'ratingsCount': 0.00%
Null percentage in column 'Year': 0.00%


> **Drop unused col**

In [127]:
# Drop the 'publishedDate' column
books_df = books_df.drop('publishedDate')
# Check the schema to verify the column has been dropped
books_df.printSchema()

root
 |-- Title: string (nullable = true)
 |-- description: string (nullable = false)
 |-- authors: string (nullable = false)
 |-- image: string (nullable = false)
 |-- preview_Link: string (nullable = false)
 |-- publisher: string (nullable = false)
 |-- info_Link: string (nullable = false)
 |-- categories: string (nullable = false)
 |-- ratingsCount: float (nullable = true)
 |-- Year: string (nullable = true)



> **check Dublication** 

In [128]:
print("Before drop dublication:",books_df.count())
books_df.dropDuplicates()
print("After drop dublication:",books_df.count())

Before drop dublication: 186559
After drop dublication: 186559


> **Show of data after cleanning**

In [133]:
books_df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+----+
|               Title|         description|             authors|               image|        preview_Link|           publisher|           info_Link|          categories|ratingsCount|Year|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+----+
|Its Only Art If I...|       Not Available|    ['Julie Strain']|http://books.goog...|http://books.goog...|             Unknown|http://books.goog...|['Comics & Graphi...|   56.315205|1996|
|Dr. Seuss: Americ...|"Philip Nel takes...| like that of Lew...| has changed lang...| giving us new wo...| inspiring artist...|http://books.goog...|http://books.goog...|   56.315205|['Ph|
|Wonderful Worship...|This resource inc...|    ['David R. Ra

#### Some Insigths From data

In [183]:
books_df.groupBy("authors").count().show()
books_df.groupBy("publisher").count().show()


+--------------------+-----+
|             authors|count|
+--------------------+-----+
|  ['Barbara Melosh']|    1|
|    ['Donald Cline']|    1|
|     ['Dian Layton']|    2|
| ['Sergius Golowin']|    1|
|       ['Kotoyama,']|    1|
|   ['Joseph Kerman']|    1|
|     ['Kay Flowers']|    1|
|     ['John Rewald']|    3|
|"" To a very stro...|    1|
|['I. Ristic', 'Ia...|    1|
|['Andrew P. Tobias']|    3|
|['Judith Ennamora...|    2|
|['Jamgon Kongtrul...|    1|
|['Frank Miller', ...|    1|
|['National Resear...|    1|
| all of whom were...|    1|
|['William B. Park...|    1|
|      ['Max Fogiel']|    3|
|      ['Jules Bass']|    1|
|  ['Rebecca Harvin']|    1|
+--------------------+-----+
only showing top 20 rows

+--------------------+-----+
|           publisher|count|
+--------------------+-----+
|        Lorenz Books|   22|
|       The New Press|   15|
|Janes Information...|    7|
|National Committe...|    1|
|            Capstone|   81|
|          Soma Books|    5|
| perhaps the mos

In [182]:
from pyspark.sql import functions as F
# Count the number of unique states
unique_states_df = books_df.agg(F.countDistinct("Title").alias("unique_Titles"))
unique_states_df.show()
# Count the number of unique cities
unique_cities_df = books_df.agg(F.countDistinct("categories").alias("unique_categories"))
unique_cities_df.show()

+-------------+
|unique_Titles|
+-------------+
|       186555|
+-------------+

+-----------------+
|unique_categories|
+-----------------+
|            27802|
+-----------------+



In [190]:
books_df.groupBy("Title").count().show()
books_df.groupBy("categories").count().show()

+--------------------+-----+
|               Title|count|
+--------------------+-----+
|Isaac Asimov: Mas...|    1|
|     White Rock Ways|    1|
|The Face of the T...|    1|
|     Iridescent Soul|    1|
|L'Alchimiste (Cof...|    1|
|  The Book of Garlic|    1|
|A Jesse Stuart Ha...|    1|
|      Badenheim 1939|    1|
|        Pagan Babies|    1|
|The Self and its ...|    1|
|The Educated Chil...|    1|
|Future Perfect - ...|    1|
|The cornet of hor...|    1|
|Basic Arabic Work...|    1|
|Organizational Th...|    1|
|Oz and Beyond: Th...|    1|
|Fundamentals of I...|    1|
|We Love Baseball!...|    1|
|... Marine Mollus...|    1|
|Surviving Childho...|    1|
+--------------------+-----+
only showing top 20 rows

+----------------------+-----+
|            categories|count|
+----------------------+-----+
|   Toronto Globe an...|    1|
|       ['Arboviruses']|    1|
|  "[""Children's so...|   10|
|   Gopnik shows tha...|    1|
|  ['Melanchthon, Ph...|    1|
|   and always naked...|    1|

In [187]:
distinct_counts_df = books_df.agg(
    F.countDistinct("publisher").alias("distinct_publishers"),
    F.countDistinct("categories").alias("distinct_categories")
)

distinct_counts_df.show()

+-------------------+-------------------+
|distinct_publishers|distinct_categories|
+-------------------+-------------------+
|              34264|              28361|
+-------------------+-------------------+



> **Book Categories Analysis**

In [191]:
category_counts = books_df.groupBy("categories").count().orderBy(col("count").desc())
category_counts.show()

+--------------------+-----+
|          categories|count|
+--------------------+-----+
|         ['Fiction']|19970|
|             Unknown|15672|
|         ['History']| 7873|
|        ['Religion']| 7871|
|['Juvenile Fiction']| 6081|
|['Biography & Aut...| 5200|
|['Business & Econ...| 4659|
|       ['Computers']| 3656|
|  ['Social Science']| 3206|
|['Juvenile Nonfic...| 3184|
|         ['Science']| 2341|
|       ['Education']| 2291|
|         ['Cooking']| 2157|
|['Sports & Recrea...| 1971|
|         ['Medical']| 1828|
|['Family & Relati...| 1810|
|           ['Music']| 1798|
|             ['Art']| 1795|
|['Literary Critic...| 1786|
|['Language Arts &...| 1750|
+--------------------+-----+
only showing top 20 rows



> **Authors Analysis**

In [193]:
# Count books per author
author_counts = books_df.groupBy("authors").count().orderBy(col("count").desc())
author_counts.show()


+--------------------+-----+
|             authors|count|
+--------------------+-----+
|             Unknown| 6903|
| we issue the boo...|  252|
|       ['Rose Arny']|  235|
|['Library of Cong...|  178|
|['William Shakesp...|  177|
| ['Agatha Christie']|  133|
|['Erle Stanley Ga...|  116|
|"[""Louis L'Amour...|  115|
| ['Charles Dickens']|   79|
|['Edgar Rice Burr...|   71|
|   ['Carolyn Keene']|   70|
|   ['Ann M. Martin']|   66|
| ['Rudyard Kipling']|   64|
|    ['Isaac Asimov']|   64|
|       ['Zane Grey']|   63|
|    ['Nora Roberts']|   59|
|      ['Mark Twain']|   56|
|     ['Henry James']|   53|
|   ['Joseph Conrad']|   53|
|['Arthur James We...|   52|
+--------------------+-----+
only showing top 20 rows



> **Publisher Analysis**

In [195]:
# Count books per publisher
publisher_counts = books_df.groupBy("publisher").count().orderBy(col("count").desc())
publisher_counts.show()


+--------------------+-----+
|           publisher|count|
+--------------------+-----+
|             Unknown|48575|
|  Simon and Schuster| 2943|
|             Penguin| 2304|
|           Routledge| 2149|
|   John Wiley & Sons| 1671|
|      Harper Collins| 1504|
|Cambridge Univers...| 1451|
|           Macmillan| 1059|
|     Open Road Media|  974|
| Courier Corporation|  967|
|Houghton Mifflin ...|  836|
|             Vintage|  820|
|        Random House|  776|
|Springer Science ...|  739|
|           iUniverse|  689|
|Oxford University...|  640|
|Oxford University...|  638|
|         Hachette UK|  518|
|       HarperCollins|  495|
|           Harlequin|  495|
+--------------------+-----+
only showing top 20 rows



In [196]:
# Count books per year
year_counts = books_df.groupBy("Year").count().orderBy("Year")
year_counts.show()


+----+-----+
|Year|count|
+----+-----+
| "" |    6|
| """|    3|
| ""'|    1|
| ""-|    1|
| ""A|    8|
| ""C|    5|
| ""D|    2|
| ""E|    1|
| ""F|    2|
| ""G|    2|
| ""H|    5|
| ""I|    3|
| ""J|    3|
| ""L|    3|
| ""M|    3|
| ""N|    1|
| ""O|    2|
| ""P|    4|
| ""Q|    3|
| ""R|    2|
+----+-----+
only showing top 20 rows



In [202]:
from pyspark.sql.functions import avg

# Calculate average ratings per category
avg_ratings_per_category = books_df.groupBy("categories").agg(avg("ratingsCount").alias("avg_ratings")).orderBy("avg_ratings", ascending=False)
avg_ratings_per_category.show()

# Calculate average ratings per publisher
avg_ratings_per_publisher = books_df.groupBy("publisher").agg(avg("ratingsCount").alias("avg_ratings")).orderBy("avg_ratings", ascending=False)
avg_ratings_per_publisher.show()


+--------------------+------------------+
|          categories|       avg_ratings|
+--------------------+------------------+
| ['Charity-schools']|            2634.0|
|['Abnormalities, ...|2576.0788011550903|
|HMH Books For You...|            2020.0|
|    Houghton Mifflin|            2019.0|
|                Mack|            2019.0|
|Arthur A. Levine ...|            2014.0|
|Knopf Books for Y...|            2014.0|
|Sterling Publishe...|            2013.0|
|       Jonathan Cape|            2013.0|
|         Arena Press|            2013.0|
|      Signet Classic|            2013.0|
|    Penn State Press|            2012.0|
|Penguin Young Rea...|            2012.0|
|       Templar Books|            2012.0|
|            Raintree|            2012.0|
| Gryphon House, Inc.|            2012.0|
|Carnelian Heart P...|            2012.0|
|        Amacom Books|            2011.0|
|       Watermark Pub|            2011.0|
|Carson Dellosa Pu...|            2011.0|
+--------------------+------------

In [206]:
# Count books per author
books_per_author = books_df.groupBy("authors").count().orderBy(col("count").desc())

# Calculate average ratings per author
avg_ratings_per_author = books_df.groupBy("authors").agg(avg("ratingsCount").alias("avg_ratings")).orderBy("avg_ratings", ascending=False)

# Join to get both counts and average ratings
top_authors_analysis = books_per_author.join(avg_ratings_per_author, "authors")
top_authors_analysis.show()


+--------------------+-----+-----------------+
|             authors|count|      avg_ratings|
+--------------------+-----+-----------------+
|  ['Barbara Melosh']|    1|56.31520462036133|
|    ['Donald Cline']|    1|56.31520462036133|
|     ['Dian Layton']|    2|56.31520462036133|
| ['Sergius Golowin']|    1|56.31520462036133|
|       ['Kotoyama,']|    1|56.31520462036133|
|   ['Joseph Kerman']|    1|56.31520462036133|
|     ['Kay Flowers']|    1|56.31520462036133|
|     ['John Rewald']|    3|56.31520462036133|
|"" To a very stro...|    1|56.31520462036133|
|['I. Ristic', 'Ia...|    1|56.31520462036133|
|['Andrew P. Tobias']|    3|56.31520462036133|
|['Judith Ennamora...|    2|56.31520462036133|
|['Jamgon Kongtrul...|    1|56.31520462036133|
|['Frank Miller', ...|    1|56.31520462036133|
|['National Resear...|    1|56.31520462036133|
| all of whom were...|    1|56.31520462036133|
|['William B. Park...|    1|56.31520462036133|
|      ['Max Fogiel']|    3|56.31520462036133|
|      ['Jule

In [208]:
from pyspark.sql.functions import collect_set

# Common authors between books
common_authors_df = books_df.groupBy("Title").agg(collect_set("authors").alias("authors_list"))
common_authors_df.show()

# Common categories between books
common_categories_df = books_df.groupBy("Title").agg(collect_set("categories").alias("categories_list"))
common_categories_df.show()

# Common publishers between books
common_publishers_df = books_df.groupBy("Title").agg(collect_set("publisher").alias("publishers_list"))
common_publishers_df.show()



+--------------------+--------------------+
|               Title|        authors_list|
+--------------------+--------------------+
|""" We'll Always ...|[['Robert A. Nowl...|
|"""Always ready!"...|[&dq=%22Always+re...|
|"""Billboard"" Bo...|          [ country]|
|"""Carefree"" (R....|[['Allan Scott', ...|
|"""Catch 'em aliv...|[ John R. Abernat...|
|      """Cleanliness|[ (Pointers for l...|
|"""Could Be Worse...|[['James Stevenso...|
|"""Gentlemen pref...|[ forging a new a...|
|          """Gizelle|[Tells the story ...|
|"""Glory is a-com...|[['Martha Peterso...|
|"""I Do""...Weddi...|           [Unknown]|
|    """James Bowie""|[",,['Evelyn Brog...|
|"""Little Rainman...|[['Karen L. Simmo...|
|"""Nothing but pr...|           [Unknown]|
|"""Our Brown-Eyed...|[['Jeffrey McAndr...|
|"""Pet Shop Boys"...|   [['Chris Heath']]|
|"""Purse""onalize...|[['Andrews McMeel...|
|          """Sweeps"|         [ Barbados]|
|"""The Jukes"": A...|[ Disease and Her...|
|"""To whom the go...|[['Diana M

In [None]:
# Count books per author
books_per_author = books_df.groupBy("authors").count().orderBy(col("count").desc())

# Calculate average ratings per author
avg_ratings_per_author = books_df.groupBy("authors").agg(avg("ratingsCount").alias("avg_ratings")).orderBy("avg_ratings", ascending=False)

# Join to get both counts and average ratings
top_authors_analysis = books_per_author.join(avg_ratings_per_author, "authors")
top_authors_analysis.show()


### General Properties of second DS

In [136]:
file_path = r"C:\Users\abdel\Downloads\BigData_Project\Sales_Data\Books_rating.csv"
# Read the CSV file into a DataFrame
rating_df = spark.read.csv(file_path, header=True, inferSchema=True)


> **Show Rating Dataset**

In [137]:
rating_df.show()

+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|        Id|               Title|Price|       User_id|         profileName|review/helpfulness|review/score|review/time|      review/summary|         review/text|
+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|1882931173|Its Only Art If I...| null| AVCGYZL8FQQTD|"Jim of Oz ""jim-...|               7/7|         4.0|  940636800|Nice collection o...|This is only for ...|
|0826414346|Dr. Seuss: Americ...| null|A30TK6U7DNS82R|       Kevin Killian|             10/10|         5.0| 1095724800|   Really Enjoyed It|I don't care much...|
|0826414346|Dr. Seuss: Americ...| null|A3UH4UZ4RSVO82|        John Granger|             10/11|         5.0| 1078790400|Essential for eve...|"If people become...|
|0826414346|Dr. Seuss: Ameri

In [138]:
rating_df.printSchema()
print("Count of dataframe:",rating_df.count())

root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- User_id: string (nullable = true)
 |-- profileName: string (nullable = true)
 |-- review/helpfulness: string (nullable = true)
 |-- review/score: string (nullable = true)
 |-- review/time: string (nullable = true)
 |-- review/summary: string (nullable = true)
 |-- review/text: string (nullable = true)

Count of dataframe: 3000000


In [139]:
rating_df.describe().show()

+-------+--------------------+--------------------+--------------------+-------------------+-----------+-------------------+------------------+--------------------+--------------------+--------------------+
|summary|                  Id|               Title|               Price|            User_id|profileName| review/helpfulness|      review/score|         review/time|      review/summary|         review/text|
+-------+--------------------+--------------------+--------------------+-------------------+-----------+-------------------+------------------+--------------------+--------------------+--------------------+
|  count|             3000000|             2999792|              482421|            2437750|    2437800|            2999633|           2999870|             2999973|             2999935|             2999957|
|   mean|1.0568515696607149E9|   2012.796651763537|  21.767951161877054|  18.29299003322259|        NaN|3.285048033703448E8| 1656.860421970827|1.1270533345949814E9|        

## Data Cleaning 2

In [142]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum

null_counts = rating_df.agg(*[spark_sum(col(c).isNull().cast("int")).alias(c + '_null_count') for c in rating_df.columns])
null_counts_single_row = null_counts.collect()[0]
print("Number of null values in each column:")
for col_name in rating_df.columns:
    print(f"{col_name}: {null_counts_single_row[col_name + '_null_count']}")

Number of null values in each column:
Id: 0
Title: 208
Price: 2519269
User_id: 562250
profileName: 562200
review/helpfulness: 367
review/score: 18064
review/time: 4732
review/summary: 65
review/text: 43


In [158]:
rating_df= rating_df.dropna(subset=['Title'])
rating_df= rating_df.dropna(subset=['User_id'])


In [144]:
from pyspark.sql.functions import mean

# Calculate mean value for a column
mean_value = rating_df.select(mean(col('Price'))).collect()[0][0]

# Fill missing values with mean
rating_df = rating_df.fillna(mean_value, subset=['Price'])

In [145]:
# Fill null values in 'profileName' column with 'Anonymous'
default_value = "Anonymous"
rating_df = rating_df.fillna(default_value, subset=['profileName'])

In [146]:
# Fill null values in 'review/summary' column with 'No Summary'
default_value = "No Summary"
default_value2 = "No Text"
rating_df = rating_df.fillna(default_value, subset=['review/summary'])
rating_df = rating_df.fillna(default_value, subset=['review/text'])


In [151]:
from pyspark.sql.functions import col, when

# Define the columns to check
cols_to_check = ["review/helpfulness", "review/score", "review/time"]

# Create a dictionary to store mode values for each column
mode_values = {}

# Find the most frequent value for each column
for col_name in cols_to_check:
    mode_value = rating_df.groupBy(col_name).count() \
                         .orderBy(col("count").desc()) \
                         .select(col_name) \
                         .first()[0]
    mode_values[col_name] = mode_value

# Fill null values with the most frequent value for each column
for col_name, mode_value in mode_values.items():
    rating_df = rating_df.withColumn(col_name, when(col(col_name).isNull(), mode_value).otherwise(col(col_name)))

# Check the number of null values after filling
null_counts_after = {col_name: rating_df.where(col(col_name).isNull()).count() for col_name in cols_to_check}

# Show the updated counts of null values
for col_name, null_count in null_counts_after.items():
    print(f"Number of null values in '{col_name}': {null_count}")


Number of null values in 'review/helpfulness': 0
Number of null values in 'review/score': 0
Number of null values in 'review/time': 0


In [159]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
total_rows = rating_df.count()
null_percentages = []
for col_name in rating_df.columns:
    null_count = rating_df.where(col(col_name).isNull()).count()
    null_percentage = (null_count / total_rows) * 100
    null_percentages.append((col_name, null_percentage))

# Print null percentages
for col_name, percentage in null_percentages:
    print(f"Null percentage in column '{col_name}': {percentage:.2f}%")

Null percentage in column 'Id': 0.00%
Null percentage in column 'Title': 0.00%
Null percentage in column 'Price': 0.00%
Null percentage in column 'User_id': 0.00%
Null percentage in column 'profileName': 0.00%
Null percentage in column 'review/helpfulness': 0.00%
Null percentage in column 'review/score': 0.00%
Null percentage in column 'review/time': 0.00%
Null percentage in column 'review/summary': 0.00%
Null percentage in column 'review/text': 0.00%


> **Apply ETL => Transform Step** 

In [140]:
from pyspark.sql.functions import col
# Convert Price column to float
rating_df = rating_df.withColumn("Price", col("Price").cast("float"))
# Convert review/score column to float
rating_df = rating_df.withColumn("review/score", col("review/score").cast("float"))
# Convert review/time column to int
rating_df = rating_df.withColumn("review/time", col("review/time").cast("int"))
# Check the schema to verify the data type conversions
rating_df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Price: float (nullable = true)
 |-- User_id: string (nullable = true)
 |-- profileName: string (nullable = true)
 |-- review/helpfulness: string (nullable = true)
 |-- review/score: float (nullable = true)
 |-- review/time: integer (nullable = true)
 |-- review/summary: string (nullable = true)
 |-- review/text: string (nullable = true)



In [165]:
rating_df = rating_df.withColumnRenamed('profileName', 'profile_Name') \
                             .withColumnRenamed('review/helpfulness', 'review_helpfulness')\
                             .withColumnRenamed('review/score', 'review_score')\
                             .withColumnRenamed('review/time', 'review_time') \
                             .withColumnRenamed('review/summary', 'review_summary')\
                             .withColumnRenamed('review/text', 'review_text')
rating_df.show()

+----------+--------------------+---------+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|        Id|               Title|    Price|       User_id|        profile_Name|review_helpfulness|review_score|review_time|      review_summary|         review_text|
+----------+--------------------+---------+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|1882931173|Its Only Art If I...|21.768013| AVCGYZL8FQQTD|"Jim of Oz ""jim-...|               7/7|         4.0|  940636800|Nice collection o...|This is only for ...|
|0826414346|Dr. Seuss: Americ...|21.768013|A30TK6U7DNS82R|       Kevin Killian|             10/10|         5.0| 1095724800|   Really Enjoyed It|I don't care much...|
|0826414346|Dr. Seuss: Americ...|21.768013|A3UH4UZ4RSVO82|        John Granger|             10/11|         5.0| 1078790400|Essential for eve...|"If people become...|
|082

#### Some Insigths From data

In [156]:
from pyspark.sql import functions as F
# Calculate the average age of customers
avg_age_df = rating_df.agg(F.avg("Price").alias("average_Price"))
avg_age_df.show()

+------------------+
|     average_Price|
+------------------+
|21.768013002900528|
+------------------+



In [166]:
# Calculate average review scores per price range
price_range_scores = rating_df.groupBy(F.round("Price", 0).alias("price_range")).agg(
    F.avg("review_score").alias("avg_score")
)

price_range_scores.show()

+-----------+-----------------+
|price_range|        avg_score|
+-----------+-----------------+
|       18.0| 4.30884070058382|
|       64.0|4.086956521739131|
|       82.0|3.846774193548387|
|      107.0|4.153846153846154|
|       47.0|4.208646616541353|
|        9.0|4.222615244180739|
|       58.0|4.098615916955017|
|        5.0|4.255224147169137|
|       39.0|4.094320486815416|
|      132.0|3.395348837209302|
|       30.0|4.159679878048781|
|       17.0|4.318933333333334|
|      105.0|4.149779735682819|
|      163.0|4.681818181818182|
|      117.0|4.323076923076923|
|      190.0|4.352941176470588|
|      187.0|4.478260869565218|
|       90.0|4.264044943820225|
|       26.0|4.242635523792923|
|       41.0|4.053484602917342|
+-----------+-----------------+
only showing top 20 rows



In [167]:
# Count the number of reviews per user
user_review_count = rating_df.groupBy('User_id').count().orderBy('count', ascending=False).show()


+--------------+-----+
|       User_id|count|
+--------------+-----+
|A14OJS0VWMOSWO| 5791|
|   AFVQZQ8PW0L| 3606|
|A1D2C0WDCSHUWZ| 3145|
| AHD101501WCN1| 1994|
|A1X8VZWTOG8IS6| 1803|
|A1K1JW1C5CUSUZ| 1455|
|A20EEWWSFMZ1PN| 1387|
|A1S3C5OFU508P3| 1307|
|A1N1YEMTI9DJ86| 1030|
|A2OJW07GQRNJUT|  999|
|A1L43KWWR05PCS|  960|
|A1G37DFO8MQW0M|  932|
|A2NJO6YE954DBH|  913|
|A3M174IC0VXOS2|  912|
|A1EKTLUL24HDG8|  900|
|A3MV1KKHX51FYT|  866|
|A3QVAKVRAH657N|  824|
|A1M8PP7MLHNBQB|  798|
|A319KYEIAZ3SON|  797|
|A1MC6BFHWY6WC3|  780|
+--------------+-----+
only showing top 20 rows

+--------------------+-----+
|  review_helpfulness|count|
+--------------------+-----+
|       #1 Bestse..."|    2|
|    & it will be."""|    7|
| &#34; red book ..."|    7|
| &#34;Attract Mon...|    2|
|     &#34;Author..."|    2|
| &#34;Copywriting...|   12|
|     &#34;Human ..."|    1|
| &#34;If I'm So S...|    3|
| &#34;Infinity's ...|    1|
| &#34;Jesus Died ...|    6|
| &#34;Kaleidoscop...|    1|
| &#34;Land Beyo

In [168]:
# Calculate average review score for different price ranges
price_scores = rating_df.groupBy('Price').avg('review_score').orderBy('Price').show()

+-----+------------------+
|Price| avg(review_score)|
+-----+------------------+
|  1.0| 4.120765832106038|
| 1.09|             4.385|
| 1.19|               3.0|
| 1.25|               4.0|
| 1.29|4.0144927536231885|
| 1.35| 4.082417582417582|
| 1.38|               4.0|
| 1.39|              3.75|
| 1.49|  4.23314606741573|
|  1.5| 4.035460992907802|
| 1.59| 4.215277777777778|
|  1.6| 4.555555555555555|
| 1.69|3.9153094462540716|
|  1.7| 4.666666666666667|
| 1.78|               5.0|
| 1.79| 4.211538461538462|
|  1.8| 4.514285714285714|
| 1.82| 4.205882352941177|
| 1.86|               5.0|
| 1.88|3.7142857142857144|
+-----+------------------+
only showing top 20 rows



In [170]:
from pyspark.sql import functions as F

# Assuming Books_rating_df is your Spark DataFrame
rating_df = rating_df.withColumn("review_date", F.to_date(F.from_unixtime("review_time")))

# Calculate review counts per month
reviews_per_month = rating_df.groupBy(F.month("review_date").alias("month")).count().orderBy("month")

reviews_per_month.show()

+-----+------+
|month| count|
+-----+------+
|    1|315708|
|    2|237857|
|    3|194236|
|    4|170404|
|    5|178797|
|    6|171487|
|    7|180524|
|    8|185697|
|    9|183201|
|   10|187978|
|   11|192820|
|   12|238846|
+-----+------+



In [176]:
from pyspark.sql.functions import year, month, to_timestamp

# Assuming 'review_time' is in seconds since epoch (Unix timestamp)
rating_df = rating_df.withColumn('review_time_timestamp', to_timestamp(rating_df['review_time']))

# Extract year and month from review time
rating_df = rating_df.withColumn('review_year', year(rating_df['review_time_timestamp']))
rating_df = rating_df.withColumn('review_month', month(rating_df['review_time_timestamp']))

# Drop the intermediate timestamp column if no longer needed
rating_df = rating_df.drop('review_time_timestamp')

# Show the updated DataFrame
rating_df.select('review_time', 'review_year', 'review_month').show()



+-----------+-----------+------------+
|review_time|review_year|review_month|
+-----------+-----------+------------+
|  940636800|       1999|          10|
| 1095724800|       2004|           9|
| 1078790400|       2004|           3|
| 1090713600|       2004|           7|
| 1107993600|       2005|           2|
| 1127174400|       2005|           9|
| 1100131200|       2004|          11|
| 1231200000|       2009|           1|
| 1209859200|       2008|           5|
| 1076371200|       2004|           2|
|  991440000|       2001|           6|
| 1291766400|       2010|          12|
| 1248307200|       2009|           7|
| 1222560000|       2008|           9|
| 1117065600|       2005|           5|
| 1119571200|       2005|           6|
| 1119225600|       2005|           6|
| 1115942400|       2005|           5|
| 1117065600|       2005|           5|
| 1130025600|       2005|          10|
+-----------+-----------+------------+
only showing top 20 rows



In [181]:
from pyspark.sql.functions import when

# Create price ranges and categorize items
rating_df = rating_df.withColumn('price_range', when(rating_df['Price'] < 50, 'Low')
                                  .when((rating_df['Price'] >= 50) & (rating_df['Price'] <= 100), 'Medium')
                                  .otherwise('High'))

# Analyze distribution of review scores across price ranges
price_range_scores = rating_df.groupBy('price_range').avg('review_score').orderBy('price_range').show()


+-----------+-----------------+
|price_range|avg(review_score)|
+-----------+-----------------+
|       High|4.133490109282058|
|        Low|2049.405812145332|
|     Medium|4.211609039984547|
+-----------+-----------------+



In [185]:
# Count the number of reviews per user and identify top reviewers
top_reviewers = rating_df.groupBy('User_id').count().orderBy('count', ascending=False).show(10)


+--------------+-----+
|       User_id|count|
+--------------+-----+
|A14OJS0VWMOSWO| 5791|
|   AFVQZQ8PW0L| 3606|
|A1D2C0WDCSHUWZ| 3145|
| AHD101501WCN1| 1994|
|A1X8VZWTOG8IS6| 1803|
|A1K1JW1C5CUSUZ| 1455|
|A20EEWWSFMZ1PN| 1387|
|A1S3C5OFU508P3| 1307|
|A1N1YEMTI9DJ86| 1030|
|A2OJW07GQRNJUT|  999|
+--------------+-----+
only showing top 10 rows



In [201]:
from pyspark.sql.functions import length

# Calculate the length of review text
rating_df = rating_df.withColumn('text_length', length(rating_df['review_text']))

# Analyze the distribution of review text lengths
text_length_stats = rating_df.select('text_length').describe()
text_length_stats.show()


+-------+-----------------+
|summary|      text_length|
+-------+-----------------+
|  count|          2437555|
|   mean|622.3102998701568|
| stddev|660.0410230129934|
|    min|                1|
|    max|            32551|
+-------+-----------------+

